diff options
Diffstat (limited to 'Lib/test/test_interpreters.py')
-rw-r--r-- | Lib/test/test_interpreters.py | 535 |
1 files changed, 0 insertions, 535 deletions
diff --git a/Lib/test/test_interpreters.py b/Lib/test/test_interpreters.py deleted file mode 100644 index 3451a4c8759..00000000000 --- a/Lib/test/test_interpreters.py +++ /dev/null @@ -1,535 +0,0 @@ -import contextlib -import os -import threading -from textwrap import dedent -import unittest -import time - -import _xxsubinterpreters as _interpreters -from test.support import interpreters - - -def _captured_script(script): - r, w = os.pipe() - indented = script.replace('\n', '\n ') - wrapped = dedent(f""" - import contextlib - with open({w}, 'w') as spipe: - with contextlib.redirect_stdout(spipe): - {indented} - """) - return wrapped, open(r) - - -def clean_up_interpreters(): - for interp in interpreters.list_all(): - if interp.id == 0: # main - continue - try: - interp.close() - except RuntimeError: - pass # already destroyed - - -def _run_output(interp, request, shared=None): - script, rpipe = _captured_script(request) - with rpipe: - interp.run(script) - return rpipe.read() - - -@contextlib.contextmanager -def _running(interp): - r, w = os.pipe() - def run(): - interp.run(dedent(f""" - # wait for "signal" - with open({r}) as rpipe: - rpipe.read() - """)) - - t = threading.Thread(target=run) - t.start() - - yield - - with open(w, 'w') as spipe: - spipe.write('done') - t.join() - - -class TestBase(unittest.TestCase): - - def tearDown(self): - clean_up_interpreters() - - -class CreateTests(TestBase): - - def test_in_main(self): - interp = interpreters.create() - lst = interpreters.list_all() - self.assertEqual(interp.id, lst[1].id) - - def test_in_thread(self): - lock = threading.Lock() - id = None - interp = interpreters.create() - lst = interpreters.list_all() - def f(): - nonlocal id - id = interp.id - lock.acquire() - lock.release() - - t = threading.Thread(target=f) - with lock: - t.start() - t.join() - self.assertEqual(interp.id, lst[1].id) - - def test_in_subinterpreter(self): - main, = interpreters.list_all() - interp = interpreters.create() - out = _run_output(interp, dedent(""" - from test.support import interpreters - interp = interpreters.create() - print(interp) - """)) - interp2 = out.strip() - - self.assertEqual(len(set(interpreters.list_all())), len({main, interp, interp2})) - - def test_after_destroy_all(self): - before = set(interpreters.list_all()) - # Create 3 subinterpreters. - interp_lst = [] - for _ in range(3): - interps = interpreters.create() - interp_lst.append(interps) - # Now destroy them. - for interp in interp_lst: - interp.close() - # Finally, create another. - interp = interpreters.create() - self.assertEqual(len(set(interpreters.list_all())), len(before | {interp})) - - def test_after_destroy_some(self): - before = set(interpreters.list_all()) - # Create 3 subinterpreters. - interp1 = interpreters.create() - interp2 = interpreters.create() - interp3 = interpreters.create() - # Now destroy 2 of them. - interp1.close() - interp2.close() - # Finally, create another. - interp = interpreters.create() - self.assertEqual(len(set(interpreters.list_all())), len(before | {interp3, interp})) - - -class GetCurrentTests(TestBase): - - def test_main(self): - main_interp_id = _interpreters.get_main() - cur_interp_id = interpreters.get_current().id - self.assertEqual(cur_interp_id, main_interp_id) - - def test_subinterpreter(self): - main = _interpreters.get_main() - interp = interpreters.create() - out = _run_output(interp, dedent(""" - from test.support import interpreters - cur = interpreters.get_current() - print(cur) - """)) - cur = out.strip() - self.assertNotEqual(cur, main) - - -class ListAllTests(TestBase): - - def test_initial(self): - interps = interpreters.list_all() - self.assertEqual(1, len(interps)) - - def test_after_creating(self): - main = interpreters.get_current() - first = interpreters.create() - second = interpreters.create() - - ids = [] - for interp in interpreters.list_all(): - ids.append(interp.id) - - self.assertEqual(ids, [main.id, first.id, second.id]) - - def test_after_destroying(self): - main = interpreters.get_current() - first = interpreters.create() - second = interpreters.create() - first.close() - - ids = [] - for interp in interpreters.list_all(): - ids.append(interp.id) - - self.assertEqual(ids, [main.id, second.id]) - - -class TestInterpreterId(TestBase): - - def test_in_main(self): - main = interpreters.get_current() - self.assertEqual(0, main.id) - - def test_with_custom_num(self): - interp = interpreters.Interpreter(1) - self.assertEqual(1, interp.id) - - def test_for_readonly_property(self): - interp = interpreters.Interpreter(1) - with self.assertRaises(AttributeError): - interp.id = 2 - - -class TestInterpreterIsRunning(TestBase): - - def test_main(self): - main = interpreters.get_current() - self.assertTrue(main.is_running()) - - def test_subinterpreter(self): - interp = interpreters.create() - self.assertFalse(interp.is_running()) - - with _running(interp): - self.assertTrue(interp.is_running()) - self.assertFalse(interp.is_running()) - - def test_from_subinterpreter(self): - interp = interpreters.create() - out = _run_output(interp, dedent(f""" - import _xxsubinterpreters as _interpreters - if _interpreters.is_running({interp.id}): - print(True) - else: - print(False) - """)) - self.assertEqual(out.strip(), 'True') - - def test_already_destroyed(self): - interp = interpreters.create() - interp.close() - with self.assertRaises(RuntimeError): - interp.is_running() - - -class TestInterpreterDestroy(TestBase): - - def test_basic(self): - interp1 = interpreters.create() - interp2 = interpreters.create() - interp3 = interpreters.create() - self.assertEqual(4, len(interpreters.list_all())) - interp2.close() - self.assertEqual(3, len(interpreters.list_all())) - - def test_all(self): - before = set(interpreters.list_all()) - interps = set() - for _ in range(3): - interp = interpreters.create() - interps.add(interp) - self.assertEqual(len(set(interpreters.list_all())), len(before | interps)) - for interp in interps: - interp.close() - self.assertEqual(len(set(interpreters.list_all())), len(before)) - - def test_main(self): - main, = interpreters.list_all() - with self.assertRaises(RuntimeError): - main.close() - - def f(): - with self.assertRaises(RuntimeError): - main.close() - - t = threading.Thread(target=f) - t.start() - t.join() - - def test_already_destroyed(self): - interp = interpreters.create() - interp.close() - with self.assertRaises(RuntimeError): - interp.close() - - def test_from_current(self): - main, = interpreters.list_all() - interp = interpreters.create() - script = dedent(f""" - from test.support import interpreters - try: - main = interpreters.get_current() - main.close() - except RuntimeError: - pass - """) - - interp.run(script) - self.assertEqual(len(set(interpreters.list_all())), len({main, interp})) - - def test_from_sibling(self): - main, = interpreters.list_all() - interp1 = interpreters.create() - script = dedent(f""" - from test.support import interpreters - interp2 = interpreters.create() - interp2.close() - """) - interp1.run(script) - - self.assertEqual(len(set(interpreters.list_all())), len({main, interp1})) - - def test_from_other_thread(self): - interp = interpreters.create() - def f(): - interp.close() - - t = threading.Thread(target=f) - t.start() - t.join() - - def test_still_running(self): - main, = interpreters.list_all() - interp = interpreters.create() - with _running(interp): - with self.assertRaises(RuntimeError): - interp.close() - self.assertTrue(interp.is_running()) - - -class TestInterpreterRun(TestBase): - - SCRIPT = dedent(""" - with open('{}', 'w') as out: - out.write('{}') - """) - FILENAME = 'spam' - - def setUp(self): - super().setUp() - self.interp = interpreters.create() - self._fs = None - - def tearDown(self): - if self._fs is not None: - self._fs.close() - super().tearDown() - - @property - def fs(self): - if self._fs is None: - self._fs = FSFixture(self) - return self._fs - - def test_success(self): - script, file = _captured_script('print("it worked!", end="")') - with file: - self.interp.run(script) - out = file.read() - - self.assertEqual(out, 'it worked!') - - def test_in_thread(self): - script, file = _captured_script('print("it worked!", end="")') - with file: - def f(): - self.interp.run(script) - - t = threading.Thread(target=f) - t.start() - t.join() - out = file.read() - - self.assertEqual(out, 'it worked!') - - @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") - def test_fork(self): - import tempfile - with tempfile.NamedTemporaryFile('w+') as file: - file.write('') - file.flush() - - expected = 'spam spam spam spam spam' - script = dedent(f""" - import os - try: - os.fork() - except RuntimeError: - with open('{file.name}', 'w') as out: - out.write('{expected}') - """) - self.interp.run(script) - - file.seek(0) - content = file.read() - self.assertEqual(content, expected) - - def test_already_running(self): - with _running(self.interp): - with self.assertRaises(RuntimeError): - self.interp.run('print("spam")') - - def test_bad_script(self): - with self.assertRaises(TypeError): - self.interp.run(10) - - def test_bytes_for_script(self): - with self.assertRaises(TypeError): - self.interp.run(b'print("spam")') - - -class TestIsShareable(TestBase): - - def test_default_shareables(self): - shareables = [ - # singletons - None, - # builtin objects - b'spam', - 'spam', - 10, - -10, - ] - for obj in shareables: - with self.subTest(obj): - self.assertTrue( - interpreters.is_shareable(obj)) - - def test_not_shareable(self): - class Cheese: - def __init__(self, name): - self.name = name - def __str__(self): - return self.name - - class SubBytes(bytes): - """A subclass of a shareable type.""" - - not_shareables = [ - # singletons - True, - False, - NotImplemented, - ..., - # builtin types and objects - type, - object, - object(), - Exception(), - 100.0, - # user-defined types and objects - Cheese, - Cheese('Wensleydale'), - SubBytes(b'spam'), - ] - for obj in not_shareables: - with self.subTest(repr(obj)): - self.assertFalse( - interpreters.is_shareable(obj)) - - -class TestChannel(TestBase): - - def test_create_cid(self): - r, s = interpreters.create_channel() - self.assertIsInstance(r, interpreters.RecvChannel) - self.assertIsInstance(s, interpreters.SendChannel) - - def test_sequential_ids(self): - before = interpreters.list_all_channels() - channels1 = interpreters.create_channel() - channels2 = interpreters.create_channel() - channels3 = interpreters.create_channel() - after = interpreters.list_all_channels() - - self.assertEqual(len(set(after) - set(before)), - len({channels1, channels2, channels3})) - - -class TestSendRecv(TestBase): - - def test_send_recv_main(self): - r, s = interpreters.create_channel() - orig = b'spam' - s.send(orig) - obj = r.recv() - - self.assertEqual(obj, orig) - self.assertIsNot(obj, orig) - - def test_send_recv_same_interpreter(self): - interp = interpreters.create() - out = _run_output(interp, dedent(""" - from test.support import interpreters - r, s = interpreters.create_channel() - orig = b'spam' - s.send(orig) - obj = r.recv() - assert obj is not orig - assert obj == orig - """)) - - def test_send_recv_different_threads(self): - r, s = interpreters.create_channel() - - def f(): - while True: - try: - obj = r.recv() - break - except interpreters.ChannelEmptyError: - time.sleep(0.1) - s.send(obj) - t = threading.Thread(target=f) - t.start() - - s.send(b'spam') - t.join() - obj = r.recv() - - self.assertEqual(obj, b'spam') - - def test_send_recv_nowait_main(self): - r, s = interpreters.create_channel() - orig = b'spam' - s.send(orig) - obj = r.recv_nowait() - - self.assertEqual(obj, orig) - self.assertIsNot(obj, orig) - - def test_send_recv_nowait_same_interpreter(self): - interp = interpreters.create() - out = _run_output(interp, dedent(""" - from test.support import interpreters - r, s = interpreters.create_channel() - orig = b'spam' - s.send(orig) - obj = r.recv_nowait() - assert obj is not orig - assert obj == orig - """)) - - r, s = interpreters.create_channel() - - def f(): - while True: - try: - obj = r.recv_nowait() - break - except _interpreters.ChannelEmptyError: - time.sleep(0.1) - s.send(obj) |