Description
Describe the issue:
Using Python 3.13 free threading, I observe that multithreaded performance (MFLOPS) scales poorly for NumPy array computation, especially on small arrays. For comparison, performance scales well for (1) multiprocess computation, and (2) multithreaded/multiprocess computation on ordinary Python lists. Although I measure MFLOPS here, I would guess the underlying performance issue is some per-array-access overhead, perhaps a lock that results in thread contention.
In the attached benchmark, the workers are embarrassingly parallel. I create an array/list in each worker thread/process and then do computation on it. I only time the computation, not the setup.
The main takeaway from the plots below is that numpy performance drops dramatically when using 8+ threads in the same process (solid orange line).
The original reporter used an AMD Ryzen Threadripper PRO 5955WX 16-Cores. @ngoldbaum later edited the description and reproducer script, and reproduced the original report with the updated script on a MacBook Pro M3 Max.
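The thread-scaling numbers only mean something if the interpreter is really running without the GIL. A minimal sketch of a check one could run first is below. The helper name `free_threading_status` is made up for illustration; `sysconfig.get_config_var("Py_GIL_DISABLED")` and `sys._is_gil_enabled()` are the standard ways to query this on Python 3.13, and the fallback for older interpreters is an assumption of this sketch.

```python
import sys
import sysconfig


def free_threading_status():
    """Report whether this is a free-threaded build and whether the GIL is active."""
    # Py_GIL_DISABLED is 1 on free-threaded builds and 0 or None otherwise.
    is_free_threaded_build = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
    # sys._is_gil_enabled() exists from Python 3.13 on; if it is missing,
    # this sketch assumes the GIL is enabled.
    gil_check = getattr(sys, "_is_gil_enabled", None)
    gil_enabled = gil_check() if gil_check is not None else True
    return {
        "free_threaded_build": is_free_threaded_build,
        "gil_enabled": gil_enabled,
    }


if __name__ == "__main__":
    print(free_threading_status())
```

On a free-threaded build the GIL can be switched back on at runtime, for example when an extension module that does not declare free-threading support is imported, so it is probably safest to call this after all imports.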
Benchmark text output:
goldbaum at Nathans-MBP in ~/Documents/numpy-experiments
○ PYTHON_GIL=0 python parallel_bench.py --array-length 100 --num-iterations 5000
Python Version: 3.13.0 experimental free-threading build (main, Nov 5 2024, 16:45:19) [Clang 16.0.0 (clang-1600.0.26.3)]
NumPy Version: 2.1.3
os.cpu_count(): 11
1 numpy threads
1 numpy processes
1 list threads
1 list processes
2 numpy threads
2 numpy processes
2 list threads
2 list processes
4 numpy threads
4 numpy processes
4 list threads
4 list processes
8 numpy threads
8 numpy processes
8 list threads
8 list processes
16 numpy threads
16 numpy processes
16 list threads
16 list processes
32 numpy threads
32 numpy processes
32 list threads
32 list processes
64 numpy threads
64 numpy processes
64 list threads
64 list processes
128 numpy threads
128 numpy processes
128 list threads
128 list processes
256 numpy threads
256 numpy processes
256 list threads
256 list processes
512 numpy threads
512 numpy processes
512 list threads
512 list processes
1024 numpy threads
1024 numpy processes
1024 list threads
1024 list processes
2048 numpy threads
2048 numpy processes
2048 list threads
2048 list processes
# Workers  | Numpy Threads           | Numpy Processes         | List Threads            | List Processes
-----------+-------------------------+-------------------------+-------------------------+------------------------
1          | 0.01s, 84.01 MFLOPS     | 0.30s, 3.34 MFLOPS      | 0.01s, 88.63 MFLOPS     | 0.31s, 3.21 MFLOPS
2          | 0.01s, 154.94 MFLOPS    | 0.33s, 6.05 MFLOPS      | 0.01s, 171.32 MFLOPS    | 0.33s, 6.01 MFLOPS
4          | 0.01s, 288.72 MFLOPS    | 0.38s, 10.49 MFLOPS     | 0.01s, 332.36 MFLOPS    | 0.38s, 10.40 MFLOPS
8          | 0.04s, 206.42 MFLOPS    | 0.52s, 15.28 MFLOPS     | 0.02s, 340.70 MFLOPS    | 0.53s, 15.22 MFLOPS
16         | 0.11s, 141.60 MFLOPS    | 1.05s, 15.22 MFLOPS     | 0.02s, 820.67 MFLOPS    | 1.05s, 15.30 MFLOPS
32         | 0.24s, 134.02 MFLOPS    | 2.08s, 15.37 MFLOPS     | 0.02s, 1634.83 MFLOPS   | 2.24s, 14.29 MFLOPS
64         | 0.48s, 132.52 MFLOPS    | 4.06s, 15.78 MFLOPS     | 0.12s, 518.98 MFLOPS    | 4.04s, 15.84 MFLOPS
128        | 2.96s, 43.29 MFLOPS     | 8.07s, 15.87 MFLOPS     | 0.23s, 554.86 MFLOPS    | 8.33s, 15.37 MFLOPS
256        | 15.05s, 17.01 MFLOPS    | N/A                     | 0.45s, 563.20 MFLOPS    | N/A
512        | 49.28s, 10.39 MFLOPS    | N/A                     | 0.23s, 2187.81 MFLOPS   | N/A
1024       | 160.21s, 6.39 MFLOPS    | N/A                     | 1.56s, 657.09 MFLOPS    | N/A
2048       | 599.69s, 3.42 MFLOPS    | N/A                     | 2.58s, 793.79 MFLOPS    | N/A
Plot saved to mflops_array_length_100.png
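For reading the table: the reproducer script counts two floating-point operations per element per iteration, so MFLOPS is `num_workers * num_iterations * array_length * 2 / (elapsed * 1e6)`. As a worked check, assuming the run used the `--array-length 100 --num-iterations 5000` arguments shown in the command line above, the 2048-thread NumPy row can be recomputed from its elapsed time. The helper name `mflops` is made up for this sketch.

```python
def mflops(num_workers, num_iterations, array_length, elapsed_seconds):
    """Same accounting as the reproducer: 2 flops per element per iteration."""
    total_ops = num_workers * num_iterations * array_length * 2
    return total_ops / (elapsed_seconds * 1e6)


if __name__ == "__main__":
    # 2048 NumPy threads, 599.69 s elapsed (last row of the table):
    # 2048 * 5000 * 100 * 2 = 2.048e9 operations in total.
    print(f"{mflops(2048, 5000, 100, 599.69):.2f} MFLOPS")  # close to the 3.42 in the table
```

The rows with few workers cannot be checked this precisely, because their elapsed times are printed with only two decimals (for example `0.01s`).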
Reproduce the code example:
import time
import threading
import multiprocessing
import numpy as np
import sys
import os
import argparse
import matplotlib.pyplot as plt

def numpy_worker(barrier, array_length, num_iterations):
    """Worker function for NumPy computations."""
    x = np.arange(array_length, dtype=np.float64)
    barrier.wait()
    for _ in range(num_iterations):
        x += 0.01  # Element-wise operation
        x[0] += x.mean() * 0.01  # Reduction operation

def list_worker(barrier, array_length, num_iterations):
    """Worker function for list computations."""
    x = [float(xi) for xi in range(array_length)]
    barrier.wait()  # Synchronize start
    for _ in range(num_iterations):
        x = [xi + 0.01 for xi in x]  # Element-wise operation
        x[0] += sum(x) / len(x) * 0.01  # Reduction operation

def launch_workers(worker_func, num_workers, method, array_length, num_iterations):
    """Launches workers using threading or multiprocessing."""
    if method == 'threads':
        barrier = threading.Barrier(num_workers + 1)
        workers = []
        for _ in range(num_workers):
            t = threading.Thread(target=worker_func, args=(barrier, array_length, num_iterations))
            workers.append(t)
            t.start()
        barrier.wait()  # Synchronize all threads
        start_time = time.time()
        for t in workers:
            t.join()
        end_time = time.time()
    elif method == 'processes':
        # Use a multiprocessing.Event for synchronization
        start_event = multiprocessing.Event()
        workers = []
        for _ in range(num_workers):
            p = multiprocessing.Process(target=worker_func, args=(start_event, array_length, num_iterations))
            workers.append(p)
            p.start()
        start_time = time.time()
        start_event.set()  # Signal all processes to start
        for p in workers:
            p.join()
        end_time = time.time()
    else:
        raise ValueError("Unknown method")
    return start_time, end_time

def run_benchmark(kind, method, num_workers, args):
    """Runs the benchmark for the specified configuration."""
    if kind == 'numpy':
        worker = numpy_worker
    elif kind == 'list':
        worker = list_worker
    else:
        raise ValueError("Unknown kind")
    # Start workers and measure time
    start_time, end_time = launch_workers(worker, num_workers, method, args.array_length, args.num_iterations)
    elapsed_time = end_time - start_time
    total_ops = num_workers * args.num_iterations * args.array_length * 2
    mflop_per_sec = total_ops / (elapsed_time * 1e6)
    return elapsed_time, mflop_per_sec

def print_results_table(results, num_workers_list):
    """Prints the benchmark results in a formatted table."""
    headers = ["# Workers", "Numpy Threads", "Numpy Processes", "List Threads", "List Processes"]
    row_format = "{:<10} | {:<23} | {:<23} | {:<23} | {:<23}"
    separator = "-" * 10 + "-+-" + "-+-".join(["-" * 23] * 4)
    print()
    print(row_format.format(*headers))
    print(separator)
    for num_workers in num_workers_list:
        row = [str(num_workers)]
        for key in ["numpy_threads", "numpy_processes", "list_threads", "list_processes"]:
            if key in results[num_workers]:
                elapsed, mflop = results[num_workers][key]
                row.append(f"{elapsed:.2f}s, {mflop:.2f} MFLOPS")
            else:
                row.append("N/A")
        print(row_format.format(*row))

def save_plot(plot_data, array_length):
    """Generates and saves the benchmark plot."""
    plt.figure(figsize=(6.5, 4), tight_layout=True)
    styles = {
        'numpy_threads': ('solid', '#FFA500'),
        'numpy_processes': ('dotted', '#FFA500'),
        'list_threads': ('solid', '#1E90FF'),
        'list_processes': ('dotted', '#1E90FF'),
    }
    for key in plot_data:
        x = plot_data[key]['num_workers']
        y = plot_data[key]['mflop_per_sec']
        linestyle, color = styles[key]
        plt.plot(x, y, linestyle=linestyle, color=color, marker='o', label=key.replace('_', ' ').title(), linewidth=2)
    # add labels for the far-right datapoints
    printed_labels = []
    # Add padding to ensure labels outside the plot fit within the figure
    plt.gcf().subplots_adjust(right=0.8)
    for key in plot_data:
        x = plot_data[key]['num_workers']
        y = plot_data[key]['mflop_per_sec']
        _, color = styles[key]
        # Check if the current value is sufficiently distinct from previously printed labels
        if all(abs(y[-1] - prev) / max(y[-1], prev) > 0.2 for prev in printed_labels):
            # Position the label just outside the right edge of the plot
            x_pos = plt.xlim()[1] * 1.25  # 125% of the x-axis range (outside the plot)
            plt.text(
                x_pos, y[-1], f"{y[-1]:.1f}",
                fontsize=12, color=color, ha='left', va='center'
            )
            printed_labels.append(y[-1])  # Mark this label as printed
    plt.xlabel('Number of Workers', fontsize=14)
    plt.ylabel('MFLOPS', fontsize=14)
    plt.title(f'MFLOPS for array length {array_length}', fontsize=16)
    plt.legend(fontsize=12)
    plt.xscale('log', base=2)
    plt.yscale('log')
    plt.xticks(plot_data['numpy_threads']['num_workers'], labels=plot_data['numpy_threads']['num_workers'], fontsize=12)
    plt.yticks(fontsize=12)
    plt.grid(True, which="both", ls="--", linewidth=0.5)
    plt.gcf().subplots_adjust(right=0.9, bottom=0.2)
    filename = f'mflops_array_length_{array_length}.png'
    plt.savefig(filename)
    print(f'Plot saved to {filename}')

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Benchmark multithreading and multiprocessing with NumPy and list operations.')
    parser.add_argument('--array-length', type=int, default=1000, help='Length of the arrays/lists used in computations.')
    parser.add_argument('--num-iterations', type=int, default=4000, help='Number of iterations each worker performs.')
    parser.add_argument('--num-workers-list', type=int, nargs='+', default=[1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048], help='List of worker counts to benchmark.')
    args = parser.parse_args()
    print(f"Python Version: {sys.version}")
    print(f"NumPy Version: {np.__version__}")
    print(f"os.cpu_count(): {os.cpu_count()}")
    results = {num_workers: {} for num_workers in args.num_workers_list}
    plot_data = {
        'numpy_threads': {'num_workers': [], 'mflop_per_sec': []},
        'numpy_processes': {'num_workers': [], 'mflop_per_sec': []},
        'list_threads': {'num_workers': [], 'mflop_per_sec': []},
        'list_processes': {'num_workers': [], 'mflop_per_sec': []},
    }
    for num_workers in args.num_workers_list:
        for kind in ['numpy', 'list']:
            for method in ['threads', 'processes']:
                print(num_workers, kind, method)
                if num_workers > 128 and method == 'processes':
                    continue
                elapsed_time, mflop_per_sec = run_benchmark(kind, method, num_workers, args)
                key = f"{kind}_{method}"
                results[num_workers][key] = (elapsed_time, mflop_per_sec)
                plot_data[key]['num_workers'].append(num_workers)
                plot_data[key]['mflop_per_sec'].append(mflop_per_sec)
    print_results_table(results, args.num_workers_list)
    save_plot(plot_data, args.array_length)
Error message:
No response
Python and NumPy Versions:
2.1.3
3.13.0 experimental free-threading build | packaged by conda-forge | (main, Oct 8 2024, 20:16:19) [GCC 13.3.0]
Runtime Environment:
[{'numpy_version': '2.1.3',
'python': '3.13.0 experimental free-threading build | packaged by '
'conda-forge | (main, Oct 8 2024, 20:16:19) [GCC 13.3.0]',
'uname': uname_result(system='Linux', node='eric-Lambda-Vector', release='6.8.0-48-generic', version='#48~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Mon Oct 7 11:24:13 UTC 2', machine='x86_64')},
{'simd_extensions': {'baseline': ['SSE', 'SSE2', 'SSE3'],
'found': ['SSSE3',
'SSE41',
'POPCNT',
'SSE42',
'AVX',
'F16C',
'FMA3',
'AVX2'],
'not_found': ['AVX512F',
'AVX512CD',
'AVX512_KNL',
'AVX512_KNM',
'AVX512_SKX',
'AVX512_CLX',
'AVX512_CNL',
'AVX512_ICL']}},
{'architecture': 'Haswell',
'filepath': '/home/eric/miniforge3/envs/habitat-lab3-py313/lib/python3.13t/site-packages/numpy.libs/libscipy_openblas64_-ff651d7f.so',
'internal_api': 'openblas',
'num_threads': 32,
'prefix': 'libscipy_openblas',
'threading_layer': 'pthreads',
'user_api': 'blas',
'version': '0.3.27'}]
AMD Ryzen Threadripper PRO 5955WX 16-Cores
Context for the issue:
Our application is RL training with a robotics simulator. We use multiprocessing, with each worker doing mostly-independent CPU-heavy work. I was excited to try Python 3.13 free-threading to reduce the cost of gathering results from workers: using multithreading instead of multiprocessing would avoid interprocess communication overhead. Instead, I see a big drop in overall performance. We use a lot of small NumPy arrays for 3D math (3D positions, 4D rotation quaternions, 4x4 transform matrices, etc.).
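To make the small-array workload concrete, here is a rough sketch of what one worker's inner loop might look like, timed across threads the same way as the reproducer (a barrier before the timed region). It is not taken from the application: the function names `transform_worker` and `time_threads`, the 4x4 translation step, and the iteration counts are all assumptions chosen for illustration.

```python
import threading
import time

import numpy as np


def transform_worker(barrier, num_iterations, out, index):
    """Repeatedly compose a 4x4 transform; every array is private to this thread."""
    pose = np.eye(4)
    step = np.eye(4)
    step[0, 3] = 0.001  # small translation along x (illustrative value)
    barrier.wait()  # line up with the other workers and the timing thread
    for _ in range(num_iterations):
        pose = pose @ step  # one tiny matrix product per iteration
    out[index] = pose[0, 3]  # accumulated x translation


def time_threads(num_threads, num_iterations=2000):
    """Run num_threads workers concurrently and return (elapsed seconds, results)."""
    barrier = threading.Barrier(num_threads + 1)
    out = [None] * num_threads
    threads = [
        threading.Thread(target=transform_worker, args=(barrier, num_iterations, out, i))
        for i in range(num_threads)
    ]
    for t in threads:
        t.start()
    barrier.wait()  # release all workers, then start the clock
    start = time.perf_counter()
    for t in threads:
        t.join()
    return time.perf_counter() - start, out


if __name__ == "__main__":
    for n in (1, 2, 4, 8):
        elapsed, _ = time_threads(n)
        print(f"{n} threads: {elapsed:.4f}s")
```

With independent workers and no shared state, one would hope the elapsed time stays roughly flat until the thread count reaches the number of cores; a steady increase before that point would be consistent with the contention described in this report.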