aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/threading.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/threading.py')
-rw-r--r--Lib/threading.py776
1 files changed, 438 insertions, 338 deletions
diff --git a/Lib/threading.py b/Lib/threading.py
index db9ab29c7da..58ffa7ebc27 100644
--- a/Lib/threading.py
+++ b/Lib/threading.py
@@ -1,53 +1,40 @@
"""Thread module emulating a subset of Java's threading model."""
import sys as _sys
+import _thread
-try:
- import thread
-except ImportError:
- del _sys.modules[__name__]
- raise
-
-import warnings
-
-from collections import deque as _deque
from time import time as _time, sleep as _sleep
from traceback import format_exc as _format_exc
+from _weakrefset import WeakSet
-# Note regarding PEP 8 compliant aliases
+# Note regarding PEP 8 compliant names
# This threading model was originally inspired by Java, and inherited
# the convention of camelCase function and method names from that
-# language. While those names are not in any imminent danger of being
-# deprecated, starting with Python 2.6, the module now provides a
-# PEP 8 compliant alias for any such method name.
-# Using the new PEP 8 compliant names also facilitates substitution
+# language. Those originaly names are not in any imminent danger of
+# being deprecated (even for Py3k),so this module provides them as an
+# alias for the PEP 8 compliant names
+# Note that using the new PEP 8 compliant names facilitates substitution
# with the multiprocessing module, which doesn't provide the old
# Java inspired names.
-
-# Rename some stuff so "from threading import *" is safe
-__all__ = ['activeCount', 'active_count', 'Condition', 'currentThread',
- 'current_thread', 'enumerate', 'Event',
- 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
+__all__ = ['active_count', 'Condition', 'current_thread', 'enumerate', 'Event',
+ 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 'Barrier',
'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
-_start_new_thread = thread.start_new_thread
-_allocate_lock = thread.allocate_lock
-_get_ident = thread.get_ident
-ThreadError = thread.error
-del thread
-
+# Rename some stuff so "from threading import *" is safe
+_start_new_thread = _thread.start_new_thread
+_allocate_lock = _thread.allocate_lock
+_get_ident = _thread.get_ident
+ThreadError = _thread.error
+try:
+ _CRLock = _thread.RLock
+except AttributeError:
+ _CRLock = None
+TIMEOUT_MAX = _thread.TIMEOUT_MAX
+del _thread
-# sys.exc_clear is used to work around the fact that except blocks
-# don't fully clear the exception until 3.0.
-warnings.filterwarnings('ignore', category=DeprecationWarning,
- module='threading', message='sys.exc_clear')
# Debug support (adapted from ihooks.py).
-# All the major classes here derive from _Verbose. We force that to
-# be a new-style class so that all the major classes here are new-style.
-# This helps debugging (type(instance) is more revealing for instances
-# of new-style classes).
_VERBOSE = False
@@ -58,10 +45,10 @@ if __debug__:
def __init__(self, verbose=None):
if verbose is None:
verbose = _VERBOSE
- self.__verbose = verbose
+ self._verbose = verbose
def _note(self, format, *args):
- if self.__verbose:
+ if self._verbose:
format = format % args
# Issue #4188: calling current_thread() can incur an infinite
# recursion if it has to create a DummyThread on the fly.
@@ -98,37 +85,41 @@ def settrace(func):
Lock = _allocate_lock
-def RLock(*args, **kwargs):
- return _RLock(*args, **kwargs)
+def RLock(verbose=None, *args, **kwargs):
+ if verbose is None:
+ verbose = _VERBOSE
+ if (__debug__ and verbose) or _CRLock is None:
+ return _PyRLock(verbose, *args, **kwargs)
+ return _CRLock(*args, **kwargs)
class _RLock(_Verbose):
def __init__(self, verbose=None):
_Verbose.__init__(self, verbose)
- self.__block = _allocate_lock()
- self.__owner = None
- self.__count = 0
+ self._block = _allocate_lock()
+ self._owner = None
+ self._count = 0
def __repr__(self):
- owner = self.__owner
+ owner = self._owner
try:
owner = _active[owner].name
except KeyError:
pass
return "<%s owner=%r count=%d>" % (
- self.__class__.__name__, owner, self.__count)
+ self.__class__.__name__, owner, self._count)
- def acquire(self, blocking=1):
+ def acquire(self, blocking=True, timeout=-1):
me = _get_ident()
- if self.__owner == me:
- self.__count = self.__count + 1
+ if self._owner == me:
+ self._count = self._count + 1
if __debug__:
self._note("%s.acquire(%s): recursive success", self, blocking)
return 1
- rc = self.__block.acquire(blocking)
+ rc = self._block.acquire(blocking, timeout)
if rc:
- self.__owner = me
- self.__count = 1
+ self._owner = me
+ self._count = 1
if __debug__:
self._note("%s.acquire(%s): initial success", self, blocking)
else:
@@ -139,12 +130,12 @@ class _RLock(_Verbose):
__enter__ = acquire
def release(self):
- if self.__owner != _get_ident():
+ if self._owner != _get_ident():
raise RuntimeError("cannot release un-acquired lock")
- self.__count = count = self.__count - 1
+ self._count = count = self._count - 1
if not count:
- self.__owner = None
- self.__block.release()
+ self._owner = None
+ self._block.release()
if __debug__:
self._note("%s.release(): final release", self)
else:
@@ -156,26 +147,26 @@ class _RLock(_Verbose):
# Internal methods used by condition variables
- def _acquire_restore(self, count_owner):
- count, owner = count_owner
- self.__block.acquire()
- self.__count = count
- self.__owner = owner
+ def _acquire_restore(self, state):
+ self._block.acquire()
+ self._count, self._owner = state
if __debug__:
self._note("%s._acquire_restore()", self)
def _release_save(self):
if __debug__:
self._note("%s._release_save()", self)
- count = self.__count
- self.__count = 0
- owner = self.__owner
- self.__owner = None
- self.__block.release()
+ count = self._count
+ self._count = 0
+ owner = self._owner
+ self._owner = None
+ self._block.release()
return (count, owner)
def _is_owned(self):
- return self.__owner == _get_ident()
+ return self._owner == _get_ident()
+
+_PyRLock = _RLock
def Condition(*args, **kwargs):
@@ -187,7 +178,7 @@ class _Condition(_Verbose):
_Verbose.__init__(self, verbose)
if lock is None:
lock = RLock()
- self.__lock = lock
+ self._lock = lock
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
self.release = lock.release
@@ -206,28 +197,28 @@ class _Condition(_Verbose):
self._is_owned = lock._is_owned
except AttributeError:
pass
- self.__waiters = []
+ self._waiters = []
def __enter__(self):
- return self.__lock.__enter__()
+ return self._lock.__enter__()
def __exit__(self, *args):
- return self.__lock.__exit__(*args)
+ return self._lock.__exit__(*args)
def __repr__(self):
- return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
+ return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))
def _release_save(self):
- self.__lock.release() # No state to save
+ self._lock.release() # No state to save
def _acquire_restore(self, x):
- self.__lock.acquire() # Ignore saved state
+ self._lock.acquire() # Ignore saved state
def _is_owned(self):
# Return True if lock is owned by current_thread.
# This method is called only if __lock doesn't have _is_owned().
- if self.__lock.acquire(0):
- self.__lock.release()
+ if self._lock.acquire(0):
+ self._lock.release()
return False
else:
return True
@@ -237,47 +228,63 @@ class _Condition(_Verbose):
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
waiter.acquire()
- self.__waiters.append(waiter)
+ self._waiters.append(waiter)
saved_state = self._release_save()
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire()
+ gotit = True
if __debug__:
self._note("%s.wait(): got it", self)
else:
- # Balancing act: We can't afford a pure busy loop, so we
- # have to sleep; but if we sleep the whole timeout time,
- # we'll be unresponsive. The scheme here sleeps very
- # little at first, longer as time goes on, but never longer
- # than 20 times per second (or the timeout time remaining).
- endtime = _time() + timeout
- delay = 0.0005 # 500 us -> initial delay of 1 ms
- while True:
- gotit = waiter.acquire(0)
- if gotit:
- break
- remaining = endtime - _time()
- if remaining <= 0:
- break
- delay = min(delay * 2, remaining, .05)
- _sleep(delay)
+ if timeout > 0:
+ gotit = waiter.acquire(True, timeout)
+ else:
+ gotit = waiter.acquire(False)
if not gotit:
if __debug__:
self._note("%s.wait(%s): timed out", self, timeout)
try:
- self.__waiters.remove(waiter)
+ self._waiters.remove(waiter)
except ValueError:
pass
else:
if __debug__:
self._note("%s.wait(%s): got it", self, timeout)
+ return gotit
finally:
self._acquire_restore(saved_state)
+ def wait_for(self, predicate, timeout=None):
+ endtime = None
+ waittime = timeout
+ result = predicate()
+ while not result:
+ if waittime is not None:
+ if endtime is None:
+ endtime = _time() + waittime
+ else:
+ waittime = endtime - _time()
+ if waittime <= 0:
+ if __debug__:
+ self._note("%s.wait_for(%r, %r): Timed out.",
+ self, predicate, timeout)
+ break
+ if __debug__:
+ self._note("%s.wait_for(%r, %r): Waiting with timeout=%s.",
+ self, predicate, timeout, waittime)
+ self.wait(waittime)
+ result = predicate()
+ else:
+ if __debug__:
+ self._note("%s.wait_for(%r, %r): Success.",
+ self, predicate, timeout)
+ return result
+
def notify(self, n=1):
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
- __waiters = self.__waiters
+ __waiters = self._waiters
waiters = __waiters[:n]
if not waiters:
if __debug__:
@@ -292,10 +299,10 @@ class _Condition(_Verbose):
except ValueError:
pass
- def notifyAll(self):
- self.notify(len(self.__waiters))
+ def notify_all(self):
+ self.notify(len(self._waiters))
- notify_all = notifyAll
+ notifyAll = notify_all
def Semaphore(*args, **kwargs):
@@ -309,38 +316,48 @@ class _Semaphore(_Verbose):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
_Verbose.__init__(self, verbose)
- self.__cond = Condition(Lock())
- self.__value = value
+ self._cond = Condition(Lock())
+ self._value = value
- def acquire(self, blocking=1):
+ def acquire(self, blocking=True, timeout=None):
+ if not blocking and timeout is not None:
+ raise ValueError("can't specify timeout for non-blocking acquire")
rc = False
- self.__cond.acquire()
- while self.__value == 0:
+ endtime = None
+ self._cond.acquire()
+ while self._value == 0:
if not blocking:
break
if __debug__:
self._note("%s.acquire(%s): blocked waiting, value=%s",
- self, blocking, self.__value)
- self.__cond.wait()
+ self, blocking, self._value)
+ if timeout is not None:
+ if endtime is None:
+ endtime = _time() + timeout
+ else:
+ timeout = endtime - _time()
+ if timeout <= 0:
+ break
+ self._cond.wait(timeout)
else:
- self.__value = self.__value - 1
+ self._value = self._value - 1
if __debug__:
self._note("%s.acquire: success, value=%s",
- self, self.__value)
+ self, self._value)
rc = True
- self.__cond.release()
+ self._cond.release()
return rc
__enter__ = acquire
def release(self):
- self.__cond.acquire()
- self.__value = self.__value + 1
+ self._cond.acquire()
+ self._value = self._value + 1
if __debug__:
self._note("%s.release: success, value=%s",
- self, self.__value)
- self.__cond.notify()
- self.__cond.release()
+ self, self._value)
+ self._cond.notify()
+ self._cond.release()
def __exit__(self, t, v, tb):
self.release()
@@ -356,8 +373,8 @@ class _BoundedSemaphore(_Semaphore):
self._initial_value = value
def release(self):
- if self._Semaphore__value >= self._initial_value:
- raise ValueError, "Semaphore released too many times"
+ if self._value >= self._initial_value:
+ raise ValueError("Semaphore released too many times")
return _Semaphore.release(self)
@@ -370,41 +387,213 @@ class _Event(_Verbose):
def __init__(self, verbose=None):
_Verbose.__init__(self, verbose)
- self.__cond = Condition(Lock())
- self.__flag = False
+ self._cond = Condition(Lock())
+ self._flag = False
def _reset_internal_locks(self):
# private! called by Thread._reset_internal_locks by _after_fork()
- self.__cond.__init__()
+ self._cond.__init__()
- def isSet(self):
- return self.__flag
+ def is_set(self):
+ return self._flag
- is_set = isSet
+ isSet = is_set
def set(self):
- self.__cond.acquire()
+ self._cond.acquire()
try:
- self.__flag = True
- self.__cond.notify_all()
+ self._flag = True
+ self._cond.notify_all()
finally:
- self.__cond.release()
+ self._cond.release()
def clear(self):
- self.__cond.acquire()
+ self._cond.acquire()
try:
- self.__flag = False
+ self._flag = False
finally:
- self.__cond.release()
+ self._cond.release()
def wait(self, timeout=None):
- self.__cond.acquire()
+ self._cond.acquire()
try:
- if not self.__flag:
- self.__cond.wait(timeout)
- return self.__flag
+ signaled = self._flag
+ if not signaled:
+ signaled = self._cond.wait(timeout)
+ return signaled
finally:
- self.__cond.release()
+ self._cond.release()
+
+
+# A barrier class. Inspired in part by the pthread_barrier_* api and
+# the CyclicBarrier class from Java. See
+# http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and
+# http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/
+# CyclicBarrier.html
+# for information.
+# We maintain two main states, 'filling' and 'draining' enabling the barrier
+# to be cyclic. Threads are not allowed into it until it has fully drained
+# since the previous cycle. In addition, a 'resetting' state exists which is
+# similar to 'draining' except that threads leave with a BrokenBarrierError,
+# and a 'broken' state in which all threads get the exception.
+class Barrier(_Verbose):
+ """
+ Barrier. Useful for synchronizing a fixed number of threads
+ at known synchronization points. Threads block on 'wait()' and are
+ simultaneously once they have all made that call.
+ """
+ def __init__(self, parties, action=None, timeout=None, verbose=None):
+ """
+ Create a barrier, initialised to 'parties' threads.
+ 'action' is a callable which, when supplied, will be called
+ by one of the threads after they have all entered the
+ barrier and just prior to releasing them all.
+ If a 'timeout' is provided, it is uses as the default for
+ all subsequent 'wait()' calls.
+ """
+ _Verbose.__init__(self, verbose)
+ self._cond = Condition(Lock())
+ self._action = action
+ self._timeout = timeout
+ self._parties = parties
+ self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken
+ self._count = 0
+
+ def wait(self, timeout=None):
+ """
+ Wait for the barrier. When the specified number of threads have
+ started waiting, they are all simultaneously awoken. If an 'action'
+ was provided for the barrier, one of the threads will have executed
+ that callback prior to returning.
+ Returns an individual index number from 0 to 'parties-1'.
+ """
+ if timeout is None:
+ timeout = self._timeout
+ with self._cond:
+ self._enter() # Block while the barrier drains.
+ index = self._count
+ self._count += 1
+ try:
+ if index + 1 == self._parties:
+ # We release the barrier
+ self._release()
+ else:
+ # We wait until someone releases us
+ self._wait(timeout)
+ return index
+ finally:
+ self._count -= 1
+ # Wake up any threads waiting for barrier to drain.
+ self._exit()
+
+ # Block until the barrier is ready for us, or raise an exception
+ # if it is broken.
+ def _enter(self):
+ while self._state in (-1, 1):
+ # It is draining or resetting, wait until done
+ self._cond.wait()
+ #see if the barrier is in a broken state
+ if self._state < 0:
+ raise BrokenBarrierError
+ assert self._state == 0
+
+ # Optionally run the 'action' and release the threads waiting
+ # in the barrier.
+ def _release(self):
+ try:
+ if self._action:
+ self._action()
+ # enter draining state
+ self._state = 1
+ self._cond.notify_all()
+ except:
+ #an exception during the _action handler. Break and reraise
+ self._break()
+ raise
+
+ # Wait in the barrier until we are relased. Raise an exception
+ # if the barrier is reset or broken.
+ def _wait(self, timeout):
+ if not self._cond.wait_for(lambda : self._state != 0, timeout):
+ #timed out. Break the barrier
+ self._break()
+ raise BrokenBarrierError
+ if self._state < 0:
+ raise BrokenBarrierError
+ assert self._state == 1
+
+ # If we are the last thread to exit the barrier, signal any threads
+ # waiting for the barrier to drain.
+ def _exit(self):
+ if self._count == 0:
+ if self._state in (-1, 1):
+ #resetting or draining
+ self._state = 0
+ self._cond.notify_all()
+
+ def reset(self):
+ """
+ Reset the barrier to the initial state.
+ Any threads currently waiting will get the BrokenBarrier exception
+ raised.
+ """
+ with self._cond:
+ if self._count > 0:
+ if self._state == 0:
+ #reset the barrier, waking up threads
+ self._state = -1
+ elif self._state == -2:
+ #was broken, set it to reset state
+ #which clears when the last thread exits
+ self._state = -1
+ else:
+ self._state = 0
+ self._cond.notify_all()
+
+ def abort(self):
+ """
+ Place the barrier into a 'broken' state.
+ Useful in case of error. Any currently waiting threads and
+ threads attempting to 'wait()' will have BrokenBarrierError
+ raised.
+ """
+ with self._cond:
+ self._break()
+
+ def _break(self):
+ # An internal error was detected. The barrier is set to
+ # a broken state all parties awakened.
+ self._state = -2
+ self._cond.notify_all()
+
+ @property
+ def parties(self):
+ """
+ Return the number of threads required to trip the barrier.
+ """
+ return self._parties
+
+ @property
+ def n_waiting(self):
+ """
+ Return the number of threads that are currently waiting at the barrier.
+ """
+ # We don't need synchronization here since this is an ephemeral result
+ # anyway. It returns the correct value in the steady state.
+ if self._state == 0:
+ return self._count
+ return 0
+
+ @property
+ def broken(self):
+ """
+ Return True if the barrier is in a broken state
+ """
+ return self._state == -2
+
+#exception raised by the Barrier class
+class BrokenBarrierError(RuntimeError): pass
+
# Helper to generate new thread names
_counter = 0
@@ -418,6 +607,8 @@ _active_limbo_lock = _allocate_lock()
_active = {} # maps thread id to Thread object
_limbo = {}
+# For debug and leak testing
+_dangling = WeakSet()
# Main class for threads
@@ -431,7 +622,7 @@ class Thread(_Verbose):
__exc_info = _sys.exc_info
# Keep sys.exc_clear too to clear the exception just before
# allowing .join() to return.
- __exc_clear = _sys.exc_clear
+ #XXX __exc_clear = _sys.exc_clear
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, verbose=None):
@@ -439,125 +630,122 @@ class Thread(_Verbose):
_Verbose.__init__(self, verbose)
if kwargs is None:
kwargs = {}
- self.__target = target
- self.__name = str(name or _newname())
- self.__args = args
- self.__kwargs = kwargs
- self.__daemonic = self._set_daemon()
- self.__ident = None
- self.__started = Event()
- self.__stopped = False
- self.__block = Condition(Lock())
- self.__initialized = True
+ self._target = target
+ self._name = str(name or _newname())
+ self._args = args
+ self._kwargs = kwargs
+ self._daemonic = self._set_daemon()
+ self._ident = None
+ self._started = Event()
+ self._stopped = False
+ self._block = Condition(Lock())
+ self._initialized = True
# sys.stderr is not stored in the class like
# sys.exc_info since it can be changed between instances
- self.__stderr = _sys.stderr
+ self._stderr = _sys.stderr
+ _dangling.add(self)
def _reset_internal_locks(self):
# private! Called by _after_fork() to reset our internal locks as
# they may be in an invalid state leading to a deadlock or crash.
- if hasattr(self, '_Thread__block'): # DummyThread deletes self.__block
- self.__block.__init__()
- self.__started._reset_internal_locks()
-
- @property
- def _block(self):
- # used by a unittest
- return self.__block
+ if hasattr(self, '_block'): # DummyThread deletes _block
+ self._block.__init__()
+ self._started._reset_internal_locks()
def _set_daemon(self):
# Overridden in _MainThread and _DummyThread
return current_thread().daemon
def __repr__(self):
- assert self.__initialized, "Thread.__init__() was not called"
+ assert self._initialized, "Thread.__init__() was not called"
status = "initial"
- if self.__started.is_set():
+ if self._started.is_set():
status = "started"
- if self.__stopped:
+ if self._stopped:
status = "stopped"
- if self.__daemonic:
+ if self._daemonic:
status += " daemon"
- if self.__ident is not None:
- status += " %s" % self.__ident
- return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
+ if self._ident is not None:
+ status += " %s" % self._ident
+ return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status)
def start(self):
- if not self.__initialized:
+ if not self._initialized:
raise RuntimeError("thread.__init__() not called")
- if self.__started.is_set():
+
+ if self._started.is_set():
raise RuntimeError("threads can only be started once")
if __debug__:
self._note("%s.start(): starting thread", self)
with _active_limbo_lock:
_limbo[self] = self
try:
- _start_new_thread(self.__bootstrap, ())
+ _start_new_thread(self._bootstrap, ())
except Exception:
with _active_limbo_lock:
del _limbo[self]
raise
- self.__started.wait()
+ self._started.wait()
def run(self):
try:
- if self.__target:
- self.__target(*self.__args, **self.__kwargs)
+ if self._target:
+ self._target(*self._args, **self._kwargs)
finally:
# Avoid a refcycle if the thread is running a function with
# an argument that has a member that points to the thread.
- del self.__target, self.__args, self.__kwargs
+ del self._target, self._args, self._kwargs
- def __bootstrap(self):
+ def _bootstrap(self):
# Wrapper around the real bootstrap code that ignores
# exceptions during interpreter cleanup. Those typically
# happen when a daemon thread wakes up at an unfortunate
# moment, finds the world around it destroyed, and raises some
# random exception *** while trying to report the exception in
- # __bootstrap_inner() below ***. Those random exceptions
+ # _bootstrap_inner() below ***. Those random exceptions
# don't help anybody, and they confuse users, so we suppress
# them. We suppress them only when it appears that the world
# indeed has already been destroyed, so that exceptions in
- # __bootstrap_inner() during normal business hours are properly
+ # _bootstrap_inner() during normal business hours are properly
# reported. Also, we only suppress them for daemonic threads;
# if a non-daemonic encounters this, something else is wrong.
try:
- self.__bootstrap_inner()
+ self._bootstrap_inner()
except:
- if self.__daemonic and _sys is None:
+ if self._daemonic and _sys is None:
return
raise
def _set_ident(self):
- self.__ident = _get_ident()
+ self._ident = _get_ident()
- def __bootstrap_inner(self):
+ def _bootstrap_inner(self):
try:
self._set_ident()
- self.__started.set()
+ self._started.set()
with _active_limbo_lock:
- _active[self.__ident] = self
+ _active[self._ident] = self
del _limbo[self]
if __debug__:
- self._note("%s.__bootstrap(): thread started", self)
+ self._note("%s._bootstrap(): thread started", self)
if _trace_hook:
- self._note("%s.__bootstrap(): registering trace hook", self)
+ self._note("%s._bootstrap(): registering trace hook", self)
_sys.settrace(_trace_hook)
if _profile_hook:
- self._note("%s.__bootstrap(): registering profile hook", self)
+ self._note("%s._bootstrap(): registering profile hook", self)
_sys.setprofile(_profile_hook)
try:
self.run()
except SystemExit:
if __debug__:
- self._note("%s.__bootstrap(): raised SystemExit", self)
+ self._note("%s._bootstrap(): raised SystemExit", self)
except:
if __debug__:
- self._note("%s.__bootstrap(): unhandled exception", self)
+ self._note("%s._bootstrap(): unhandled exception", self)
# If sys.stderr is no more (most likely from interpreter
- # shutdown) use self.__stderr. Otherwise still use sys (as in
+ # shutdown) use self._stderr. Otherwise still use sys (as in
# _sys) in case sys.stderr was redefined since the creation of
# self.
if _sys:
@@ -567,69 +755,66 @@ class Thread(_Verbose):
# Do the best job possible w/o a huge amt. of code to
# approximate a traceback (code ideas from
# Lib/traceback.py)
- exc_type, exc_value, exc_tb = self.__exc_info()
+ exc_type, exc_value, exc_tb = self._exc_info()
try:
- print>>self.__stderr, (
+ print((
"Exception in thread " + self.name +
- " (most likely raised during interpreter shutdown):")
- print>>self.__stderr, (
- "Traceback (most recent call last):")
+ " (most likely raised during interpreter shutdown):"), file=self._stderr)
+ print((
+ "Traceback (most recent call last):"), file=self._stderr)
while exc_tb:
- print>>self.__stderr, (
+ print((
' File "%s", line %s, in %s' %
(exc_tb.tb_frame.f_code.co_filename,
exc_tb.tb_lineno,
- exc_tb.tb_frame.f_code.co_name))
+ exc_tb.tb_frame.f_code.co_name)), file=self._stderr)
exc_tb = exc_tb.tb_next
- print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
+ print(("%s: %s" % (exc_type, exc_value)), file=self._stderr)
# Make sure that exc_tb gets deleted since it is a memory
# hog; deleting everything else is just for thoroughness
finally:
del exc_type, exc_value, exc_tb
else:
if __debug__:
- self._note("%s.__bootstrap(): normal return", self)
+ self._note("%s._bootstrap(): normal return", self)
finally:
# Prevent a race in
# test_threading.test_no_refcycle_through_target when
# the exception keeps the target alive past when we
# assert that it's dead.
- self.__exc_clear()
+ #XXX self.__exc_clear()
+ pass
finally:
with _active_limbo_lock:
- self.__stop()
+ self._stop()
try:
- # We don't call self.__delete() because it also
+ # We don't call self._delete() because it also
# grabs _active_limbo_lock.
del _active[_get_ident()]
except:
pass
- def __stop(self):
- # DummyThreads delete self.__block, but they have no waiters to
- # notify anyway (join() is forbidden on them).
- if not hasattr(self, '_Thread__block'):
- return
- self.__block.acquire()
- self.__stopped = True
- self.__block.notify_all()
- self.__block.release()
+ def _stop(self):
+ self._block.acquire()
+ self._stopped = True
+ self._block.notify_all()
+ self._block.release()
- def __delete(self):
+ def _delete(self):
"Remove current thread from the dict of currently running threads."
- # Notes about running with dummy_thread:
+ # Notes about running with _dummy_thread:
#
- # Must take care to not raise an exception if dummy_thread is being
+ # Must take care to not raise an exception if _dummy_thread is being
# used (and thus this module is being used as an instance of
- # dummy_threading). dummy_thread.get_ident() always returns -1 since
- # there is only one thread if dummy_thread is being used. Thus
+ # dummy_threading). _dummy_thread.get_ident() always returns -1 since
+ # there is only one thread if _dummy_thread is being used. Thus
# len(_active) is always <= 1 here, and any Thread instance created
# overwrites the (if any) thread currently registered in _active.
#
# An instance of _MainThread is always created by 'threading'. This
# gets overwritten the instant an instance of Thread is created; both
- # threads return -1 from dummy_thread.get_ident() and thus have the
+ # threads return -1 from _dummy_thread.get_ident() and thus have the
# same key in the dict. So when the _MainThread instance created by
# 'threading' tries to clean itself up when atexit calls this method
# it gets a KeyError if another Thread instance was created.
@@ -651,71 +836,72 @@ class Thread(_Verbose):
raise
def join(self, timeout=None):
- if not self.__initialized:
+ if not self._initialized:
raise RuntimeError("Thread.__init__() not called")
- if not self.__started.is_set():
+ if not self._started.is_set():
raise RuntimeError("cannot join thread before it is started")
if self is current_thread():
raise RuntimeError("cannot join current thread")
if __debug__:
- if not self.__stopped:
+ if not self._stopped:
self._note("%s.join(): waiting until thread stops", self)
- self.__block.acquire()
+
+ self._block.acquire()
try:
if timeout is None:
- while not self.__stopped:
- self.__block.wait()
+ while not self._stopped:
+ self._block.wait()
if __debug__:
self._note("%s.join(): thread stopped", self)
else:
deadline = _time() + timeout
- while not self.__stopped:
+ while not self._stopped:
delay = deadline - _time()
if delay <= 0:
if __debug__:
self._note("%s.join(): timed out", self)
break
- self.__block.wait(delay)
+ self._block.wait(delay)
else:
if __debug__:
self._note("%s.join(): thread stopped", self)
finally:
- self.__block.release()
+ self._block.release()
@property
def name(self):
- assert self.__initialized, "Thread.__init__() not called"
- return self.__name
+ assert self._initialized, "Thread.__init__() not called"
+ return self._name
@name.setter
def name(self, name):
- assert self.__initialized, "Thread.__init__() not called"
- self.__name = str(name)
+ assert self._initialized, "Thread.__init__() not called"
+ self._name = str(name)
@property
def ident(self):
- assert self.__initialized, "Thread.__init__() not called"
- return self.__ident
+ assert self._initialized, "Thread.__init__() not called"
+ return self._ident
- def isAlive(self):
- assert self.__initialized, "Thread.__init__() not called"
- return self.__started.is_set() and not self.__stopped
+ def is_alive(self):
+ assert self._initialized, "Thread.__init__() not called"
+ return self._started.is_set() and not self._stopped
- is_alive = isAlive
+ isAlive = is_alive
@property
def daemon(self):
- assert self.__initialized, "Thread.__init__() not called"
- return self.__daemonic
+ assert self._initialized, "Thread.__init__() not called"
+ return self._daemonic
@daemon.setter
def daemon(self, daemonic):
- if not self.__initialized:
+ if not self._initialized:
raise RuntimeError("Thread.__init__() not called")
- if self.__started.is_set():
+ if self._started.is_set():
raise RuntimeError("cannot set daemon status of active thread");
- self.__daemonic = daemonic
+ self._daemonic = daemonic
def isDaemon(self):
return self.daemon
@@ -767,16 +953,16 @@ class _MainThread(Thread):
def __init__(self):
Thread.__init__(self, name="MainThread")
- self._Thread__started.set()
+ self._started.set()
self._set_ident()
with _active_limbo_lock:
- _active[_get_ident()] = self
+ _active[self._ident] = self
def _set_daemon(self):
return False
def _exitfunc(self):
- self._Thread__stop()
+ self._stop()
t = _pickSomeNonDaemonThread()
if t:
if __debug__:
@@ -786,7 +972,7 @@ class _MainThread(Thread):
t = _pickSomeNonDaemonThread()
if __debug__:
self._note("%s: exiting", self)
- self._Thread__delete()
+ self._delete()
def _pickSomeNonDaemonThread():
for t in enumerate():
@@ -808,49 +994,52 @@ class _DummyThread(Thread):
def __init__(self):
Thread.__init__(self, name=_newname("Dummy-%d"))
- # Thread.__block consumes an OS-level locking primitive, which
+ # Thread._block consumes an OS-level locking primitive, which
# can never be used by a _DummyThread. Since a _DummyThread
# instance is immortal, that's bad, so release this resource.
- del self._Thread__block
+ del self._block
- self._Thread__started.set()
+ self._started.set()
self._set_ident()
with _active_limbo_lock:
- _active[_get_ident()] = self
+ _active[self._ident] = self
def _set_daemon(self):
return True
+ def _stop(self):
+ pass
+
def join(self, timeout=None):
assert False, "cannot join a dummy thread"
# Global API functions
-def currentThread():
+def current_thread():
try:
return _active[_get_ident()]
except KeyError:
##print "current_thread(): no current thread for", _get_ident()
return _DummyThread()
-current_thread = currentThread
+currentThread = current_thread
-def activeCount():
+def active_count():
with _active_limbo_lock:
return len(_active) + len(_limbo)
-active_count = activeCount
+activeCount = active_count
def _enumerate():
# Same as enumerate(), but without the lock. Internal use only.
- return _active.values() + _limbo.values()
+ return list(_active.values()) + list(_limbo.values())
def enumerate():
with _active_limbo_lock:
- return _active.values() + _limbo.values()
+ return list(_active.values()) + list(_limbo.values())
-from thread import stack_size
+from _thread import stack_size
# Create the main thread object,
# and make it available for the interpreter
@@ -862,7 +1051,7 @@ _shutdown = _MainThread()._exitfunc
# module, or from the python fallback
try:
- from thread import _local as local
+ from _thread import _local as local
except ImportError:
from _threading_local import local
@@ -881,110 +1070,21 @@ def _after_fork():
new_active = {}
current = current_thread()
with _active_limbo_lock:
- for thread in _active.itervalues():
+ for thread in _active.values():
# Any lock/condition variable may be currently locked or in an
# invalid state, so we reinitialize them.
- if hasattr(thread, '_reset_internal_locks'):
- thread._reset_internal_locks()
+ thread._reset_internal_locks()
if thread is current:
# There is only one active thread. We reset the ident to
# its new value since it can have changed.
ident = _get_ident()
- thread._Thread__ident = ident
+ thread._ident = ident
new_active[ident] = thread
else:
# All the others are already stopped.
- thread._Thread__stop()
+ thread._stop()
_limbo.clear()
_active.clear()
_active.update(new_active)
assert len(_active) == 1
-
-
-# Self-test code
-
-def _test():
-
- class BoundedQueue(_Verbose):
-
- def __init__(self, limit):
- _Verbose.__init__(self)
- self.mon = RLock()
- self.rc = Condition(self.mon)
- self.wc = Condition(self.mon)
- self.limit = limit
- self.queue = _deque()
-
- def put(self, item):
- self.mon.acquire()
- while len(self.queue) >= self.limit:
- self._note("put(%s): queue full", item)
- self.wc.wait()
- self.queue.append(item)
- self._note("put(%s): appended, length now %d",
- item, len(self.queue))
- self.rc.notify()
- self.mon.release()
-
- def get(self):
- self.mon.acquire()
- while not self.queue:
- self._note("get(): queue empty")
- self.rc.wait()
- item = self.queue.popleft()
- self._note("get(): got %s, %d left", item, len(self.queue))
- self.wc.notify()
- self.mon.release()
- return item
-
- class ProducerThread(Thread):
-
- def __init__(self, queue, quota):
- Thread.__init__(self, name="Producer")
- self.queue = queue
- self.quota = quota
-
- def run(self):
- from random import random
- counter = 0
- while counter < self.quota:
- counter = counter + 1
- self.queue.put("%s.%d" % (self.name, counter))
- _sleep(random() * 0.00001)
-
-
- class ConsumerThread(Thread):
-
- def __init__(self, queue, count):
- Thread.__init__(self, name="Consumer")
- self.queue = queue
- self.count = count
-
- def run(self):
- while self.count > 0:
- item = self.queue.get()
- print item
- self.count = self.count - 1
-
- NP = 3
- QL = 4
- NI = 5
-
- Q = BoundedQueue(QL)
- P = []
- for i in range(NP):
- t = ProducerThread(Q, NI)
- t.name = ("Producer-%d" % (i+1))
- P.append(t)
- C = ConsumerThread(Q, NI*NP)
- for t in P:
- t.start()
- _sleep(0.000001)
- C.start()
- for t in P:
- t.join()
- C.join()
-
-if __name__ == '__main__':
- _test()