Diffstat (limited to 'Lib/test/test_free_threading')
-rw-r--r--  Lib/test/test_free_threading/test_dict.py                     16
-rw-r--r--  Lib/test/test_free_threading/test_functools.py                75
-rw-r--r--  Lib/test/test_free_threading/test_generators.py               51
-rw-r--r--  Lib/test/test_free_threading/test_heapq.py                   267
-rw-r--r--  Lib/test/test_free_threading/test_io.py                      109
-rw-r--r--  Lib/test/test_free_threading/test_itertools.py                95
-rw-r--r--  Lib/test/test_free_threading/test_itertools_batched.py        38
-rw-r--r--  Lib/test/test_free_threading/test_itertools_combinatoric.py   51
8 files changed, 664 insertions, 38 deletions
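
All of the new tests below follow the same basic shape: worker threads line up on a threading.Barrier so they hit the shared object at the same moment, and test.support.threading_helper.start_threads starts and joins them. A minimal sketch of that pattern, separate from the diff itself (the run_racing name is illustrative):

from threading import Barrier, Thread
from test.support import threading_helper

def run_racing(worker, nthreads=10):
    # Release all workers at once so they race on the shared object.
    barrier = Barrier(nthreads)

    def wrapped():
        barrier.wait()
        worker()

    threads = [Thread(target=wrapped) for _ in range(nthreads)]
    with threading_helper.start_threads(threads):
        pass  # start_threads starts the workers and joins them on exit
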
diff --git a/Lib/test/test_free_threading/test_dict.py b/Lib/test/test_free_threading/test_dict.py
index 476cc3178d8..5d5d4e226ca 100644
--- a/Lib/test/test_free_threading/test_dict.py
+++ b/Lib/test/test_free_threading/test_dict.py
@@ -228,6 +228,22 @@ class TestDict(TestCase):
self.assertEqual(count, 0)
+ def test_racing_object_get_set_dict(self):
+ e = Exception()
+
+ def writer():
+ for i in range(10000):
+ e.__dict__ = {1:2}
+
+ def reader():
+ for i in range(10000):
+ e.__dict__
+
+ t1 = Thread(target=writer)
+ t2 = Thread(target=reader)
+
+ with threading_helper.start_threads([t1, t2]):
+ pass
if __name__ == "__main__":
unittest.main()
diff --git a/Lib/test/test_free_threading/test_functools.py b/Lib/test/test_free_threading/test_functools.py
new file mode 100644
index 00000000000..a442fe056ce
--- /dev/null
+++ b/Lib/test/test_free_threading/test_functools.py
@@ -0,0 +1,75 @@
+import random
+import unittest
+
+from functools import lru_cache
+from threading import Barrier, Thread
+
+from test.support import threading_helper
+
+@threading_helper.requires_working_threading()
+class TestLRUCache(unittest.TestCase):
+
+ def _test_concurrent_operations(self, maxsize):
+ num_threads = 10
+ b = Barrier(num_threads)
+ @lru_cache(maxsize=maxsize)
+ def func(arg=0):
+ return object()
+
+
+ def thread_func():
+ b.wait()
+ for i in range(1000):
+ r = random.randint(0, 1000)
+ if i < 800:
+ func(i)
+ elif i < 900:
+ func.cache_info()
+ else:
+ func.cache_clear()
+
+ threads = []
+ for i in range(num_threads):
+ t = Thread(target=thread_func)
+ threads.append(t)
+
+ with threading_helper.start_threads(threads):
+ pass
+
+ def test_concurrent_operations_unbounded(self):
+ self._test_concurrent_operations(maxsize=None)
+
+ def test_concurrent_operations_bounded(self):
+ self._test_concurrent_operations(maxsize=128)
+
+ def _test_reentrant_cache_clear(self, maxsize):
+ num_threads = 10
+ b = Barrier(num_threads)
+ @lru_cache(maxsize=maxsize)
+ def func(arg=0):
+ func.cache_clear()
+ return object()
+
+
+ def thread_func():
+ b.wait()
+ for i in range(1000):
+ func(random.randint(0, 10000))
+
+ threads = []
+ for i in range(num_threads):
+ t = Thread(target=thread_func)
+ threads.append(t)
+
+ with threading_helper.start_threads(threads):
+ pass
+
+ def test_reentrant_cache_clear_unbounded(self):
+ self._test_reentrant_cache_clear(maxsize=None)
+
+ def test_reentrant_cache_clear_bounded(self):
+ self._test_reentrant_cache_clear(maxsize=128)
+
+
+if __name__ == "__main__":
+ unittest.main()
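
The reentrant tests above deliberately call cache_clear() from inside the cached function, so the lru_cache machinery is re-entered while the current call is still being recorded. A standalone sketch of that pattern (illustrative, not part of the diff):

from functools import lru_cache

@lru_cache(maxsize=128)
def compute(n):
    compute.cache_clear()  # re-enter the cache while this call is in flight
    return n * n

for i in range(10):
    compute(i)
print(compute.cache_info())  # bookkeeping must stay consistent despite the clears
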
diff --git a/Lib/test/test_free_threading/test_generators.py b/Lib/test/test_free_threading/test_generators.py
new file mode 100644
index 00000000000..d01675eb38b
--- /dev/null
+++ b/Lib/test/test_free_threading/test_generators.py
@@ -0,0 +1,51 @@
+import concurrent.futures
+import unittest
+from threading import Barrier
+from unittest import TestCase
+import random
+import time
+
+from test.support import threading_helper, Py_GIL_DISABLED
+
+threading_helper.requires_working_threading(module=True)
+
+
+def random_sleep():
+ delay_us = random.randint(50, 100)
+ time.sleep(delay_us * 1e-6)
+
+def random_string():
+ return ''.join(random.choice('0123456789ABCDEF') for _ in range(10))
+
+def set_gen_name(g, b):
+ b.wait()
+ random_sleep()
+ g.__name__ = random_string()
+ return g.__name__
+
+def set_gen_qualname(g, b):
+ b.wait()
+ random_sleep()
+ g.__qualname__ = random_string()
+ return g.__qualname__
+
+
+@unittest.skipUnless(Py_GIL_DISABLED, "Enable only in FT build")
+class TestFTGenerators(TestCase):
+ NUM_THREADS = 4
+
+ def concurrent_write_with_func(self, func):
+ gen = (x for x in range(42))
+ for j in range(1000):
+ with concurrent.futures.ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
+ b = Barrier(self.NUM_THREADS)
+ futures = {executor.submit(func, gen, b): i for i in range(self.NUM_THREADS)}
+ for fut in concurrent.futures.as_completed(futures):
+ gen_name = fut.result()
+ self.assertEqual(len(gen_name), 10)
+
+ def test_concurrent_write(self):
+ with self.subTest(func=set_gen_name):
+ self.concurrent_write_with_func(func=set_gen_name)
+ with self.subTest(func=set_gen_qualname):
+ self.concurrent_write_with_func(func=set_gen_qualname)
diff --git a/Lib/test/test_free_threading/test_heapq.py b/Lib/test/test_free_threading/test_heapq.py
new file mode 100644
index 00000000000..ee7adfb2b78
--- /dev/null
+++ b/Lib/test/test_free_threading/test_heapq.py
@@ -0,0 +1,267 @@
+import unittest
+
+import heapq
+
+from enum import Enum
+from threading import Thread, Barrier, Lock
+from random import shuffle, randint
+
+from test.support import threading_helper
+from test import test_heapq
+
+
+NTHREADS = 10
+OBJECT_COUNT = 5_000
+
+
+class Heap(Enum):
+ MIN = 1
+ MAX = 2
+
+
+@threading_helper.requires_working_threading()
+class TestHeapq(unittest.TestCase):
+ def setUp(self):
+ self.test_heapq = test_heapq.TestHeapPython()
+
+ def test_racing_heapify(self):
+ heap = list(range(OBJECT_COUNT))
+ shuffle(heap)
+
+ self.run_concurrently(
+ worker_func=heapq.heapify, args=(heap,), nthreads=NTHREADS
+ )
+ self.test_heapq.check_invariant(heap)
+
+ def test_racing_heappush(self):
+ heap = []
+
+ def heappush_func(heap):
+ for item in reversed(range(OBJECT_COUNT)):
+ heapq.heappush(heap, item)
+
+ self.run_concurrently(
+ worker_func=heappush_func, args=(heap,), nthreads=NTHREADS
+ )
+ self.test_heapq.check_invariant(heap)
+
+ def test_racing_heappop(self):
+ heap = self.create_heap(OBJECT_COUNT, Heap.MIN)
+
+ # Each thread pops (OBJECT_COUNT / NTHREADS) items
+ self.assertEqual(OBJECT_COUNT % NTHREADS, 0)
+ per_thread_pop_count = OBJECT_COUNT // NTHREADS
+
+ def heappop_func(heap, pop_count):
+ local_list = []
+ for _ in range(pop_count):
+ item = heapq.heappop(heap)
+ local_list.append(item)
+
+ # Each local list should be sorted
+ self.assertTrue(self.is_sorted_ascending(local_list))
+
+ self.run_concurrently(
+ worker_func=heappop_func,
+ args=(heap, per_thread_pop_count),
+ nthreads=NTHREADS,
+ )
+ self.assertEqual(len(heap), 0)
+
+ def test_racing_heappushpop(self):
+ heap = self.create_heap(OBJECT_COUNT, Heap.MIN)
+ pushpop_items = self.create_random_list(-5_000, 10_000, OBJECT_COUNT)
+
+ def heappushpop_func(heap, pushpop_items):
+ for item in pushpop_items:
+ popped_item = heapq.heappushpop(heap, item)
+ self.assertTrue(popped_item <= item)
+
+ self.run_concurrently(
+ worker_func=heappushpop_func,
+ args=(heap, pushpop_items),
+ nthreads=NTHREADS,
+ )
+ self.assertEqual(len(heap), OBJECT_COUNT)
+ self.test_heapq.check_invariant(heap)
+
+ def test_racing_heapreplace(self):
+ heap = self.create_heap(OBJECT_COUNT, Heap.MIN)
+ replace_items = self.create_random_list(-5_000, 10_000, OBJECT_COUNT)
+
+ def heapreplace_func(heap, replace_items):
+ for item in replace_items:
+ heapq.heapreplace(heap, item)
+
+ self.run_concurrently(
+ worker_func=heapreplace_func,
+ args=(heap, replace_items),
+ nthreads=NTHREADS,
+ )
+ self.assertEqual(len(heap), OBJECT_COUNT)
+ self.test_heapq.check_invariant(heap)
+
+ def test_racing_heapify_max(self):
+ max_heap = list(range(OBJECT_COUNT))
+ shuffle(max_heap)
+
+ self.run_concurrently(
+ worker_func=heapq.heapify_max, args=(max_heap,), nthreads=NTHREADS
+ )
+ self.test_heapq.check_max_invariant(max_heap)
+
+ def test_racing_heappush_max(self):
+ max_heap = []
+
+ def heappush_max_func(max_heap):
+ for item in range(OBJECT_COUNT):
+ heapq.heappush_max(max_heap, item)
+
+ self.run_concurrently(
+ worker_func=heappush_max_func, args=(max_heap,), nthreads=NTHREADS
+ )
+ self.test_heapq.check_max_invariant(max_heap)
+
+ def test_racing_heappop_max(self):
+ max_heap = self.create_heap(OBJECT_COUNT, Heap.MAX)
+
+ # Each thread pops (OBJECT_COUNT / NTHREADS) items
+ self.assertEqual(OBJECT_COUNT % NTHREADS, 0)
+ per_thread_pop_count = OBJECT_COUNT // NTHREADS
+
+ def heappop_max_func(max_heap, pop_count):
+ local_list = []
+ for _ in range(pop_count):
+ item = heapq.heappop_max(max_heap)
+ local_list.append(item)
+
+ # Each local list should be sorted
+ self.assertTrue(self.is_sorted_descending(local_list))
+
+ self.run_concurrently(
+ worker_func=heappop_max_func,
+ args=(max_heap, per_thread_pop_count),
+ nthreads=NTHREADS,
+ )
+ self.assertEqual(len(max_heap), 0)
+
+ def test_racing_heappushpop_max(self):
+ max_heap = self.create_heap(OBJECT_COUNT, Heap.MAX)
+ pushpop_items = self.create_random_list(-5_000, 10_000, OBJECT_COUNT)
+
+ def heappushpop_max_func(max_heap, pushpop_items):
+ for item in pushpop_items:
+ popped_item = heapq.heappushpop_max(max_heap, item)
+ self.assertTrue(popped_item >= item)
+
+ self.run_concurrently(
+ worker_func=heappushpop_max_func,
+ args=(max_heap, pushpop_items),
+ nthreads=NTHREADS,
+ )
+ self.assertEqual(len(max_heap), OBJECT_COUNT)
+ self.test_heapq.check_max_invariant(max_heap)
+
+ def test_racing_heapreplace_max(self):
+ max_heap = self.create_heap(OBJECT_COUNT, Heap.MAX)
+ replace_items = self.create_random_list(-5_000, 10_000, OBJECT_COUNT)
+
+ def heapreplace_max_func(max_heap, replace_items):
+ for item in replace_items:
+ heapq.heapreplace_max(max_heap, item)
+
+ self.run_concurrently(
+ worker_func=heapreplace_max_func,
+ args=(max_heap, replace_items),
+ nthreads=NTHREADS,
+ )
+ self.assertEqual(len(max_heap), OBJECT_COUNT)
+ self.test_heapq.check_max_invariant(max_heap)
+
+ def test_lock_free_list_read(self):
+ n, n_threads = 1_000, 10
+ l = []
+ barrier = Barrier(n_threads * 2)
+
+ count = 0
+ lock = Lock()
+
+ def worker():
+ with lock:
+ nonlocal count
+ x = count
+ count += 1
+
+ barrier.wait()
+ for i in range(n):
+ if x % 2:
+ heapq.heappush(l, 1)
+ heapq.heappop(l)
+ else:
+ try:
+ l[0]
+ except IndexError:
+ pass
+
+ self.run_concurrently(worker, (), n_threads * 2)
+
+ @staticmethod
+ def is_sorted_ascending(lst):
+ """
+ Check if the list is sorted in ascending order (non-decreasing).
+ """
+ return all(lst[i - 1] <= lst[i] for i in range(1, len(lst)))
+
+ @staticmethod
+ def is_sorted_descending(lst):
+ """
+ Check if the list is sorted in descending order (non-increasing).
+ """
+ return all(lst[i - 1] >= lst[i] for i in range(1, len(lst)))
+
+ @staticmethod
+ def create_heap(size, heap_kind):
+ """
+ Create a min/max heap where elements are in the range (0, size - 1) and
+ shuffled before heapify.
+ """
+ heap = list(range(size))
+ shuffle(heap)
+ if heap_kind == Heap.MIN:
+ heapq.heapify(heap)
+ else:
+ heapq.heapify_max(heap)
+
+ return heap
+
+ @staticmethod
+ def create_random_list(a, b, size):
+ """
+ Create a list of random numbers between a and b (inclusive).
+ """
+ return [randint(a, b) for _ in range(size)]
+
+ def run_concurrently(self, worker_func, args, nthreads):
+ """
+ Run the worker function concurrently in multiple threads.
+ """
+ barrier = Barrier(nthreads)
+
+ def wrapper_func(*args):
+ # Wait for all threads to reach this point before proceeding.
+ barrier.wait()
+ worker_func(*args)
+
+ with threading_helper.catch_threading_exception() as cm:
+ workers = (
+ Thread(target=wrapper_func, args=args) for _ in range(nthreads)
+ )
+ with threading_helper.start_threads(workers):
+ pass
+
+ # Worker threads should not raise any exceptions
+ self.assertIsNone(cm.exc_value)
+
+
+if __name__ == "__main__":
+ unittest.main()
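
After each race, the heapq tests hand the result to test.test_heapq.TestHeapPython.check_invariant / check_max_invariant. As a rough illustration of the property being verified (not the actual implementation in Lib/test/test_heapq.py): every parent must compare correctly against both of its children.

def assert_min_heap(heap):
    # Parent at (i - 1) // 2 must not exceed the child at i.
    for i in range(1, len(heap)):
        assert heap[(i - 1) // 2] <= heap[i]

def assert_max_heap(heap):
    # Parent at (i - 1) // 2 must not be smaller than the child at i.
    for i in range(1, len(heap)):
        assert heap[(i - 1) // 2] >= heap[i]
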
diff --git a/Lib/test/test_free_threading/test_io.py b/Lib/test/test_free_threading/test_io.py
new file mode 100644
index 00000000000..f9bec740ddf
--- /dev/null
+++ b/Lib/test/test_free_threading/test_io.py
@@ -0,0 +1,109 @@
+import threading
+from unittest import TestCase
+from test.support import threading_helper
+from random import randint
+from io import BytesIO
+from sys import getsizeof
+
+
+class TestBytesIO(TestCase):
+ # Exercise pretty much everything that can break under free threading.
+ # Non-deterministic, but at least one of these operations will fail if
+ # the BytesIO object is not thread-safe in the free-threaded build.
+
+ def check(self, funcs, *args):
+ barrier = threading.Barrier(len(funcs))
+ threads = []
+
+ for func in funcs:
+ thread = threading.Thread(target=func, args=(barrier, *args))
+
+ threads.append(thread)
+
+ with threading_helper.start_threads(threads):
+ pass
+
+ @threading_helper.requires_working_threading()
+ @threading_helper.reap_threads
+ def test_free_threading(self):
+ """Test for segfaults and aborts."""
+
+ def write(barrier, b, *ignore):
+ barrier.wait()
+ try: b.write(b'0' * randint(100, 1000))
+ except ValueError: pass # ignore writes to a concurrently closed file
+
+ def writelines(barrier, b, *ignore):
+ barrier.wait()
+ b.write(b'0\n' * randint(100, 1000))
+
+ def truncate(barrier, b, *ignore):
+ barrier.wait()
+ try: b.truncate(0)
+ except BufferError: pass # ignore resize failing while a buffer is exported
+
+ def read(barrier, b, *ignore):
+ barrier.wait()
+ b.read()
+
+ def read1(barrier, b, *ignore):
+ barrier.wait()
+ b.read1()
+
+ def readline(barrier, b, *ignore):
+ barrier.wait()
+ b.readline()
+
+ def readlines(barrier, b, *ignore):
+ barrier.wait()
+ b.readlines()
+
+ def readinto(barrier, b, into, *ignore):
+ barrier.wait()
+ b.readinto(into)
+
+ def close(barrier, b, *ignore):
+ barrier.wait()
+ b.close()
+
+ def getvalue(barrier, b, *ignore):
+ barrier.wait()
+ b.getvalue()
+
+ def getbuffer(barrier, b, *ignore):
+ barrier.wait()
+ b.getbuffer()
+
+ def iter(barrier, b, *ignore):
+ barrier.wait()
+ list(b)
+
+ def getstate(barrier, b, *ignore):
+ barrier.wait()
+ b.__getstate__()
+
+ def setstate(barrier, b, st, *ignore):
+ barrier.wait()
+ b.__setstate__(st)
+
+ def sizeof(barrier, b, *ignore):
+ barrier.wait()
+ getsizeof(b)
+
+ self.check([write] * 10, BytesIO())
+ self.check([writelines] * 10, BytesIO())
+ self.check([write] * 10 + [truncate] * 10, BytesIO())
+ self.check([truncate] + [read] * 10, BytesIO(b'0\n'*204800))
+ self.check([truncate] + [read1] * 10, BytesIO(b'0\n'*204800))
+ self.check([truncate] + [readline] * 10, BytesIO(b'0\n'*20480))
+ self.check([truncate] + [readlines] * 10, BytesIO(b'0\n'*20480))
+ self.check([truncate] + [readinto] * 10, BytesIO(b'0\n'*204800), bytearray(b'0\n'*204800))
+ self.check([close] + [write] * 10, BytesIO())
+ self.check([truncate] + [getvalue] * 10, BytesIO(b'0\n'*204800))
+ self.check([truncate] + [getbuffer] * 10, BytesIO(b'0\n'*204800))
+ self.check([truncate] + [iter] * 10, BytesIO(b'0\n'*20480))
+ self.check([truncate] + [getstate] * 10, BytesIO(b'0\n'*204800))
+ self.check([truncate] + [setstate] * 10, BytesIO(b'0\n'*204800), (b'123', 0, None))
+ self.check([truncate] + [sizeof] * 10, BytesIO(b'0\n'*204800))
+
+ # no tests for seek or tell because they don't break anything
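
A note on the BufferError handling in the truncate worker above: while a memoryview returned by getbuffer() is alive, BytesIO refuses to resize its internal buffer, so a concurrent truncate can legitimately fail. A minimal single-threaded sketch of that behaviour:

from io import BytesIO

b = BytesIO(b"0123456789")
view = b.getbuffer()   # exports the internal buffer
try:
    b.truncate(0)      # resizing is rejected while the export is alive
except BufferError:
    pass
view.release()
b.truncate(0)          # succeeds once the export is released
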
diff --git a/Lib/test/test_free_threading/test_itertools.py b/Lib/test/test_free_threading/test_itertools.py
new file mode 100644
index 00000000000..9d366041917
--- /dev/null
+++ b/Lib/test/test_free_threading/test_itertools.py
@@ -0,0 +1,95 @@
+import unittest
+from threading import Thread, Barrier
+from itertools import batched, chain, cycle
+from test.support import threading_helper
+
+
+threading_helper.requires_working_threading(module=True)
+
+class ItertoolsThreading(unittest.TestCase):
+
+ @threading_helper.reap_threads
+ def test_batched(self):
+ number_of_threads = 10
+ number_of_iterations = 20
+ barrier = Barrier(number_of_threads)
+ def work(it):
+ barrier.wait()
+ while True:
+ try:
+ next(it)
+ except StopIteration:
+ break
+
+ data = tuple(range(1000))
+ for it in range(number_of_iterations):
+ batch_iterator = batched(data, 2)
+ worker_threads = []
+ for ii in range(number_of_threads):
+ worker_threads.append(
+ Thread(target=work, args=[batch_iterator]))
+
+ with threading_helper.start_threads(worker_threads):
+ pass
+
+ barrier.reset()
+
+ @threading_helper.reap_threads
+ def test_cycle(self):
+ number_of_threads = 6
+ number_of_iterations = 10
+ number_of_cycles = 400
+
+ barrier = Barrier(number_of_threads)
+ def work(it):
+ barrier.wait()
+ for _ in range(number_of_cycles):
+ try:
+ next(it)
+ except StopIteration:
+ pass
+
+ data = (1, 2, 3, 4)
+ for it in range(number_of_iterations):
+ cycle_iterator = cycle(data)
+ worker_threads = []
+ for ii in range(number_of_threads):
+ worker_threads.append(
+ Thread(target=work, args=[cycle_iterator]))
+
+ with threading_helper.start_threads(worker_threads):
+ pass
+
+ barrier.reset()
+
+ @threading_helper.reap_threads
+ def test_chain(self):
+ number_of_threads = 6
+ number_of_iterations = 20
+
+ barrier = Barrier(number_of_threads)
+ def work(it):
+ barrier.wait()
+ while True:
+ try:
+ next(it)
+ except StopIteration:
+ break
+
+ data = [(1, )] * 200
+ for it in range(number_of_iterations):
+ chain_iterator = chain(*data)
+ worker_threads = []
+ for ii in range(number_of_threads):
+ worker_threads.append(
+ Thread(target=work, args=[chain_iterator]))
+
+ with threading_helper.start_threads(worker_threads):
+ pass
+
+ barrier.reset()
+
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Lib/test/test_free_threading/test_itertools_batched.py b/Lib/test/test_free_threading/test_itertools_batched.py
deleted file mode 100644
index a754b4f9ea9..00000000000
--- a/Lib/test/test_free_threading/test_itertools_batched.py
+++ /dev/null
@@ -1,38 +0,0 @@
-import unittest
-from threading import Thread, Barrier
-from itertools import batched
-from test.support import threading_helper
-
-
-threading_helper.requires_working_threading(module=True)
-
-class EnumerateThreading(unittest.TestCase):
-
- @threading_helper.reap_threads
- def test_threading(self):
- number_of_threads = 10
- number_of_iterations = 20
- barrier = Barrier(number_of_threads)
- def work(it):
- barrier.wait()
- while True:
- try:
- _ = next(it)
- except StopIteration:
- break
-
- data = tuple(range(1000))
- for it in range(number_of_iterations):
- batch_iterator = batched(data, 2)
- worker_threads = []
- for ii in range(number_of_threads):
- worker_threads.append(
- Thread(target=work, args=[batch_iterator]))
-
- with threading_helper.start_threads(worker_threads):
- pass
-
- barrier.reset()
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/Lib/test/test_free_threading/test_itertools_combinatoric.py b/Lib/test/test_free_threading/test_itertools_combinatoric.py
new file mode 100644
index 00000000000..5b3b88deedd
--- /dev/null
+++ b/Lib/test/test_free_threading/test_itertools_combinatoric.py
@@ -0,0 +1,51 @@
+import unittest
+from threading import Thread, Barrier
+from itertools import combinations, product
+from test.support import threading_helper
+
+
+threading_helper.requires_working_threading(module=True)
+
+def test_concurrent_iteration(iterator, number_of_threads):
+ barrier = Barrier(number_of_threads)
+ def iterator_worker(it):
+ barrier.wait()
+ while True:
+ try:
+ _ = next(it)
+ except StopIteration:
+ return
+
+ worker_threads = []
+ for ii in range(number_of_threads):
+ worker_threads.append(
+ Thread(target=iterator_worker, args=[iterator]))
+
+ with threading_helper.start_threads(worker_threads):
+ pass
+
+ barrier.reset()
+
+class ItertoolsThreading(unittest.TestCase):
+
+ @threading_helper.reap_threads
+ def test_combinations(self):
+ number_of_threads = 10
+ number_of_iterations = 24
+
+ for it in range(number_of_iterations):
+ iterator = combinations((1, 2, 3, 4, 5), 2)
+ test_concurrent_iteration(iterator, number_of_threads)
+
+ @threading_helper.reap_threads
+ def test_product(self):
+ number_of_threads = 10
+ number_of_iterations = 24
+
+ for it in range(number_of_iterations):
+ iterator = product((1, 2, 3, 4, 5), (10, 20, 30))
+ test_concurrent_iteration(iterator, number_of_threads)
+
+
+if __name__ == "__main__":
+ unittest.main()