aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/multiprocessing/reduction.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing/reduction.py')
-rw-r--r--Lib/multiprocessing/reduction.py34
1 files changed, 25 insertions, 9 deletions
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
index 6e5e5bc9de5..042a1368e0e 100644
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -39,19 +39,21 @@ import os
import sys
import socket
import threading
+import struct
import _multiprocessing
from multiprocessing import current_process
from multiprocessing.forking import Popen, duplicate, close, ForkingPickler
from multiprocessing.util import register_after_fork, debug, sub_debug
-from multiprocessing.connection import Client, Listener
+from multiprocessing.connection import Client, Listener, Connection
#
#
#
-if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')):
+if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
+ hasattr(socket, 'SCM_RIGHTS'))):
raise ImportError('pickling of connections not supported')
#
@@ -77,10 +79,23 @@ if sys.platform == 'win32':
else:
def send_handle(conn, handle, destination_pid):
- _multiprocessing.sendfd(conn.fileno(), handle)
+ with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
+ s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
+ struct.pack("@i", handle))])
def recv_handle(conn):
- return _multiprocessing.recvfd(conn.fileno())
+ size = struct.calcsize("@i")
+ with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
+ msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size))
+ try:
+ cmsg_level, cmsg_type, cmsg_data = ancdata[0]
+ if (cmsg_level == socket.SOL_SOCKET and
+ cmsg_type == socket.SCM_RIGHTS):
+ return struct.unpack("@i", cmsg_data[:size])[0]
+ except (ValueError, IndexError, struct.error):
+ pass
+ raise RuntimeError('Invalid data received')
+
#
# Support for a per-process server thread which caches pickled handles
@@ -159,7 +174,7 @@ def rebuild_handle(pickled_data):
return new_handle
#
-# Register `_multiprocessing.Connection` with `ForkingPickler`
+# Register `Connection` with `ForkingPickler`
#
def reduce_connection(conn):
@@ -168,11 +183,11 @@ def reduce_connection(conn):
def rebuild_connection(reduced_handle, readable, writable):
handle = rebuild_handle(reduced_handle)
- return _multiprocessing.Connection(
+ return Connection(
handle, readable=readable, writable=writable
)
-ForkingPickler.register(_multiprocessing.Connection, reduce_connection)
+ForkingPickler.register(Connection, reduce_connection)
#
# Register `socket.socket` with `ForkingPickler`
@@ -201,6 +216,7 @@ ForkingPickler.register(socket.socket, reduce_socket)
#
if sys.platform == 'win32':
+ from multiprocessing.connection import PipeConnection
def reduce_pipe_connection(conn):
rh = reduce_handle(conn.fileno())
@@ -208,8 +224,8 @@ if sys.platform == 'win32':
def rebuild_pipe_connection(reduced_handle, readable, writable):
handle = rebuild_handle(reduced_handle)
- return _multiprocessing.PipeConnection(
+ return PipeConnection(
handle, readable=readable, writable=writable
)
- ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection)
+ ForkingPickler.register(PipeConnection, reduce_pipe_connection)