aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/support
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/support')
-rw-r--r--Lib/test/support/__init__.py29
-rw-r--r--Lib/test/support/channels.py (renamed from Lib/test/support/interpreters/channels.py)4
-rw-r--r--Lib/test/support/interpreters/__init__.py258
-rw-r--r--Lib/test/support/interpreters/_crossinterp.py102
-rw-r--r--Lib/test/support/interpreters/queues.py288
5 files changed, 29 insertions, 652 deletions
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index 351d832a26d..48e74adcce3 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -945,6 +945,31 @@ def check_sizeof(test, o, size):
% (type(o), result, size)
test.assertEqual(result, size, msg)
+def subTests(arg_names, arg_values, /, *, _do_cleanups=False):
+ """Run multiple subtests with different parameters.
+ """
+ single_param = False
+ if isinstance(arg_names, str):
+ arg_names = arg_names.replace(',',' ').split()
+ if len(arg_names) == 1:
+ single_param = True
+ arg_values = tuple(arg_values)
+ def decorator(func):
+ if isinstance(func, type):
+ raise TypeError('subTests() can only decorate methods, not classes')
+ @functools.wraps(func)
+ def wrapper(self, /, *args, **kwargs):
+ for values in arg_values:
+ if single_param:
+ values = (values,)
+ subtest_kwargs = dict(zip(arg_names, values))
+ with self.subTest(**subtest_kwargs):
+ func(self, *args, **kwargs, **subtest_kwargs)
+ if _do_cleanups:
+ self.doCleanups()
+ return wrapper
+ return decorator
+
#=======================================================================
# Decorator/context manager for running a code in a different locale,
# correctly resetting it afterwards.
@@ -1084,7 +1109,7 @@ def set_memlimit(limit: str) -> None:
global real_max_memuse
memlimit = _parse_memlimit(limit)
if memlimit < _2G - 1:
- raise ValueError('Memory limit {limit!r} too low to be useful')
+ raise ValueError(f'Memory limit {limit!r} too low to be useful')
real_max_memuse = memlimit
memlimit = min(memlimit, MAX_Py_ssize_t)
@@ -2358,7 +2383,7 @@ def infinite_recursion(max_depth=None):
# very deep recursion.
max_depth = 20_000
elif max_depth < 3:
- raise ValueError("max_depth must be at least 3, got {max_depth}")
+ raise ValueError(f"max_depth must be at least 3, got {max_depth}")
depth = get_recursion_depth()
depth = max(depth - 1, 1) # Ignore infinite_recursion() frame.
limit = depth + max_depth
diff --git a/Lib/test/support/interpreters/channels.py b/Lib/test/support/channels.py
index 1724759b75a..b2de24d9d3e 100644
--- a/Lib/test/support/interpreters/channels.py
+++ b/Lib/test/support/channels.py
@@ -2,14 +2,14 @@
import time
import _interpchannels as _channels
-from . import _crossinterp
+from concurrent.interpreters import _crossinterp
# aliases:
from _interpchannels import (
ChannelError, ChannelNotFoundError, ChannelClosedError, # noqa: F401
ChannelEmptyError, ChannelNotEmptyError, # noqa: F401
)
-from ._crossinterp import (
+from concurrent.interpreters._crossinterp import (
UNBOUND_ERROR, UNBOUND_REMOVE,
)
diff --git a/Lib/test/support/interpreters/__init__.py b/Lib/test/support/interpreters/__init__.py
deleted file mode 100644
index e067f259364..00000000000
--- a/Lib/test/support/interpreters/__init__.py
+++ /dev/null
@@ -1,258 +0,0 @@
-"""Subinterpreters High Level Module."""
-
-import threading
-import weakref
-import _interpreters
-
-# aliases:
-from _interpreters import (
- InterpreterError, InterpreterNotFoundError, NotShareableError,
- is_shareable,
-)
-
-
-__all__ = [
- 'get_current', 'get_main', 'create', 'list_all', 'is_shareable',
- 'Interpreter',
- 'InterpreterError', 'InterpreterNotFoundError', 'ExecutionFailed',
- 'NotShareableError',
- 'create_queue', 'Queue', 'QueueEmpty', 'QueueFull',
-]
-
-
-_queuemod = None
-
-def __getattr__(name):
- if name in ('Queue', 'QueueEmpty', 'QueueFull', 'create_queue'):
- global create_queue, Queue, QueueEmpty, QueueFull
- ns = globals()
- from .queues import (
- create as create_queue,
- Queue, QueueEmpty, QueueFull,
- )
- return ns[name]
- else:
- raise AttributeError(name)
-
-
-_EXEC_FAILURE_STR = """
-{superstr}
-
-Uncaught in the interpreter:
-
-{formatted}
-""".strip()
-
-class ExecutionFailed(InterpreterError):
- """An unhandled exception happened during execution.
-
- This is raised from Interpreter.exec() and Interpreter.call().
- """
-
- def __init__(self, excinfo):
- msg = excinfo.formatted
- if not msg:
- if excinfo.type and excinfo.msg:
- msg = f'{excinfo.type.__name__}: {excinfo.msg}'
- else:
- msg = excinfo.type.__name__ or excinfo.msg
- super().__init__(msg)
- self.excinfo = excinfo
-
- def __str__(self):
- try:
- formatted = self.excinfo.errdisplay
- except Exception:
- return super().__str__()
- else:
- return _EXEC_FAILURE_STR.format(
- superstr=super().__str__(),
- formatted=formatted,
- )
-
-
-def create():
- """Return a new (idle) Python interpreter."""
- id = _interpreters.create(reqrefs=True)
- return Interpreter(id, _ownsref=True)
-
-
-def list_all():
- """Return all existing interpreters."""
- return [Interpreter(id, _whence=whence)
- for id, whence in _interpreters.list_all(require_ready=True)]
-
-
-def get_current():
- """Return the currently running interpreter."""
- id, whence = _interpreters.get_current()
- return Interpreter(id, _whence=whence)
-
-
-def get_main():
- """Return the main interpreter."""
- id, whence = _interpreters.get_main()
- assert whence == _interpreters.WHENCE_RUNTIME, repr(whence)
- return Interpreter(id, _whence=whence)
-
-
-_known = weakref.WeakValueDictionary()
-
-class Interpreter:
- """A single Python interpreter.
-
- Attributes:
-
- "id" - the unique process-global ID number for the interpreter
- "whence" - indicates where the interpreter was created
-
- If the interpreter wasn't created by this module
- then any method that modifies the interpreter will fail,
- i.e. .close(), .prepare_main(), .exec(), and .call()
- """
-
- _WHENCE_TO_STR = {
- _interpreters.WHENCE_UNKNOWN: 'unknown',
- _interpreters.WHENCE_RUNTIME: 'runtime init',
- _interpreters.WHENCE_LEGACY_CAPI: 'legacy C-API',
- _interpreters.WHENCE_CAPI: 'C-API',
- _interpreters.WHENCE_XI: 'cross-interpreter C-API',
- _interpreters.WHENCE_STDLIB: '_interpreters module',
- }
-
- def __new__(cls, id, /, _whence=None, _ownsref=None):
- # There is only one instance for any given ID.
- if not isinstance(id, int):
- raise TypeError(f'id must be an int, got {id!r}')
- id = int(id)
- if _whence is None:
- if _ownsref:
- _whence = _interpreters.WHENCE_STDLIB
- else:
- _whence = _interpreters.whence(id)
- assert _whence in cls._WHENCE_TO_STR, repr(_whence)
- if _ownsref is None:
- _ownsref = (_whence == _interpreters.WHENCE_STDLIB)
- try:
- self = _known[id]
- assert hasattr(self, '_ownsref')
- except KeyError:
- self = super().__new__(cls)
- _known[id] = self
- self._id = id
- self._whence = _whence
- self._ownsref = _ownsref
- if _ownsref:
- # This may raise InterpreterNotFoundError:
- _interpreters.incref(id)
- return self
-
- def __repr__(self):
- return f'{type(self).__name__}({self.id})'
-
- def __hash__(self):
- return hash(self._id)
-
- def __del__(self):
- self._decref()
-
- # for pickling:
- def __getnewargs__(self):
- return (self._id,)
-
- # for pickling:
- def __getstate__(self):
- return None
-
- def _decref(self):
- if not self._ownsref:
- return
- self._ownsref = False
- try:
- _interpreters.decref(self._id)
- except InterpreterNotFoundError:
- pass
-
- @property
- def id(self):
- return self._id
-
- @property
- def whence(self):
- return self._WHENCE_TO_STR[self._whence]
-
- def is_running(self):
- """Return whether or not the identified interpreter is running."""
- return _interpreters.is_running(self._id)
-
- # Everything past here is available only to interpreters created by
- # interpreters.create().
-
- def close(self):
- """Finalize and destroy the interpreter.
-
- Attempting to destroy the current interpreter results
- in an InterpreterError.
- """
- return _interpreters.destroy(self._id, restrict=True)
-
- def prepare_main(self, ns=None, /, **kwargs):
- """Bind the given values into the interpreter's __main__.
-
- The values must be shareable.
- """
- ns = dict(ns, **kwargs) if ns is not None else kwargs
- _interpreters.set___main___attrs(self._id, ns, restrict=True)
-
- def exec(self, code, /):
- """Run the given source code in the interpreter.
-
- This is essentially the same as calling the builtin "exec"
- with this interpreter, using the __dict__ of its __main__
- module as both globals and locals.
-
- There is no return value.
-
- If the code raises an unhandled exception then an ExecutionFailed
- exception is raised, which summarizes the unhandled exception.
- The actual exception is discarded because objects cannot be
- shared between interpreters.
-
- This blocks the current Python thread until done. During
- that time, the previous interpreter is allowed to run
- in other threads.
- """
- excinfo = _interpreters.exec(self._id, code, restrict=True)
- if excinfo is not None:
- raise ExecutionFailed(excinfo)
-
- def call(self, callable, /):
- """Call the object in the interpreter with given args/kwargs.
-
- Only functions that take no arguments and have no closure
- are supported.
-
- The return value is discarded.
-
- If the callable raises an exception then the error display
- (including full traceback) is send back between the interpreters
- and an ExecutionFailed exception is raised, much like what
- happens with Interpreter.exec().
- """
- # XXX Support args and kwargs.
- # XXX Support arbitrary callables.
- # XXX Support returning the return value (e.g. via pickle).
- excinfo = _interpreters.call(self._id, callable, restrict=True)
- if excinfo is not None:
- raise ExecutionFailed(excinfo)
-
- def call_in_thread(self, callable, /):
- """Return a new thread that calls the object in the interpreter.
-
- The return value and any raised exception are discarded.
- """
- def task():
- self.call(callable)
- t = threading.Thread(target=task)
- t.start()
- return t
diff --git a/Lib/test/support/interpreters/_crossinterp.py b/Lib/test/support/interpreters/_crossinterp.py
deleted file mode 100644
index 544e197ba4c..00000000000
--- a/Lib/test/support/interpreters/_crossinterp.py
+++ /dev/null
@@ -1,102 +0,0 @@
-"""Common code between queues and channels."""
-
-
-class ItemInterpreterDestroyed(Exception):
- """Raised when trying to get an item whose interpreter was destroyed."""
-
-
-class classonly:
- """A non-data descriptor that makes a value only visible on the class.
-
- This is like the "classmethod" builtin, but does not show up on
- instances of the class. It may be used as a decorator.
- """
-
- def __init__(self, value):
- self.value = value
- self.getter = classmethod(value).__get__
- self.name = None
-
- def __set_name__(self, cls, name):
- if self.name is not None:
- raise TypeError('already used')
- self.name = name
-
- def __get__(self, obj, cls):
- if obj is not None:
- raise AttributeError(self.name)
- # called on the class
- return self.getter(None, cls)
-
-
-class UnboundItem:
- """Represents a cross-interpreter item no longer bound to an interpreter.
-
- An item is unbound when the interpreter that added it to the
- cross-interpreter container is destroyed.
- """
-
- __slots__ = ()
-
- @classonly
- def singleton(cls, kind, module, name='UNBOUND'):
- doc = cls.__doc__.replace('cross-interpreter container', kind)
- doc = doc.replace('cross-interpreter', kind)
- subclass = type(
- f'Unbound{kind.capitalize()}Item',
- (cls,),
- dict(
- _MODULE=module,
- _NAME=name,
- __doc__=doc,
- ),
- )
- return object.__new__(subclass)
-
- _MODULE = __name__
- _NAME = 'UNBOUND'
-
- def __new__(cls):
- raise Exception(f'use {cls._MODULE}.{cls._NAME}')
-
- def __repr__(self):
- return f'{self._MODULE}.{self._NAME}'
-# return f'interpreters.queues.UNBOUND'
-
-
-UNBOUND = object.__new__(UnboundItem)
-UNBOUND_ERROR = object()
-UNBOUND_REMOVE = object()
-
-_UNBOUND_CONSTANT_TO_FLAG = {
- UNBOUND_REMOVE: 1,
- UNBOUND_ERROR: 2,
- UNBOUND: 3,
-}
-_UNBOUND_FLAG_TO_CONSTANT = {v: k
- for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()}
-
-
-def serialize_unbound(unbound):
- op = unbound
- try:
- flag = _UNBOUND_CONSTANT_TO_FLAG[op]
- except KeyError:
- raise NotImplementedError(f'unsupported unbound replacement op {op!r}')
- return flag,
-
-
-def resolve_unbound(flag, exctype_destroyed):
- try:
- op = _UNBOUND_FLAG_TO_CONSTANT[flag]
- except KeyError:
- raise NotImplementedError(f'unsupported unbound replacement op {flag!r}')
- if op is UNBOUND_REMOVE:
- # "remove" not possible here
- raise NotImplementedError
- elif op is UNBOUND_ERROR:
- raise exctype_destroyed("item's original interpreter destroyed")
- elif op is UNBOUND:
- return UNBOUND
- else:
- raise NotImplementedError(repr(op))
diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py
deleted file mode 100644
index 99987f2f692..00000000000
--- a/Lib/test/support/interpreters/queues.py
+++ /dev/null
@@ -1,288 +0,0 @@
-"""Cross-interpreter Queues High Level Module."""
-
-import queue
-import time
-import weakref
-import _interpqueues as _queues
-from . import _crossinterp
-
-# aliases:
-from _interpqueues import (
- QueueError, QueueNotFoundError,
-)
-from ._crossinterp import (
- UNBOUND_ERROR, UNBOUND_REMOVE,
-)
-
-__all__ = [
- 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE',
- 'create', 'list_all',
- 'Queue',
- 'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull',
- 'ItemInterpreterDestroyed',
-]
-
-
-class QueueEmpty(QueueError, queue.Empty):
- """Raised from get_nowait() when the queue is empty.
-
- It is also raised from get() if it times out.
- """
-
-
-class QueueFull(QueueError, queue.Full):
- """Raised from put_nowait() when the queue is full.
-
- It is also raised from put() if it times out.
- """
-
-
-class ItemInterpreterDestroyed(QueueError,
- _crossinterp.ItemInterpreterDestroyed):
- """Raised from get() and get_nowait()."""
-
-
-_SHARED_ONLY = 0
-_PICKLED = 1
-
-
-UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__)
-
-
-def _serialize_unbound(unbound):
- if unbound is UNBOUND:
- unbound = _crossinterp.UNBOUND
- return _crossinterp.serialize_unbound(unbound)
-
-
-def _resolve_unbound(flag):
- resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)
- if resolved is _crossinterp.UNBOUND:
- resolved = UNBOUND
- return resolved
-
-
-def create(maxsize=0, *, unbounditems=UNBOUND):
- """Return a new cross-interpreter queue.
-
- The queue may be used to pass data safely between interpreters.
-
- "unbounditems" sets the default for Queue.put(); see that method for
- supported values. The default value is UNBOUND, which replaces
- the unbound item.
- """
- unbound = _serialize_unbound(unbounditems)
- unboundop, = unbound
- qid = _queues.create(maxsize, unboundop, -1)
- self = Queue(qid)
- self._set_unbound(unboundop, unbounditems)
- return self
-
-
-def list_all():
- """Return a list of all open queues."""
- queues = []
- for qid, unboundop, _ in _queues.list_all():
- self = Queue(qid)
- if not hasattr(self, '_unbound'):
- self._set_unbound(unboundop)
- else:
- assert self._unbound[0] == unboundop
- queues.append(self)
- return queues
-
-
-_known_queues = weakref.WeakValueDictionary()
-
-class Queue:
- """A cross-interpreter queue."""
-
- def __new__(cls, id, /):
- # There is only one instance for any given ID.
- if isinstance(id, int):
- id = int(id)
- else:
- raise TypeError(f'id must be an int, got {id!r}')
- try:
- self = _known_queues[id]
- except KeyError:
- self = super().__new__(cls)
- self._id = id
- _known_queues[id] = self
- _queues.bind(id)
- return self
-
- def __del__(self):
- try:
- _queues.release(self._id)
- except QueueNotFoundError:
- pass
- try:
- del _known_queues[self._id]
- except KeyError:
- pass
-
- def __repr__(self):
- return f'{type(self).__name__}({self.id})'
-
- def __hash__(self):
- return hash(self._id)
-
- # for pickling:
- def __getnewargs__(self):
- return (self._id,)
-
- # for pickling:
- def __getstate__(self):
- return None
-
- def _set_unbound(self, op, items=None):
- assert not hasattr(self, '_unbound')
- if items is None:
- items = _resolve_unbound(op)
- unbound = (op, items)
- self._unbound = unbound
- return unbound
-
- @property
- def id(self):
- return self._id
-
- @property
- def unbounditems(self):
- try:
- _, items = self._unbound
- except AttributeError:
- op, _ = _queues.get_queue_defaults(self._id)
- _, items = self._set_unbound(op)
- return items
-
- @property
- def maxsize(self):
- try:
- return self._maxsize
- except AttributeError:
- self._maxsize = _queues.get_maxsize(self._id)
- return self._maxsize
-
- def empty(self):
- return self.qsize() == 0
-
- def full(self):
- return _queues.is_full(self._id)
-
- def qsize(self):
- return _queues.get_count(self._id)
-
- def put(self, obj, timeout=None, *,
- unbounditems=None,
- _delay=10 / 1000, # 10 milliseconds
- ):
- """Add the object to the queue.
-
- This blocks while the queue is full.
-
- For most objects, the object received through Queue.get() will
- be a new one, equivalent to the original and not sharing any
- actual underlying data. The notable exceptions include
- cross-interpreter types (like Queue) and memoryview, where the
- underlying data is actually shared. Furthermore, some types
- can be sent through a queue more efficiently than others. This
- group includes various immutable types like int, str, bytes, and
- tuple (if the items are likewise efficiently shareable). See interpreters.is_shareable().
-
- "unbounditems" controls the behavior of Queue.get() for the given
- object if the current interpreter (calling put()) is later
- destroyed.
-
- If "unbounditems" is None (the default) then it uses the
- queue's default, set with create_queue(),
- which is usually UNBOUND.
-
- If "unbounditems" is UNBOUND_ERROR then get() will raise an
- ItemInterpreterDestroyed exception if the original interpreter
- has been destroyed. This does not otherwise affect the queue;
- the next call to put() will work like normal, returning the next
- item in the queue.
-
- If "unbounditems" is UNBOUND_REMOVE then the item will be removed
- from the queue as soon as the original interpreter is destroyed.
- Be aware that this will introduce an imbalance between put()
- and get() calls.
-
- If "unbounditems" is UNBOUND then it is returned by get() in place
- of the unbound item.
- """
- if unbounditems is None:
- unboundop = -1
- else:
- unboundop, = _serialize_unbound(unbounditems)
- if timeout is not None:
- timeout = int(timeout)
- if timeout < 0:
- raise ValueError(f'timeout value must be non-negative')
- end = time.time() + timeout
- while True:
- try:
- _queues.put(self._id, obj, unboundop)
- except QueueFull as exc:
- if timeout is not None and time.time() >= end:
- raise # re-raise
- time.sleep(_delay)
- else:
- break
-
- def put_nowait(self, obj, *, unbounditems=None):
- if unbounditems is None:
- unboundop = -1
- else:
- unboundop, = _serialize_unbound(unbounditems)
- _queues.put(self._id, obj, unboundop)
-
- def get(self, timeout=None, *,
- _delay=10 / 1000, # 10 milliseconds
- ):
- """Return the next object from the queue.
-
- This blocks while the queue is empty.
-
- If the next item's original interpreter has been destroyed
- then the "next object" is determined by the value of the
- "unbounditems" argument to put().
- """
- if timeout is not None:
- timeout = int(timeout)
- if timeout < 0:
- raise ValueError(f'timeout value must be non-negative')
- end = time.time() + timeout
- while True:
- try:
- obj, unboundop = _queues.get(self._id)
- except QueueEmpty as exc:
- if timeout is not None and time.time() >= end:
- raise # re-raise
- time.sleep(_delay)
- else:
- break
- if unboundop is not None:
- assert obj is None, repr(obj)
- return _resolve_unbound(unboundop)
- return obj
-
- def get_nowait(self):
- """Return the next object from the channel.
-
- If the queue is empty then raise QueueEmpty. Otherwise this
- is the same as get().
- """
- try:
- obj, unboundop = _queues.get(self._id)
- except QueueEmpty as exc:
- raise # re-raise
- if unboundop is not None:
- assert obj is None, repr(obj)
- return _resolve_unbound(unboundop)
- return obj
-
-
-_queues._register_heap_types(Queue, QueueEmpty, QueueFull)