blob: a442fe056cef159475f801af982c2efdd4f959ad (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
import random
import unittest
from functools import lru_cache
from threading import Barrier, Thread
from test.support import threading_helper
@threading_helper.requires_working_threading()
class TestLRUCache(unittest.TestCase):
def _test_concurrent_operations(self, maxsize):
num_threads = 10
b = Barrier(num_threads)
@lru_cache(maxsize=maxsize)
def func(arg=0):
return object()
def thread_func():
b.wait()
for i in range(1000):
r = random.randint(0, 1000)
if i < 800:
func(i)
elif i < 900:
func.cache_info()
else:
func.cache_clear()
threads = []
for i in range(num_threads):
t = Thread(target=thread_func)
threads.append(t)
with threading_helper.start_threads(threads):
pass
def test_concurrent_operations_unbounded(self):
self._test_concurrent_operations(maxsize=None)
def test_concurrent_operations_bounded(self):
self._test_concurrent_operations(maxsize=128)
def _test_reentrant_cache_clear(self, maxsize):
num_threads = 10
b = Barrier(num_threads)
@lru_cache(maxsize=maxsize)
def func(arg=0):
func.cache_clear()
return object()
def thread_func():
b.wait()
for i in range(1000):
func(random.randint(0, 10000))
threads = []
for i in range(num_threads):
t = Thread(target=thread_func)
threads.append(t)
with threading_helper.start_threads(threads):
pass
def test_reentrant_cache_clear_unbounded(self):
self._test_reentrant_cache_clear(maxsize=None)
def test_reentrant_cache_clear_bounded(self):
self._test_reentrant_cache_clear(maxsize=128)
# Run the tests with unittest's command-line runner when this file is
# executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|