diff options
Diffstat (limited to 'Lib/test/test_multiprocessing.py')
-rw-r--r-- | Lib/test/test_multiprocessing.py | 327 |
1 files changed, 200 insertions, 127 deletions
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index 15870570db0..ab6d36a0f09 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -1,33 +1,35 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # # Unit tests for the multiprocessing package # import unittest -import Queue +import queue as pyqueue import time +import io import sys import os import gc +import errno import signal import array import socket import random import logging -import errno +import test.support import test.script_helper -from test import test_support -from StringIO import StringIO -_multiprocessing = test_support.import_module('_multiprocessing') -# import threading after _multiprocessing to raise a more relevant error + + +# Skip tests if _multiprocessing wasn't built. +_multiprocessing = test.support.import_module('_multiprocessing') +# Skip tests if sem_open implementation is broken. +test.support.import_module('multiprocessing.synchronize') +# import threading after _multiprocessing to raise a more revelant error # message: "No module named _multiprocessing". _multiprocessing is not compiled # without thread support. import threading -# Work around broken sem_open implementations -test_support.import_module('multiprocessing.synchronize') - import multiprocessing.dummy import multiprocessing.connection import multiprocessing.managers @@ -57,7 +59,8 @@ except ImportError: # # -latin = str +def latin(s): + return s.encode('latin') # # Constants @@ -326,36 +329,6 @@ class _TestProcess(BaseTestCase): ] self.assertEqual(result, expected) - @classmethod - def _test_sys_exit(cls, reason, testfn): - sys.stderr = open(testfn, 'w') - sys.exit(reason) - - def test_sys_exit(self): - # See Issue 13854 - if self.TYPE == 'threads': - return - - testfn = test_support.TESTFN - self.addCleanup(test_support.unlink, testfn) - - for reason, code in (([1, 2, 3], 1), ('ignore this', 0)): - p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) - p.daemon = True - p.start() - p.join(5) - self.assertEqual(p.exitcode, code) - - with open(testfn, 'r') as f: - self.assertEqual(f.read().rstrip(), str(reason)) - - for reason in (True, False, 8): - p = self.Process(target=sys.exit, args=(reason,)) - p.daemon = True - p.start() - p.join(5) - self.assertEqual(p.exitcode, reason) - # # # @@ -395,6 +368,59 @@ class _TestSubclassingProcess(BaseTestCase): uppercaser.stop() uppercaser.join() + def test_stderr_flush(self): + # sys.stderr is flushed at process shutdown (issue #13812) + if self.TYPE == "threads": + return + + testfn = test.support.TESTFN + self.addCleanup(test.support.unlink, testfn) + proc = self.Process(target=self._test_stderr_flush, args=(testfn,)) + proc.start() + proc.join() + with open(testfn, 'r') as f: + err = f.read() + # The whole traceback was printed + self.assertIn("ZeroDivisionError", err) + self.assertIn("test_multiprocessing.py", err) + self.assertIn("1/0 # MARKER", err) + + @classmethod + def _test_stderr_flush(cls, testfn): + sys.stderr = open(testfn, 'w') + 1/0 # MARKER + + + @classmethod + def _test_sys_exit(cls, reason, testfn): + sys.stderr = open(testfn, 'w') + sys.exit(reason) + + def test_sys_exit(self): + # See Issue 13854 + if self.TYPE == 'threads': + return + + testfn = test.support.TESTFN + self.addCleanup(test.support.unlink, testfn) + + for reason, code in (([1, 2, 3], 1), ('ignore this', 0)): + p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) + p.daemon = True + p.start() + p.join(5) + self.assertEqual(p.exitcode, code) + + with open(testfn, 'r') as f: + self.assertEqual(f.read().rstrip(), str(reason)) + + for reason in (True, False, 8): + p = self.Process(target=sys.exit, args=(reason,)) + p.daemon = True + p.start() + p.join(5) + self.assertEqual(p.exitcode, reason) + # # # @@ -454,22 +480,22 @@ class _TestQueue(BaseTestCase): put = TimingWrapper(queue.put) put_nowait = TimingWrapper(queue.put_nowait) - self.assertRaises(Queue.Full, put, 7, False) + self.assertRaises(pyqueue.Full, put, 7, False) self.assertTimingAlmostEqual(put.elapsed, 0) - self.assertRaises(Queue.Full, put, 7, False, None) + self.assertRaises(pyqueue.Full, put, 7, False, None) self.assertTimingAlmostEqual(put.elapsed, 0) - self.assertRaises(Queue.Full, put_nowait, 7) + self.assertRaises(pyqueue.Full, put_nowait, 7) self.assertTimingAlmostEqual(put_nowait.elapsed, 0) - self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1) + self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1) self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1) - self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2) + self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2) self.assertTimingAlmostEqual(put.elapsed, 0) - self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3) + self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3) self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3) child_can_start.set() @@ -522,22 +548,22 @@ class _TestQueue(BaseTestCase): get = TimingWrapper(queue.get) get_nowait = TimingWrapper(queue.get_nowait) - self.assertRaises(Queue.Empty, get, False) + self.assertRaises(pyqueue.Empty, get, False) self.assertTimingAlmostEqual(get.elapsed, 0) - self.assertRaises(Queue.Empty, get, False, None) + self.assertRaises(pyqueue.Empty, get, False, None) self.assertTimingAlmostEqual(get.elapsed, 0) - self.assertRaises(Queue.Empty, get_nowait) + self.assertRaises(pyqueue.Empty, get_nowait) self.assertTimingAlmostEqual(get_nowait.elapsed, 0) - self.assertRaises(Queue.Empty, get, True, TIMEOUT1) + self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1) self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) - self.assertRaises(Queue.Empty, get, False, TIMEOUT2) + self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2) self.assertTimingAlmostEqual(get.elapsed, 0) - self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3) + self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3) self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) proc.join() @@ -573,7 +599,7 @@ class _TestQueue(BaseTestCase): # check that all expected items are in the queue for i in range(20): self.assertEqual(queue.get(), i) - self.assertRaises(Queue.Empty, queue.get, False) + self.assertRaises(pyqueue.Empty, queue.get, False) p.join() @@ -605,13 +631,13 @@ class _TestQueue(BaseTestCase): self.skipTest("requires 'queue.task_done()' method") workers = [self.Process(target=self._test_task_done, args=(queue,)) - for i in xrange(4)] + for i in range(4)] for p in workers: p.daemon = True p.start() - for i in xrange(10): + for i in range(10): queue.put(i) queue.join() @@ -786,11 +812,11 @@ class _TestCondition(BaseTestCase): t.start() # wait for them all to sleep - for i in xrange(6): + for i in range(6): sleeping.acquire() # check they have all timed out - for i in xrange(6): + for i in range(6): woken.acquire() self.assertReturnsIfImplemented(0, get_value, woken) @@ -808,7 +834,7 @@ class _TestCondition(BaseTestCase): t.start() # wait for them to all sleep - for i in xrange(6): + for i in range(6): sleeping.acquire() # check no process/thread has woken up @@ -821,7 +847,13 @@ class _TestCondition(BaseTestCase): cond.release() # check they have all woken - time.sleep(DELTA) + for i in range(10): + try: + if get_value(woken) == 6: + break + except NotImplementedError: + break + time.sleep(DELTA) self.assertReturnsIfImplemented(6, get_value, woken) # check state is not mucked up @@ -833,7 +865,7 @@ class _TestCondition(BaseTestCase): cond.acquire() res = wait(TIMEOUT1) cond.release() - self.assertEqual(res, None) + self.assertEqual(res, False) self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) @@ -997,7 +1029,7 @@ class _TestArray(BaseTestCase): self.assertEqual(len(arr), size) self.assertEqual(list(arr), [0] * size) arr[:] = range(10) - self.assertEqual(list(arr), range(10)) + self.assertEqual(list(arr), list(range(10))) del arr @unittest.skipIf(c_int is None, "requires _ctypes") @@ -1005,24 +1037,17 @@ class _TestArray(BaseTestCase): self.test_array(raw=True) @unittest.skipIf(c_int is None, "requires _ctypes") - def test_array_accepts_long(self): - arr = self.Array('i', 10L) - self.assertEqual(len(arr), 10) - raw_arr = self.RawArray('i', 10L) - self.assertEqual(len(raw_arr), 10) - - @unittest.skipIf(c_int is None, "requires _ctypes") def test_getobj_getlock_obj(self): - arr1 = self.Array('i', range(10)) + arr1 = self.Array('i', list(range(10))) lock1 = arr1.get_lock() obj1 = arr1.get_obj() - arr2 = self.Array('i', range(10), lock=None) + arr2 = self.Array('i', list(range(10)), lock=None) lock2 = arr2.get_lock() obj2 = arr2.get_obj() lock = self.Lock() - arr3 = self.Array('i', range(10), lock=lock) + arr3 = self.Array('i', list(range(10)), lock=lock) lock3 = arr3.get_lock() obj3 = arr3.get_obj() self.assertEqual(lock, lock3) @@ -1046,14 +1071,14 @@ class _TestContainers(BaseTestCase): ALLOWED_TYPES = ('manager',) def test_list(self): - a = self.list(range(10)) - self.assertEqual(a[:], range(10)) + a = self.list(list(range(10))) + self.assertEqual(a[:], list(range(10))) b = self.list() self.assertEqual(b[:], []) - b.extend(range(5)) - self.assertEqual(b[:], range(5)) + b.extend(list(range(5))) + self.assertEqual(b[:], list(range(5))) self.assertEqual(b[2], 2) self.assertEqual(b[2:10], [2,3,4]) @@ -1063,7 +1088,7 @@ class _TestContainers(BaseTestCase): self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6]) - self.assertEqual(a[:], range(10)) + self.assertEqual(a[:], list(range(10))) d = [a, b] e = self.list(d) @@ -1078,7 +1103,7 @@ class _TestContainers(BaseTestCase): def test_dict(self): d = self.dict() - indices = range(65, 70) + indices = list(range(65, 70)) for i in indices: d[i] = chr(i) self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices)) @@ -1104,6 +1129,7 @@ class _TestContainers(BaseTestCase): def sqr(x, wait=0.0): time.sleep(wait) return x*x + class _TestPool(BaseTestCase): def test_apply(self): @@ -1113,9 +1139,9 @@ class _TestPool(BaseTestCase): def test_map(self): pmap = self.pool.map - self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10))) - self.assertEqual(pmap(sqr, range(100), chunksize=20), - map(sqr, range(100))) + self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10))))) + self.assertEqual(pmap(sqr, list(range(100)), chunksize=20), + list(map(sqr, list(range(100))))) def test_map_chunksize(self): try: @@ -1136,25 +1162,25 @@ class _TestPool(BaseTestCase): self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) def test_imap(self): - it = self.pool.imap(sqr, range(10)) - self.assertEqual(list(it), map(sqr, range(10))) + it = self.pool.imap(sqr, list(range(10))) + self.assertEqual(list(it), list(map(sqr, list(range(10))))) - it = self.pool.imap(sqr, range(10)) + it = self.pool.imap(sqr, list(range(10))) for i in range(10): - self.assertEqual(it.next(), i*i) - self.assertRaises(StopIteration, it.next) + self.assertEqual(next(it), i*i) + self.assertRaises(StopIteration, it.__next__) - it = self.pool.imap(sqr, range(1000), chunksize=100) + it = self.pool.imap(sqr, list(range(1000)), chunksize=100) for i in range(1000): - self.assertEqual(it.next(), i*i) - self.assertRaises(StopIteration, it.next) + self.assertEqual(next(it), i*i) + self.assertRaises(StopIteration, it.__next__) def test_imap_unordered(self): - it = self.pool.imap_unordered(sqr, range(1000)) - self.assertEqual(sorted(it), map(sqr, range(1000))) + it = self.pool.imap_unordered(sqr, list(range(1000))) + self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) - it = self.pool.imap_unordered(sqr, range(1000), chunksize=53) - self.assertEqual(sorted(it), map(sqr, range(1000))) + it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53) + self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) def test_make_pool(self): self.assertRaises(ValueError, multiprocessing.Pool, -1) @@ -1181,7 +1207,7 @@ class _TestPool(BaseTestCase): self.pool.terminate() join = TimingWrapper(self.pool.join) join() - self.assertTrue(join.elapsed < 0.2) + self.assertLess(join.elapsed, 0.5) def test_empty_iterable(self): # See Issue 12157 @@ -1195,27 +1221,55 @@ class _TestPool(BaseTestCase): p.close() p.join() +def raising(): + raise KeyError("key") + def unpickleable_result(): return lambda: 42 class _TestPoolWorkerErrors(BaseTestCase): ALLOWED_TYPES = ('processes', ) + def test_async_error_callback(self): + p = multiprocessing.Pool(2) + + scratchpad = [None] + def errback(exc): + scratchpad[0] = exc + + res = p.apply_async(raising, error_callback=errback) + self.assertRaises(KeyError, res.get) + self.assertTrue(scratchpad[0]) + self.assertIsInstance(scratchpad[0], KeyError) + + p.close() + p.join() + def test_unpickleable_result(self): from multiprocessing.pool import MaybeEncodingError p = multiprocessing.Pool(2) # Make sure we don't lose pool processes because of encoding errors. for iteration in range(20): - res = p.apply_async(unpickleable_result) + + scratchpad = [None] + def errback(exc): + scratchpad[0] = exc + + res = p.apply_async(unpickleable_result, error_callback=errback) self.assertRaises(MaybeEncodingError, res.get) + wrapped = scratchpad[0] + self.assertTrue(wrapped) + self.assertIsInstance(scratchpad[0], MaybeEncodingError) + self.assertIsNotNone(wrapped.exc) + self.assertIsNotNone(wrapped.value) p.close() p.join() class _TestPoolWorkerLifetime(BaseTestCase): - ALLOWED_TYPES = ('processes', ) + def test_pool_worker_lifetime(self): p = multiprocessing.Pool(3, maxtasksperchild=10) self.assertEqual(3, len(p._pool)) @@ -1277,8 +1331,8 @@ class _TestZZZNumberOfObjects(BaseTestCase): refs = self.manager._number_of_objects() debug_info = self.manager._debug_info() if refs != EXPECTED_NUMBER: - print self.manager._debug_info() - print debug_info + print(self.manager._debug_info()) + print(debug_info) self.assertEqual(refs, EXPECTED_NUMBER) @@ -1297,15 +1351,13 @@ class FooBar(object): return '_h()' def baz(): - for i in xrange(10): + for i in range(10): yield i*i class IteratorProxy(BaseProxy): - _exposed_ = ('next', '__next__') + _exposed_ = ('__next__',) def __iter__(self): return self - def next(self): - return self._callmethod('next') def __next__(self): return self._callmethod('__next__') @@ -1353,7 +1405,7 @@ class _TestMyManager(BaseTestCase): # Test of connecting to a remote server and using xmlrpclib for serialization # -_queue = Queue.Queue() +_queue = pyqueue.Queue() def get_queue(): return _queue @@ -1440,7 +1492,16 @@ class _TestManagerRestart(BaseTestCase): manager.shutdown() manager = QueueManager( address=addr, authkey=authkey, serializer=SERIALIZER) - manager.start() + try: + manager.start() + except IOError as e: + if e.errno != errno.EADDRINUSE: + raise + # Retry after some time, in case the old socket was lingering + # (sporadic failure on buildbots) + time.sleep(1.0) + manager = QueueManager( + address=addr, authkey=authkey, serializer=SERIALIZER) manager.shutdown() # @@ -1469,7 +1530,7 @@ class _TestConnection(BaseTestCase): seq = [1, 2.25, None] msg = latin('hello world') longmsg = msg * 10 - arr = array.array('i', range(4)) + arr = array.array('i', list(range(4))) if self.TYPE == 'processes': self.assertEqual(type(conn.fileno()), int) @@ -1499,7 +1560,7 @@ class _TestConnection(BaseTestCase): self.assertEqual(conn.send_bytes(longmsg), None) try: res = conn.recv_bytes_into(buffer) - except multiprocessing.BufferTooShort, e: + except multiprocessing.BufferTooShort as e: self.assertEqual(e.args, (longmsg,)) else: self.fail('expected BufferTooShort, got %s' % res) @@ -1632,13 +1693,14 @@ class _TestConnection(BaseTestCase): p = self.Process(target=self._writefd, args=(child_conn, b"foo")) p.daemon = True p.start() - with open(test_support.TESTFN, "wb") as f: + self.addCleanup(test.support.unlink, test.support.TESTFN) + with open(test.support.TESTFN, "wb") as f: fd = f.fileno() if msvcrt: fd = msvcrt.get_osfhandle(fd) reduction.send_handle(conn, fd, p.pid) p.join() - with open(test_support.TESTFN, "rb") as f: + with open(test.support.TESTFN, "rb") as f: self.assertEqual(f.read(), b"foo") @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") @@ -1657,7 +1719,8 @@ class _TestConnection(BaseTestCase): p = self.Process(target=self._writefd, args=(child_conn, b"bar", True)) p.daemon = True p.start() - with open(test_support.TESTFN, "wb") as f: + self.addCleanup(test.support.unlink, test.support.TESTFN) + with open(test.support.TESTFN, "wb") as f: fd = f.fileno() for newfd in range(256, MAXFD): if not self._is_fd_assigned(newfd): @@ -1670,7 +1733,7 @@ class _TestConnection(BaseTestCase): finally: os.close(newfd) p.join() - with open(test_support.TESTFN, "rb") as f: + with open(test.support.TESTFN, "rb") as f: self.assertEqual(f.read(), b"bar") @classmethod @@ -1831,7 +1894,7 @@ class _TestHeap(BaseTestCase): blocks = [] # create and destroy lots of blocks of different sizes - for i in xrange(iterations): + for i in range(iterations): size = int(random.lognormvariate(0, 1) * 1000) b = multiprocessing.heap.BufferWrapper(size) blocks.append(b) @@ -1847,7 +1910,7 @@ class _TestHeap(BaseTestCase): occupied = 0 heap._lock.acquire() self.addCleanup(heap._lock.release) - for L in heap._len_to_seq.values(): + for L in list(heap._len_to_seq.values()): for arena, start, stop in L: all.append((heap._arenas.index(arena), start, stop, stop-start, 'free')) @@ -1918,7 +1981,7 @@ class _TestSharedCTypes(BaseTestCase): x = Value('i', 7, lock=lock) y = Value(c_double, 1.0/3.0, lock=lock) foo = Value(_Foo, 3, 2, lock=lock) - arr = self.Array('d', range(10), lock=lock) + arr = self.Array('d', list(range(10)), lock=lock) string = self.Array('c', 20, lock=lock) string.value = latin('hello') @@ -2133,7 +2196,7 @@ def create_test_cases(Mixin, type): glob = globals() Type = type.capitalize() - for name in glob.keys(): + for name in list(glob.keys()): if name.startswith('_Test'): base = glob[name] if type in base.ALLOWED_TYPES: @@ -2257,7 +2320,7 @@ class TestInitializers(unittest.TestCase): def _ThisSubProcess(q): try: item = q.get(block=False) - except Queue.Empty: + except pyqueue.Empty: pass def _TestProcess(q): @@ -2309,13 +2372,30 @@ class TestStdinBadfiledescriptor(unittest.TestCase): p.join() def test_flushing(self): - sio = StringIO() + sio = io.StringIO() flike = _file_like(sio) flike.write('foo') proc = multiprocessing.Process(target=lambda: flike.flush()) flike.flush() assert sio.getvalue() == 'foo' + +# +# Issue 14151: Test invalid family on invalid environment +# + +class TestInvalidFamily(unittest.TestCase): + + @unittest.skipIf(WIN32, "skipped on Windows") + def test_invalid_family(self): + with self.assertRaises(ValueError): + multiprocessing.connection.Listener(r'\\.\test') + + @unittest.skipUnless(WIN32, "skipped on non-Windows platforms") + def test_invalid_family_win32(self): + with self.assertRaises(ValueError): + multiprocessing.connection.Listener('/var/test.pipe') + # # Test interaction with socket timeouts - see Issue #6056 # @@ -2371,7 +2451,8 @@ class TestNoForkBomb(unittest.TestCase): # testcases_other = [OtherTest, TestInvalidHandle, TestInitializers, - TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb] + TestStdinBadfiledescriptor, TestInvalidFamily, + TestTimeouts, TestNoForkBomb] # # @@ -2387,7 +2468,7 @@ def test_main(run=None): check_enough_semaphores() if run is None: - from test.test_support import run_unittest as run + from test.support import run_unittest as run util.get_temp_dir() # creates temp directory for use by all processes @@ -2408,15 +2489,7 @@ def test_main(run=None): loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases) - # (ncoghlan): Whether or not sys.exc_clear is executed by the threading - # module during these tests is at least platform dependent and possibly - # non-deterministic on any given platform. So we don't mind if the listed - # warnings aren't actually raised. - with test_support.check_py3k_warnings( - (".+__(get|set)slice__ has been removed", DeprecationWarning), - (r"sys.exc_clear\(\) not supported", DeprecationWarning), - quiet=True): - run(suite) + run(suite) ThreadsMixin.pool.terminate() ProcessesMixin.pool.terminate() |