aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_logging.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_logging.py')
-rw-r--r--Lib/test/test_logging.py83
1 files changed, 83 insertions, 0 deletions
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
index 3f113ec1be4..1e5adcc8db1 100644
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -4214,6 +4214,89 @@ class ConfigDictTest(BaseTest):
handler = logging.getHandlerByName('custom')
self.assertEqual(handler.custom_kwargs, custom_kwargs)
+ # See gh-91555 and gh-90321
+ @support.requires_subprocess()
+ def test_deadlock_in_queue(self):
+ queue = multiprocessing.Queue()
+ handler = logging.handlers.QueueHandler(queue)
+ logger = multiprocessing.get_logger()
+ level = logger.level
+ try:
+ logger.setLevel(logging.DEBUG)
+ logger.addHandler(handler)
+ logger.debug("deadlock")
+ finally:
+ logger.setLevel(level)
+ logger.removeHandler(handler)
+
+ def test_recursion_in_custom_handler(self):
+ class BadHandler(logging.Handler):
+ def __init__(self):
+ super().__init__()
+ def emit(self, record):
+ logger.debug("recurse")
+ logger = logging.getLogger("test_recursion_in_custom_handler")
+ logger.addHandler(BadHandler())
+ logger.setLevel(logging.DEBUG)
+ logger.debug("boom")
+
+ @threading_helper.requires_working_threading()
+ def test_thread_supression_noninterference(self):
+ lock = threading.Lock()
+ logger = logging.getLogger("test_thread_supression_noninterference")
+
+ # Block on the first call, allow others through
+ #
+ # NOTE: We need to bypass the base class's lock, otherwise that will
+ # block multiple calls to the same handler itself.
+ class BlockOnceHandler(TestHandler):
+ def __init__(self, barrier):
+ super().__init__(support.Matcher())
+ self.barrier = barrier
+
+ def createLock(self):
+ self.lock = None
+
+ def handle(self, record):
+ self.emit(record)
+
+ def emit(self, record):
+ if self.barrier:
+ barrier = self.barrier
+ self.barrier = None
+ barrier.wait()
+ with lock:
+ pass
+ super().emit(record)
+ logger.info("blow up if not supressed")
+
+ barrier = threading.Barrier(2)
+ handler = BlockOnceHandler(barrier)
+ logger.addHandler(handler)
+ logger.setLevel(logging.DEBUG)
+
+ t1 = threading.Thread(target=logger.debug, args=("1",))
+ with lock:
+
+ # Ensure first thread is blocked in the handler, hence supressing logging...
+ t1.start()
+ barrier.wait()
+
+ # ...but the second thread should still be able to log...
+ t2 = threading.Thread(target=logger.debug, args=("2",))
+ t2.start()
+ t2.join(timeout=3)
+
+ self.assertEqual(len(handler.buffer), 1)
+ self.assertTrue(handler.matches(levelno=logging.DEBUG, message='2'))
+
+ # The first thread should still be blocked here
+ self.assertTrue(t1.is_alive())
+
+ # Now the lock has been released the first thread should complete
+ t1.join()
+ self.assertEqual(len(handler.buffer), 2)
+ self.assertTrue(handler.matches(levelno=logging.DEBUG, message='1'))
class ManagerTest(BaseTest):
def test_manager_loggerclass(self):