diff options
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r-- | Lib/multiprocessing/__init__.py | 1 | ||||
-rw-r--r-- | Lib/multiprocessing/connection.py | 60 | ||||
-rw-r--r-- | Lib/multiprocessing/dummy/__init__.py | 10 | ||||
-rw-r--r-- | Lib/multiprocessing/dummy/connection.py | 2 | ||||
-rw-r--r-- | Lib/multiprocessing/forking.py | 51 | ||||
-rw-r--r-- | Lib/multiprocessing/heap.py | 6 | ||||
-rw-r--r-- | Lib/multiprocessing/managers.py | 53 | ||||
-rw-r--r-- | Lib/multiprocessing/pool.py | 63 | ||||
-rw-r--r-- | Lib/multiprocessing/process.py | 41 | ||||
-rw-r--r-- | Lib/multiprocessing/queues.py | 12 | ||||
-rw-r--r-- | Lib/multiprocessing/sharedctypes.py | 11 | ||||
-rw-r--r-- | Lib/multiprocessing/synchronize.py | 11 | ||||
-rw-r--r-- | Lib/multiprocessing/util.py | 64 |
13 files changed, 226 insertions, 159 deletions
diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py index 2e91e8eb6eb..e6e16c83224 100644 --- a/Lib/multiprocessing/__init__.py +++ b/Lib/multiprocessing/__init__.py @@ -80,7 +80,6 @@ class TimeoutError(ProcessError): class AuthenticationError(ProcessError): pass -# This is down here because _multiprocessing uses BufferTooShort import _multiprocessing # diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 4421ac5cfd7..4fa6f70bf3f 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -90,10 +90,21 @@ def arbitrary_address(family): return tempfile.mktemp(prefix='listener-', dir=get_temp_dir()) elif family == 'AF_PIPE': return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' % - (os.getpid(), _mmap_counter.next())) + (os.getpid(), next(_mmap_counter))) else: raise ValueError('unrecognized family') +def _validate_family(family): + ''' + Checks if the family is valid for the current environment. + ''' + if sys.platform != 'win32' and family == 'AF_PIPE': + raise ValueError('Family %s is not recognized.' % family) + + if sys.platform == 'win32' and family == 'AF_UNIX': + # double check + if not hasattr(socket, family): + raise ValueError('Family %s is not recognized.' % family) def address_type(address): ''' @@ -126,13 +137,14 @@ class Listener(object): or default_family address = address or arbitrary_address(family) + _validate_family(family) if family == 'AF_PIPE': self._listener = PipeListener(address, backlog) else: self._listener = SocketListener(address, family, backlog) if authkey is not None and not isinstance(authkey, bytes): - raise TypeError, 'authkey should be a byte string' + raise TypeError('authkey should be a byte string') self._authkey = authkey @@ -163,13 +175,14 @@ def Client(address, family=None, authkey=None): Returns a connection to the address of a `Listener` ''' family = family or address_type(address) + _validate_family(family) if family == 'AF_PIPE': c = PipeClient(address) else: c = SocketClient(address) if authkey is not None and not isinstance(authkey, bytes): - raise TypeError, 'authkey should be a byte string' + raise TypeError('authkey should be a byte string') if authkey is not None: answer_challenge(c, authkey) @@ -232,7 +245,7 @@ else: try: win32.ConnectNamedPipe(h1, win32.NULL) - except WindowsError, e: + except WindowsError as e: if e.args[0] != win32.ERROR_PIPE_CONNECTED: raise @@ -289,26 +302,25 @@ def SocketClient(address): Return a connection object connected to the socket given by `address` ''' family = address_type(address) - s = socket.socket( getattr(socket, family) ) - s.setblocking(True) - t = _init_timeout() + with socket.socket( getattr(socket, family) ) as s: + s.setblocking(True) + t = _init_timeout() - while 1: - try: - s.connect(address) - except socket.error, e: - if e.args[0] != errno.ECONNREFUSED or _check_timeout(t): - debug('failed to connect to address %s', address) - raise - time.sleep(0.01) + while 1: + try: + s.connect(address) + except socket.error as e: + if e.args[0] != errno.ECONNREFUSED or _check_timeout(t): + debug('failed to connect to address %s', address) + raise + time.sleep(0.01) + else: + break else: - break - else: - raise + raise - fd = duplicate(s.fileno()) + fd = duplicate(s.fileno()) conn = _multiprocessing.Connection(fd) - s.close() return conn # @@ -352,7 +364,7 @@ if sys.platform == 'win32': handle = self._handle_queue.pop(0) try: win32.ConnectNamedPipe(handle, win32.NULL) - except WindowsError, e: + except WindowsError as e: # ERROR_NO_DATA can occur if a client has already connected, # written data and then disconnected -- see Issue 14725. if e.args[0] not in (win32.ERROR_PIPE_CONNECTED, @@ -378,7 +390,7 @@ if sys.platform == 'win32': address, win32.GENERIC_READ | win32.GENERIC_WRITE, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL ) - except WindowsError, e: + except WindowsError as e: if e.args[0] not in (win32.ERROR_SEM_TIMEOUT, win32.ERROR_PIPE_BUSY) or _check_timeout(t): raise @@ -456,11 +468,11 @@ def _xml_loads(s): class XmlListener(Listener): def accept(self): global xmlrpclib - import xmlrpclib + import xmlrpc.client as xmlrpclib obj = Listener.accept(self) return ConnectionWrapper(obj, _xml_dumps, _xml_loads) def XmlClient(*args, **kwds): global xmlrpclib - import xmlrpclib + import xmlrpc.client as xmlrpclib return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads) diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py index e3b126e9796..101c3cba4d7 100644 --- a/Lib/multiprocessing/dummy/__init__.py +++ b/Lib/multiprocessing/dummy/__init__.py @@ -52,7 +52,7 @@ from multiprocessing import TimeoutError, cpu_count from multiprocessing.dummy.connection import Pipe from threading import Lock, RLock, Semaphore, BoundedSemaphore from threading import Event -from Queue import Queue +from queue import Queue # # @@ -86,7 +86,11 @@ class DummyProcess(threading.Thread): # class Condition(threading._Condition): - notify_all = threading._Condition.notify_all.im_func + # XXX + if sys.version_info < (3, 0): + notify_all = threading._Condition.notify_all.__func__ + else: + notify_all = threading._Condition.notify_all # # @@ -114,7 +118,7 @@ class Namespace(object): def __init__(self, **kwds): self.__dict__.update(kwds) def __repr__(self): - items = self.__dict__.items() + items = list(self.__dict__.items()) temp = [] for name, value in items: if not name.startswith('_'): diff --git a/Lib/multiprocessing/dummy/connection.py b/Lib/multiprocessing/dummy/connection.py index 50dc9ffe736..af105794f15 100644 --- a/Lib/multiprocessing/dummy/connection.py +++ b/Lib/multiprocessing/dummy/connection.py @@ -34,7 +34,7 @@ __all__ = [ 'Client', 'Listener', 'Pipe' ] -from Queue import Queue +from queue import Queue families = [None] diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index 55980ffdcad..bc8ac44c22e 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -55,34 +55,35 @@ def assert_spawning(self): # Try making some callable types picklable # -from pickle import Pickler +from pickle import _Pickler as Pickler class ForkingPickler(Pickler): dispatch = Pickler.dispatch.copy() - @classmethod def register(cls, type, reduce): def dispatcher(self, obj): rv = reduce(obj) - self.save_reduce(obj=obj, *rv) + if isinstance(rv, str): + self.save_global(obj, rv) + else: + self.save_reduce(obj=obj, *rv) cls.dispatch[type] = dispatcher def _reduce_method(m): - if m.im_self is None: - return getattr, (m.im_class, m.im_func.func_name) + if m.__self__ is None: + return getattr, (m.__class__, m.__func__.__name__) else: - return getattr, (m.im_self, m.im_func.func_name) -ForkingPickler.register(type(ForkingPickler.save), _reduce_method) + return getattr, (m.__self__, m.__func__.__name__) +class _C: + def f(self): + pass +ForkingPickler.register(type(_C().f), _reduce_method) + def _reduce_method_descriptor(m): return getattr, (m.__objclass__, m.__name__) ForkingPickler.register(type(list.append), _reduce_method_descriptor) ForkingPickler.register(type(int.__add__), _reduce_method_descriptor) -#def _reduce_builtin_function_or_method(m): -# return getattr, (m.__self__, m.__name__) -#ForkingPickler.register(type(list().append), _reduce_builtin_function_or_method) -#ForkingPickler.register(type(int().__add__), _reduce_builtin_function_or_method) - try: from functools import partial except ImportError: @@ -123,8 +124,6 @@ if sys.platform != 'win32': import random random.seed() code = process_obj._bootstrap() - sys.stdout.flush() - sys.stderr.flush() os._exit(code) def poll(self, flag=os.WNOHANG): @@ -163,7 +162,7 @@ if sys.platform != 'win32': if self.returncode is None: try: os.kill(self.pid, signal.SIGTERM) - except OSError, e: + except OSError as e: if self.wait(timeout=0.1) is None: raise @@ -176,19 +175,15 @@ if sys.platform != 'win32': # else: - import thread + import _thread import msvcrt import _subprocess import time + from pickle import dump, load, HIGHEST_PROTOCOL from _multiprocessing import win32, Connection, PipeConnection from .util import Finalize - #try: - # from cPickle import dump, load, HIGHEST_PROTOCOL - #except ImportError: - from pickle import load, HIGHEST_PROTOCOL - def dump(obj, file, protocol=None): ForkingPickler(file, protocol).dump(obj) @@ -238,7 +233,7 @@ else: ''' Start a subprocess to run the code of a process object ''' - _tls = thread._local() + _tls = _thread._local() def __init__(self, process_obj): # create pipe for communication with child @@ -463,12 +458,20 @@ def prepare(data): process.ORIGINAL_DIR = data['orig_dir'] if 'main_path' in data: + # XXX (ncoghlan): The following code makes several bogus + # assumptions regarding the relationship between __file__ + # and a module's real name. See PEP 302 and issue #10845 main_path = data['main_path'] main_name = os.path.splitext(os.path.basename(main_path))[0] if main_name == '__init__': main_name = os.path.basename(os.path.dirname(main_path)) - if main_name != 'ipython': + if main_name == '__main__': + main_module = sys.modules['__main__'] + main_module.__file__ = main_path + elif main_name != 'ipython': + # Main modules not actually called __main__.py may + # contain additional code that should still be executed import imp if main_path is None: @@ -497,7 +500,7 @@ def prepare(data): # Try to make the potentially picklable objects in # sys.modules['__main__'] realize they are in the main # module -- somewhat ugly. - for obj in main_module.__dict__.values(): + for obj in list(main_module.__dict__.values()): try: if obj.__module__ == '__parents_main__': obj.__module__ = '__main__' diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py index a1f37118d5a..0a25ef05c7f 100644 --- a/Lib/multiprocessing/heap.py +++ b/Lib/multiprocessing/heap.py @@ -60,7 +60,7 @@ if sys.platform == 'win32': def __init__(self, size): self.size = size - self.name = 'pym-%d-%d' % (os.getpid(), Arena._counter.next()) + self.name = 'pym-%d-%d' % (os.getpid(), next(Arena._counter)) self.buffer = mmap.mmap(-1, self.size, tagname=self.name) assert win32.GetLastError() == 0, 'tagname already in use' self._state = (self.size, self.name) @@ -213,7 +213,7 @@ class Heap(object): def malloc(self, size): # return a block of right size (possibly rounded up) - assert 0 <= size < sys.maxint + assert 0 <= size < sys.maxsize if os.getpid() != self._lastpid: self.__init__() # reinitialize after fork self._lock.acquire() @@ -239,7 +239,7 @@ class BufferWrapper(object): _heap = Heap() def __init__(self, size): - assert 0 <= size < sys.maxint + assert 0 <= size < sys.maxsize block = BufferWrapper._heap.malloc(size) self._state = (block, size) Finalize(self, BufferWrapper._heap.free, args=(block,)) diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index ffe5812b89d..5588ead1169 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -44,28 +44,31 @@ import sys import weakref import threading import array -import Queue +import queue from traceback import format_exc +from pickle import PicklingError from multiprocessing import Process, current_process, active_children, Pool, util, connection from multiprocessing.process import AuthenticationString from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler from multiprocessing.util import Finalize, info -try: - from cPickle import PicklingError -except ImportError: - from pickle import PicklingError - # # Register some things for pickling # def reduce_array(a): - return array.array, (a.typecode, a.tostring()) + return array.array, (a.typecode, a.tobytes()) ForkingPickler.register(array.array, reduce_array) view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] +if view_types[0] is not list: # only needed in Py3.0 + def rebuild_as_list(obj): + return list, (list(obj),) + for view_type in view_types: + ForkingPickler.register(view_type, rebuild_as_list) + import copyreg + copyreg.pickle(view_type, rebuild_as_list) # # Type for identifying shared objects @@ -131,7 +134,7 @@ def all_methods(obj): temp = [] for name in dir(obj): func = getattr(obj, name) - if hasattr(func, '__call__'): + if callable(func): temp.append(name) return temp @@ -211,7 +214,7 @@ class Server(object): msg = ('#RETURN', result) try: c.send(msg) - except Exception, e: + except Exception as e: try: c.send(('#TRACEBACK', format_exc())) except Exception: @@ -251,7 +254,7 @@ class Server(object): try: res = function(*args, **kwds) - except Exception, e: + except Exception as e: msg = ('#ERROR', e) else: typeid = gettypeid and gettypeid.get(methodname, None) @@ -286,9 +289,9 @@ class Server(object): try: try: send(msg) - except Exception, e: + except Exception as e: send(('#UNSERIALIZABLE', repr(msg))) - except Exception, e: + except Exception as e: util.info('exception in thread serving %r', threading.current_thread().name) util.info(' ... message was %r', msg) @@ -321,7 +324,7 @@ class Server(object): self.mutex.acquire() try: result = [] - keys = self.id_to_obj.keys() + keys = list(self.id_to_obj.keys()) keys.sort() for ident in keys: if ident != '0': @@ -507,7 +510,7 @@ class BaseManager(object): ''' assert self._state.value == State.INITIAL - if initializer is not None and not hasattr(initializer, '__call__'): + if initializer is not None and not callable(initializer): raise TypeError('initializer must be a callable') # pipe over which we will retrieve address of server @@ -653,7 +656,7 @@ class BaseManager(object): getattr(proxytype, '_method_to_typeid_', None) if method_to_typeid: - for key, value in method_to_typeid.items(): + for key, value in list(method_to_typeid.items()): assert type(key) is str, '%r is not a string' % key assert type(value) is str, '%r is not a string' % value @@ -805,7 +808,7 @@ class BaseProxy(object): util.debug('DECREF %r', token.id) conn = _Client(token.address, authkey=authkey) dispatch(conn, None, 'decref', (token.id,)) - except Exception, e: + except Exception as e: util.debug('... decref failed %s', e) else: @@ -823,7 +826,7 @@ class BaseProxy(object): self._manager = None try: self._incref() - except Exception, e: + except Exception as e: # the proxy may just be for a manager which has shutdown util.info('incref failed: %s' % e) @@ -894,8 +897,8 @@ def MakeProxyType(name, exposed, _cache={}): dic = {} for meth in exposed: - exec '''def %s(self, *args, **kwds): - return self._callmethod(%r, args, kwds)''' % (meth, meth) in dic + exec('''def %s(self, *args, **kwds): + return self._callmethod(%r, args, kwds)''' % (meth, meth), dic) ProxyType = type(name, (BaseProxy,), dic) ProxyType._exposed_ = exposed @@ -936,7 +939,7 @@ class Namespace(object): def __init__(self, **kwds): self.__dict__.update(kwds) def __repr__(self): - items = self.__dict__.items() + items = list(self.__dict__.items()) temp = [] for name, value in items: if not name.startswith('_'): @@ -964,14 +967,11 @@ def Array(typecode, sequence, lock=True): # class IteratorProxy(BaseProxy): - # XXX remove methods for Py3.0 and Py2.6 - _exposed_ = ('__next__', 'next', 'send', 'throw', 'close') + _exposed_ = ('__next__', 'send', 'throw', 'close') def __iter__(self): return self def __next__(self, *args): return self._callmethod('__next__', args) - def next(self, *args): - return self._callmethod('next', args) def send(self, *args): return self._callmethod('send', args) def throw(self, *args): @@ -993,7 +993,6 @@ class AcquirerProxy(BaseProxy): class ConditionProxy(AcquirerProxy): - # XXX will Condition.notfyAll() name be available in Py3.0? _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all') def wait(self, timeout=None): return self._callmethod('wait', (timeout,)) @@ -1095,8 +1094,8 @@ class SyncManager(BaseManager): this class. ''' -SyncManager.register('Queue', Queue.Queue) -SyncManager.register('JoinableQueue', Queue.Queue) +SyncManager.register('Queue', queue.Queue) +SyncManager.register('JoinableQueue', queue.Queue) SyncManager.register('Event', threading.Event, EventProxy) SyncManager.register('Lock', threading.Lock, AcquirerProxy) SyncManager.register('RLock', threading.RLock, AcquirerProxy) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 00c904a8a73..7502ff89f43 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -39,7 +39,7 @@ __all__ = ['Pool'] # import threading -import Queue +import queue import itertools import collections import time @@ -62,7 +62,7 @@ TERMINATE = 2 job_counter = itertools.count() def mapstar(args): - return map(*args) + return list(map(*args)) # # Code run by worker processes @@ -111,7 +111,7 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): job, i, func, args, kwds = task try: result = (True, func(*args, **kwds)) - except Exception, e: + except Exception as e: result = (False, e) try: put((job, i, result)) @@ -129,14 +129,14 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): class Pool(object): ''' - Class which supports an async version of the `apply()` builtin + Class which supports an async version of applying functions to arguments. ''' Process = Process def __init__(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None): self._setup_queues() - self._taskqueue = Queue.Queue() + self._taskqueue = queue.Queue() self._cache = {} self._state = RUN self._maxtasksperchild = maxtasksperchild @@ -151,7 +151,7 @@ class Pool(object): if processes < 1: raise ValueError("Number of processes must be at least 1") - if initializer is not None and not hasattr(initializer, '__call__'): + if initializer is not None and not callable(initializer): raise TypeError('initializer must be a callable') self._processes = processes @@ -237,21 +237,22 @@ class Pool(object): def apply(self, func, args=(), kwds={}): ''' - Equivalent of `apply()` builtin + Equivalent of `func(*args, **kwds)`. ''' assert self._state == RUN return self.apply_async(func, args, kwds).get() def map(self, func, iterable, chunksize=None): ''' - Equivalent of `map()` builtin + Apply `func` to each element in `iterable`, collecting the results + in a list that is returned. ''' assert self._state == RUN return self.map_async(func, iterable, chunksize).get() def imap(self, func, iterable, chunksize=1): ''' - Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()` + Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. ''' assert self._state == RUN if chunksize == 1: @@ -269,7 +270,7 @@ class Pool(object): def imap_unordered(self, func, iterable, chunksize=1): ''' - Like `imap()` method but ordering of results is arbitrary + Like `imap()` method but ordering of results is arbitrary. ''' assert self._state == RUN if chunksize == 1: @@ -285,18 +286,20 @@ class Pool(object): for i, x in enumerate(task_batches)), result._set_length)) return (item for chunk in result for item in chunk) - def apply_async(self, func, args=(), kwds={}, callback=None): + def apply_async(self, func, args=(), kwds={}, callback=None, + error_callback=None): ''' - Asynchronous equivalent of `apply()` builtin + Asynchronous version of `apply()` method. ''' assert self._state == RUN - result = ApplyResult(self._cache, callback) + result = ApplyResult(self._cache, callback, error_callback) self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) return result - def map_async(self, func, iterable, chunksize=None, callback=None): + def map_async(self, func, iterable, chunksize=None, callback=None, + error_callback=None): ''' - Asynchronous equivalent of `map()` builtin + Asynchronous version of `map()` method. ''' assert self._state == RUN if not hasattr(iterable, '__len__'): @@ -310,7 +313,8 @@ class Pool(object): chunksize = 0 task_batches = Pool._get_tasks(func, iterable, chunksize) - result = MapResult(self._cache, chunksize, len(iterable), callback) + result = MapResult(self._cache, chunksize, len(iterable), callback, + error_callback=error_callback) self._taskqueue.put((((result._job, i, mapstar, (x,), {}) for i, x in enumerate(task_batches)), None)) return result @@ -490,7 +494,7 @@ class Pool(object): # workers because we don't want workers to be restarted behind our back. debug('joining worker handler') if threading.current_thread() is not worker_handler: - worker_handler.join(1e100) + worker_handler.join() # Terminate workers which haven't already finished. if pool and hasattr(pool[0], 'terminate'): @@ -501,11 +505,11 @@ class Pool(object): debug('joining task handler') if threading.current_thread() is not task_handler: - task_handler.join(1e100) + task_handler.join() debug('joining result handler') if threading.current_thread() is not result_handler: - result_handler.join(1e100) + result_handler.join() if pool and hasattr(pool[0], 'terminate'): debug('joining pool workers') @@ -521,12 +525,13 @@ class Pool(object): class ApplyResult(object): - def __init__(self, cache, callback): + def __init__(self, cache, callback, error_callback): self._cond = threading.Condition(threading.Lock()) - self._job = job_counter.next() + self._job = next(job_counter) self._cache = cache self._ready = False self._callback = callback + self._error_callback = error_callback cache[self._job] = self def ready(self): @@ -557,6 +562,8 @@ class ApplyResult(object): self._success, self._value = obj if self._callback and self._success: self._callback(self._value) + if self._error_callback and not self._success: + self._error_callback(self._value) self._cond.acquire() try: self._ready = True @@ -571,8 +578,9 @@ class ApplyResult(object): class MapResult(ApplyResult): - def __init__(self, cache, chunksize, length, callback): - ApplyResult.__init__(self, cache, callback) + def __init__(self, cache, chunksize, length, callback, error_callback): + ApplyResult.__init__(self, cache, callback, + error_callback=error_callback) self._success = True self._value = [None] * length self._chunksize = chunksize @@ -598,10 +606,11 @@ class MapResult(ApplyResult): self._cond.notify() finally: self._cond.release() - else: self._success = False self._value = result + if self._error_callback: + self._error_callback(self._value) del self._cache[self._job] self._cond.acquire() try: @@ -618,7 +627,7 @@ class IMapIterator(object): def __init__(self, cache): self._cond = threading.Condition(threading.Lock()) - self._job = job_counter.next() + self._job = next(job_counter) self._cache = cache self._items = collections.deque() self._index = 0 @@ -712,8 +721,8 @@ class ThreadPool(Pool): Pool.__init__(self, processes, initializer, initargs) def _setup_queues(self): - self._inqueue = Queue.Queue() - self._outqueue = Queue.Queue() + self._inqueue = queue.Queue() + self._outqueue = queue.Queue() self._quick_put = self._inqueue.put self._quick_get = self._outqueue.get diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py index 482ea0a37a8..3262b50f50b 100644 --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -42,6 +42,7 @@ import os import sys import signal import itertools +from _weakrefset import WeakSet # # @@ -93,7 +94,7 @@ class Process(object): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): assert group is None, 'group argument must be None for now' - count = _current_process._counter.next() + count = next(_current_process._counter) self._identity = _current_process._identity + (count,) self._authkey = _current_process._authkey self._daemonic = _current_process._daemonic @@ -105,6 +106,7 @@ class Process(object): self._kwargs = dict(kwargs) self._name = name or type(self).__name__ + '-' + \ ':'.join(str(i) for i in self._identity) + _dangling.add(self) def run(self): ''' @@ -164,7 +166,7 @@ class Process(object): @name.setter def name(self, name): - assert isinstance(name, basestring), 'name must be a string' + assert isinstance(name, str), 'name must be a string' self._name = name @property @@ -245,37 +247,45 @@ class Process(object): try: self._children = set() self._counter = itertools.count(1) - try: - sys.stdin.close() - sys.stdin = open(os.devnull) - except (OSError, ValueError): - pass + if sys.stdin is not None: + try: + sys.stdin.close() + sys.stdin = open(os.devnull) + except (OSError, ValueError): + pass + old_process = _current_process _current_process = self - util._finalizer_registry.clear() - util._run_after_forkers() + try: + util._finalizer_registry.clear() + util._run_after_forkers() + finally: + # delay finalization of the old process object until after + # _run_after_forkers() is executed + del old_process util.info('child process calling self.run()') try: self.run() exitcode = 0 finally: util._exit_function() - except SystemExit, e: + except SystemExit as e: if not e.args: exitcode = 1 elif isinstance(e.args[0], int): exitcode = e.args[0] else: sys.stderr.write(str(e.args[0]) + '\n') - sys.stderr.flush() exitcode = 0 if isinstance(e.args[0], str) else 1 except: exitcode = 1 import traceback sys.stderr.write('Process %s:\n' % self.name) - sys.stderr.flush() traceback.print_exc() + finally: + util.info('process exiting with exitcode %d' % exitcode) + sys.stdout.flush() + sys.stderr.flush() - util.info('process exiting with exitcode %d' % exitcode) return exitcode # @@ -318,6 +328,9 @@ del _MainProcess _exitcode_to_name = {} -for name, signum in signal.__dict__.items(): +for name, signum in list(signal.__dict__.items()): if name[:3]=='SIG' and '_' not in name: _exitcode_to_name[-signum] = name + +# For debug and leak testing +_dangling = WeakSet() diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 433c7e29dee..51d991245c1 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -42,7 +42,7 @@ import time import atexit import weakref -from Queue import Empty, Full +from queue import Empty, Full import _multiprocessing from multiprocessing import Pipe from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition @@ -192,7 +192,13 @@ class Queue(object): debug('... done self._thread.start()') # On process exit we will wait for data to be flushed to pipe. - if not self._joincancelled: + # + # However, if this process created the queue then all + # processes which use the queue will be descendants of this + # process. Therefore waiting for the queue to be flushed + # is pointless once all the child processes have been joined. + created_by_this_process = (self._opid == os.getpid()) + if not self._joincancelled and not created_by_this_process: self._jointhread = Finalize( self._thread, Queue._finalize_join, [weakref.ref(self._thread)], @@ -268,7 +274,7 @@ class Queue(object): wrelease() except IndexError: pass - except Exception, e: + except Exception as e: # Since this runs in a daemon thread the resources it uses # may be become unusable while the process is cleaning up. # We ignore errors which happen after the process has diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py index 1eb044dd54f..1e694da49d9 100644 --- a/Lib/multiprocessing/sharedctypes.py +++ b/Lib/multiprocessing/sharedctypes.py @@ -78,7 +78,7 @@ def RawArray(typecode_or_type, size_or_initializer): Returns a ctypes array allocated from shared memory ''' type_ = typecode_to_type.get(typecode_or_type, typecode_or_type) - if isinstance(size_or_initializer, (int, long)): + if isinstance(size_or_initializer, int): type_ = type_ * size_or_initializer obj = _new_value(type_) ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj)) @@ -89,13 +89,10 @@ def RawArray(typecode_or_type, size_or_initializer): result.__init__(*size_or_initializer) return result -def Value(typecode_or_type, *args, **kwds): +def Value(typecode_or_type, *args, lock=None): ''' Return a synchronization wrapper for a Value ''' - lock = kwds.pop('lock', None) - if kwds: - raise ValueError('unrecognized keyword argument(s): %s' % kwds.keys()) obj = RawValue(typecode_or_type, *args) if lock is False: return obj @@ -111,7 +108,7 @@ def Array(typecode_or_type, size_or_initializer, **kwds): ''' lock = kwds.pop('lock', None) if kwds: - raise ValueError('unrecognized keyword argument(s): %s' % kwds.keys()) + raise ValueError('unrecognized keyword argument(s): %s' % list(kwds.keys())) obj = RawArray(typecode_or_type, size_or_initializer) if lock is False: return obj @@ -174,7 +171,7 @@ def make_property(name): return prop_cache[name] except KeyError: d = {} - exec template % ((name,)*7) in d + exec(template % ((name,)*7), d) prop_cache[name] = d[name] return d[name] diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 4b077e53aed..70ae82569cc 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -62,7 +62,7 @@ except (ImportError): # Constants # -RECURSIVE_MUTEX, SEMAPHORE = range(2) +RECURSIVE_MUTEX, SEMAPHORE = list(range(2)) SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX # @@ -238,19 +238,20 @@ class Condition(object): # release lock count = self._lock._semlock._count() - for i in xrange(count): + for i in range(count): self._lock.release() try: # wait for notification or timeout - self._wait_semaphore.acquire(True, timeout) + ret = self._wait_semaphore.acquire(True, timeout) finally: # indicate that this thread has woken self._woken_count.release() # reacquire lock - for i in xrange(count): + for i in range(count): self._lock.acquire() + return ret def notify(self): assert self._lock._semlock._is_mine(), 'lock is not owned' @@ -285,7 +286,7 @@ class Condition(object): sleepers += 1 if sleepers: - for i in xrange(sleepers): + for i in range(sleepers): self._woken_count.acquire() # wait for a sleeper to wake # rezero wait_semaphore in case some timeouts just happened diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index c65dd9904ed..61b05335ac6 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -153,11 +153,11 @@ def _run_after_forkers(): for (index, ident, func), obj in items: try: func(obj) - except Exception, e: + except Exception as e: info('after forker raised exception %s', e) def register_after_fork(obj, func): - _afterfork_registry[(_afterfork_counter.next(), id(obj), func)] = obj + _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj # # Finalization using weakrefs @@ -182,7 +182,7 @@ class Finalize(object): self._callback = callback self._args = args self._kwargs = kwargs or {} - self._key = (exitpriority, _finalizer_counter.next()) + self._key = (exitpriority, next(_finalizer_counter)) _finalizer_registry[self._key] = self @@ -247,12 +247,18 @@ def _run_finalizers(minpriority=None): Finalizers with highest priority are called first; finalizers with the same priority will be called in reverse order of creation. ''' + if _finalizer_registry is None: + # This function may be called after this module's globals are + # destroyed. See the _exit_function function in this module for more + # notes. + return + if minpriority is None: f = lambda p : p[0][0] is not None else: f = lambda p : p[0][0] is not None and p[0][0] >= minpriority - items = [x for x in _finalizer_registry.items() if f(x)] + items = [x for x in list(_finalizer_registry.items()) if f(x)] items.sort(reverse=True) for key, finalizer in items: @@ -278,24 +284,42 @@ def is_exiting(): _exiting = False -def _exit_function(): - global _exiting - - info('process shutting down') - debug('running all "atexit" finalizers with priority >= 0') - _run_finalizers(0) +def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers, + active_children=active_children, + current_process=current_process): + # We hold on to references to functions in the arglist due to the + # situation described below, where this function is called after this + # module's globals are destroyed. - for p in active_children(): - if p._daemonic: - info('calling terminate() for daemon %s', p.name) - p._popen.terminate() - - for p in active_children(): - info('calling join() for process %s', p.name) - p.join() + global _exiting - debug('running the remaining "atexit" finalizers') - _run_finalizers() + if not _exiting: + _exiting = True + + info('process shutting down') + debug('running all "atexit" finalizers with priority >= 0') + _run_finalizers(0) + if current_process() is not None: + # We check if the current process is None here because if + # it's None, any call to ``active_children()`` will raise an + # AttributeError (active_children winds up trying to get + # attributes from util._current_process). This happens in a + # variety of shutdown circumstances that are not well-understood + # because module-scope variables are not apparently supposed to + # be destroyed until after this function is called. However, + # they are indeed destroyed before this function is called. See + # issues #9775 and #15881. Also related: #4106, #9205, and #9207. + for p in active_children(): + if p._daemonic: + info('calling terminate() for daemon %s', p.name) + p._popen.terminate() + + for p in active_children(): + info('calling join() for process %s', p.name) + p.join() + + debug('running the remaining "atexit" finalizers') + _run_finalizers() atexit.register(_exit_function) |