diff options
Diffstat (limited to 'Lib/test/test_free_threading')
-rw-r--r-- | Lib/test/test_free_threading/test_generators.py | 51 | ||||
-rw-r--r-- | Lib/test/test_free_threading/test_heapq.py | 267 | ||||
-rw-r--r-- | Lib/test/test_free_threading/test_itertools.py | 37 | ||||
-rw-r--r-- | Lib/test/test_free_threading/test_itertools_combinatoric.py | 51 |
4 files changed, 403 insertions, 3 deletions
diff --git a/Lib/test/test_free_threading/test_generators.py b/Lib/test/test_free_threading/test_generators.py new file mode 100644 index 00000000000..d01675eb38b --- /dev/null +++ b/Lib/test/test_free_threading/test_generators.py @@ -0,0 +1,51 @@ +import concurrent.futures +import unittest +from threading import Barrier +from unittest import TestCase +import random +import time + +from test.support import threading_helper, Py_GIL_DISABLED + +threading_helper.requires_working_threading(module=True) + + +def random_sleep(): + delay_us = random.randint(50, 100) + time.sleep(delay_us * 1e-6) + +def random_string(): + return ''.join(random.choice('0123456789ABCDEF') for _ in range(10)) + +def set_gen_name(g, b): + b.wait() + random_sleep() + g.__name__ = random_string() + return g.__name__ + +def set_gen_qualname(g, b): + b.wait() + random_sleep() + g.__qualname__ = random_string() + return g.__qualname__ + + +@unittest.skipUnless(Py_GIL_DISABLED, "Enable only in FT build") +class TestFTGenerators(TestCase): + NUM_THREADS = 4 + + def concurrent_write_with_func(self, func): + gen = (x for x in range(42)) + for j in range(1000): + with concurrent.futures.ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor: + b = Barrier(self.NUM_THREADS) + futures = {executor.submit(func, gen, b): i for i in range(self.NUM_THREADS)} + for fut in concurrent.futures.as_completed(futures): + gen_name = fut.result() + self.assertEqual(len(gen_name), 10) + + def test_concurrent_write(self): + with self.subTest(func=set_gen_name): + self.concurrent_write_with_func(func=set_gen_name) + with self.subTest(func=set_gen_qualname): + self.concurrent_write_with_func(func=set_gen_qualname) diff --git a/Lib/test/test_free_threading/test_heapq.py b/Lib/test/test_free_threading/test_heapq.py new file mode 100644 index 00000000000..ee7adfb2b78 --- /dev/null +++ b/Lib/test/test_free_threading/test_heapq.py @@ -0,0 +1,267 @@ +import unittest + +import heapq + +from enum import Enum +from threading import Thread, Barrier, Lock +from random import shuffle, randint + +from test.support import threading_helper +from test import test_heapq + + +NTHREADS = 10 +OBJECT_COUNT = 5_000 + + +class Heap(Enum): + MIN = 1 + MAX = 2 + + +@threading_helper.requires_working_threading() +class TestHeapq(unittest.TestCase): + def setUp(self): + self.test_heapq = test_heapq.TestHeapPython() + + def test_racing_heapify(self): + heap = list(range(OBJECT_COUNT)) + shuffle(heap) + + self.run_concurrently( + worker_func=heapq.heapify, args=(heap,), nthreads=NTHREADS + ) + self.test_heapq.check_invariant(heap) + + def test_racing_heappush(self): + heap = [] + + def heappush_func(heap): + for item in reversed(range(OBJECT_COUNT)): + heapq.heappush(heap, item) + + self.run_concurrently( + worker_func=heappush_func, args=(heap,), nthreads=NTHREADS + ) + self.test_heapq.check_invariant(heap) + + def test_racing_heappop(self): + heap = self.create_heap(OBJECT_COUNT, Heap.MIN) + + # Each thread pops (OBJECT_COUNT / NTHREADS) items + self.assertEqual(OBJECT_COUNT % NTHREADS, 0) + per_thread_pop_count = OBJECT_COUNT // NTHREADS + + def heappop_func(heap, pop_count): + local_list = [] + for _ in range(pop_count): + item = heapq.heappop(heap) + local_list.append(item) + + # Each local list should be sorted + self.assertTrue(self.is_sorted_ascending(local_list)) + + self.run_concurrently( + worker_func=heappop_func, + args=(heap, per_thread_pop_count), + nthreads=NTHREADS, + ) + self.assertEqual(len(heap), 0) + + def test_racing_heappushpop(self): + heap = self.create_heap(OBJECT_COUNT, Heap.MIN) + pushpop_items = self.create_random_list(-5_000, 10_000, OBJECT_COUNT) + + def heappushpop_func(heap, pushpop_items): + for item in pushpop_items: + popped_item = heapq.heappushpop(heap, item) + self.assertTrue(popped_item <= item) + + self.run_concurrently( + worker_func=heappushpop_func, + args=(heap, pushpop_items), + nthreads=NTHREADS, + ) + self.assertEqual(len(heap), OBJECT_COUNT) + self.test_heapq.check_invariant(heap) + + def test_racing_heapreplace(self): + heap = self.create_heap(OBJECT_COUNT, Heap.MIN) + replace_items = self.create_random_list(-5_000, 10_000, OBJECT_COUNT) + + def heapreplace_func(heap, replace_items): + for item in replace_items: + heapq.heapreplace(heap, item) + + self.run_concurrently( + worker_func=heapreplace_func, + args=(heap, replace_items), + nthreads=NTHREADS, + ) + self.assertEqual(len(heap), OBJECT_COUNT) + self.test_heapq.check_invariant(heap) + + def test_racing_heapify_max(self): + max_heap = list(range(OBJECT_COUNT)) + shuffle(max_heap) + + self.run_concurrently( + worker_func=heapq.heapify_max, args=(max_heap,), nthreads=NTHREADS + ) + self.test_heapq.check_max_invariant(max_heap) + + def test_racing_heappush_max(self): + max_heap = [] + + def heappush_max_func(max_heap): + for item in range(OBJECT_COUNT): + heapq.heappush_max(max_heap, item) + + self.run_concurrently( + worker_func=heappush_max_func, args=(max_heap,), nthreads=NTHREADS + ) + self.test_heapq.check_max_invariant(max_heap) + + def test_racing_heappop_max(self): + max_heap = self.create_heap(OBJECT_COUNT, Heap.MAX) + + # Each thread pops (OBJECT_COUNT / NTHREADS) items + self.assertEqual(OBJECT_COUNT % NTHREADS, 0) + per_thread_pop_count = OBJECT_COUNT // NTHREADS + + def heappop_max_func(max_heap, pop_count): + local_list = [] + for _ in range(pop_count): + item = heapq.heappop_max(max_heap) + local_list.append(item) + + # Each local list should be sorted + self.assertTrue(self.is_sorted_descending(local_list)) + + self.run_concurrently( + worker_func=heappop_max_func, + args=(max_heap, per_thread_pop_count), + nthreads=NTHREADS, + ) + self.assertEqual(len(max_heap), 0) + + def test_racing_heappushpop_max(self): + max_heap = self.create_heap(OBJECT_COUNT, Heap.MAX) + pushpop_items = self.create_random_list(-5_000, 10_000, OBJECT_COUNT) + + def heappushpop_max_func(max_heap, pushpop_items): + for item in pushpop_items: + popped_item = heapq.heappushpop_max(max_heap, item) + self.assertTrue(popped_item >= item) + + self.run_concurrently( + worker_func=heappushpop_max_func, + args=(max_heap, pushpop_items), + nthreads=NTHREADS, + ) + self.assertEqual(len(max_heap), OBJECT_COUNT) + self.test_heapq.check_max_invariant(max_heap) + + def test_racing_heapreplace_max(self): + max_heap = self.create_heap(OBJECT_COUNT, Heap.MAX) + replace_items = self.create_random_list(-5_000, 10_000, OBJECT_COUNT) + + def heapreplace_max_func(max_heap, replace_items): + for item in replace_items: + heapq.heapreplace_max(max_heap, item) + + self.run_concurrently( + worker_func=heapreplace_max_func, + args=(max_heap, replace_items), + nthreads=NTHREADS, + ) + self.assertEqual(len(max_heap), OBJECT_COUNT) + self.test_heapq.check_max_invariant(max_heap) + + def test_lock_free_list_read(self): + n, n_threads = 1_000, 10 + l = [] + barrier = Barrier(n_threads * 2) + + count = 0 + lock = Lock() + + def worker(): + with lock: + nonlocal count + x = count + count += 1 + + barrier.wait() + for i in range(n): + if x % 2: + heapq.heappush(l, 1) + heapq.heappop(l) + else: + try: + l[0] + except IndexError: + pass + + self.run_concurrently(worker, (), n_threads * 2) + + @staticmethod + def is_sorted_ascending(lst): + """ + Check if the list is sorted in ascending order (non-decreasing). + """ + return all(lst[i - 1] <= lst[i] for i in range(1, len(lst))) + + @staticmethod + def is_sorted_descending(lst): + """ + Check if the list is sorted in descending order (non-increasing). + """ + return all(lst[i - 1] >= lst[i] for i in range(1, len(lst))) + + @staticmethod + def create_heap(size, heap_kind): + """ + Create a min/max heap where elements are in the range (0, size - 1) and + shuffled before heapify. + """ + heap = list(range(OBJECT_COUNT)) + shuffle(heap) + if heap_kind == Heap.MIN: + heapq.heapify(heap) + else: + heapq.heapify_max(heap) + + return heap + + @staticmethod + def create_random_list(a, b, size): + """ + Create a list of random numbers between a and b (inclusive). + """ + return [randint(-a, b) for _ in range(size)] + + def run_concurrently(self, worker_func, args, nthreads): + """ + Run the worker function concurrently in multiple threads. + """ + barrier = Barrier(nthreads) + + def wrapper_func(*args): + # Wait for all threads to reach this point before proceeding. + barrier.wait() + worker_func(*args) + + with threading_helper.catch_threading_exception() as cm: + workers = ( + Thread(target=wrapper_func, args=args) for _ in range(nthreads) + ) + with threading_helper.start_threads(workers): + pass + + # Worker threads should not raise any exceptions + self.assertIsNone(cm.exc_value) + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_free_threading/test_itertools.py b/Lib/test/test_free_threading/test_itertools.py index 8360afbf78c..9d366041917 100644 --- a/Lib/test/test_free_threading/test_itertools.py +++ b/Lib/test/test_free_threading/test_itertools.py @@ -1,6 +1,6 @@ import unittest from threading import Thread, Barrier -from itertools import batched, cycle +from itertools import batched, chain, cycle from test.support import threading_helper @@ -17,7 +17,7 @@ class ItertoolsThreading(unittest.TestCase): barrier.wait() while True: try: - _ = next(it) + next(it) except StopIteration: break @@ -44,7 +44,10 @@ class ItertoolsThreading(unittest.TestCase): def work(it): barrier.wait() for _ in range(number_of_cycles): - _ = next(it) + try: + next(it) + except StopIteration: + pass data = (1, 2, 3, 4) for it in range(number_of_iterations): @@ -59,6 +62,34 @@ class ItertoolsThreading(unittest.TestCase): barrier.reset() + @threading_helper.reap_threads + def test_chain(self): + number_of_threads = 6 + number_of_iterations = 20 + + barrier = Barrier(number_of_threads) + def work(it): + barrier.wait() + while True: + try: + next(it) + except StopIteration: + break + + data = [(1, )] * 200 + for it in range(number_of_iterations): + chain_iterator = chain(*data) + worker_threads = [] + for ii in range(number_of_threads): + worker_threads.append( + Thread(target=work, args=[chain_iterator])) + + with threading_helper.start_threads(worker_threads): + pass + + barrier.reset() + + if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_free_threading/test_itertools_combinatoric.py b/Lib/test/test_free_threading/test_itertools_combinatoric.py new file mode 100644 index 00000000000..5b3b88deedd --- /dev/null +++ b/Lib/test/test_free_threading/test_itertools_combinatoric.py @@ -0,0 +1,51 @@ +import unittest +from threading import Thread, Barrier +from itertools import combinations, product +from test.support import threading_helper + + +threading_helper.requires_working_threading(module=True) + +def test_concurrent_iteration(iterator, number_of_threads): + barrier = Barrier(number_of_threads) + def iterator_worker(it): + barrier.wait() + while True: + try: + _ = next(it) + except StopIteration: + return + + worker_threads = [] + for ii in range(number_of_threads): + worker_threads.append( + Thread(target=iterator_worker, args=[iterator])) + + with threading_helper.start_threads(worker_threads): + pass + + barrier.reset() + +class ItertoolsThreading(unittest.TestCase): + + @threading_helper.reap_threads + def test_combinations(self): + number_of_threads = 10 + number_of_iterations = 24 + + for it in range(number_of_iterations): + iterator = combinations((1, 2, 3, 4, 5), 2) + test_concurrent_iteration(iterator, number_of_threads) + + @threading_helper.reap_threads + def test_product(self): + number_of_threads = 10 + number_of_iterations = 24 + + for it in range(number_of_iterations): + iterator = product((1, 2, 3, 4, 5), (10, 20, 30)) + test_concurrent_iteration(iterator, number_of_threads) + + +if __name__ == "__main__": + unittest.main() |