aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_threadsignals.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_threadsignals.py')
-rw-r--r--Lib/test/test_threadsignals.py156
1 files changed, 147 insertions, 9 deletions
diff --git a/Lib/test/test_threadsignals.py b/Lib/test/test_threadsignals.py
index 2f7eb607c7e..e0af31dae47 100644
--- a/Lib/test/test_threadsignals.py
+++ b/Lib/test/test_threadsignals.py
@@ -4,15 +4,20 @@ import unittest
import signal
import os
import sys
-from test.test_support import run_unittest, import_module, reap_threads
-thread = import_module('thread')
+from test.support import run_unittest, import_module
+thread = import_module('_thread')
+import time
if sys.platform[:3] in ('win', 'os2') or sys.platform=='riscos':
- raise unittest.SkipTest, "Can't test signal on %s" % sys.platform
+ raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
process_pid = os.getpid()
signalled_all=thread.allocate_lock()
+# Issue #11223: Locks are implemented using a mutex and a condition variable of
+# the pthread library on FreeBSD6. POSIX condition variables cannot be
+# interrupted by signals (see pthread_cond_wait manual page).
+USING_PTHREAD_COND = (sys.platform == 'freebsd6')
def registerSignals(for_usr1, for_usr2, for_alrm):
usr1 = signal.signal(signal.SIGUSR1, for_usr1)
@@ -34,13 +39,12 @@ def send_signals():
signalled_all.release()
class ThreadSignals(unittest.TestCase):
- """Test signal handling semantics of threads.
- We spawn a thread, have the thread send two signals, and
- wait for it to finish. Check that we got both signals
- and that they were run by the main thread.
- """
- @reap_threads
+
def test_signals(self):
+ # Test signal handling semantics of threads.
+ # We spawn a thread, have the thread send two signals, and
+ # wait for it to finish. Check that we got both signals
+ # and that they were run by the main thread.
signalled_all.acquire()
self.spawnSignallingThread()
signalled_all.acquire()
@@ -67,6 +71,140 @@ class ThreadSignals(unittest.TestCase):
def spawnSignallingThread(self):
thread.start_new_thread(send_signals, ())
+ def alarm_interrupt(self, sig, frame):
+ raise KeyboardInterrupt
+
+ @unittest.skipIf(USING_PTHREAD_COND,
+ 'POSIX condition variables cannot be interrupted')
+ def test_lock_acquire_interruption(self):
+ # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
+ # in a deadlock.
+ # XXX this test can fail when the legacy (non-semaphore) implementation
+ # of locks is used in thread_pthread.h, see issue #11223.
+ oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
+ try:
+ lock = thread.allocate_lock()
+ lock.acquire()
+ signal.alarm(1)
+ t1 = time.time()
+ self.assertRaises(KeyboardInterrupt, lock.acquire, timeout=5)
+ dt = time.time() - t1
+ # Checking that KeyboardInterrupt was raised is not sufficient.
+ # We want to assert that lock.acquire() was interrupted because
+ # of the signal, not that the signal handler was called immediately
+ # after timeout return of lock.acquire() (which can fool assertRaises).
+ self.assertLess(dt, 3.0)
+ finally:
+ signal.signal(signal.SIGALRM, oldalrm)
+
+ @unittest.skipIf(USING_PTHREAD_COND,
+ 'POSIX condition variables cannot be interrupted')
+ def test_rlock_acquire_interruption(self):
+ # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
+ # in a deadlock.
+ # XXX this test can fail when the legacy (non-semaphore) implementation
+ # of locks is used in thread_pthread.h, see issue #11223.
+ oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
+ try:
+ rlock = thread.RLock()
+ # For reentrant locks, the initial acquisition must be in another
+ # thread.
+ def other_thread():
+ rlock.acquire()
+ thread.start_new_thread(other_thread, ())
+ # Wait until we can't acquire it without blocking...
+ while rlock.acquire(blocking=False):
+ rlock.release()
+ time.sleep(0.01)
+ signal.alarm(1)
+ t1 = time.time()
+ self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5)
+ dt = time.time() - t1
+ # See rationale above in test_lock_acquire_interruption
+ self.assertLess(dt, 3.0)
+ finally:
+ signal.signal(signal.SIGALRM, oldalrm)
+
+ def acquire_retries_on_intr(self, lock):
+ self.sig_recvd = False
+ def my_handler(signal, frame):
+ self.sig_recvd = True
+ old_handler = signal.signal(signal.SIGUSR1, my_handler)
+ try:
+ def other_thread():
+ # Acquire the lock in a non-main thread, so this test works for
+ # RLocks.
+ lock.acquire()
+ # Wait until the main thread is blocked in the lock acquire, and
+ # then wake it up with this.
+ time.sleep(0.5)
+ os.kill(process_pid, signal.SIGUSR1)
+ # Let the main thread take the interrupt, handle it, and retry
+ # the lock acquisition. Then we'll let it run.
+ time.sleep(0.5)
+ lock.release()
+ thread.start_new_thread(other_thread, ())
+ # Wait until we can't acquire it without blocking...
+ while lock.acquire(blocking=False):
+ lock.release()
+ time.sleep(0.01)
+ result = lock.acquire() # Block while we receive a signal.
+ self.assertTrue(self.sig_recvd)
+ self.assertTrue(result)
+ finally:
+ signal.signal(signal.SIGUSR1, old_handler)
+
+ def test_lock_acquire_retries_on_intr(self):
+ self.acquire_retries_on_intr(thread.allocate_lock())
+
+ def test_rlock_acquire_retries_on_intr(self):
+ self.acquire_retries_on_intr(thread.RLock())
+
+ def test_interrupted_timed_acquire(self):
+ # Test to make sure we recompute lock acquisition timeouts when we
+ # receive a signal. Check this by repeatedly interrupting a lock
+ # acquire in the main thread, and make sure that the lock acquire times
+ # out after the right amount of time.
+ # NOTE: this test only behaves as expected if C signals get delivered
+ # to the main thread. Otherwise lock.acquire() itself doesn't get
+ # interrupted and the test trivially succeeds.
+ self.start = None
+ self.end = None
+ self.sigs_recvd = 0
+ done = thread.allocate_lock()
+ done.acquire()
+ lock = thread.allocate_lock()
+ lock.acquire()
+ def my_handler(signum, frame):
+ self.sigs_recvd += 1
+ old_handler = signal.signal(signal.SIGUSR1, my_handler)
+ try:
+ def timed_acquire():
+ self.start = time.time()
+ lock.acquire(timeout=0.5)
+ self.end = time.time()
+ def send_signals():
+ for _ in range(40):
+ time.sleep(0.02)
+ os.kill(process_pid, signal.SIGUSR1)
+ done.release()
+
+ # Send the signals from the non-main thread, since the main thread
+ # is the only one that can process signals.
+ thread.start_new_thread(send_signals, ())
+ timed_acquire()
+ # Wait for thread to finish
+ done.acquire()
+ # This allows for some timing and scheduling imprecision
+ self.assertLess(self.end - self.start, 2.0)
+ self.assertGreater(self.end - self.start, 0.3)
+ # If the signal is received several times before PyErr_CheckSignals()
+ # is called, the handler will get called less than 40 times. Just
+ # check it's been called at least once.
+ self.assertGreater(self.sigs_recvd, 0)
+ finally:
+ signal.signal(signal.SIGUSR1, old_handler)
+
def test_main():
global signal_blackboard