
gh-64192: Make imap()/imap_unordered() in multiprocessing.pool actually lazy #136871


Open
wants to merge 17 commits into main from feature/add-buffersize-to-multiprocessing

Conversation

obaltian

@obaltian obaltian commented Jul 20, 2025

Context recap (#64192)

Let's say we have an input iterable with N = len(iterable) elements.

The current multiprocessing.Pool.imap and multiprocessing.Pool.imap_unordered are O(N) in space, which is unnecessarily expensive on large iterables and makes them impossible to use on infinite iterables:
The call results: Iterator = pool.imap(func, iterable) iterates over all elements of the iterable up front, submitting N tasks to the pool and collecting the results into a list of size N. Subsequent calls to next(results) take the oldest result from the list (FIFO), waiting for it if it's not yet available, and return it. A minimal sketch of this eager behavior follows.
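To illustrate (a minimal sketch, not code from the PR; ThreadPool is used so the lambda and the generator need no pickling):

import time
from multiprocessing.pool import ThreadPool

def tracked():
    for i in range(5):
        print(f"pulled {i}")  # runs in the pool's task-handler thread
        yield i

with ThreadPool(2) as pool:
    results = pool.imap(lambda x: x * 2, tracked())
    time.sleep(0.5)  # give the task-handler thread time to run
    # By now all five "pulled i" lines have been printed: the whole
    # input was consumed before we asked for a single result.
    for r in results:
        print("got", r)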

Proposal: add an optional buffersize param

With this proposal, the call results: Iterator = pool.imap(func, iterable, buffersize=b) will iterate over only the first b elements of iterable (acquiring the buffersize semaphore for each), submit b tasks to the pool, and then return the results iterator.

Each call to next(results) will release the buffersize semaphore (allowing the task_handler thread to fetch the next element from iterable and submit a new task to a worker) and then return the result.

On pool termination, the buffersize semaphores of iterators that are not yet exhausted are also released, to avoid deadlocks.

Benefits:

  • The space complexity becomes O(b)
  • When using a buffersize, the client code takes back control over the speed of iteration over the input iterable: after an initial spike of b calls to func to fill the buffer, iteration over the input follows the rate at which the client consumes results. This is critical when func talks to services that you don't want to overload. (See the usage sketch after this list.)
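A usage sketch of the proposed API (buffersize here is the parameter added by this PR; it does not exist in released Pythons), mapping over an infinite input:

import itertools
from multiprocessing import Pool

def work(i):
    return i * i  # stand-in for an expensive or rate-limited call

if __name__ == "__main__":
    with Pool(4) as pool:
        # At most 8 inputs are ever in flight, so the infinite iterable
        # is consumed only as fast as the client takes results.
        results = pool.imap(work, itertools.count(), buffersize=8)
        for r in itertools.islice(results, 20):
            print(r)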

Feature history

buffersize support was recently merged into the concurrent.futures.Executor.map implementation (#125663), and many code/test/doc parts here are based on the ones from there, to keep the two modules consistent.
I want to thank the authors of that PR for the references.
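For comparison, the concurrent.futures counterpart that already landed (this sketch assumes Python 3.14+, where #125663 shipped):

import itertools
from concurrent.futures import ThreadPoolExecutor

def fetch(i):
    return i + 1  # stand-in for real work

with ThreadPoolExecutor(max_workers=4) as ex:
    # buffersize caps how many inputs are prefetched, so even an
    # infinite input iterable is consumed lazily.
    results = ex.map(fetch, itertools.count(), buffersize=8)
    for value in itertools.islice(results, 10):
        print(value)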


@python-cla-bot

python-cla-bot bot commented Jul 20, 2025

All commit authors signed the Contributor License Agreement.

CLA signed

@bedevere-app

bedevere-app bot commented Jul 20, 2025

Most changes to Python require a NEWS entry. Add one using the blurb_it web app or the blurb command-line tool.

If this change has little impact on Python users, wait for a maintainer to apply the skip news label instead.

@obaltian obaltian force-pushed the feature/add-buffersize-to-multiprocessing branch from 3cf40e3 to c1f8081 Compare July 20, 2025 13:09

@obaltian obaltian force-pushed the feature/add-buffersize-to-multiprocessing branch from c1f8081 to ec37be8 Compare July 20, 2025 13:16

@picnixz
Member

picnixz commented Jul 20, 2025

Please open an issue first for this one (I don't know if it's something that was discussed at the sprint).

@picnixz
Member

picnixz commented Jul 20, 2025

Previous discussion for executor: #74028

@obaltian
Author

@picnixz thanks. The issue already exists; I will add a more detailed description here soon, I just wanted to run all the tests for now.

@obaltian
Author

@picnixz here is a related issue - #64192, I've left a summary at the end of that thread. Thank you.

@picnixz picnixz changed the title Make imap()/imap_unordered() in multiprocessing.Pool actually lazy gh-64192: Make imap()/imap_unordered() in multiprocessing.Pool actually lazy Jul 20, 2025
@picnixz
Member

picnixz commented Jul 20, 2025

Thanks! I'm actually adding the GH issue in the title so that we can backref it easily.

@picnixz
Member

picnixz commented Jul 20, 2025

I'm adding a skip news label just for the CI; otherwise you'll get a notification saying that the labels are incorrect.

@obaltian obaltian changed the title gh-64192: Make imap()/imap_unordered() in multiprocessing.Pool actually lazy gh-64192: Make imap()/imap_unordered() in multiprocessing.pool actually lazy Jul 20, 2025
@obaltian obaltian force-pushed the feature/add-buffersize-to-multiprocessing branch 5 times, most recently from fc37cb4 to 2e2435c Compare July 27, 2025 22:40
obaltian added 9 commits July 28, 2025 00:47
Using `threading.Semaphore` makes it easier to cap the number of
concurrently running tasks. It also makes it possible to remove the
busy wait in the child thread by waiting on the semaphore.

I've also updated the code to use the backpressure pattern: new
tasks are scheduled as soon as the user consumes the old ones.
This new behavior allows an effective concurrency lower than the
number of running processes. Previously that was not possible,
since we implicitly incremented buffersize by `self._processes`.
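An illustrative sketch of the backpressure pattern described above (generic producer/consumer code, not the actual pool internals):

import threading
from queue import Queue

def task_handler(inputs, tasks, sema):
    # Producer side: block on the semaphore instead of busy-waiting,
    # so at most `buffersize` unconsumed tasks exist at any time.
    for item in inputs:
        sema.acquire()
        tasks.put(item)
    tasks.put(None)  # sentinel: input exhausted

buffersize = 4
sema = threading.Semaphore(buffersize)
tasks = Queue()
threading.Thread(
    target=task_handler, args=(range(100), tasks, sema), daemon=True
).start()

# Consumer side: releasing the semaphore after taking each item lets
# the producer pull exactly one more input element.
while (item := tasks.get()) is not None:
    sema.release()
    print(item)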
@obaltian obaltian force-pushed the feature/add-buffersize-to-multiprocessing branch from 2e2435c to d0b584d Compare July 27, 2025 22:47
@obaltian obaltian force-pushed the feature/add-buffersize-to-multiprocessing branch from d0b584d to 94cc0b9 Compare July 27, 2025 23:00
the *iterables* pauses until a result is yielded from the buffer.
To fully utilize pool's capacity, set *buffersize* to the number of
processes in pool (to consume *iterable* as you go) or even higher
(to prefetch *buffersize - processes* arguments).
Author

@obaltian obaltian Jul 28, 2025

I've been wondering whether we should also describe the difference in buffersize usefulness between multiprocessing.Pool and multiprocessing.ThreadPool; I'd be glad to hear an opinion on that. What do you think?

This feature is more useful with the multiprocessing.ThreadPool class, where the user can pass a generator as the iterable. multiprocessing.Pool with processes currently can't accept generators, as they aren't picklable, so the user still needs to pass the iterable as, for example, a list, which is O(n). However, there is another huge benefit to using it: tasks are also submitted lazily (while the user iterates over the results), and not-yet-needed results don't stack up in memory. So I think the feature is useful for any kind of pool, and the docs shouldn't suggest using it specifically for threads. (A ThreadPool sketch follows.)
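A sketch of the generator case mentioned above, using the proposed buffersize (the file name is hypothetical):

from multicessing.pool import ThreadPool  # noqa: typo fixed below
from multiprocessing.pool import ThreadPool

def lines(path):
    # Generator input: lines are read only as the pool asks for them.
    with open(path) as f:
        for line in f:
            yield line.rstrip("\n")

with ThreadPool(4) as pool:
    # With the proposed buffersize, at most 8 lines are read ahead,
    # no matter how large the file is.
    for length in pool.imap(len, lines("big.log"), buffersize=8):
        print(length)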

@obaltian obaltian marked this pull request as ready for review July 28, 2025 00:30
@obaltian obaltian requested a review from gpshead as a code owner July 28, 2025 00:30
@obaltian
Author

obaltian commented Jul 28, 2025

Hello @ambv - we talked about this PR during the EuroPython sprints a week ago, and I wanted to let you know that it's ready (you said you would review it).

Hello @gpshead - I'm also glad to say it's ready; we've already discussed the idea with Łukasz, so you two can agree on how to proceed.

Thanks in advance.

@gpshead gpshead added type-feature A feature request or enhancement and removed skip news labels Jul 28, 2025
@gpshead gpshead moved this to In Progress in Multiprocessing issues Jul 28, 2025
obaltian added 2 commits July 28, 2025 10:55
Previously it was a bit ambiguous to say
"pass this to fully utilize pool's capacity", as the fastest
way would still be to not pass `buffersize` at all.
Now this section clearly says "... when using this feature".
@@ -382,72 +389,99 @@ def starmap_async(self, func, iterable, chunksize=None, callback=None,
    return self._map_async(func, iterable, starmapstar, chunksize,
                           callback, error_callback)

def _guarded_task_generation(self, result_job, func, iterable):
def _guarded_task_generation(self, result_job, func, iterable,
                             buffersize_sema=None):
Member

You could just name it sema because it's not specific to buffersize (it can be used for anything else)

task_batches),
result._set_length
))
self._guarded_task_generation(result._job, mapstar, task_batches,
Member

I don't know how many characters there are here, but could you prevent overflowing 80 chars? Chop down the calls if needed, or use hanging indents:

func_long(
    x, y, z, ...)

# or 

func_long(
    x, y, z, ...
)

Comment on lines +456 to +464
if chunksize < 1:
    raise ValueError(
        "Chunksize must be 1+, not {0!r}".format(chunksize)
    )
if buffersize is not None:
    if not isinstance(buffersize, int):
        raise TypeError("buffersize must be an integer or None")
    if buffersize < 1:
        raise ValueError("buffersize must be None or > 0")
Member

Can we have separate validation methods for that, as this is now duplicated? It can be a static method, namely self._check_chunksize(chunksize); self._check_buffersize(buffersize)
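One possible shape for the suggested helpers (an assumption: the names come from the comment above, the bodies from the duplicated validation block):

class Pool:
    # Hypothetical helpers following the review suggestion; the bodies
    # mirror the duplicated checks shown in the diff above.
    @staticmethod
    def _check_chunksize(chunksize):
        if chunksize < 1:
            raise ValueError(
                "Chunksize must be 1+, not {0!r}".format(chunksize))

    @staticmethod
    def _check_buffersize(buffersize):
        if buffersize is None:
            return
        if not isinstance(buffersize, int):
            raise TypeError("buffersize must be an integer or None")
        if buffersize < 1:
            raise ValueError("buffersize must be None or > 0")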

return (item for chunk in result for item in chunk)

def imap_unordered(self, func, iterable, chunksize=1):
def imap_unordered(self, func, iterable, chunksize=1, buffersize=None):
Member

Isn't this method the same as _imap? Is there a reason why we duplicated them, since only IMapIterator vs IMapUnorderedIterator changes? I don't know if it's a good target for refactoring where we would pass the wrapper:

@staticmethod
def _imap(IteratorClass, pool, func, iterable, chunksize=1, buffersize=None):
    result = IteratorClass(pool, buffersize)
    ...

Author

They are almost the same, indeed. Sure, I will make the implementation shared, thanks.

Member

I actually don't know whether the duplication was intentional, but it appears that it is. If @gpshead thinks it's better to duplicate the code for future improvements, then we can keep it as is (because if in the future only one of the two functions has issues or needs a different behavior, it's better to have two separate functions).

@@ -835,8 +876,7 @@ def _set(self, i, success_result):
#

class IMapIterator(object):

    def __init__(self, pool):
    def __init__(self, pool, buffersize):
Member

Let's keep a blank line here, as well as buffersize=None, to prevent breaking consumers of this class.

@@ -0,0 +1,9 @@
Add the optional ``buffersize`` parameter to
Member

Since it's a new feature, we'll also need a Doc/whatsnew/3.15.rst entry.

Author

Sure, I'll add it. I just thought it was written by the maintainers.

(to consume *iterable* as you go), or even higher
(to prefetch the next ``N=buffersize-processes`` arguments).

.. versionadded:: 3.15
Member

Suggested change
.. versionadded:: 3.15
.. versionadded:: next


The same as :meth:`imap` except that the ordering of the results from the
returned iterator should be considered arbitrary. (Only when there is
only one worker process is the order guaranteed to be "correct".)

.. versionadded:: 3.15
Member

Suggested change
.. versionadded:: 3.15
.. versionadded:: next
