Skip to content

[Question] Asynchronous NOTIFY listener #567

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Closed
tomkp75 opened this issue May 5, 2020 · 12 comments · Fixed by #802
Closed

[Question] Asynchronous NOTIFY listener #567

tomkp75 opened this issue May 5, 2020 · 12 comments · Fixed by #802

Comments

@tomkp75
Copy link

tomkp75 commented May 5, 2020

Hello,

How would one use an async function as the callback of conn.add_listener? I would like to relay specific notifications to API calls using aiohttp.ClientSession.

Thanks,

Tom

@tomkp75 tomkp75 changed the title Asynchronous NOTIFY listener [Question] Asynchronous NOTIFY listener May 5, 2020
@elprans
Copy link
Member

elprans commented May 5, 2020

This should do the trick:

conn.add_listener(
    'CHANNEL', 
    functools.partial(asyncio.ensure_future, async_callback()),
)

@tomkp75
Copy link
Author

tomkp75 commented May 5, 2020

Thanks. Would it be an idea for add_listener to take in an awaitable directly?

@charterchap
Copy link

charterchap commented May 14, 2020

I ended up with something like this:

async def listener(self):
    async def process(*args):
        print(args)
        # await self.q.put(args)

    if not self.conn_listen:
        self.conn_listen = await asyncpg.connect(dsn)

    await self.conn_listen.add_listener('my_notify_channel',
        lambda *args: loop.create_task(process(args))
    )

def main():
    listener_task=loop.create_task(listener())
    loop.run_forever()

@adriangabura
Copy link

I ended up with something like this:

async def listener(self):
    async def process(*args):
        print(args)
        # await self.q.put(args)

    if not self.conn_listen:
        self.conn_listen = await asyncpg.connect(dsn)

    await self.conn_listen.add_listener('my_notify_channel',
        lambda *args: loop.create_task(process(args))
    )

def main():
    listener_task=loop.create_task(listener())
    loop.run_forever()

Thank you! I needed this for FastAPI websockets. Your solution is very elegant!

@theo-brown
Copy link

I second this, it seems a bit weird that the asynchronous Postgres library can't deal with asynchronous callback functions!

@theo-brown
Copy link

theo-brown commented Aug 9, 2021

Can you perform asynchronous database operations within the callback function?

My attempt to do it doesn't work:

def __init__(self):
    ...
    self.conn.add_listener('channel', self.on_notify)
    ...
 
def on_notify(self, *args):
        asyncio.create_task(self.async_on_notify(*args))

async def async_on_notify(self, *args):
       await self.conn.execute(...)

It appears to block at the self.conn.execute line

@elprans
Copy link
Member

elprans commented Aug 9, 2021

It appears to block at the self.conn.execute line

I'm not sure what you mean by "block" here. If you mean "coroutine pauses execution", then it's what await does by definition. If you wanted to create a background task in async_on_notify and do other things in that handler, then you have to use asyncio.create_task() or asyncio.gather or any other concurrency primitive in asyncio.

theo-brown added a commit to theo-brown/asyncpg that referenced this issue Aug 9, 2021
elprans added a commit that referenced this issue Aug 9, 2021
The `Connection.add_listener()`, `Connection.add_log_listener()` and
`Connection.add_termination_listener()` now allow coroutine functions as
callbacks.

Fixes: #567.
theo-brown added a commit to theo-brown/asyncpg that referenced this issue Aug 9, 2021
Signed-off-by: Theo Brown <7982453+theo-brown@users.noreply.github.com>
theo-brown added a commit to theo-brown/asyncpg that referenced this issue Aug 9, 2021
Signed-off-by: Theo Brown <7982453+theo-brown@users.noreply.github.com>
@fantix
Copy link
Member

fantix commented Aug 9, 2021

Can you perform asynchronous database operations within the callback function?

My attempt to do it doesn't work:

Yes you can, but you should avoid concurrently using the same connection from two Tasks, by using your own asyncio synchronization mechanism (asyncio.Lock for example, but not recommended because it needs to be used with extra caution) or simply borrowing another connection in the callback task.

@theo-brown
Copy link

theo-brown commented Aug 9, 2021

I'm not sure what you mean by "block" here. If you mean "coroutine pauses execution", then it's what await does by definition. If you wanted to create a background task...

Yes, I did want it to pause execution. I want to successively (not concurrently) perform several database operations. However, the database operations never successfully execute, which is what has confused me.
I've changed my code to use a connection pool rather than a single connection, but it still doesn't correctly execute any database interactions in the callback function.

def async_init(self):
    ...
    self.pool = await asyncpg.create_pool(...)
    self.listener_connection = await self.pool.acquire()
    await self.listener_connection.add_listener('channel', self.on_notify)
    ...
 
def on_notify(self, *args):
    asyncio.create_task(self.async_on_notify(*args))

async def async_on_notify(self, *args):
   record = await self.pool.fetch(query)
   print(record.get('value'))

This does not print the value of the record - it doesn't print anything.

@elprans
Copy link
Member

elprans commented Aug 9, 2021

Make sure the listening connection actually lives long enough. The code you provided implies it gets garbage collected after __init__ exits, which would close the connection along with its listeners.

elprans added a commit that referenced this issue Aug 9, 2021
The `Connection.add_listener()`, `Connection.add_log_listener()` and
`Connection.add_termination_listener()` now allow coroutine functions as
callbacks.

Fixes: #567.
@theo-brown
Copy link

Ooh good shout! However I'm pretty sure the listener still exists, because if I replace the code in async_on_notify with a print statement, the print statement will still run

elprans added a commit that referenced this issue Aug 10, 2021
The `Connection.add_listener()`, `Connection.add_log_listener()` and
`Connection.add_termination_listener()` now allow coroutine functions as
callbacks.

Fixes: #567.
@theo-brown
Copy link

Make sure the listening connection actually lives long enough. The code you provided implies it gets garbage collected after __init__ exits, which would close the connection along with its listeners.

How would I prevent it from being garbage collected? What's the correct way of initialising a listener that exists for the whole program?

# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

6 participants