1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
import threading
import unittest
from test import support
# The race conditions these tests were written for only happen every now and
# then, even with the current numbers. To find rare race conditions, bumping
# these up will help, but it makes the test runtime highly variable under
# free-threading. Overhead is much higher under ThreadSanitizer, but it's
# also much better at detecting certain races, so we don't need as many
# items/threads.
if support.check_sanitizer(thread=True):
NUMITEMS = 1000
NUMTHREADS = 2
else:
NUMITEMS = 100000
NUMTHREADS = 5
NUMMUTATORS = 2
class ContendedTupleIterationTest(unittest.TestCase):
    """Iterate over a tuple from several threads at the same time.

    Subclasses override make_testdata() (and, where needed,
    assert_iterator_results()) to run the same tests on other containers.
    """

    def make_testdata(self, n):
        """Return the container under test: a tuple holding 0 .. n-1."""
        return tuple(range(n))

    def assert_iterator_results(self, results, expected):
        """Fail if results contains any item that is not in expected."""
        # Most iterators are not atomic (yet?), so skipped or duplicated
        # items are tolerated here.  Invented items -- which the range
        # iterator currently produces -- are not.
        seen = set(results)
        allowed = set(expected)
        self.assertEqual(set(), seen - allowed)

    def run_threads(self, func, *args, numthreads=NUMTHREADS):
        """Start numthreads threads running func(*args).

        The started threads are returned without being joined; joining
        them is left to the caller.
        """
        pool = [threading.Thread(target=func, args=args)
                for _ in range(numthreads)]
        for thread in pool:
            thread.start()
        return pool

    def test_iteration(self):
        """Test iteration over a shared container"""
        container = self.make_testdata(NUMITEMS)
        gate = threading.Barrier(NUMTHREADS)
        counts = []

        def worker():
            seen = 0
            # Hold each thread here until all NUMTHREADS of them are ready.
            gate.wait()
            for _ in container:
                seen += 1
            counts.append(seen)

        for thread in self.run_threads(worker):
            thread.join()
        # Each thread has its own iterator, so results should be entirely
        # predictable.
        self.assertEqual(counts, [NUMITEMS] * NUMTHREADS)

    def test_shared_iterator(self):
        """Test iteration over a shared iterator"""
        container = self.make_testdata(NUMITEMS)
        shared_it = iter(container)
        gate = threading.Barrier(NUMTHREADS)
        collected = []

        def worker():
            mine = []
            gate.wait()
            # We want a tight loop, so put items in the shared list at the
            # end.
            for value in shared_it:
                mine.append(value)
            collected.extend(mine)

        for thread in self.run_threads(worker):
            thread.join()
        self.assert_iterator_results(collected, container)
class ContendedListIterationTest(ContendedTupleIterationTest):
    """Run the contended-iteration tests on a list, plus a mutation test."""

    def make_testdata(self, n):
        """Return the container under test: a list holding 0 .. n-1."""
        return list(range(n))

    def test_iteration_while_mutating(self):
        """Test iteration over a shared mutating container."""
        container = self.make_testdata(NUMITEMS)
        collected = []
        # Workers and mutators all wait on the same barrier.
        gate = threading.Barrier(NUMTHREADS + NUMMUTATORS)
        stop_mutating = threading.Event()

        def mutator():
            snapshot = container[:]
            # Make changes big enough to cause resizing of the list, with
            # items shifted around for good measure.
            filler = (snapshot * 3)[NUMITEMS//2:]
            gate.wait()
            while not stop_mutating.is_set():
                container.extend(filler)
                container[:0] = snapshot
                # Spelled as a method call: `container *= 2` would make
                # `container` a local variable of this function.
                container.__imul__(2)
                container.extend(container)
                container[:] = snapshot

        def worker():
            mine = []
            gate.wait()
            # We want a tight loop, so put items in the shared list at the
            # end.
            for value in container:
                mine.append(value)
            collected.extend(mine)

        mutators = ()
        try:
            workers = self.run_threads(worker)
            mutators = self.run_threads(mutator, numthreads=NUMMUTATORS)
            for thread in workers:
                thread.join()
        finally:
            # Mutators loop until told to stop, so always signal them before
            # joining, even when something above raised.
            stop_mutating.set()
            for thread in mutators:
                thread.join()
        self.assert_iterator_results(collected, list(container))
class ContendedRangeIterationTest(ContendedTupleIterationTest):
    """Run the contended-iteration tests on a range object."""

    def make_testdata(self, n):
        """Return the container under test: range(n)."""
        return range(n)

    def assert_iterator_results(self, results, expected):
        """Check the items in results that are not part of expected.

        results holds the items collected by the threads; expected is the
        range they iterated over.  Every item outside the range must be an
        int that lies a whole number of steps away from the range's start.
        """
        # Range iterators that are shared between threads will (right now)
        # sometimes produce items after the end of the range, sometimes
        # _far_ after the end of the range. That should be fixed, but for
        # now, let's just check they're integers that could have resulted
        # from stepping beyond the range bounds.
        extra_items = set(results) - set(expected)
        for item in extra_items:
            # Check the type explicitly: a float such as 5.0 would satisfy
            # the modulo test below, and a non-numeric item would raise
            # TypeError there instead of producing a test failure.
            self.assertIsInstance(item, int)
            self.assertEqual((item - expected.start) % expected.step, 0)
|