aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Tools/ftscalingbench/ftscalingbench.py
diff options
context:
space:
mode:
authorSam Gross <colesbury@gmail.com>2024-10-28 17:47:23 -0400
committerGitHub <noreply@github.com>2024-10-28 17:47:23 -0400
commit00ea179879726ae221ac7084bdc14752c45ff558 (patch)
tree5bfd8c45ce1fc162b608e050baf821adaf3613aa /Tools/ftscalingbench/ftscalingbench.py
parentb5b06349eb71b7cf9e5082e26e6fe0145875f95b (diff)
downloadcpython-00ea179879726ae221ac7084bdc14752c45ff558.tar.gz
cpython-00ea179879726ae221ac7084bdc14752c45ff558.zip
gh-125985: Add free threading scaling micro benchmarks (#125986)
These consist of a number of short snippets that help identify scaling bottlenecks in the free threaded interpreter. The current bottlenecks are in calling functions in benchmarks that call functions (due to `LOAD_ATTR` not yet using deferred reference counting) and when accessing thread-local data.
Diffstat (limited to 'Tools/ftscalingbench/ftscalingbench.py')
-rw-r--r--Tools/ftscalingbench/ftscalingbench.py324
1 files changed, 324 insertions, 0 deletions
diff --git a/Tools/ftscalingbench/ftscalingbench.py b/Tools/ftscalingbench/ftscalingbench.py
new file mode 100644
index 00000000000..767aeae9349
--- /dev/null
+++ b/Tools/ftscalingbench/ftscalingbench.py
@@ -0,0 +1,324 @@
+# This script runs a set of small benchmarks to help identify scaling
+# bottlenecks in the free-threaded interpreter. The benchmarks consist
+# of patterns that ought to scale well, but haven't in the past. This is
+# typically due to reference count contention or lock contention.
+#
+# This is not intended to be a general multithreading benchmark suite, nor
+# are the benchmarks intended to be representative of real-world workloads.
+#
+# On Linux, to avoid confounding hardware effects, the script attempts to:
+# * Use a single CPU socket (to avoid NUMA effects)
+# * Use distinct physical cores (to avoid hyperthreading/SMT effects)
+# * Use "performance" cores (Intel, ARM) on CPUs that have performance and
+# efficiency cores
+#
+# It also helps to disable dynamic frequency scaling (i.e., "Turbo Boost")
+#
+# Intel:
+# > echo "1" | sudo tee /sys/devices/system/cpu/intel_pstate/no_turbo
+#
+# AMD:
+# > echo "0" | sudo tee /sys/devices/system/cpu/cpufreq/boost
+#
+
+import math
+import os
+import queue
+import sys
+import threading
+import time
+
+# The iterations in individual benchmarks are scaled by this factor.
+WORK_SCALE = 100
+
+ALL_BENCHMARKS = {}
+
+threads = []
+in_queues = []
+out_queues = []
+
+
+def register_benchmark(func):
+ ALL_BENCHMARKS[func.__name__] = func
+ return func
+
+@register_benchmark
+def object_cfunction():
+ accu = 0
+ tab = [1] * 100
+ for i in range(1000 * WORK_SCALE):
+ tab.pop(0)
+ tab.append(i)
+ accu += tab[50]
+ return accu
+
+@register_benchmark
+def cmodule_function():
+ for i in range(1000 * WORK_SCALE):
+ math.floor(i * i)
+
+@register_benchmark
+def mult_constant():
+ x = 1.0
+ for i in range(3000 * WORK_SCALE):
+ x *= 1.01
+
+def simple_gen():
+ for i in range(10):
+ yield i
+
+@register_benchmark
+def generator():
+ accu = 0
+ for i in range(100 * WORK_SCALE):
+ for v in simple_gen():
+ accu += v
+ return accu
+
+class Counter:
+ def __init__(self):
+ self.i = 0
+
+ def next_number(self):
+ self.i += 1
+ return self.i
+
+@register_benchmark
+def pymethod():
+ c = Counter()
+ for i in range(1000 * WORK_SCALE):
+ c.next_number()
+ return c.i
+
+def next_number(i):
+ return i + 1
+
+@register_benchmark
+def pyfunction():
+ accu = 0
+ for i in range(1000 * WORK_SCALE):
+ accu = next_number(i)
+ return accu
+
+def double(x):
+ return x + x
+
+module = sys.modules[__name__]
+
+@register_benchmark
+def module_function():
+ total = 0
+ for i in range(1000 * WORK_SCALE):
+ total += module.double(i)
+ return total
+
+class MyObject:
+ pass
+
+@register_benchmark
+def load_string_const():
+ accu = 0
+ for i in range(1000 * WORK_SCALE):
+ if i == 'a string':
+ accu += 7
+ else:
+ accu += 1
+ return accu
+
+@register_benchmark
+def load_tuple_const():
+ accu = 0
+ for i in range(1000 * WORK_SCALE):
+ if i == (1, 2):
+ accu += 7
+ else:
+ accu += 1
+ return accu
+
+@register_benchmark
+def create_pyobject():
+ for i in range(1000 * WORK_SCALE):
+ o = MyObject()
+
+@register_benchmark
+def create_closure():
+ for i in range(1000 * WORK_SCALE):
+ def foo(x):
+ return x
+ foo(i)
+
+@register_benchmark
+def create_dict():
+ for i in range(1000 * WORK_SCALE):
+ d = {
+ "key": "value",
+ }
+
+thread_local = threading.local()
+
+@register_benchmark
+def thread_local_read():
+ tmp = thread_local
+ tmp.x = 10
+ for i in range(500 * WORK_SCALE):
+ _ = tmp.x
+ _ = tmp.x
+ _ = tmp.x
+ _ = tmp.x
+ _ = tmp.x
+
+
+def bench_one_thread(func):
+ t0 = time.perf_counter_ns()
+ func()
+ t1 = time.perf_counter_ns()
+ return t1 - t0
+
+
+def bench_parallel(func):
+ t0 = time.perf_counter_ns()
+ for inq in in_queues:
+ inq.put(func)
+ for outq in out_queues:
+ outq.get()
+ t1 = time.perf_counter_ns()
+ return t1 - t0
+
+
+def benchmark(func):
+ delta_one_thread = bench_one_thread(func)
+ delta_many_threads = bench_parallel(func)
+
+ speedup = delta_one_thread * len(threads) / delta_many_threads
+ if speedup >= 1:
+ factor = speedup
+ direction = "faster"
+ else:
+ factor = 1 / speedup
+ direction = "slower"
+
+ use_color = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
+ color = reset_color = ""
+ if use_color:
+ if speedup <= 1.1:
+ color = "\x1b[31m" # red
+ elif speedup < len(threads)/2:
+ color = "\x1b[33m" # yellow
+ reset_color = "\x1b[0m"
+
+ print(f"{color}{func.__name__:<18} {round(factor, 1):>4}x {direction}{reset_color}")
+
+def determine_num_threads_and_affinity():
+ if sys.platform != "linux":
+ return [None] * os.cpu_count()
+
+ # Try to use `lscpu -p` on Linux
+ import subprocess
+ try:
+ output = subprocess.check_output(["lscpu", "-p=cpu,node,core,MAXMHZ"],
+ text=True, env={"LC_NUMERIC": "C"})
+ except (FileNotFoundError, subprocess.CalledProcessError):
+ return [None] * os.cpu_count()
+
+ table = []
+ for line in output.splitlines():
+ if line.startswith("#"):
+ continue
+ cpu, node, core, maxhz = line.split(",")
+ if maxhz == "":
+ maxhz = "0"
+ table.append((int(cpu), int(node), int(core), float(maxhz)))
+
+ cpus = []
+ cores = set()
+ max_mhz_all = max(row[3] for row in table)
+ for cpu, node, core, maxmhz in table:
+ # Choose only CPUs on the same node, unique cores, and try to avoid
+ # "efficiency" cores.
+ if node == 0 and core not in cores and maxmhz == max_mhz_all:
+ cpus.append(cpu)
+ cores.add(core)
+ return cpus
+
+
+def thread_run(cpu, in_queue, out_queue):
+ if cpu is not None and hasattr(os, "sched_setaffinity"):
+ # Set the affinity for the current thread
+ os.sched_setaffinity(0, (cpu,))
+
+ while True:
+ func = in_queue.get()
+ if func is None:
+ break
+ func()
+ out_queue.put(None)
+
+
+def initialize_threads(opts):
+ if opts.threads == -1:
+ cpus = determine_num_threads_and_affinity()
+ else:
+ cpus = [None] * opts.threads # don't set affinity
+
+ print(f"Running benchmarks with {len(cpus)} threads")
+ for cpu in cpus:
+ inq = queue.Queue()
+ outq = queue.Queue()
+ in_queues.append(inq)
+ out_queues.append(outq)
+ t = threading.Thread(target=thread_run, args=(cpu, inq, outq), daemon=True)
+ threads.append(t)
+ t.start()
+
+
+def main(opts):
+ global WORK_SCALE
+ if not hasattr(sys, "_is_gil_enabled") or sys._is_gil_enabled():
+ sys.stderr.write("expected to be run with the GIL disabled\n")
+
+ benchmark_names = opts.benchmarks
+ if benchmark_names:
+ for name in benchmark_names:
+ if name not in ALL_BENCHMARKS:
+ sys.stderr.write(f"Unknown benchmark: {name}\n")
+ sys.exit(1)
+ else:
+ benchmark_names = ALL_BENCHMARKS.keys()
+
+ WORK_SCALE = opts.scale
+
+ if not opts.baseline_only:
+ initialize_threads(opts)
+
+ do_bench = not opts.baseline_only and not opts.parallel_only
+ for name in benchmark_names:
+ func = ALL_BENCHMARKS[name]
+ if do_bench:
+ benchmark(func)
+ continue
+
+ if opts.parallel_only:
+ delta_ns = bench_parallel(func)
+ else:
+ delta_ns = bench_one_thread(func)
+
+ time_ms = delta_ns / 1_000_000
+ print(f"{func.__name__:<18} {time_ms:.1f} ms")
+
+
+if __name__ == "__main__":
+ import argparse
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument("-t", "--threads", type=int, default=-1,
+ help="number of threads to use")
+ parser.add_argument("--scale", type=int, default=100,
+ help="work scale factor for the benchmark (default=100)")
+ parser.add_argument("--baseline-only", default=False, action="store_true",
+ help="only run the baseline benchmarks (single thread)")
+ parser.add_argument("--parallel-only", default=False, action="store_true",
+ help="only run the parallel benchmark (many threads)")
+ parser.add_argument("benchmarks", nargs="*",
+ help="benchmarks to run")
+ options = parser.parse_args()
+ main(options)