Description
If a Notifier is created with an asyncio loop, errors that occur in the receive callbacks do not trigger the Listener._on_error() callback. Without asyncio, they do.
Looking at Notifier._rx_thread(): when a loop is present, it calls Notifier._on_message_received() through self._loop.call_soon_threadsafe(). However, the except Exception as exc: on line 127 is never reached for these errors, because call_soon_threadsafe() only schedules the callback on the loop and returns immediately; exceptions raised later inside the callback are not propagated back to the calling thread. As a result, when asyncio is enabled, Listener._on_error() is not called either, which is unexpected.
Lines 111 to 137 in 654a02a
def _rx_thread(self, bus: BusABC) -> None:
    # determine message handling callable early, not inside while loop
    if self._loop:
        handle_message: Callable[[Message], Any] = functools.partial(
            self._loop.call_soon_threadsafe,
            self._on_message_received,  # type: ignore[arg-type]
        )
    else:
        handle_message = self._on_message_received

    while self._running:
        try:
            if msg := bus.recv(self.timeout):
                with self._lock:
                    handle_message(msg)
        except Exception as exc:  # pylint: disable=broad-except
            self.exception = exc
            if self._loop is not None:
                self._loop.call_soon_threadsafe(self._on_error, exc)
                # Raise anyway
                raise
            elif not self._on_error(exc):
                # If it was not handled, raise the exception here
                raise
            else:
                # It was handled, so only log it
                logger.debug("suppressed exception: %s", exc)
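The underlying asyncio behaviour can be reproduced without python-can. The following is only a minimal sketch: failing_callback, worker and record are hypothetical names introduced for illustration, and the worker thread stands in for _rx_thread. It shows that an exception raised inside a callback scheduled with call_soon_threadsafe() ends up in the loop's exception handler, not in the try/except of the thread that scheduled it.

```python
import asyncio
from typing import Any, Dict


def failing_callback(msg: str) -> None:
    # Hypothetical stand-in for a listener callback that raises.
    raise ValueError(f"cannot handle {msg!r}")


async def main() -> Dict[str, Any]:
    loop = asyncio.get_running_loop()
    result: Dict[str, Any] = {"caught_in_thread": False, "loop_errors": []}

    # Record what the loop's exception handler receives instead of logging it.
    def record(_loop: asyncio.AbstractEventLoop, context: Dict[str, Any]) -> None:
        result["loop_errors"].append(context.get("exception"))

    loop.set_exception_handler(record)

    def worker() -> None:
        try:
            # Only schedules the callback and returns a Handle immediately.
            loop.call_soon_threadsafe(failing_callback, "frame")
        except Exception:
            # Not reached for errors raised inside failing_callback.
            result["caught_in_thread"] = True

    # Run worker in a separate thread, as _rx_thread would be.
    await loop.run_in_executor(None, worker)
    # Give the loop a moment to run the scheduled callback.
    await asyncio.sleep(0.05)
    return result


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In this sketch the except branch in worker is never taken, which mirrors why the except block in _rx_thread does not see listener errors when a loop is in use.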
One fix would be to wrap the body of Notifier._on_message_received() in a try/except block and call the _on_error() callback from there. Another is to route messages through an extra handler when running with asyncio:
def _rx_thread(self, bus: BusABC) -> None:
    # determine message handling callable early, not inside while loop
    if self._loop:
        def rx_handler(msg: Message) -> None:
            try:
                self._on_message_received(msg)
            except Exception as exc:
                if not self._on_error(exc):
                    raise
                else:
                    # It was handled, so only log it
                    logger.debug("suppressed exception: %s", exc)

        handle_message: Callable[[Message], Any] = functools.partial(
            self._loop.call_soon_threadsafe,
            rx_handler,  # type: ignore[arg-type]
        )
    else:
        handle_message = self._on_message_received
    ...
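The wrapper idea can also be tried in isolation. This is a sketch under assumptions, not python-can code: make_guarded, on_error and failing are hypothetical helpers, and plain strings stand in for Message objects. Because the try/except now runs inside the scheduled callback, on the loop's thread, the error handler does get called.

```python
import asyncio
from typing import Callable, List


def make_guarded(
    callback: Callable[[str], None],
    on_error: Callable[[Exception], bool],
) -> Callable[[str], None]:
    """Wrap callback so that exceptions it raises are passed to on_error."""

    def guarded(msg: str) -> None:
        try:
            callback(msg)
        except Exception as exc:
            if not on_error(exc):
                # Not handled: re-raise so the loop's exception handler sees it.
                raise

    return guarded


async def demo() -> List[Exception]:
    loop = asyncio.get_running_loop()
    seen: List[Exception] = []

    def on_error(exc: Exception) -> bool:
        seen.append(exc)
        return True  # report the error as handled

    def failing(msg: str) -> None:
        raise ValueError(msg)

    def worker() -> None:
        # Schedule the guarded callback from a non-loop thread.
        loop.call_soon_threadsafe(make_guarded(failing, on_error), "bad frame")

    await loop.run_in_executor(None, worker)
    # Give the loop a moment to run the scheduled callback.
    await asyncio.sleep(0.05)
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

One consequence of this design is that on_error runs in the event loop's thread, not in the receive thread, which matches where the listener callbacks themselves run when a loop is used.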
Probably related to #1865