gh-131466: concurrent.futures.Executor.map: avoid temporarily exceeding buffersize while collecting the next result #131467

Open: wants to merge 18 commits into base: main
36 changes: 18 additions & 18 deletions Lib/concurrent/futures/_base.py
@@ -304,18 +304,6 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
     done.update(waiter.finished_futures)
     return DoneAndNotDoneFutures(done, fs - done)

-
-def _result_or_cancel(fut, timeout=None):
-    try:
-        try:
-            return fut.result(timeout)
-        finally:
-            fut.cancel()
-    finally:
-        # Break a reference cycle with the exception in self._exception
-        del fut
Contributor Author

@ebonnal ebonnal Mar 23, 2025

Hi @graingert!
Context:
As a side effect, this PR may remove the need for _result_or_cancel (introduced in #95169). If fetching the next result raises a TimeoutError, its future will still be in fs and will be properly cancelled by the result_iterator's finally block.

Question:
Do you remember in which scenario the del fut was required? Removing it in the current main does not break any tests 🤔
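
To illustrate the point about TimeoutError, here is a minimal sketch (not the PR's code) of an iterator that waits on the next future while it is still in the deque, so a timeout leaves it there for the finally block to cancel. The helper name `iter_results` and the hand-built `Future` objects are assumptions for illustration only.

```python
import collections
from concurrent.futures import Future, TimeoutError


def iter_results(fs, timeout):
    """Hypothetical helper: yield results in order, cancelling leftovers."""
    try:
        while fs:
            # Wait while the future is still in `fs`; a TimeoutError raised
            # here leaves it in the deque.
            fs[0].result(timeout)
            yield fs.popleft().result()
    finally:
        # Runs on timeout too, so the future we were waiting on is cancelled.
        for future in fs:
            future.cancel()


done = Future()
done.set_result("ok")
pending = Future()  # never completed, so waiting on it times out

fs = collections.deque([done, pending])
collected = []
try:
    for value in iter_results(fs, timeout=0.01):
        collected.append(value)
except TimeoutError:
    pass

print(collected, pending.cancelled())
```

In this sketch no separate cancel-on-failure wrapper is needed, because the timed-out future never leaves the deque before the finally block runs.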

Contributor

This is for the case where fut.result() raises an exception: there's a reference cycle where fut.exception().__traceback__ -> fut.exception()

Probably worth adding a test; a git grep for no_other_refs will find a similar one.
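
A rough sketch of the cycle described above, assuming CPython's reference counting and a recent version in which `Future.result()` clears its own `self` reference. The function names are hypothetical and are not taken from the PR or from the test suite. The traceback keeps the calling frame alive, the frame holds `fut`, and `fut` holds the exception, so without `del fut` only the cycle collector can reclaim the future.

```python
import gc
import weakref
from concurrent.futures import Future


def result_keeping_ref(fut):
    # If result() raises, the traceback keeps this frame alive, and the
    # frame still holds `fut`, which holds the exception: a cycle.
    return fut.result()


def result_dropping_ref(fut):
    try:
        return fut.result()
    finally:
        # Clear this frame's reference so the traceback no longer leads
        # back to the future.
        del fut


def freed_without_gc(getter):
    """Return True if the future is reclaimed by reference counting alone."""
    fut = Future()
    fut.set_exception(ValueError("boom"))
    ref = weakref.ref(fut)
    gc.disable()  # rule out the cycle collector for the duration of the check
    try:
        try:
            getter(fut)
        except ValueError:
            pass
        del fut  # drop the last reference held by this function
        return ref() is None
    finally:
        gc.enable()


print(freed_without_gc(result_dropping_ref))
print(freed_without_gc(result_keeping_ref))
```

Under those assumptions the first call should report that the future was freed immediately and the second that it was not, which is the kind of property a no_other_refs style test can assert.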

Contributor Author

Thank you! Will add the test 🫡

Contributor Author

@ebonnal ebonnal Mar 24, 2025

fyi #131701 adds the test @graingert @picnixz 🙏🏻

-
-
 class Future(object):
     """Represents the result of an asynchronous computation."""

@@ -625,23 +613,35 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
         # before the first iterator value is required.
         def result_iterator():
             try:
-                # reverse to keep finishing order
+                # reverse so that the next (FIFO) future is on the right
                 fs.reverse()
+                # careful not to keep references to futures or results
                 while fs:
+                    # wait for the next result
+                    if timeout is None:
+                        fs[-1].result()
+                    else:
+                        fs[-1].result(end_time - time.monotonic())
+
+                    # buffer next task
                     if (
                         buffersize
                         and (executor := executor_weakref())
                         and (args := next(zipped_iterables, None))
                     ):
                         fs.appendleft(executor.submit(fn, *args))
-                    # Careful not to keep a reference to the popped future
-                    if timeout is None:
-                        yield _result_or_cancel(fs.pop())
-                    else:
-                        yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
+
+                    # yield the awaited result
+                    yield fs.pop()._result
+
             finally:
+                # break the reference cycle with fs[-1]._exception's traceback
+                if fs:
+                    fs.pop().cancel()
+
                 for future in fs:
                     future.cancel()
+
         return result_iterator()

     def shutdown(self, wait=True, *, cancel_futures=False):
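
The reordering in this hunk can be sketched in isolation. The following is a simplified stand-in, not the PR's implementation: `bounded_map`, `reciprocal`, and the left-to-right deque orientation are assumptions made for illustration, and timeouts and the executor weak reference are left out. It waits for the oldest future before pulling one more item from the input, so an error raised while waiting leaves the input iterator untouched beyond the buffer.

```python
import collections
import itertools
from concurrent.futures import ThreadPoolExecutor


def bounded_map(executor, fn, iterable, buffersize):
    """Hypothetical stand-in for Executor.map(..., buffersize=...)."""
    args_iter = iter(iterable)
    # Eagerly submit the first `buffersize` tasks; oldest future on the left.
    fs = collections.deque(
        executor.submit(fn, arg) for arg in itertools.islice(args_iter, buffersize)
    )

    def result_iterator():
        try:
            while fs:
                # 1. Wait for the oldest future; if this raises, nothing
                #    extra has been pulled from `args_iter`.
                fs[0].result()
                # 2. Only then fetch and submit at most one more task.
                for arg in itertools.islice(args_iter, 1):
                    fs.append(executor.submit(fn, arg))
                # 3. Hand the awaited result to the consumer.
                yield fs.popleft().result()
        finally:
            for future in fs:
                future.cancel()

    return result_iterator()


def reciprocal(x):
    return 1 / x


ints = iter([1, 2, 3, 0, 4, 5, 6])
results = []
with ThreadPoolExecutor(max_workers=2) as pool:
    try:
        for value in bounded_map(pool, reciprocal, ints, buffersize=2):
            results.append(value)
    except ZeroDivisionError:
        pass
remaining = list(ints)

print(len(results), remaining)
```

With this input the loop should stop at the zero after three results and leave 5 and 6 unfetched, which mirrors the accounting checked by test_map_buffersize_when_error.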
27 changes: 26 additions & 1 deletion Lib/test/test_concurrent_futures/executor.py
@@ -3,7 +3,9 @@
 import time
 import weakref
 from concurrent import futures
-from operator import add
+from contextlib import suppress
+from functools import partial
+from operator import add, truediv
 from test import support
 from test.support import Py_GIL_DISABLED

@@ -143,6 +145,29 @@ def test_map_buffersize_when_buffer_is_full(self):
             msg="should have fetched only `buffersize` elements from `ints`.",
         )

+    def test_map_buffersize_when_error(self):
+        ints = [1, 2, 3, 0, 4, 5, 6]
+        index_of_zero = ints.index(0)
+        ints_iter = iter(ints)
+        buffersize = 2
+        reciprocal = partial(truediv, 1)
+        results = []
+        with suppress(ZeroDivisionError):
+            for result in self.executor.map(
+                reciprocal, ints_iter, buffersize=buffersize
+            ):
+                results.append(result)
+        self.assertEqual(
+            len(results),
+            index_of_zero,
+            msg="should have mapped until reaching the zero.",
+        )
+        self.assertEqual(
+            len(results) + buffersize + len(list(ints_iter)),
+            len(ints),
+            msg="ints should be either processed, or buffered, or not fetched.",
+        )
+
     def test_shutdown_race_issue12456(self):
         # Issue #12456: race condition at shutdown where trying to post a
         # sentinel in the call queue blocks (the queue is full while processes