sqlalchemy: Inconsistency JSON results among PostgreSQL drivers

Describe the bug

By default, different PostgreSQL drivers may return different Python types for JSON(B) values. For example, given the same JSON dict, psycopg2 returns a dict while asyncpg returns a str. SQLAlchemy should eliminate the difference and return consistent results. The issue is that the asyncpg dialect/driver cannot parse JSON into a dict if the SA type is unspecified (when executing raw SQL, for example); please see below.

To Reproduce

import asyncio

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.ext.asyncio import create_async_engine


async def main():
    e = sa.create_engine("postgresql+psycopg2:///", future=True)
    ae = create_async_engine("postgresql+asyncpg:///")

    for json_type in [sa.JSON, JSONB]:
        print(json_type.__visit_name__)

        # create test tables
        metadata = sa.MetaData()
        users = sa.Table("users", metadata, sa.Column("profile", json_type))
        metadata.create_all(e)

        queries = [
            ("clause", sa.select([users.c.profile])),
            ("text  ", sa.text("SELECT profile FROM users")),
        ]

        with e.connect() as conn:
            conn.execute(users.insert().values(profile=dict(a=1, b=2)))
            for exec_type, obj in queries:
                print("sync ", exec_type, type(conn.scalar(obj)).__name__)
            conn.execute(users.delete())

        async with ae.connect() as conn:
            await conn.execute(users.insert().values(profile=dict(a=1, b=2)))
            for exec_type, obj in queries:
                print("async", exec_type, type(await conn.scalar(obj)).__name__)
            await conn.execute(users.delete())

        metadata.drop_all(e)


asyncio.run(main())

Expected results

JSON
sync  clause dict
sync  text   dict
async clause dict
async text   dict
JSONB
sync  clause dict
sync  text   dict
async clause dict
async text   dict

Actual results

JSON
sync  clause dict
sync  text   dict
async clause dict
async text   str
JSONB
sync  clause dict
sync  text   dict
async clause dict
async text   str

Versions.

  • OS: Darwin Kernel Version 19.5.0
  • Python: Python 3.8.2
  • SQLAlchemy: 1.4.0b1 @ 4d17fe4063adef50c1d529993e0b047f503940e2
  • Database: PostgreSQL 12.4
  • DBAPI: asyncpg 0.21.0, psycopg2 2.8.5 (dt dec pq3 ext lo64)

Additional context

I have 2 candidate solutions:

  1. Override NullType.result_processor(). This is simpler but slightly hacky:
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index 780e23844..a62db1a5e 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -70,6 +70,9 @@ try:
 except ImportError:
     _python_UUID = None

+JSON_COLTYPE = 114
+JSONB_COLTYPE = 3802
+

 class AsyncpgTime(sqltypes.Time):
     def get_dbapi_type(self, dbapi):
@@ -218,6 +221,15 @@ class AsyncpgOID(OID):
         return dbapi.INTEGER


+class AsyncpgNullType(sqltypes.NullType):
+    def result_processor(self, dialect, coltype):
+        if coltype == JSON_COLTYPE:
+            return json.JSON().result_processor(dialect, coltype)
+        if coltype == JSONB_COLTYPE:
+            return json.JSONB().result_processor(dialect, coltype)
+        return super().result_processor(dialect, coltype)
+
+
 class PGExecutionContext_asyncpg(PGExecutionContext):
     def pre_exec(self):
         if self.isddl:
@@ -720,6 +732,7 @@ class PGDialect_asyncpg(PGDialect):
             sqltypes.Enum: AsyncPgEnum,
             OID: AsyncpgOID,
             REGCLASS: AsyncpgREGCLASS,
+            sqltypes.NullType: AsyncpgNullType,
         },
     )

(This was the solution in GINO, see python-gino/gino#305, python-gino/gino#403)

  2. Use asyncpg-native set_type_codec(). This is more consistent with the psycopg2 impl but longer. Moreover, it revealed another issue, please see TODO below:
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index 780e23844..330e71732 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -41,6 +41,7 @@ in conjunction with :func:`_sa.craete_engine`::
 import collections
 import decimal
 import itertools
+import json as json_lib
 import re

 from . import json
@@ -123,11 +124,17 @@ class AsyncpgJSON(json.JSON):
     def get_dbapi_type(self, dbapi):
         return dbapi.JSON

+    def result_processor(self, dialect, coltype):
+        return None
+

 class AsyncpgJSONB(json.JSONB):
     def get_dbapi_type(self, dbapi):
         return dbapi.JSONB

+    def result_processor(self, dialect, coltype):
+        return None
+

 class AsyncpgJSONIndexType(sqltypes.JSON.JSONIndexType):
     def get_dbapi_type(self, dbapi):
@@ -500,6 +507,7 @@ class AsyncAdapt_asyncpg_connection:

         """

+        # TODO: this causes failure in future set_type_codec() below
         await self._connection.set_type_codec(
             "char",
             schema="pg_catalog",
@@ -508,6 +516,24 @@ class AsyncAdapt_asyncpg_connection:
             format="text",
         )

+    def setup_json_handlers(self, serializer, deserializer):
+        self.await_(
+            self._connection.set_type_codec(
+                "json",
+                encoder=serializer,
+                decoder=deserializer,
+                schema="pg_catalog",
+            )
+        )
+        self.await_(
+            self._connection.set_type_codec(
+                "jsonb",
+                encoder=serializer,
+                decoder=deserializer,
+                schema="pg_catalog",
+            )
+        )
+
     def _handle_exception(self, error):
         if not isinstance(error, AsyncAdapt_asyncpg_dbapi.Error):
             exception_mapping = self.dbapi._asyncpg_error_translate
@@ -796,5 +822,34 @@ class PGDialect_asyncpg(PGDialect):
                 e, self.dbapi.InterfaceError
             ) and "connection is closed" in str(e)

+    def on_connect(self):
+        fns = []
+        if self.isolation_level is not None:
+
+            def on_connect(conn):
+                self.set_isolation_level(conn, self.isolation_level)
+
+            fns.append(on_connect)
+
+        if self.dbapi:
+
+            def on_connect(conn):
+                conn.setup_json_handlers(
+                    lambda x: x,
+                    self._json_deserializer or json_lib.loads,
+                )
+
+            fns.append(on_connect)
+
+        if fns:
+
+            def on_connect(conn):
+                for fn in fns:
+                    fn(conn)
+
+            return on_connect
+        else:
+            return None
+

 dialect = PGDialect_asyncpg

Have a nice day!

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Comments: 19 (15 by maintainers)

Commits related to this issue

Most upvoted comments

I’ve created a new patch with only the change to asyncpg. See here https://gerrit.sqlalchemy.org/c/sqlalchemy/sqlalchemy/+/2252

@fantix if you could take a look it would be great. I’m not sure if the problem with arrays of json(b) is due to a bug in how sqlalchemy handles it, or on asyncpg’s side. The other dialects we test, psycopg and pg8000, seem to work.

oh now I got a working version, but I have no idea why. OH, I think it was double-encoding the JSON, ha, OK. Here’s a working patch

diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index 780e23844..cd5828960 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -42,6 +42,7 @@ import collections
 import decimal
 import itertools
 import re
+import json as _py_json
 
 from . import json
 from .base import _DECIMAL_TYPES
@@ -123,11 +124,17 @@ class AsyncpgJSON(json.JSON):
     def get_dbapi_type(self, dbapi):
         return dbapi.JSON
 
+    def result_processor(self, dialect, coltype):
+        return None
+
 
 class AsyncpgJSONB(json.JSONB):
     def get_dbapi_type(self, dbapi):
         return dbapi.JSONB
 
+    def result_processor(self, dialect, coltype):
+        return None
+
 
 class AsyncpgJSONIndexType(sqltypes.JSON.JSONIndexType):
     def get_dbapi_type(self, dbapi):
@@ -467,13 +474,17 @@ class AsyncAdapt_asyncpg_connection:
         "isolation_level",
         "readonly",
         "deferrable",
+        "json_encoder",
+        "json_decoder",
         "_transaction",
         "_started",
     )
 
     await_ = staticmethod(await_only)
 
-    def __init__(self, dbapi, connection):
+    def __init__(
+        self, dbapi, connection
+    ):
         self.dbapi = dbapi
         self._connection = connection
         self.isolation_level = "read_committed"
@@ -500,6 +511,21 @@ class AsyncAdapt_asyncpg_connection:
 
         """
 
+        await self._connection.set_type_codec(
+            "json",
+            encoder=lambda value: value,
+            decoder=_py_json.loads,
+            schema="pg_catalog",
+        )
+        await self._connection.set_type_codec(
+            "jsonb",
+            encoder=lambda value: value,
+            decoder=_py_json.loads,
+            schema="pg_catalog",
+        )
+
+        # if this is first, then it raises the error.  that doesnt
+        # make any sense
         await self._connection.set_type_codec(
             "char",
             schema="pg_catalog",