diff options
Diffstat (limited to 'Lib/threading.py')
-rw-r--r-- | Lib/threading.py | 776 |
1 files changed, 438 insertions, 338 deletions
diff --git a/Lib/threading.py b/Lib/threading.py index db9ab29c7da..58ffa7ebc27 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -1,53 +1,40 @@ """Thread module emulating a subset of Java's threading model.""" import sys as _sys +import _thread -try: - import thread -except ImportError: - del _sys.modules[__name__] - raise - -import warnings - -from collections import deque as _deque from time import time as _time, sleep as _sleep from traceback import format_exc as _format_exc +from _weakrefset import WeakSet -# Note regarding PEP 8 compliant aliases +# Note regarding PEP 8 compliant names # This threading model was originally inspired by Java, and inherited # the convention of camelCase function and method names from that -# language. While those names are not in any imminent danger of being -# deprecated, starting with Python 2.6, the module now provides a -# PEP 8 compliant alias for any such method name. -# Using the new PEP 8 compliant names also facilitates substitution +# language. Those originaly names are not in any imminent danger of +# being deprecated (even for Py3k),so this module provides them as an +# alias for the PEP 8 compliant names +# Note that using the new PEP 8 compliant names facilitates substitution # with the multiprocessing module, which doesn't provide the old # Java inspired names. - -# Rename some stuff so "from threading import *" is safe -__all__ = ['activeCount', 'active_count', 'Condition', 'currentThread', - 'current_thread', 'enumerate', 'Event', - 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', +__all__ = ['active_count', 'Condition', 'current_thread', 'enumerate', 'Event', + 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 'Barrier', 'Timer', 'setprofile', 'settrace', 'local', 'stack_size'] -_start_new_thread = thread.start_new_thread -_allocate_lock = thread.allocate_lock -_get_ident = thread.get_ident -ThreadError = thread.error -del thread - +# Rename some stuff so "from threading import *" is safe +_start_new_thread = _thread.start_new_thread +_allocate_lock = _thread.allocate_lock +_get_ident = _thread.get_ident +ThreadError = _thread.error +try: + _CRLock = _thread.RLock +except AttributeError: + _CRLock = None +TIMEOUT_MAX = _thread.TIMEOUT_MAX +del _thread -# sys.exc_clear is used to work around the fact that except blocks -# don't fully clear the exception until 3.0. -warnings.filterwarnings('ignore', category=DeprecationWarning, - module='threading', message='sys.exc_clear') # Debug support (adapted from ihooks.py). -# All the major classes here derive from _Verbose. We force that to -# be a new-style class so that all the major classes here are new-style. -# This helps debugging (type(instance) is more revealing for instances -# of new-style classes). _VERBOSE = False @@ -58,10 +45,10 @@ if __debug__: def __init__(self, verbose=None): if verbose is None: verbose = _VERBOSE - self.__verbose = verbose + self._verbose = verbose def _note(self, format, *args): - if self.__verbose: + if self._verbose: format = format % args # Issue #4188: calling current_thread() can incur an infinite # recursion if it has to create a DummyThread on the fly. @@ -98,37 +85,41 @@ def settrace(func): Lock = _allocate_lock -def RLock(*args, **kwargs): - return _RLock(*args, **kwargs) +def RLock(verbose=None, *args, **kwargs): + if verbose is None: + verbose = _VERBOSE + if (__debug__ and verbose) or _CRLock is None: + return _PyRLock(verbose, *args, **kwargs) + return _CRLock(*args, **kwargs) class _RLock(_Verbose): def __init__(self, verbose=None): _Verbose.__init__(self, verbose) - self.__block = _allocate_lock() - self.__owner = None - self.__count = 0 + self._block = _allocate_lock() + self._owner = None + self._count = 0 def __repr__(self): - owner = self.__owner + owner = self._owner try: owner = _active[owner].name except KeyError: pass return "<%s owner=%r count=%d>" % ( - self.__class__.__name__, owner, self.__count) + self.__class__.__name__, owner, self._count) - def acquire(self, blocking=1): + def acquire(self, blocking=True, timeout=-1): me = _get_ident() - if self.__owner == me: - self.__count = self.__count + 1 + if self._owner == me: + self._count = self._count + 1 if __debug__: self._note("%s.acquire(%s): recursive success", self, blocking) return 1 - rc = self.__block.acquire(blocking) + rc = self._block.acquire(blocking, timeout) if rc: - self.__owner = me - self.__count = 1 + self._owner = me + self._count = 1 if __debug__: self._note("%s.acquire(%s): initial success", self, blocking) else: @@ -139,12 +130,12 @@ class _RLock(_Verbose): __enter__ = acquire def release(self): - if self.__owner != _get_ident(): + if self._owner != _get_ident(): raise RuntimeError("cannot release un-acquired lock") - self.__count = count = self.__count - 1 + self._count = count = self._count - 1 if not count: - self.__owner = None - self.__block.release() + self._owner = None + self._block.release() if __debug__: self._note("%s.release(): final release", self) else: @@ -156,26 +147,26 @@ class _RLock(_Verbose): # Internal methods used by condition variables - def _acquire_restore(self, count_owner): - count, owner = count_owner - self.__block.acquire() - self.__count = count - self.__owner = owner + def _acquire_restore(self, state): + self._block.acquire() + self._count, self._owner = state if __debug__: self._note("%s._acquire_restore()", self) def _release_save(self): if __debug__: self._note("%s._release_save()", self) - count = self.__count - self.__count = 0 - owner = self.__owner - self.__owner = None - self.__block.release() + count = self._count + self._count = 0 + owner = self._owner + self._owner = None + self._block.release() return (count, owner) def _is_owned(self): - return self.__owner == _get_ident() + return self._owner == _get_ident() + +_PyRLock = _RLock def Condition(*args, **kwargs): @@ -187,7 +178,7 @@ class _Condition(_Verbose): _Verbose.__init__(self, verbose) if lock is None: lock = RLock() - self.__lock = lock + self._lock = lock # Export the lock's acquire() and release() methods self.acquire = lock.acquire self.release = lock.release @@ -206,28 +197,28 @@ class _Condition(_Verbose): self._is_owned = lock._is_owned except AttributeError: pass - self.__waiters = [] + self._waiters = [] def __enter__(self): - return self.__lock.__enter__() + return self._lock.__enter__() def __exit__(self, *args): - return self.__lock.__exit__(*args) + return self._lock.__exit__(*args) def __repr__(self): - return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters)) + return "<Condition(%s, %d)>" % (self._lock, len(self._waiters)) def _release_save(self): - self.__lock.release() # No state to save + self._lock.release() # No state to save def _acquire_restore(self, x): - self.__lock.acquire() # Ignore saved state + self._lock.acquire() # Ignore saved state def _is_owned(self): # Return True if lock is owned by current_thread. # This method is called only if __lock doesn't have _is_owned(). - if self.__lock.acquire(0): - self.__lock.release() + if self._lock.acquire(0): + self._lock.release() return False else: return True @@ -237,47 +228,63 @@ class _Condition(_Verbose): raise RuntimeError("cannot wait on un-acquired lock") waiter = _allocate_lock() waiter.acquire() - self.__waiters.append(waiter) + self._waiters.append(waiter) saved_state = self._release_save() try: # restore state no matter what (e.g., KeyboardInterrupt) if timeout is None: waiter.acquire() + gotit = True if __debug__: self._note("%s.wait(): got it", self) else: - # Balancing act: We can't afford a pure busy loop, so we - # have to sleep; but if we sleep the whole timeout time, - # we'll be unresponsive. The scheme here sleeps very - # little at first, longer as time goes on, but never longer - # than 20 times per second (or the timeout time remaining). - endtime = _time() + timeout - delay = 0.0005 # 500 us -> initial delay of 1 ms - while True: - gotit = waiter.acquire(0) - if gotit: - break - remaining = endtime - _time() - if remaining <= 0: - break - delay = min(delay * 2, remaining, .05) - _sleep(delay) + if timeout > 0: + gotit = waiter.acquire(True, timeout) + else: + gotit = waiter.acquire(False) if not gotit: if __debug__: self._note("%s.wait(%s): timed out", self, timeout) try: - self.__waiters.remove(waiter) + self._waiters.remove(waiter) except ValueError: pass else: if __debug__: self._note("%s.wait(%s): got it", self, timeout) + return gotit finally: self._acquire_restore(saved_state) + def wait_for(self, predicate, timeout=None): + endtime = None + waittime = timeout + result = predicate() + while not result: + if waittime is not None: + if endtime is None: + endtime = _time() + waittime + else: + waittime = endtime - _time() + if waittime <= 0: + if __debug__: + self._note("%s.wait_for(%r, %r): Timed out.", + self, predicate, timeout) + break + if __debug__: + self._note("%s.wait_for(%r, %r): Waiting with timeout=%s.", + self, predicate, timeout, waittime) + self.wait(waittime) + result = predicate() + else: + if __debug__: + self._note("%s.wait_for(%r, %r): Success.", + self, predicate, timeout) + return result + def notify(self, n=1): if not self._is_owned(): raise RuntimeError("cannot notify on un-acquired lock") - __waiters = self.__waiters + __waiters = self._waiters waiters = __waiters[:n] if not waiters: if __debug__: @@ -292,10 +299,10 @@ class _Condition(_Verbose): except ValueError: pass - def notifyAll(self): - self.notify(len(self.__waiters)) + def notify_all(self): + self.notify(len(self._waiters)) - notify_all = notifyAll + notifyAll = notify_all def Semaphore(*args, **kwargs): @@ -309,38 +316,48 @@ class _Semaphore(_Verbose): if value < 0: raise ValueError("semaphore initial value must be >= 0") _Verbose.__init__(self, verbose) - self.__cond = Condition(Lock()) - self.__value = value + self._cond = Condition(Lock()) + self._value = value - def acquire(self, blocking=1): + def acquire(self, blocking=True, timeout=None): + if not blocking and timeout is not None: + raise ValueError("can't specify timeout for non-blocking acquire") rc = False - self.__cond.acquire() - while self.__value == 0: + endtime = None + self._cond.acquire() + while self._value == 0: if not blocking: break if __debug__: self._note("%s.acquire(%s): blocked waiting, value=%s", - self, blocking, self.__value) - self.__cond.wait() + self, blocking, self._value) + if timeout is not None: + if endtime is None: + endtime = _time() + timeout + else: + timeout = endtime - _time() + if timeout <= 0: + break + self._cond.wait(timeout) else: - self.__value = self.__value - 1 + self._value = self._value - 1 if __debug__: self._note("%s.acquire: success, value=%s", - self, self.__value) + self, self._value) rc = True - self.__cond.release() + self._cond.release() return rc __enter__ = acquire def release(self): - self.__cond.acquire() - self.__value = self.__value + 1 + self._cond.acquire() + self._value = self._value + 1 if __debug__: self._note("%s.release: success, value=%s", - self, self.__value) - self.__cond.notify() - self.__cond.release() + self, self._value) + self._cond.notify() + self._cond.release() def __exit__(self, t, v, tb): self.release() @@ -356,8 +373,8 @@ class _BoundedSemaphore(_Semaphore): self._initial_value = value def release(self): - if self._Semaphore__value >= self._initial_value: - raise ValueError, "Semaphore released too many times" + if self._value >= self._initial_value: + raise ValueError("Semaphore released too many times") return _Semaphore.release(self) @@ -370,41 +387,213 @@ class _Event(_Verbose): def __init__(self, verbose=None): _Verbose.__init__(self, verbose) - self.__cond = Condition(Lock()) - self.__flag = False + self._cond = Condition(Lock()) + self._flag = False def _reset_internal_locks(self): # private! called by Thread._reset_internal_locks by _after_fork() - self.__cond.__init__() + self._cond.__init__() - def isSet(self): - return self.__flag + def is_set(self): + return self._flag - is_set = isSet + isSet = is_set def set(self): - self.__cond.acquire() + self._cond.acquire() try: - self.__flag = True - self.__cond.notify_all() + self._flag = True + self._cond.notify_all() finally: - self.__cond.release() + self._cond.release() def clear(self): - self.__cond.acquire() + self._cond.acquire() try: - self.__flag = False + self._flag = False finally: - self.__cond.release() + self._cond.release() def wait(self, timeout=None): - self.__cond.acquire() + self._cond.acquire() try: - if not self.__flag: - self.__cond.wait(timeout) - return self.__flag + signaled = self._flag + if not signaled: + signaled = self._cond.wait(timeout) + return signaled finally: - self.__cond.release() + self._cond.release() + + +# A barrier class. Inspired in part by the pthread_barrier_* api and +# the CyclicBarrier class from Java. See +# http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and +# http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ +# CyclicBarrier.html +# for information. +# We maintain two main states, 'filling' and 'draining' enabling the barrier +# to be cyclic. Threads are not allowed into it until it has fully drained +# since the previous cycle. In addition, a 'resetting' state exists which is +# similar to 'draining' except that threads leave with a BrokenBarrierError, +# and a 'broken' state in which all threads get the exception. +class Barrier(_Verbose): + """ + Barrier. Useful for synchronizing a fixed number of threads + at known synchronization points. Threads block on 'wait()' and are + simultaneously once they have all made that call. + """ + def __init__(self, parties, action=None, timeout=None, verbose=None): + """ + Create a barrier, initialised to 'parties' threads. + 'action' is a callable which, when supplied, will be called + by one of the threads after they have all entered the + barrier and just prior to releasing them all. + If a 'timeout' is provided, it is uses as the default for + all subsequent 'wait()' calls. + """ + _Verbose.__init__(self, verbose) + self._cond = Condition(Lock()) + self._action = action + self._timeout = timeout + self._parties = parties + self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken + self._count = 0 + + def wait(self, timeout=None): + """ + Wait for the barrier. When the specified number of threads have + started waiting, they are all simultaneously awoken. If an 'action' + was provided for the barrier, one of the threads will have executed + that callback prior to returning. + Returns an individual index number from 0 to 'parties-1'. + """ + if timeout is None: + timeout = self._timeout + with self._cond: + self._enter() # Block while the barrier drains. + index = self._count + self._count += 1 + try: + if index + 1 == self._parties: + # We release the barrier + self._release() + else: + # We wait until someone releases us + self._wait(timeout) + return index + finally: + self._count -= 1 + # Wake up any threads waiting for barrier to drain. + self._exit() + + # Block until the barrier is ready for us, or raise an exception + # if it is broken. + def _enter(self): + while self._state in (-1, 1): + # It is draining or resetting, wait until done + self._cond.wait() + #see if the barrier is in a broken state + if self._state < 0: + raise BrokenBarrierError + assert self._state == 0 + + # Optionally run the 'action' and release the threads waiting + # in the barrier. + def _release(self): + try: + if self._action: + self._action() + # enter draining state + self._state = 1 + self._cond.notify_all() + except: + #an exception during the _action handler. Break and reraise + self._break() + raise + + # Wait in the barrier until we are relased. Raise an exception + # if the barrier is reset or broken. + def _wait(self, timeout): + if not self._cond.wait_for(lambda : self._state != 0, timeout): + #timed out. Break the barrier + self._break() + raise BrokenBarrierError + if self._state < 0: + raise BrokenBarrierError + assert self._state == 1 + + # If we are the last thread to exit the barrier, signal any threads + # waiting for the barrier to drain. + def _exit(self): + if self._count == 0: + if self._state in (-1, 1): + #resetting or draining + self._state = 0 + self._cond.notify_all() + + def reset(self): + """ + Reset the barrier to the initial state. + Any threads currently waiting will get the BrokenBarrier exception + raised. + """ + with self._cond: + if self._count > 0: + if self._state == 0: + #reset the barrier, waking up threads + self._state = -1 + elif self._state == -2: + #was broken, set it to reset state + #which clears when the last thread exits + self._state = -1 + else: + self._state = 0 + self._cond.notify_all() + + def abort(self): + """ + Place the barrier into a 'broken' state. + Useful in case of error. Any currently waiting threads and + threads attempting to 'wait()' will have BrokenBarrierError + raised. + """ + with self._cond: + self._break() + + def _break(self): + # An internal error was detected. The barrier is set to + # a broken state all parties awakened. + self._state = -2 + self._cond.notify_all() + + @property + def parties(self): + """ + Return the number of threads required to trip the barrier. + """ + return self._parties + + @property + def n_waiting(self): + """ + Return the number of threads that are currently waiting at the barrier. + """ + # We don't need synchronization here since this is an ephemeral result + # anyway. It returns the correct value in the steady state. + if self._state == 0: + return self._count + return 0 + + @property + def broken(self): + """ + Return True if the barrier is in a broken state + """ + return self._state == -2 + +#exception raised by the Barrier class +class BrokenBarrierError(RuntimeError): pass + # Helper to generate new thread names _counter = 0 @@ -418,6 +607,8 @@ _active_limbo_lock = _allocate_lock() _active = {} # maps thread id to Thread object _limbo = {} +# For debug and leak testing +_dangling = WeakSet() # Main class for threads @@ -431,7 +622,7 @@ class Thread(_Verbose): __exc_info = _sys.exc_info # Keep sys.exc_clear too to clear the exception just before # allowing .join() to return. - __exc_clear = _sys.exc_clear + #XXX __exc_clear = _sys.exc_clear def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None): @@ -439,125 +630,122 @@ class Thread(_Verbose): _Verbose.__init__(self, verbose) if kwargs is None: kwargs = {} - self.__target = target - self.__name = str(name or _newname()) - self.__args = args - self.__kwargs = kwargs - self.__daemonic = self._set_daemon() - self.__ident = None - self.__started = Event() - self.__stopped = False - self.__block = Condition(Lock()) - self.__initialized = True + self._target = target + self._name = str(name or _newname()) + self._args = args + self._kwargs = kwargs + self._daemonic = self._set_daemon() + self._ident = None + self._started = Event() + self._stopped = False + self._block = Condition(Lock()) + self._initialized = True # sys.stderr is not stored in the class like # sys.exc_info since it can be changed between instances - self.__stderr = _sys.stderr + self._stderr = _sys.stderr + _dangling.add(self) def _reset_internal_locks(self): # private! Called by _after_fork() to reset our internal locks as # they may be in an invalid state leading to a deadlock or crash. - if hasattr(self, '_Thread__block'): # DummyThread deletes self.__block - self.__block.__init__() - self.__started._reset_internal_locks() - - @property - def _block(self): - # used by a unittest - return self.__block + if hasattr(self, '_block'): # DummyThread deletes _block + self._block.__init__() + self._started._reset_internal_locks() def _set_daemon(self): # Overridden in _MainThread and _DummyThread return current_thread().daemon def __repr__(self): - assert self.__initialized, "Thread.__init__() was not called" + assert self._initialized, "Thread.__init__() was not called" status = "initial" - if self.__started.is_set(): + if self._started.is_set(): status = "started" - if self.__stopped: + if self._stopped: status = "stopped" - if self.__daemonic: + if self._daemonic: status += " daemon" - if self.__ident is not None: - status += " %s" % self.__ident - return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status) + if self._ident is not None: + status += " %s" % self._ident + return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status) def start(self): - if not self.__initialized: + if not self._initialized: raise RuntimeError("thread.__init__() not called") - if self.__started.is_set(): + + if self._started.is_set(): raise RuntimeError("threads can only be started once") if __debug__: self._note("%s.start(): starting thread", self) with _active_limbo_lock: _limbo[self] = self try: - _start_new_thread(self.__bootstrap, ()) + _start_new_thread(self._bootstrap, ()) except Exception: with _active_limbo_lock: del _limbo[self] raise - self.__started.wait() + self._started.wait() def run(self): try: - if self.__target: - self.__target(*self.__args, **self.__kwargs) + if self._target: + self._target(*self._args, **self._kwargs) finally: # Avoid a refcycle if the thread is running a function with # an argument that has a member that points to the thread. - del self.__target, self.__args, self.__kwargs + del self._target, self._args, self._kwargs - def __bootstrap(self): + def _bootstrap(self): # Wrapper around the real bootstrap code that ignores # exceptions during interpreter cleanup. Those typically # happen when a daemon thread wakes up at an unfortunate # moment, finds the world around it destroyed, and raises some # random exception *** while trying to report the exception in - # __bootstrap_inner() below ***. Those random exceptions + # _bootstrap_inner() below ***. Those random exceptions # don't help anybody, and they confuse users, so we suppress # them. We suppress them only when it appears that the world # indeed has already been destroyed, so that exceptions in - # __bootstrap_inner() during normal business hours are properly + # _bootstrap_inner() during normal business hours are properly # reported. Also, we only suppress them for daemonic threads; # if a non-daemonic encounters this, something else is wrong. try: - self.__bootstrap_inner() + self._bootstrap_inner() except: - if self.__daemonic and _sys is None: + if self._daemonic and _sys is None: return raise def _set_ident(self): - self.__ident = _get_ident() + self._ident = _get_ident() - def __bootstrap_inner(self): + def _bootstrap_inner(self): try: self._set_ident() - self.__started.set() + self._started.set() with _active_limbo_lock: - _active[self.__ident] = self + _active[self._ident] = self del _limbo[self] if __debug__: - self._note("%s.__bootstrap(): thread started", self) + self._note("%s._bootstrap(): thread started", self) if _trace_hook: - self._note("%s.__bootstrap(): registering trace hook", self) + self._note("%s._bootstrap(): registering trace hook", self) _sys.settrace(_trace_hook) if _profile_hook: - self._note("%s.__bootstrap(): registering profile hook", self) + self._note("%s._bootstrap(): registering profile hook", self) _sys.setprofile(_profile_hook) try: self.run() except SystemExit: if __debug__: - self._note("%s.__bootstrap(): raised SystemExit", self) + self._note("%s._bootstrap(): raised SystemExit", self) except: if __debug__: - self._note("%s.__bootstrap(): unhandled exception", self) + self._note("%s._bootstrap(): unhandled exception", self) # If sys.stderr is no more (most likely from interpreter - # shutdown) use self.__stderr. Otherwise still use sys (as in + # shutdown) use self._stderr. Otherwise still use sys (as in # _sys) in case sys.stderr was redefined since the creation of # self. if _sys: @@ -567,69 +755,66 @@ class Thread(_Verbose): # Do the best job possible w/o a huge amt. of code to # approximate a traceback (code ideas from # Lib/traceback.py) - exc_type, exc_value, exc_tb = self.__exc_info() + exc_type, exc_value, exc_tb = self._exc_info() try: - print>>self.__stderr, ( + print(( "Exception in thread " + self.name + - " (most likely raised during interpreter shutdown):") - print>>self.__stderr, ( - "Traceback (most recent call last):") + " (most likely raised during interpreter shutdown):"), file=self._stderr) + print(( + "Traceback (most recent call last):"), file=self._stderr) while exc_tb: - print>>self.__stderr, ( + print(( ' File "%s", line %s, in %s' % (exc_tb.tb_frame.f_code.co_filename, exc_tb.tb_lineno, - exc_tb.tb_frame.f_code.co_name)) + exc_tb.tb_frame.f_code.co_name)), file=self._stderr) exc_tb = exc_tb.tb_next - print>>self.__stderr, ("%s: %s" % (exc_type, exc_value)) + print(("%s: %s" % (exc_type, exc_value)), file=self._stderr) # Make sure that exc_tb gets deleted since it is a memory # hog; deleting everything else is just for thoroughness finally: del exc_type, exc_value, exc_tb else: if __debug__: - self._note("%s.__bootstrap(): normal return", self) + self._note("%s._bootstrap(): normal return", self) finally: # Prevent a race in # test_threading.test_no_refcycle_through_target when # the exception keeps the target alive past when we # assert that it's dead. - self.__exc_clear() + #XXX self.__exc_clear() + pass finally: with _active_limbo_lock: - self.__stop() + self._stop() try: - # We don't call self.__delete() because it also + # We don't call self._delete() because it also # grabs _active_limbo_lock. del _active[_get_ident()] except: pass - def __stop(self): - # DummyThreads delete self.__block, but they have no waiters to - # notify anyway (join() is forbidden on them). - if not hasattr(self, '_Thread__block'): - return - self.__block.acquire() - self.__stopped = True - self.__block.notify_all() - self.__block.release() + def _stop(self): + self._block.acquire() + self._stopped = True + self._block.notify_all() + self._block.release() - def __delete(self): + def _delete(self): "Remove current thread from the dict of currently running threads." - # Notes about running with dummy_thread: + # Notes about running with _dummy_thread: # - # Must take care to not raise an exception if dummy_thread is being + # Must take care to not raise an exception if _dummy_thread is being # used (and thus this module is being used as an instance of - # dummy_threading). dummy_thread.get_ident() always returns -1 since - # there is only one thread if dummy_thread is being used. Thus + # dummy_threading). _dummy_thread.get_ident() always returns -1 since + # there is only one thread if _dummy_thread is being used. Thus # len(_active) is always <= 1 here, and any Thread instance created # overwrites the (if any) thread currently registered in _active. # # An instance of _MainThread is always created by 'threading'. This # gets overwritten the instant an instance of Thread is created; both - # threads return -1 from dummy_thread.get_ident() and thus have the + # threads return -1 from _dummy_thread.get_ident() and thus have the # same key in the dict. So when the _MainThread instance created by # 'threading' tries to clean itself up when atexit calls this method # it gets a KeyError if another Thread instance was created. @@ -651,71 +836,72 @@ class Thread(_Verbose): raise def join(self, timeout=None): - if not self.__initialized: + if not self._initialized: raise RuntimeError("Thread.__init__() not called") - if not self.__started.is_set(): + if not self._started.is_set(): raise RuntimeError("cannot join thread before it is started") if self is current_thread(): raise RuntimeError("cannot join current thread") if __debug__: - if not self.__stopped: + if not self._stopped: self._note("%s.join(): waiting until thread stops", self) - self.__block.acquire() + + self._block.acquire() try: if timeout is None: - while not self.__stopped: - self.__block.wait() + while not self._stopped: + self._block.wait() if __debug__: self._note("%s.join(): thread stopped", self) else: deadline = _time() + timeout - while not self.__stopped: + while not self._stopped: delay = deadline - _time() if delay <= 0: if __debug__: self._note("%s.join(): timed out", self) break - self.__block.wait(delay) + self._block.wait(delay) else: if __debug__: self._note("%s.join(): thread stopped", self) finally: - self.__block.release() + self._block.release() @property def name(self): - assert self.__initialized, "Thread.__init__() not called" - return self.__name + assert self._initialized, "Thread.__init__() not called" + return self._name @name.setter def name(self, name): - assert self.__initialized, "Thread.__init__() not called" - self.__name = str(name) + assert self._initialized, "Thread.__init__() not called" + self._name = str(name) @property def ident(self): - assert self.__initialized, "Thread.__init__() not called" - return self.__ident + assert self._initialized, "Thread.__init__() not called" + return self._ident - def isAlive(self): - assert self.__initialized, "Thread.__init__() not called" - return self.__started.is_set() and not self.__stopped + def is_alive(self): + assert self._initialized, "Thread.__init__() not called" + return self._started.is_set() and not self._stopped - is_alive = isAlive + isAlive = is_alive @property def daemon(self): - assert self.__initialized, "Thread.__init__() not called" - return self.__daemonic + assert self._initialized, "Thread.__init__() not called" + return self._daemonic @daemon.setter def daemon(self, daemonic): - if not self.__initialized: + if not self._initialized: raise RuntimeError("Thread.__init__() not called") - if self.__started.is_set(): + if self._started.is_set(): raise RuntimeError("cannot set daemon status of active thread"); - self.__daemonic = daemonic + self._daemonic = daemonic def isDaemon(self): return self.daemon @@ -767,16 +953,16 @@ class _MainThread(Thread): def __init__(self): Thread.__init__(self, name="MainThread") - self._Thread__started.set() + self._started.set() self._set_ident() with _active_limbo_lock: - _active[_get_ident()] = self + _active[self._ident] = self def _set_daemon(self): return False def _exitfunc(self): - self._Thread__stop() + self._stop() t = _pickSomeNonDaemonThread() if t: if __debug__: @@ -786,7 +972,7 @@ class _MainThread(Thread): t = _pickSomeNonDaemonThread() if __debug__: self._note("%s: exiting", self) - self._Thread__delete() + self._delete() def _pickSomeNonDaemonThread(): for t in enumerate(): @@ -808,49 +994,52 @@ class _DummyThread(Thread): def __init__(self): Thread.__init__(self, name=_newname("Dummy-%d")) - # Thread.__block consumes an OS-level locking primitive, which + # Thread._block consumes an OS-level locking primitive, which # can never be used by a _DummyThread. Since a _DummyThread # instance is immortal, that's bad, so release this resource. - del self._Thread__block + del self._block - self._Thread__started.set() + self._started.set() self._set_ident() with _active_limbo_lock: - _active[_get_ident()] = self + _active[self._ident] = self def _set_daemon(self): return True + def _stop(self): + pass + def join(self, timeout=None): assert False, "cannot join a dummy thread" # Global API functions -def currentThread(): +def current_thread(): try: return _active[_get_ident()] except KeyError: ##print "current_thread(): no current thread for", _get_ident() return _DummyThread() -current_thread = currentThread +currentThread = current_thread -def activeCount(): +def active_count(): with _active_limbo_lock: return len(_active) + len(_limbo) -active_count = activeCount +activeCount = active_count def _enumerate(): # Same as enumerate(), but without the lock. Internal use only. - return _active.values() + _limbo.values() + return list(_active.values()) + list(_limbo.values()) def enumerate(): with _active_limbo_lock: - return _active.values() + _limbo.values() + return list(_active.values()) + list(_limbo.values()) -from thread import stack_size +from _thread import stack_size # Create the main thread object, # and make it available for the interpreter @@ -862,7 +1051,7 @@ _shutdown = _MainThread()._exitfunc # module, or from the python fallback try: - from thread import _local as local + from _thread import _local as local except ImportError: from _threading_local import local @@ -881,110 +1070,21 @@ def _after_fork(): new_active = {} current = current_thread() with _active_limbo_lock: - for thread in _active.itervalues(): + for thread in _active.values(): # Any lock/condition variable may be currently locked or in an # invalid state, so we reinitialize them. - if hasattr(thread, '_reset_internal_locks'): - thread._reset_internal_locks() + thread._reset_internal_locks() if thread is current: # There is only one active thread. We reset the ident to # its new value since it can have changed. ident = _get_ident() - thread._Thread__ident = ident + thread._ident = ident new_active[ident] = thread else: # All the others are already stopped. - thread._Thread__stop() + thread._stop() _limbo.clear() _active.clear() _active.update(new_active) assert len(_active) == 1 - - -# Self-test code - -def _test(): - - class BoundedQueue(_Verbose): - - def __init__(self, limit): - _Verbose.__init__(self) - self.mon = RLock() - self.rc = Condition(self.mon) - self.wc = Condition(self.mon) - self.limit = limit - self.queue = _deque() - - def put(self, item): - self.mon.acquire() - while len(self.queue) >= self.limit: - self._note("put(%s): queue full", item) - self.wc.wait() - self.queue.append(item) - self._note("put(%s): appended, length now %d", - item, len(self.queue)) - self.rc.notify() - self.mon.release() - - def get(self): - self.mon.acquire() - while not self.queue: - self._note("get(): queue empty") - self.rc.wait() - item = self.queue.popleft() - self._note("get(): got %s, %d left", item, len(self.queue)) - self.wc.notify() - self.mon.release() - return item - - class ProducerThread(Thread): - - def __init__(self, queue, quota): - Thread.__init__(self, name="Producer") - self.queue = queue - self.quota = quota - - def run(self): - from random import random - counter = 0 - while counter < self.quota: - counter = counter + 1 - self.queue.put("%s.%d" % (self.name, counter)) - _sleep(random() * 0.00001) - - - class ConsumerThread(Thread): - - def __init__(self, queue, count): - Thread.__init__(self, name="Consumer") - self.queue = queue - self.count = count - - def run(self): - while self.count > 0: - item = self.queue.get() - print item - self.count = self.count - 1 - - NP = 3 - QL = 4 - NI = 5 - - Q = BoundedQueue(QL) - P = [] - for i in range(NP): - t = ProducerThread(Q, NI) - t.name = ("Producer-%d" % (i+1)) - P.append(t) - C = ConsumerThread(Q, NI*NP) - for t in P: - t.start() - _sleep(0.000001) - C.start() - for t in P: - t.join() - C.join() - -if __name__ == '__main__': - _test() |