Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--  Lib/multiprocessing/__init__.py            1
-rw-r--r--  Lib/multiprocessing/connection.py         60
-rw-r--r--  Lib/multiprocessing/dummy/__init__.py     10
-rw-r--r--  Lib/multiprocessing/dummy/connection.py    2
-rw-r--r--  Lib/multiprocessing/forking.py            51
-rw-r--r--  Lib/multiprocessing/heap.py                6
-rw-r--r--  Lib/multiprocessing/managers.py           53
-rw-r--r--  Lib/multiprocessing/pool.py               63
-rw-r--r--  Lib/multiprocessing/process.py            41
-rw-r--r--  Lib/multiprocessing/queues.py             12
-rw-r--r--  Lib/multiprocessing/sharedctypes.py       11
-rw-r--r--  Lib/multiprocessing/synchronize.py        11
-rw-r--r--  Lib/multiprocessing/util.py               64
13 files changed, 226 insertions, 159 deletions
diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py
index 2e91e8eb6eb..e6e16c83224 100644
--- a/Lib/multiprocessing/__init__.py
+++ b/Lib/multiprocessing/__init__.py
@@ -80,7 +80,6 @@ class TimeoutError(ProcessError):
class AuthenticationError(ProcessError):
pass
-# This is down here because _multiprocessing uses BufferTooShort
import _multiprocessing
#
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index 4421ac5cfd7..4fa6f70bf3f 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -90,10 +90,21 @@ def arbitrary_address(family):
return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
elif family == 'AF_PIPE':
return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
- (os.getpid(), _mmap_counter.next()))
+ (os.getpid(), next(_mmap_counter)))
else:
raise ValueError('unrecognized family')
+def _validate_family(family):
+ '''
+ Checks if the family is valid for the current environment.
+ '''
+ if sys.platform != 'win32' and family == 'AF_PIPE':
+ raise ValueError('Family %s is not recognized.' % family)
+
+ if sys.platform == 'win32' and family == 'AF_UNIX':
+ # double check
+ if not hasattr(socket, family):
+ raise ValueError('Family %s is not recognized.' % family)
def address_type(address):
'''
@@ -126,13 +137,14 @@ class Listener(object):
or default_family
address = address or arbitrary_address(family)
+ _validate_family(family)
if family == 'AF_PIPE':
self._listener = PipeListener(address, backlog)
else:
self._listener = SocketListener(address, family, backlog)
if authkey is not None and not isinstance(authkey, bytes):
- raise TypeError, 'authkey should be a byte string'
+ raise TypeError('authkey should be a byte string')
self._authkey = authkey
@@ -163,13 +175,14 @@ def Client(address, family=None, authkey=None):
Returns a connection to the address of a `Listener`
'''
family = family or address_type(address)
+ _validate_family(family)
if family == 'AF_PIPE':
c = PipeClient(address)
else:
c = SocketClient(address)
if authkey is not None and not isinstance(authkey, bytes):
- raise TypeError, 'authkey should be a byte string'
+ raise TypeError('authkey should be a byte string')
if authkey is not None:
answer_challenge(c, authkey)
@@ -232,7 +245,7 @@ else:
try:
win32.ConnectNamedPipe(h1, win32.NULL)
- except WindowsError, e:
+ except WindowsError as e:
if e.args[0] != win32.ERROR_PIPE_CONNECTED:
raise
@@ -289,26 +302,25 @@ def SocketClient(address):
Return a connection object connected to the socket given by `address`
'''
family = address_type(address)
- s = socket.socket( getattr(socket, family) )
- s.setblocking(True)
- t = _init_timeout()
+ with socket.socket( getattr(socket, family) ) as s:
+ s.setblocking(True)
+ t = _init_timeout()
- while 1:
- try:
- s.connect(address)
- except socket.error, e:
- if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
- debug('failed to connect to address %s', address)
- raise
- time.sleep(0.01)
+ while 1:
+ try:
+ s.connect(address)
+ except socket.error as e:
+ if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
+ debug('failed to connect to address %s', address)
+ raise
+ time.sleep(0.01)
+ else:
+ break
else:
- break
- else:
- raise
+ raise
- fd = duplicate(s.fileno())
+ fd = duplicate(s.fileno())
conn = _multiprocessing.Connection(fd)
- s.close()
return conn
#
@@ -352,7 +364,7 @@ if sys.platform == 'win32':
handle = self._handle_queue.pop(0)
try:
win32.ConnectNamedPipe(handle, win32.NULL)
- except WindowsError, e:
+ except WindowsError as e:
# ERROR_NO_DATA can occur if a client has already connected,
# written data and then disconnected -- see Issue 14725.
if e.args[0] not in (win32.ERROR_PIPE_CONNECTED,
@@ -378,7 +390,7 @@ if sys.platform == 'win32':
address, win32.GENERIC_READ | win32.GENERIC_WRITE,
0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
)
- except WindowsError, e:
+ except WindowsError as e:
if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
win32.ERROR_PIPE_BUSY) or _check_timeout(t):
raise
@@ -456,11 +468,11 @@ def _xml_loads(s):
class XmlListener(Listener):
def accept(self):
global xmlrpclib
- import xmlrpclib
+ import xmlrpc.client as xmlrpclib
obj = Listener.accept(self)
return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
def XmlClient(*args, **kwds):
global xmlrpclib
- import xmlrpclib
+ import xmlrpc.client as xmlrpclib
return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
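Editor's note: the connection.py hunks above are mostly mechanical Python 3
ports: iterator.next() becomes next(iterator), "raise E, msg" becomes
raise E(msg), "except E, e" becomes "except E as e", and xmlrpclib moves to
xmlrpc.client. A minimal standalone sketch of these idioms (illustrative,
not code from the patch itself):

    import itertools

    counter = itertools.count()
    n = next(counter)                    # Py2 spelling: counter.next()

    try:
        raise TypeError('authkey should be a byte string')  # Py2: raise TypeError, '...'
    except TypeError as e:               # Py2: except TypeError, e:
        assert e.args[0].startswith('authkey')

    import xmlrpc.client as xmlrpclib    # Py2 module name: xmlrpclib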
diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py
index e3b126e9796..101c3cba4d7 100644
--- a/Lib/multiprocessing/dummy/__init__.py
+++ b/Lib/multiprocessing/dummy/__init__.py
@@ -52,7 +52,7 @@ from multiprocessing import TimeoutError, cpu_count
from multiprocessing.dummy.connection import Pipe
from threading import Lock, RLock, Semaphore, BoundedSemaphore
from threading import Event
-from Queue import Queue
+from queue import Queue
#
#
@@ -86,7 +86,11 @@ class DummyProcess(threading.Thread):
#
class Condition(threading._Condition):
- notify_all = threading._Condition.notify_all.im_func
+ # XXX
+ if sys.version_info < (3, 0):
+ notify_all = threading._Condition.notify_all.__func__
+ else:
+ notify_all = threading._Condition.notify_all
#
#
@@ -114,7 +118,7 @@ class Namespace(object):
def __init__(self, **kwds):
self.__dict__.update(kwds)
def __repr__(self):
- items = self.__dict__.items()
+ items = list(self.__dict__.items())
temp = []
for name, value in items:
if not name.startswith('_'):
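Editor's note: the list(self.__dict__.items()) change reflects that
dict.items() returns a live view in Python 3 rather than a list. A small
illustration (assumes CPython 3.x):

    d = {'a': 1}
    items = d.items()            # a view: it tracks later mutations
    d['b'] = 2
    assert ('b', 2) in items     # the view sees the new entry
    snapshot = list(d.items())   # a stable list, as items() was in Python 2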
diff --git a/Lib/multiprocessing/dummy/connection.py b/Lib/multiprocessing/dummy/connection.py
index 50dc9ffe736..af105794f15 100644
--- a/Lib/multiprocessing/dummy/connection.py
+++ b/Lib/multiprocessing/dummy/connection.py
@@ -34,7 +34,7 @@
__all__ = [ 'Client', 'Listener', 'Pipe' ]
-from Queue import Queue
+from queue import Queue
families = [None]
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
index 55980ffdcad..bc8ac44c22e 100644
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -55,34 +55,35 @@ def assert_spawning(self):
# Try making some callable types picklable
#
-from pickle import Pickler
+from pickle import _Pickler as Pickler
class ForkingPickler(Pickler):
dispatch = Pickler.dispatch.copy()
-
@classmethod
def register(cls, type, reduce):
def dispatcher(self, obj):
rv = reduce(obj)
- self.save_reduce(obj=obj, *rv)
+ if isinstance(rv, str):
+ self.save_global(obj, rv)
+ else:
+ self.save_reduce(obj=obj, *rv)
cls.dispatch[type] = dispatcher
def _reduce_method(m):
- if m.im_self is None:
- return getattr, (m.im_class, m.im_func.func_name)
+ if m.__self__ is None:
+ return getattr, (m.__class__, m.__func__.__name__)
else:
- return getattr, (m.im_self, m.im_func.func_name)
-ForkingPickler.register(type(ForkingPickler.save), _reduce_method)
+ return getattr, (m.__self__, m.__func__.__name__)
+class _C:
+ def f(self):
+ pass
+ForkingPickler.register(type(_C().f), _reduce_method)
+
def _reduce_method_descriptor(m):
return getattr, (m.__objclass__, m.__name__)
ForkingPickler.register(type(list.append), _reduce_method_descriptor)
ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
-#def _reduce_builtin_function_or_method(m):
-# return getattr, (m.__self__, m.__name__)
-#ForkingPickler.register(type(list().append), _reduce_builtin_function_or_method)
-#ForkingPickler.register(type(int().__add__), _reduce_builtin_function_or_method)
-
try:
from functools import partial
except ImportError:
@@ -123,8 +124,6 @@ if sys.platform != 'win32':
import random
random.seed()
code = process_obj._bootstrap()
- sys.stdout.flush()
- sys.stderr.flush()
os._exit(code)
def poll(self, flag=os.WNOHANG):
@@ -163,7 +162,7 @@ if sys.platform != 'win32':
if self.returncode is None:
try:
os.kill(self.pid, signal.SIGTERM)
- except OSError, e:
+ except OSError as e:
if self.wait(timeout=0.1) is None:
raise
@@ -176,19 +175,15 @@ if sys.platform != 'win32':
#
else:
- import thread
+ import _thread
import msvcrt
import _subprocess
import time
+ from pickle import dump, load, HIGHEST_PROTOCOL
from _multiprocessing import win32, Connection, PipeConnection
from .util import Finalize
- #try:
- # from cPickle import dump, load, HIGHEST_PROTOCOL
- #except ImportError:
- from pickle import load, HIGHEST_PROTOCOL
-
def dump(obj, file, protocol=None):
ForkingPickler(file, protocol).dump(obj)
@@ -238,7 +233,7 @@ else:
'''
Start a subprocess to run the code of a process object
'''
- _tls = thread._local()
+ _tls = _thread._local()
def __init__(self, process_obj):
# create pipe for communication with child
@@ -463,12 +458,20 @@ def prepare(data):
process.ORIGINAL_DIR = data['orig_dir']
if 'main_path' in data:
+ # XXX (ncoghlan): The following code makes several bogus
+ # assumptions regarding the relationship between __file__
+ # and a module's real name. See PEP 302 and issue #10845
main_path = data['main_path']
main_name = os.path.splitext(os.path.basename(main_path))[0]
if main_name == '__init__':
main_name = os.path.basename(os.path.dirname(main_path))
- if main_name != 'ipython':
+ if main_name == '__main__':
+ main_module = sys.modules['__main__']
+ main_module.__file__ = main_path
+ elif main_name != 'ipython':
+ # Main modules not actually called __main__.py may
+ # contain additional code that should still be executed
import imp
if main_path is None:
@@ -497,7 +500,7 @@ def prepare(data):
# Try to make the potentially picklable objects in
# sys.modules['__main__'] realize they are in the main
# module -- somewhat ugly.
- for obj in main_module.__dict__.values():
+ for obj in list(main_module.__dict__.values()):
try:
if obj.__module__ == '__parents_main__':
obj.__module__ = '__main__'
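Editor's note: the _reduce_method() helper above pickles a bound method as
the pair (getattr, (instance, method_name)), so unpickling simply looks the
method up again on the unpickled instance. A standalone sketch of the same
technique using copyreg instead of ForkingPickler (illustrative; assumes
the class is importable at unpickling time):

    import copyreg, pickle

    class _C:
        def f(self):
            return 'hello'

    def _reduce_method(m):
        return getattr, (m.__self__, m.__func__.__name__)

    copyreg.pickle(type(_C().f), _reduce_method)   # type of a bound method

    c = _C()
    rebuilt = pickle.loads(pickle.dumps(c.f))      # getattr(c_copy, 'f')
    assert rebuilt() == 'hello'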
diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py
index a1f37118d5a..0a25ef05c7f 100644
--- a/Lib/multiprocessing/heap.py
+++ b/Lib/multiprocessing/heap.py
@@ -60,7 +60,7 @@ if sys.platform == 'win32':
def __init__(self, size):
self.size = size
- self.name = 'pym-%d-%d' % (os.getpid(), Arena._counter.next())
+ self.name = 'pym-%d-%d' % (os.getpid(), next(Arena._counter))
self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
assert win32.GetLastError() == 0, 'tagname already in use'
self._state = (self.size, self.name)
@@ -213,7 +213,7 @@ class Heap(object):
def malloc(self, size):
# return a block of right size (possibly rounded up)
- assert 0 <= size < sys.maxint
+ assert 0 <= size < sys.maxsize
if os.getpid() != self._lastpid:
self.__init__() # reinitialize after fork
self._lock.acquire()
@@ -239,7 +239,7 @@ class BufferWrapper(object):
_heap = Heap()
def __init__(self, size):
- assert 0 <= size < sys.maxint
+ assert 0 <= size < sys.maxsize
block = BufferWrapper._heap.malloc(size)
self._state = (block, size)
Finalize(self, BufferWrapper._heap.free, args=(block,))
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index ffe5812b89d..5588ead1169 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -44,28 +44,31 @@ import sys
import weakref
import threading
import array
-import Queue
+import queue
from traceback import format_exc
+from pickle import PicklingError
from multiprocessing import Process, current_process, active_children, Pool, util, connection
from multiprocessing.process import AuthenticationString
from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
from multiprocessing.util import Finalize, info
-try:
- from cPickle import PicklingError
-except ImportError:
- from pickle import PicklingError
-
#
# Register some things for pickling
#
def reduce_array(a):
- return array.array, (a.typecode, a.tostring())
+ return array.array, (a.typecode, a.tobytes())
ForkingPickler.register(array.array, reduce_array)
view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
+if view_types[0] is not list: # only needed in Py3.0
+ def rebuild_as_list(obj):
+ return list, (list(obj),)
+ for view_type in view_types:
+ ForkingPickler.register(view_type, rebuild_as_list)
+ import copyreg
+ copyreg.pickle(view_type, rebuild_as_list)
#
# Type for identifying shared objects
@@ -131,7 +134,7 @@ def all_methods(obj):
temp = []
for name in dir(obj):
func = getattr(obj, name)
- if hasattr(func, '__call__'):
+ if callable(func):
temp.append(name)
return temp
@@ -211,7 +214,7 @@ class Server(object):
msg = ('#RETURN', result)
try:
c.send(msg)
- except Exception, e:
+ except Exception as e:
try:
c.send(('#TRACEBACK', format_exc()))
except Exception:
@@ -251,7 +254,7 @@ class Server(object):
try:
res = function(*args, **kwds)
- except Exception, e:
+ except Exception as e:
msg = ('#ERROR', e)
else:
typeid = gettypeid and gettypeid.get(methodname, None)
@@ -286,9 +289,9 @@ class Server(object):
try:
try:
send(msg)
- except Exception, e:
+ except Exception as e:
send(('#UNSERIALIZABLE', repr(msg)))
- except Exception, e:
+ except Exception as e:
util.info('exception in thread serving %r',
threading.current_thread().name)
util.info(' ... message was %r', msg)
@@ -321,7 +324,7 @@ class Server(object):
self.mutex.acquire()
try:
result = []
- keys = self.id_to_obj.keys()
+ keys = list(self.id_to_obj.keys())
keys.sort()
for ident in keys:
if ident != '0':
@@ -507,7 +510,7 @@ class BaseManager(object):
'''
assert self._state.value == State.INITIAL
- if initializer is not None and not hasattr(initializer, '__call__'):
+ if initializer is not None and not callable(initializer):
raise TypeError('initializer must be a callable')
# pipe over which we will retrieve address of server
@@ -653,7 +656,7 @@ class BaseManager(object):
getattr(proxytype, '_method_to_typeid_', None)
if method_to_typeid:
- for key, value in method_to_typeid.items():
+ for key, value in list(method_to_typeid.items()):
assert type(key) is str, '%r is not a string' % key
assert type(value) is str, '%r is not a string' % value
@@ -805,7 +808,7 @@ class BaseProxy(object):
util.debug('DECREF %r', token.id)
conn = _Client(token.address, authkey=authkey)
dispatch(conn, None, 'decref', (token.id,))
- except Exception, e:
+ except Exception as e:
util.debug('... decref failed %s', e)
else:
@@ -823,7 +826,7 @@ class BaseProxy(object):
self._manager = None
try:
self._incref()
- except Exception, e:
+ except Exception as e:
# the proxy may just be for a manager which has shutdown
util.info('incref failed: %s' % e)
@@ -894,8 +897,8 @@ def MakeProxyType(name, exposed, _cache={}):
dic = {}
for meth in exposed:
- exec '''def %s(self, *args, **kwds):
- return self._callmethod(%r, args, kwds)''' % (meth, meth) in dic
+ exec('''def %s(self, *args, **kwds):
+ return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
ProxyType = type(name, (BaseProxy,), dic)
ProxyType._exposed_ = exposed
@@ -936,7 +939,7 @@ class Namespace(object):
def __init__(self, **kwds):
self.__dict__.update(kwds)
def __repr__(self):
- items = self.__dict__.items()
+ items = list(self.__dict__.items())
temp = []
for name, value in items:
if not name.startswith('_'):
@@ -964,14 +967,11 @@ def Array(typecode, sequence, lock=True):
#
class IteratorProxy(BaseProxy):
- # XXX remove methods for Py3.0 and Py2.6
- _exposed_ = ('__next__', 'next', 'send', 'throw', 'close')
+ _exposed_ = ('__next__', 'send', 'throw', 'close')
def __iter__(self):
return self
def __next__(self, *args):
return self._callmethod('__next__', args)
- def next(self, *args):
- return self._callmethod('next', args)
def send(self, *args):
return self._callmethod('send', args)
def throw(self, *args):
@@ -993,7 +993,6 @@ class AcquirerProxy(BaseProxy):
class ConditionProxy(AcquirerProxy):
- # XXX will Condition.notfyAll() name be available in Py3.0?
_exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
def wait(self, timeout=None):
return self._callmethod('wait', (timeout,))
@@ -1095,8 +1094,8 @@ class SyncManager(BaseManager):
this class.
'''
-SyncManager.register('Queue', Queue.Queue)
-SyncManager.register('JoinableQueue', Queue.Queue)
+SyncManager.register('Queue', queue.Queue)
+SyncManager.register('JoinableQueue', queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
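Editor's note: the rebuild_as_list registration above exists because dict
views are not picklable in Python 3; sending them as plain lists preserves
the Python 2 behaviour of items()/keys()/values(). A standalone sketch
(the assert assumes CPython 3.7+, where dicts preserve insertion order):

    import copyreg, pickle

    def rebuild_as_list(obj):
        return list, (list(obj),)

    for name in ('items', 'keys', 'values'):
        view_type = type(getattr({}, name)())
        copyreg.pickle(view_type, rebuild_as_list)

    d = {'a': 1, 'b': 2}
    assert pickle.loads(pickle.dumps(d.keys())) == ['a', 'b']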
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 00c904a8a73..7502ff89f43 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -39,7 +39,7 @@ __all__ = ['Pool']
#
import threading
-import Queue
+import queue
import itertools
import collections
import time
@@ -62,7 +62,7 @@ TERMINATE = 2
job_counter = itertools.count()
def mapstar(args):
- return map(*args)
+ return list(map(*args))
#
# Code run by worker processes
@@ -111,7 +111,7 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
job, i, func, args, kwds = task
try:
result = (True, func(*args, **kwds))
- except Exception, e:
+ except Exception as e:
result = (False, e)
try:
put((job, i, result))
@@ -129,14 +129,14 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
class Pool(object):
'''
- Class which supports an async version of the `apply()` builtin
+ Class which supports an async version of applying functions to arguments.
'''
Process = Process
def __init__(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None):
self._setup_queues()
- self._taskqueue = Queue.Queue()
+ self._taskqueue = queue.Queue()
self._cache = {}
self._state = RUN
self._maxtasksperchild = maxtasksperchild
@@ -151,7 +151,7 @@ class Pool(object):
if processes < 1:
raise ValueError("Number of processes must be at least 1")
- if initializer is not None and not hasattr(initializer, '__call__'):
+ if initializer is not None and not callable(initializer):
raise TypeError('initializer must be a callable')
self._processes = processes
@@ -237,21 +237,22 @@ class Pool(object):
def apply(self, func, args=(), kwds={}):
'''
- Equivalent of `apply()` builtin
+ Equivalent of `func(*args, **kwds)`.
'''
assert self._state == RUN
return self.apply_async(func, args, kwds).get()
def map(self, func, iterable, chunksize=None):
'''
- Equivalent of `map()` builtin
+ Apply `func` to each element in `iterable`, collecting the results
+ in a list that is returned.
'''
assert self._state == RUN
return self.map_async(func, iterable, chunksize).get()
def imap(self, func, iterable, chunksize=1):
'''
- Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
+ Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
'''
assert self._state == RUN
if chunksize == 1:
@@ -269,7 +270,7 @@ class Pool(object):
def imap_unordered(self, func, iterable, chunksize=1):
'''
- Like `imap()` method but ordering of results is arbitrary
+ Like `imap()` method but ordering of results is arbitrary.
'''
assert self._state == RUN
if chunksize == 1:
@@ -285,18 +286,20 @@ class Pool(object):
for i, x in enumerate(task_batches)), result._set_length))
return (item for chunk in result for item in chunk)
- def apply_async(self, func, args=(), kwds={}, callback=None):
+ def apply_async(self, func, args=(), kwds={}, callback=None,
+ error_callback=None):
'''
- Asynchronous equivalent of `apply()` builtin
+ Asynchronous version of `apply()` method.
'''
assert self._state == RUN
- result = ApplyResult(self._cache, callback)
+ result = ApplyResult(self._cache, callback, error_callback)
self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
return result
- def map_async(self, func, iterable, chunksize=None, callback=None):
+ def map_async(self, func, iterable, chunksize=None, callback=None,
+ error_callback=None):
'''
- Asynchronous equivalent of `map()` builtin
+ Asynchronous version of `map()` method.
'''
assert self._state == RUN
if not hasattr(iterable, '__len__'):
@@ -310,7 +313,8 @@ class Pool(object):
chunksize = 0
task_batches = Pool._get_tasks(func, iterable, chunksize)
- result = MapResult(self._cache, chunksize, len(iterable), callback)
+ result = MapResult(self._cache, chunksize, len(iterable), callback,
+ error_callback=error_callback)
self._taskqueue.put((((result._job, i, mapstar, (x,), {})
for i, x in enumerate(task_batches)), None))
return result
@@ -490,7 +494,7 @@ class Pool(object):
# workers because we don't want workers to be restarted behind our back.
debug('joining worker handler')
if threading.current_thread() is not worker_handler:
- worker_handler.join(1e100)
+ worker_handler.join()
# Terminate workers which haven't already finished.
if pool and hasattr(pool[0], 'terminate'):
@@ -501,11 +505,11 @@ class Pool(object):
debug('joining task handler')
if threading.current_thread() is not task_handler:
- task_handler.join(1e100)
+ task_handler.join()
debug('joining result handler')
if threading.current_thread() is not result_handler:
- result_handler.join(1e100)
+ result_handler.join()
if pool and hasattr(pool[0], 'terminate'):
debug('joining pool workers')
@@ -521,12 +525,13 @@ class Pool(object):
class ApplyResult(object):
- def __init__(self, cache, callback):
+ def __init__(self, cache, callback, error_callback):
self._cond = threading.Condition(threading.Lock())
- self._job = job_counter.next()
+ self._job = next(job_counter)
self._cache = cache
self._ready = False
self._callback = callback
+ self._error_callback = error_callback
cache[self._job] = self
def ready(self):
@@ -557,6 +562,8 @@ class ApplyResult(object):
self._success, self._value = obj
if self._callback and self._success:
self._callback(self._value)
+ if self._error_callback and not self._success:
+ self._error_callback(self._value)
self._cond.acquire()
try:
self._ready = True
@@ -571,8 +578,9 @@ class ApplyResult(object):
class MapResult(ApplyResult):
- def __init__(self, cache, chunksize, length, callback):
- ApplyResult.__init__(self, cache, callback)
+ def __init__(self, cache, chunksize, length, callback, error_callback):
+ ApplyResult.__init__(self, cache, callback,
+ error_callback=error_callback)
self._success = True
self._value = [None] * length
self._chunksize = chunksize
@@ -598,10 +606,11 @@ class MapResult(ApplyResult):
self._cond.notify()
finally:
self._cond.release()
-
else:
self._success = False
self._value = result
+ if self._error_callback:
+ self._error_callback(self._value)
del self._cache[self._job]
self._cond.acquire()
try:
@@ -618,7 +627,7 @@ class IMapIterator(object):
def __init__(self, cache):
self._cond = threading.Condition(threading.Lock())
- self._job = job_counter.next()
+ self._job = next(job_counter)
self._cache = cache
self._items = collections.deque()
self._index = 0
@@ -712,8 +721,8 @@ class ThreadPool(Pool):
Pool.__init__(self, processes, initializer, initargs)
def _setup_queues(self):
- self._inqueue = Queue.Queue()
- self._outqueue = Queue.Queue()
+ self._inqueue = queue.Queue()
+ self._outqueue = queue.Queue()
self._quick_put = self._inqueue.put
self._quick_get = self._outqueue.get
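Editor's note: the new error_callback parameter gives async results a
failure channel mirroring callback for successes: it receives the exception
instance, and get() still re-raises it afterwards. A usage sketch (assumes
this patched multiprocessing; the API matches later stdlib releases):

    from multiprocessing import Pool

    def boom(x):
        return 1 // x                 # ZeroDivisionError when x == 0

    def on_error(exc):
        print('task failed:', exc)

    if __name__ == '__main__':
        pool = Pool(2)
        res = pool.apply_async(boom, (0,), error_callback=on_error)
        try:
            res.get(timeout=5)        # re-raises the worker's exception
        except ZeroDivisionError:
            pass
        pool.close()
        pool.join()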
diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py
index 482ea0a37a8..3262b50f50b 100644
--- a/Lib/multiprocessing/process.py
+++ b/Lib/multiprocessing/process.py
@@ -42,6 +42,7 @@ import os
import sys
import signal
import itertools
+from _weakrefset import WeakSet
#
#
@@ -93,7 +94,7 @@ class Process(object):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
assert group is None, 'group argument must be None for now'
- count = _current_process._counter.next()
+ count = next(_current_process._counter)
self._identity = _current_process._identity + (count,)
self._authkey = _current_process._authkey
self._daemonic = _current_process._daemonic
@@ -105,6 +106,7 @@ class Process(object):
self._kwargs = dict(kwargs)
self._name = name or type(self).__name__ + '-' + \
':'.join(str(i) for i in self._identity)
+ _dangling.add(self)
def run(self):
'''
@@ -164,7 +166,7 @@ class Process(object):
@name.setter
def name(self, name):
- assert isinstance(name, basestring), 'name must be a string'
+ assert isinstance(name, str), 'name must be a string'
self._name = name
@property
@@ -245,37 +247,45 @@ class Process(object):
try:
self._children = set()
self._counter = itertools.count(1)
- try:
- sys.stdin.close()
- sys.stdin = open(os.devnull)
- except (OSError, ValueError):
- pass
+ if sys.stdin is not None:
+ try:
+ sys.stdin.close()
+ sys.stdin = open(os.devnull)
+ except (OSError, ValueError):
+ pass
+ old_process = _current_process
_current_process = self
- util._finalizer_registry.clear()
- util._run_after_forkers()
+ try:
+ util._finalizer_registry.clear()
+ util._run_after_forkers()
+ finally:
+ # delay finalization of the old process object until after
+ # _run_after_forkers() is executed
+ del old_process
util.info('child process calling self.run()')
try:
self.run()
exitcode = 0
finally:
util._exit_function()
- except SystemExit, e:
+ except SystemExit as e:
if not e.args:
exitcode = 1
elif isinstance(e.args[0], int):
exitcode = e.args[0]
else:
sys.stderr.write(str(e.args[0]) + '\n')
- sys.stderr.flush()
exitcode = 0 if isinstance(e.args[0], str) else 1
except:
exitcode = 1
import traceback
sys.stderr.write('Process %s:\n' % self.name)
- sys.stderr.flush()
traceback.print_exc()
+ finally:
+ util.info('process exiting with exitcode %d' % exitcode)
+ sys.stdout.flush()
+ sys.stderr.flush()
- util.info('process exiting with exitcode %d' % exitcode)
return exitcode
#
@@ -318,6 +328,9 @@ del _MainProcess
_exitcode_to_name = {}
-for name, signum in signal.__dict__.items():
+for name, signum in list(signal.__dict__.items()):
if name[:3]=='SIG' and '_' not in name:
_exitcode_to_name[-signum] = name
+
+# For debug and leak testing
+_dangling = WeakSet()
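Editor's note: the _dangling WeakSet registers every Process without
keeping it alive, so leak tests can check that no process objects linger
after a run. The mechanism in isolation (illustrative demo using a plain
class in place of Process):

    import gc
    from _weakrefset import WeakSet

    _dangling = WeakSet()

    class Tracked:
        def __init__(self):
            _dangling.add(self)       # weak reference only

    t = Tracked()
    assert len(_dangling) == 1
    del t
    gc.collect()
    assert len(_dangling) == 0        # entry vanished with the object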
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 433c7e29dee..51d991245c1 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -42,7 +42,7 @@ import time
import atexit
import weakref
-from Queue import Empty, Full
+from queue import Empty, Full
import _multiprocessing
from multiprocessing import Pipe
from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
@@ -192,7 +192,13 @@ class Queue(object):
debug('... done self._thread.start()')
# On process exit we will wait for data to be flushed to pipe.
- if not self._joincancelled:
+ #
+ # However, if this process created the queue then all
+ # processes which use the queue will be descendants of this
+ # process. Therefore waiting for the queue to be flushed
+ # is pointless once all the child processes have been joined.
+ created_by_this_process = (self._opid == os.getpid())
+ if not self._joincancelled and not created_by_this_process:
self._jointhread = Finalize(
self._thread, Queue._finalize_join,
[weakref.ref(self._thread)],
@@ -268,7 +274,7 @@ class Queue(object):
wrelease()
except IndexError:
pass
- except Exception, e:
+ except Exception as e:
# Since this runs in a daemon thread the resources it uses
# may be become unusable while the process is cleaning up.
# We ignore errors which happen after the process has
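Editor's note: the created_by_this_process check encodes the reasoning in
the new comment: every user of the queue is a descendant of its creator, so
by the time the creator exits its children have been joined and flushing is
pointless. A sketch of just that test (hypothetical helper names, not code
from the patch):

    import os

    class _Q:
        def __init__(self):
            self._opid = os.getpid()             # recorded at construction

        def _needs_join_on_exit(self):
            return self._opid != os.getpid()     # True only in forked children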
diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py
index 1eb044dd54f..1e694da49d9 100644
--- a/Lib/multiprocessing/sharedctypes.py
+++ b/Lib/multiprocessing/sharedctypes.py
@@ -78,7 +78,7 @@ def RawArray(typecode_or_type, size_or_initializer):
Returns a ctypes array allocated from shared memory
'''
type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
- if isinstance(size_or_initializer, (int, long)):
+ if isinstance(size_or_initializer, int):
type_ = type_ * size_or_initializer
obj = _new_value(type_)
ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
@@ -89,13 +89,10 @@ def RawArray(typecode_or_type, size_or_initializer):
result.__init__(*size_or_initializer)
return result
-def Value(typecode_or_type, *args, **kwds):
+def Value(typecode_or_type, *args, lock=None):
'''
Return a synchronization wrapper for a Value
'''
- lock = kwds.pop('lock', None)
- if kwds:
- raise ValueError('unrecognized keyword argument(s): %s' % kwds.keys())
obj = RawValue(typecode_or_type, *args)
if lock is False:
return obj
@@ -111,7 +108,7 @@ def Array(typecode_or_type, size_or_initializer, **kwds):
'''
lock = kwds.pop('lock', None)
if kwds:
- raise ValueError('unrecognized keyword argument(s): %s' % kwds.keys())
+ raise ValueError('unrecognized keyword argument(s): %s' % list(kwds.keys()))
obj = RawArray(typecode_or_type, size_or_initializer)
if lock is False:
return obj
@@ -174,7 +171,7 @@ def make_property(name):
return prop_cache[name]
except KeyError:
d = {}
- exec template % ((name,)*7) in d
+ exec(template % ((name,)*7), d)
prop_cache[name] = d[name]
return d[name]
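Editor's note: making lock keyword-only (Python 3 "*args, lock=None"
syntax) lets Value() drop the manual kwds.pop()/leftover-kwargs check.
Usage is unchanged (sketch; no child processes are needed for this to run):

    from multiprocessing import Value, Array

    v = Value('i', 0, lock=True)              # synchronized wrapper
    with v.get_lock():
        v.value += 1

    a = Array('d', [0.0, 1.0], lock=False)    # raw, unsynchronized array
    a[0] = 2.5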
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
index 4b077e53aed..70ae82569cc 100644
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -62,7 +62,7 @@ except (ImportError):
# Constants
#
-RECURSIVE_MUTEX, SEMAPHORE = range(2)
+RECURSIVE_MUTEX, SEMAPHORE = list(range(2))
SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
#
@@ -238,19 +238,20 @@ class Condition(object):
# release lock
count = self._lock._semlock._count()
- for i in xrange(count):
+ for i in range(count):
self._lock.release()
try:
# wait for notification or timeout
- self._wait_semaphore.acquire(True, timeout)
+ ret = self._wait_semaphore.acquire(True, timeout)
finally:
# indicate that this thread has woken
self._woken_count.release()
# reacquire lock
- for i in xrange(count):
+ for i in range(count):
self._lock.acquire()
+ return ret
def notify(self):
assert self._lock._semlock._is_mine(), 'lock is not owned'
@@ -285,7 +286,7 @@ class Condition(object):
sleepers += 1
if sleepers:
- for i in xrange(sleepers):
+ for i in range(sleepers):
self._woken_count.acquire() # wait for a sleeper to wake
# rezero wait_semaphore in case some timeouts just happened
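Editor's note: returning ret from wait() lets callers distinguish a timeout
(False) from a notification (True), since that is what the underlying
semaphore acquire reports. Usage sketch (assumes this patched
multiprocessing):

    from multiprocessing import Condition

    cond = Condition()
    cond.acquire()
    try:
        notified = cond.wait(timeout=0.1)
        if not notified:
            print('timed out waiting for a notification')
    finally:
        cond.release()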
diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py
index c65dd9904ed..61b05335ac6 100644
--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -153,11 +153,11 @@ def _run_after_forkers():
for (index, ident, func), obj in items:
try:
func(obj)
- except Exception, e:
+ except Exception as e:
info('after forker raised exception %s', e)
def register_after_fork(obj, func):
- _afterfork_registry[(_afterfork_counter.next(), id(obj), func)] = obj
+ _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj
#
# Finalization using weakrefs
@@ -182,7 +182,7 @@ class Finalize(object):
self._callback = callback
self._args = args
self._kwargs = kwargs or {}
- self._key = (exitpriority, _finalizer_counter.next())
+ self._key = (exitpriority, next(_finalizer_counter))
_finalizer_registry[self._key] = self
@@ -247,12 +247,18 @@ def _run_finalizers(minpriority=None):
Finalizers with highest priority are called first; finalizers with
the same priority will be called in reverse order of creation.
'''
+ if _finalizer_registry is None:
+ # This function may be called after this module's globals are
+ # destroyed. See the _exit_function function in this module for more
+ # notes.
+ return
+
if minpriority is None:
f = lambda p : p[0][0] is not None
else:
f = lambda p : p[0][0] is not None and p[0][0] >= minpriority
- items = [x for x in _finalizer_registry.items() if f(x)]
+ items = [x for x in list(_finalizer_registry.items()) if f(x)]
items.sort(reverse=True)
for key, finalizer in items:
@@ -278,24 +284,42 @@ def is_exiting():
_exiting = False
-def _exit_function():
- global _exiting
-
- info('process shutting down')
- debug('running all "atexit" finalizers with priority >= 0')
- _run_finalizers(0)
+def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
+ active_children=active_children,
+ current_process=current_process):
+ # We hold on to references to functions in the arglist due to the
+ # situation described below, where this function is called after this
+ # module's globals are destroyed.
- for p in active_children():
- if p._daemonic:
- info('calling terminate() for daemon %s', p.name)
- p._popen.terminate()
-
- for p in active_children():
- info('calling join() for process %s', p.name)
- p.join()
+ global _exiting
- debug('running the remaining "atexit" finalizers')
- _run_finalizers()
+ if not _exiting:
+ _exiting = True
+
+ info('process shutting down')
+ debug('running all "atexit" finalizers with priority >= 0')
+ _run_finalizers(0)
+ if current_process() is not None:
+ # We check if the current process is None here because if
+ # it's None, any call to ``active_children()`` will raise an
+ # AttributeError (active_children winds up trying to get
+ # attributes from util._current_process). This happens in a
+ # variety of shutdown circumstances that are not well-understood
+ # because module-scope variables are not apparently supposed to
+ # be destroyed until after this function is called. However,
+ # they are indeed destroyed before this function is called. See
+ # issues #9775 and #15881. Also related: #4106, #9205, and #9207.
+ for p in active_children():
+ if p._daemonic:
+ info('calling terminate() for daemon %s', p.name)
+ p._popen.terminate()
+
+ for p in active_children():
+ info('calling join() for process %s', p.name)
+ p.join()
+
+ debug('running the remaining "atexit" finalizers')
+ _run_finalizers()
atexit.register(_exit_function)
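Editor's note: the rewritten _exit_function captures its helpers as default
arguments because parameter defaults are evaluated and bound at definition
time, so they stay reachable even after module globals are torn down during
interpreter shutdown. The trick in isolation (illustrative sketch):

    log_sink = []

    def _exit_hook(sink=log_sink):
        # `sink` was captured when the def ran; deleting the module-level
        # name does not affect it.
        sink.append('shutting down')

    del log_sink
    _exit_hook()        # still works: the default keeps a reference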