blob: 56bfc1721992c8539c9e428562d5d7c29fb41dd2 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
import threading
import unittest
from test import support
from test.support import import_helper
from test.support import threading_helper
# Raise SkipTest if subinterpreters not supported.
import_helper.import_module('_interpreters')
from test.support import interpreters
from .utils import TestBase
class StressTests(TestBase):
    """Stress tests that create and run a large number of interpreters."""

    # In these tests we generally want a lot of interpreters,
    # but not so many that any test takes too long.

    @support.requires_resource('cpu')
    def test_create_many_sequential(self):
        """Create 100 interpreters one after another from a single thread."""
        # Every interpreter object stays referenced from ``alive`` until the
        # test returns, so all 100 objects are held at the same time.
        alive = [interpreters.create() for _ in range(100)]

    @support.requires_resource('cpu')
    @threading_helper.requires_working_threading()
    def test_create_many_threaded(self):
        """Create 200 interpreters, each one from its own thread."""
        # Shared by all worker threads; each appends the interpreter it made.
        alive = []

        def task():
            alive.append(interpreters.create())

        threads = (threading.Thread(target=task) for _ in range(200))
        # NOTE(review): start_threads() presumably starts the threads on
        # entry and joins them on exit -- confirm in threading_helper.
        with threading_helper.start_threads(threads):
            pass

    @support.requires_resource('cpu')
    @threading_helper.requires_working_threading()
    def test_many_threads_running_interp_in_other_interp(self):
        """Have 200 threads run code in one shared interpreter.

        Each thread creates its own interpreter and uses it to execute a
        script that, in turn, runs a trivial statement in the single shared
        target interpreter.  An attempt that fails with "interpreter already
        running" is retried until it succeeds; any other failure propagates.
        """
        target = interpreters.create()

        # Executed inside each worker's interpreter; it runs '1' in the
        # shared target interpreter, identified by its id.
        script = f"""if True:
            import _interpreters
            _interpreters.run_string({target.id}, '1')
            """

        def run():
            # Named differently from ``target`` so that the interpreter doing
            # the exec() is not confused with the one the script runs in.
            worker = interpreters.create()
            while True:
                try:
                    worker.exec(script)
                except interpreters.ExecutionFailed as exc:
                    # Only "interpreter already running" raised as an
                    # InterpreterError is expected here.  Both conditions are
                    # checked explicitly (rather than with ``assert``) so the
                    # check is not stripped when running under ``python -O``.
                    if (exc.excinfo.msg != 'interpreter already running'
                            or exc.excinfo.type.__name__ != 'InterpreterError'):
                        raise  # re-raise
                else:
                    break

        threads = (threading.Thread(target=run) for _ in range(200))
        with threading_helper.start_threads(threads):
            pass
if __name__ == "__main__":
    # This module uses relative imports, so it has to be run as part of
    # its package rather than as a standalone script.
    unittest.main()
|