From d8e8a021726d6f7e4b821231aad2523aa2345829 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Sun, 20 Jul 2025 08:19:01 +0200 Subject: [PATCH 01/21] draft: impl lazy input consumption in mp.Pool.imap(_unordered) --- Lib/multiprocessing/pool.py | 109 ++++++++++++++++++++++++++++-------- 1 file changed, 85 insertions(+), 24 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index f979890170b1a1..a0b50a53745368 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -390,21 +390,57 @@ def _guarded_task_generation(self, result_job, func, iterable): i = -1 for i, x in enumerate(iterable): yield (result_job, i, func, (x,), {}) + + except Exception as e: + yield (result_job, i+1, _helper_reraises_exception, (e,), {}) + + def _guarded_task_generation_lazy(self, result_job, func, iterable, + lazy_task_gen_helper): + '''Provides a generator of tasks for imap and imap_unordered with + appropriate handling for iterables which throw exceptions during + iteration.''' + if not lazy_task_gen_helper.feature_enabled: + yield from self._guarded_task_generation(result_job, func, iterable) + return + + try: + i = -1 + enumerated_iter = iter(enumerate(iterable)) + thread = threading.current_thread() + max_generated_tasks = self._processes + lazy_task_gen_helper.buffersize + + while thread._state == RUN: + with lazy_task_gen_helper.iterator_cond: + if lazy_task_gen_helper.not_finished_tasks >= max_generated_tasks: + continue # wait for some task to be (picked up and) finished + + try: + i, x = enumerated_iter.__next__() + except StopIteration: + break + + yield (result_job, i, func, (x,), {}) + lazy_task_gen_helper.tasks_generated += 1 + except Exception as e: yield (result_job, i+1, _helper_reraises_exception, (e,), {}) - def imap(self, func, iterable, chunksize=1): + def imap(self, func, iterable, chunksize=1, buffersize=None): ''' Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. ''' self._check_running() if chunksize == 1: - result = IMapIterator(self) + result = IMapIterator(self, buffersize) self._taskqueue.put( ( - self._guarded_task_generation(result._job, func, iterable), - result._set_length - )) + self._guarded_task_generation_lazy(result._job, + func, + iterable, + result._lazy_task_gen_helper), + result._set_length, + ) + ) return result else: if chunksize < 1: @@ -412,42 +448,50 @@ def imap(self, func, iterable, chunksize=1): "Chunksize must be 1+, not {0:n}".format( chunksize)) task_batches = Pool._get_tasks(func, iterable, chunksize) - result = IMapIterator(self) + result = IMapIterator(self, buffersize) self._taskqueue.put( ( - self._guarded_task_generation(result._job, - mapstar, - task_batches), - result._set_length - )) + self._guarded_task_generation_lazy(result._job, + mapstar, + task_batches, + result._lazy_task_gen_helper), + result._set_length, + ) + ) return (item for chunk in result for item in chunk) - def imap_unordered(self, func, iterable, chunksize=1): + def imap_unordered(self, func, iterable, chunksize=1, buffersize=None): ''' Like `imap()` method but ordering of results is arbitrary. 
''' self._check_running() if chunksize == 1: - result = IMapUnorderedIterator(self) + result = IMapUnorderedIterator(self, buffersize) self._taskqueue.put( ( - self._guarded_task_generation(result._job, func, iterable), - result._set_length - )) + self._guarded_task_generation_lazy(result._job, + func, + iterable, + result._lazy_task_gen_helper), + result._set_length, + ) + ) return result else: if chunksize < 1: raise ValueError( "Chunksize must be 1+, not {0!r}".format(chunksize)) task_batches = Pool._get_tasks(func, iterable, chunksize) - result = IMapUnorderedIterator(self) + result = IMapUnorderedIterator(self, buffersize) self._taskqueue.put( ( - self._guarded_task_generation(result._job, - mapstar, - task_batches), - result._set_length - )) + self._guarded_task_generation_lazy(result._job, + mapstar, + task_batches, + result._lazy_task_gen_helper), + result._set_length, + ) + ) return (item for chunk in result for item in chunk) def apply_async(self, func, args=(), kwds={}, callback=None, @@ -835,8 +879,7 @@ def _set(self, i, success_result): # class IMapIterator(object): - - def __init__(self, pool): + def __init__(self, pool, buffersize): self._pool = pool self._cond = threading.Condition(threading.Lock()) self._job = next(job_counter) @@ -846,6 +889,7 @@ def __init__(self, pool): self._length = None self._unsorted = {} self._cache[self._job] = self + self._lazy_task_gen_helper = _LazyTaskGenHelper(buffersize, self._cond) def __iter__(self): return self @@ -866,6 +910,7 @@ def next(self, timeout=None): self._pool = None raise StopIteration from None raise TimeoutError from None + self._lazy_task_gen_helper.tasks_finished += 1 success, value = item if success: @@ -914,6 +959,22 @@ def _set(self, i, obj): del self._cache[self._job] self._pool = None +# +# Class to store stats for lazy task generation and share them +# between the main thread and `_guarded_task_generation()` thread. +# +class _LazyTaskGenHelper(object): + def __init__(self, buffersize, iterator_cond): + self.feature_enabled = buffersize is not None + self.buffersize = buffersize + self.tasks_generated = 0 + self.tasks_finished = 0 + self.iterator_cond = iterator_cond + + @property + def not_finished_tasks(self): + return self.tasks_generated - self.tasks_finished + # # # From f6f423cad66cec8901a51f47ea8570b01a525310 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Sun, 20 Jul 2025 14:41:16 +0200 Subject: [PATCH 02/21] Use semaphore to synchronize threads Using `threading.Semaphore` makes it easier to cap the number of concurrently ran tasks. It also makes it possible to remove busy wait in child thread by waiting for semaphore. Also I've updated code to use the backpressure pattern - the new tasks are scheduled as soon as the user consumes the old ones. 
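To make the intent concrete, here is a minimal standalone sketch of the backpressure pattern described above (the names BUFFERSIZE, produce and consume are invented for this illustration and are not part of the patch): the producer thread blocks on a semaphore before advancing the input iterable, and the consumer releases one permit for every result it takes, so at most BUFFERSIZE inputs are ever in flight.

import queue
import threading

BUFFERSIZE = 4  # cap on submitted-but-unconsumed items (illustration only)
backpressure = threading.Semaphore(BUFFERSIZE)
results = queue.SimpleQueue()

def produce(iterable):
    # Producer thread: wait for a free slot before pulling the next input.
    # This is the role played by the task-generation thread in the patch.
    for item in iterable:
        backpressure.acquire()
        results.put(item * item)  # stands in for handing a task to a worker

def consume(n):
    # Consumer: each result taken frees one slot, letting the producer
    # advance the input iterable by exactly one more item.
    for _ in range(n):
        value = results.get()
        backpressure.release()
        yield value

threading.Thread(target=produce, args=(range(10),), daemon=True).start()
print(list(consume(10)))  # results arrive while the input is consumed lazily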
--- Lib/multiprocessing/pool.py | 112 ++++++++++++++++-------------------- 1 file changed, 49 insertions(+), 63 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index a0b50a53745368..abdd512980c849 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -14,6 +14,7 @@ # import collections +import functools import itertools import os import queue @@ -395,32 +396,20 @@ def _guarded_task_generation(self, result_job, func, iterable): yield (result_job, i+1, _helper_reraises_exception, (e,), {}) def _guarded_task_generation_lazy(self, result_job, func, iterable, - lazy_task_gen_helper): - '''Provides a generator of tasks for imap and imap_unordered with + backpressure_sema): + """Provides a generator of tasks for imap and imap_unordered with appropriate handling for iterables which throw exceptions during - iteration.''' - if not lazy_task_gen_helper.feature_enabled: - yield from self._guarded_task_generation(result_job, func, iterable) - return - + iteration.""" try: i = -1 enumerated_iter = iter(enumerate(iterable)) - thread = threading.current_thread() - max_generated_tasks = self._processes + lazy_task_gen_helper.buffersize - - while thread._state == RUN: - with lazy_task_gen_helper.iterator_cond: - if lazy_task_gen_helper.not_finished_tasks >= max_generated_tasks: - continue # wait for some task to be (picked up and) finished - + while True: + backpressure_sema.acquire() try: - i, x = enumerated_iter.__next__() + i, x = next(enumerated_iter) except StopIteration: break - yield (result_job, i, func, (x,), {}) - lazy_task_gen_helper.tasks_generated += 1 except Exception as e: yield (result_job, i+1, _helper_reraises_exception, (e,), {}) @@ -430,31 +419,32 @@ def imap(self, func, iterable, chunksize=1, buffersize=None): Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. ''' self._check_running() + if chunksize < 1: + raise ValueError("Chunksize must be 1+, not {0:n}".format(chunksize)) + + result = IMapIterator(self, buffersize) + + if result._backpressure_sema is None: + task_generation = self._guarded_task_generation + else: + task_generation = functools.partial( + self._guarded_task_generation_lazy, + backpressure_sema=result._backpressure_sema, + ) + if chunksize == 1: - result = IMapIterator(self, buffersize) self._taskqueue.put( ( - self._guarded_task_generation_lazy(result._job, - func, - iterable, - result._lazy_task_gen_helper), + task_generation(result._job, func, iterable), result._set_length, ) ) return result else: - if chunksize < 1: - raise ValueError( - "Chunksize must be 1+, not {0:n}".format( - chunksize)) task_batches = Pool._get_tasks(func, iterable, chunksize) - result = IMapIterator(self, buffersize) self._taskqueue.put( ( - self._guarded_task_generation_lazy(result._job, - mapstar, - task_batches, - result._lazy_task_gen_helper), + task_generation(result._job, mapstar, task_batches), result._set_length, ) ) @@ -465,30 +455,34 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None): Like `imap()` method but ordering of results is arbitrary. 
''' self._check_running() + if chunksize < 1: + raise ValueError( + "Chunksize must be 1+, not {0!r}".format(chunksize) + ) + + result = IMapUnorderedIterator(self, buffersize) + + if result._backpressure_sema is None: + task_generation = self._guarded_task_generation + else: + task_generation = functools.partial( + self._guarded_task_generation_lazy, + backpressure_sema=result._backpressure_sema, + ) + if chunksize == 1: - result = IMapUnorderedIterator(self, buffersize) self._taskqueue.put( ( - self._guarded_task_generation_lazy(result._job, - func, - iterable, - result._lazy_task_gen_helper), + task_generation(result._job, func, iterable), result._set_length, ) ) return result else: - if chunksize < 1: - raise ValueError( - "Chunksize must be 1+, not {0!r}".format(chunksize)) task_batches = Pool._get_tasks(func, iterable, chunksize) - result = IMapUnorderedIterator(self, buffersize) self._taskqueue.put( ( - self._guarded_task_generation_lazy(result._job, - mapstar, - task_batches, - result._lazy_task_gen_helper), + task_generation(result._job, mapstar, task_batches), result._set_length, ) ) @@ -889,7 +883,13 @@ def __init__(self, pool, buffersize): self._length = None self._unsorted = {} self._cache[self._job] = self - self._lazy_task_gen_helper = _LazyTaskGenHelper(buffersize, self._cond) + + if buffersize is None: + self._backpressure_sema = None + else: + self._backpressure_sema = threading.Semaphore( + value=self._pool._processes + buffersize + ) def __iter__(self): return self @@ -910,7 +910,9 @@ def next(self, timeout=None): self._pool = None raise StopIteration from None raise TimeoutError from None - self._lazy_task_gen_helper.tasks_finished += 1 + + if self._backpressure_sema: + self._backpressure_sema.release() success, value = item if success: @@ -959,22 +961,6 @@ def _set(self, i, obj): del self._cache[self._job] self._pool = None -# -# Class to store stats for lazy task generation and share them -# between the main thread and `_guarded_task_generation()` thread. -# -class _LazyTaskGenHelper(object): - def __init__(self, buffersize, iterator_cond): - self.feature_enabled = buffersize is not None - self.buffersize = buffersize - self.tasks_generated = 0 - self.tasks_finished = 0 - self.iterator_cond = iterator_cond - - @property - def not_finished_tasks(self): - return self.tasks_generated - self.tasks_finished - # # # From 937862dc236ae34fe230441e43cdc5618da42deb Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Mon, 21 Jul 2025 23:39:42 +0200 Subject: [PATCH 03/21] Update buffersize behavior to match concurrent.futures.Executor behavior This new behavior allow smaller real concurrency number than number of running processes. Previously, it was not allowed since we implicitly incremented buffersize by `self._processes`. --- Lib/multiprocessing/pool.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index abdd512980c849..9aaaaf55dab594 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -27,7 +27,7 @@ # If threading is available then ThreadPool should be provided. Therefore # we avoid top-level imports which are liable to fail on some systems. from . import util -from . import get_context, TimeoutError +from . 
import TimeoutError, get_context from .connection import wait # @@ -421,6 +421,11 @@ def imap(self, func, iterable, chunksize=1, buffersize=None): self._check_running() if chunksize < 1: raise ValueError("Chunksize must be 1+, not {0:n}".format(chunksize)) + if buffersize is not None: + if not isinstance(buffersize, int): + raise TypeError("buffersize must be an integer or None") + if buffersize < 1: + raise ValueError("buffersize must be None or > 0") result = IMapIterator(self, buffersize) @@ -459,6 +464,11 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None): raise ValueError( "Chunksize must be 1+, not {0!r}".format(chunksize) ) + if buffersize is not None: + if not isinstance(buffersize, int): + raise TypeError("buffersize must be an integer or None") + if buffersize < 1: + raise ValueError("buffersize must be None or > 0") result = IMapUnorderedIterator(self, buffersize) @@ -887,9 +897,7 @@ def __init__(self, pool, buffersize): if buffersize is None: self._backpressure_sema = None else: - self._backpressure_sema = threading.Semaphore( - value=self._pool._processes + buffersize - ) + self._backpressure_sema = threading.Semaphore(buffersize) def __iter__(self): return self @@ -911,7 +919,7 @@ def next(self, timeout=None): raise StopIteration from None raise TimeoutError from None - if self._backpressure_sema: + if self._backpressure_sema is not None: self._backpressure_sema.release() success, value = item From b6f6caa8685ff09d9ff629c4f469c701a2aa1698 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 22 Jul 2025 01:10:05 +0200 Subject: [PATCH 04/21] Release all `buffersize_lock` obj from the parent thread when terminate --- Lib/multiprocessing/pool.py | 104 ++++++++++++++++++------------------ 1 file changed, 51 insertions(+), 53 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 9aaaaf55dab594..b8caac82e00b0c 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -191,6 +191,11 @@ def __init__(self, processes=None, initializer=None, initargs=(), self._ctx = context or get_context() self._setup_queues() self._taskqueue = queue.SimpleQueue() + # The _taskqueue_buffersize_semaphores exist to allow calling .release() + # on every active semaphore when the pool is terminating to let task_handler + # wake up to stop. It's a dict so that each iterator object can efficiently + # deregister its semaphore when iterator finishes. + self._taskqueue_buffersize_semaphores = {} # The _change_notifier queue exist to wake up self._handle_workers() # when the cache (self._cache) is empty or when there is a change in # the _state variable of the thread that runs _handle_workers. 
@@ -257,7 +262,8 @@ def __init__(self, processes=None, initializer=None, initargs=(), self, self._terminate_pool, args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, self._change_notifier, self._worker_handler, self._task_handler, - self._result_handler, self._cache), + self._result_handler, self._cache, + self._taskqueue_buffersize_semaphores), exitpriority=15 ) self._state = RUN @@ -383,33 +389,27 @@ def starmap_async(self, func, iterable, chunksize=None, callback=None, return self._map_async(func, iterable, starmapstar, chunksize, callback, error_callback) - def _guarded_task_generation(self, result_job, func, iterable): + def _guarded_task_generation(self, result_job, func, iterable, + buffersize_sema=None): '''Provides a generator of tasks for imap and imap_unordered with appropriate handling for iterables which throw exceptions during iteration.''' try: i = -1 - for i, x in enumerate(iterable): - yield (result_job, i, func, (x,), {}) - except Exception as e: - yield (result_job, i+1, _helper_reraises_exception, (e,), {}) + if buffersize_sema is None: + for i, x in enumerate(iterable): + yield (result_job, i, func, (x,), {}) - def _guarded_task_generation_lazy(self, result_job, func, iterable, - backpressure_sema): - """Provides a generator of tasks for imap and imap_unordered with - appropriate handling for iterables which throw exceptions during - iteration.""" - try: - i = -1 - enumerated_iter = iter(enumerate(iterable)) - while True: - backpressure_sema.acquire() - try: - i, x = next(enumerated_iter) - except StopIteration: - break - yield (result_job, i, func, (x,), {}) + else: + enumerated_iter = iter(enumerate(iterable)) + while True: + buffersize_sema.acquire() + try: + i, x = next(enumerated_iter) + except StopIteration: + break + yield (result_job, i, func, (x,), {}) except Exception as e: yield (result_job, i+1, _helper_reraises_exception, (e,), {}) @@ -428,19 +428,11 @@ def imap(self, func, iterable, chunksize=1, buffersize=None): raise ValueError("buffersize must be None or > 0") result = IMapIterator(self, buffersize) - - if result._backpressure_sema is None: - task_generation = self._guarded_task_generation - else: - task_generation = functools.partial( - self._guarded_task_generation_lazy, - backpressure_sema=result._backpressure_sema, - ) - if chunksize == 1: self._taskqueue.put( ( - task_generation(result._job, func, iterable), + self._guarded_task_generation(result._job, func, iterable, + result._buffersize_sema), result._set_length, ) ) @@ -449,7 +441,8 @@ def imap(self, func, iterable, chunksize=1, buffersize=None): task_batches = Pool._get_tasks(func, iterable, chunksize) self._taskqueue.put( ( - task_generation(result._job, mapstar, task_batches), + self._guarded_task_generation(result._job, mapstar, task_batches, + result._buffersize_sema), result._set_length, ) ) @@ -471,19 +464,11 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None): raise ValueError("buffersize must be None or > 0") result = IMapUnorderedIterator(self, buffersize) - - if result._backpressure_sema is None: - task_generation = self._guarded_task_generation - else: - task_generation = functools.partial( - self._guarded_task_generation_lazy, - backpressure_sema=result._backpressure_sema, - ) - if chunksize == 1: self._taskqueue.put( ( - task_generation(result._job, func, iterable), + self._guarded_task_generation(result._job, func, iterable, + result._buffersize_sema), result._set_length, ) ) @@ -492,7 +477,8 @@ def imap_unordered(self, func, iterable, 
chunksize=1, buffersize=None): task_batches = Pool._get_tasks(func, iterable, chunksize) self._taskqueue.put( ( - task_generation(result._job, mapstar, task_batches), + self._guarded_task_generation(result._job, mapstar, task_batches, + result._buffersize_sema), result._set_length, ) ) @@ -727,7 +713,8 @@ def _help_stuff_finish(inqueue, task_handler, size): @classmethod def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, - worker_handler, task_handler, result_handler, cache): + worker_handler, task_handler, result_handler, cache, + taskqueue_buffersize_semaphores): # this is guaranteed to only be called once util.debug('finalizing pool') @@ -738,6 +725,10 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, change_notifier.put(None) task_handler._state = TERMINATE + # Release all semaphores to wake up task_handler to stop. + for job_id, sema in tuple(taskqueue_buffersize_semaphores.items()): + taskqueue_buffersize_semaphores.pop(job_id) + sema.release() util.debug('helping task handler/workers to finish') cls._help_stuff_finish(inqueue, task_handler, len(pool)) @@ -893,11 +884,13 @@ def __init__(self, pool, buffersize): self._length = None self._unsorted = {} self._cache[self._job] = self - if buffersize is None: - self._backpressure_sema = None + self._buffersize_sema = None else: - self._backpressure_sema = threading.Semaphore(buffersize) + self._buffersize_sema = threading.Semaphore(buffersize) + self._pool._taskqueue_buffersize_semaphores[self] = ( + self._buffersize_sema + ) def __iter__(self): return self @@ -908,25 +901,30 @@ def next(self, timeout=None): item = self._items.popleft() except IndexError: if self._index == self._length: - self._pool = None - raise StopIteration from None + self._stop_iterator() self._cond.wait(timeout) try: item = self._items.popleft() except IndexError: if self._index == self._length: - self._pool = None - raise StopIteration from None + self._stop_iterator() raise TimeoutError from None - if self._backpressure_sema is not None: - self._backpressure_sema.release() + if self._buffersize_sema is not None: + self._buffersize_sema.release() success, value = item if success: return value raise value + def _stop_iterator(self): + if self._pool is not None: + # could be deleted in previous `.next()` calls + self._pool._taskqueue_buffersize_semaphores.pop(self._job) + self._pool = None + raise StopIteration from None + __next__ = next # XXX def _set(self, i, obj): From 3bafd5d2a6a28ea8cdb459efa77a13e35dcb6517 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 22 Jul 2025 01:34:03 +0200 Subject: [PATCH 05/21] Add 2 basic `ThreadPool.imap()` tests w/ and w/o buffersize --- Lib/test/_test_multiprocessing.py | 58 +++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index a1259ff1d63d18..414c90725c4391 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2929,6 +2929,64 @@ def test_imap(self): self.assertEqual(next(it), i*i) self.assertRaises(StopIteration, it.__next__) + def test_imap_inf_iterable_with_slow_task(self): + if self.TYPE in ("processes", "manager"): + self.skipTest("test not appropriate for {}".format(self.TYPE)) + + processes = 4 + p = self.Pool(processes) + + tasks_started_later = 2 + last_produced_task_arg = Value("i") + + def produce_args(): + for arg in range(1, processes + tasks_started_later + 1): + last_produced_task_arg.value = arg + yield arg + + 
it = p.imap(functools.partial(sqr, wait=0.2), produce_args()) + + next(it) + time.sleep(0.2) + # `iterable` should've been advanced only up by `processes` times, + # but in fact advances further (by `>=processes+1`). + # In this case, it advances to the maximum value. + self.assertGreater(last_produced_task_arg.value, processes + 1) + + p.terminate() + p.join() + + def test_imap_inf_iterable_with_slow_task_and_buffersize(self): + if self.TYPE in ("processes", "manager"): + self.skipTest("test not appropriate for {}".format(self.TYPE)) + + processes = 4 + p = self.Pool(processes) + + tasks_started_later = 2 + last_produced_task_arg = Value("i") + + def produce_args(): + for arg in range(1, processes + tasks_started_later + 1): + last_produced_task_arg.value = arg + yield arg + + it = p.imap( + functools.partial(sqr, wait=0.2), + produce_args(), + buffersize=processes, + ) + + time.sleep(0.2) + self.assertEqual(last_produced_task_arg.value, processes) + + next(it) + time.sleep(0.2) + self.assertEqual(last_produced_task_arg.value, processes + 1) + + p.terminate() + p.join() + def test_imap_handle_iterable_exception(self): if self.TYPE == 'manager': self.skipTest('test not appropriate for {}'.format(self.TYPE)) From e43232b127d8b73dceab17bf64309c92153d772e Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 22 Jul 2025 01:37:56 +0200 Subject: [PATCH 06/21] Fix accidental swap in imports --- Lib/multiprocessing/pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index b8caac82e00b0c..836846d00c3ebc 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -27,7 +27,7 @@ # If threading is available then ThreadPool should be provided. Therefore # we avoid top-level imports which are liable to fail on some systems. from . import util -from . import TimeoutError, get_context +from . import get_context, TimeoutError from .connection import wait # From dd416e0edbeaab43da3b58b2fa16c6bbeb202103 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 22 Jul 2025 01:41:48 +0200 Subject: [PATCH 07/21] clear Pool._taskqueue_buffersize_semaphores safely --- Lib/multiprocessing/pool.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 836846d00c3ebc..79148f835b4b2c 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -727,7 +727,7 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, task_handler._state = TERMINATE # Release all semaphores to wake up task_handler to stop. 
for job_id, sema in tuple(taskqueue_buffersize_semaphores.items()): - taskqueue_buffersize_semaphores.pop(job_id) + taskqueue_buffersize_semaphores.pop(job_id, None) sema.release() util.debug('helping task handler/workers to finish') @@ -920,8 +920,8 @@ def next(self, timeout=None): def _stop_iterator(self): if self._pool is not None: - # could be deleted in previous `.next()` calls - self._pool._taskqueue_buffersize_semaphores.pop(self._job) + # `self._pool` could be set to `None` in previous `.next()` calls + self._pool._taskqueue_buffersize_semaphores.pop(self._job, None) self._pool = None raise StopIteration from None From 99f5a8c74c899c1f4a859598966780c9875d246a Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 22 Jul 2025 01:51:23 +0200 Subject: [PATCH 08/21] Slightly optimize Pool._taskqueue_buffersize_semaphores terminate --- Lib/multiprocessing/pool.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 79148f835b4b2c..36951cfae9ed31 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -726,9 +726,10 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, task_handler._state = TERMINATE # Release all semaphores to wake up task_handler to stop. - for job_id, sema in tuple(taskqueue_buffersize_semaphores.items()): - taskqueue_buffersize_semaphores.pop(job_id, None) - sema.release() + for job_id in tuple(taskqueue_buffersize_semaphores.keys()): + sema = taskqueue_buffersize_semaphores.pop(job_id, None) + if sema is not None: + sema.release() util.debug('helping task handler/workers to finish') cls._help_stuff_finish(inqueue, task_handler, len(pool)) From 2a53398a47f2659bad653dd1eb5a899ba525864d Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 22 Jul 2025 01:55:01 +0200 Subject: [PATCH 09/21] Rename `Pool.imap()` buffersize-related tests --- Lib/test/_test_multiprocessing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 414c90725c4391..8eebe100fc3adf 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2929,7 +2929,7 @@ def test_imap(self): self.assertEqual(next(it), i*i) self.assertRaises(StopIteration, it.__next__) - def test_imap_inf_iterable_with_slow_task(self): + def test_imap_fast_iterable_with_slow_task(self): if self.TYPE in ("processes", "manager"): self.skipTest("test not appropriate for {}".format(self.TYPE)) @@ -2956,7 +2956,7 @@ def produce_args(): p.terminate() p.join() - def test_imap_inf_iterable_with_slow_task_and_buffersize(self): + def test_imap_fast_iterable_with_slow_task_and_buffersize(self): if self.TYPE in ("processes", "manager"): self.skipTest("test not appropriate for {}".format(self.TYPE)) From f8878ebc0d0f40b8bb23a291aaca9dd630ec47c6 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 22 Jul 2025 08:05:41 +0200 Subject: [PATCH 10/21] Fix typo in `IMapIterator.__init__()` --- Lib/multiprocessing/pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 36951cfae9ed31..caaf9497b1f169 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -889,7 +889,7 @@ def __init__(self, pool, buffersize): self._buffersize_sema = None else: self._buffersize_sema = threading.Semaphore(buffersize) - self._pool._taskqueue_buffersize_semaphores[self] = ( + 
self._pool._taskqueue_buffersize_semaphores[self._job] = ( self._buffersize_sema ) From 2ca51e359cd0dacbc68ab043504101290b11376b Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 22 Jul 2025 09:06:09 +0200 Subject: [PATCH 11/21] Add tests for buffersize combinations with other kwargs --- Lib/test/_test_multiprocessing.py | 79 ++++++++++++++++++++++++------- 1 file changed, 62 insertions(+), 17 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 8eebe100fc3adf..0c67f625643b1b 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2916,21 +2916,45 @@ def test_async_timeout(self): p.join() def test_imap(self): - it = self.pool.imap(sqr, list(range(10))) - self.assertEqual(list(it), list(map(sqr, list(range(10))))) - - it = self.pool.imap(sqr, list(range(10))) - for i in range(10): - self.assertEqual(next(it), i*i) - self.assertRaises(StopIteration, it.__next__) + optimal_buffersize = 4 # `self.pool` size + buffersize_variants = [ + {"buffersize": None}, + {"buffersize": 1}, + {"buffersize": optimal_buffersize}, + {"buffersize": optimal_buffersize * 2}, + ] - it = self.pool.imap(sqr, list(range(1000)), chunksize=100) - for i in range(1000): - self.assertEqual(next(it), i*i) - self.assertRaises(StopIteration, it.__next__) + for kwargs in ({}, *buffersize_variants): + with self.subTest(**kwargs): + iterable = range(10) + if self.TYPE != "threads": + iterable = list(iterable) + it = self.pool.imap(sqr, iterable, **kwargs) + self.assertEqual(list(it), list(map(sqr, list(range(10))))) + + iterable = range(10) + if self.TYPE != "threads": + iterable = list(iterable) + it = self.pool.imap(sqr, iterable, **kwargs) + for i in range(10): + self.assertEqual(next(it), i * i) + self.assertRaises(StopIteration, it.__next__) + + for kwargs in ( + {"chunksize": 100}, + {"chunksize": 100, "buffersize": optimal_buffersize}, + ): + with self.subTest(**kwargs): + iterable = range(1000) + if self.TYPE != "threads": + iterable = list(iterable) + it = self.pool.imap(sqr, iterable, **kwargs) + for i in range(1000): + self.assertEqual(next(it), i * i) + self.assertRaises(StopIteration, it.__next__) def test_imap_fast_iterable_with_slow_task(self): - if self.TYPE in ("processes", "manager"): + if self.TYPE != "threads": self.skipTest("test not appropriate for {}".format(self.TYPE)) processes = 4 @@ -2957,7 +2981,7 @@ def produce_args(): p.join() def test_imap_fast_iterable_with_slow_task_and_buffersize(self): - if self.TYPE in ("processes", "manager"): + if self.TYPE != "threads": self.skipTest("test not appropriate for {}".format(self.TYPE)) processes = 4 @@ -3014,11 +3038,32 @@ def test_imap_handle_iterable_exception(self): self.assertRaises(SayWhenError, it.__next__) def test_imap_unordered(self): - it = self.pool.imap_unordered(sqr, list(range(10))) - self.assertEqual(sorted(it), list(map(sqr, list(range(10))))) + optimal_buffersize = 4 # `self.pool` size + buffersize_variants = [ + {"buffersize": None}, + {"buffersize": 1}, + {"buffersize": optimal_buffersize}, + {"buffersize": optimal_buffersize * 2}, + ] - it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100) - self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) + for kwargs in ({}, *buffersize_variants): + with self.subTest(**kwargs): + iterable = range(10) + if self.TYPE != "threads": + iterable = list(iterable) + it = self.pool.imap_unordered(sqr, iterable, **kwargs) + self.assertEqual(sorted(it), list(map(sqr, list(range(10))))) + + for 
kwargs in ( + {"chunksize": 100}, + {"chunksize": 100, "buffersize": optimal_buffersize}, + ): + with self.subTest(**kwargs): + iterable = range(1000) + if self.TYPE != "threads": + iterable = list(iterable) + it = self.pool.imap_unordered(sqr, iterable, **kwargs) + self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) def test_imap_unordered_handle_iterable_exception(self): if self.TYPE == 'manager': From bf27d5d8a6d0a5d300c618aef53fd4cb7744994a Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Sun, 27 Jul 2025 18:04:15 +0200 Subject: [PATCH 12/21] Remove if-branch in `_terminate_pool` --- Lib/multiprocessing/pool.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index caaf9497b1f169..e7f40207156d5e 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -726,10 +726,11 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, task_handler._state = TERMINATE # Release all semaphores to wake up task_handler to stop. - for job_id in tuple(taskqueue_buffersize_semaphores.keys()): - sema = taskqueue_buffersize_semaphores.pop(job_id, None) - if sema is not None: - sema.release() + for job_id, buffersize_sema in tuple( + taskqueue_buffersize_semaphores.items() + ): + buffersize_sema.release() + taskqueue_buffersize_semaphores.pop(job_id, None) util.debug('helping task handler/workers to finish') cls._help_stuff_finish(inqueue, task_handler, len(pool)) From 508c76551d62cbcd8ad587da85c01aebb142754d Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Sun, 27 Jul 2025 18:11:03 +0200 Subject: [PATCH 13/21] Add more edge-case tests for `imap` and `imap_unodered` These tests mostly come from a similar PR adding `buffersize` param to `concurrent.futures.Executor.map` - https://github.com/python/cpython/pull/125663/files --- Lib/test/_test_multiprocessing.py | 180 ++++++++++++++++++++---------- 1 file changed, 122 insertions(+), 58 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 0c67f625643b1b..17531a00479164 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2953,64 +2953,6 @@ def test_imap(self): self.assertEqual(next(it), i * i) self.assertRaises(StopIteration, it.__next__) - def test_imap_fast_iterable_with_slow_task(self): - if self.TYPE != "threads": - self.skipTest("test not appropriate for {}".format(self.TYPE)) - - processes = 4 - p = self.Pool(processes) - - tasks_started_later = 2 - last_produced_task_arg = Value("i") - - def produce_args(): - for arg in range(1, processes + tasks_started_later + 1): - last_produced_task_arg.value = arg - yield arg - - it = p.imap(functools.partial(sqr, wait=0.2), produce_args()) - - next(it) - time.sleep(0.2) - # `iterable` should've been advanced only up by `processes` times, - # but in fact advances further (by `>=processes+1`). - # In this case, it advances to the maximum value. 
- self.assertGreater(last_produced_task_arg.value, processes + 1) - - p.terminate() - p.join() - - def test_imap_fast_iterable_with_slow_task_and_buffersize(self): - if self.TYPE != "threads": - self.skipTest("test not appropriate for {}".format(self.TYPE)) - - processes = 4 - p = self.Pool(processes) - - tasks_started_later = 2 - last_produced_task_arg = Value("i") - - def produce_args(): - for arg in range(1, processes + tasks_started_later + 1): - last_produced_task_arg.value = arg - yield arg - - it = p.imap( - functools.partial(sqr, wait=0.2), - produce_args(), - buffersize=processes, - ) - - time.sleep(0.2) - self.assertEqual(last_produced_task_arg.value, processes) - - next(it) - time.sleep(0.2) - self.assertEqual(last_produced_task_arg.value, processes + 1) - - p.terminate() - p.join() - def test_imap_handle_iterable_exception(self): if self.TYPE == 'manager': self.skipTest('test not appropriate for {}'.format(self.TYPE)) @@ -3101,6 +3043,128 @@ def test_imap_unordered_handle_iterable_exception(self): self.assertIn(value, expected_values) expected_values.remove(value) + def test_imap_and_imap_unordered_buffersize_type_validation(self): + for method_name in ("imap", "imap_unordered"): + for buffersize in ("foo", 2.0): + with ( + self.subTest(method=method_name, buffersize=buffersize), + self.assertRaisesRegex( + TypeError, "buffersize must be an integer or None" + ), + ): + method = getattr(self.pool, method_name) + method(str, range(4), buffersize=buffersize) + + def test_imap_and_imap_unordered_buffersize_value_validation(self): + for method_name in ("imap", "imap_unordered"): + for buffersize in (0, -1): + with ( + self.subTest(method=method_name, buffersize=buffersize), + self.assertRaisesRegex( + ValueError, "buffersize must be None or > 0" + ), + ): + method = getattr(self.pool, method_name) + method(str, range(4), buffersize=buffersize) + + def test_imap_and_imap_unordered_when_buffer_is_full(self): + if self.TYPE != "threads": + self.skipTest("test not appropriate for {}".format(self.TYPE)) + + for method_name in ("imap", "imap_unordered"): + with self.subTest(method=method_name): + processes = 4 + p = self.Pool(processes) + last_produced_task_arg = Value("i") + + def produce_args(): + for arg in itertools.count(1): + last_produced_task_arg.value = arg + yield arg + + method = getattr(p, method_name) + it = method(functools.partial(sqr, wait=0.2), produce_args()) + + time.sleep(0.2) + # `iterable` could've been advanced only `processes` times, + # but in fact it advances further (`> processes`) because of + # not waiting for workers or user code to catch up. 
+ self.assertGreater(last_produced_task_arg.value, processes) + + next(it) + time.sleep(0.2) + self.assertGreater(last_produced_task_arg.value, processes + 1) + + next(it) + time.sleep(0.2) + self.assertGreater(last_produced_task_arg.value, processes + 2) + + p.terminate() + p.join() + + def test_imap_and_imap_unordered_buffersize_when_buffer_is_full(self): + if self.TYPE != "threads": + self.skipTest("test not appropriate for {}".format(self.TYPE)) + + for method_name in ("imap", "imap_unordered"): + with self.subTest(method=method_name): + processes = 4 + p = self.Pool(processes) + last_produced_task_arg = Value("i") + + def produce_args(): + for arg in itertools.count(1): + last_produced_task_arg.value = arg + yield arg + + method = getattr(p, method_name) + it = method( + functools.partial(sqr, wait=0.2), + produce_args(), + buffersize=processes, + ) + + time.sleep(0.2) + self.assertEqual(last_produced_task_arg.value, processes) + + next(it) + time.sleep(0.2) + self.assertEqual(last_produced_task_arg.value, processes + 1) + + next(it) + time.sleep(0.2) + self.assertEqual(last_produced_task_arg.value, processes + 2) + + p.terminate() + p.join() + + def test_imap_and_imap_unordered_buffersize_on_infinite_iterable(self): + if self.TYPE != "threads": + self.skipTest("test not appropriate for {}".format(self.TYPE)) + + for method_name in ("imap", "imap_unordered"): + with self.subTest(method=method_name): + p = self.Pool(4) + method = getattr(p, method_name) + + res = method(str, itertools.count(), buffersize=2) + + self.assertEqual(next(res, None), "0") + self.assertEqual(next(res, None), "1") + self.assertEqual(next(res, None), "2") + + p.terminate() + p.join() + + def test_imap_and_imap_unordered_buffersize_on_empty_iterable(self): + for method_name in ("imap", "imap_unordered"): + with self.subTest(method=method_name): + method = getattr(self.pool, method_name) + + res = method(str, [], buffersize=2) + + self.assertIsNone(next(res, None)) + def test_make_pool(self): expected_error = (RemoteError if self.TYPE == 'manager' else ValueError) From dff116774308cdbddca8a4375a61ec6dac3c8635 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Sun, 27 Jul 2025 23:31:10 +0200 Subject: [PATCH 14/21] Split inf iterable test for `imap` and `imap_unordered` --- Lib/test/_test_multiprocessing.py | 43 +++++++++++++++++++------------ 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 17531a00479164..d85582fb528c4b 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3138,32 +3138,41 @@ def produce_args(): p.terminate() p.join() - def test_imap_and_imap_unordered_buffersize_on_infinite_iterable(self): + def test_imap_and_imap_unordered_buffersize_on_empty_iterable(self): + for method_name in ("imap", "imap_unordered"): + with self.subTest(method=method_name): + method = getattr(self.pool, method_name) + + res = method(str, [], buffersize=2) + + self.assertIsNone(next(res, None)) + + def test_imap_buffersize_on_infinite_iterable(self): if self.TYPE != "threads": self.skipTest("test not appropriate for {}".format(self.TYPE)) - for method_name in ("imap", "imap_unordered"): - with self.subTest(method=method_name): - p = self.Pool(4) - method = getattr(p, method_name) + p = self.Pool(4) + res = p.imap(str, itertools.count(), buffersize=2) - res = method(str, itertools.count(), buffersize=2) + self.assertEqual(next(res, None), "0") + self.assertEqual(next(res, None), "1") + 
self.assertEqual(next(res, None), "2") - self.assertEqual(next(res, None), "0") - self.assertEqual(next(res, None), "1") - self.assertEqual(next(res, None), "2") + p.terminate() + p.join() - p.terminate() - p.join() + def test_imap_unordered_buffersize_on_infinite_iterable(self): + if self.TYPE != "threads": + self.skipTest("test not appropriate for {}".format(self.TYPE)) - def test_imap_and_imap_unordered_buffersize_on_empty_iterable(self): - for method_name in ("imap", "imap_unordered"): - with self.subTest(method=method_name): - method = getattr(self.pool, method_name) + p = self.Pool(4) + res = p.imap(str, itertools.count(), buffersize=2) + first_three_results = sorted(next(res, None) for _ in range(3)) - res = method(str, [], buffersize=2) + self.assertEqual(first_three_results, ["0", "1", "2"]) - self.assertIsNone(next(res, None)) + p.terminate() + p.join() def test_make_pool(self): expected_error = (RemoteError if self.TYPE == 'manager' From 94cc0b9bf0271ba31e90bd933b8c43abe5174223 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Mon, 28 Jul 2025 00:19:46 +0200 Subject: [PATCH 15/21] Add doc for `buffersize` argument of `imap` and `imap_unordered` --- Doc/library/multiprocessing.rst | 15 +++++++++++++-- .../2025-07-28-00-37-22.gh-issue-64192.iCGeQ5.rst | 8 ++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2025-07-28-00-37-22.gh-issue-64192.iCGeQ5.rst diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index d18ada3511d891..35c8c25d51fe4a 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -2448,7 +2448,7 @@ with the :class:`Pool` class. Callbacks should complete immediately since otherwise the thread which handles the results will get blocked. - .. method:: imap(func, iterable[, chunksize]) + .. method:: imap(func, iterable[, chunksize[, buffersize]]) A lazier version of :meth:`.map`. @@ -2462,7 +2462,18 @@ with the :class:`Pool` class. ``next(timeout)`` will raise :exc:`multiprocessing.TimeoutError` if the result cannot be returned within *timeout* seconds. - .. method:: imap_unordered(func, iterable[, chunksize]) + The *iterable* is collected immediately rather than lazily, unless a + *buffersize* is specified to limit the number of submitted tasks whose + results have not yet been yielded. If the buffer is full, iteration over + the *iterables* pauses until a result is yielded from the buffer. + To fully utilize pool's capacity, set *buffersize* to the number of + processes in pool (to consume *iterable* as you go) or even higher + (to prefetch *buffersize - processes* arguments). + + .. versionadded:: 3.15 + Added the *buffersize* parameter. + + .. method:: imap_unordered(func, iterable[, chunksize[, buffersize]]) The same as :meth:`imap` except that the ordering of the results from the returned iterator should be considered arbitrary. (Only when there is diff --git a/Misc/NEWS.d/next/Library/2025-07-28-00-37-22.gh-issue-64192.iCGeQ5.rst b/Misc/NEWS.d/next/Library/2025-07-28-00-37-22.gh-issue-64192.iCGeQ5.rst new file mode 100644 index 00000000000000..f9716bd0326e17 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-07-28-00-37-22.gh-issue-64192.iCGeQ5.rst @@ -0,0 +1,8 @@ +Add the optional ``buffersize`` parameter to +:meth:`multiprocessing.pool.Pool.imap` and +:meth:`multiprocessing.pool.Pool.imap_unordered` to limit the number of +submitted tasks whose results have not yet been yielded. 
If the buffer is +full, iteration over the *iterables* pauses until a result is yielded from +the buffer. To fully utilize pool's capacity, set *buffersize* to the number +of processes in pool (to consume *iterable* as you go) or even higher (to +prefetch *buffersize - processes* arguments). From 816fb6cd6daf19622a494f2892823567a307a7c0 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Mon, 28 Jul 2025 10:55:08 +0200 Subject: [PATCH 16/21] add *versionadded* for `imap_unordered` --- Doc/library/multiprocessing.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index 35c8c25d51fe4a..288f9606bf1319 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -2479,6 +2479,9 @@ with the :class:`Pool` class. returned iterator should be considered arbitrary. (Only when there is only one worker process is the order guaranteed to be "correct".) + .. versionadded:: 3.15 + Added the *buffersize* parameter. + .. method:: starmap(func, iterable[, chunksize]) Like :meth:`~multiprocessing.pool.Pool.map` except that the From 88cc10a60f61e803dc071fa0e1df23434eb49d59 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Mon, 28 Jul 2025 11:01:55 +0200 Subject: [PATCH 17/21] Remove ambiguity in `buffersize` description. Previously it was a bit ambigious to say "pass this to fully utilize pool's capacity" as the fastest way would still be not passing `buffersize` at all. Now this section clearly says "... when using this feature". --- Doc/library/multiprocessing.rst | 7 ++++--- ...5.rst => 2025-07-28-11-00-49.gh-issue-64192.7htLtg.rst} | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) rename Misc/NEWS.d/next/Library/{2025-07-28-00-37-22.gh-issue-64192.iCGeQ5.rst => 2025-07-28-11-00-49.gh-issue-64192.7htLtg.rst} (56%) diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index 288f9606bf1319..def4aafd110c62 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -2466,9 +2466,10 @@ with the :class:`Pool` class. *buffersize* is specified to limit the number of submitted tasks whose results have not yet been yielded. If the buffer is full, iteration over the *iterables* pauses until a result is yielded from the buffer. - To fully utilize pool's capacity, set *buffersize* to the number of - processes in pool (to consume *iterable* as you go) or even higher - (to prefetch *buffersize - processes* arguments). + To fully utilize pool's capacity when using this feature, + set *buffersize* at least to the number of processes in pool + (to consume *iterable* as you go), or even higher + (to prefetch the next ``N=buffersize-processes`` arguments). .. versionadded:: 3.15 Added the *buffersize* parameter. diff --git a/Misc/NEWS.d/next/Library/2025-07-28-00-37-22.gh-issue-64192.iCGeQ5.rst b/Misc/NEWS.d/next/Library/2025-07-28-11-00-49.gh-issue-64192.7htLtg.rst similarity index 56% rename from Misc/NEWS.d/next/Library/2025-07-28-00-37-22.gh-issue-64192.iCGeQ5.rst rename to Misc/NEWS.d/next/Library/2025-07-28-11-00-49.gh-issue-64192.7htLtg.rst index f9716bd0326e17..ca40bc111176e0 100644 --- a/Misc/NEWS.d/next/Library/2025-07-28-00-37-22.gh-issue-64192.iCGeQ5.rst +++ b/Misc/NEWS.d/next/Library/2025-07-28-11-00-49.gh-issue-64192.7htLtg.rst @@ -3,6 +3,7 @@ Add the optional ``buffersize`` parameter to :meth:`multiprocessing.pool.Pool.imap_unordered` to limit the number of submitted tasks whose results have not yet been yielded. 
If the buffer is full, iteration over the *iterables* pauses until a result is yielded from -the buffer. To fully utilize pool's capacity, set *buffersize* to the number -of processes in pool (to consume *iterable* as you go) or even higher (to -prefetch *buffersize - processes* arguments). +the buffer. To fully utilize pool's capacity when using this feature, set +*buffersize* at least to the number of processes in pool (to consume +*iterable* as you go), or even higher (to prefetch the next +``N=buffersize-processes`` arguments). From 05e3b241085d8a6e896e0a694173627c60c13699 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Mon, 28 Jul 2025 23:53:29 +0200 Subject: [PATCH 18/21] Set *versionadded* as next in docs --- Doc/library/multiprocessing.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index def4aafd110c62..0ce5800806a166 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -2471,7 +2471,7 @@ with the :class:`Pool` class. (to consume *iterable* as you go), or even higher (to prefetch the next ``N=buffersize-processes`` arguments). - .. versionadded:: 3.15 + .. versionadded:: next Added the *buffersize* parameter. .. method:: imap_unordered(func, iterable[, chunksize[, buffersize]]) @@ -2480,7 +2480,7 @@ with the :class:`Pool` class. returned iterator should be considered arbitrary. (Only when there is only one worker process is the order guaranteed to be "correct".) - .. versionadded:: 3.15 + .. versionadded:: next Added the *buffersize* parameter. .. method:: starmap(func, iterable[, chunksize]) From 503982fcc3879be9512a1bd4f9f9d9d65422e840 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 29 Jul 2025 00:01:18 +0200 Subject: [PATCH 19/21] Add whatsnew entry --- Doc/whatsnew/3.15.rst | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/Doc/whatsnew/3.15.rst b/Doc/whatsnew/3.15.rst index 1128da875a8847..381243960e724f 100644 --- a/Doc/whatsnew/3.15.rst +++ b/Doc/whatsnew/3.15.rst @@ -261,6 +261,22 @@ math (Contributed by Bénédikt Tran in :gh:`135853`.) +multiprocessing +--------------- + +* Add the optional ``buffersize`` parameter to + :meth:`multiprocessing.pool.Pool.imap` and + :meth:`multiprocessing.pool.Pool.imap_unordered` to limit the number of + submitted tasks whose results have not yet been yielded. If the buffer is + full, iteration over the *iterables* pauses until a result is yielded from + the buffer. To fully utilize pool's capacity when using this feature, set + *buffersize* at least to the number of processes in pool (to consume + *iterable* as you go), or even higher (to prefetch the next + ``N=buffersize-processes`` arguments). + + (Contributed by Oleksandr Baltian in :gh:`136871`.) 
+ + os.path ------- From b92cad9c9d9d5c857b67a3f6716c0da350c2ad1c Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 29 Jul 2025 00:34:55 +0200 Subject: [PATCH 20/21] Fix aggreed comments on code formatting/minor refactoring --- Lib/multiprocessing/pool.py | 62 ++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 25 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index e7f40207156d5e..fc5e68541b2fa6 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -389,22 +389,21 @@ def starmap_async(self, func, iterable, chunksize=None, callback=None, return self._map_async(func, iterable, starmapstar, chunksize, callback, error_callback) - def _guarded_task_generation(self, result_job, func, iterable, - buffersize_sema=None): + def _guarded_task_generation(self, result_job, func, iterable, sema=None): '''Provides a generator of tasks for imap and imap_unordered with appropriate handling for iterables which throw exceptions during iteration.''' try: i = -1 - if buffersize_sema is None: + if sema is None: for i, x in enumerate(iterable): yield (result_job, i, func, (x,), {}) else: enumerated_iter = iter(enumerate(iterable)) while True: - buffersize_sema.acquire() + sema.acquire() try: i, x = next(enumerated_iter) except StopIteration: @@ -419,13 +418,8 @@ def imap(self, func, iterable, chunksize=1, buffersize=None): Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. ''' self._check_running() - if chunksize < 1: - raise ValueError("Chunksize must be 1+, not {0:n}".format(chunksize)) - if buffersize is not None: - if not isinstance(buffersize, int): - raise TypeError("buffersize must be an integer or None") - if buffersize < 1: - raise ValueError("buffersize must be None or > 0") + self._check_chunksize(chunksize) + self._check_buffersize(buffersize) result = IMapIterator(self, buffersize) if chunksize == 1: @@ -441,8 +435,12 @@ def imap(self, func, iterable, chunksize=1, buffersize=None): task_batches = Pool._get_tasks(func, iterable, chunksize) self._taskqueue.put( ( - self._guarded_task_generation(result._job, mapstar, task_batches, - result._buffersize_sema), + self._guarded_task_generation( + result._job, + mapstar, + task_batches, + result._buffersize_sema, + ), result._set_length, ) ) @@ -453,15 +451,8 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None): Like `imap()` method but ordering of results is arbitrary. 
''' self._check_running() - if chunksize < 1: - raise ValueError( - "Chunksize must be 1+, not {0!r}".format(chunksize) - ) - if buffersize is not None: - if not isinstance(buffersize, int): - raise TypeError("buffersize must be an integer or None") - if buffersize < 1: - raise ValueError("buffersize must be None or > 0") + self._check_chunksize(chunksize) + self._check_buffersize(buffersize) result = IMapUnorderedIterator(self, buffersize) if chunksize == 1: @@ -477,8 +468,12 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None): task_batches = Pool._get_tasks(func, iterable, chunksize) self._taskqueue.put( ( - self._guarded_task_generation(result._job, mapstar, task_batches, - result._buffersize_sema), + self._guarded_task_generation( + result._job, + mapstar, + task_batches, + result._buffersize_sema, + ), result._set_length, ) ) @@ -531,6 +526,22 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, ) return result + @staticmethod + def _check_chunksize(chunksize): + if chunksize < 1: + raise ValueError( + "Chunksize must be 1+, not {0:n}".format(chunksize) + ) + + @staticmethod + def _check_buffersize(buffersize): + if buffersize is None: + return + if not isinstance(buffersize, int): + raise TypeError("buffersize must be an integer or None") + if buffersize < 1: + raise ValueError("buffersize must be None or > 0") + @staticmethod def _wait_for_updates(sentinels, change_notifier, timeout=None): wait(sentinels, timeout=timeout) @@ -876,7 +887,8 @@ def _set(self, i, success_result): # class IMapIterator(object): - def __init__(self, pool, buffersize): + + def __init__(self, pool, buffersize=None): self._pool = pool self._cond = threading.Condition(threading.Lock()) self._job = next(job_counter) From 02ebc6ac3a648c7bbb23a672d9fac684eca27839 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 29 Jul 2025 00:45:01 +0200 Subject: [PATCH 21/21] Remove `imap` and `imap_unordered` body code duplication --- Lib/multiprocessing/pool.py | 87 +++++++++++++------------------------ 1 file changed, 31 insertions(+), 56 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index fc5e68541b2fa6..5c5930a0d7aa4d 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -417,67 +417,14 @@ def imap(self, func, iterable, chunksize=1, buffersize=None): ''' Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. ''' - self._check_running() - self._check_chunksize(chunksize) - self._check_buffersize(buffersize) - - result = IMapIterator(self, buffersize) - if chunksize == 1: - self._taskqueue.put( - ( - self._guarded_task_generation(result._job, func, iterable, - result._buffersize_sema), - result._set_length, - ) - ) - return result - else: - task_batches = Pool._get_tasks(func, iterable, chunksize) - self._taskqueue.put( - ( - self._guarded_task_generation( - result._job, - mapstar, - task_batches, - result._buffersize_sema, - ), - result._set_length, - ) - ) - return (item for chunk in result for item in chunk) + return self._imap(IMapIterator, func, iterable, chunksize, buffersize) def imap_unordered(self, func, iterable, chunksize=1, buffersize=None): ''' Like `imap()` method but ordering of results is arbitrary. 
''' - self._check_running() - self._check_chunksize(chunksize) - self._check_buffersize(buffersize) - - result = IMapUnorderedIterator(self, buffersize) - if chunksize == 1: - self._taskqueue.put( - ( - self._guarded_task_generation(result._job, func, iterable, - result._buffersize_sema), - result._set_length, - ) - ) - return result - else: - task_batches = Pool._get_tasks(func, iterable, chunksize) - self._taskqueue.put( - ( - self._guarded_task_generation( - result._job, - mapstar, - task_batches, - result._buffersize_sema, - ), - result._set_length, - ) - ) - return (item for chunk in result for item in chunk) + return self._imap(IMapUnorderedIterator, func, iterable, chunksize, + buffersize) def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None): @@ -526,6 +473,34 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, ) return result + def _imap(self, iterator_cls, func, iterable, chunksize=1, + buffersize=None): + self._check_running() + self._check_chunksize(chunksize) + self._check_buffersize(buffersize) + + result = iterator_cls(self, buffersize) + if chunksize == 1: + self._taskqueue.put( + ( + self._guarded_task_generation(result._job, func, iterable, + result._buffersize_sema), + result._set_length, + ) + ) + return result + else: + task_batches = Pool._get_tasks(func, iterable, chunksize) + self._taskqueue.put( + ( + self._guarded_task_generation(result._job, mapstar, + task_batches, + result._buffersize_sema), + result._set_length, + ) + ) + return (item for chunk in result for item in chunk) + @staticmethod def _check_chunksize(chunksize): if chunksize < 1:
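
For reference, a usage sketch of the API added by this series (it assumes a build that includes the patches above; buffersize caps the number of submitted tasks whose results have not yet been yielded, so the input iterable is advanced lazily):

import itertools
from multiprocessing.pool import ThreadPool

def square(x):
    return x * x

with ThreadPool(4) as pool:
    # buffersize is the parameter introduced by this series; without it the
    # task-generation thread pulls from the infinite iterable as fast as it
    # can and the result cache grows without bound.
    results = pool.imap(square, itertools.count(), buffersize=8)
    for value in itertools.islice(results, 20):
        print(value)

On leaving the with block, terminate() releases the outstanding buffersize semaphore (see patch 04 above), so the task-generation thread blocked in acquire() shuts down cleanly.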