sqlalchemy: Inconsistency JSON results among PostgreSQL drivers
Describe the bug
By default, different PostgreSQL drivers may return different Python types for JSON(B) values. For example of the same JSON dict, psycopg2 returns dict while asyncpg returns str. SQLAlchemy should eliminate the difference and return consistent results. The issue is, the asyncpg dialect/driver cannot parse JSON into dict if the SA type is unspecified (executing raw SQL for example), please see below.
To Reproduce
import asyncio
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.ext.asyncio import create_async_engine
async def main():
e = sa.create_engine("postgresql+psycopg2:///", future=True)
ae = create_async_engine("postgresql+asyncpg:///")
for json_type in [sa.JSON, JSONB]:
print(json_type.__visit_name__)
# create test tables
metadata = sa.MetaData()
users = sa.Table("users", metadata, sa.Column("profile", json_type))
metadata.create_all(e)
queries = [
("clause", sa.select([users.c.profile])),
("text ", sa.text("SELECT profile FROM users")),
]
with e.connect() as conn:
conn.execute(users.insert().values(profile=dict(a=1, b=2)))
for exec_type, obj in queries:
print("sync ", exec_type, type(conn.scalar(obj)).__name__)
conn.execute(users.delete())
async with ae.connect() as conn:
await conn.execute(users.insert().values(profile=dict(a=1, b=2)))
for exec_type, obj in queries:
print("async", exec_type, type(await conn.scalar(obj)).__name__)
await conn.execute(users.delete())
metadata.drop_all(e)
asyncio.run(main())
Expected results
JSON
sync clause dict
sync text dict
async clause dict
async text dict
JSONB
sync clause dict
sync text dict
async clause dict
async text dict
Actual results
JSON
sync clause dict
sync text dict
async clause dict
async text str
JSONB
sync clause dict
sync text dict
async clause dict
async text str
Versions.
- OS: Darwin Kernel Version 19.5.0
- Python: Python 3.8.2
- SQLAlchemy: 1.4.0b1 @ 4d17fe4063adef50c1d529993e0b047f503940e2
- Database: PostgreSQL 12.4
- DBAPI: asyncpg 0.21.0, psycopg2 2.8.5 (dt dec pq3 ext lo64)
Additional context
I have 2 candidate solutions:
- Override
NullType.result_processor(). This is simpler but slightly hacky:
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index 780e23844..a62db1a5e 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -70,6 +70,9 @@ try:
except ImportError:
_python_UUID = None
+JSON_COLTYPE = 114
+JSONB_COLTYPE = 3802
+
class AsyncpgTime(sqltypes.Time):
def get_dbapi_type(self, dbapi):
@@ -218,6 +221,15 @@ class AsyncpgOID(OID):
return dbapi.INTEGER
+class AsyncpgNullType(sqltypes.NullType):
+ def result_processor(self, dialect, coltype):
+ if coltype == JSON_COLTYPE:
+ return json.JSON().result_processor(dialect, coltype)
+ if coltype == JSONB_COLTYPE:
+ return json.JSONB().result_processor(dialect, coltype)
+ return super().result_processor(dialect, coltype)
+
+
class PGExecutionContext_asyncpg(PGExecutionContext):
def pre_exec(self):
if self.isddl:
@@ -720,6 +732,7 @@ class PGDialect_asyncpg(PGDialect):
sqltypes.Enum: AsyncPgEnum,
OID: AsyncpgOID,
REGCLASS: AsyncpgREGCLASS,
+ sqltypes.NullType: AsyncpgNullType,
},
)
(This was the solution in GINO, see python-gino/gino#305, python-gino/gino#403)
- Use asyncpg-native
set_type_codec(). This is more consistent with psycopg2 impl but longer. Moreover, it revealed another issue, please see TODO below:
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index 780e23844..330e71732 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -41,6 +41,7 @@ in conjunction with :func:`_sa.craete_engine`::
import collections
import decimal
import itertools
+import json as json_lib
import re
from . import json
@@ -123,11 +124,17 @@ class AsyncpgJSON(json.JSON):
def get_dbapi_type(self, dbapi):
return dbapi.JSON
+ def result_processor(self, dialect, coltype):
+ return None
+
class AsyncpgJSONB(json.JSONB):
def get_dbapi_type(self, dbapi):
return dbapi.JSONB
+ def result_processor(self, dialect, coltype):
+ return None
+
class AsyncpgJSONIndexType(sqltypes.JSON.JSONIndexType):
def get_dbapi_type(self, dbapi):
@@ -500,6 +507,7 @@ class AsyncAdapt_asyncpg_connection:
"""
+ # TODO: this causes failure in future set_type_codec() below
await self._connection.set_type_codec(
"char",
schema="pg_catalog",
@@ -508,6 +516,24 @@ class AsyncAdapt_asyncpg_connection:
format="text",
)
+ def setup_json_handlers(self, serializer, deserializer):
+ self.await_(
+ self._connection.set_type_codec(
+ "json",
+ encoder=serializer,
+ decoder=deserializer,
+ schema="pg_catalog",
+ )
+ )
+ self.await_(
+ self._connection.set_type_codec(
+ "jsonb",
+ encoder=serializer,
+ decoder=deserializer,
+ schema="pg_catalog",
+ )
+ )
+
def _handle_exception(self, error):
if not isinstance(error, AsyncAdapt_asyncpg_dbapi.Error):
exception_mapping = self.dbapi._asyncpg_error_translate
@@ -796,5 +822,34 @@ class PGDialect_asyncpg(PGDialect):
e, self.dbapi.InterfaceError
) and "connection is closed" in str(e)
+ def on_connect(self):
+ fns = []
+ if self.isolation_level is not None:
+
+ def on_connect(conn):
+ self.set_isolation_level(conn, self.isolation_level)
+
+ fns.append(on_connect)
+
+ if self.dbapi:
+
+ def on_connect(conn):
+ conn.setup_json_handlers(
+ lambda x: x,
+ self._json_deserializer or json_lib.loads,
+ )
+
+ fns.append(on_connect)
+
+ if fns:
+
+ def on_connect(conn):
+ for fn in fns:
+ fn(conn)
+
+ return on_connect
+ else:
+ return None
+
dialect = PGDialect_asyncpg
Have a nice day!
About this issue
- Original URL
- State: closed
- Created 4 years ago
- Comments: 19 (15 by maintainers)
Commits related to this issue
- patch from https://github.com/sqlalchemy/sqlalchemy/issues/5584 — committed to python-gino/sqlalchemy by fantix 4 years ago
- patch from https://github.com/sqlalchemy/sqlalchemy/issues/5584 — committed to python-gino/sqlalchemy by fantix 4 years ago
- Improve Asyncpg json handling Set default type codec for ``json`` and ``jsonb`` types when using the asyncpg driver. By default asyncpg will not decode them and return strings instead. Fixes: #5584 ... — committed to python-gino/sqlalchemy by CaselIT 4 years ago
I’ve created a new patch with only the change to asyncpg. See here https://gerrit.sqlalchemy.org/c/sqlalchemy/sqlalchemy/+/2252
@fantix if you could take a look it would be great. I’m not sure if the problem with array of json(b) is due to a bug in how sqlalchemy handles it, or it’s on asyncpg side. The other dialects we test, psycopg and pg8000 seem to work.
oh now I got a working version, but I have no idea why. OH, I think it was double-encoding the JSON, ha, OK. Here’s a working patch