Skip to content

Commit

Permalink
Fixed worker processes getting lost when pruning idle workers (#604)
Browse files Browse the repository at this point in the history
Fixes #603.

Co-authored-by: Alex Grönholm <alex.gronholm@nextday.fi>
  • Loading branch information
AnaelGorfinkel and agronholm authored Aug 30, 2023
1 parent 969f188 commit dea1921
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 6 deletions.
4 changes: 4 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
- Re-added the ``item_type`` argument to ``create_memory_object_stream()`` (but using it
raises a deprecation warning and does nothing with regards to the static types of the
returned streams)
- Fixed processes spawned by ``anyio.to_process()`` being "lost" as unusable to the
process pool when processes that have idled over 5 minutes are pruned at part of the
``to_process.run_sync()`` call, leading to increased memory consumption
(PR by Anael Gorfinkel)

**4.0.0rc1**

Expand Down
12 changes: 6 additions & 6 deletions src/anyio/to_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,14 @@ async def send_raw_command(pickled_cmd: bytes) -> object:
if now - idle_workers[0][1] < WORKER_MAX_IDLE_TIME:
break

process, idle_since = idle_workers.popleft()
process.kill()
workers.remove(process)
killed_processes.append(process)
process_to_kill, idle_since = idle_workers.popleft()
process_to_kill.kill()
workers.remove(process_to_kill)
killed_processes.append(process_to_kill)

with CancelScope(shield=True):
for process in killed_processes:
await process.aclose()
for killed_process in killed_processes:
await killed_process.aclose()

break

Expand Down
29 changes: 29 additions & 0 deletions tests/test_to_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sys
import time
from functools import partial
from unittest.mock import Mock

import pytest

Expand All @@ -15,6 +16,7 @@
to_process,
wait_all_tasks_blocked,
)
from anyio.abc import Process

pytestmark = pytest.mark.anyio

Expand Down Expand Up @@ -95,3 +97,30 @@ async def test_cancel_during() -> None:

# The previous worker was killed so we should get a new one now
assert await to_process.run_sync(os.getpid) != worker_pid


async def test_exec_while_pruning() -> None:
"""
Test that in the case when one or more idle workers are pruned, the originally
selected idle worker is re-added to the queue of idle workers.
"""

worker_pid1 = await to_process.run_sync(os.getpid)
workers = to_process._process_pool_workers.get()
idle_workers = to_process._process_pool_idle_workers.get()
real_worker = next(iter(workers))

fake_idle_process = Mock(Process)
workers.add(fake_idle_process)
try:
# Add a mock worker process that's guaranteed to be eligible for pruning
idle_workers.appendleft(
(fake_idle_process, -to_process.WORKER_MAX_IDLE_TIME - 1)
)

worker_pid2 = await to_process.run_sync(os.getpid)
assert worker_pid1 == worker_pid2
fake_idle_process.kill.assert_called_once_with()
assert idle_workers[0][0] is real_worker
finally:
workers.discard(fake_idle_process)

0 comments on commit dea1921

Please # to comment.