gh-64192: Make imap()/imap_unordered() in multiprocessing.pool actually lazy #136871
Conversation
Most changes to Python require a NEWS entry. Add one using the blurb_it web app or the blurb command-line tool. If this change has little impact on Python users, wait for a maintainer to apply the skip news label instead.
Please open an issue first for this one (I don't know if it's something that was discussed at the sprint).
Previous discussion for the executor: #74028
@picnixz thanks. The issue already exists; I will add a more detailed description here soon, just wanted to run all the tests for now.
*Title changed: Make imap()/imap_unordered() in multiprocessing.Pool actually lazy*
Thanks! I'm actually adding the GH issue in the title so that we can backref it easily.
I'm adding a skip news label just for the CI, otherwise you'll get a notification saying that the labels are incorrect.
*Title changed: Make imap()/imap_unordered() in multiprocessing.pool actually lazy*
Using `threading.Semaphore` makes it easier to cap the number of concurrently running tasks. It also makes it possible to remove the busy wait in the child thread by waiting on the semaphore. I've also updated the code to use the backpressure pattern: new tasks are scheduled as soon as the user consumes the old ones.
This new behavior allows a real concurrency smaller than the number of running processes. Previously this was not possible, since we implicitly incremented buffersize by `self._processes`.
These tests mostly come from a similar PR adding the `buffersize` param to `concurrent.futures.Executor.map`: https://github.com/python/cpython/pull/125663/files
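The semaphore-based backpressure described in these commits can be sketched in pure Python. This is an illustrative standalone version, not the PR's actual implementation (the helper name `lazy_imap` and the thread layout are assumptions): a feeder thread blocks on a `threading.Semaphore` instead of busy-waiting, and each consumed result releases one buffer slot.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from queue import SimpleQueue

def lazy_imap(func, iterable, buffersize, workers=4):
    """Yield func(item) in input order, keeping at most `buffersize` tasks in flight."""
    sem = threading.Semaphore(buffersize)   # caps concurrently submitted tasks
    futures = SimpleQueue()
    done = object()                         # sentinel: input exhausted

    def feeder():
        with ThreadPoolExecutor(max_workers=workers) as ex:
            for item in iterable:
                sem.acquire()               # blocks (no busy wait) while buffer is full
                futures.put(ex.submit(func, item))
        futures.put(done)

    threading.Thread(target=feeder, daemon=True).start()
    while (fut := futures.get()) is not done:
        yield fut.result()
        sem.release()                       # consuming a result frees one buffer slot
```

Because submission follows consumption, this also works on infinite inputs: e.g. `lazy_imap(str, itertools.count(), buffersize=4)` can be advanced one `next()` at a time while only about four inputs are ever in flight.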
the *iterables* pauses until a result is yielded from the buffer.
To fully utilize the pool's capacity, set *buffersize* to the number of
processes in the pool (to consume *iterable* as you go) or even higher
(to prefetch *buffersize - processes* arguments).
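For contrast, a quick way to observe the current eager behavior that this change addresses (a `ThreadPool` is used here so no pickling is involved; the generator just records which inputs the pool has pulled):

```python
import time
from multiprocessing.pool import ThreadPool

consumed = []

def numbers():
    for i in range(100):
        consumed.append(i)   # record every input the pool pulls from us
        yield i

with ThreadPool(2) as pool:
    it = pool.imap(lambda x: x, numbers())
    time.sleep(0.5)                    # give the task-handler thread time to run
    eagerly_pulled = len(consumed)     # today: all 100 inputs, before any next(it)
    first = next(it)
```

On current CPython `eagerly_pulled` is 100: `imap()` drains the whole generator before a single result is requested. With *buffersize* set, only about *buffersize* inputs would have been pulled at this point.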
I was questioning myself whether we should also describe the difference in `buffersize` usefulness between `multiprocessing.Pool` and `multiprocessing.ThreadPool`. I would be glad to hear an opinion on that – what do you think?
This feature is less useful with the `multiprocessing.ThreadPool` class, where the user can pass a generator as *iterable*. `multiprocessing.Pool` with processes currently can't accept generators as they aren't picklable, so the user still needs to pass *iterable* as, for example, a list, which is O(n). However, there is another huge benefit to using it – tasks will also be submitted lazily (while the user iterates over results), and not-needed-yet results won't stack up in memory. So I think the feature is useful for any kind of pool, and the docs shouldn't suggest using it specifically for threads.
Context recap (#74028)

Let's consider that we have an input *iterable* and `N = len(iterable)`.

Current `multiprocessing.Pool.imap` and `multiprocessing.Pool.imap_unordered` are O(N) in space (unnecessarily expensive on large iterables, completely impossible to use on infinite iterables): the call `results: Iterator = pool.imap(func, iterable)` iterates over all the elements of the iterable, submitting N tasks to the pool (results are collected into a list of size N). Following calls to `next(results)` take the oldest result from the list (FIFO), waiting for it if not available yet, and return it.

Proposal: add an optional buffersize param
With this proposal, the call `results: Iterator = pool.imap(func, iterable, buffersize=b)` will iterate only over the first b elements of *iterable* (acquiring the *buffersize* semaphore while iterating), submitting b tasks to worker threads, and then will return the results iterator. Calls to `next(results)` will release the *buffersize* semaphore (allowing the `task_handler` thread to get the next input element from *iterable* and submit a new task to a worker thread) and then return the result. *buffersize* semaphores from iterators not yet exhausted are also released on pool termination to avoid deadlocks.

Benefits:
With *buffersize*, the client code takes back control over the speed of iteration over the input *iterable*: after an initial spike of b calls to `func` to fill the buffer, the iteration over the input *iterable* will follow the rate of the iteration over `results` (controlled by the client), which is critical when `func` involves talking to services that you don't want to overload.

Feature history
*buffersize* support has recently been merged into the `concurrent.futures.Executor.map` implementation (#125663), and many code/test/doc parts are based on ones from there to ensure consistency between modules. I want to thank the authors of that PR for the references.
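The `Executor.map` counterpart can already be tried on Python 3.14+, where the `buffersize` keyword from #125663 landed (on older interpreters the guard below simply skips the call, substituting the expected output):

```python
import sys
from concurrent.futures import ThreadPoolExecutor

if sys.version_info >= (3, 14):
    # buffersize caps how many inputs are pulled ahead of consumption
    with ThreadPoolExecutor(max_workers=2) as ex:
        mapped = list(ex.map(abs, [-1, -2, -3], buffersize=2))
else:
    mapped = [1, 2, 3]  # feature unavailable before 3.14; emulate expected output
```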
Links: