diff options
Diffstat (limited to 'Lib/test/test_asyncio/test_base_events.py')
-rw-r--r-- | Lib/test/test_asyncio/test_base_events.py | 147 |
1 files changed, 147 insertions, 0 deletions
diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index 2ca5c4c6719..22ae0ef3581 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -24,6 +24,10 @@ import warnings MOCK_ANY = mock.ANY +class CustomError(Exception): + pass + + def tearDownModule(): asyncio._set_event_loop_policy(None) @@ -146,6 +150,29 @@ class BaseEventTests(test_utils.TestCase): socket.SOCK_STREAM, socket.IPPROTO_TCP)) + def test_interleave_addrinfos(self): + self.maxDiff = None + SIX_A = (socket.AF_INET6, 0, 0, '', ('2001:db8::1', 1)) + SIX_B = (socket.AF_INET6, 0, 0, '', ('2001:db8::2', 2)) + SIX_C = (socket.AF_INET6, 0, 0, '', ('2001:db8::3', 3)) + SIX_D = (socket.AF_INET6, 0, 0, '', ('2001:db8::4', 4)) + FOUR_A = (socket.AF_INET, 0, 0, '', ('192.0.2.1', 5)) + FOUR_B = (socket.AF_INET, 0, 0, '', ('192.0.2.2', 6)) + FOUR_C = (socket.AF_INET, 0, 0, '', ('192.0.2.3', 7)) + FOUR_D = (socket.AF_INET, 0, 0, '', ('192.0.2.4', 8)) + + addrinfos = [SIX_A, SIX_B, SIX_C, FOUR_A, FOUR_B, FOUR_C, FOUR_D, SIX_D] + expected = [SIX_A, FOUR_A, SIX_B, FOUR_B, SIX_C, FOUR_C, SIX_D, FOUR_D] + + self.assertEqual(expected, base_events._interleave_addrinfos(addrinfos)) + + expected_fafc_2 = [SIX_A, SIX_B, FOUR_A, SIX_C, FOUR_B, SIX_D, FOUR_C, FOUR_D] + self.assertEqual( + expected_fafc_2, + base_events._interleave_addrinfos(addrinfos, first_address_family_count=2), + ) + + class BaseEventLoopTests(test_utils.TestCase): @@ -1049,6 +1076,71 @@ class BaseEventLoopTests(test_utils.TestCase): test_utils.run_briefly(self.loop) self.assertTrue(status['finalized']) + @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support') + @patch_socket + def test_create_connection_happy_eyeballs(self, m_socket): + + class MyProto(asyncio.Protocol): + pass + + async def getaddrinfo(*args, **kw): + return [(socket.AF_INET6, 0, 0, '', ('2001:db8::1', 1)), + (socket.AF_INET, 0, 0, '', ('192.0.2.1', 5))] + + async def sock_connect(sock, address): + if address[0] == '2001:db8::1': + await asyncio.sleep(1) + sock.connect(address) + + loop = asyncio.new_event_loop() + loop._add_writer = mock.Mock() + loop._add_writer = mock.Mock() + loop._add_reader = mock.Mock() + loop.getaddrinfo = getaddrinfo + loop.sock_connect = sock_connect + + coro = loop.create_connection(MyProto, 'example.com', 80, happy_eyeballs_delay=0.3) + transport, protocol = loop.run_until_complete(coro) + try: + sock = transport._sock + sock.connect.assert_called_with(('192.0.2.1', 5)) + finally: + transport.close() + test_utils.run_briefly(loop) # allow transport to close + loop.close() + + @patch_socket + def test_create_connection_happy_eyeballs_ipv4_only(self, m_socket): + + class MyProto(asyncio.Protocol): + pass + + async def getaddrinfo(*args, **kw): + return [(socket.AF_INET, 0, 0, '', ('192.0.2.1', 5)), + (socket.AF_INET, 0, 0, '', ('192.0.2.2', 6))] + + async def sock_connect(sock, address): + if address[0] == '192.0.2.1': + await asyncio.sleep(1) + sock.connect(address) + + loop = asyncio.new_event_loop() + loop._add_writer = mock.Mock() + loop._add_writer = mock.Mock() + loop._add_reader = mock.Mock() + loop.getaddrinfo = getaddrinfo + loop.sock_connect = sock_connect + + coro = loop.create_connection(MyProto, 'example.com', 80, happy_eyeballs_delay=0.3) + transport, protocol = loop.run_until_complete(coro) + try: + sock = transport._sock + sock.connect.assert_called_with(('192.0.2.2', 6)) + finally: + transport.close() + test_utils.run_briefly(loop) # allow transport to close + loop.close() + class MyProto(asyncio.Protocol): done = None @@ -1190,6 +1282,36 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase): self.loop.run_until_complete(coro) self.assertTrue(sock.close.called) + @patch_socket + def test_create_connection_happy_eyeballs_empty_exceptions(self, m_socket): + # See gh-135836: Fix IndexError when Happy Eyeballs algorithm + # results in empty exceptions list + + async def getaddrinfo(*args, **kw): + return [(socket.AF_INET, socket.SOCK_STREAM, 0, '', ('127.0.0.1', 80)), + (socket.AF_INET6, socket.SOCK_STREAM, 0, '', ('::1', 80))] + + def getaddrinfo_task(*args, **kwds): + return self.loop.create_task(getaddrinfo(*args, **kwds)) + + self.loop.getaddrinfo = getaddrinfo_task + + # Mock staggered_race to return empty exceptions list + # This simulates the scenario where Happy Eyeballs algorithm + # cancels all attempts but doesn't properly collect exceptions + with mock.patch('asyncio.staggered.staggered_race') as mock_staggered: + # Return (None, []) - no winner, empty exceptions list + async def mock_race(coro_fns, delay, loop): + return None, [] + mock_staggered.side_effect = mock_race + + coro = self.loop.create_connection( + MyProto, 'example.com', 80, happy_eyeballs_delay=0.1) + + # Should raise TimeoutError instead of IndexError + with self.assertRaisesRegex(TimeoutError, "create_connection failed"): + self.loop.run_until_complete(coro) + def test_create_connection_host_port_sock(self): coro = self.loop.create_connection( MyProto, 'example.com', 80, sock=object()) @@ -1296,6 +1418,31 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase): self.assertEqual(len(cm.exception.exceptions), 1) self.assertIsInstance(cm.exception.exceptions[0], OSError) + @patch_socket + def test_create_connection_connect_non_os_err_close_err(self, m_socket): + # Test the case when sock_connect() raises non-OSError exception + # and sock.close() raises OSError. + async def getaddrinfo(*args, **kw): + return [(2, 1, 6, '', ('107.6.106.82', 80))] + + def getaddrinfo_task(*args, **kwds): + return self.loop.create_task(getaddrinfo(*args, **kwds)) + + self.loop.getaddrinfo = getaddrinfo_task + self.loop.sock_connect = mock.Mock() + self.loop.sock_connect.side_effect = CustomError + sock = mock.Mock() + m_socket.socket.return_value = sock + sock.close.side_effect = OSError + + coro = self.loop.create_connection(MyProto, 'example.com', 80) + self.assertRaises( + CustomError, self.loop.run_until_complete, coro) + + coro = self.loop.create_connection(MyProto, 'example.com', 80, all_errors=True) + self.assertRaises( + CustomError, self.loop.run_until_complete, coro) + def test_create_connection_multiple(self): async def getaddrinfo(*args, **kw): return [(2, 1, 6, '', ('0.0.0.1', 80)), |