diff options
Diffstat (limited to 'Lib/asyncio')
-rw-r--r-- | Lib/asyncio/base_events.py | 64 | ||||
-rw-r--r-- | Lib/asyncio/base_subprocess.py | 3 | ||||
-rw-r--r-- | Lib/asyncio/coroutines.py | 5 | ||||
-rw-r--r-- | Lib/asyncio/events.py | 4 | ||||
-rw-r--r-- | Lib/asyncio/proactor_events.py | 3 | ||||
-rw-r--r-- | Lib/asyncio/selector_events.py | 3 | ||||
-rw-r--r-- | Lib/asyncio/sslproto.py | 3 | ||||
-rw-r--r-- | Lib/asyncio/test_utils.py | 8 | ||||
-rw-r--r-- | Lib/asyncio/unix_events.py | 6 | ||||
-rw-r--r-- | Lib/asyncio/windows_utils.py | 3 |
10 files changed, 85 insertions, 17 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 99f12b40bdd..9c2fa12486f 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -13,7 +13,6 @@ conscious design decision, leaving the door open for keyword arguments to modify the meaning of the API call itself. """ - import collections import concurrent.futures import heapq @@ -28,6 +27,7 @@ import time import traceback import sys import warnings +import weakref from . import compat from . import coroutines @@ -242,6 +242,13 @@ class BaseEventLoop(events.AbstractEventLoop): self._task_factory = None self._coroutine_wrapper_set = False + # A weak set of all asynchronous generators that are being iterated + # by the loop. + self._asyncgens = weakref.WeakSet() + + # Set to True when `loop.shutdown_asyncgens` is called. + self._asyncgens_shutdown_called = False + def __repr__(self): return ('<%s running=%s closed=%s debug=%s>' % (self.__class__.__name__, self.is_running(), @@ -333,6 +340,46 @@ class BaseEventLoop(events.AbstractEventLoop): if self._closed: raise RuntimeError('Event loop is closed') + def _asyncgen_finalizer_hook(self, agen): + self._asyncgens.discard(agen) + if not self.is_closed(): + self.create_task(agen.aclose()) + + def _asyncgen_firstiter_hook(self, agen): + if self._asyncgens_shutdown_called: + warnings.warn( + "asynchronous generator {!r} was scheduled after " + "loop.shutdown_asyncgens() call".format(agen), + ResourceWarning, source=self) + + self._asyncgens.add(agen) + + @coroutine + def shutdown_asyncgens(self): + """Shutdown all active asynchronous generators.""" + self._asyncgens_shutdown_called = True + + if not len(self._asyncgens): + return + + closing_agens = list(self._asyncgens) + self._asyncgens.clear() + + shutdown_coro = tasks.gather( + *[ag.aclose() for ag in closing_agens], + return_exceptions=True, + loop=self) + + results = yield from shutdown_coro + for result, agen in zip(results, closing_agens): + if isinstance(result, Exception): + self.call_exception_handler({ + 'message': 'an error occurred during closing of ' + 'asynchronous generator {!r}'.format(agen), + 'exception': result, + 'asyncgen': agen + }) + def run_forever(self): """Run until stop() is called.""" self._check_closed() @@ -340,6 +387,9 @@ class BaseEventLoop(events.AbstractEventLoop): raise RuntimeError('Event loop is running.') self._set_coroutine_wrapper(self._debug) self._thread_id = threading.get_ident() + old_agen_hooks = sys.get_asyncgen_hooks() + sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, + finalizer=self._asyncgen_finalizer_hook) try: while True: self._run_once() @@ -349,6 +399,7 @@ class BaseEventLoop(events.AbstractEventLoop): self._stopping = False self._thread_id = None self._set_coroutine_wrapper(False) + sys.set_asyncgen_hooks(*old_agen_hooks) def run_until_complete(self, future): """Run until the Future is done. @@ -426,7 +477,8 @@ class BaseEventLoop(events.AbstractEventLoop): if compat.PY34: def __del__(self): if not self.is_closed(): - warnings.warn("unclosed event loop %r" % self, ResourceWarning) + warnings.warn("unclosed event loop %r" % self, ResourceWarning, + source=self) if not self.is_running(): self.close() @@ -1068,7 +1120,7 @@ class BaseEventLoop(events.AbstractEventLoop): transport = yield from self._make_subprocess_transport( protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs) if self._debug: - logger.info('%s: %r' % (debug_log, transport)) + logger.info('%s: %r', debug_log, transport) return transport, protocol @coroutine @@ -1098,7 +1150,7 @@ class BaseEventLoop(events.AbstractEventLoop): protocol, popen_args, False, stdin, stdout, stderr, bufsize, **kwargs) if self._debug: - logger.info('%s: %r' % (debug_log, transport)) + logger.info('%s: %r', debug_log, transport) return transport, protocol def get_exception_handler(self): @@ -1178,7 +1230,9 @@ class BaseEventLoop(events.AbstractEventLoop): - 'handle' (optional): Handle instance; - 'protocol' (optional): Protocol instance; - 'transport' (optional): Transport instance; - - 'socket' (optional): Socket instance. + - 'socket' (optional): Socket instance; + - 'asyncgen' (optional): Asynchronous generator that caused + the exception. New keys maybe introduced in the future. diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index bcc481d20ea..28482b71896 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -128,7 +128,8 @@ class BaseSubprocessTransport(transports.SubprocessTransport): if compat.PY34: def __del__(self): if not self._closed: - warnings.warn("unclosed transport %r" % self, ResourceWarning) + warnings.warn("unclosed transport %r" % self, ResourceWarning, + source=self) self.close() def get_pid(self): diff --git a/Lib/asyncio/coroutines.py b/Lib/asyncio/coroutines.py index 72ffb44e957..d92f67d590e 100644 --- a/Lib/asyncio/coroutines.py +++ b/Lib/asyncio/coroutines.py @@ -276,7 +276,10 @@ def _format_coroutine(coro): try: coro_code = coro.gi_code except AttributeError: - coro_code = coro.cr_code + try: + coro_code = coro.cr_code + except AttributeError: + return repr(coro) try: coro_frame = coro.gi_frame diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index c48c5bed736..cc9a986b994 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -248,6 +248,10 @@ class AbstractEventLoop: """ raise NotImplementedError + def shutdown_asyncgens(self): + """Shutdown all active asynchronous generators.""" + raise NotImplementedError + # Methods scheduling callbacks. All these return Handles. def _timer_handle_cancelled(self, handle): diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 97ab487f974..68a523af958 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -92,7 +92,8 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin, if compat.PY34: def __del__(self): if self._sock is not None: - warnings.warn("unclosed transport %r" % self, ResourceWarning) + warnings.warn("unclosed transport %r" % self, ResourceWarning, + source=self) self.close() def _fatal_error(self, exc, message='Fatal error on pipe transport'): diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index c57f509a12b..1850bdb5a11 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -585,7 +585,8 @@ class _SelectorTransport(transports._FlowControlMixin, if compat.PY34: def __del__(self): if self._sock is not None: - warnings.warn("unclosed transport %r" % self, ResourceWarning) + warnings.warn("unclosed transport %r" % self, ResourceWarning, + source=self) self._sock.close() def _fatal_error(self, exc, message='Fatal error on transport'): diff --git a/Lib/asyncio/sslproto.py b/Lib/asyncio/sslproto.py index afe85a14387..92d3c4c9091 100644 --- a/Lib/asyncio/sslproto.py +++ b/Lib/asyncio/sslproto.py @@ -331,7 +331,8 @@ class _SSLProtocolTransport(transports._FlowControlMixin, if compat.PY34: def __del__(self): if not self._closed: - warnings.warn("unclosed transport %r" % self, ResourceWarning) + warnings.warn("unclosed transport %r" % self, ResourceWarning, + source=self) self.close() def pause_reading(self): diff --git a/Lib/asyncio/test_utils.py b/Lib/asyncio/test_utils.py index 396e6aed567..ac8a8ef752c 100644 --- a/Lib/asyncio/test_utils.py +++ b/Lib/asyncio/test_utils.py @@ -117,10 +117,10 @@ class SSLWSGIServerMixin: 'test', 'test_asyncio') keyfile = os.path.join(here, 'ssl_key.pem') certfile = os.path.join(here, 'ssl_cert.pem') - ssock = ssl.wrap_socket(request, - keyfile=keyfile, - certfile=certfile, - server_side=True) + context = ssl.SSLContext() + context.load_cert_chain(certfile, keyfile) + + ssock = context.wrap_socket(request, server_side=True) try: self.RequestHandlerClass(ssock, client_address, self) ssock.close() diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index f7f9eb2a1dd..4222054f265 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -393,7 +393,8 @@ class _UnixReadPipeTransport(transports.ReadTransport): if compat.PY34: def __del__(self): if self._pipe is not None: - warnings.warn("unclosed transport %r" % self, ResourceWarning) + warnings.warn("unclosed transport %r" % self, ResourceWarning, + source=self) self._pipe.close() def _fatal_error(self, exc, message='Fatal error on pipe transport'): @@ -596,7 +597,8 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, if compat.PY34: def __del__(self): if self._pipe is not None: - warnings.warn("unclosed transport %r" % self, ResourceWarning) + warnings.warn("unclosed transport %r" % self, ResourceWarning, + source=self) self._pipe.close() def abort(self): diff --git a/Lib/asyncio/windows_utils.py b/Lib/asyncio/windows_utils.py index 870cd13abe6..7c63fb904b3 100644 --- a/Lib/asyncio/windows_utils.py +++ b/Lib/asyncio/windows_utils.py @@ -159,7 +159,8 @@ class PipeHandle: def __del__(self): if self._handle is not None: - warnings.warn("unclosed %r" % self, ResourceWarning) + warnings.warn("unclosed %r" % self, ResourceWarning, + source=self) self.close() def __enter__(self): |