@@ -376,56 +376,120 @@ defmodule BroadwayRabbitMQ.Producer do
376
376
377
377
## Dead-letter Exchanges
378
378
379
- Here's an example of how to use a dead-letter exchange setup with broadway_rabbitmq:
379
+ Dead-letter exchanges are normal RabbitMQ exchanges designated for receiving messages
380
+ that have been rejected elsewhere. You can reference a dead-letter exchange when
381
+ defining a queue by supplying an `"x-dead-letter-exchange"` argument and optionally an
382
+ `"x-dead-letter-routing-key"`. When a queue has a dead-letter exchange defined, then
383
+ failing a message with `Broadway.failed/2` or raising an exception in `Broadway.handle_message/3`
384
+ causes the message to be republished to the exchange named in the `"x-dead-letter-exchange"`
385
+ argument. The message's original routing key is kept unless the `"x-dead-letter-routing-key"`
386
+ argument specifies an override.
387
+
388
+ A bit more care is needed during setup when using dead-letter exchanges because the dead-letter
389
+ exchange must be declared and exist _before_ you attempt to reference it when declaring a queue.
390
+ For this reason, you may need to take additional steps when setting up your RabbitMQ instance
391
+ beyond what is available to you in the `:after_connect` option.
392
+
393
+ You can declare your exchanges and queues in a `Mix` task or before your application starts:
394
+
395
+ {:ok, connection} = AMQP.Connection.open()
396
+ {:ok, channel} = AMQP.Channel.open(connection)
397
+
398
+ # Declare exchanges
399
+ :ok = AMQP.Exchange.declare(channel, "my_exchange", :fanout, durable: true)
400
+ :ok = AMQP.Exchange.declare(channel, "my_exchange.dlx", :fanout, durable: true)
401
+
402
+ # Define and bind queues within the exchange depending on your needs
403
+ {:ok, _} = AMQP.Queue.declare(channel, "my_queue.dlx", durable: true)
404
+ :ok = AMQP.Queue.bind(channel, "my_queue.dlx", "my_exchange.dlx", [])
405
+
406
+ {:ok, _} =
407
+ AMQP.Queue.declare(channel, "my_queue",
408
+ durable: true,
409
+ arguments: [
410
+ {"x-dead-letter-exchange", :longstr, "my_exchange.dlx"},
411
+ {"x-dead-letter-routing-key", :longstr, "my_queue.dlx"}
412
+ ]
413
+ )
414
+
415
+ :ok = AMQP.Queue.bind(channel, "my_queue", "my_exchange", [])
416
+
417
+
418
+ Once you have your exchanges and queues established, you can start Broadway pipelines
419
+ to consume messages in the queues. For a thorough example, we need one pipeline which
420
+ will fail messages and another pipeline to consume messages from the dead-letter exchange.
421
+
422
+ In this example, `MyPipeline` represents the primary pipeline which may reject/fail
423
+ messages:
380
424
381
425
defmodule MyPipeline do
382
426
use Broadway
383
-
427
+ require Logger
384
428
@queue "my_queue"
385
- @exchange "my_exchange"
386
- @queue_dlx "my_queue.dlx"
387
- @exchange_dlx "my_exchange.dlx"
388
429
389
430
def start_link(_opts) do
390
431
Broadway.start_link(__MODULE__,
391
432
name: __MODULE__,
392
433
producer: [
393
- module: {
394
- BroadwayRabbitMQ.Producer,
395
- on_failure: :reject,
396
- after_connect: &declare_rabbitmq_topology/1,
397
- queue: @queue,
398
- declare: [
399
- durable: true,
400
- arguments: [
401
- {"x-dead-letter-exchange", :longstr, @exchange_dlx},
402
- {"x-dead-letter-routing-key", :longstr, @queue_dlx}
403
- ]
404
- ],
405
- bindings: [{@exchange, []}],
406
- },
434
+ module: {BroadwayRabbitMQ.Producer, on_failure: :reject, queue: @queue},
407
435
concurrency: 2
408
436
],
409
437
processors: [default: [concurrency: 4]]
410
438
)
411
439
end
412
440
413
- defp declare_rabbitmq_topology(amqp_channel) do
414
- with :ok <- AMQP.Exchange.declare(amqp_channel, @exchange, :fanout, durable: true),
415
- :ok <- AMQP.Exchange.declare(amqp_channel, @exchange_dlx, :fanout, durable: true),
416
- {:ok, _} <- AMQP.Queue.declare(amqp_channel, @queue_dlx, durable: true),
417
- :ok <- AMQP.Queue.bind(amqp_channel, @queue_dlx, @exchange_dlx) do
418
- :ok
419
- end
441
+ @impl true
442
+ def handle_message(_processor, message, _context) do
443
+ # Raising errors or returning a "failed" message here sends the message to the
444
+ # dead-letter queue, e.g.
445
+ Logger.debug("Failing message; this should republish to the dead-letter exchange")
446
+
447
+ Broadway.Message.failed(
448
+ message,
449
+ "Failing a message triggers republication to the dead-letter exchange"
450
+ )
451
+ end
452
+ end
453
+
454
+ In order to see the message forwarding in action, we can spin up another pipeline that
455
+ consumes messages from the `my_queue.dlx` queue:
456
+
457
+ defmodule DeadPipeline do
458
+ use Broadway
459
+ require Logger
460
+
461
+ @queue_dlx "my_queue.dlx"
462
+
463
+ def start_link(_opts) do
464
+ Broadway.start_link(__MODULE__,
465
+ name: __MODULE__,
466
+ producer: [
467
+ module: {BroadwayRabbitMQ.Producer, on_failure: :reject, queue: @queue_dlx},
468
+ concurrency: 2
469
+ ],
470
+ processors: [default: [concurrency: 4]]
471
+ )
420
472
end
421
473
422
474
@impl true
423
475
def handle_message(_processor, message, _context) do
424
- # Raising errors or returning a "failed" message here sends the message to the
425
- # dead-letter queue.
476
+ Logger.debug("Dead letter message received!")
477
+ message
426
478
end
427
479
end
428
480
481
+
482
+
483
+ To test out the dead-letter republishing behavior, try publishing a message into your
484
+ primary exchange and observe that it gets republished and consumed by the queue in the
485
+ dead-letter exchange, e.g.
486
+
487
+ iex> {:ok, connection} = AMQP.Connection.open()
488
+ iex> {:ok, channel} = AMQP.Channel.open(connection)
489
+ iex> AMQP.Basic.publish(channel, "my_exchange", "my_queue", "Am I dead?")
490
+ :ok
491
+ [debug] Failing message; this should republish to the dead-letter exchange
492
+ [debug] Dead letter message received!
429
493
"""
430
494
431
495
use GenStage
0 commit comments