gh-131466: concurrent.futures.Executor.map: avoid temporarily exceeding buffersize while collecting the next result #131467
Conversation
…llecting the next result
…Test.test_free_reference
Lib/concurrent/futures/_base.py
Outdated
yield _result_or_cancel(fs.pop(), end_time - time.monotonic())

# Yield the awaited result
yield fs.pop().result()
To be discussed: this could be replaced by a lighter yield fs.pop()._result because the prior call to _result_or_cancel guarantees that at this point the result is available.
While I understand that we could possibly exceed buffersize while collecting the next result, is there a real-world use case where it would really cause an issue? The reason is that we access fs[-1] and then do fs.pop().
I see that we have a del fut in _result_or_cancel(), but can you confirm that it's sufficient to not hold any reference to the yet-to-be-popped future?
Asking Gregory as well since he's the mp expert c:
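One way to probe the question above is a small weakref experiment. This is only a sketch: result_or_cancel is a hypothetical stand-in written to follow the pattern described in this thread (return the result, cancel, then del fut), not the standard-library helper itself, and the immediate release it shows relies on CPython's reference counting.

```python
import gc
import weakref
from concurrent.futures import Future


def result_or_cancel(fut, timeout=None):
    """Hypothetical stand-in for the helper discussed in this thread."""
    try:
        try:
            return fut.result(timeout)
        finally:
            fut.cancel()
    finally:
        # Drop this frame's own reference to the future.
        del fut


def popped_future_is_released():
    fut = Future()
    fut.set_result("done")
    ref = weakref.ref(fut)   # lets us observe when the future is freed
    fs = [fut]
    del fut                  # the list is now the only strong reference
    # fs.pop() is passed straight to the helper, so no local name keeps it.
    value = result_or_cancel(fs.pop())
    gc.collect()
    return value, ref() is None
```

If the future were first bound to a local name (for example via fs[-1]) and that name were kept alive, the weak reference would stay alive for as long as the name did.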
@picnixz sorry, I re-requested your review because you made me realize that we actually don't need
I'm digging deeper into #95169's context to check whether I'm missing any untested scenario, especially regarding this:
Yes, that's what I wanted to ask, but I'm not an expert here so I'll let you investigate first c:
Is this PR blocked by the other one or should I do something in particular?
@ebonnal Sorry for the late reply. What about this simpler and IMHO cleaner way below? The second to last line may be a bit controversial (it changes the type of a variable), but I've used that list-pop trick in my
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index d98b1ebdd58..de34b86d1ee 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -625,21 +625,26 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
         # before the first iterator value is required.
         def result_iterator():
             try:
+                result = None
                 # reverse to keep finishing order
                 fs.reverse()
                 while fs:
+                    # Careful not to keep a reference to the popped future
+                    if timeout is None:
+                        result = _result_or_cancel(fs.pop())
+                    else:
+                        result = _result_or_cancel(fs.pop(), end_time - time.monotonic())
                     if (
                         buffersize
                         and (executor := executor_weakref())
                         and (args := next(zipped_iterables, None))
                     ):
                         fs.appendleft(executor.submit(fn, *args))
-                    # Careful not to keep a reference to the popped future
-                    if timeout is None:
-                        yield _result_or_cancel(fs.pop())
-                    else:
-                        yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
+                    # Careful not to keep a reference to the result
+                    result = [result]
+                    yield result.pop()
             finally:
+                del result
                 for future in fs:
                     future.cancel()
         return result_iterator()
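The list-pop trick in the patch above can be shown in isolation. The following is a minimal sketch rather than code from this PR: Payload, holding, releasing and is_released are hypothetical names, and the result depends on CPython's reference counting.

```python
import gc
import weakref


class Payload:
    """Hypothetical stand-in for a task result."""


def holding(items):
    # Naive version: while suspended at the yield, the local name
    # `item` still references the object handed to the consumer.
    while items:
        item = items.pop()
        yield item


def releasing(items):
    # List-pop trick: the value is moved out of a one-element list in
    # the yield expression itself, so no local name refers to it.
    while items:
        box = [items.pop()]
        yield box.pop()


def is_released(gen_func):
    gen = gen_func([Payload(), Payload()])
    value = next(gen)            # the generator is now suspended
    ref = weakref.ref(value)
    del value                    # the consumer drops its reference
    gc.collect()
    return ref() is None         # True if the generator kept nothing
```

Under these assumptions, is_released(releasing) reports that the yielded object was freed while the generator was still suspended, whereas is_released(holding) reports that it was not.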
Thank you for taking a look @dalcinl!
What exactly do you find unclean in my proposal that justifies creating a list for each yielded element? (Fun trick though!)
I guess it is just a matter of subjective taste. My patch looks slightly shorter, but I should say that the primary motivation was avoiding the use of the (conventionally) private
I'm biased, as I maintain a custom implementation of this routine, and I prefer to avoid the use of private APIs and attributes. Standard library modules may not be bound to such constraints. Long story short, I believe both your proposal and mine are functionally equivalent, so FWIW, this PR has my +1.
I thought that was acceptable because
Fair, actually I remember now that another alternative I had considered in the early days of this PR was:
Which is similar to your approach but reuses the same container (a
Thanks again for your review, I appreciate it, let's wait and gather more feedback 👀!
It looks even better!! I'll borrow your approach for my own code. If you ever update this PR, please do not forget the
Context recap:
If we have:
What happens when calling next(results):
1. Take the next arg from iterable and put a task for fn(arg) in the buffer.
2. Collect the next result.
-> During step 2. there are buffersize + 1 buffered tasks.
This PR swaps steps 1. and 2. so that buffersize is never exceeded, even during next.
concurrent.futures.Executor.map temporarily exceeds its buffersize while collecting the next result #131466
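The effect of swapping the two steps in the recap can be simulated outside the standard library. This is a rough sketch under stated assumptions, not the implementation in Lib/concurrent/futures/_base.py: run_buffered, collect_first and _DONE are hypothetical names, and the "buffer" here is simply a deque of pending futures whose peak length is recorded.

```python
import itertools
from collections import deque
from concurrent.futures import ThreadPoolExecutor

_DONE = object()  # sentinel marking an exhausted input iterator


def run_buffered(fn, iterable, buffersize, collect_first):
    """Return (results, peak number of futures held in the buffer)."""
    args = iter(iterable)
    results = []
    with ThreadPoolExecutor(max_workers=2) as executor:
        # Pre-fill the buffer with the first `buffersize` tasks.
        fs = deque(
            executor.submit(fn, a) for a in itertools.islice(args, buffersize)
        )
        peak = len(fs)
        while fs:
            if collect_first:
                # Swapped order: collect a result, then refill.
                results.append(fs.popleft().result())
            if (arg := next(args, _DONE)) is not _DONE:
                fs.append(executor.submit(fn, arg))
                peak = max(peak, len(fs))
            if not collect_first:
                # Original order: refill first, then collect.
                results.append(fs.popleft().result())
    return results, peak
```

With five inputs and buffersize=2, the refill-first order peaks at three buffered futures while the collect-first order stays at two; both return the results in input order.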