
Commit 6fae7df

Readme updated.
1 parent e177cf8 commit 6fae7df

File tree

4 files changed: +114, -32 lines


README.md

+57, -32
@@ -55,25 +55,48 @@ Launch the workers:
 Then run the main code:
 `python3 broker.py`
 
-## PubSubBroker and ListQueueBroker configuration
-
-We have two brokers with similar interfaces, but with different logic.
-The PubSubBroker uses redis' pubsub mechanism and is very powerful,
-but it executes every task on all workers, because PUBSUB broadcasts messages
-to all subscribers.
-
-If you want your messages to be processed only once, please use ListQueueBroker.
-It uses redis' [LPUSH](https://redis.io/commands/lpush/) and [BRPOP](https://redis.io/commands/brpop/) commands to deal with messages.
-
-Brokers parameters:
-* `url` - url to redis.
-* `task_id_generator` - custom task_id generator.
-* `result_backend` - custom result backend.
-* `queue_name` - name of the pub/sub channel in redis.
-* `max_connection_pool_size` - maximum number of connections in pool.
-* Any other keyword arguments are passed to `redis.asyncio.BlockingConnectionPool`.
-Notably, you can use `timeout` to set custom timeout in seconds for reconnects
-(or set it to `None` to try reconnects indefinitely).
+
+## Brokers
+
+This package contains 6 broker implementations.
+3 broker types:
+* PubSub broker
+* ListQueue broker
+* Stream broker
+
+Each type is implemented for each redis architecture:
+* Single node
+* Cluster
+* Sentinel
+
+Here's a small breakdown of how they differ from each other.
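
For orientation, the single-node flavours can be imported straight from the package (an illustrative sketch; it assumes these class names are exported from taskiq_redis' top level):

```python
# Illustrative only: single-node broker classes assumed to be
# exported by the taskiq_redis package.
from taskiq_redis import ListQueueBroker, PubSubBroker, RedisStreamBroker

pubsub_broker = PubSubBroker(url="redis://localhost:6379")
list_queue_broker = ListQueueBroker(url="redis://localhost:6379")
stream_broker = RedisStreamBroker(url="redis://localhost:6379")
```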
+
+### PubSub
+
+On older redis versions, PUBSUB was the default way of turning redis into a queue.
+But using PUBSUB means that all messages are delivered to all subscribed consumers.
+
+> [!WARNING]
+> This broker doesn't support acknowledgements. If a worker is suddenly killed
+> during message processing, the message is lost.
+
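
To make the broadcast behaviour concrete, here is a minimal sketch (assuming `PubSubBroker` and a redis instance on localhost); a single `.kiq()` call is executed by every subscribed worker:

```python
import asyncio

from taskiq_redis import PubSubBroker

broker = PubSubBroker(url="redis://localhost:6379")


@broker.task
async def ping() -> None:
    # Every worker subscribed to the channel receives this task,
    # so one .kiq() call can run once per worker process.
    print("pong")


async def main() -> None:
    await broker.startup()
    await ping.kiq()  # broadcast to all subscribed workers
    await broker.shutdown()


asyncio.run(main())
```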
+### ListQueue
+
+This broker creates a list of messages at some key. New tasks are appended
+on the left side using `lpush` and taken from the right side using `brpop`.
+
+> [!WARNING]
+> This broker doesn't support acknowledgements. If a worker is suddenly killed
+> during message processing, the message is lost.
+
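
The underlying queue semantics can be reproduced with plain `redis.asyncio` calls (a sketch assuming redis-py 5+ and a made-up key name `my_queue`):

```python
import asyncio

from redis.asyncio import Redis


async def main() -> None:
    redis = Redis.from_url("redis://localhost:6379")
    # Producer: push the message onto the left end of the list.
    await redis.lpush("my_queue", "task-1")
    # Consumer: block until a message can be popped from the right end.
    key, message = await redis.brpop("my_queue")
    print(key, message)  # b'my_queue' b'task-1'
    await redis.aclose()  # requires redis-py >= 5


asyncio.run(main())
```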
+### Stream
+
+Stream brokers use the redis [stream type](https://redis.io/docs/latest/develop/data-types/streams/) to store and fetch messages.
+
+> [!TIP]
+> This broker **supports** acknowledgements and is therefore fine to use when
+> data durability is required.
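
A minimal usage sketch (assuming `RedisStreamBroker` with its defaults and a local redis); since the broker reads through a consumer group, a message that was never acked is re-delivered instead of being lost:

```python
import asyncio

from taskiq_redis import RedisStreamBroker

broker = RedisStreamBroker(url="redis://localhost:6379")


@broker.task
async def process(payload: str) -> None:
    # The message is acked only after the task finishes, so a worker
    # crash mid-processing does not lose it.
    print(payload)


async def main() -> None:
    await broker.startup()
    await process.kiq("hello")
    await broker.shutdown()


asyncio.run(main())
```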
 
 ## RedisAsyncResultBackend configuration
 
@@ -85,18 +108,20 @@ RedisAsyncResultBackend parameters:
 * Any other keyword arguments are passed to `redis.asyncio.BlockingConnectionPool`.
 Notably, you can use `timeout` to set custom timeout in seconds for reconnects
 (or set it to `None` to try reconnects indefinitely).
-> IMPORTANT: **It is highly recommended to use expire time in RedisAsyncResultBackend**
+
+> [!WARNING]
+> **It is highly recommended to use expire time in RedisAsyncResultBackend**
 > If you want to add expiration, either `result_ex_time` or `result_px_time` must be set.
->```python
-># First variant
->redis_async_result = RedisAsyncResultBackend(
->    redis_url="redis://localhost:6379",
->    result_ex_time=1000,
->)
+> ```python
+> # First variant
+> redis_async_result = RedisAsyncResultBackend(
+>     redis_url="redis://localhost:6379",
+>     result_ex_time=1000,
+> )
 >
-># Second variant
->redis_async_result = RedisAsyncResultBackend(
->    redis_url="redis://localhost:6379",
->    result_px_time=1000000,
->)
->```
+> # Second variant
+> redis_async_result = RedisAsyncResultBackend(
+>     redis_url="redis://localhost:6379",
+>     result_px_time=1000000,
+> )
+> ```
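
Putting the pieces together, a hedged end-to-end sketch (assuming `ListQueueBroker`, taskiq's `with_result_backend` helper, and a separately running worker) that stores a result with an expiration and reads it back:

```python
import asyncio

from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

result_backend = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
    result_ex_time=600,  # results expire after 600 seconds
)
broker = ListQueueBroker(url="redis://localhost:6379").with_result_backend(
    result_backend,
)


@broker.task
async def add(a: int, b: int) -> int:
    return a + b


async def main() -> None:
    await broker.startup()
    task = await add.kiq(1, 2)
    # A running worker must pick up the task and store the result.
    result = await task.wait_result(timeout=10)
    print(result.return_value)  # 3
    await broker.shutdown()


asyncio.run(main())
```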

taskiq_redis/redis_broker.py

+19
@@ -168,6 +168,25 @@ def __init__(
         additional_streams: Optional[Dict[str, str]] = None,
         **connection_kwargs: Any,
     ) -> None:
+        """
+        Constructs a new broker that uses streams.
+
+        :param url: url to redis.
+        :param queue_name: name for a key with stream in redis.
+        :param max_connection_pool_size: maximum number of connections in pool.
+            Each worker opens its own connection, so this value has to be
+            at least the number of workers + 1.
+        :param consumer_group_name: name for a consumer group.
+            Redis will keep track of acked messages for this group.
+        :param consumer_name: name for a consumer. By default it is a random uuid.
+        :param consumer_id: ID of the message to start reading from.
+            `$` means to start from the latest message.
+        :param mkstream: create the stream if it does not exist.
+        :param xread_block: block time in ms for xreadgroup.
+            Better to set it to a bigger value to avoid unnecessary calls.
+        :param additional_streams: additional streams to read from.
+            Each key is a stream name, each value is a consumer id.
+        """
         super().__init__(
             url,
             task_id_generator=None,
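
For reference, a construction sketch that exercises the parameters documented above (values and the `my_stream`/`my_group` names are illustrative):

```python
from taskiq_redis import RedisStreamBroker

broker = RedisStreamBroker(
    url="redis://localhost:6379",
    queue_name="my_stream",          # key holding the stream
    consumer_group_name="my_group",  # redis tracks acks per group
    consumer_id="$",                 # start reading from the latest message
    mkstream=True,                   # create the stream if missing
    xread_block=10000,               # block up to 10s in xreadgroup
    additional_streams={"other_stream": "$"},
)
```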

taskiq_redis/redis_cluster_broker.py

+19
@@ -95,6 +95,25 @@ def __init__(
         additional_streams: Optional[Dict[str, str]] = None,
         **connection_kwargs: Any,
     ) -> None:
+        """
+        Constructs a new broker that uses streams.
+
+        :param url: url to redis.
+        :param queue_name: name for a key with stream in redis.
+        :param max_connection_pool_size: maximum number of connections in pool.
+            Each worker opens its own connection, so this value has to be
+            at least the number of workers + 1.
+        :param consumer_group_name: name for a consumer group.
+            Redis will keep track of acked messages for this group.
+        :param consumer_name: name for a consumer. By default it is a random uuid.
+        :param consumer_id: ID of the message to start reading from.
+            `$` means to start from the latest message.
+        :param mkstream: create the stream if it does not exist.
+        :param xread_block: block time in ms for xreadgroup.
+            Better to set it to a bigger value to avoid unnecessary calls.
+        :param additional_streams: additional streams to read from.
+            Each key is a stream name, each value is a consumer id.
+        """
         super().__init__(
             url,
             queue_name=queue_name,
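
The cluster flavour is constructed the same way against any cluster node (a sketch; the class name `RedisStreamClusterBroker` is an assumption):

```python
from taskiq_redis import RedisStreamClusterBroker  # assumed class name

broker = RedisStreamClusterBroker(url="redis://localhost:7000")
```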

taskiq_redis/redis_sentinel_broker.py

+19
@@ -160,6 +160,25 @@ def __init__(
         additional_streams: Optional[Dict[str, str]] = None,
         **connection_kwargs: Any,
     ) -> None:
+        """
+        Constructs a new broker that uses streams.
+
+        :param sentinels: list of sentinel nodes to connect to.
+        :param queue_name: name for a key with stream in redis.
+        :param max_connection_pool_size: maximum number of connections in pool.
+            Each worker opens its own connection, so this value has to be
+            at least the number of workers + 1.
+        :param consumer_group_name: name for a consumer group.
+            Redis will keep track of acked messages for this group.
+        :param consumer_name: name for a consumer. By default it is a random uuid.
+        :param consumer_id: ID of the message to start reading from.
+            `$` means to start from the latest message.
+        :param mkstream: create the stream if it does not exist.
+        :param xread_block: block time in ms for xreadgroup.
+            Better to set it to a bigger value to avoid unnecessary calls.
+        :param additional_streams: additional streams to read from.
+            Each key is a stream name, each value is a consumer id.
+        """
         super().__init__(
             sentinels=sentinels,
             master_name=master_name,
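
The sentinel flavour takes a list of sentinel nodes plus a master name instead of a single url (a sketch; the class name `RedisStreamSentinelBroker` and all values are assumptions):

```python
from taskiq_redis import RedisStreamSentinelBroker  # assumed class name

broker = RedisStreamSentinelBroker(
    sentinels=[("localhost", 26379)],  # (host, port) pairs of sentinel nodes
    master_name="mymaster",            # master set resolved through sentinel
    queue_name="my_stream",
    consumer_group_name="my_group",
)
```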
