From 233ccc16ad0345a1c9e7f160febea6417e1f50e5 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Wed, 19 Mar 2025 15:09:35 +0000 Subject: [PATCH 01/17] `Executor.map`: avoid temporarily exceeding the `buffersize` while collecting the next result --- Lib/concurrent/futures/_base.py | 11 +++++---- Lib/test/test_concurrent_futures/executor.py | 26 ++++++++++++++++++++ 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index d5ba39e3d71774..800f1852904d83 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -628,17 +628,18 @@ def result_iterator(): # reverse to keep finishing order fs.reverse() while fs: + # Careful not to keep a reference to the popped future + if timeout is None: + result = _result_or_cancel(fs.pop()) + else: + result = _result_or_cancel(fs.pop(), end_time - time.monotonic()) if ( buffersize and (executor := executor_weakref()) and (args := next(zipped_iterables, None)) ): fs.appendleft(executor.submit(fn, *args)) - # Careful not to keep a reference to the popped future - if timeout is None: - yield _result_or_cancel(fs.pop()) - else: - yield _result_or_cancel(fs.pop(), end_time - time.monotonic()) + yield result finally: for future in fs: future.cancel() diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index d88c34d1c8c8e4..ac91ec791b33ed 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -1,9 +1,12 @@ import itertools +import operator import threading import time import weakref from concurrent import futures from operator import add +from functools import partial +from contextlib import suppress from test import support from test.support import Py_GIL_DISABLED @@ -143,6 +146,29 @@ def test_map_buffersize_when_buffer_is_full(self): msg="should have fetched only `buffersize` elements from `ints`.", ) + def test_map_buffersize_when_error(self): + ints = [1, 2, 3, 0, 4, 5, 6] + index_of_zero = ints.index(0) + ints_iter = iter(ints) + buffersize = 2 + reciprocal = partial(operator.truediv, 1) + results = [] + with suppress(ZeroDivisionError): + for result in self.executor.map( + reciprocal, ints_iter, buffersize=buffersize + ): + results.append(result) + self.assertEqual( + len(results), + index_of_zero, + msg="should have mapped until reaching the zero.", + ) + self.assertEqual( + len(results) + buffersize + len(list(ints_iter)), + len(ints), + msg="ints should be either processed, or buffered, or not fetched.", + ) + def test_shutdown_race_issue12456(self): # Issue #12456: race condition at shutdown where trying to post a # sentinel in the call queue blocks (the queue is full while processes From 72d7028ac4202dc22549737e4203e1707fd960eb Mon Sep 17 00:00:00 2001 From: ebonnal Date: Wed, 19 Mar 2025 18:45:50 +0000 Subject: [PATCH 02/17] avoid keeping a ref to result for test_thread_pool.ThreadPoolExecutorTest.test_free_reference --- Lib/concurrent/futures/_base.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 800f1852904d83..403bae31804ece 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -621,6 +621,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): # collected independently of the result_iterator closure. executor_weakref = weakref.ref(self) + result = collections.deque(maxlen=1) # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. def result_iterator(): @@ -630,16 +631,18 @@ def result_iterator(): while fs: # Careful not to keep a reference to the popped future if timeout is None: - result = _result_or_cancel(fs.pop()) + result.append(_result_or_cancel(fs.pop())) else: - result = _result_or_cancel(fs.pop(), end_time - time.monotonic()) + result.append( + _result_or_cancel(fs.pop(), end_time - time.monotonic()) + ) if ( buffersize and (executor := executor_weakref()) and (args := next(zipped_iterables, None)) ): fs.appendleft(executor.submit(fn, *args)) - yield result + yield result.pop() finally: for future in fs: future.cancel() From 2a30697d9749976f7018c4278f844786005298fe Mon Sep 17 00:00:00 2001 From: ebonnal Date: Wed, 19 Mar 2025 18:50:53 +0000 Subject: [PATCH 03/17] update comment about not keeping references to popped future/result --- Lib/concurrent/futures/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 403bae31804ece..1fdabe7a3cb865 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -629,7 +629,7 @@ def result_iterator(): # reverse to keep finishing order fs.reverse() while fs: - # Careful not to keep a reference to the popped future + # Careful not to keep a reference to the popped future or its result if timeout is None: result.append(_result_or_cancel(fs.pop())) else: From ab4182befdf3786fd3b011a32373a102786d972f Mon Sep 17 00:00:00 2001 From: ebonnal Date: Wed, 19 Mar 2025 19:47:28 +0000 Subject: [PATCH 04/17] introduce `current_timeout` variable --- Lib/concurrent/futures/_base.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 1fdabe7a3cb865..660f8e57ad4c40 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -628,20 +628,21 @@ def result_iterator(): try: # reverse to keep finishing order fs.reverse() + current_timeout = timeout while fs: # Careful not to keep a reference to the popped future or its result - if timeout is None: - result.append(_result_or_cancel(fs.pop())) - else: - result.append( - _result_or_cancel(fs.pop(), end_time - time.monotonic()) - ) + if current_timeout is not None: + current_timeout = end_time - time.monotonic() + + result.append(_result_or_cancel(fs.pop(), current_timeout)) + if ( buffersize and (executor := executor_weakref()) and (args := next(zipped_iterables, None)) ): fs.appendleft(executor.submit(fn, *args)) + yield result.pop() finally: for future in fs: From 7a1ae463ddb50f622dfcc7f5b7c97c6881e0007f Mon Sep 17 00:00:00 2001 From: ebonnal Date: Wed, 19 Mar 2025 20:55:45 +0000 Subject: [PATCH 05/17] comment on the necessity of the result container --- Lib/concurrent/futures/_base.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 660f8e57ad4c40..cbfd8e9029f216 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -621,6 +621,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): # collected independently of the result_iterator closure. executor_weakref = weakref.ref(self) + # used by the result_iterator to avoid keeping a reference to the result result = collections.deque(maxlen=1) # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. From 268927d965e1a369e7f49d0d56253dd9ae89717b Mon Sep 17 00:00:00 2001 From: ebonnal Date: Thu, 20 Mar 2025 11:07:08 +0000 Subject: [PATCH 06/17] avoid container --- Lib/concurrent/futures/_base.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index cbfd8e9029f216..79d43585d4f17a 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -621,8 +621,6 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): # collected independently of the result_iterator closure. executor_weakref = weakref.ref(self) - # used by the result_iterator to avoid keeping a reference to the result - result = collections.deque(maxlen=1) # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. def result_iterator(): @@ -635,8 +633,10 @@ def result_iterator(): if current_timeout is not None: current_timeout = end_time - time.monotonic() - result.append(_result_or_cancel(fs.pop(), current_timeout)) + # wait for the next result + _result_or_cancel(fs[-1], current_timeout) + # buffer next task if ( buffersize and (executor := executor_weakref()) @@ -644,7 +644,8 @@ def result_iterator(): ): fs.appendleft(executor.submit(fn, *args)) - yield result.pop() + # yield the awaited result + yield fs.pop().result() finally: for future in fs: future.cancel() From de09affe5d6e00d6e4ab6c3b0d7bdd5d2c4be1f2 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Thu, 20 Mar 2025 11:09:32 +0000 Subject: [PATCH 07/17] remove current_timeout usage --- Lib/concurrent/futures/_base.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 79d43585d4f17a..12774faccd845e 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -627,14 +627,13 @@ def result_iterator(): try: # reverse to keep finishing order fs.reverse() - current_timeout = timeout + # Careful not to keep a reference to the popped future or its result while fs: - # Careful not to keep a reference to the popped future or its result - if current_timeout is not None: - current_timeout = end_time - time.monotonic() - # wait for the next result - _result_or_cancel(fs[-1], current_timeout) + if timeout is None: + _result_or_cancel(fs[-1]) + else: + _result_or_cancel(fs[-1], end_time - time.monotonic()) # buffer next task if ( From 1814bfed3ccdd5cdf9ffdae3beb479204e2e4fe2 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Thu, 20 Mar 2025 11:28:46 +0000 Subject: [PATCH 08/17] fix comments format --- Lib/concurrent/futures/_base.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 12774faccd845e..52c413b974a0b4 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -629,13 +629,13 @@ def result_iterator(): fs.reverse() # Careful not to keep a reference to the popped future or its result while fs: - # wait for the next result + # Wait for the next result if timeout is None: _result_or_cancel(fs[-1]) else: _result_or_cancel(fs[-1], end_time - time.monotonic()) - # buffer next task + # Buffer next task if ( buffersize and (executor := executor_weakref()) @@ -643,7 +643,7 @@ def result_iterator(): ): fs.appendleft(executor.submit(fn, *args)) - # yield the awaited result + # Yield the awaited result yield fs.pop().result() finally: for future in fs: From 720632162a80b99723f995adbb56c8511c096f41 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Thu, 20 Mar 2025 14:25:10 +0000 Subject: [PATCH 09/17] rephrase comments --- Lib/concurrent/futures/_base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 52c413b974a0b4..a5dcb81385f071 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -625,9 +625,9 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): # before the first iterator value is required. def result_iterator(): try: - # reverse to keep finishing order + # Reverse so that the next (FIFO) future is on the right fs.reverse() - # Careful not to keep a reference to the popped future or its result + # Careful not to keep references to futures or results while fs: # Wait for the next result if timeout is None: From 162add1441609cb53c4b0b33aa9ce1baf2f6131b Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sun, 23 Mar 2025 00:21:42 +0000 Subject: [PATCH 10/17] order imports --- Lib/test/test_concurrent_futures/executor.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index ac91ec791b33ed..2f46d561d0128f 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -1,12 +1,11 @@ import itertools -import operator import threading import time import weakref from concurrent import futures -from operator import add -from functools import partial from contextlib import suppress +from functools import partial +from operator import add, truediv from test import support from test.support import Py_GIL_DISABLED @@ -151,7 +150,7 @@ def test_map_buffersize_when_error(self): index_of_zero = ints.index(0) ints_iter = iter(ints) buffersize = 2 - reciprocal = partial(operator.truediv, 1) + reciprocal = partial(truediv, 1) results = [] with suppress(ZeroDivisionError): for result in self.executor.map( From f2c5fd0d25feda6dff5d8d6c93ed861288152759 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sun, 23 Mar 2025 13:20:11 +0000 Subject: [PATCH 11/17] format comments --- Lib/concurrent/futures/_base.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index a5dcb81385f071..4c9a69f24e098b 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -625,17 +625,17 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): # before the first iterator value is required. def result_iterator(): try: - # Reverse so that the next (FIFO) future is on the right + # reverse so that the next (FIFO) future is on the right fs.reverse() - # Careful not to keep references to futures or results + # careful not to keep references to futures or results while fs: - # Wait for the next result + # wait for the next result if timeout is None: _result_or_cancel(fs[-1]) else: _result_or_cancel(fs[-1], end_time - time.monotonic()) - # Buffer next task + # buffer next task if ( buffersize and (executor := executor_weakref()) @@ -643,7 +643,7 @@ def result_iterator(): ): fs.appendleft(executor.submit(fn, *args)) - # Yield the awaited result + # yield the awaited result yield fs.pop().result() finally: for future in fs: From 9474769bdad5765c68be7a003acbaca3e0465aab Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sun, 23 Mar 2025 13:28:33 +0000 Subject: [PATCH 12/17] remove _result_or_cancel --- Lib/concurrent/futures/_base.py | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 4c9a69f24e098b..0fb02e3e14d37a 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -304,18 +304,6 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED): done.update(waiter.finished_futures) return DoneAndNotDoneFutures(done, fs - done) - -def _result_or_cancel(fut, timeout=None): - try: - try: - return fut.result(timeout) - finally: - fut.cancel() - finally: - # Break a reference cycle with the exception in self._exception - del fut - - class Future(object): """Represents the result of an asynchronous computation.""" @@ -631,9 +619,9 @@ def result_iterator(): while fs: # wait for the next result if timeout is None: - _result_or_cancel(fs[-1]) + fs[-1].result() else: - _result_or_cancel(fs[-1], end_time - time.monotonic()) + fs[-1].result(end_time - time.monotonic()) # buffer next task if ( From 2a2119ecd5ce096e7f03885708bd1ceea10f4b6b Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sun, 23 Mar 2025 13:38:11 +0000 Subject: [PATCH 13/17] access awaited result via _result attribute --- Lib/concurrent/futures/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 0fb02e3e14d37a..cfc003425efb99 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -632,7 +632,7 @@ def result_iterator(): fs.appendleft(executor.submit(fn, *args)) # yield the awaited result - yield fs.pop().result() + yield fs.pop()._result finally: for future in fs: future.cancel() From 3be695611ac8c0a853f15565bee53cad11569966 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Mon, 24 Mar 2025 13:23:50 +0000 Subject: [PATCH 14/17] break a reference cycle with `fs[-1]._exception` --- Lib/concurrent/futures/_base.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index cfc003425efb99..90d4e8b8a69bc4 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -634,6 +634,9 @@ def result_iterator(): # yield the awaited result yield fs.pop()._result finally: + if fs: + # break a reference cycle with fs[-1]._exception + fs.pop().cancel() for future in fs: future.cancel() return result_iterator() From 0d70be9db5076f3f55a8f2503e42a7244944bf02 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Mon, 24 Mar 2025 14:11:49 +0000 Subject: [PATCH 15/17] break other potential reference cycles with fs, not only the one caused by fs[-1] --- Lib/concurrent/futures/_base.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 90d4e8b8a69bc4..a6fdc45144e71c 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -634,11 +634,10 @@ def result_iterator(): # yield the awaited result yield fs.pop()._result finally: - if fs: - # break a reference cycle with fs[-1]._exception + while fs: + # cancel pending futures, popping them to break potential + # reference cycles with future._exception.__traceback__ fs.pop().cancel() - for future in fs: - future.cancel() return result_iterator() def shutdown(self, wait=True, *, cancel_futures=False): From f509097366bcab683b45aa91fd2112ec15542ecb Mon Sep 17 00:00:00 2001 From: ebonnal Date: Tue, 25 Mar 2025 00:27:05 +0000 Subject: [PATCH 16/17] lighter ref cycle break --- Lib/concurrent/futures/_base.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index a6fdc45144e71c..ff4bc99442f2bd 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -633,11 +633,14 @@ def result_iterator(): # yield the awaited result yield fs.pop()._result + except: + # break the reference cycle with fs[-1]._exception's traceback + fs.pop().cancel() + raise finally: - while fs: - # cancel pending futures, popping them to break potential - # reference cycles with future._exception.__traceback__ - fs.pop().cancel() + for future in fs: + future.cancel() + return result_iterator() def shutdown(self, wait=True, *, cancel_futures=False): From d50dabd8ddf1d87d698053bd59e1f7da4b19a29c Mon Sep 17 00:00:00 2001 From: ebonnal Date: Tue, 25 Mar 2025 04:06:52 +0000 Subject: [PATCH 17/17] move the ref cycle break into the finally block --- Lib/concurrent/futures/_base.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index ff4bc99442f2bd..fd0f8331162f19 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -633,11 +633,12 @@ def result_iterator(): # yield the awaited result yield fs.pop()._result - except: - # break the reference cycle with fs[-1]._exception's traceback - fs.pop().cancel() - raise + finally: + # break the reference cycle with fs[-1]._exception's traceback + if fs: + fs.pop().cancel() + for future in fs: future.cancel()