aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/test_asyncio/test_base_events.py30
-rw-r--r--Lib/test/test_fileio.py12
-rw-r--r--Lib/test/test_peepholer.py1
-rw-r--r--Lib/test/test_pyclbr.py4
-rw-r--r--Lib/test/test_pyexpat.py20
-rw-r--r--Lib/test/test_re.py28
-rw-r--r--Lib/test/test_threading.py55
-rw-r--r--Lib/test/test_zoneinfo/test_zoneinfo.py43
8 files changed, 152 insertions, 41 deletions
diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py
index 2ca5c4c6719..bb9f366fc41 100644
--- a/Lib/test/test_asyncio/test_base_events.py
+++ b/Lib/test/test_asyncio/test_base_events.py
@@ -1190,6 +1190,36 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
self.loop.run_until_complete(coro)
self.assertTrue(sock.close.called)
+ @patch_socket
+ def test_create_connection_happy_eyeballs_empty_exceptions(self, m_socket):
+ # See gh-135836: Fix IndexError when Happy Eyeballs algorithm
+ # results in empty exceptions list
+
+ async def getaddrinfo(*args, **kw):
+ return [(socket.AF_INET, socket.SOCK_STREAM, 0, '', ('127.0.0.1', 80)),
+ (socket.AF_INET6, socket.SOCK_STREAM, 0, '', ('::1', 80))]
+
+ def getaddrinfo_task(*args, **kwds):
+ return self.loop.create_task(getaddrinfo(*args, **kwds))
+
+ self.loop.getaddrinfo = getaddrinfo_task
+
+ # Mock staggered_race to return empty exceptions list
+ # This simulates the scenario where Happy Eyeballs algorithm
+ # cancels all attempts but doesn't properly collect exceptions
+ with mock.patch('asyncio.staggered.staggered_race') as mock_staggered:
+ # Return (None, []) - no winner, empty exceptions list
+ async def mock_race(coro_fns, delay, loop):
+ return None, []
+ mock_staggered.side_effect = mock_race
+
+ coro = self.loop.create_connection(
+ MyProto, 'example.com', 80, happy_eyeballs_delay=0.1)
+
+ # Should raise TimeoutError instead of IndexError
+ with self.assertRaisesRegex(TimeoutError, "create_connection failed"):
+ self.loop.run_until_complete(coro)
+
def test_create_connection_host_port_sock(self):
coro = self.loop.create_connection(
MyProto, 'example.com', 80, sock=object())
diff --git a/Lib/test/test_fileio.py b/Lib/test/test_fileio.py
index 5a0f033ebb8..e3d54f6315a 100644
--- a/Lib/test/test_fileio.py
+++ b/Lib/test/test_fileio.py
@@ -591,7 +591,7 @@ class OtherFileTests:
try:
f.write(b"abc")
f.close()
- with open(TESTFN_ASCII, "rb") as f:
+ with self.open(TESTFN_ASCII, "rb") as f:
self.assertEqual(f.read(), b"abc")
finally:
os.unlink(TESTFN_ASCII)
@@ -608,7 +608,7 @@ class OtherFileTests:
try:
f.write(b"abc")
f.close()
- with open(TESTFN_UNICODE, "rb") as f:
+ with self.open(TESTFN_UNICODE, "rb") as f:
self.assertEqual(f.read(), b"abc")
finally:
os.unlink(TESTFN_UNICODE)
@@ -692,13 +692,13 @@ class OtherFileTests:
def testAppend(self):
try:
- f = open(TESTFN, 'wb')
+ f = self.FileIO(TESTFN, 'wb')
f.write(b'spam')
f.close()
- f = open(TESTFN, 'ab')
+ f = self.FileIO(TESTFN, 'ab')
f.write(b'eggs')
f.close()
- f = open(TESTFN, 'rb')
+ f = self.FileIO(TESTFN, 'rb')
d = f.read()
f.close()
self.assertEqual(d, b'spameggs')
@@ -734,6 +734,7 @@ class OtherFileTests:
class COtherFileTests(OtherFileTests, unittest.TestCase):
FileIO = _io.FileIO
modulename = '_io'
+ open = _io.open
@cpython_only
def testInvalidFd_overflow(self):
@@ -755,6 +756,7 @@ class COtherFileTests(OtherFileTests, unittest.TestCase):
class PyOtherFileTests(OtherFileTests, unittest.TestCase):
FileIO = _pyio.FileIO
modulename = '_pyio'
+ open = _pyio.open
def test_open_code(self):
# Check that the default behaviour of open_code matches
diff --git a/Lib/test/test_peepholer.py b/Lib/test/test_peepholer.py
index 3d7300e1480..98629df4574 100644
--- a/Lib/test/test_peepholer.py
+++ b/Lib/test/test_peepholer.py
@@ -292,6 +292,7 @@ class TestTranforms(BytecodeTestCase):
('---x', 'UNARY_NEGATIVE', None, False, None, None),
('~~~x', 'UNARY_INVERT', None, False, None, None),
('+++x', 'CALL_INTRINSIC_1', intrinsic_positive, False, None, None),
+ ('~True', 'UNARY_INVERT', None, False, None, None),
]
for (
diff --git a/Lib/test/test_pyclbr.py b/Lib/test/test_pyclbr.py
index 3e7b2cd0dc9..bce68e6cd7a 100644
--- a/Lib/test/test_pyclbr.py
+++ b/Lib/test/test_pyclbr.py
@@ -11,7 +11,6 @@ from types import FunctionType, MethodType, BuiltinFunctionType
import pyclbr
from unittest import TestCase, main as unittest_main
from test.test_importlib import util as test_importlib_util
-import warnings
StaticMethodType = type(staticmethod(lambda: None))
@@ -246,9 +245,6 @@ class PyclbrTest(TestCase):
# These were once some of the longest modules.
cm('random', ignore=('Random',)) # from _random import Random as CoreGenerator
cm('pickle', ignore=('partial', 'PickleBuffer'))
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- cm('sre_parse', ignore=('dump', 'groups', 'pos')) # from sre_constants import *; property
with temporary_main_spec():
cm(
'pdb',
diff --git a/Lib/test/test_pyexpat.py b/Lib/test/test_pyexpat.py
index 1d56ccd71cf..d4b4f60be98 100644
--- a/Lib/test/test_pyexpat.py
+++ b/Lib/test/test_pyexpat.py
@@ -9,12 +9,11 @@ import traceback
from io import BytesIO
from test import support
from test.support import os_helper
-
+from test.support import sortdict
+from unittest import mock
from xml.parsers import expat
from xml.parsers.expat import errors
-from test.support import sortdict
-
class SetAttributeTest(unittest.TestCase):
def setUp(self):
@@ -436,6 +435,19 @@ class BufferTextTest(unittest.TestCase):
"<!--abc-->", "4", "<!--def-->", "5", "</a>"],
"buffered text not properly split")
+ def test_change_character_data_handler_in_callback(self):
+ # Test that xmlparse_handler_setter() properly handles
+ # the special case "parser.CharacterDataHandler = None".
+ def handler(*args):
+ parser.CharacterDataHandler = None
+
+ handler_wrapper = mock.Mock(wraps=handler)
+ parser = expat.ParserCreate()
+ parser.CharacterDataHandler = handler_wrapper
+ parser.Parse(b"<a>1<b/>2<c></c>3<!--abc-->4<!--def-->5</a> ", True)
+ handler_wrapper.assert_called_once()
+ self.assertIsNone(parser.CharacterDataHandler)
+
# Test handling of exception from callback:
class HandlerExceptionTest(unittest.TestCase):
@@ -595,7 +607,7 @@ class ChardataBufferTest(unittest.TestCase):
def test_disabling_buffer(self):
xml1 = b"<?xml version='1.0' encoding='iso8859'?><a>" + b'a' * 512
xml2 = b'b' * 1024
- xml3 = b'c' * 1024 + b'</a>';
+ xml3 = b'c' * 1024 + b'</a>'
parser = expat.ParserCreate()
parser.CharacterDataHandler = self.counting_handler
parser.buffer_text = 1
diff --git a/Lib/test/test_re.py b/Lib/test/test_re.py
index e9128ac1d97..993652d2e88 100644
--- a/Lib/test/test_re.py
+++ b/Lib/test/test_re.py
@@ -5,7 +5,6 @@ from test.support import (gc_collect, bigmemtest, _2G,
import locale
import re
import string
-import sys
import unittest
import warnings
from re import Scanner
@@ -2927,33 +2926,6 @@ class ImplementationTest(unittest.TestCase):
pat = re.compile("")
check_disallow_instantiation(self, type(pat.scanner("")))
- def test_deprecated_modules(self):
- deprecated = {
- 'sre_compile': ['compile', 'error',
- 'SRE_FLAG_IGNORECASE', 'SUBPATTERN',
- '_compile_info'],
- 'sre_constants': ['error', 'SRE_FLAG_IGNORECASE', 'SUBPATTERN',
- '_NamedIntConstant'],
- 'sre_parse': ['SubPattern', 'parse',
- 'SRE_FLAG_IGNORECASE', 'SUBPATTERN',
- '_parse_sub'],
- }
- for name in deprecated:
- with self.subTest(module=name):
- sys.modules.pop(name, None)
- with self.assertWarns(DeprecationWarning) as w:
- __import__(name)
- self.assertEqual(str(w.warning),
- f"module {name!r} is deprecated")
- self.assertEqual(w.filename, __file__)
- self.assertIn(name, sys.modules)
- mod = sys.modules[name]
- self.assertEqual(mod.__name__, name)
- self.assertEqual(mod.__package__, '')
- for attr in deprecated[name]:
- self.assertHasAttr(mod, attr)
- del sys.modules[name]
-
@cpython_only
def test_case_helpers(self):
import _sre
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index 13b55d0f0a2..00a3037c3e1 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -1247,6 +1247,61 @@ class ThreadTests(BaseTestCase):
self.assertEqual(err, b"")
self.assertIn(b"all clear", out)
+ @support.subTests('lock_class_name', ['Lock', 'RLock'])
+ def test_acquire_daemon_thread_lock_in_finalization(self, lock_class_name):
+ # gh-123940: Py_Finalize() prevents other threads from running Python
+ # code (and so, releasing locks), so acquiring a locked lock can not
+ # succeed.
+ # We raise an exception rather than hang.
+ code = textwrap.dedent(f"""
+ import threading
+ import time
+
+ thread_started_event = threading.Event()
+
+ lock = threading.{lock_class_name}()
+ def loop():
+ if {lock_class_name!r} == 'RLock':
+ lock.acquire()
+ with lock:
+ thread_started_event.set()
+ while True:
+ time.sleep(1)
+
+ uncontested_lock = threading.{lock_class_name}()
+
+ class Cycle:
+ def __init__(self):
+ self.self_ref = self
+ self.thr = threading.Thread(
+ target=loop, daemon=True)
+ self.thr.start()
+ thread_started_event.wait()
+
+ def __del__(self):
+ assert self.thr.is_alive()
+
+ # We *can* acquire an unlocked lock
+ uncontested_lock.acquire()
+ if {lock_class_name!r} == 'RLock':
+ uncontested_lock.acquire()
+
+ # Acquiring a locked one fails
+ try:
+ lock.acquire()
+ except PythonFinalizationError:
+ assert self.thr.is_alive()
+ print('got the correct exception!')
+
+ # Cycle holds a reference to itself, which ensures it is
+ # cleaned up during the GC that runs after daemon threads
+ # have been forced to exit during finalization.
+ Cycle()
+ """)
+ rc, out, err = assert_python_ok("-c", code)
+ self.assertEqual(err, b"")
+ self.assertIn(b"got the correct exception", out)
+
def test_start_new_thread_failed(self):
# gh-109746: if Python fails to start newly created thread
# due to failure of underlying PyThread_start_new_thread() call,
diff --git a/Lib/test/test_zoneinfo/test_zoneinfo.py b/Lib/test/test_zoneinfo/test_zoneinfo.py
index f313e394f49..44e87e71c8e 100644
--- a/Lib/test/test_zoneinfo/test_zoneinfo.py
+++ b/Lib/test/test_zoneinfo/test_zoneinfo.py
@@ -58,6 +58,10 @@ def tearDownModule():
shutil.rmtree(TEMP_DIR)
+class CustomError(Exception):
+ pass
+
+
class TzPathUserMixin:
"""
Adds a setUp() and tearDown() to make TZPATH manipulations thread-safe.
@@ -404,6 +408,25 @@ class ZoneInfoTest(TzPathUserMixin, ZoneInfoTestBase):
self.assertEqual(t.utcoffset(), offset.utcoffset)
self.assertEqual(t.dst(), offset.dst)
+ def test_cache_exception(self):
+ class Incomparable(str):
+ eq_called = False
+ def __eq__(self, other):
+ self.eq_called = True
+ raise CustomError
+ __hash__ = str.__hash__
+
+ key = "America/Los_Angeles"
+ tz1 = self.klass(key)
+ key = Incomparable(key)
+ try:
+ tz2 = self.klass(key)
+ except CustomError:
+ self.assertTrue(key.eq_called)
+ else:
+ self.assertFalse(key.eq_called)
+ self.assertIs(tz2, tz1)
+
class CZoneInfoTest(ZoneInfoTest):
module = c_zoneinfo
@@ -1507,6 +1530,26 @@ class ZoneInfoCacheTest(TzPathUserMixin, ZoneInfoTestBase):
self.assertIsNot(dub0, dub1)
self.assertIs(tok0, tok1)
+ def test_clear_cache_refleak(self):
+ class Stringy(str):
+ allow_comparisons = True
+ def __eq__(self, other):
+ if not self.allow_comparisons:
+ raise CustomError
+ return super().__eq__(other)
+ __hash__ = str.__hash__
+
+ key = Stringy("America/Los_Angeles")
+ self.klass(key)
+ key.allow_comparisons = False
+ try:
+ # Note: This is try/except rather than assertRaises because
+ # there is no guarantee that the key is even still in the cache,
+ # or that the key for the cache is the original `key` object.
+ self.klass.clear_cache(only_keys="America/Los_Angeles")
+ except CustomError:
+ pass
+
class CZoneInfoCacheTest(ZoneInfoCacheTest):
module = c_zoneinfo