Version 2.0.0 introduces some breaking changes. This page aims to help you migrate to this new major version. The relevant changes are:
- The deprecated `connect` and `disconnect` methods have been removed
- The deprecated `filtered_messages` and `unfiltered_messages` methods have been removed
- User-managed queues for incoming messages have been replaced with a single client-wide queue
- Some arguments to the `Client` have been renamed or removed

The deprecated `connect` and `disconnect` methods have been removed. The best way to connect and disconnect from the broker is through the client's context manager:

    import asyncio
    import aiomqtt

    async def main():
        async with aiomqtt.Client("test.mosquitto.org") as client:
            await client.publish("temperature/outside", payload=28.4)

    asyncio.run(main())

If your use case does not allow you to use a context manager, you can use the client's `__aenter__` and `__aexit__` methods almost interchangeably in place of the removed `connect` and `disconnect` methods.
The `__aenter__` and `__aexit__` methods are designed to be called by the `async with` statement when the execution enters and exits the context manager. However, we can also execute them manually:

    import asyncio
    import aiomqtt

    async def main():
        client = aiomqtt.Client("test.mosquitto.org")
        await client.__aenter__()
        try:
            await client.publish("temperature/outside", payload=28.4)
        finally:
            await client.__aexit__(None, None, None)

    asyncio.run(main())

`__aenter__` is equivalent to `connect`. `__aexit__` is equivalent to `disconnect`, except that it forces disconnection instead of throwing an exception in case the client cannot disconnect cleanly.
`__aexit__` expects three arguments: `exc_type`, `exc`, and `tb`. These arguments describe the exception that caused the context manager to exit, if any. You can pass `None` to all of these arguments in a manual call to `__aexit__`.
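
When a client has to stay connected across several methods of a longer-lived object, the standard library's `contextlib.AsyncExitStack` is one way to avoid calling `__aenter__` and `__aexit__` by hand. The sketch below is only an illustration: `StandInClient` is a hypothetical placeholder for `aiomqtt.Client` so that the example runs without a broker, and the `Service` class is likewise an assumption, not part of aiomqtt.

```python
import asyncio
from contextlib import AsyncExitStack


class StandInClient:
    """Hypothetical stand-in for aiomqtt.Client; it only records lifecycle events."""

    def __init__(self):
        self.events = []

    async def __aenter__(self):
        self.events.append("enter")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # exc_type is None when the context exits without an exception
        self.events.append(("exit", exc_type))

    async def publish(self, topic, payload=None):
        self.events.append(("publish", topic, payload))


class Service:
    """Hypothetical long-lived object that owns a client between start() and stop()."""

    def __init__(self, client):
        self._client = client
        self._stack = AsyncExitStack()

    async def start(self):
        # Awaits client.__aenter__() and registers client.__aexit__ for later
        await self._stack.enter_async_context(self._client)

    async def stop(self):
        # Unwinds the stack, which awaits client.__aexit__(None, None, None)
        await self._stack.aclose()

    async def report(self, value):
        await self._client.publish("temperature/outside", payload=value)


async def demo():
    client = StandInClient()
    service = Service(client)
    await service.start()
    try:
        await service.report(28.4)
    finally:
        await service.stop()
    return client.events


events = asyncio.run(demo())
print(events)
```

With a real client, the same pattern should apply by passing an `aiomqtt.Client` instance to `Service` instead of the stand-in.
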
The `filtered_messages`, `unfiltered_messages`, and `messages` methods have been removed and replaced with a single client-wide message queue.
For previous versions, a minimal example of printing all messages (unfiltered) looked like this:

    import asyncio
    import aiomqtt

    async def main():
        async with aiomqtt.Client("test.mosquitto.org") as client:
            await client.subscribe("temperature/#")
            async with client.messages() as messages:
                async for message in messages:
                    print(message.payload)

    asyncio.run(main())

We no longer need the line `async with client.messages() as messages:`. Instead, we access the message generator directly with `client.messages`:

    import asyncio
    import aiomqtt

    async def main():
        async with aiomqtt.Client("test.mosquitto.org") as client:
            await client.subscribe("temperature/#")
            async for message in client.messages:
                print(message.payload)

    asyncio.run(main())

To handle messages from different topics differently, we can use `Topic.matches()`:

    import asyncio
    import aiomqtt

    async def main():
        async with aiomqtt.Client("test.mosquitto.org") as client:
            await client.subscribe("temperature/#")
            await client.subscribe("humidity/#")
            async for message in client.messages:
                if message.topic.matches("humidity/inside"):
                    print(f"[humidity/inside] {message.payload}")
                if message.topic.matches("+/outside"):
                    print(f"[+/outside] {message.payload}")
                if message.topic.matches("temperature/#"):
                    print(f"[temperature/#] {message.payload}")

    asyncio.run(main())

In our example, messages to `temperature/outside` are handled twice, because the topic matches both `+/outside` and `temperature/#`.
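
To see why a single topic can satisfy more than one pattern, it can help to spell out the MQTT wildcard rules: `+` matches exactly one topic level and `#` matches the remaining levels. The function below is a simplified re-implementation written for illustration; `topic_matches` is a hypothetical helper, not aiomqtt's `Topic.matches()`, and it ignores edge cases such as `$`-prefixed topics and pattern validation.

```python
def topic_matches(topic: str, pattern: str) -> bool:
    """Simplified MQTT wildcard matching (hypothetical helper, not aiomqtt's code)."""
    topic_levels = topic.split("/")
    pattern_levels = pattern.split("/")
    for i, level in enumerate(pattern_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below it
            return True
        if i >= len(topic_levels):
            # The pattern has more levels than the topic
            return False
        if level != "+" and level != topic_levels[i]:
            # Neither a single-level wildcard nor a literal match
            return False
    # Without "#", both must have the same number of levels
    return len(topic_levels) == len(pattern_levels)


patterns = ["humidity/inside", "+/outside", "temperature/#"]

# Independent `if` statements run every matching handler
matched = [p for p in patterns if topic_matches("temperature/outside", p)]

# An `if`/`elif` chain would run only the first matching handler
first = next((p for p in patterns if topic_matches("temperature/outside", p)), None)

print(matched)
print(first)
```

If each message should be handled at most once, an `if`/`elif` chain ordered from the most specific to the least specific pattern is one option.
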
The `filtered_messages`, `unfiltered_messages`, and `messages` methods created isolated message queues underneath, such that you could invoke them multiple times. From Version 2.0.0 on, the client maintains a single queue that holds all incoming messages, accessible via `Client.messages`.
If you continue to need multiple queues (e.g. because you have special concurrency requirements), you can build a "distributor" on top:

    import asyncio
    import aiomqtt

    async def temperature_consumer():
        while True:
            message = await temperature_queue.get()
            print(f"[temperature/#] {message.payload}")

    async def humidity_consumer():
        while True:
            message = await humidity_queue.get()
            print(f"[humidity/#] {message.payload}")

    temperature_queue = asyncio.Queue()
    humidity_queue = asyncio.Queue()

    async def distributor(client):
        # Sort messages into the appropriate queues
        async for message in client.messages:
            if message.topic.matches("temperature/#"):
                temperature_queue.put_nowait(message)
            elif message.topic.matches("humidity/#"):
                humidity_queue.put_nowait(message)

    async def main():
        async with aiomqtt.Client("test.mosquitto.org") as client:
            await client.subscribe("temperature/#")
            await client.subscribe("humidity/#")
            # Use a task group to manage and await all tasks
            async with asyncio.TaskGroup() as tg:
                tg.create_task(distributor(client))
                tg.create_task(temperature_consumer())
                tg.create_task(humidity_consumer())

    asyncio.run(main())

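
The consumers above loop until they are cancelled. To try the distributor pattern without a broker, or to shut the consumers down cleanly, a sentinel value can mark the end of the stream. The sketch below is an assumption-laden illustration: `FakeMessage` and `fake_messages` are hypothetical stand-ins for aiomqtt's message objects and `client.messages`, and routing by the first topic level is a simplification of `Topic.matches()`.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Hypothetical stand-in for an incoming message."""
    topic: str
    payload: float


async def fake_messages():
    """Hypothetical stand-in for client.messages; yields three messages, then ends."""
    for topic, payload in [
        ("temperature/outside", 28.4),
        ("humidity/inside", 0.38),
        ("temperature/inside", 21.0),
    ]:
        yield FakeMessage(topic, payload)


async def distributor(messages, queues):
    # Route each message to the queue registered for its first topic level
    async for message in messages:
        prefix = message.topic.split("/", 1)[0]
        if prefix in queues:
            queues[prefix].put_nowait(message)
    # Sentinel: tell every consumer that no more messages will arrive
    for queue in queues.values():
        queue.put_nowait(None)


async def consumer(queue, received):
    while True:
        message = await queue.get()
        if message is None:
            break
        received.append(message.payload)


async def demo():
    queues = {"temperature": asyncio.Queue(), "humidity": asyncio.Queue()}
    received = {name: [] for name in queues}
    await asyncio.gather(
        distributor(fake_messages(), queues),
        *(consumer(queues[name], received[name]) for name in queues),
    )
    return received


received = asyncio.run(demo())
print(received)
```

Keeping the queues in a dictionary means a new topic family only needs a new entry and a new consumer, rather than another branch in the distributor.
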
- The `queue_class` and `queue_maxsize` arguments to `filtered_messages`, `unfiltered_messages`, and `messages` have been moved to the `Client` and have been renamed to `queue_type` and `max_queued_incoming_messages`
- The `max_queued_messages` client argument has been renamed to `max_queued_outgoing_messages`
- The deprecated `message_retry_set` client argument has been removed
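
For codebases that build client arguments from configuration, the renames listed above could be applied mechanically. The helper below is a minimal sketch under that assumption: `migrate_client_kwargs` is a hypothetical function, not part of aiomqtt, and it treats the old queue arguments (previously passed to the message methods) as if they had been collected in the same dictionary as the client arguments.

```python
# Old argument name -> new Client argument name, as listed in this guide
RENAMED = {
    "queue_class": "queue_type",
    "queue_maxsize": "max_queued_incoming_messages",
    "max_queued_messages": "max_queued_outgoing_messages",
}

# Arguments that no longer exist in version 2.0.0
REMOVED = {"message_retry_set"}


def migrate_client_kwargs(old_kwargs):
    """Hypothetical helper: translate pre-2.0.0 keyword arguments to their new names."""
    new_kwargs = {}
    for name, value in old_kwargs.items():
        if name in REMOVED:
            # Dropped without replacement
            continue
        new_kwargs[RENAMED.get(name, name)] = value
    return new_kwargs


old = {
    "max_queued_messages": 100,
    "message_retry_set": 20,
    "queue_maxsize": 50,
}
new = migrate_client_kwargs(old)
print(new)
```

Arguments that are not mentioned in the list pass through unchanged, so the result can be unpacked into the `Client` constructor together with the remaining settings.
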