This repository has been archived by the owner on Nov 20, 2024. It is now read-only.

streamBridge anonymous queue declaration failure is not recoverable #364

Open
andrcatros opened this issue Apr 4, 2023 · 0 comments

This issue concerns the exclusive anonymous queues declared for streamBridge functionality. I wasn't sure whether this is the right place to ask or whether I should raise it in spring-cloud-stream instead.

For reference, we use spring-cloud-stream-binder-rabbit version 3.2.5.

Due to a specific combination of infrastructure settings, in a very small number of cases a client service resets its connection to RabbitMQ while RabbitMQ keeps that connection alive for a further 2 minutes. During those 2 minutes, RabbitMQ does not delete the exclusive queues declared on that connection. When this happens, the client service restarts its RabbitMQ consumers and attempts to auto-declare the exclusive anonymous queue for streamBridge under the same queue name. This auto-declaration fails with a RESOURCE_LOCKED error because RabbitMQ has not yet deleted the old exclusive queue. When RabbitMQ does not keep the old connection alive, auto-declaration after a consumer restart works fine.

It takes 2 minutes for our RabbitMQ to remove the old exclusive queue. We expect that once the old exclusive queue is finally deleted, the service can auto-declare the queue successfully.

Instead, the auto-declaration of these anonymous queues starts failing with a NOT_FOUND error. As far as I can tell, after the initial queue declaration failure, BlockingQueueConsumer only attempts a passive declaration of the queue. A passive declaration only checks that the queue exists and never creates it, so it can never succeed here: anonymous queues have to be actively declared by the connection that consumes from them.
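
To make the sequence above concrete, here is a minimal sketch of it against a toy in-memory broker. `ToyBroker`, `ExclusiveQueueTimeline`, the connection names, and the queue name are all hypothetical and exist only for illustration. This is not the RabbitMQ client or the Spring AMQP API; it only models the two rules described above (an exclusive queue is locked to its declaring connection, and a passive declaration never creates a queue).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory model of exclusive-queue rules; NOT the RabbitMQ or Spring AMQP API. */
final class ToyBroker {
    /** Stand-in for a channel error that carries an AMQP reply code. */
    static final class ChannelError extends RuntimeException {
        final int replyCode;
        ChannelError(int replyCode, String text) {
            super(text);
            this.replyCode = replyCode;
        }
    }

    // queue name -> connection that owns the exclusive queue
    private final Map<String, String> exclusiveOwner = new HashMap<>();

    /** The broker finally drops a connection: its exclusive queues are deleted. */
    void close(String connection) {
        exclusiveOwner.values().removeIf(connection::equals);
    }

    /** Active declaration: creates the queue, or fails with 405 if another connection owns it. */
    void declareExclusive(String connection, String queue) {
        String owner = exclusiveOwner.get(queue);
        if (owner != null && !owner.equals(connection)) {
            throw new ChannelError(405, "RESOURCE_LOCKED");
        }
        exclusiveOwner.put(queue, connection);
    }

    /** Passive declaration: only checks that the queue exists, never creates it. */
    void declarePassive(String connection, String queue) {
        String owner = exclusiveOwner.get(queue);
        if (owner == null) {
            throw new ChannelError(404, "NOT_FOUND");
        }
        if (!owner.equals(connection)) {
            throw new ChannelError(405, "RESOURCE_LOCKED");
        }
    }
}

final class ExclusiveQueueTimeline {
    /** Runs one step and returns its reply code, or 0 if the step succeeded. */
    static int codeOf(Runnable step) {
        try {
            step.run();
            return 0;
        } catch (ToyBroker.ChannelError e) {
            return e.replyCode;
        }
    }

    /** Replays the reported sequence and returns the outcome of each step. */
    static int[] replay() {
        ToyBroker broker = new ToyBroker();
        String queue = "demo-in-0.anonymous.example"; // hypothetical queue name
        broker.declareExclusive("old-connection", queue);
        // Client has reconnected, but the broker still holds the old connection.
        int whileLocked = codeOf(() -> broker.declareExclusive("new-connection", queue));
        // Roughly two minutes later the broker drops the old connection and its queue.
        broker.close("old-connection");
        int passiveOnly = codeOf(() -> broker.declarePassive("new-connection", queue));
        int activeAgain = codeOf(() -> broker.declareExclusive("new-connection", queue));
        return new int[] {whileLocked, passiveOnly, activeAgain};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(replay())); // prints [405, 404, 0]
    }
}
```

In this model the consumer only recovers at the third step, when an active declaration is attempted again after the old queue has been deleted.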

Any guidance on how to handle these errors, and/or on which configuration properties to set so that these anonymous queues are always actively declared, would be appreciated.
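
For illustration, the behaviour being asked for amounts to retrying an active declaration until the broker has released the old queue. The sketch below shows only that retry loop. `ActiveRedeclareRetry`, `retry`, and the `declareActively` callback are hypothetical names, and how an active declaration would actually be triggered in Spring AMQP or the binder is deliberately left out, since that is the open question here.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper; a sketch of the desired retry behaviour, not a Spring AMQP class. */
final class ActiveRedeclareRetry {
    /**
     * Calls declareActively until it reports success or the attempts run out.
     * Returns the 1-based attempt that succeeded, or -1 if none did
     * (including when the waiting thread is interrupted).
     */
    static int retry(BooleanSupplier declareActively, int maxAttempts, long delayMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (declareActively.getAsBoolean()) {
                return attempt;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return -1;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Stand-in for a declaration that fails twice (queue still locked), then succeeds.
        int succeededOn = retry(() -> ++calls[0] >= 3, 5, 10L);
        System.out.println(succeededOn); // prints 3
    }
}
```

With the 2-minute window described above, `maxAttempts * delayMillis` would need to cover somewhat more than 2 minutes for the retry to outlast the lingering connection.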

Original resource locked error:

{"@timestamp":"2023-02-15T13:01:14.248Z","@version":"1","message":"Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'publishToAnyQueue-in-0.anonymous.IcwHphPpTAyziT8yNrwqsA' in vhost 'prod-host'. It could be originally declared on another connection or the exclusive property value does not matc..., class-id=50, method-id=10)","logger_name":"org.springframework.amqp.rabbit.connection.CachingConnectionFactory","thread_name":"AMQP Connection 172.30.232.103:30205","level":"ERROR","level_value":40000}

NOT_FOUND error logs (unless we restart the service, it keeps attempting to passively declare this queue):

{"@timestamp":"2023-02-15T13:04:23.818Z","@version":"1","message":"Queue declaration failed; retries left=1","logger_name":"org.springframework.amqp.rabbit.listener.BlockingQueueConsumer","thread_name":"publishToAnyQueue-in-0.anonymous.IcwHphPpTAyziT8yNrwqsA-11","level":"WARN","level_value":30000,"stack_trace":"org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[publishToAnyQueue-in-0.anonymous.IcwHphPpTAyziT8yNrwqsA]\n\tat org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:744)\n\tat org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:621)\n\tat org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:608)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1375)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1220)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: java.io.IOException: null\n\tat com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)\n\tat com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)\n\tat com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)\n\tat com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1012)\n\tat com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:46)\n\tat jdk.internal.reflect.GeneratedMethodAccessor190.invoke(Unknown Source)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:568)\n\tat org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1162)\n\tat jdk.proxy3/jdk.proxy3.$Proxy136.queueDeclarePassive(Unknown Source)\n\tat org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:722)\n\t... 5 common frames omitted\nCaused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'publishToAnyQueue-in-0.anonymous.IcwHphPpTAyziT8yNrwqsA' in vhost 'prod-host', class-id=50, method-id=10)\n\tat com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)\n\tat com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)\n\tat com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)\n\tat com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)\n\tat com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)\n\t... 13 common frames omitted\nCaused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'publishToAnyQueue-in-0.anonymous.IcwHphPpTAyziT8yNrwqsA' in vhost 'prod-host', class-id=50, method-id=10)\n\tat com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517)\n\tat com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341)\n\tat com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)\n\tat com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)\n\tat com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739)\n\tat com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)\n\tat com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)\n\t... 1 common frames omitted\n"}
