aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_interpreters/test_api.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_interpreters/test_api.py')
-rw-r--r--Lib/test/test_interpreters/test_api.py884
1 files changed, 789 insertions, 95 deletions
diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py
index 66c7afce88f..0ee4582b5d1 100644
--- a/Lib/test/test_interpreters/test_api.py
+++ b/Lib/test/test_interpreters/test_api.py
@@ -1,18 +1,23 @@
+import contextlib
import os
import pickle
+import sys
from textwrap import dedent
import threading
import types
import unittest
from test import support
+from test.support import os_helper
+from test.support import script_helper
from test.support import import_helper
# Raise SkipTest if subinterpreters not supported.
_interpreters = import_helper.import_module('_interpreters')
+from concurrent import interpreters
from test.support import Py_GIL_DISABLED
-from test.support import interpreters
from test.support import force_not_colorized
-from test.support.interpreters import (
+import test._crossinterp_definitions as defs
+from concurrent.interpreters import (
InterpreterError, InterpreterNotFoundError, ExecutionFailed,
)
from .utils import (
@@ -29,6 +34,59 @@ WHENCE_STR_XI = 'cross-interpreter C-API'
WHENCE_STR_STDLIB = '_interpreters module'
+def is_pickleable(obj):
+ try:
+ pickle.dumps(obj)
+ except Exception:
+ return False
+ return True
+
+
+@contextlib.contextmanager
+def defined_in___main__(name, script, *, remove=False):
+ import __main__ as mainmod
+ mainns = vars(mainmod)
+ assert name not in mainns
+ exec(script, mainns, mainns)
+ if remove:
+ yield mainns.pop(name)
+ else:
+ try:
+ yield mainns[name]
+ finally:
+ mainns.pop(name, None)
+
+
+def build_excinfo(exctype, msg=None, formatted=None, errdisplay=None):
+ if isinstance(exctype, type):
+ assert issubclass(exctype, BaseException), exctype
+ exctype = types.SimpleNamespace(
+ __name__=exctype.__name__,
+ __qualname__=exctype.__qualname__,
+ __module__=exctype.__module__,
+ )
+ elif isinstance(exctype, str):
+ module, _, name = exctype.rpartition(exctype)
+ if not module and name in __builtins__:
+ module = 'builtins'
+ exctype = types.SimpleNamespace(
+ __name__=name,
+ __qualname__=exctype,
+ __module__=module or None,
+ )
+ else:
+ assert isinstance(exctype, types.SimpleNamespace)
+ assert msg is None or isinstance(msg, str), msg
+ assert formatted is None or isinstance(formatted, str), formatted
+ assert errdisplay is None or isinstance(errdisplay, str), errdisplay
+ return types.SimpleNamespace(
+ type=exctype,
+ msg=msg,
+ formatted=formatted,
+ errdisplay=errdisplay,
+ )
+
+
class ModuleTests(TestBase):
def test_queue_aliases(self):
@@ -75,7 +133,7 @@ class CreateTests(TestBase):
main, = interpreters.list_all()
interp = interpreters.create()
out = _run_output(interp, dedent("""
- from test.support import interpreters
+ from concurrent import interpreters
interp = interpreters.create()
print(interp.id)
"""))
@@ -138,7 +196,7 @@ class GetCurrentTests(TestBase):
main = interpreters.get_main()
interp = interpreters.create()
out = _run_output(interp, dedent("""
- from test.support import interpreters
+ from concurrent import interpreters
cur = interpreters.get_current()
print(cur.id)
"""))
@@ -155,7 +213,7 @@ class GetCurrentTests(TestBase):
with self.subTest('subinterpreter'):
interp = interpreters.create()
out = _run_output(interp, dedent("""
- from test.support import interpreters
+ from concurrent import interpreters
cur = interpreters.get_current()
print(id(cur))
cur = interpreters.get_current()
@@ -167,7 +225,7 @@ class GetCurrentTests(TestBase):
with self.subTest('per-interpreter'):
interp = interpreters.create()
out = _run_output(interp, dedent("""
- from test.support import interpreters
+ from concurrent import interpreters
cur = interpreters.get_current()
print(id(cur))
"""))
@@ -524,7 +582,7 @@ class TestInterpreterClose(TestBase):
main, = interpreters.list_all()
interp = interpreters.create()
out = _run_output(interp, dedent(f"""
- from test.support import interpreters
+ from concurrent import interpreters
interp = interpreters.Interpreter({interp.id})
try:
interp.close()
@@ -541,7 +599,7 @@ class TestInterpreterClose(TestBase):
self.assertEqual(set(interpreters.list_all()),
{main, interp1, interp2})
interp1.exec(dedent(f"""
- from test.support import interpreters
+ from concurrent import interpreters
interp2 = interpreters.Interpreter({interp2.id})
interp2.close()
interp3 = interpreters.create()
@@ -748,7 +806,7 @@ class TestInterpreterExec(TestBase):
ham()
""")
scriptfile = self.make_script('script.py', tempdir, text="""
- from test.support import interpreters
+ from concurrent import interpreters
def script():
import spam
@@ -769,7 +827,7 @@ class TestInterpreterExec(TestBase):
~~~~~~~~~~~^^^^^^^^
{interpmod_line.strip()}
raise ExecutionFailed(excinfo)
- test.support.interpreters.ExecutionFailed: RuntimeError: uh-oh!
+ concurrent.interpreters.ExecutionFailed: RuntimeError: uh-oh!
Uncaught in the interpreter:
@@ -839,9 +897,16 @@ class TestInterpreterExec(TestBase):
interp.exec(10)
def test_bytes_for_script(self):
+ r, w = self.pipe()
+ RAN = b'R'
+ DONE = b'D'
interp = interpreters.create()
- with self.assertRaises(TypeError):
- interp.exec(b'print("spam")')
+ interp.exec(f"""if True:
+ import os
+ os.write({w}, {RAN!r})
+ """)
+ os.write(w, DONE)
+ self.assertEqual(os.read(r, 1), RAN)
def test_with_background_threads_still_running(self):
r_interp, w_interp = self.pipe()
@@ -879,28 +944,46 @@ class TestInterpreterExec(TestBase):
with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
interp.exec('raise Exception("it worked!")')
+ def test_list_comprehension(self):
+ # gh-135450: List comprehensions caused an assertion failure
+ # in _PyCode_CheckNoExternalState()
+ import string
+ r_interp, w_interp = self.pipe()
+
+ interp = interpreters.create()
+ interp.exec(f"""if True:
+ import os
+ comp = [str(i) for i in range(10)]
+ os.write({w_interp}, ''.join(comp).encode())
+ """)
+ self.assertEqual(os.read(r_interp, 10).decode(), string.digits)
+ interp.close()
+
+
# test__interpreters covers the remaining
# Interpreter.exec() behavior.
-def call_func_noop():
- pass
+call_func_noop = defs.spam_minimal
+call_func_ident = defs.spam_returns_arg
+call_func_failure = defs.spam_raises
def call_func_return_shareable():
return (1, None)
-def call_func_return_not_shareable():
- return [1, 2, 3]
+def call_func_return_stateless_func():
+ return (lambda x: x)
-def call_func_failure():
- raise Exception('spam!')
+def call_func_return_pickleable():
+ return [1, 2, 3]
-def call_func_ident(value):
- return value
+def call_func_return_unpickleable():
+ x = 42
+ return (lambda: x)
def get_call_func_closure(value):
@@ -909,6 +992,11 @@ def get_call_func_closure(value):
return call_func_closure
+def call_func_exec_wrapper(script, ns):
+ res = exec(script, ns, ns)
+ return res, ns, id(ns)
+
+
class Spam:
@staticmethod
@@ -1005,86 +1093,663 @@ class TestInterpreterCall(TestBase):
# - preserves info (e.g. SyntaxError)
# - matching error display
- def test_call(self):
+ @contextlib.contextmanager
+ def assert_fails(self, expected):
+ with self.assertRaises(ExecutionFailed) as cm:
+ yield cm
+ uncaught = cm.exception.excinfo
+ self.assertEqual(uncaught.type.__name__, expected.__name__)
+
+ def assert_fails_not_shareable(self):
+ return self.assert_fails(interpreters.NotShareableError)
+
+ def assert_code_equal(self, code1, code2):
+ if code1 == code2:
+ return
+ self.assertEqual(code1.co_name, code2.co_name)
+ self.assertEqual(code1.co_flags, code2.co_flags)
+ self.assertEqual(code1.co_consts, code2.co_consts)
+ self.assertEqual(code1.co_varnames, code2.co_varnames)
+ self.assertEqual(code1.co_cellvars, code2.co_cellvars)
+ self.assertEqual(code1.co_freevars, code2.co_freevars)
+ self.assertEqual(code1.co_names, code2.co_names)
+ self.assertEqual(
+ _testinternalcapi.get_code_var_counts(code1),
+ _testinternalcapi.get_code_var_counts(code2),
+ )
+ self.assertEqual(code1.co_code, code2.co_code)
+
+ def assert_funcs_equal(self, func1, func2):
+ if func1 == func2:
+ return
+ self.assertIs(type(func1), type(func2))
+ self.assertEqual(func1.__name__, func2.__name__)
+ self.assertEqual(func1.__defaults__, func2.__defaults__)
+ self.assertEqual(func1.__kwdefaults__, func2.__kwdefaults__)
+ self.assertEqual(func1.__closure__, func2.__closure__)
+ self.assert_code_equal(func1.__code__, func2.__code__)
+ self.assertEqual(
+ _testinternalcapi.get_code_var_counts(func1),
+ _testinternalcapi.get_code_var_counts(func2),
+ )
+
+ def assert_exceptions_equal(self, exc1, exc2):
+ assert isinstance(exc1, Exception)
+ assert isinstance(exc2, Exception)
+ if exc1 == exc2:
+ return
+ self.assertIs(type(exc1), type(exc2))
+ self.assertEqual(exc1.args, exc2.args)
+
+ def test_stateless_funcs(self):
interp = interpreters.create()
- for i, (callable, args, kwargs) in enumerate([
- (call_func_noop, (), {}),
- (call_func_return_shareable, (), {}),
- (call_func_return_not_shareable, (), {}),
- (Spam.noop, (), {}),
+ func = call_func_noop
+ with self.subTest('no args, no return'):
+ res = interp.call(func)
+ self.assertIsNone(res)
+
+ func = call_func_return_shareable
+ with self.subTest('no args, returns shareable'):
+ res = interp.call(func)
+ self.assertEqual(res, (1, None))
+
+ func = call_func_return_stateless_func
+ expected = (lambda x: x)
+ with self.subTest('no args, returns stateless func'):
+ res = interp.call(func)
+ self.assert_funcs_equal(res, expected)
+
+ func = call_func_return_pickleable
+ with self.subTest('no args, returns pickleable'):
+ res = interp.call(func)
+ self.assertEqual(res, [1, 2, 3])
+
+ func = call_func_return_unpickleable
+ with self.subTest('no args, returns unpickleable'):
+ with self.assertRaises(interpreters.NotShareableError):
+ interp.call(func)
+
+ def test_stateless_func_returns_arg(self):
+ interp = interpreters.create()
+
+ for arg in [
+ None,
+ 10,
+ 'spam!',
+ b'spam!',
+ (1, 2, 'spam!'),
+ memoryview(b'spam!'),
+ ]:
+ with self.subTest(f'shareable {arg!r}'):
+ assert _interpreters.is_shareable(arg)
+ res = interp.call(defs.spam_returns_arg, arg)
+ self.assertEqual(res, arg)
+
+ for arg in defs.STATELESS_FUNCTIONS:
+ with self.subTest(f'stateless func {arg!r}'):
+ res = interp.call(defs.spam_returns_arg, arg)
+ self.assert_funcs_equal(res, arg)
+
+ for arg in defs.TOP_FUNCTIONS:
+ if arg in defs.STATELESS_FUNCTIONS:
+ continue
+ with self.subTest(f'stateful func {arg!r}'):
+ res = interp.call(defs.spam_returns_arg, arg)
+ self.assert_funcs_equal(res, arg)
+ assert is_pickleable(arg)
+
+ for arg in [
+ Ellipsis,
+ NotImplemented,
+ object(),
+ 2**1000,
+ [1, 2, 3],
+ {'a': 1, 'b': 2},
+ types.SimpleNamespace(x=42),
+ # builtin types
+ object,
+ type,
+ Exception,
+ ModuleNotFoundError,
+ # builtin exceptions
+ Exception('uh-oh!'),
+ ModuleNotFoundError('mymodule'),
+ # builtin fnctions
+ len,
+ sys.exit,
+ # user classes
+ *defs.TOP_CLASSES,
+ *(c(*a) for c, a in defs.TOP_CLASSES.items()
+ if c not in defs.CLASSES_WITHOUT_EQUALITY),
+ ]:
+ with self.subTest(f'pickleable {arg!r}'):
+ res = interp.call(defs.spam_returns_arg, arg)
+ if type(arg) is object:
+ self.assertIs(type(res), object)
+ elif isinstance(arg, BaseException):
+ self.assert_exceptions_equal(res, arg)
+ else:
+ self.assertEqual(res, arg)
+ assert is_pickleable(arg)
+
+ for arg in [
+ types.MappingProxyType({}),
+ *(f for f in defs.NESTED_FUNCTIONS
+ if f not in defs.STATELESS_FUNCTIONS),
+ ]:
+ with self.subTest(f'unpickleable {arg!r}'):
+ assert not _interpreters.is_shareable(arg)
+ assert not is_pickleable(arg)
+ with self.assertRaises(interpreters.NotShareableError):
+ interp.call(defs.spam_returns_arg, arg)
+
+ def test_full_args(self):
+ interp = interpreters.create()
+ expected = (1, 2, 3, 4, 5, 6, ('?',), {'g': 7, 'h': 8})
+ func = defs.spam_full_args
+ res = interp.call(func, 1, 2, 3, 4, '?', e=5, f=6, g=7, h=8)
+ self.assertEqual(res, expected)
+
+ def test_full_defaults(self):
+ # pickleable, but not stateless
+ interp = interpreters.create()
+ expected = (-1, -2, -3, -4, -5, -6, (), {'g': 8, 'h': 9})
+ res = interp.call(defs.spam_full_args_with_defaults, g=8, h=9)
+ self.assertEqual(res, expected)
+
+ def test_modified_arg(self):
+ interp = interpreters.create()
+ script = dedent("""
+ a = 7
+ b = 2
+ c = a ** b
+ """)
+ ns = {}
+ expected = {'a': 7, 'b': 2, 'c': 49}
+ res = interp.call(call_func_exec_wrapper, script, ns)
+ obj, resns, resid = res
+ del resns['__builtins__']
+ self.assertIsNone(obj)
+ self.assertEqual(ns, {})
+ self.assertEqual(resns, expected)
+ self.assertNotEqual(resid, id(ns))
+ self.assertNotEqual(resid, id(resns))
+
+ def test_func_in___main___valid(self):
+ # pickleable, already there'
+
+ with os_helper.temp_dir() as tempdir:
+ def new_mod(name, text):
+ script_helper.make_script(tempdir, name, dedent(text))
+
+ def run(text):
+ name = 'myscript'
+ text = dedent(f"""
+ import sys
+ sys.path.insert(0, {tempdir!r})
+
+ """) + dedent(text)
+ filename = script_helper.make_script(tempdir, name, text)
+ res = script_helper.assert_python_ok(filename)
+ return res.out.decode('utf-8').strip()
+
+ # no module indirection
+ with self.subTest('no indirection'):
+ text = run(f"""
+ from concurrent import interpreters
+
+ def spam():
+ # This a global var...
+ return __name__
+
+ if __name__ == '__main__':
+ interp = interpreters.create()
+ res = interp.call(spam)
+ print(res)
+ """)
+ self.assertEqual(text, '<fake __main__>')
+
+ # indirect as func, direct interp
+ new_mod('mymod', f"""
+ def run(interp, func):
+ return interp.call(func)
+ """)
+ with self.subTest('indirect as func, direct interp'):
+ text = run(f"""
+ from concurrent import interpreters
+ import mymod
+
+ def spam():
+ # This a global var...
+ return __name__
+
+ if __name__ == '__main__':
+ interp = interpreters.create()
+ res = mymod.run(interp, spam)
+ print(res)
+ """)
+ self.assertEqual(text, '<fake __main__>')
+
+ # indirect as func, indirect interp
+ new_mod('mymod', f"""
+ from concurrent import interpreters
+ def run(func):
+ interp = interpreters.create()
+ return interp.call(func)
+ """)
+ with self.subTest('indirect as func, indirect interp'):
+ text = run(f"""
+ import mymod
+
+ def spam():
+ # This a global var...
+ return __name__
+
+ if __name__ == '__main__':
+ res = mymod.run(spam)
+ print(res)
+ """)
+ self.assertEqual(text, '<fake __main__>')
+
+ def test_func_in___main___invalid(self):
+ interp = interpreters.create()
+
+ funcname = f'{__name__.replace(".", "_")}_spam_okay'
+ script = dedent(f"""
+ def {funcname}():
+ # This a global var...
+ return __name__
+ """)
+
+ with self.subTest('pickleable, added dynamically'):
+ with defined_in___main__(funcname, script) as arg:
+ with self.assertRaises(interpreters.NotShareableError):
+ interp.call(defs.spam_returns_arg, arg)
+
+ with self.subTest('lying about __main__'):
+ with defined_in___main__(funcname, script, remove=True) as arg:
+ with self.assertRaises(interpreters.NotShareableError):
+ interp.call(defs.spam_returns_arg, arg)
+
+ def test_func_in___main___hidden(self):
+ # When a top-level function that uses global variables is called
+ # through Interpreter.call(), it will be pickled, sent over,
+ # and unpickled. That requires that it be found in the other
+ # interpreter's __main__ module. However, the original script
+ # that defined the function is only run in the main interpreter,
+ # so pickle.loads() would normally fail.
+ #
+ # We work around this by running the script in the other
+ # interpreter. However, this is a one-off solution for the sake
+ # of unpickling, so we avoid modifying that interpreter's
+ # __main__ module by running the script in a hidden module.
+ #
+ # In this test we verify that the function runs with the hidden
+ # module as its __globals__ when called in the other interpreter,
+ # and that the interpreter's __main__ module is unaffected.
+ text = dedent("""
+ eggs = True
+
+ def spam(*, explicit=False):
+ if explicit:
+ import __main__
+ ns = __main__.__dict__
+ else:
+ # For now we have to have a LOAD_GLOBAL in the
+ # function in order for globals() to actually return
+ # spam.__globals__. Maybe it doesn't go through pickle?
+ # XXX We will fix this later.
+ spam
+ ns = globals()
+
+ func = ns.get('spam')
+ return [
+ id(ns),
+ ns.get('__name__'),
+ ns.get('__file__'),
+ id(func),
+ None if func is None else repr(func),
+ ns.get('eggs'),
+ ns.get('ham'),
+ ]
+
+ if __name__ == "__main__":
+ from concurrent import interpreters
+ interp = interpreters.create()
+
+ ham = True
+ print([
+ [
+ spam(explicit=True),
+ spam(),
+ ],
+ [
+ interp.call(spam, explicit=True),
+ interp.call(spam),
+ ],
+ ])
+ """)
+ with os_helper.temp_dir() as tempdir:
+ filename = script_helper.make_script(tempdir, 'my-script', text)
+ res = script_helper.assert_python_ok(filename)
+ stdout = res.out.decode('utf-8').strip()
+ local, remote = eval(stdout)
+
+ # In the main interpreter.
+ main, unpickled = local
+ nsid, _, _, funcid, func, _, _ = main
+ self.assertEqual(main, [
+ nsid,
+ '__main__',
+ filename,
+ funcid,
+ func,
+ True,
+ True,
+ ])
+ self.assertIsNot(func, None)
+ self.assertRegex(func, '^<function spam at 0x.*>$')
+ self.assertEqual(unpickled, main)
+
+ # In the subinterpreter.
+ main, unpickled = remote
+ nsid1, _, _, funcid1, _, _, _ = main
+ self.assertEqual(main, [
+ nsid1,
+ '__main__',
+ None,
+ funcid1,
+ None,
+ None,
+ None,
+ ])
+ nsid2, _, _, funcid2, func, _, _ = unpickled
+ self.assertEqual(unpickled, [
+ nsid2,
+ '<fake __main__>',
+ filename,
+ funcid2,
+ func,
+ True,
+ None,
+ ])
+ self.assertIsNot(func, None)
+ self.assertRegex(func, '^<function spam at 0x.*>$')
+ self.assertNotEqual(nsid2, nsid1)
+ self.assertNotEqual(funcid2, funcid1)
+
+ def test_func_in___main___uses_globals(self):
+ # See the note in test_func_in___main___hidden about pickle
+ # and the __main__ module.
+ #
+ # Additionally, the solution to that problem must provide
+ # for global variables on which a pickled function might rely.
+ #
+ # To check that, we run a script that has two global functions
+ # and a global variable in the __main__ module. One of the
+ # functions sets the global variable and the other returns
+ # the value.
+ #
+ # The script calls those functions multiple times in another
+ # interpreter, to verify the following:
+ #
+ # * the global variable is properly initialized
+ # * the global variable retains state between calls
+ # * the setter modifies that persistent variable
+ # * the getter uses the variable
+ # * the calls in the other interpreter do not modify
+ # the main interpreter
+ # * those calls don't modify the interpreter's __main__ module
+ # * the functions and variable do not actually show up in the
+ # other interpreter's __main__ module
+ text = dedent("""
+ count = 0
+
+ def inc(x=1):
+ global count
+ count += x
+
+ def get_count():
+ return count
+
+ if __name__ == "__main__":
+ counts = []
+ results = [count, counts]
+
+ from concurrent import interpreters
+ interp = interpreters.create()
+
+ val = interp.call(get_count)
+ counts.append(val)
+
+ interp.call(inc)
+ val = interp.call(get_count)
+ counts.append(val)
+
+ interp.call(inc, 3)
+ val = interp.call(get_count)
+ counts.append(val)
+
+ results.append(count)
+
+ modified = {name: interp.call(eval, f'{name!r} in vars()')
+ for name in ('count', 'inc', 'get_count')}
+ results.append(modified)
+
+ print(results)
+ """)
+ with os_helper.temp_dir() as tempdir:
+ filename = script_helper.make_script(tempdir, 'my-script', text)
+ res = script_helper.assert_python_ok(filename)
+ stdout = res.out.decode('utf-8').strip()
+ before, counts, after, modified = eval(stdout)
+ self.assertEqual(modified, {
+ 'count': False,
+ 'inc': False,
+ 'get_count': False,
+ })
+ self.assertEqual(before, 0)
+ self.assertEqual(after, 0)
+ self.assertEqual(counts, [0, 1, 4])
+
+ def test_raises(self):
+ interp = interpreters.create()
+ with self.assertRaises(ExecutionFailed):
+ interp.call(call_func_failure)
+
+ with self.assert_fails(ValueError):
+ interp.call(call_func_complex, '???', exc=ValueError('spam'))
+
+ def test_call_valid(self):
+ interp = interpreters.create()
+
+ for i, (callable, args, kwargs, expected) in enumerate([
+ (call_func_noop, (), {}, None),
+ (call_func_ident, ('spamspamspam',), {}, 'spamspamspam'),
+ (call_func_return_shareable, (), {}, (1, None)),
+ (call_func_return_pickleable, (), {}, [1, 2, 3]),
+ (Spam.noop, (), {}, None),
+ (Spam.from_values, (), {}, Spam(())),
+ (Spam.from_values, (1, 2, 3), {}, Spam((1, 2, 3))),
+ (Spam, ('???',), {}, Spam('???')),
+ (Spam(101), (), {}, (101, (), {})),
+ (Spam(10101).run, (), {}, (10101, (), {})),
+ (call_func_complex, ('ident', 'spam'), {}, 'spam'),
+ (call_func_complex, ('full-ident', 'spam'), {}, ('spam', (), {})),
+ (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'},
+ ('spam', ('ham',), {'eggs': '!!!'})),
+ (call_func_complex, ('globals',), {}, __name__),
+ (call_func_complex, ('interpid',), {}, interp.id),
+ (call_func_complex, ('custom', 'spam!'), {}, Spam('spam!')),
]):
with self.subTest(f'success case #{i+1}'):
- res = interp.call(callable)
- self.assertIs(res, None)
+ res = interp.call(callable, *args, **kwargs)
+ self.assertEqual(res, expected)
+
+ def test_call_invalid(self):
+ interp = interpreters.create()
+
+ func = get_call_func_closure
+ with self.subTest(func):
+ with self.assertRaises(interpreters.NotShareableError):
+ interp.call(func, 42)
+
+ func = get_call_func_closure(42)
+ with self.subTest(func):
+ with self.assertRaises(interpreters.NotShareableError):
+ interp.call(func)
+
+ func = call_func_complex
+ op = 'closure'
+ with self.subTest(f'{func} ({op})'):
+ with self.assertRaises(interpreters.NotShareableError):
+ interp.call(func, op, value='~~~')
+
+ op = 'custom-inner'
+ with self.subTest(f'{func} ({op})'):
+ with self.assertRaises(interpreters.NotShareableError):
+ interp.call(func, op, 'eggs!')
+
+ def test_callable_requires_frame(self):
+ # There are various functions that require a current frame.
+ interp = interpreters.create()
+ for call, expected in [
+ ((eval, '[1, 2, 3]'),
+ [1, 2, 3]),
+ ((eval, 'sum([1, 2, 3])'),
+ 6),
+ ((exec, '...'),
+ None),
+ ]:
+ with self.subTest(str(call)):
+ res = interp.call(*call)
+ self.assertEqual(res, expected)
+
+ result_not_pickleable = [
+ globals,
+ locals,
+ vars,
+ ]
+ for func, expectedtype in {
+ globals: dict,
+ locals: dict,
+ vars: dict,
+ dir: list,
+ }.items():
+ with self.subTest(str(func)):
+ if func in result_not_pickleable:
+ with self.assertRaises(interpreters.NotShareableError):
+ interp.call(func)
+ else:
+ res = interp.call(func)
+ self.assertIsInstance(res, expectedtype)
+ self.assertIn('__builtins__', res)
+
+ def test_globals_from_builtins(self):
+ # The builtins exec(), eval(), globals(), locals(), vars(),
+ # and dir() each runs relative to the target interpreter's
+ # __main__ module, when called directly. However,
+ # globals(), locals(), and vars() don't work when called
+ # directly so we don't check them.
+ from _frozen_importlib import BuiltinImporter
+ interp = interpreters.create()
+
+ names = interp.call(dir)
+ self.assertEqual(names, [
+ '__builtins__',
+ '__doc__',
+ '__loader__',
+ '__name__',
+ '__package__',
+ '__spec__',
+ ])
+
+ values = {name: interp.call(eval, name)
+ for name in names if name != '__builtins__'}
+ self.assertEqual(values, {
+ '__name__': '__main__',
+ '__doc__': None,
+ '__spec__': None, # It wasn't imported, so no module spec?
+ '__package__': None,
+ '__loader__': BuiltinImporter,
+ })
+ with self.assertRaises(ExecutionFailed):
+ interp.call(eval, 'spam'),
+
+ interp.call(exec, f'assert dir() == {names}')
+
+ # Update the interpreter's __main__.
+ interp.prepare_main(spam=42)
+ expected = names + ['spam']
+
+ names = interp.call(dir)
+ self.assertEqual(names, expected)
+
+ value = interp.call(eval, 'spam')
+ self.assertEqual(value, 42)
+
+ interp.call(exec, f'assert dir() == {expected}, dir()')
+
+ def test_globals_from_stateless_func(self):
+ # A stateless func, which doesn't depend on any globals,
+ # doesn't go through pickle, so it runs in __main__.
+ def set_global(name, value):
+ globals()[name] = value
+
+ def get_global(name):
+ return globals().get(name)
+
+ interp = interpreters.create()
+
+ modname = interp.call(get_global, '__name__')
+ self.assertEqual(modname, '__main__')
+
+ res = interp.call(get_global, 'spam')
+ self.assertIsNone(res)
+
+ interp.exec('spam = True')
+ res = interp.call(get_global, 'spam')
+ self.assertTrue(res)
+
+ interp.call(set_global, 'spam', 42)
+ res = interp.call(get_global, 'spam')
+ self.assertEqual(res, 42)
+
+ interp.exec('assert spam == 42, repr(spam)')
+
+ def test_call_in_thread(self):
+ interp = interpreters.create()
for i, (callable, args, kwargs) in enumerate([
- (call_func_ident, ('spamspamspam',), {}),
- (get_call_func_closure, (42,), {}),
- (get_call_func_closure(42), (), {}),
+ (call_func_noop, (), {}),
+ (call_func_return_shareable, (), {}),
+ (call_func_return_pickleable, (), {}),
(Spam.from_values, (), {}),
(Spam.from_values, (1, 2, 3), {}),
- (Spam, ('???'), {}),
(Spam(101), (), {}),
(Spam(10101).run, (), {}),
+ (Spam.noop, (), {}),
(call_func_complex, ('ident', 'spam'), {}),
(call_func_complex, ('full-ident', 'spam'), {}),
(call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}),
(call_func_complex, ('globals',), {}),
(call_func_complex, ('interpid',), {}),
- (call_func_complex, ('closure',), {'value': '~~~'}),
(call_func_complex, ('custom', 'spam!'), {}),
- (call_func_complex, ('custom-inner', 'eggs!'), {}),
- (call_func_complex, ('???',), {'exc': ValueError('spam')}),
- ]):
- with self.subTest(f'invalid case #{i+1}'):
- with self.assertRaises(Exception):
- if args or kwargs:
- raise Exception((args, kwargs))
- interp.call(callable)
-
- with self.assertRaises(ExecutionFailed):
- interp.call(call_func_failure)
-
- def test_call_in_thread(self):
- interp = interpreters.create()
-
- for i, (callable, args, kwargs) in enumerate([
- (call_func_noop, (), {}),
- (call_func_return_shareable, (), {}),
- (call_func_return_not_shareable, (), {}),
- (Spam.noop, (), {}),
]):
with self.subTest(f'success case #{i+1}'):
with self.captured_thread_exception() as ctx:
- t = interp.call_in_thread(callable)
+ t = interp.call_in_thread(callable, *args, **kwargs)
t.join()
self.assertIsNone(ctx.caught)
for i, (callable, args, kwargs) in enumerate([
- (call_func_ident, ('spamspamspam',), {}),
(get_call_func_closure, (42,), {}),
(get_call_func_closure(42), (), {}),
- (Spam.from_values, (), {}),
- (Spam.from_values, (1, 2, 3), {}),
- (Spam, ('???'), {}),
- (Spam(101), (), {}),
- (Spam(10101).run, (), {}),
- (call_func_complex, ('ident', 'spam'), {}),
- (call_func_complex, ('full-ident', 'spam'), {}),
- (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}),
- (call_func_complex, ('globals',), {}),
- (call_func_complex, ('interpid',), {}),
- (call_func_complex, ('closure',), {'value': '~~~'}),
- (call_func_complex, ('custom', 'spam!'), {}),
- (call_func_complex, ('custom-inner', 'eggs!'), {}),
- (call_func_complex, ('???',), {'exc': ValueError('spam')}),
]):
with self.subTest(f'invalid case #{i+1}'):
- if args or kwargs:
- continue
with self.captured_thread_exception() as ctx:
- t = interp.call_in_thread(callable)
+ t = interp.call_in_thread(callable, *args, **kwargs)
t.join()
self.assertIsNotNone(ctx.caught)
@@ -1452,6 +2117,14 @@ class LowLevelTests(TestBase):
self.assertFalse(
self.interp_exists(interpid))
+ with self.subTest('basic C-API'):
+ interpid = _testinternalcapi.create_interpreter()
+ self.assertTrue(
+ self.interp_exists(interpid))
+ _testinternalcapi.destroy_interpreter(interpid, basic=True)
+ self.assertFalse(
+ self.interp_exists(interpid))
+
def test_get_config(self):
# This test overlaps with
# test.test_capi.test_misc.InterpreterConfigTests.
@@ -1585,18 +2258,14 @@ class LowLevelTests(TestBase):
with results:
exc = _interpreters.exec(interpid, script)
out = results.stdout()
- self.assertEqual(out, '')
- self.assert_ns_equal(exc, types.SimpleNamespace(
- type=types.SimpleNamespace(
- __name__='Exception',
- __qualname__='Exception',
- __module__='builtins',
- ),
- msg='uh-oh!',
+ expected = build_excinfo(
+ Exception, 'uh-oh!',
# We check these in other tests.
formatted=exc.formatted,
errdisplay=exc.errdisplay,
- ))
+ )
+ self.assertEqual(out, '')
+ self.assert_ns_equal(exc, expected)
with self.subTest('from C-API'):
with self.interpreter_from_capi() as interpid:
@@ -1608,25 +2277,50 @@ class LowLevelTests(TestBase):
self.assertEqual(exc.msg, 'it worked!')
def test_call(self):
- with self.subTest('no args'):
- interpid = _interpreters.create()
- exc = _interpreters.call(interpid, call_func_return_shareable)
- self.assertIs(exc, None)
+ interpid = _interpreters.create()
+
+ # Here we focus on basic args and return values.
+ # See TestInterpreterCall for full operational coverage,
+ # including supported callables.
+
+ with self.subTest('no args, return None'):
+ func = defs.spam_minimal
+ res, exc = _interpreters.call(interpid, func)
+ self.assertIsNone(exc)
+ self.assertIsNone(res)
+
+ with self.subTest('empty args, return None'):
+ func = defs.spam_minimal
+ res, exc = _interpreters.call(interpid, func, (), {})
+ self.assertIsNone(exc)
+ self.assertIsNone(res)
+
+ with self.subTest('no args, return non-None'):
+ func = defs.script_with_return
+ res, exc = _interpreters.call(interpid, func)
+ self.assertIsNone(exc)
+ self.assertIs(res, True)
+
+ with self.subTest('full args, return non-None'):
+ expected = (1, 2, 3, 4, 5, 6, (7, 8), {'g': 9, 'h': 0})
+ func = defs.spam_full_args
+ args = (1, 2, 3, 4, 7, 8)
+ kwargs = dict(e=5, f=6, g=9, h=0)
+ res, exc = _interpreters.call(interpid, func, args, kwargs)
+ self.assertIsNone(exc)
+ self.assertEqual(res, expected)
with self.subTest('uncaught exception'):
- interpid = _interpreters.create()
- exc = _interpreters.call(interpid, call_func_failure)
- self.assertEqual(exc, types.SimpleNamespace(
- type=types.SimpleNamespace(
- __name__='Exception',
- __qualname__='Exception',
- __module__='builtins',
- ),
- msg='spam!',
+ func = defs.spam_raises
+ res, exc = _interpreters.call(interpid, func)
+ expected = build_excinfo(
+ Exception, 'spam!',
# We check these in other tests.
formatted=exc.formatted,
errdisplay=exc.errdisplay,
- ))
+ )
+ self.assertIsNone(res)
+ self.assertEqual(exc, expected)
@requires_test_modules
def test_set___main___attrs(self):