aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_crossinterp.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_crossinterp.py')
-rw-r--r--Lib/test/test_crossinterp.py1134
1 files changed, 1052 insertions, 82 deletions
diff --git a/Lib/test/test_crossinterp.py b/Lib/test/test_crossinterp.py
index 5ebb78b0ea9..2fa0077a09b 100644
--- a/Lib/test/test_crossinterp.py
+++ b/Lib/test/test_crossinterp.py
@@ -1,7 +1,9 @@
+import contextlib
import itertools
import sys
import types
import unittest
+import warnings
from test.support import import_helper
@@ -9,80 +11,500 @@ _testinternalcapi = import_helper.import_module('_testinternalcapi')
_interpreters = import_helper.import_module('_interpreters')
from _interpreters import NotShareableError
-
+from test import _code_definitions as code_defs
from test import _crossinterp_definitions as defs
-BUILTIN_TYPES = [o for _, o in __builtins__.items()
- if isinstance(o, type)]
-EXCEPTION_TYPES = [cls for cls in BUILTIN_TYPES
+@contextlib.contextmanager
+def ignore_byteswarning():
+ with warnings.catch_warnings():
+ warnings.filterwarnings('ignore', category=BytesWarning)
+ yield
+
+
+# builtin types
+
+BUILTINS_TYPES = [o for _, o in __builtins__.items() if isinstance(o, type)]
+EXCEPTION_TYPES = [cls for cls in BUILTINS_TYPES
if issubclass(cls, BaseException)]
OTHER_TYPES = [o for n, o in vars(types).items()
if (isinstance(o, type) and
- n not in ('DynamicClassAttribute', '_GeneratorWrapper'))]
+ n not in ('DynamicClassAttribute', '_GeneratorWrapper'))]
+BUILTIN_TYPES = [
+ *BUILTINS_TYPES,
+ *OTHER_TYPES,
+]
+
+# builtin exceptions
+
+try:
+ raise Exception
+except Exception as exc:
+ CAUGHT = exc
+EXCEPTIONS_WITH_SPECIAL_SIG = {
+ BaseExceptionGroup: (lambda msg: (msg, [CAUGHT])),
+ ExceptionGroup: (lambda msg: (msg, [CAUGHT])),
+ UnicodeError: (lambda msg: (None, msg, None, None, None)),
+ UnicodeEncodeError: (lambda msg: ('utf-8', '', 1, 3, msg)),
+ UnicodeDecodeError: (lambda msg: ('utf-8', b'', 1, 3, msg)),
+ UnicodeTranslateError: (lambda msg: ('', 1, 3, msg)),
+}
+BUILTIN_EXCEPTIONS = [
+ *(cls(*sig('error!')) for cls, sig in EXCEPTIONS_WITH_SPECIAL_SIG.items()),
+ *(cls('error!') for cls in EXCEPTION_TYPES
+ if cls not in EXCEPTIONS_WITH_SPECIAL_SIG),
+]
+
+# other builtin objects
+
+METHOD = defs.SpamOkay().okay
+BUILTIN_METHOD = [].append
+METHOD_DESCRIPTOR_WRAPPER = str.join
+METHOD_WRAPPER = object().__str__
+WRAPPER_DESCRIPTOR = object.__init__
+BUILTIN_WRAPPERS = {
+ METHOD: types.MethodType,
+ BUILTIN_METHOD: types.BuiltinMethodType,
+ dict.__dict__['fromkeys']: types.ClassMethodDescriptorType,
+ types.FunctionType.__code__: types.GetSetDescriptorType,
+ types.FunctionType.__globals__: types.MemberDescriptorType,
+ METHOD_DESCRIPTOR_WRAPPER: types.MethodDescriptorType,
+ METHOD_WRAPPER: types.MethodWrapperType,
+ WRAPPER_DESCRIPTOR: types.WrapperDescriptorType,
+ staticmethod(defs.SpamOkay.okay): None,
+ classmethod(defs.SpamOkay.okay): None,
+ property(defs.SpamOkay.okay): None,
+}
+BUILTIN_FUNCTIONS = [
+ # types.BuiltinFunctionType
+ len,
+ sys.is_finalizing,
+ sys.exit,
+ _testinternalcapi.get_crossinterp_data,
+]
+assert 'emptymod' not in sys.modules
+with import_helper.ready_to_import('emptymod', ''):
+ import emptymod as EMPTYMOD
+MODULES = [
+ sys,
+ defs,
+ unittest,
+ EMPTYMOD,
+]
+OBJECT = object()
+EXCEPTION = Exception()
+LAMBDA = (lambda: None)
+BUILTIN_SIMPLE = [
+ OBJECT,
+ # singletons
+ None,
+ True,
+ False,
+ Ellipsis,
+ NotImplemented,
+ # bytes
+ *(i.to_bytes(2, 'little', signed=True)
+ for i in range(-1, 258)),
+ # str
+ 'hello world',
+ '你好世界',
+ '',
+ # int
+ sys.maxsize + 1,
+ sys.maxsize,
+ -sys.maxsize - 1,
+ -sys.maxsize - 2,
+ *range(-1, 258),
+ 2**1000,
+ # float
+ 0.0,
+ 1.1,
+ -1.0,
+ 0.12345678,
+ -0.12345678,
+]
+TUPLE_EXCEPTION = (0, 1.0, EXCEPTION)
+TUPLE_OBJECT = (0, 1.0, OBJECT)
+TUPLE_NESTED_EXCEPTION = (0, 1.0, (EXCEPTION,))
+TUPLE_NESTED_OBJECT = (0, 1.0, (OBJECT,))
+MEMORYVIEW_EMPTY = memoryview(b'')
+MEMORYVIEW_NOT_EMPTY = memoryview(b'spam'*42)
+MAPPING_PROXY_EMPTY = types.MappingProxyType({})
+BUILTIN_CONTAINERS = [
+ # tuple (flat)
+ (),
+ (1,),
+ ("hello", "world", ),
+ (1, True, "hello"),
+ TUPLE_EXCEPTION,
+ TUPLE_OBJECT,
+ # tuple (nested)
+ ((1,),),
+ ((1, 2), (3, 4)),
+ ((1, 2), (3, 4), (5, 6)),
+ TUPLE_NESTED_EXCEPTION,
+ TUPLE_NESTED_OBJECT,
+ # buffer
+ MEMORYVIEW_EMPTY,
+ MEMORYVIEW_NOT_EMPTY,
+ # list
+ [],
+ [1, 2, 3],
+ [[1], (2,), {3: 4}],
+ # dict
+ {},
+ {1: 7, 2: 8, 3: 9},
+ {1: [1], 2: (2,), 3: {3: 4}},
+ # set
+ set(),
+ {1, 2, 3},
+ {frozenset({1}), (2,)},
+ # frozenset
+ frozenset([]),
+ frozenset({frozenset({1}), (2,)}),
+ # bytearray
+ bytearray(b''),
+ # other
+ MAPPING_PROXY_EMPTY,
+ types.SimpleNamespace(),
+]
+ns = {}
+exec("""
+try:
+ raise Exception
+except Exception as exc:
+ TRACEBACK = exc.__traceback__
+ FRAME = TRACEBACK.tb_frame
+""", ns, ns)
+BUILTIN_OTHER = [
+ # types.CellType
+ types.CellType(),
+ # types.FrameType
+ ns['FRAME'],
+ # types.TracebackType
+ ns['TRACEBACK'],
+]
+del ns
+
+# user-defined objects
+
+USER_TOP_INSTANCES = [c(*a) for c, a in defs.TOP_CLASSES.items()]
+USER_NESTED_INSTANCES = [c(*a) for c, a in defs.NESTED_CLASSES.items()]
+USER_INSTANCES = [
+ *USER_TOP_INSTANCES,
+ *USER_NESTED_INSTANCES,
+]
+USER_EXCEPTIONS = [
+ defs.MimimalError('error!'),
+]
+
+# shareable objects
+
+TUPLES_WITHOUT_EQUALITY = [
+ TUPLE_EXCEPTION,
+ TUPLE_OBJECT,
+ TUPLE_NESTED_EXCEPTION,
+ TUPLE_NESTED_OBJECT,
+]
+_UNSHAREABLE_SIMPLE = [
+ Ellipsis,
+ NotImplemented,
+ OBJECT,
+ sys.maxsize + 1,
+ -sys.maxsize - 2,
+ 2**1000,
+]
+with ignore_byteswarning():
+ _SHAREABLE_SIMPLE = [o for o in BUILTIN_SIMPLE
+ if o not in _UNSHAREABLE_SIMPLE]
+ _SHAREABLE_CONTAINERS = [
+ *(o for o in BUILTIN_CONTAINERS if type(o) is memoryview),
+ *(o for o in BUILTIN_CONTAINERS
+ if type(o) is tuple and o not in TUPLES_WITHOUT_EQUALITY),
+ ]
+ _UNSHAREABLE_CONTAINERS = [o for o in BUILTIN_CONTAINERS
+ if o not in _SHAREABLE_CONTAINERS]
+SHAREABLE = [
+ *_SHAREABLE_SIMPLE,
+ *_SHAREABLE_CONTAINERS,
+]
+NOT_SHAREABLE = [
+ *_UNSHAREABLE_SIMPLE,
+ *_UNSHAREABLE_CONTAINERS,
+ *BUILTIN_TYPES,
+ *BUILTIN_WRAPPERS,
+ *BUILTIN_EXCEPTIONS,
+ *BUILTIN_FUNCTIONS,
+ *MODULES,
+ *BUILTIN_OTHER,
+ # types.CodeType
+ *(f.__code__ for f in defs.FUNCTIONS),
+ *(f.__code__ for f in defs.FUNCTION_LIKE),
+ # types.FunctionType
+ *defs.FUNCTIONS,
+ defs.SpamOkay.okay,
+ LAMBDA,
+ *defs.FUNCTION_LIKE,
+ # coroutines and generators
+ *defs.FUNCTION_LIKE_APPLIED,
+ # user classes
+ *defs.CLASSES,
+ *USER_INSTANCES,
+ # user exceptions
+ *USER_EXCEPTIONS,
+]
+
+# pickleable objects
+
+PICKLEABLE = [
+ *BUILTIN_SIMPLE,
+ *(o for o in BUILTIN_CONTAINERS if o not in [
+ MEMORYVIEW_EMPTY,
+ MEMORYVIEW_NOT_EMPTY,
+ MAPPING_PROXY_EMPTY,
+ ] or type(o) is dict),
+ *BUILTINS_TYPES,
+ *BUILTIN_EXCEPTIONS,
+ *BUILTIN_FUNCTIONS,
+ *defs.TOP_FUNCTIONS,
+ defs.SpamOkay.okay,
+ *defs.FUNCTION_LIKE,
+ *defs.TOP_CLASSES,
+ *USER_TOP_INSTANCES,
+ *USER_EXCEPTIONS,
+ # from OTHER_TYPES
+ types.NoneType,
+ types.EllipsisType,
+ types.NotImplementedType,
+ types.GenericAlias,
+ types.UnionType,
+ types.SimpleNamespace,
+ # from BUILTIN_WRAPPERS
+ METHOD,
+ BUILTIN_METHOD,
+ METHOD_DESCRIPTOR_WRAPPER,
+ METHOD_WRAPPER,
+ WRAPPER_DESCRIPTOR,
+]
+assert not any(isinstance(o, types.MappingProxyType) for o in PICKLEABLE)
+
+
+# helpers
+
+DEFS = defs
+with open(code_defs.__file__) as infile:
+ _code_defs_text = infile.read()
+with open(DEFS.__file__) as infile:
+ _defs_text = infile.read()
+ _defs_text = _defs_text.replace('from ', '# from ')
+DEFS_TEXT = f"""
+#######################################
+# from {code_defs.__file__}
+
+{_code_defs_text}
+
+#######################################
+# from {defs.__file__}
+
+{_defs_text}
+"""
+del infile, _code_defs_text, _defs_text
+
+
+def load_defs(module=None):
+ """Return a new copy of the test._crossinterp_definitions module.
+
+ The module's __name__ matches the "module" arg, which is either
+ a str or a module.
+
+ If the "module" arg is a module then the just-loaded defs are also
+ copied into that module.
+
+ Note that the new module is not added to sys.modules.
+ """
+ if module is None:
+ modname = DEFS.__name__
+ elif isinstance(module, str):
+ modname = module
+ module = None
+ else:
+ modname = module.__name__
+ # Create the new module and populate it.
+ defs = import_helper.create_module(modname)
+ defs.__file__ = DEFS.__file__
+ exec(DEFS_TEXT, defs.__dict__)
+ # Copy the defs into the module arg, if any.
+ if module is not None:
+ for name, value in defs.__dict__.items():
+ if name.startswith('_'):
+ continue
+ assert not hasattr(module, name), (name, getattr(module, name))
+ setattr(module, name, value)
+ return defs
+
+
+@contextlib.contextmanager
+def using___main__():
+ """Make sure __main__ module exists (and clean up after)."""
+ modname = '__main__'
+ if modname not in sys.modules:
+ with import_helper.isolated_modules():
+ yield import_helper.add_module(modname)
+ else:
+ with import_helper.module_restored(modname) as mod:
+ yield mod
+
+
+@contextlib.contextmanager
+def temp_module(modname):
+ """Create the module and add to sys.modules, then remove it after."""
+ assert modname not in sys.modules, (modname,)
+ with import_helper.isolated_modules():
+ yield import_helper.add_module(modname)
+
+
+@contextlib.contextmanager
+def missing_defs_module(modname, *, prep=False):
+ assert modname not in sys.modules, (modname,)
+ if prep:
+ with import_helper.ready_to_import(modname, DEFS_TEXT):
+ yield modname
+ else:
+ with import_helper.isolated_modules():
+ yield modname
class _GetXIDataTests(unittest.TestCase):
MODE = None
+ def assert_functions_equal(self, func1, func2):
+ assert type(func1) is types.FunctionType, repr(func1)
+ assert type(func2) is types.FunctionType, repr(func2)
+ self.assertEqual(func1.__name__, func2.__name__)
+ self.assertEqual(func1.__code__, func2.__code__)
+ self.assertEqual(func1.__defaults__, func2.__defaults__)
+ self.assertEqual(func1.__kwdefaults__, func2.__kwdefaults__)
+ # We don't worry about __globals__ for now.
+
+ def assert_exc_args_equal(self, exc1, exc2):
+ args1 = exc1.args
+ args2 = exc2.args
+ if isinstance(exc1, ExceptionGroup):
+ self.assertIs(type(args1), type(args2))
+ self.assertEqual(len(args1), 2)
+ self.assertEqual(len(args1), len(args2))
+ self.assertEqual(args1[0], args2[0])
+ group1 = args1[1]
+ group2 = args2[1]
+ self.assertEqual(len(group1), len(group2))
+ for grouped1, grouped2 in zip(group1, group2):
+ # Currently the "extra" attrs are not preserved
+ # (via __reduce__).
+ self.assertIs(type(exc1), type(exc2))
+ self.assert_exc_equal(grouped1, grouped2)
+ else:
+ self.assertEqual(args1, args2)
+
+ def assert_exc_equal(self, exc1, exc2):
+ self.assertIs(type(exc1), type(exc2))
+
+ if type(exc1).__eq__ is not object.__eq__:
+ self.assertEqual(exc1, exc2)
+
+ self.assert_exc_args_equal(exc1, exc2)
+ # XXX For now we do not preserve tracebacks.
+ if exc1.__traceback__ is not None:
+ self.assertEqual(exc1.__traceback__, exc2.__traceback__)
+ self.assertEqual(
+ getattr(exc1, '__notes__', None),
+ getattr(exc2, '__notes__', None),
+ )
+ # We assume there are no cycles.
+ if exc1.__cause__ is None:
+ self.assertIs(exc1.__cause__, exc2.__cause__)
+ else:
+ self.assert_exc_equal(exc1.__cause__, exc2.__cause__)
+ if exc1.__context__ is None:
+ self.assertIs(exc1.__context__, exc2.__context__)
+ else:
+ self.assert_exc_equal(exc1.__context__, exc2.__context__)
+
+ def assert_equal_or_equalish(self, obj, expected):
+ cls = type(expected)
+ if cls.__eq__ is not object.__eq__:
+ self.assertEqual(obj, expected)
+ elif cls is types.FunctionType:
+ self.assert_functions_equal(obj, expected)
+ elif isinstance(expected, BaseException):
+ self.assert_exc_equal(obj, expected)
+ elif cls is types.MethodType:
+ raise NotImplementedError(cls)
+ elif cls is types.BuiltinMethodType:
+ raise NotImplementedError(cls)
+ elif cls is types.MethodWrapperType:
+ raise NotImplementedError(cls)
+ elif cls.__bases__ == (object,):
+ self.assertEqual(obj.__dict__, expected.__dict__)
+ else:
+ raise NotImplementedError(cls)
+
def get_xidata(self, obj, *, mode=None):
mode = self._resolve_mode(mode)
return _testinternalcapi.get_crossinterp_data(obj, mode)
def get_roundtrip(self, obj, *, mode=None):
mode = self._resolve_mode(mode)
- xid =_testinternalcapi.get_crossinterp_data(obj, mode)
+ return self._get_roundtrip(obj, mode)
+
+ def _get_roundtrip(self, obj, mode):
+ xid = _testinternalcapi.get_crossinterp_data(obj, mode)
return _testinternalcapi.restore_crossinterp_data(xid)
- def iter_roundtrip_values(self, values, *, mode=None):
+ def assert_roundtrip_identical(self, values, *, mode=None):
mode = self._resolve_mode(mode)
for obj in values:
- with self.subTest(obj):
- xid = _testinternalcapi.get_crossinterp_data(obj, mode)
- got = _testinternalcapi.restore_crossinterp_data(xid)
- yield obj, got
-
- def assert_roundtrip_identical(self, values, *, mode=None):
- for obj, got in self.iter_roundtrip_values(values, mode=mode):
- # XXX What about between interpreters?
- self.assertIs(got, obj)
+ with self.subTest(repr(obj)):
+ got = self._get_roundtrip(obj, mode)
+ self.assertIs(got, obj)
def assert_roundtrip_equal(self, values, *, mode=None, expecttype=None):
- for obj, got in self.iter_roundtrip_values(values, mode=mode):
- self.assertEqual(got, obj)
- self.assertIs(type(got),
- type(obj) if expecttype is None else expecttype)
-
-# def assert_roundtrip_equal_not_identical(self, values, *,
-# mode=None, expecttype=None):
-# mode = self._resolve_mode(mode)
-# for obj in values:
-# cls = type(obj)
-# with self.subTest(obj):
-# got = self._get_roundtrip(obj, mode)
-# self.assertIsNot(got, obj)
-# self.assertIs(type(got), type(obj))
-# self.assertEqual(got, obj)
-# self.assertIs(type(got),
-# cls if expecttype is None else expecttype)
-#
-# def assert_roundtrip_not_equal(self, values, *, mode=None, expecttype=None):
-# mode = self._resolve_mode(mode)
-# for obj in values:
-# cls = type(obj)
-# with self.subTest(obj):
-# got = self._get_roundtrip(obj, mode)
-# self.assertIsNot(got, obj)
-# self.assertIs(type(got), type(obj))
-# self.assertNotEqual(got, obj)
-# self.assertIs(type(got),
-# cls if expecttype is None else expecttype)
+ mode = self._resolve_mode(mode)
+ for obj in values:
+ with self.subTest(repr(obj)):
+ got = self._get_roundtrip(obj, mode)
+ if got is obj:
+ continue
+ self.assertIs(type(got),
+ type(obj) if expecttype is None else expecttype)
+ self.assert_equal_or_equalish(got, obj)
+
+ def assert_roundtrip_equal_not_identical(self, values, *,
+ mode=None, expecttype=None):
+ mode = self._resolve_mode(mode)
+ for obj in values:
+ with self.subTest(repr(obj)):
+ got = self._get_roundtrip(obj, mode)
+ self.assertIsNot(got, obj)
+ self.assertIs(type(got),
+ type(obj) if expecttype is None else expecttype)
+ self.assert_equal_or_equalish(got, obj)
+
+ def assert_roundtrip_not_equal(self, values, *,
+ mode=None, expecttype=None):
+ mode = self._resolve_mode(mode)
+ for obj in values:
+ with self.subTest(repr(obj)):
+ got = self._get_roundtrip(obj, mode)
+ self.assertIsNot(got, obj)
+ self.assertIs(type(got),
+ type(obj) if expecttype is None else expecttype)
+ self.assertNotEqual(got, obj)
def assert_not_shareable(self, values, exctype=None, *, mode=None):
mode = self._resolve_mode(mode)
for obj in values:
- with self.subTest(obj):
+ with self.subTest(repr(obj)):
with self.assertRaises(NotShareableError) as cm:
_testinternalcapi.get_crossinterp_data(obj, mode)
if exctype is not None:
@@ -95,6 +517,340 @@ class _GetXIDataTests(unittest.TestCase):
return mode
+class PickleTests(_GetXIDataTests):
+
+ MODE = 'pickle'
+
+ def test_shareable(self):
+ with ignore_byteswarning():
+ for obj in SHAREABLE:
+ if obj in PICKLEABLE:
+ self.assert_roundtrip_equal([obj])
+ else:
+ self.assert_not_shareable([obj])
+
+ def test_not_shareable(self):
+ with ignore_byteswarning():
+ for obj in NOT_SHAREABLE:
+ if type(obj) is types.MappingProxyType:
+ self.assert_not_shareable([obj])
+ elif obj in PICKLEABLE:
+ with self.subTest(repr(obj)):
+ # We don't worry about checking the actual value.
+ # The other tests should cover that well enough.
+ got = self.get_roundtrip(obj)
+ self.assertIs(type(got), type(obj))
+ else:
+ self.assert_not_shareable([obj])
+
+ def test_list(self):
+ self.assert_roundtrip_equal_not_identical([
+ [],
+ [1, 2, 3],
+ [[1], (2,), {3: 4}],
+ ])
+
+ def test_dict(self):
+ self.assert_roundtrip_equal_not_identical([
+ {},
+ {1: 7, 2: 8, 3: 9},
+ {1: [1], 2: (2,), 3: {3: 4}},
+ ])
+
+ def test_set(self):
+ self.assert_roundtrip_equal_not_identical([
+ set(),
+ {1, 2, 3},
+ {frozenset({1}), (2,)},
+ ])
+
+ # classes
+
+ def assert_class_defs_same(self, defs):
+ # Unpickle relative to the unchanged original module.
+ self.assert_roundtrip_identical(defs.TOP_CLASSES)
+
+ instances = []
+ for cls, args in defs.TOP_CLASSES.items():
+ if cls in defs.CLASSES_WITHOUT_EQUALITY:
+ continue
+ instances.append(cls(*args))
+ self.assert_roundtrip_equal_not_identical(instances)
+
+ # these don't compare equal
+ instances = []
+ for cls, args in defs.TOP_CLASSES.items():
+ if cls not in defs.CLASSES_WITHOUT_EQUALITY:
+ continue
+ instances.append(cls(*args))
+ self.assert_roundtrip_equal(instances)
+
+ def assert_class_defs_other_pickle(self, defs, mod):
+ # Pickle relative to a different module than the original.
+ for cls in defs.TOP_CLASSES:
+ assert not hasattr(mod, cls.__name__), (cls, getattr(mod, cls.__name__))
+ self.assert_not_shareable(defs.TOP_CLASSES)
+
+ instances = []
+ for cls, args in defs.TOP_CLASSES.items():
+ instances.append(cls(*args))
+ self.assert_not_shareable(instances)
+
+ def assert_class_defs_other_unpickle(self, defs, mod, *, fail=False):
+ # Unpickle relative to a different module than the original.
+ for cls in defs.TOP_CLASSES:
+ assert not hasattr(mod, cls.__name__), (cls, getattr(mod, cls.__name__))
+
+ instances = []
+ for cls, args in defs.TOP_CLASSES.items():
+ with self.subTest(repr(cls)):
+ setattr(mod, cls.__name__, cls)
+ xid = self.get_xidata(cls)
+ inst = cls(*args)
+ instxid = self.get_xidata(inst)
+ instances.append(
+ (cls, xid, inst, instxid))
+
+ for cls, xid, inst, instxid in instances:
+ with self.subTest(repr(cls)):
+ delattr(mod, cls.__name__)
+ if fail:
+ with self.assertRaises(NotShareableError):
+ _testinternalcapi.restore_crossinterp_data(xid)
+ continue
+ got = _testinternalcapi.restore_crossinterp_data(xid)
+ self.assertIsNot(got, cls)
+ self.assertNotEqual(got, cls)
+
+ gotcls = got
+ got = _testinternalcapi.restore_crossinterp_data(instxid)
+ self.assertIsNot(got, inst)
+ self.assertIs(type(got), gotcls)
+ if cls in defs.CLASSES_WITHOUT_EQUALITY:
+ self.assertNotEqual(got, inst)
+ elif cls in defs.BUILTIN_SUBCLASSES:
+ self.assertEqual(got, inst)
+ else:
+ self.assertNotEqual(got, inst)
+
+ def assert_class_defs_not_shareable(self, defs):
+ self.assert_not_shareable(defs.TOP_CLASSES)
+
+ instances = []
+ for cls, args in defs.TOP_CLASSES.items():
+ instances.append(cls(*args))
+ self.assert_not_shareable(instances)
+
+ def test_user_class_normal(self):
+ self.assert_class_defs_same(defs)
+
+ def test_user_class_in___main__(self):
+ with using___main__() as mod:
+ defs = load_defs(mod)
+ self.assert_class_defs_same(defs)
+
+ def test_user_class_not_in___main___with_filename(self):
+ with using___main__() as mod:
+ defs = load_defs('__main__')
+ assert defs.__file__
+ mod.__file__ = defs.__file__
+ self.assert_class_defs_not_shareable(defs)
+
+ def test_user_class_not_in___main___without_filename(self):
+ with using___main__() as mod:
+ defs = load_defs('__main__')
+ defs.__file__ = None
+ mod.__file__ = None
+ self.assert_class_defs_not_shareable(defs)
+
+ def test_user_class_not_in___main___unpickle_with_filename(self):
+ with using___main__() as mod:
+ defs = load_defs('__main__')
+ assert defs.__file__
+ mod.__file__ = defs.__file__
+ self.assert_class_defs_other_unpickle(defs, mod)
+
+ def test_user_class_not_in___main___unpickle_without_filename(self):
+ with using___main__() as mod:
+ defs = load_defs('__main__')
+ defs.__file__ = None
+ mod.__file__ = None
+ self.assert_class_defs_other_unpickle(defs, mod, fail=True)
+
+ def test_user_class_in_module(self):
+ with temp_module('__spam__') as mod:
+ defs = load_defs(mod)
+ self.assert_class_defs_same(defs)
+
+ def test_user_class_not_in_module_with_filename(self):
+ with temp_module('__spam__') as mod:
+ defs = load_defs(mod.__name__)
+ assert defs.__file__
+ # For now, we only address this case for __main__.
+ self.assert_class_defs_not_shareable(defs)
+
+ def test_user_class_not_in_module_without_filename(self):
+ with temp_module('__spam__') as mod:
+ defs = load_defs(mod.__name__)
+ defs.__file__ = None
+ self.assert_class_defs_not_shareable(defs)
+
+ def test_user_class_module_missing_then_imported(self):
+ with missing_defs_module('__spam__', prep=True) as modname:
+ defs = load_defs(modname)
+ # For now, we only address this case for __main__.
+ self.assert_class_defs_not_shareable(defs)
+
+ def test_user_class_module_missing_not_available(self):
+ with missing_defs_module('__spam__') as modname:
+ defs = load_defs(modname)
+ self.assert_class_defs_not_shareable(defs)
+
+ def test_nested_class(self):
+ eggs = defs.EggsNested()
+ with self.assertRaises(NotShareableError):
+ self.get_roundtrip(eggs)
+
+ # functions
+
+ def assert_func_defs_same(self, defs):
+ # Unpickle relative to the unchanged original module.
+ self.assert_roundtrip_identical(defs.TOP_FUNCTIONS)
+
+ def assert_func_defs_other_pickle(self, defs, mod):
+ # Pickle relative to a different module than the original.
+ for func in defs.TOP_FUNCTIONS:
+ assert not hasattr(mod, func.__name__), (getattr(mod, func.__name__),)
+ self.assert_not_shareable(defs.TOP_FUNCTIONS)
+
+ def assert_func_defs_other_unpickle(self, defs, mod, *, fail=False):
+ # Unpickle relative to a different module than the original.
+ for func in defs.TOP_FUNCTIONS:
+ assert not hasattr(mod, func.__name__), (getattr(mod, func.__name__),)
+
+ captured = []
+ for func in defs.TOP_FUNCTIONS:
+ with self.subTest(func):
+ setattr(mod, func.__name__, func)
+ xid = self.get_xidata(func)
+ captured.append(
+ (func, xid))
+
+ for func, xid in captured:
+ with self.subTest(func):
+ delattr(mod, func.__name__)
+ if fail:
+ with self.assertRaises(NotShareableError):
+ _testinternalcapi.restore_crossinterp_data(xid)
+ continue
+ got = _testinternalcapi.restore_crossinterp_data(xid)
+ self.assertIsNot(got, func)
+ self.assertNotEqual(got, func)
+
+ def assert_func_defs_not_shareable(self, defs):
+ self.assert_not_shareable(defs.TOP_FUNCTIONS)
+
+ def test_user_function_normal(self):
+ self.assert_roundtrip_equal(defs.TOP_FUNCTIONS)
+ self.assert_func_defs_same(defs)
+
+ def test_user_func_in___main__(self):
+ with using___main__() as mod:
+ defs = load_defs(mod)
+ self.assert_func_defs_same(defs)
+
+ def test_user_func_not_in___main___with_filename(self):
+ with using___main__() as mod:
+ defs = load_defs('__main__')
+ assert defs.__file__
+ mod.__file__ = defs.__file__
+ self.assert_func_defs_not_shareable(defs)
+
+ def test_user_func_not_in___main___without_filename(self):
+ with using___main__() as mod:
+ defs = load_defs('__main__')
+ defs.__file__ = None
+ mod.__file__ = None
+ self.assert_func_defs_not_shareable(defs)
+
+ def test_user_func_not_in___main___unpickle_with_filename(self):
+ with using___main__() as mod:
+ defs = load_defs('__main__')
+ assert defs.__file__
+ mod.__file__ = defs.__file__
+ self.assert_func_defs_other_unpickle(defs, mod)
+
+ def test_user_func_not_in___main___unpickle_without_filename(self):
+ with using___main__() as mod:
+ defs = load_defs('__main__')
+ defs.__file__ = None
+ mod.__file__ = None
+ self.assert_func_defs_other_unpickle(defs, mod, fail=True)
+
+ def test_user_func_in_module(self):
+ with temp_module('__spam__') as mod:
+ defs = load_defs(mod)
+ self.assert_func_defs_same(defs)
+
+ def test_user_func_not_in_module_with_filename(self):
+ with temp_module('__spam__') as mod:
+ defs = load_defs(mod.__name__)
+ assert defs.__file__
+ # For now, we only address this case for __main__.
+ self.assert_func_defs_not_shareable(defs)
+
+ def test_user_func_not_in_module_without_filename(self):
+ with temp_module('__spam__') as mod:
+ defs = load_defs(mod.__name__)
+ defs.__file__ = None
+ self.assert_func_defs_not_shareable(defs)
+
+ def test_user_func_module_missing_then_imported(self):
+ with missing_defs_module('__spam__', prep=True) as modname:
+ defs = load_defs(modname)
+ # For now, we only address this case for __main__.
+ self.assert_func_defs_not_shareable(defs)
+
+ def test_user_func_module_missing_not_available(self):
+ with missing_defs_module('__spam__') as modname:
+ defs = load_defs(modname)
+ self.assert_func_defs_not_shareable(defs)
+
+ def test_nested_function(self):
+ self.assert_not_shareable(defs.NESTED_FUNCTIONS)
+
+ # exceptions
+
+ def test_user_exception_normal(self):
+ self.assert_roundtrip_equal([
+ defs.MimimalError('error!'),
+ ])
+ self.assert_roundtrip_equal_not_identical([
+ defs.RichError('error!', 42),
+ ])
+
+ def test_builtin_exception(self):
+ msg = 'error!'
+ try:
+ raise Exception
+ except Exception as exc:
+ caught = exc
+ special = {
+ BaseExceptionGroup: (msg, [caught]),
+ ExceptionGroup: (msg, [caught]),
+ UnicodeError: (None, msg, None, None, None),
+ UnicodeEncodeError: ('utf-8', '', 1, 3, msg),
+ UnicodeDecodeError: ('utf-8', b'', 1, 3, msg),
+ UnicodeTranslateError: ('', 1, 3, msg),
+ }
+ exceptions = []
+ for cls in EXCEPTION_TYPES:
+ args = special.get(cls) or (msg,)
+ exceptions.append(cls(*args))
+
+ self.assert_roundtrip_equal(exceptions)
+
+
class MarshalTests(_GetXIDataTests):
MODE = 'marshal'
@@ -137,7 +893,7 @@ class MarshalTests(_GetXIDataTests):
'',
])
self.assert_not_shareable([
- object(),
+ OBJECT,
types.SimpleNamespace(),
])
@@ -208,10 +964,7 @@ class MarshalTests(_GetXIDataTests):
shareable = [
StopIteration,
]
- types = [
- *BUILTIN_TYPES,
- *OTHER_TYPES,
- ]
+ types = BUILTIN_TYPES
self.assert_not_shareable(cls for cls in types
if cls not in shareable)
self.assert_roundtrip_identical(cls for cls in types
@@ -286,10 +1039,236 @@ class MarshalTests(_GetXIDataTests):
])
+class CodeTests(_GetXIDataTests):
+
+ MODE = 'code'
+
+ def test_function_code(self):
+ self.assert_roundtrip_equal_not_identical([
+ *(f.__code__ for f in defs.FUNCTIONS),
+ *(f.__code__ for f in defs.FUNCTION_LIKE),
+ ])
+
+ def test_functions(self):
+ self.assert_not_shareable([
+ *defs.FUNCTIONS,
+ *defs.FUNCTION_LIKE,
+ ])
+
+ def test_other_objects(self):
+ self.assert_not_shareable([
+ None,
+ True,
+ False,
+ Ellipsis,
+ NotImplemented,
+ 9999,
+ 'spam',
+ b'spam',
+ (),
+ [],
+ {},
+ object(),
+ ])
+
+
+class ShareableFuncTests(_GetXIDataTests):
+
+ MODE = 'func'
+
+ def test_stateless(self):
+ self.assert_roundtrip_equal([
+ *defs.STATELESS_FUNCTIONS,
+ # Generators can be stateless too.
+ *defs.FUNCTION_LIKE,
+ ])
+
+ def test_not_stateless(self):
+ self.assert_not_shareable([
+ *(f for f in defs.FUNCTIONS
+ if f not in defs.STATELESS_FUNCTIONS),
+ ])
+
+ def test_other_objects(self):
+ self.assert_not_shareable([
+ None,
+ True,
+ False,
+ Ellipsis,
+ NotImplemented,
+ 9999,
+ 'spam',
+ b'spam',
+ (),
+ [],
+ {},
+ object(),
+ ])
+
+
+class PureShareableScriptTests(_GetXIDataTests):
+
+ MODE = 'script-pure'
+
+ VALID_SCRIPTS = [
+ '',
+ 'spam',
+ '# a comment',
+ 'print("spam")',
+ 'raise Exception("spam")',
+ """if True:
+ do_something()
+ """,
+ """if True:
+ def spam(x):
+ return x
+ class Spam:
+ def eggs(self):
+ return 42
+ x = Spam().eggs()
+ raise ValueError(spam(x))
+ """,
+ ]
+ INVALID_SCRIPTS = [
+ ' pass', # IndentationError
+ '----', # SyntaxError
+ """if True:
+ def spam():
+ # no body
+ spam()
+ """, # IndentationError
+ ]
+
+ def test_valid_str(self):
+ self.assert_roundtrip_not_equal([
+ *self.VALID_SCRIPTS,
+ ], expecttype=types.CodeType)
+
+ def test_invalid_str(self):
+ self.assert_not_shareable([
+ *self.INVALID_SCRIPTS,
+ ])
+
+ def test_valid_bytes(self):
+ self.assert_roundtrip_not_equal([
+ *(s.encode('utf8') for s in self.VALID_SCRIPTS),
+ ], expecttype=types.CodeType)
+
+ def test_invalid_bytes(self):
+ self.assert_not_shareable([
+ *(s.encode('utf8') for s in self.INVALID_SCRIPTS),
+ ])
+
+ def test_pure_script_code(self):
+ self.assert_roundtrip_equal_not_identical([
+ *(f.__code__ for f in defs.PURE_SCRIPT_FUNCTIONS),
+ ])
+
+ def test_impure_script_code(self):
+ self.assert_not_shareable([
+ *(f.__code__ for f in defs.SCRIPT_FUNCTIONS
+ if f not in defs.PURE_SCRIPT_FUNCTIONS),
+ ])
+
+ def test_other_code(self):
+ self.assert_not_shareable([
+ *(f.__code__ for f in defs.FUNCTIONS
+ if f not in defs.SCRIPT_FUNCTIONS),
+ *(f.__code__ for f in defs.FUNCTION_LIKE),
+ ])
+
+ def test_pure_script_function(self):
+ self.assert_roundtrip_not_equal([
+ *defs.PURE_SCRIPT_FUNCTIONS,
+ ], expecttype=types.CodeType)
+
+ def test_impure_script_function(self):
+ self.assert_not_shareable([
+ *(f for f in defs.SCRIPT_FUNCTIONS
+ if f not in defs.PURE_SCRIPT_FUNCTIONS),
+ ])
+
+ def test_other_function(self):
+ self.assert_not_shareable([
+ *(f for f in defs.FUNCTIONS
+ if f not in defs.SCRIPT_FUNCTIONS),
+ *defs.FUNCTION_LIKE,
+ ])
+
+ def test_other_objects(self):
+ self.assert_not_shareable([
+ None,
+ True,
+ False,
+ Ellipsis,
+ NotImplemented,
+ (),
+ [],
+ {},
+ object(),
+ ])
+
+
+class ShareableScriptTests(PureShareableScriptTests):
+
+ MODE = 'script'
+
+ def test_impure_script_code(self):
+ self.assert_roundtrip_equal_not_identical([
+ *(f.__code__ for f in defs.SCRIPT_FUNCTIONS
+ if f not in defs.PURE_SCRIPT_FUNCTIONS),
+ ])
+
+ def test_impure_script_function(self):
+ self.assert_roundtrip_not_equal([
+ *(f for f in defs.SCRIPT_FUNCTIONS
+ if f not in defs.PURE_SCRIPT_FUNCTIONS),
+ ], expecttype=types.CodeType)
+
+
+class ShareableFallbackTests(_GetXIDataTests):
+
+ MODE = 'fallback'
+
+ def test_shareable(self):
+ self.assert_roundtrip_equal(SHAREABLE)
+
+ def test_not_shareable(self):
+ okay = [
+ *PICKLEABLE,
+ *defs.STATELESS_FUNCTIONS,
+ LAMBDA,
+ ]
+ ignored = [
+ *TUPLES_WITHOUT_EQUALITY,
+ OBJECT,
+ METHOD,
+ BUILTIN_METHOD,
+ METHOD_WRAPPER,
+ ]
+ with ignore_byteswarning():
+ self.assert_roundtrip_equal([
+ *(o for o in NOT_SHAREABLE
+ if o in okay and o not in ignored
+ and o is not MAPPING_PROXY_EMPTY),
+ ])
+ self.assert_roundtrip_not_equal([
+ *(o for o in NOT_SHAREABLE
+ if o in ignored and o is not MAPPING_PROXY_EMPTY),
+ ])
+ self.assert_not_shareable([
+ *(o for o in NOT_SHAREABLE if o not in okay),
+ MAPPING_PROXY_EMPTY,
+ ])
+
+
class ShareableTypeTests(_GetXIDataTests):
MODE = 'xidata'
+ def test_shareable(self):
+ self.assert_roundtrip_equal(SHAREABLE)
+
def test_singletons(self):
self.assert_roundtrip_identical([
None,
@@ -357,8 +1336,8 @@ class ShareableTypeTests(_GetXIDataTests):
def test_tuples_containing_non_shareable_types(self):
non_shareables = [
- Exception(),
- object(),
+ EXCEPTION,
+ OBJECT,
]
for s in non_shareables:
value = tuple([0, 1.0, s])
@@ -373,21 +1352,31 @@ class ShareableTypeTests(_GetXIDataTests):
# The rest are not shareable.
+ def test_not_shareable(self):
+ self.assert_not_shareable(NOT_SHAREABLE)
+
def test_object(self):
self.assert_not_shareable([
object(),
])
+ def test_code(self):
+ # types.CodeType
+ self.assert_not_shareable([
+ *(f.__code__ for f in defs.FUNCTIONS),
+ *(f.__code__ for f in defs.FUNCTION_LIKE),
+ ])
+
def test_function_object(self):
for func in defs.FUNCTIONS:
assert type(func) is types.FunctionType, func
assert type(defs.SpamOkay.okay) is types.FunctionType, func
- assert type(lambda: None) is types.LambdaType
+ assert type(LAMBDA) is types.LambdaType
self.assert_not_shareable([
*defs.FUNCTIONS,
defs.SpamOkay.okay,
- (lambda: None),
+ LAMBDA,
])
def test_builtin_function(self):
@@ -444,28 +1433,15 @@ class ShareableTypeTests(_GetXIDataTests):
])
def test_class(self):
- self.assert_not_shareable([
- defs.Spam,
- defs.SpamOkay,
- defs.SpamFull,
- defs.SubSpamFull,
- defs.SubTuple,
- defs.EggsNested,
- ])
- self.assert_not_shareable([
- defs.Spam(),
- defs.SpamOkay(),
- defs.SpamFull(1, 2, 3),
- defs.SubSpamFull(1, 2, 3),
- defs.SubTuple([1, 2, 3]),
- defs.EggsNested(),
- ])
+ self.assert_not_shareable(defs.CLASSES)
+
+ instances = []
+ for cls, args in defs.CLASSES.items():
+ instances.append(cls(*args))
+ self.assert_not_shareable(instances)
def test_builtin_type(self):
- self.assert_not_shareable([
- *BUILTIN_TYPES,
- *OTHER_TYPES,
- ])
+ self.assert_not_shareable(BUILTIN_TYPES)
def test_exception(self):
self.assert_not_shareable([
@@ -504,14 +1480,8 @@ class ShareableTypeTests(_GetXIDataTests):
""", ns, ns)
self.assert_not_shareable([
- types.MappingProxyType({}),
+ MAPPING_PROXY_EMPTY,
types.SimpleNamespace(),
- # types.CodeType
- defs.spam_minimal.__code__,
- defs.spam_full.__code__,
- defs.spam_CC.__code__,
- defs.eggs_closure_C.__code__,
- defs.ham_C_closure.__code__,
# types.CellType
types.CellType(),
# types.FrameType