aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_interpreters/test_stress.py
blob: e25e67a0d4f4453ee01f8c1c4a0b1e081173be69 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import threading
import unittest

from test import support
from test.support import import_helper
from test.support import threading_helper
# Raise SkipTest if subinterpreters are not supported.
import_helper.import_module('_interpreters')
from concurrent import interpreters
from .utils import TestBase


class StressTests(TestBase):

    # In these tests we generally want a lot of interpreters,
    # but not so many that any test takes too long.

    @support.requires_resource('cpu')
    def test_create_many_sequential(self):
        # Create 100 interpreters one after another and keep all of them
        # referenced at the same time.  Building the list directly means
        # no other local still refers to an interpreter once "alive" is
        # deleted below.
        alive = [interpreters.create() for _ in range(100)]
        del alive
        support.gc_collect()

    # This test starts threads, so skip it where threading does not work
    # (same guard as the other threaded test in this class).
    @threading_helper.requires_working_threading()
    @support.bigmemtest(size=200, memuse=32*2**20, dry_run=False)
    def test_create_many_threaded(self, size):
        # Start "size" threads that each create one interpreter.  All
        # threads block on a shared event so that the creations happen
        # as close to simultaneously as possible.
        alive = []
        start = threading.Event()

        def task():
            # try to create all interpreters simultaneously
            if not start.wait(support.SHORT_TIMEOUT):
                raise TimeoutError
            alive.append(interpreters.create())

        threads = [threading.Thread(target=task) for _ in range(size)]
        with threading_helper.start_threads(threads):
            start.set()
        del alive
        support.gc_collect()

    @threading_helper.requires_working_threading()
    @support.bigmemtest(size=200, memuse=34*2**20, dry_run=False)
    def test_many_threads_running_interp_in_other_interp(self, size):
        # Each of "size" threads creates its own interpreter and uses it
        # to execute a script which, in turn, runs a trivial statement in
        # the single shared interpreter created here.
        start = threading.Event()
        interp = interpreters.create()

        script = f"""if True:
            import _interpreters
            _interpreters.run_string({interp.id}, '1')
            """

        def run():
            # A per-thread interpreter, distinct from the shared "interp"
            # whose id is embedded in the script.
            runner = interpreters.create()
            # try to run all interpreters simultaneously
            if not start.wait(support.SHORT_TIMEOUT):
                raise TimeoutError
            # Keep retrying for as long as the script fails with
            # "interpreter already running"; any other failure propagates.
            while True:
                try:
                    runner.exec(script)
                except interpreters.ExecutionFailed as exc:
                    if exc.excinfo.msg != 'interpreter already running':
                        raise  # re-raise
                    # Raise explicitly instead of using an assert statement
                    # so that the check is still performed under -O.
                    type_name = exc.excinfo.type.__name__
                    if type_name != 'InterpreterError':
                        raise AssertionError(
                            f'unexpected exception type: {type_name}')
                else:
                    break

        threads = [threading.Thread(target=run) for _ in range(size)]
        with threading_helper.start_threads(threads):
            start.set()
        support.gc_collect()


if __name__ == '__main__':
    # Test needs to be a package, so we can do relative imports.
    # This module itself imports TestBase with "from .utils import ...",
    # which only resolves when the module is loaded as part of its
    # package rather than as a standalone script.
    unittest.main()