The _confirm_selected flag of Channel should be reset to False when the channel is reopened #285

Open
cnwenf opened this issue Aug 12, 2019 · 1 comment

Comments

@cnwenf

cnwenf commented Aug 12, 2019

If the channel is reopened (for example after a channel error such as "exchange not found"), _confirm_selected should be set back to False. Otherwise the client keeps waiting for an ACK that the server will never send.
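A minimal sketch of the proposed behavior, using a hypothetical, heavily simplified channel class. ToyChannel, _reopen and the frames list are illustrative names, not amqp's actual implementation. It only shows the idea: reopening clears the confirm flag, so the next confirmed publish sends confirm.select again.

```python
class ToyChannel:
    """Hypothetical stand-in for an AMQP channel; not amqp.channel.Channel."""

    def __init__(self):
        self.frames = []                # frames "sent" to the broker, for inspection
        self._confirm_selected = False

    def confirm_select(self):
        # Enable publisher confirms on the current (re)opened channel.
        self.frames.append("confirm.select")
        self._confirm_selected = True

    def basic_publish_confirm(self, body):
        if not self._confirm_selected:
            self.confirm_select()
        self.frames.append(f"basic.publish:{body}")

    def _reopen(self):
        # The server closed the channel (e.g. NOT_FOUND). A freshly opened
        # channel has no publisher confirms, so the client-side flag is reset.
        self.frames.append("channel.open")
        self._confirm_selected = False


ch = ToyChannel()
ch.basic_publish_confirm("first")
ch._reopen()
ch.basic_publish_confirm("second")
print(ch.frames)
# ['confirm.select', 'basic.publish:first', 'channel.open',
#  'confirm.select', 'basic.publish:second']
```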

@m6312

m6312 commented Mar 18, 2022

I encountered this issue with amqp==5.0.9, celery==5.2.3, kombu==5.2.4.

It can be reproduced with the run.py script and celeryconfig.py file below. The script successfully publishes a message, then tries (and fails) to purge a non-existent queue, then publishes a second message. Publishing the second message hangs forever.

# celeryconfig.py
BROKER_URL = 'amqp://guest:guest@localhost:5672'
# Ask the transport to wait for a publisher confirm after each publish.
BROKER_TRANSPORT_OPTIONS = {'confirm_publish': True}
RESULT_BACKEND = None
RESULT_PERSISTENT = False

# run.py
from amqp import NotFound
from celery import Celery, current_app

app = Celery('tasks')
app.config_from_object('celeryconfig')

@app.task
def dummy_task():
    pass

def purge_queue(queue_name):
    with current_app.connection_or_acquire() as connection:
        channel = connection.default_channel
        # Purging a missing queue makes the broker close the channel (404 NOT_FOUND).
        channel.queue_purge(queue_name)

def main():
    # 1st publish: succeeds and enables publisher confirms on the channel.
    dummy_task.apply_async()
    try:
        purge_queue('non-existent-queue')
    except NotFound:
        pass
    print('Before 2nd apply_async', flush=True)
    # 2nd publish: hangs forever waiting for a confirm that never arrives.
    dummy_task.apply_async()
    print('After 2nd apply_async', flush=True)

if __name__ == '__main__':
    main()

It seems like the problem is:

  • amqp.channel.Channel.basic_publish_confirm calls self.confirm_select() when the first message is posted, enabling publisher confirms on the channel.
  • Purging a non-existent queue causes the channel to be closed and re-opened (amqp.channel.Channel._on_close is called at some point). The channel is re-opened without publisher confirms enabled.
  • apply_async/basic_publish_confirm still use the same amqp.channel.Channel object, which has self._confirm_selected = True, even though the channel itself doesn't have publisher confirms enabled.
  • basic_publish_confirm waits forever for a publish confirmation that will never arrive.
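The sequence in the list above can be simulated without a broker. This is a sketch under stated assumptions: FakeBroker, ToyChannel, on_channel_error and the reset_on_reopen switch are hypothetical names, and the short wait timeout exists only to keep the demo finite (the real client waits with no timeout).

```python
import queue


class FakeBroker:
    """Hypothetical broker model: acks a publish only if confirms are on."""

    def __init__(self):
        self.confirms_enabled = False
        self.acks = queue.Queue()

    def handle(self, frame):
        if frame == "confirm.select":
            self.confirms_enabled = True
        elif frame == "channel.open":
            # A re-opened channel starts without publisher confirms.
            self.confirms_enabled = False
        elif frame.startswith("basic.publish") and self.confirms_enabled:
            self.acks.put("basic.ack")


class ToyChannel:
    """Hypothetical client channel; mirrors the four steps listed above."""

    def __init__(self, broker, reset_on_reopen):
        self.broker = broker
        self.reset_on_reopen = reset_on_reopen
        self._confirm_selected = False

    def basic_publish_confirm(self, body, wait=0.2):
        if not self._confirm_selected:
            self.broker.handle("confirm.select")
            self._confirm_selected = True
        self.broker.handle(f"basic.publish:{body}")
        # Raises queue.Empty if no ack arrives within `wait` seconds.
        return self.broker.acks.get(timeout=wait)

    def on_channel_error(self):
        # E.g. queue_purge on a missing queue: the server closes the
        # channel and the client re-opens it.
        self.broker.handle("channel.open")
        if self.reset_on_reopen:
            self._confirm_selected = False


def run(reset_on_reopen):
    ch = ToyChannel(FakeBroker(), reset_on_reopen)
    ch.basic_publish_confirm("first")
    ch.on_channel_error()
    try:
        return ch.basic_publish_confirm("second")
    except queue.Empty:
        return "no ack (would hang forever without a timeout)"


print(run(reset_on_reopen=False))  # no ack (would hang forever without a timeout)
print(run(reset_on_reopen=True))   # basic.ack
```

With the stale flag the second publish never re-sends confirm.select, so the broker never acks it; resetting the flag on re-open restores the ack.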

This can be worked around by passing a timeout argument to the hanging apply_async call. That stops the call from hanging forever, but the message ends up being published twice, so it doesn't feel like the real fix. I think the real fix might be to reset self._confirm_selected to False after the channel has been re-opened. I'm not very familiar with this codebase, though, so I might be missing something.
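One possible explanation for the duplicate, sketched as a toy model: the broker stores the message even though it never acks it, and after the timeout a retry publishes it again on a fresh channel. The retry step (publish_with_retry reviving a brand-new channel) is an assumption for illustration, not a description of Celery's or kombu's actual retry code; ToyChannel is likewise hypothetical.

```python
import queue


class ToyChannel:
    """Hypothetical channel model (not amqp's class)."""

    def __init__(self, log):
        self.log = log                  # every message the broker received
        self.acks = queue.Queue()
        self.server_confirms = False    # broker-side view of this channel
        self._confirm_selected = False  # client-side flag

    def publish(self, body, timeout):
        if not self._confirm_selected:
            # Models sending confirm.select: both sides now agree.
            self.server_confirms = self._confirm_selected = True
        self.log.append(body)           # the broker stores the message either way
        if self.server_confirms:
            self.acks.put("ack")
        return self.acks.get(timeout=timeout)  # raises queue.Empty on timeout


def publish_with_retry(channel, body, timeout=0.1):
    # Assumption: after a timeout, the retry revives the connection with a
    # brand-new channel (flag False) and publishes the same message again.
    try:
        return channel.publish(body, timeout)
    except queue.Empty:
        fresh = ToyChannel(channel.log)
        return fresh.publish(body, timeout)


log = []
stale = ToyChannel(log)
stale._confirm_selected = True   # stale flag left over from before the re-open
stale.server_confirms = False    # the re-opened channel has no confirms
print(publish_with_retry(stale, "task-2"))  # ack
print(log)                                  # ['task-2', 'task-2']
```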
