aiopg: close cannot be used while an asynchronous query is underway

I’m getting this error if I do multiple fetchs. I’m not sure what I’m doing wrong:


Exception ignored in: <function ResultProxy.__init__.<locals>.<lambda> at 0x7f979638fe18>
Traceback (most recent call last):
  File "/home/anthbot/Pyvenv/anthbotv5/lib/python3.5/site-packages/aiopg/sa/result.py", line 235, in <lambda>
    self._weak = weakref.ref(self, lambda wr: cursor.close())
  File "/home/anthbot/Pyvenv/anthbotv5/lib/python3.5/site-packages/aiopg/cursor.py", line 50, in close
    self._impl.close()
psycopg2.ProgrammingError: close cannot be used while an asynchronous query is underway

This is the query:

query = sa.select([role_item]).where(role_item.c.auto_role_id == auto_role_id)
        return await fetch_ite(query)

This is the function:

async def _fetch(query, type=DBType.ONE):
    async with engine.acquire() as conn:
        res = await conn.execute(query)
        if type == DBType.ONE:
            return await res.fetchone()
        elif type == DBType.ALL:
            return await res.fetchall()
        elif type == DBType.ITE:
            return res
        return None

Any help would be appreciated.

About this issue

  • Original URL
  • State: closed
  • Created 7 years ago
  • Reactions: 1
  • Comments: 52 (33 by maintainers)

Most upvoted comments

@asvetlov release new version, please

I think tracking cursors is good idea, also we should:

  1. force one cursor per connection in any given time
  2. close or error if previous cursor is not finalized

It works also for me. But I would like to keep the syntax

result = await conn.execute(query)

Is there a specific reason to keep track of these weakrefs? Shouldn’t be garbage collected automatically?

Looks like @barrachri is right, basically ResultProxy for some reason loose reference tries to close cursor, while original query still executing. One option is to refactor code to use context manager https://github.com/aio-libs/aiopg/blob/7d8f58b83b2482de0ef1bfced2d0576e9f8be347/tests/pep492/test_async_await.py#L149-L152 It should explicitly close cursor.

Can anyone test this solution?

I got this exception (close cannot…) with small usual queries like SELECT blabla, bla FROM blabla WHERE blabla=true;

You should close cursor by calling cursor.close() after using fetchone.

@jettify should we bump to 0.14 as it can change behavior?

Make issue on latest git:

import asyncio
from aiopg.sa import create_engine
import datetime
import sqlalchemy as sa
import random

test_table = sa.Table(
    'test_table', sa.MetaData(),
    sa.Column('va1'),
    sa.Column('va2'),
    sa.Column('va3'),
    sa.Column('va4'),
    sa.Column('va5'),
    sa.Column('va6'),
    sa.Column('va7'),
    sa.Column('va8'),
)

create = "CREATE TABLE test_table(" \
         "va1 varchar(100), " \
         "va2 varchar(100), " \
         "va3 varchar(100), " \
         "va4 varchar(100), " \
         "va5 varchar(100), " \
         "va6 varchar(100), " \
         "va7 varchar(100), " \
         "va8 varchar(100))"

select = test_table.select()
insert = test_table.insert().values(
    va1="1edsfsdfasdfasdfasdfadfdsfasdfasdfasdfasdfjhasdlfkhjalskdjhflkajsdhflkasdfasdfasdfd",
    va2="1edsfsdfasdfasdfasdfadfdsfasdfasdfasdfasdfjhasdlfkhjalskdjhflkajsdhflkasdfasdfasdfd",
    va3="1edsfsdfasdfasdfasdfadfdsfasdfasdfasdfasdfjhasdlfkhjalskdjhflkajsdhflkasdfasdfasdfd",
    va4="1edsfsdfasdfasdfasdfadfdsfasdfasdfasdfasdfjhasdlfkhjalskdjhflkajsdhflkasdfasdfasdfd",
    va5="1edsfsdfasdfasdfasdfadfdsfasdfasdfasdfasdfjhasdlfkhjalskdjhflkajsdhflkasdfasdfasdfd",
    va6="1edsfsdfasdfasdfasdfadfdsfasdfasdfasdfasdfjhasdlfkhjalskdjhflkajsdhflkasdfasdfasdfd",
    va7="1edsfsdfasdfasdfasdfadfdsfasdfasdfasdfasdfjhasdlfkhjalskdjhflkajsdhflkasdfasdfasdfd",
    va8="1edsfsdfasdfasdfasdfadfdsfasdfasdfasdfasdfjhasdlfkhjalskdjhflkajsdhflkasdfasdfasdfd"
)
drop = "DROP TABLE test_table"

async def sql_insert(engine, insert):
    await asyncio.sleep(random.random())
    async with engine.acquire() as conn:
        rp = await conn.execute(select)
        # res = await rp.fetchall()

    async with engine.acquire() as conn:
        rp = await conn.execute(insert)
        print(rp)
    print(engine.size, engine.freesize)


async def clear(engine):
    async with engine.acquire() as conn:
        try:
            await conn.execute(drop)
        except:
            pass

        await conn.execute(create)


async def aiopg_test(engine):
    for i in range(1, 10001):
        await sql_insert(engine, insert)


async def test():
    engine = await create_engine(
        host="192.168.0.2",
        port=5432,
        database="demo",
        user="user",
        password="pass",
        minsize=1, maxsize=10)
    await clear(engine)

    asyncio.ensure_future(aiopg_test(engine))
    asyncio.ensure_future(aiopg_test(engine))
    asyncio.ensure_future(aiopg_test(engine))
    asyncio.ensure_future(aiopg_test(engine))
    asyncio.ensure_future(aiopg_test(engine))
    asyncio.ensure_future(aiopg_test(engine))
    asyncio.ensure_future(aiopg_test(engine))
    asyncio.ensure_future(aiopg_test(engine))

    await asyncio.sleep(10000)

loop = asyncio.get_event_loop()
loop.run_until_complete(test())
loop.close()

If you uncomment the line “res = await rp.fetchall()” the error does not occur.

ProxyResult.fetchall() closing cursor, may be).