aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/asyncio/selector_events.py
diff options
context:
space:
mode:
authorAndrew Svetlov <andrew.svetlov@gmail.com>2018-01-27 21:22:47 +0200
committerGitHub <noreply@github.com>2018-01-27 21:22:47 +0200
commit7c684073f951dd891021676ecfd86ffc18b8895e (patch)
treeb19f8254e8c3f3ac516b922eda7b985edbfebf70 /Lib/asyncio/selector_events.py
parentf13f12d8daa587b5fcc66fe3ed1090a5dadab289 (diff)
downloadcpython-7c684073f951dd891021676ecfd86ffc18b8895e.tar.gz
cpython-7c684073f951dd891021676ecfd86ffc18b8895e.zip
bpo-32622: Implement loop.sendfile() (#5271)
Diffstat (limited to 'Lib/asyncio/selector_events.py')
-rw-r--r--Lib/asyncio/selector_events.py39
1 files changed, 39 insertions, 0 deletions
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 9446ae6a3bc..5956f2d993e 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -540,6 +540,20 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
else:
fut.set_result((conn, address))
+ async def _sendfile_native(self, transp, file, offset, count):
+ del self._transports[transp._sock_fd]
+ resume_reading = transp.is_reading()
+ transp.pause_reading()
+ await transp._make_empty_waiter()
+ try:
+ return await self.sock_sendfile(transp._sock, file, offset, count,
+ fallback=False)
+ finally:
+ transp._reset_empty_waiter()
+ if resume_reading:
+ transp.resume_reading()
+ self._transports[transp._sock_fd] = transp
+
def _process_events(self, event_list):
for key, mask in event_list:
fileobj, (reader, writer) = key.fileobj, key.data
@@ -695,12 +709,14 @@ class _SelectorTransport(transports._FlowControlMixin,
class _SelectorSocketTransport(_SelectorTransport):
_start_tls_compatible = True
+ _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
def __init__(self, loop, sock, protocol, waiter=None,
extra=None, server=None):
super().__init__(loop, sock, protocol, extra, server)
self._eof = False
self._paused = False
+ self._empty_waiter = None
# Disable the Nagle algorithm -- small writes will be
# sent without waiting for the TCP ACK. This generally
@@ -765,6 +781,8 @@ class _SelectorSocketTransport(_SelectorTransport):
f'not {type(data).__name__!r}')
if self._eof:
raise RuntimeError('Cannot call write() after write_eof()')
+ if self._empty_waiter is not None:
+ raise RuntimeError('unable to write; sendfile is in progress')
if not data:
return
@@ -807,12 +825,16 @@ class _SelectorSocketTransport(_SelectorTransport):
self._loop._remove_writer(self._sock_fd)
self._buffer.clear()
self._fatal_error(exc, 'Fatal write error on socket transport')
+ if self._empty_waiter is not None:
+ self._empty_waiter.set_exception(exc)
else:
if n:
del self._buffer[:n]
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
self._loop._remove_writer(self._sock_fd)
+ if self._empty_waiter is not None:
+ self._empty_waiter.set_result(None)
if self._closing:
self._call_connection_lost(None)
elif self._eof:
@@ -828,6 +850,23 @@ class _SelectorSocketTransport(_SelectorTransport):
def can_write_eof(self):
return True
+ def _call_connection_lost(self, exc):
+ super()._call_connection_lost(exc)
+ if self._empty_waiter is not None:
+ self._empty_waiter.set_exception(
+ ConnectionError("Connection is closed by peer"))
+
+ def _make_empty_waiter(self):
+ if self._empty_waiter is not None:
+ raise RuntimeError("Empty waiter is already set")
+ self._empty_waiter = self._loop.create_future()
+ if not self._buffer:
+ self._empty_waiter.set_result(None)
+ return self._empty_waiter
+
+ def _reset_empty_waiter(self):
+ self._empty_waiter = None
+
class _SelectorDatagramTransport(_SelectorTransport):