Skip to content

Commit

Permalink
Limit device concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
dmulcahey committed Sep 28, 2024
1 parent 563dcfe commit 61a6a25
Showing 1 changed file with 35 additions and 8 deletions.
43 changes: 35 additions & 8 deletions zigpy/device.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from __future__ import annotations

import asyncio
import contextlib
from datetime import datetime, timezone
import enum
import itertools
import logging
import sys
import time
import typing
import warnings

Expand Down Expand Up @@ -93,10 +95,34 @@ def __init__(self, application: ControllerApplication, ieee: t.EUI64, nwk: t.NWK
self._send_sequence: int = 0

self._packet_debouncer = zigpy.datastructures.Debouncer()
self._concurrent_requests_semaphore = zigpy.util.DynamicBoundedSemaphore(1)

# Retained for backwards compatibility, will be removed in a future release
self.status = Status.NEW

@contextlib.asynccontextmanager
async def _limit_concurrency(self):
"""Async context manager to limit device request concurrency."""

start_time = time.monotonic()
was_locked = self._concurrent_requests_semaphore.locked()

if was_locked:
LOGGER.debug(
"Device concurrency (%s) reached, delaying request (%s enqueued)",
self._concurrent_requests_semaphore.max_value,
self._concurrent_requests_semaphore.num_waiting,
)

async with self._concurrent_requests_semaphore:
if was_locked:
LOGGER.debug(
"Previously delayed request is now running, delayed by %0.2fs",
time.monotonic() - start_time,
)

yield

def get_sequence(self) -> t.uint8_t:
self._send_sequence = (self._send_sequence + 1) % 256
return self._send_sequence
Expand Down Expand Up @@ -330,16 +356,17 @@ async def request(
extended_timeout=extended_timeout,
)

if not expect_reply:
await send_request()
return None
async with self._limit_concurrency():
if not expect_reply:
await send_request()
return None

# Only create a pending request if we are expecting a reply
with self._pending.new(sequence) as req:
await send_request()
# Only create a pending request if we are expecting a reply
with self._pending.new(sequence) as req:
await send_request()

async with asyncio_timeout(timeout):
return await req.result
async with asyncio_timeout(timeout):
return await req.result

def handle_message(
self,
Expand Down

0 comments on commit 61a6a25

Please # to comment.