diff options
author | Andrew Svetlov <andrew.svetlov@gmail.com> | 2018-01-27 21:22:47 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-01-27 21:22:47 +0200 |
commit | 7c684073f951dd891021676ecfd86ffc18b8895e (patch) | |
tree | b19f8254e8c3f3ac516b922eda7b985edbfebf70 /Lib/asyncio/selector_events.py | |
parent | f13f12d8daa587b5fcc66fe3ed1090a5dadab289 (diff) | |
download | cpython-7c684073f951dd891021676ecfd86ffc18b8895e.tar.gz cpython-7c684073f951dd891021676ecfd86ffc18b8895e.zip |
bpo-32622: Implement loop.sendfile() (#5271)
Diffstat (limited to 'Lib/asyncio/selector_events.py')
-rw-r--r-- | Lib/asyncio/selector_events.py | 39 |
1 files changed, 39 insertions, 0 deletions
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 9446ae6a3bc..5956f2d993e 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -540,6 +540,20 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): else: fut.set_result((conn, address)) + async def _sendfile_native(self, transp, file, offset, count): + del self._transports[transp._sock_fd] + resume_reading = transp.is_reading() + transp.pause_reading() + await transp._make_empty_waiter() + try: + return await self.sock_sendfile(transp._sock, file, offset, count, + fallback=False) + finally: + transp._reset_empty_waiter() + if resume_reading: + transp.resume_reading() + self._transports[transp._sock_fd] = transp + def _process_events(self, event_list): for key, mask in event_list: fileobj, (reader, writer) = key.fileobj, key.data @@ -695,12 +709,14 @@ class _SelectorTransport(transports._FlowControlMixin, class _SelectorSocketTransport(_SelectorTransport): _start_tls_compatible = True + _sendfile_compatible = constants._SendfileMode.TRY_NATIVE def __init__(self, loop, sock, protocol, waiter=None, extra=None, server=None): super().__init__(loop, sock, protocol, extra, server) self._eof = False self._paused = False + self._empty_waiter = None # Disable the Nagle algorithm -- small writes will be # sent without waiting for the TCP ACK. This generally @@ -765,6 +781,8 @@ class _SelectorSocketTransport(_SelectorTransport): f'not {type(data).__name__!r}') if self._eof: raise RuntimeError('Cannot call write() after write_eof()') + if self._empty_waiter is not None: + raise RuntimeError('unable to write; sendfile is in progress') if not data: return @@ -807,12 +825,16 @@ class _SelectorSocketTransport(_SelectorTransport): self._loop._remove_writer(self._sock_fd) self._buffer.clear() self._fatal_error(exc, 'Fatal write error on socket transport') + if self._empty_waiter is not None: + self._empty_waiter.set_exception(exc) else: if n: del self._buffer[:n] self._maybe_resume_protocol() # May append to buffer. if not self._buffer: self._loop._remove_writer(self._sock_fd) + if self._empty_waiter is not None: + self._empty_waiter.set_result(None) if self._closing: self._call_connection_lost(None) elif self._eof: @@ -828,6 +850,23 @@ class _SelectorSocketTransport(_SelectorTransport): def can_write_eof(self): return True + def _call_connection_lost(self, exc): + super()._call_connection_lost(exc) + if self._empty_waiter is not None: + self._empty_waiter.set_exception( + ConnectionError("Connection is closed by peer")) + + def _make_empty_waiter(self): + if self._empty_waiter is not None: + raise RuntimeError("Empty waiter is already set") + self._empty_waiter = self._loop.create_future() + if not self._buffer: + self._empty_waiter.set_result(None) + return self._empty_waiter + + def _reset_empty_waiter(self): + self._empty_waiter = None + class _SelectorDatagramTransport(_SelectorTransport): |