diff options
author | Russell Keith-Magee <russell@keith-magee.com> | 2024-07-31 16:24:15 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-07-31 10:24:15 +0200 |
commit | f071f01b7b7e19d7d6b3a4b0ec62f820ecb14660 (patch) | |
tree | d4f6a25a18fc0ca8de86794a78cc62646a390d7e /Lib/socket.py | |
parent | d01fd240517e0c5fb679686a93119d0aa6b0fc0f (diff) | |
download | cpython-f071f01b7b7e19d7d6b3a4b0ec62f820ecb14660.tar.gz cpython-f071f01b7b7e19d7d6b3a4b0ec62f820ecb14660.zip |
gh-122133: Rework pure Python socketpair tests to avoid use of importlib.reload. (#122493)
Co-authored-by: Gregory P. Smith <greg@krypto.org>
Diffstat (limited to 'Lib/socket.py')
-rw-r--r-- | Lib/socket.py | 121 |
1 files changed, 58 insertions, 63 deletions
diff --git a/Lib/socket.py b/Lib/socket.py index 2e6043cbdb8..9207101dcf9 100644 --- a/Lib/socket.py +++ b/Lib/socket.py @@ -592,16 +592,65 @@ if hasattr(_socket.socket, "share"): return socket(0, 0, 0, info) __all__.append("fromshare") -if hasattr(_socket, "socketpair"): +# Origin: https://gist.github.com/4325783, by Geert Jansen. Public domain. +# This is used if _socket doesn't natively provide socketpair. It's +# always defined so that it can be patched in for testing purposes. +def _fallback_socketpair(family=AF_INET, type=SOCK_STREAM, proto=0): + if family == AF_INET: + host = _LOCALHOST + elif family == AF_INET6: + host = _LOCALHOST_V6 + else: + raise ValueError("Only AF_INET and AF_INET6 socket address families " + "are supported") + if type != SOCK_STREAM: + raise ValueError("Only SOCK_STREAM socket type is supported") + if proto != 0: + raise ValueError("Only protocol zero is supported") + + # We create a connected TCP socket. Note the trick with + # setblocking(False) that prevents us from having to create a thread. + lsock = socket(family, type, proto) + try: + lsock.bind((host, 0)) + lsock.listen() + # On IPv6, ignore flow_info and scope_id + addr, port = lsock.getsockname()[:2] + csock = socket(family, type, proto) + try: + csock.setblocking(False) + try: + csock.connect((addr, port)) + except (BlockingIOError, InterruptedError): + pass + csock.setblocking(True) + ssock, _ = lsock.accept() + except: + csock.close() + raise + finally: + lsock.close() - def socketpair(family=None, type=SOCK_STREAM, proto=0): - """socketpair([family[, type[, proto]]]) -> (socket object, socket object) + # Authenticating avoids using a connection from something else + # able to connect to {host}:{port} instead of us. + # We expect only AF_INET and AF_INET6 families. + try: + if ( + ssock.getsockname() != csock.getpeername() + or csock.getsockname() != ssock.getpeername() + ): + raise ConnectionError("Unexpected peer connection") + except: + # getsockname() and getpeername() can fail + # if either socket isn't connected. + ssock.close() + csock.close() + raise - Create a pair of socket objects from the sockets returned by the platform - socketpair() function. - The arguments are the same as for socket() except the default family is - AF_UNIX if defined on the platform; otherwise, the default is AF_INET. - """ + return (ssock, csock) + +if hasattr(_socket, "socketpair"): + def socketpair(family=None, type=SOCK_STREAM, proto=0): if family is None: try: family = AF_UNIX @@ -613,61 +662,7 @@ if hasattr(_socket, "socketpair"): return a, b else: - - # Origin: https://gist.github.com/4325783, by Geert Jansen. Public domain. - def socketpair(family=AF_INET, type=SOCK_STREAM, proto=0): - if family == AF_INET: - host = _LOCALHOST - elif family == AF_INET6: - host = _LOCALHOST_V6 - else: - raise ValueError("Only AF_INET and AF_INET6 socket address families " - "are supported") - if type != SOCK_STREAM: - raise ValueError("Only SOCK_STREAM socket type is supported") - if proto != 0: - raise ValueError("Only protocol zero is supported") - - # We create a connected TCP socket. Note the trick with - # setblocking(False) that prevents us from having to create a thread. - lsock = socket(family, type, proto) - try: - lsock.bind((host, 0)) - lsock.listen() - # On IPv6, ignore flow_info and scope_id - addr, port = lsock.getsockname()[:2] - csock = socket(family, type, proto) - try: - csock.setblocking(False) - try: - csock.connect((addr, port)) - except (BlockingIOError, InterruptedError): - pass - csock.setblocking(True) - ssock, _ = lsock.accept() - except: - csock.close() - raise - finally: - lsock.close() - - # Authenticating avoids using a connection from something else - # able to connect to {host}:{port} instead of us. - # We expect only AF_INET and AF_INET6 families. - try: - if ( - ssock.getsockname() != csock.getpeername() - or csock.getsockname() != ssock.getpeername() - ): - raise ConnectionError("Unexpected peer connection") - except: - # getsockname() and getpeername() can fail - # if either socket isn't connected. - ssock.close() - csock.close() - raise - - return (ssock, csock) + socketpair = _fallback_socketpair __all__.append("socketpair") socketpair.__doc__ = """socketpair([family[, type[, proto]]]) -> (socket object, socket object) |