
gh-64192: Make imap()/imap_unordered() in multiprocessing.pool actually lazy #136871

Open

wants to merge 21 commits into main

Changes from all commits
21 commits
d8e8a02
draft: impl lazy input consumption in mp.Pool.imap(_unordered)
obaltian Jul 20, 2025
f6f423c
Use semaphore to synchronize threads
obaltian Jul 20, 2025
937862d
Update buffersize behavior to match concurrent.futures.Executor behavior
obaltian Jul 21, 2025
b6f6caa
Release all `buffersize_lock` obj from the parent thread when terminate
obaltian Jul 21, 2025
3bafd5d
Add 2 basic `ThreadPool.imap()` tests w/ and w/o buffersize
obaltian Jul 21, 2025
e43232b
Fix accidental swap in imports
obaltian Jul 21, 2025
dd416e0
clear Pool._taskqueue_buffersize_semaphores safely
obaltian Jul 21, 2025
99f5a8c
Slightly optimize Pool._taskqueue_buffersize_semaphores terminate
obaltian Jul 21, 2025
2a53398
Rename `Pool.imap()` buffersize-related tests
obaltian Jul 21, 2025
f8878eb
Fix typo in `IMapIterator.__init__()`
obaltian Jul 22, 2025
2ca51e3
Add tests for buffersize combinations with other kwargs
obaltian Jul 22, 2025
bf27d5d
Remove if-branch in `_terminate_pool`
obaltian Jul 27, 2025
508c765
Add more edge-case tests for `imap` and `imap_unordered`
obaltian Jul 27, 2025
dff1167
Split inf iterable test for `imap` and `imap_unordered`
obaltian Jul 27, 2025
94cc0b9
Add doc for `buffersize` argument of `imap` and `imap_unordered`
obaltian Jul 27, 2025
816fb6c
add *versionadded* for `imap_unordered`
obaltian Jul 28, 2025
88cc10a
Remove ambiguity in `buffersize` description.
obaltian Jul 28, 2025
05e3b24
Set *versionadded* as next in docs
obaltian Jul 28, 2025
f1c25b3
Add whatsnew entry
obaltian Jul 28, 2025
199eb42
Fix agreed comments on code formatting/minor refactoring
obaltian Jul 28, 2025
d1dee31
Remove `imap` and `imap_unordered` body code duplication
obaltian Jul 28, 2025
19 changes: 17 additions & 2 deletions Doc/library/multiprocessing.rst
@@ -2448,7 +2448,7 @@ with the :class:`Pool` class.
Callbacks should complete immediately since otherwise the thread which
handles the results will get blocked.

.. method:: imap(func, iterable[, chunksize])
.. method:: imap(func, iterable[, chunksize[, buffersize]])

A lazier version of :meth:`.map`.

@@ -2462,12 +2462,27 @@ with the :class:`Pool` class.
``next(timeout)`` will raise :exc:`multiprocessing.TimeoutError` if the
result cannot be returned within *timeout* seconds.

.. method:: imap_unordered(func, iterable[, chunksize])
The *iterable* is collected immediately rather than lazily, unless a
*buffersize* is specified to limit the number of submitted tasks whose
results have not yet been yielded. If the buffer is full, iteration over
the *iterable* pauses until a result is yielded from the buffer.
To fully utilize the pool's capacity when using this feature,
set *buffersize* to at least the number of processes in the pool
(to consume the *iterable* as you go), or even higher
(to prefetch the next ``N=buffersize-processes`` arguments).

.. versionadded:: next
Added the *buffersize* parameter.
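
As an illustration of the behaviour described above, here is a minimal usage sketch (not part of the patch; the worker function ``square`` and the infinite input are hypothetical), assuming *buffersize* works as documented in this PR:

from itertools import count
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # Without buffersize, imap() keeps consuming and submitting input
        # elements in the background regardless of how fast results are
        # retrieved; with buffersize=8, at most 8 submitted tasks may have
        # unretrieved results at any time.
        results = pool.imap(square, count(), buffersize=8)
        # Each next() call frees one buffer slot, letting one more element
        # be pulled from count() and submitted.
        for _ in range(10):
            print(next(results))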

.. method:: imap_unordered(func, iterable[, chunksize[, buffersize]])

The same as :meth:`imap` except that the ordering of the results from the
returned iterator should be considered arbitrary. (Only when there is
only one worker process is the order guaranteed to be "correct".)

.. versionadded:: next
Added the *buffersize* parameter.

.. method:: starmap(func, iterable[, chunksize])

Like :meth:`~multiprocessing.pool.Pool.map` except that the
16 changes: 16 additions & 0 deletions Doc/whatsnew/3.15.rst
@@ -261,6 +261,22 @@
(Contributed by Bénédikt Tran in :gh:`135853`.)


multiprocessing
---------------


* Add the optional ``buffersize`` parameter to
:meth:`multiprocessing.pool.Pool.imap` and
:meth:`multiprocessing.pool.Pool.imap_unordered` to limit the number of
submitted tasks whose results have not yet been yielded. If the buffer is
full, iteration over the *iterable* pauses until a result is yielded from
the buffer. To fully utilize the pool's capacity when using this feature, set
*buffersize* to at least the number of processes in the pool (to consume
the *iterable* as you go), or even higher (to prefetch the next
``N=buffersize-processes`` arguments).

(Contributed by Oleksandr Baltian in :gh:`136871`.)
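
A complementary sketch of the sizing guidance above (illustrative only; ``parse_line`` and ``big.log`` are hypothetical, and ``ThreadPool`` is assumed to accept the same parameter, as exercised by this PR's tests): with ``processes`` workers and ``buffersize = processes + prefetch``, all workers stay busy while only a bounded window of the input is prefetched.

from multiprocessing.pool import ThreadPool

def parse_line(line):
    return len(line)  # stand-in for real per-line work

if __name__ == "__main__":
    processes, prefetch = 4, 4
    with ThreadPool(processes) as pool, open("big.log") as lines:
        # buffersize = processes + prefetch keeps every worker busy while
        # only a bounded window of the file is held in memory.
        total = sum(pool.imap(parse_line, lines,
                              buffersize=processes + prefetch))
    print(total)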


os.path
-------

160 changes: 101 additions & 59 deletions Lib/multiprocessing/pool.py
@@ -14,6 +14,7 @@
#

import collections
import functools
import itertools
import os
import queue
@@ -190,6 +191,11 @@ def __init__(self, processes=None, initializer=None, initargs=(),
self._ctx = context or get_context()
self._setup_queues()
self._taskqueue = queue.SimpleQueue()
# The _taskqueue_buffersize_semaphores mapping exists so that .release() can
# be called on every active semaphore when the pool is terminating, letting
# the task handler wake up and stop. It's a dict so that each iterator object
# can efficiently deregister its semaphore when the iterator finishes.
self._taskqueue_buffersize_semaphores = {}
# The _change_notifier queue exist to wake up self._handle_workers()
# when the cache (self._cache) is empty or when there is a change in
# the _state variable of the thread that runs _handle_workers.
@@ -256,7 +262,8 @@ def __init__(self, processes=None, initializer=None, initargs=(),
self, self._terminate_pool,
args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
self._change_notifier, self._worker_handler, self._task_handler,
self._result_handler, self._cache),
self._result_handler, self._cache,
self._taskqueue_buffersize_semaphores),
exitpriority=15
)
self._state = RUN
@@ -382,73 +389,42 @@ def starmap_async(self, func, iterable, chunksize=None, callback=None,
return self._map_async(func, iterable, starmapstar, chunksize,
callback, error_callback)

def _guarded_task_generation(self, result_job, func, iterable):
def _guarded_task_generation(self, result_job, func, iterable, sema=None):
'''Provides a generator of tasks for imap and imap_unordered with
appropriate handling for iterables which throw exceptions during
iteration.'''
try:
i = -1
for i, x in enumerate(iterable):
yield (result_job, i, func, (x,), {})

if sema is None:
for i, x in enumerate(iterable):
yield (result_job, i, func, (x,), {})

else:
enumerated_iter = iter(enumerate(iterable))
while True:
sema.acquire()
try:
i, x = next(enumerated_iter)
except StopIteration:
break
yield (result_job, i, func, (x,), {})

except Exception as e:
yield (result_job, i+1, _helper_reraises_exception, (e,), {})
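
For orientation, a standalone sketch of the throttling pattern used by the semaphore branch above (not the pool's actual implementation; here a plain thread and queue stand in for the task handler and result path): the producer must acquire one permit per item it submits, and the consumer releases one permit per item it takes, exactly as IMapIterator.next() does further down in this diff.

import queue
import threading

def producer(items, out, sema):
    # A permit must be available before the next item may be submitted.
    for item in items:
        sema.acquire()
        out.put(item)
    out.put(None)  # sentinel: no more items

buffer_limit = 2
sema = threading.Semaphore(buffer_limit)
out = queue.SimpleQueue()
threading.Thread(target=producer, args=(range(10), out, sema)).start()

while (item := out.get()) is not None:
    print(item)     # hand the item to the caller ...
    sema.release()  # ... then free one slot so the producer can continue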

def imap(self, func, iterable, chunksize=1):
def imap(self, func, iterable, chunksize=1, buffersize=None):
'''
Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
'''
self._check_running()
if chunksize == 1:
result = IMapIterator(self)
self._taskqueue.put(
(
self._guarded_task_generation(result._job, func, iterable),
result._set_length
))
return result
else:
if chunksize < 1:
raise ValueError(
"Chunksize must be 1+, not {0:n}".format(
chunksize))
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = IMapIterator(self)
self._taskqueue.put(
(
self._guarded_task_generation(result._job,
mapstar,
task_batches),
result._set_length
))
return (item for chunk in result for item in chunk)
return self._imap(IMapIterator, func, iterable, chunksize, buffersize)

def imap_unordered(self, func, iterable, chunksize=1):
def imap_unordered(self, func, iterable, chunksize=1, buffersize=None):
Member:
Isn't this method the same as _imap? Is there a reason why we duplicated them, since only IMapIterator vs IMapUnorderedIterator changes? I don't know if it's a good target for refactoring where we would pass the wrapper:

@staticmethod
def _imap(IteratorClass, pool, func, iterable, chunksize=1, buffersize=None):
    result = IteratorClass(pool, buffersize)
    ...

Author:
They are almost the same, indeed. Sure, I will make the implementation shared, thanks.

Member:
I actually don't know whether the duplication was intentional, but it appears that it is. If @gpshead thinks it's better to duplicate the code for future improvements, then we can keep it as is (because if in the future only one of the two functions has issues or a different behavior, it's better to have two separate functions).

Author:
Another argument for reusing code is the fact that _map_async is also shared here.

Author:
I have added the shared method in a separate commit; I can revert it if it's unwanted.
'''
Like `imap()` method but ordering of results is arbitrary.
'''
self._check_running()
if chunksize == 1:
result = IMapUnorderedIterator(self)
self._taskqueue.put(
(
self._guarded_task_generation(result._job, func, iterable),
result._set_length
))
return result
else:
if chunksize < 1:
raise ValueError(
"Chunksize must be 1+, not {0!r}".format(chunksize))
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = IMapUnorderedIterator(self)
self._taskqueue.put(
(
self._guarded_task_generation(result._job,
mapstar,
task_batches),
result._set_length
))
return (item for chunk in result for item in chunk)
return self._imap(IMapUnorderedIterator, func, iterable, chunksize,
buffersize)

def apply_async(self, func, args=(), kwds={}, callback=None,
error_callback=None):
@@ -497,6 +473,50 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
)
return result

def _imap(self, iterator_cls, func, iterable, chunksize=1,
buffersize=None):
self._check_running()
self._check_chunksize(chunksize)
self._check_buffersize(buffersize)

result = iterator_cls(self, buffersize)
if chunksize == 1:
self._taskqueue.put(
(
self._guarded_task_generation(result._job, func, iterable,
result._buffersize_sema),
result._set_length,
)
)
return result
else:
task_batches = Pool._get_tasks(func, iterable, chunksize)
self._taskqueue.put(
(
self._guarded_task_generation(result._job, mapstar,
task_batches,
result._buffersize_sema),
result._set_length,
)
)
return (item for chunk in result for item in chunk)

@staticmethod
def _check_chunksize(chunksize):
if chunksize < 1:
raise ValueError(
"Chunksize must be 1+, not {0:n}".format(chunksize)
)

@staticmethod
def _check_buffersize(buffersize):
if buffersize is None:
return
if not isinstance(buffersize, int):
raise TypeError("buffersize must be an integer or None")
if buffersize < 1:
raise ValueError("buffersize must be None or > 0")

@staticmethod
def _wait_for_updates(sentinels, change_notifier, timeout=None):
wait(sentinels, timeout=timeout)
@@ -679,7 +699,8 @@ def _help_stuff_finish(inqueue, task_handler, size):

@classmethod
def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
worker_handler, task_handler, result_handler, cache):
worker_handler, task_handler, result_handler, cache,
taskqueue_buffersize_semaphores):
# this is guaranteed to only be called once
util.debug('finalizing pool')

@@ -690,6 +711,12 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
change_notifier.put(None)

task_handler._state = TERMINATE
# Release all semaphores to wake up task_handler to stop.
for job_id, buffersize_sema in tuple(
taskqueue_buffersize_semaphores.items()
):
buffersize_sema.release()
taskqueue_buffersize_semaphores.pop(job_id, None)

util.debug('helping task handler/workers to finish')
cls._help_stuff_finish(inqueue, task_handler, len(pool))
@@ -836,7 +863,7 @@ def _set(self, i, success_result):

class IMapIterator(object):

def __init__(self, pool):
def __init__(self, pool, buffersize=None):
self._pool = pool
self._cond = threading.Condition(threading.Lock())
self._job = next(job_counter)
@@ -846,6 +873,13 @@ def __init__(self, pool):
self._length = None
self._unsorted = {}
self._cache[self._job] = self
if buffersize is None:
self._buffersize_sema = None
else:
self._buffersize_sema = threading.Semaphore(buffersize)
self._pool._taskqueue_buffersize_semaphores[self._job] = (
self._buffersize_sema
)

def __iter__(self):
return self
@@ -856,22 +890,30 @@ def next(self, timeout=None):
item = self._items.popleft()
except IndexError:
if self._index == self._length:
self._pool = None
raise StopIteration from None
self._stop_iterator()
self._cond.wait(timeout)
try:
item = self._items.popleft()
except IndexError:
if self._index == self._length:
self._pool = None
raise StopIteration from None
self._stop_iterator()
raise TimeoutError from None

if self._buffersize_sema is not None:
self._buffersize_sema.release()

success, value = item
if success:
return value
raise value

def _stop_iterator(self):
if self._pool is not None:
# `self._pool` could be set to `None` in previous `.next()` calls
self._pool._taskqueue_buffersize_semaphores.pop(self._job, None)
self._pool = None
raise StopIteration from None

__next__ = next # XXX

def _set(self, i, obj):