BUG: poor multithreaded performance scaling for small arrays (nogil) #27786

@eundersander

Description

Describe the issue:

Using the Python 3.13 free-threaded build, I observe that multithreaded performance (MFLOPS) scales poorly for NumPy array computation, especially on small arrays. For comparison, performance scales well for (1) multiprocess computation and (2) multithreaded or multiprocess computation on ordinary Python lists. Although I measure MFLOPS here, I would guess the underlying performance issue is some per-array-access overhead, perhaps a lock that results in thread contention.

In the attached benchmark, the workers are embarrassingly parallel. I create an array/list in each worker thread/process and then do computation on it. I only time the computation, not the setup.

The main takeaway from the plot below is that NumPy performance drops dramatically when using 8+ threads in the same process (solid orange line).

The original reporter used an AMD Ryzen Threadripper PRO 5955WX (16 cores); @ngoldbaum edited the description and reproducer script and reproduced the original report on a MacBook Pro M3 Max using the updated script.
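
For quicker triage, here is a stripped-down sketch of the pattern I suspect is contended (my simplification, not the full benchmark): each thread works on its own small array, so any per-ufunc-call lock or shared state inside NumPy would serialize the threads. Unlike the barrier-synchronized benchmark below, this sketch includes thread startup in the timing.

import threading
import time
import numpy as np

def worker(n_calls=100_000):
    x = np.arange(100, dtype=np.float64)  # small, thread-private array
    for _ in range(n_calls):
        x += 0.01  # in-place ufunc call; no data is shared between threads

for n_threads in (1, 8):
    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    t0 = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"{n_threads} threads: {time.perf_counter() - t0:.2f}s")

With good scaling on enough cores, the 8-thread run should take about as long as the 1-thread run, since the workers are fully independent.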

[Plot: mflops_array_length_100.png, MFLOPS vs. number of workers for array length 100]

Benchmark text output:

$ PYTHON_GIL=0 python parallel_bench.py --array-length 100 --num-iterations 5000
Python Version: 3.13.0 experimental free-threading build (main, Nov  5 2024, 16:45:19) [Clang 16.0.0 (clang-1600.0.26.3)]
NumPy Version: 2.1.3
os.cpu_count(): 11
1 numpy threads
1 numpy processes
1 list threads
1 list processes
2 numpy threads
2 numpy processes
2 list threads
2 list processes
4 numpy threads
4 numpy processes
4 list threads
4 list processes
8 numpy threads
8 numpy processes
8 list threads
8 list processes
16 numpy threads
16 numpy processes
16 list threads
16 list processes
32 numpy threads
32 numpy processes
32 list threads
32 list processes
64 numpy threads
64 numpy processes
64 list threads
64 list processes
128 numpy threads
128 numpy processes
128 list threads
128 list processes
256 numpy threads
256 numpy processes
256 list threads
256 list processes
512 numpy threads
512 numpy processes
512 list threads
512 list processes
1024 numpy threads
1024 numpy processes
1024 list threads
1024 list processes
2048 numpy threads
2048 numpy processes
2048 list threads
2048 list processes

# Workers  | Numpy Threads           | Numpy Processes         | List Threads            | List Processes
-----------+-------------------------+-------------------------+-------------------------+------------------------
1          | 0.01s, 84.01 MFLOPS     | 0.30s, 3.34 MFLOPS      | 0.01s, 88.63 MFLOPS     | 0.31s, 3.21 MFLOPS
2          | 0.01s, 154.94 MFLOPS    | 0.33s, 6.05 MFLOPS      | 0.01s, 171.32 MFLOPS    | 0.33s, 6.01 MFLOPS
4          | 0.01s, 288.72 MFLOPS    | 0.38s, 10.49 MFLOPS     | 0.01s, 332.36 MFLOPS    | 0.38s, 10.40 MFLOPS
8          | 0.04s, 206.42 MFLOPS    | 0.52s, 15.28 MFLOPS     | 0.02s, 340.70 MFLOPS    | 0.53s, 15.22 MFLOPS
16         | 0.11s, 141.60 MFLOPS    | 1.05s, 15.22 MFLOPS     | 0.02s, 820.67 MFLOPS    | 1.05s, 15.30 MFLOPS
32         | 0.24s, 134.02 MFLOPS    | 2.08s, 15.37 MFLOPS     | 0.02s, 1634.83 MFLOPS   | 2.24s, 14.29 MFLOPS
64         | 0.48s, 132.52 MFLOPS    | 4.06s, 15.78 MFLOPS     | 0.12s, 518.98 MFLOPS    | 4.04s, 15.84 MFLOPS
128        | 2.96s, 43.29 MFLOPS     | 8.07s, 15.87 MFLOPS     | 0.23s, 554.86 MFLOPS    | 8.33s, 15.37 MFLOPS
256        | 15.05s, 17.01 MFLOPS    | N/A                     | 0.45s, 563.20 MFLOPS    | N/A
512        | 49.28s, 10.39 MFLOPS    | N/A                     | 0.23s, 2187.81 MFLOPS   | N/A
1024       | 160.21s, 6.39 MFLOPS    | N/A                     | 1.56s, 657.09 MFLOPS    | N/A
2048       | 599.69s, 3.42 MFLOPS    | N/A                     | 2.58s, 793.79 MFLOPS    | N/A
Plot saved to mflops_array_length_100.png
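
For reference, the reported MFLOPS is total_ops / elapsed, where the script computes total_ops = num_workers * num_iterations * array_length * 2 (the element-wise add plus the mean reduction count as two ops per element per iteration). For the 1-worker row above: 1 * 5000 * 100 * 2 = 1,000,000 ops over about 0.012 s, i.e. the reported 84 MFLOPS.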

Reproduce the code example:

import time
import threading
import multiprocessing
import numpy as np
import sys
import os
import argparse
import matplotlib.pyplot as plt

def numpy_worker(barrier, array_length, num_iterations):
    """Worker function for NumPy computations."""
    x = np.arange(array_length, dtype=np.float64)
    barrier.wait()  # Synchronize start (the process path passes a multiprocessing.Event, which also has .wait())
    for _ in range(num_iterations):
        x += 0.01  # Element-wise operation
        x[0] += x.mean() * 0.01  # Reduction operation

def list_worker(barrier, array_length, num_iterations):
    """Worker function for list computations."""
    x = [float(xi) for xi in range(array_length)]
    barrier.wait()  # Synchronize start
    for _ in range(num_iterations):
        x = [xi + 0.01 for xi in x]  # Element-wise operation
        x[0] += sum(x) / len(x) * 0.01  # Reduction operation

def launch_workers(worker_func, num_workers, method, array_length, num_iterations):
    """Launches workers using threading or multiprocessing."""
    if method == 'threads':
        barrier = threading.Barrier(num_workers + 1)
        workers = []
        for _ in range(num_workers):
            t = threading.Thread(target=worker_func, args=(barrier, array_length, num_iterations))
            workers.append(t)
            t.start()
        barrier.wait()  # Synchronize all threads
        start_time = time.time()
        for t in workers:
            t.join()
        end_time = time.time()
    elif method == 'processes':
        # Use a multiprocessing.Event for synchronization
        start_event = multiprocessing.Event()
        workers = []
        for _ in range(num_workers):
            p = multiprocessing.Process(target=worker_func, args=(start_event, array_length, num_iterations))
            workers.append(p)
            p.start()
        start_time = time.time()
        start_event.set()  # Signal all processes to start
        for p in workers:
            p.join()
        end_time = time.time()
    else:
        raise ValueError("Unknown method")
    return start_time, end_time

def run_benchmark(kind, method, num_workers, args):
    """Runs the benchmark for the specified configuration."""
    if kind == 'numpy':
        worker = numpy_worker
    elif kind == 'list':
        worker = list_worker
    else:
        raise ValueError("Unknown kind")

    # Start workers and measure time
    start_time, end_time = launch_workers(worker, num_workers, method, args.array_length, args.num_iterations)

    elapsed_time = end_time - start_time
    total_ops = num_workers * args.num_iterations * args.array_length * 2
    mflop_per_sec = total_ops / (elapsed_time * 1e6)
    return elapsed_time, mflop_per_sec

def print_results_table(results, num_workers_list):
    """Prints the benchmark results in a formatted table."""
    headers = ["# Workers", "Numpy Threads", "Numpy Processes", "List Threads", "List Processes"]
    row_format = "{:<10} | {:<23} | {:<23} | {:<23} | {:<23}"
    separator = "-" * 10 + "-+-" + "-+-".join(["-" * 23] * 4)

    print()
    print(row_format.format(*headers))
    print(separator)
    for num_workers in num_workers_list:
        row = [str(num_workers)]
        for key in ["numpy_threads", "numpy_processes", "list_threads", "list_processes"]:
            if key in results[num_workers]:
                elapsed, mflop = results[num_workers][key]
                row.append(f"{elapsed:.2f}s, {mflop:.2f} MFLOPS")
            else:
                row.append("N/A")
        print(row_format.format(*row))

def save_plot(plot_data, array_length):
    """Generates and saves the benchmark plot."""
    plt.figure(figsize=(6.5, 4), tight_layout=True)
    styles = {
        'numpy_threads': ('solid', '#FFA500'),
        'numpy_processes': ('dotted', '#FFA500'),
        'list_threads': ('solid', '#1E90FF'),
        'list_processes': ('dotted', '#1E90FF'),
    }

    for key in plot_data:
        x = plot_data[key]['num_workers']
        y = plot_data[key]['mflop_per_sec']
        linestyle, color = styles[key]
        plt.plot(x, y, linestyle=linestyle, color=color, marker='o', label=key.replace('_', ' ').title(), linewidth=2)

    # add labels for the far-right datapoints
    printed_labels = []
    # Add padding to ensure labels outside the plot fit within the figure
    plt.gcf().subplots_adjust(right=0.8)
    for key in plot_data:
        x = plot_data[key]['num_workers']
        y = plot_data[key]['mflop_per_sec']
        _, color = styles[key]

        # Check if the current value is sufficiently distinct from previously printed labels
        if all(abs(y[-1] - prev) / max(y[-1], prev) > 0.2 for prev in printed_labels):
            # Position the label just outside the right edge of the plot
            x_pos = plt.xlim()[1] * 1.25  # 25% past the upper x-limit (outside the axes)
            plt.text(
                x_pos, y[-1], f"{y[-1]:.1f}",
                fontsize=12, color=color, ha='left', va='center'
            )
            printed_labels.append(y[-1])  # Mark this label as printed
            
    plt.xlabel('Number of Workers', fontsize=14)
    plt.ylabel('MFLOPS', fontsize=14)
    plt.title(f'MFLOPS for array length {array_length}', fontsize=16)
    plt.legend(fontsize=12)
    plt.xscale('log', base=2)
    plt.yscale('log')
    plt.xticks(plot_data['numpy_threads']['num_workers'], labels=plot_data['numpy_threads']['num_workers'], fontsize=12)
    plt.yticks(fontsize=12)
    plt.grid(True, which="both", ls="--", linewidth=0.5)
    plt.gcf().subplots_adjust(right=0.9, bottom=0.2)
    filename = f'mflops_array_length_{array_length}.png'
    plt.savefig(filename)
    print(f'Plot saved to {filename}')

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Benchmark multithreading and multiprocessing with NumPy and list operations.')
    parser.add_argument('--array-length', type=int, default=1000, help='Length of the arrays/lists used in computations.')
    parser.add_argument('--num-iterations', type=int, default=4000, help='Number of iterations each worker performs.')
    parser.add_argument('--num-workers-list', type=int, nargs='+', default=[1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048], help='List of worker counts to benchmark.')
    args = parser.parse_args()

    print(f"Python Version: {sys.version}")
    print(f"NumPy Version: {np.__version__}")
    print(f"os.cpu_count(): {os.cpu_count()}")

    results = {num_workers: {} for num_workers in args.num_workers_list}
    plot_data = {
        'numpy_threads': {'num_workers': [], 'mflop_per_sec': []},
        'numpy_processes': {'num_workers': [], 'mflop_per_sec': []},
        'list_threads': {'num_workers': [], 'mflop_per_sec': []},
        'list_processes': {'num_workers': [], 'mflop_per_sec': []},
    }

    for num_workers in args.num_workers_list:
        for kind in ['numpy', 'list']:
            for method in ['threads', 'processes']:
                print(num_workers, kind, method)
                # Process-based runs beyond 128 workers are skipped
                # (they appear as N/A in the results table).
                if num_workers > 128 and method == 'processes':
                    continue
                elapsed_time, mflop_per_sec = run_benchmark(kind, method, num_workers, args)
                key = f"{kind}_{method}"
                results[num_workers][key] = (elapsed_time, mflop_per_sec)
                plot_data[key]['num_workers'].append(num_workers)
                plot_data[key]['mflop_per_sec'].append(mflop_per_sec)

    print_results_table(results, args.num_workers_list)
    save_plot(plot_data, args.array_length)

Error message:

No response

Python and NumPy Versions:

NumPy: 2.1.3
Python: 3.13.0 experimental free-threading build | packaged by conda-forge | (main, Oct 8 2024, 20:16:19) [GCC 13.3.0]

Runtime Environment:


[{'numpy_version': '2.1.3',
  'python': '3.13.0 experimental free-threading build | packaged by '
            'conda-forge | (main, Oct  8 2024, 20:16:19) [GCC 13.3.0]',
  'uname': uname_result(system='Linux', node='eric-Lambda-Vector', release='6.8.0-48-generic', version='#48~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Mon Oct  7 11:24:13 UTC 2', machine='x86_64')},
 {'simd_extensions': {'baseline': ['SSE', 'SSE2', 'SSE3'],
                      'found': ['SSSE3',
                                'SSE41',
                                'POPCNT',
                                'SSE42',
                                'AVX',
                                'F16C',
                                'FMA3',
                                'AVX2'],
                      'not_found': ['AVX512F',
                                    'AVX512CD',
                                    'AVX512_KNL',
                                    'AVX512_KNM',
                                    'AVX512_SKX',
                                    'AVX512_CLX',
                                    'AVX512_CNL',
                                    'AVX512_ICL']}},
 {'architecture': 'Haswell',
  'filepath': '/home/eric/miniforge3/envs/habitat-lab3-py313/lib/python3.13t/site-packages/numpy.libs/libscipy_openblas64_-ff651d7f.so',
  'internal_api': 'openblas',
  'num_threads': 32,
  'prefix': 'libscipy_openblas',
  'threading_layer': 'pthreads',
  'user_api': 'blas',
  'version': '0.3.27'}]

AMD Ryzen Threadripper PRO 5955WX 16-Cores

Context for the issue:

Our application is RL training with a robotics simulator. We use multiprocessing, with each worker doing mostly independent CPU-heavy work. I was excited to try Python 3.13 free-threading to reduce the cost of gathering results from workers: using multithreading instead of multiprocessing would avoid the interprocess-communication overhead. Instead, I see a big drop in overall performance. We use a lot of small NumPy arrays for 3D math (3D positions, 4D rotation quaternions, 4x4 transform matrices, etc.).
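
To illustrate the workload shape, here is a hypothetical example (not our actual code) of the per-step math: many operations over tiny arrays, where fixed per-call cost dominates the arithmetic.

import numpy as np

# Hypothetical per-step 3D math: tiny arrays, many separate NumPy calls
position = np.array([1.0, 2.0, 3.0])         # 3D position
rotation = np.array([0.0, 0.0, 0.0, 1.0])    # 4D quaternion (x, y, z, w)
transform = np.eye(4)                        # 4x4 transform matrix

homogeneous = np.append(position, 1.0)       # promote to homogeneous coordinates
position = (transform @ homogeneous)[:3]     # apply the transform
rotation /= np.linalg.norm(rotation)         # renormalize the quaternion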

Metadata

Labels

00 - Bug, 39 - free-threading (PRs and issues related to support for free-threading CPython, a.k.a. no-GIL, PEP 703)
