Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Fix RabbitMQ user-defined queue parameters #76

Merged
merged 3 commits into from
Nov 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ Every entry has a category for which we use the following visual abbreviations:

<!-- ## Unreleased -->

## [2020.11.30]

- 🐞 The RabbitMQ backbone plugin ignored user-defined queue parameters, such as
`durable` or `lazy` queues. It now respects such parameters again.
[#76](https://github.com/tenzir/threatbus/pull/76)

## [2020.11.26]

- 🐞 The Zeek app did not perform an outbound connection to Threat Bus in
Expand Down Expand Up @@ -70,3 +76,4 @@ Every entry has a category for which we use the following visual abbreviations:

[2020.10.29]: https://github.com/tenzir/threatbus/releases/tag/2020.10.29
[2020.11.26]: https://github.com/tenzir/threatbus/releases/tag/2020.11.26
[2020.11.30]: https://github.com/tenzir/threatbus/releases/tag/2020.11.30
2 changes: 1 addition & 1 deletion plugins/backbones/threatbus_rabbitmq/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@
packages=["threatbus_rabbitmq"],
python_requires=">=3.7",
url="https://github.com/tenzir/threatbus",
version="2020.11.26",
version="2020.11.30",
)
Original file line number Diff line number Diff line change
Expand Up @@ -265,15 +265,27 @@ def on_exchange_declare_ok(self, _frame, userdata: Tuple[str, str]):
@param _frame Unused pika response
@param userdata A tuple of exchange_name and queue_name. The exchange with the given name was created, hence this method is invoked. The queue name should be created.
"""
if type(userdata) is not tuple or len(userdata) != 2:
self.logger.warn(
f"Aborting with unexpected `userdata` after exchange was declared: {userdata}"
)
return
cb = partial(self.on_queue_declare_ok, userdata=userdata)
self._channel.queue_declare(queue=userdata[1], callback=cb)
queue_kwargs = self.queue_kwargs.copy()
queue_kwargs["callback"] = cb
self._channel.queue_declare(queue=userdata[1], **queue_kwargs)

def on_queue_declare_ok(self, _frame, userdata: Tuple[str, str]):
"""
Inspects the given userdata (exchange_name, queue_name) and binds the queue to the exchange.
@param _frame Unused pika response
@param userdata A tuple of exchange_name and queue_name. Both have been created, hence this method is invoked.
"""
if type(userdata) is not tuple or len(userdata) != 2:
self.logger.warn(
f"Aborting with unexpected `userdata` after queue was declared: {userdata}"
)
return
cb = partial(self.on_queue_bind_ok, userdata=userdata[1])
self._channel.queue_bind(exchange=userdata[0], queue=userdata[1], callback=cb)

Expand Down