aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rw-r--r--Lib/asyncio/unix_events.py325
-rw-r--r--Lib/test/test_asyncio/test_events.py9
-rw-r--r--Lib/test/test_asyncio/test_streams.py46
-rw-r--r--Lib/test/test_asyncio/test_subprocess.py29
-rw-r--r--Lib/test/test_asyncio/test_unix_events.py636
5 files changed, 4 insertions, 1041 deletions
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index 41ccf1b78fb..9a2e300259e 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -28,9 +28,9 @@ from .log import logger
__all__ = (
'SelectorEventLoop',
- 'AbstractChildWatcher', 'SafeChildWatcher',
- 'FastChildWatcher', 'PidfdChildWatcher',
- 'MultiLoopChildWatcher', 'ThreadedChildWatcher',
+ 'AbstractChildWatcher',
+ 'PidfdChildWatcher',
+ 'ThreadedChildWatcher',
'DefaultEventLoopPolicy',
'EventLoop',
)
@@ -1062,325 +1062,6 @@ class BaseChildWatcher(AbstractChildWatcher):
})
-class SafeChildWatcher(BaseChildWatcher):
- """'Safe' child watcher implementation.
-
- This implementation avoids disrupting other code spawning processes by
- polling explicitly each process in the SIGCHLD handler instead of calling
- os.waitpid(-1).
-
- This is a safe solution but it has a significant overhead when handling a
- big number of children (O(n) each time SIGCHLD is raised)
- """
-
- def __init__(self):
- super().__init__()
- warnings._deprecated("SafeChildWatcher",
- "{name!r} is deprecated as of Python 3.12 and will be "
- "removed in Python {remove}.",
- remove=(3, 14))
-
- def close(self):
- self._callbacks.clear()
- super().close()
-
- def __enter__(self):
- return self
-
- def __exit__(self, a, b, c):
- pass
-
- def add_child_handler(self, pid, callback, *args):
- self._callbacks[pid] = (callback, args)
-
- # Prevent a race condition in case the child is already terminated.
- self._do_waitpid(pid)
-
- def remove_child_handler(self, pid):
- try:
- del self._callbacks[pid]
- return True
- except KeyError:
- return False
-
- def _do_waitpid_all(self):
-
- for pid in list(self._callbacks):
- self._do_waitpid(pid)
-
- def _do_waitpid(self, expected_pid):
- assert expected_pid > 0
-
- try:
- pid, status = os.waitpid(expected_pid, os.WNOHANG)
- except ChildProcessError:
- # The child process is already reaped
- # (may happen if waitpid() is called elsewhere).
- pid = expected_pid
- returncode = 255
- logger.warning(
- "Unknown child process pid %d, will report returncode 255",
- pid)
- else:
- if pid == 0:
- # The child process is still alive.
- return
-
- returncode = waitstatus_to_exitcode(status)
- if self._loop.get_debug():
- logger.debug('process %s exited with returncode %s',
- expected_pid, returncode)
-
- try:
- callback, args = self._callbacks.pop(pid)
- except KeyError: # pragma: no cover
- # May happen if .remove_child_handler() is called
- # after os.waitpid() returns.
- if self._loop.get_debug():
- logger.warning("Child watcher got an unexpected pid: %r",
- pid, exc_info=True)
- else:
- callback(pid, returncode, *args)
-
-
-class FastChildWatcher(BaseChildWatcher):
- """'Fast' child watcher implementation.
-
- This implementation reaps every terminated processes by calling
- os.waitpid(-1) directly, possibly breaking other code spawning processes
- and waiting for their termination.
-
- There is no noticeable overhead when handling a big number of children
- (O(1) each time a child terminates).
- """
- def __init__(self):
- super().__init__()
- self._lock = threading.Lock()
- self._zombies = {}
- self._forks = 0
- warnings._deprecated("FastChildWatcher",
- "{name!r} is deprecated as of Python 3.12 and will be "
- "removed in Python {remove}.",
- remove=(3, 14))
-
- def close(self):
- self._callbacks.clear()
- self._zombies.clear()
- super().close()
-
- def __enter__(self):
- with self._lock:
- self._forks += 1
-
- return self
-
- def __exit__(self, a, b, c):
- with self._lock:
- self._forks -= 1
-
- if self._forks or not self._zombies:
- return
-
- collateral_victims = str(self._zombies)
- self._zombies.clear()
-
- logger.warning(
- "Caught subprocesses termination from unknown pids: %s",
- collateral_victims)
-
- def add_child_handler(self, pid, callback, *args):
- assert self._forks, "Must use the context manager"
-
- with self._lock:
- try:
- returncode = self._zombies.pop(pid)
- except KeyError:
- # The child is running.
- self._callbacks[pid] = callback, args
- return
-
- # The child is dead already. We can fire the callback.
- callback(pid, returncode, *args)
-
- def remove_child_handler(self, pid):
- try:
- del self._callbacks[pid]
- return True
- except KeyError:
- return False
-
- def _do_waitpid_all(self):
- # Because of signal coalescing, we must keep calling waitpid() as
- # long as we're able to reap a child.
- while True:
- try:
- pid, status = os.waitpid(-1, os.WNOHANG)
- except ChildProcessError:
- # No more child processes exist.
- return
- else:
- if pid == 0:
- # A child process is still alive.
- return
-
- returncode = waitstatus_to_exitcode(status)
-
- with self._lock:
- try:
- callback, args = self._callbacks.pop(pid)
- except KeyError:
- # unknown child
- if self._forks:
- # It may not be registered yet.
- self._zombies[pid] = returncode
- if self._loop.get_debug():
- logger.debug('unknown process %s exited '
- 'with returncode %s',
- pid, returncode)
- continue
- callback = None
- else:
- if self._loop.get_debug():
- logger.debug('process %s exited with returncode %s',
- pid, returncode)
-
- if callback is None:
- logger.warning(
- "Caught subprocess termination from unknown pid: "
- "%d -> %d", pid, returncode)
- else:
- callback(pid, returncode, *args)
-
-
-class MultiLoopChildWatcher(AbstractChildWatcher):
- """A watcher that doesn't require running loop in the main thread.
-
- This implementation registers a SIGCHLD signal handler on
- instantiation (which may conflict with other code that
- install own handler for this signal).
-
- The solution is safe but it has a significant overhead when
- handling a big number of processes (*O(n)* each time a
- SIGCHLD is received).
- """
-
- # Implementation note:
- # The class keeps compatibility with AbstractChildWatcher ABC
- # To achieve this it has empty attach_loop() method
- # and doesn't accept explicit loop argument
- # for add_child_handler()/remove_child_handler()
- # but retrieves the current loop by get_running_loop()
-
- def __init__(self):
- self._callbacks = {}
- self._saved_sighandler = None
- warnings._deprecated("MultiLoopChildWatcher",
- "{name!r} is deprecated as of Python 3.12 and will be "
- "removed in Python {remove}.",
- remove=(3, 14))
-
- def is_active(self):
- return self._saved_sighandler is not None
-
- def close(self):
- self._callbacks.clear()
- if self._saved_sighandler is None:
- return
-
- handler = signal.getsignal(signal.SIGCHLD)
- if handler != self._sig_chld:
- logger.warning("SIGCHLD handler was changed by outside code")
- else:
- signal.signal(signal.SIGCHLD, self._saved_sighandler)
- self._saved_sighandler = None
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- pass
-
- def add_child_handler(self, pid, callback, *args):
- loop = events.get_running_loop()
- self._callbacks[pid] = (loop, callback, args)
-
- # Prevent a race condition in case the child is already terminated.
- self._do_waitpid(pid)
-
- def remove_child_handler(self, pid):
- try:
- del self._callbacks[pid]
- return True
- except KeyError:
- return False
-
- def attach_loop(self, loop):
- # Don't save the loop but initialize itself if called first time
- # The reason to do it here is that attach_loop() is called from
- # unix policy only for the main thread.
- # Main thread is required for subscription on SIGCHLD signal
- if self._saved_sighandler is not None:
- return
-
- self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
- if self._saved_sighandler is None:
- logger.warning("Previous SIGCHLD handler was set by non-Python code, "
- "restore to default handler on watcher close.")
- self._saved_sighandler = signal.SIG_DFL
-
- # Set SA_RESTART to limit EINTR occurrences.
- signal.siginterrupt(signal.SIGCHLD, False)
-
- def _do_waitpid_all(self):
- for pid in list(self._callbacks):
- self._do_waitpid(pid)
-
- def _do_waitpid(self, expected_pid):
- assert expected_pid > 0
-
- try:
- pid, status = os.waitpid(expected_pid, os.WNOHANG)
- except ChildProcessError:
- # The child process is already reaped
- # (may happen if waitpid() is called elsewhere).
- pid = expected_pid
- returncode = 255
- logger.warning(
- "Unknown child process pid %d, will report returncode 255",
- pid)
- debug_log = False
- else:
- if pid == 0:
- # The child process is still alive.
- return
-
- returncode = waitstatus_to_exitcode(status)
- debug_log = True
- try:
- loop, callback, args = self._callbacks.pop(pid)
- except KeyError: # pragma: no cover
- # May happen if .remove_child_handler() is called
- # after os.waitpid() returns.
- logger.warning("Child watcher got an unexpected pid: %r",
- pid, exc_info=True)
- else:
- if loop.is_closed():
- logger.warning("Loop %r that handles pid %r is closed", loop, pid)
- else:
- if debug_log and loop.get_debug():
- logger.debug('process %s exited with returncode %s',
- expected_pid, returncode)
- loop.call_soon_threadsafe(callback, pid, returncode, *args)
-
- def _sig_chld(self, signum, frame):
- try:
- self._do_waitpid_all()
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException:
- logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
-
-
class ThreadedChildWatcher(AbstractChildWatcher):
"""Threaded child watcher implementation.
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
index 88c85a36b5d..06eb4d3841a 100644
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -2214,7 +2214,7 @@ else:
super().setUp()
with warnings.catch_warnings():
warnings.simplefilter('ignore', DeprecationWarning)
- watcher = asyncio.SafeChildWatcher()
+ watcher = asyncio.ThreadedChildWatcher()
watcher.attach_loop(self.loop)
asyncio.set_child_watcher(watcher)
@@ -2833,13 +2833,6 @@ class GetEventLoopTestsMixin:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
- if sys.platform != 'win32':
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- watcher = asyncio.SafeChildWatcher()
- watcher.attach_loop(self.loop)
- asyncio.set_child_watcher(watcher)
-
def tearDown(self):
try:
if sys.platform != 'win32':
diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py
index ae943f39869..d32b7ff2518 100644
--- a/Lib/test/test_asyncio/test_streams.py
+++ b/Lib/test/test_asyncio/test_streams.py
@@ -822,52 +822,6 @@ class StreamTests(test_utils.TestCase):
self.assertEqual(msg1, b"hello world 1!\n")
self.assertEqual(msg2, b"hello world 2!\n")
- @unittest.skipIf(sys.platform == 'win32', "Don't have pipes")
- @requires_subprocess()
- def test_read_all_from_pipe_reader(self):
- # See asyncio issue 168. This test is derived from the example
- # subprocess_attach_read_pipe.py, but we configure the
- # StreamReader's limit so that twice it is less than the size
- # of the data writer. Also we must explicitly attach a child
- # watcher to the event loop.
-
- code = """\
-import os, sys
-fd = int(sys.argv[1])
-os.write(fd, b'data')
-os.close(fd)
-"""
- rfd, wfd = os.pipe()
- args = [sys.executable, '-c', code, str(wfd)]
-
- pipe = open(rfd, 'rb', 0)
- reader = asyncio.StreamReader(loop=self.loop, limit=1)
- protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
- transport, _ = self.loop.run_until_complete(
- self.loop.connect_read_pipe(lambda: protocol, pipe))
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- watcher = asyncio.SafeChildWatcher()
- watcher.attach_loop(self.loop)
- try:
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- asyncio.set_child_watcher(watcher)
- create = asyncio.create_subprocess_exec(
- *args,
- pass_fds={wfd},
- )
- proc = self.loop.run_until_complete(create)
- self.loop.run_until_complete(proc.wait())
- finally:
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- asyncio.set_child_watcher(None)
-
- os.close(wfd)
- data = self.loop.run_until_complete(reader.read(-1))
- self.assertEqual(data, b'data')
-
def test_streamreader_constructor_without_loop(self):
with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
asyncio.StreamReader()
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
index cf1a1985338..27ae766a194 100644
--- a/Lib/test/test_asyncio/test_subprocess.py
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -631,15 +631,6 @@ class SubprocessMixin:
# the transport was not notified yet
self.assertFalse(killed)
- # Unlike SafeChildWatcher, FastChildWatcher does not pop the
- # callbacks if waitpid() is called elsewhere. Let's clear them
- # manually to avoid a warning when the watcher is detached.
- if (sys.platform != 'win32' and
- isinstance(self, SubprocessFastWatcherTests)):
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- asyncio.get_child_watcher()._callbacks.clear()
-
async def _test_popen_error(self, stdin):
if sys.platform == 'win32':
target = 'asyncio.windows_utils.Popen'
@@ -908,26 +899,6 @@ if sys.platform != 'win32':
def _get_watcher(self):
return unix_events.ThreadedChildWatcher()
- class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
- test_utils.TestCase):
-
- def _get_watcher(self):
- with self.assertWarns(DeprecationWarning):
- return unix_events.SafeChildWatcher()
-
- class MultiLoopChildWatcherTests(test_utils.TestCase):
-
- def test_warns(self):
- with self.assertWarns(DeprecationWarning):
- unix_events.MultiLoopChildWatcher()
-
- class SubprocessFastWatcherTests(SubprocessWatcherMixin,
- test_utils.TestCase):
-
- def _get_watcher(self):
- with self.assertWarns(DeprecationWarning):
- return unix_events.FastChildWatcher()
-
@unittest.skipUnless(
unix_events.can_use_pidfd(),
"operating system does not support pidfds",
diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py
index 9452213c685..42fb54a4c3a 100644
--- a/Lib/test/test_asyncio/test_unix_events.py
+++ b/Lib/test/test_asyncio/test_unix_events.py
@@ -1138,578 +1138,6 @@ class AbstractChildWatcherTests(unittest.TestCase):
NotImplementedError, watcher.__exit__, f, f, f)
-class BaseChildWatcherTests(unittest.TestCase):
-
- def test_not_implemented(self):
- f = mock.Mock()
- watcher = unix_events.BaseChildWatcher()
- self.assertRaises(
- NotImplementedError, watcher._do_waitpid, f)
-
-
-class ChildWatcherTestsMixin:
-
- ignore_warnings = mock.patch.object(log.logger, "warning")
-
- def setUp(self):
- super().setUp()
- self.loop = self.new_test_loop()
- self.running = False
- self.zombies = {}
-
- with mock.patch.object(
- self.loop, "add_signal_handler") as self.m_add_signal_handler:
- self.watcher = self.create_watcher()
- self.watcher.attach_loop(self.loop)
-
- def waitpid(self, pid, flags):
- if isinstance(self.watcher, asyncio.SafeChildWatcher) or pid != -1:
- self.assertGreater(pid, 0)
- try:
- if pid < 0:
- return self.zombies.popitem()
- else:
- return pid, self.zombies.pop(pid)
- except KeyError:
- pass
- if self.running:
- return 0, 0
- else:
- raise ChildProcessError()
-
- def add_zombie(self, pid, status):
- self.zombies[pid] = status
-
- def waitstatus_to_exitcode(self, status):
- if status > 32768:
- return status - 32768
- elif 32700 < status < 32768:
- return status - 32768
- else:
- return status
-
- def test_create_watcher(self):
- self.m_add_signal_handler.assert_called_once_with(
- signal.SIGCHLD, self.watcher._sig_chld)
-
- def waitpid_mocks(func):
- def wrapped_func(self):
- def patch(target, wrapper):
- return mock.patch(target, wraps=wrapper,
- new_callable=mock.Mock)
-
- with patch('asyncio.unix_events.waitstatus_to_exitcode', self.waitstatus_to_exitcode), \
- patch('os.waitpid', self.waitpid) as m_waitpid:
- func(self, m_waitpid)
- return wrapped_func
-
- @waitpid_mocks
- def test_sigchld(self, m_waitpid):
- # register a child
- callback = mock.Mock()
-
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(42, callback, 9, 10, 14)
-
- self.assertFalse(callback.called)
-
- # child is running
- self.watcher._sig_chld()
-
- self.assertFalse(callback.called)
-
- # child terminates (returncode 12)
- self.running = False
- self.add_zombie(42, EXITCODE(12))
- self.watcher._sig_chld()
-
- callback.assert_called_once_with(42, 12, 9, 10, 14)
-
- callback.reset_mock()
-
- # ensure that the child is effectively reaped
- self.add_zombie(42, EXITCODE(13))
- with self.ignore_warnings:
- self.watcher._sig_chld()
-
- self.assertFalse(callback.called)
-
- # sigchld called again
- self.zombies.clear()
- self.watcher._sig_chld()
-
- self.assertFalse(callback.called)
-
- @waitpid_mocks
- def test_sigchld_two_children(self, m_waitpid):
- callback1 = mock.Mock()
- callback2 = mock.Mock()
-
- # register child 1
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(43, callback1, 7, 8)
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- # register child 2
- with self.watcher:
- self.watcher.add_child_handler(44, callback2, 147, 18)
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- # children are running
- self.watcher._sig_chld()
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- # child 1 terminates (signal 3)
- self.add_zombie(43, SIGNAL(3))
- self.watcher._sig_chld()
-
- callback1.assert_called_once_with(43, -3, 7, 8)
- self.assertFalse(callback2.called)
-
- callback1.reset_mock()
-
- # child 2 still running
- self.watcher._sig_chld()
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- # child 2 terminates (code 108)
- self.add_zombie(44, EXITCODE(108))
- self.running = False
- self.watcher._sig_chld()
-
- callback2.assert_called_once_with(44, 108, 147, 18)
- self.assertFalse(callback1.called)
-
- callback2.reset_mock()
-
- # ensure that the children are effectively reaped
- self.add_zombie(43, EXITCODE(14))
- self.add_zombie(44, EXITCODE(15))
- with self.ignore_warnings:
- self.watcher._sig_chld()
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- # sigchld called again
- self.zombies.clear()
- self.watcher._sig_chld()
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- @waitpid_mocks
- def test_sigchld_two_children_terminating_together(self, m_waitpid):
- callback1 = mock.Mock()
- callback2 = mock.Mock()
-
- # register child 1
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(45, callback1, 17, 8)
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- # register child 2
- with self.watcher:
- self.watcher.add_child_handler(46, callback2, 1147, 18)
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- # children are running
- self.watcher._sig_chld()
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- # child 1 terminates (code 78)
- # child 2 terminates (signal 5)
- self.add_zombie(45, EXITCODE(78))
- self.add_zombie(46, SIGNAL(5))
- self.running = False
- self.watcher._sig_chld()
-
- callback1.assert_called_once_with(45, 78, 17, 8)
- callback2.assert_called_once_with(46, -5, 1147, 18)
-
- callback1.reset_mock()
- callback2.reset_mock()
-
- # ensure that the children are effectively reaped
- self.add_zombie(45, EXITCODE(14))
- self.add_zombie(46, EXITCODE(15))
- with self.ignore_warnings:
- self.watcher._sig_chld()
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- @waitpid_mocks
- def test_sigchld_race_condition(self, m_waitpid):
- # register a child
- callback = mock.Mock()
-
- with self.watcher:
- # child terminates before being registered
- self.add_zombie(50, EXITCODE(4))
- self.watcher._sig_chld()
-
- self.watcher.add_child_handler(50, callback, 1, 12)
-
- callback.assert_called_once_with(50, 4, 1, 12)
- callback.reset_mock()
-
- # ensure that the child is effectively reaped
- self.add_zombie(50, SIGNAL(1))
- with self.ignore_warnings:
- self.watcher._sig_chld()
-
- self.assertFalse(callback.called)
-
- @waitpid_mocks
- def test_sigchld_replace_handler(self, m_waitpid):
- callback1 = mock.Mock()
- callback2 = mock.Mock()
-
- # register a child
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(51, callback1, 19)
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- # register the same child again
- with self.watcher:
- self.watcher.add_child_handler(51, callback2, 21)
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- # child terminates (signal 8)
- self.running = False
- self.add_zombie(51, SIGNAL(8))
- self.watcher._sig_chld()
-
- callback2.assert_called_once_with(51, -8, 21)
- self.assertFalse(callback1.called)
-
- callback2.reset_mock()
-
- # ensure that the child is effectively reaped
- self.add_zombie(51, EXITCODE(13))
- with self.ignore_warnings:
- self.watcher._sig_chld()
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- @waitpid_mocks
- def test_sigchld_remove_handler(self, m_waitpid):
- callback = mock.Mock()
-
- # register a child
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(52, callback, 1984)
-
- self.assertFalse(callback.called)
-
- # unregister the child
- self.watcher.remove_child_handler(52)
-
- self.assertFalse(callback.called)
-
- # child terminates (code 99)
- self.running = False
- self.add_zombie(52, EXITCODE(99))
- with self.ignore_warnings:
- self.watcher._sig_chld()
-
- self.assertFalse(callback.called)
-
- @waitpid_mocks
- def test_sigchld_unknown_status(self, m_waitpid):
- callback = mock.Mock()
-
- # register a child
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(53, callback, -19)
-
- self.assertFalse(callback.called)
-
- # terminate with unknown status
- self.zombies[53] = 1178
- self.running = False
- self.watcher._sig_chld()
-
- callback.assert_called_once_with(53, 1178, -19)
-
- callback.reset_mock()
-
- # ensure that the child is effectively reaped
- self.add_zombie(53, EXITCODE(101))
- with self.ignore_warnings:
- self.watcher._sig_chld()
-
- self.assertFalse(callback.called)
-
- @waitpid_mocks
- def test_remove_child_handler(self, m_waitpid):
- callback1 = mock.Mock()
- callback2 = mock.Mock()
- callback3 = mock.Mock()
-
- # register children
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(54, callback1, 1)
- self.watcher.add_child_handler(55, callback2, 2)
- self.watcher.add_child_handler(56, callback3, 3)
-
- # remove child handler 1
- self.assertTrue(self.watcher.remove_child_handler(54))
-
- # remove child handler 2 multiple times
- self.assertTrue(self.watcher.remove_child_handler(55))
- self.assertFalse(self.watcher.remove_child_handler(55))
- self.assertFalse(self.watcher.remove_child_handler(55))
-
- # all children terminate
- self.add_zombie(54, EXITCODE(0))
- self.add_zombie(55, EXITCODE(1))
- self.add_zombie(56, EXITCODE(2))
- self.running = False
- with self.ignore_warnings:
- self.watcher._sig_chld()
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
- callback3.assert_called_once_with(56, 2, 3)
-
- @waitpid_mocks
- def test_sigchld_unhandled_exception(self, m_waitpid):
- callback = mock.Mock()
-
- # register a child
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(57, callback)
-
- # raise an exception
- m_waitpid.side_effect = ValueError
-
- with mock.patch.object(log.logger,
- 'error') as m_error:
-
- self.assertEqual(self.watcher._sig_chld(), None)
- self.assertTrue(m_error.called)
-
- @waitpid_mocks
- def test_sigchld_child_reaped_elsewhere(self, m_waitpid):
- # register a child
- callback = mock.Mock()
-
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(58, callback)
-
- self.assertFalse(callback.called)
-
- # child terminates
- self.running = False
- self.add_zombie(58, EXITCODE(4))
-
- # waitpid is called elsewhere
- os.waitpid(58, os.WNOHANG)
-
- m_waitpid.reset_mock()
-
- # sigchld
- with self.ignore_warnings:
- self.watcher._sig_chld()
-
- if isinstance(self.watcher, asyncio.FastChildWatcher):
- # here the FastChildWatcher enters a deadlock
- # (there is no way to prevent it)
- self.assertFalse(callback.called)
- else:
- callback.assert_called_once_with(58, 255)
-
- @waitpid_mocks
- def test_sigchld_unknown_pid_during_registration(self, m_waitpid):
- # register two children
- callback1 = mock.Mock()
- callback2 = mock.Mock()
-
- with self.ignore_warnings, self.watcher:
- self.running = True
- # child 1 terminates
- self.add_zombie(591, EXITCODE(7))
- # an unknown child terminates
- self.add_zombie(593, EXITCODE(17))
-
- self.watcher._sig_chld()
-
- self.watcher.add_child_handler(591, callback1)
- self.watcher.add_child_handler(592, callback2)
-
- callback1.assert_called_once_with(591, 7)
- self.assertFalse(callback2.called)
-
- @waitpid_mocks
- def test_set_loop(self, m_waitpid):
- # register a child
- callback = mock.Mock()
-
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(60, callback)
-
- # attach a new loop
- old_loop = self.loop
- self.loop = self.new_test_loop()
- patch = mock.patch.object
-
- with patch(old_loop, "remove_signal_handler") as m_old_remove, \
- patch(self.loop, "add_signal_handler") as m_new_add:
-
- self.watcher.attach_loop(self.loop)
-
- m_old_remove.assert_called_once_with(
- signal.SIGCHLD)
- m_new_add.assert_called_once_with(
- signal.SIGCHLD, self.watcher._sig_chld)
-
- # child terminates
- self.running = False
- self.add_zombie(60, EXITCODE(9))
- self.watcher._sig_chld()
-
- callback.assert_called_once_with(60, 9)
-
- @waitpid_mocks
- def test_set_loop_race_condition(self, m_waitpid):
- # register 3 children
- callback1 = mock.Mock()
- callback2 = mock.Mock()
- callback3 = mock.Mock()
-
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(61, callback1)
- self.watcher.add_child_handler(62, callback2)
- self.watcher.add_child_handler(622, callback3)
-
- # detach the loop
- old_loop = self.loop
- self.loop = None
-
- with mock.patch.object(
- old_loop, "remove_signal_handler") as m_remove_signal_handler:
-
- with self.assertWarnsRegex(
- RuntimeWarning, 'A loop is being detached'):
- self.watcher.attach_loop(None)
-
- m_remove_signal_handler.assert_called_once_with(
- signal.SIGCHLD)
-
- # child 1 & 2 terminate
- self.add_zombie(61, EXITCODE(11))
- self.add_zombie(62, SIGNAL(5))
-
- # SIGCHLD was not caught
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
- self.assertFalse(callback3.called)
-
- # attach a new loop
- self.loop = self.new_test_loop()
-
- with mock.patch.object(
- self.loop, "add_signal_handler") as m_add_signal_handler:
-
- self.watcher.attach_loop(self.loop)
-
- m_add_signal_handler.assert_called_once_with(
- signal.SIGCHLD, self.watcher._sig_chld)
- callback1.assert_called_once_with(61, 11) # race condition!
- callback2.assert_called_once_with(62, -5) # race condition!
- self.assertFalse(callback3.called)
-
- callback1.reset_mock()
- callback2.reset_mock()
-
- # child 3 terminates
- self.running = False
- self.add_zombie(622, EXITCODE(19))
- self.watcher._sig_chld()
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
- callback3.assert_called_once_with(622, 19)
-
- @waitpid_mocks
- def test_close(self, m_waitpid):
- # register two children
- callback1 = mock.Mock()
-
- with self.watcher:
- self.running = True
- # child 1 terminates
- self.add_zombie(63, EXITCODE(9))
- # other child terminates
- self.add_zombie(65, EXITCODE(18))
- self.watcher._sig_chld()
-
- self.watcher.add_child_handler(63, callback1)
- self.watcher.add_child_handler(64, callback1)
-
- self.assertEqual(len(self.watcher._callbacks), 1)
- if isinstance(self.watcher, asyncio.FastChildWatcher):
- self.assertEqual(len(self.watcher._zombies), 1)
-
- with mock.patch.object(
- self.loop,
- "remove_signal_handler") as m_remove_signal_handler:
-
- self.watcher.close()
-
- m_remove_signal_handler.assert_called_once_with(
- signal.SIGCHLD)
- self.assertFalse(self.watcher._callbacks)
- if isinstance(self.watcher, asyncio.FastChildWatcher):
- self.assertFalse(self.watcher._zombies)
-
-
-class SafeChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase):
- def create_watcher(self):
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- return asyncio.SafeChildWatcher()
-
-
-class FastChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase):
- def create_watcher(self):
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- return asyncio.FastChildWatcher()
-
-
class PolicyTests(unittest.TestCase):
def create_policy(self):
@@ -1739,70 +1167,6 @@ class PolicyTests(unittest.TestCase):
with self.assertWarns(DeprecationWarning):
self.assertIs(watcher, policy.get_child_watcher())
- def test_get_child_watcher_after_set(self):
- policy = self.create_policy()
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- watcher = asyncio.FastChildWatcher()
- policy.set_child_watcher(watcher)
-
- self.assertIs(policy._watcher, watcher)
- with self.assertWarns(DeprecationWarning):
- self.assertIs(watcher, policy.get_child_watcher())
-
- def test_get_child_watcher_thread(self):
-
- def f():
- policy.set_event_loop(policy.new_event_loop())
-
- self.assertIsInstance(policy.get_event_loop(),
- asyncio.AbstractEventLoop)
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- watcher = policy.get_child_watcher()
-
- self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
- self.assertIsNone(watcher._loop)
-
- policy.get_event_loop().close()
-
- policy = self.create_policy()
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- policy.set_child_watcher(asyncio.SafeChildWatcher())
-
- th = threading.Thread(target=f)
- th.start()
- th.join()
-
- def test_child_watcher_replace_mainloop_existing(self):
- policy = self.create_policy()
- loop = policy.new_event_loop()
- policy.set_event_loop(loop)
-
- # Explicitly setup SafeChildWatcher,
- # default ThreadedChildWatcher has no _loop property
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- watcher = asyncio.SafeChildWatcher()
- policy.set_child_watcher(watcher)
- watcher.attach_loop(loop)
-
- self.assertIs(watcher._loop, loop)
-
- new_loop = policy.new_event_loop()
- policy.set_event_loop(new_loop)
-
- self.assertIs(watcher._loop, new_loop)
-
- policy.set_event_loop(None)
-
- self.assertIs(watcher._loop, None)
-
- loop.close()
- new_loop.close()
-
-
class TestFunctional(unittest.TestCase):
def setUp(self):