aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_free_threading/test_iteration.py
blob: a51ad0cf83a0065453afca5634cf406b6023437f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import threading
import unittest
from test import support

# The race conditions these tests were written for only happen every now and
# then, even with the current numbers. To find rare race conditions, bumping
# these up will help, but it makes the test runtime highly variable under
# free-threading. Overhead is much higher under ThreadSanitizer, but it's
# also much better at detecting certain races, so we don't need as many
# items/threads.
if support.check_sanitizer(thread=True):
    NUMITEMS = 1000
    NUMTHREADS = 2
else:
    NUMITEMS = 100000
    NUMTHREADS = 5
NUMMUTATORS = 2

class ContendedTupleIterationTest(unittest.TestCase):
    """Iterate over a tuple from several threads at the same time.

    Subclasses reuse these tests for other sequence types by overriding
    make_testdata() and, where the expectations differ,
    assert_iterator_results().
    """

    def make_testdata(self, n):
        """Return the sequence under test: a tuple of the integers 0..n-1."""
        return tuple(range(n))

    def assert_iterator_results(self, results, expected):
        """Fail if *results* contains any item that is not in *expected*."""
        # Most iterators are not atomic (yet?) so they can skip or duplicate
        # items, but they should not invent new items (like the range
        # iterator currently does).
        extra_items = set(results) - set(expected)
        self.assertEqual(set(), extra_items)

    def run_threads(self, func, *args, numthreads=NUMTHREADS):
        """Start *numthreads* threads running func(*args) and return them.

        The threads are started but not joined; joining is left to the
        caller.
        """
        threads = []
        for _ in range(numthreads):
            t = threading.Thread(target=func, args=args)
            t.start()
            threads.append(t)
        return threads

    def test_iteration(self):
        """Test iteration over a shared container"""
        seq = self.make_testdata(NUMITEMS)
        results = []
        # Workers block on the barrier until all NUMTHREADS of them have
        # arrived, then start iterating together.
        start = threading.Barrier(NUMTHREADS)
        def worker():
            idx = 0
            start.wait()
            # Only count the items; each thread reports a single total.
            for item in seq:
                idx += 1
            results.append(idx)
        threads = self.run_threads(worker)
        for t in threads:
            t.join()
        # Each thread has its own iterator, so results should be entirely predictable.
        self.assertEqual(results, [NUMITEMS] * NUMTHREADS)

    def test_shared_iterator(self):
        """Test iteration over a shared iterator"""
        seq = self.make_testdata(NUMITEMS)
        # One iterator object, consumed by every worker thread.
        it = iter(seq)
        results = []
        # Workers block on the barrier until all NUMTHREADS of them have
        # arrived, then start iterating together.
        start = threading.Barrier(NUMTHREADS)
        def worker():
            items = []
            start.wait()
            # We want a tight loop, so put items in the shared list at the end.
            for item in it:
                items.append(item)
            results.extend(items)
        threads = self.run_threads(worker)
        for t in threads:
            t.join()
        self.assert_iterator_results(results, seq)

class ContendedListIterationTest(ContendedTupleIterationTest):
    """Run the inherited iteration tests on a list, plus a mutation test."""

    def make_testdata(self, n):
        """Return the sequence under test: a list of the integers 0..n-1."""
        return list(range(n))

    def test_iteration_while_mutating(self):
        """Test iteration over a shared mutating container."""
        seq = self.make_testdata(NUMITEMS)
        results = []
        # Workers and mutators all wait on the same barrier, so it needs
        # one slot for each of them.
        start = threading.Barrier(NUMTHREADS + NUMMUTATORS)
        # Set once the workers are done, to tell the mutators to stop.
        endmutate = threading.Event()
        def mutator():
            orig = seq[:]
            # Make changes big enough to cause resizing of the list, with
            # items shifted around for good measure.
            replacement = (orig * 3)[NUMITEMS//2:]
            start.wait()
            while not endmutate.is_set():
                seq.extend(replacement)  # grow at the end
                seq[:0] = orig           # insert at the front, shifting items
                seq.__imul__(2)          # repeat the contents in place
                seq.extend(seq)          # append the list to itself
                seq[:] = orig            # put back the original contents
        def worker():
            items = []
            start.wait()
            # We want a tight loop, so put items in the shared list at the end.
            for item in seq:
                items.append(item)
            results.extend(items)
        # Pre-bind so the finally clause can iterate this even if starting
        # the threads raises before the real value is assigned.
        mutators = ()
        try:
            threads = self.run_threads(worker)
            mutators = self.run_threads(mutator, numthreads=NUMMUTATORS)
            for t in threads:
                t.join()
        finally:
            # Always signal the mutators to stop; their loop only exits
            # once the event is set.
            endmutate.set()
            for m in mutators:
                m.join()
        # The mutators only ever put copies of the original items into the
        # list, so the workers should not have seen anything else.
        self.assert_iterator_results(results, list(seq))


class ContendedRangeIterationTest(ContendedTupleIterationTest):
    """Run the inherited contended-iteration tests on a range object."""

    def make_testdata(self, n):
        """Return range(n) as the sequence under test."""
        return range(n)

    def assert_iterator_results(self, results, expected):
        """Check that unexpected items still sit on the range's step grid."""
        # A range iterator shared between threads will (right now)
        # occasionally yield values past the end of the range, sometimes
        # _far_ past it. That should be fixed; until then, only require
        # that such values are integers reachable by stepping beyond the
        # range bounds.
        for stray in set(results).difference(expected):
            offset = stray - expected.start
            self.assertEqual(offset % expected.step, 0)