diff options
Diffstat (limited to 'Lib/test/_test_multiprocessing.py')
-rw-r--r-- | Lib/test/_test_multiprocessing.py | 84 |
1 files changed, 79 insertions, 5 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 4dc9a31d22f..a1259ff1d63 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -513,9 +513,14 @@ class _TestProcess(BaseTestCase): time.sleep(100) @classmethod - def _sleep_no_int_handler(cls): + def _sleep_some_event(cls, event): + event.set() + time.sleep(100) + + @classmethod + def _sleep_no_int_handler(cls, event): signal.signal(signal.SIGINT, signal.SIG_DFL) - cls._sleep_some() + cls._sleep_some_event(event) @classmethod def _test_sleep(cls, delay): @@ -525,7 +530,10 @@ class _TestProcess(BaseTestCase): if self.TYPE == 'threads': self.skipTest('test not appropriate for {}'.format(self.TYPE)) - p = self.Process(target=target or self._sleep_some) + event = self.Event() + if not target: + target = self._sleep_some_event + p = self.Process(target=target, args=(event,)) p.daemon = True p.start() @@ -543,8 +551,11 @@ class _TestProcess(BaseTestCase): self.assertTimingAlmostEqual(join.elapsed, 0.0) self.assertEqual(p.is_alive(), True) - # XXX maybe terminating too soon causes the problems on Gentoo... - time.sleep(1) + timeout = support.SHORT_TIMEOUT + if not event.wait(timeout): + p.terminate() + p.join() + self.fail(f"event not signaled in {timeout} seconds") meth(p) @@ -2463,6 +2474,12 @@ class _TestValue(BaseTestCase): self.assertNotHasAttr(arr5, 'get_lock') self.assertNotHasAttr(arr5, 'get_obj') + @unittest.skipIf(c_int is None, "requires _ctypes") + def test_invalid_typecode(self): + with self.assertRaisesRegex(TypeError, 'bad typecode'): + self.Value('x', None) + with self.assertRaisesRegex(TypeError, 'bad typecode'): + self.RawValue('x', None) class _TestArray(BaseTestCase): @@ -2543,6 +2560,12 @@ class _TestArray(BaseTestCase): self.assertNotHasAttr(arr5, 'get_lock') self.assertNotHasAttr(arr5, 'get_obj') + @unittest.skipIf(c_int is None, "requires _ctypes") + def test_invalid_typecode(self): + with self.assertRaisesRegex(TypeError, 'bad typecode'): + self.Array('x', []) + with self.assertRaisesRegex(TypeError, 'bad typecode'): + self.RawArray('x', []) # # # @@ -6778,6 +6801,35 @@ class _TestSpawnedSysPath(BaseTestCase): self.assertEqual(child_sys_path[1:], sys.path[1:]) self.assertIsNone(import_error, msg=f"child could not import {self._mod_name}") + def test_std_streams_flushed_after_preload(self): + # gh-135335: Check fork server flushes standard streams after + # preloading modules + if multiprocessing.get_start_method() != "forkserver": + self.skipTest("forkserver specific test") + + # Create a test module in the temporary directory on the child's path + # TODO: This can all be simplified once gh-126631 is fixed and we can + # use __main__ instead of a module. + dirname = os.path.join(self._temp_dir, 'preloaded_module') + init_name = os.path.join(dirname, '__init__.py') + os.mkdir(dirname) + with open(init_name, "w") as f: + cmd = '''if 1: + import sys + print('stderr', end='', file=sys.stderr) + print('stdout', end='', file=sys.stdout) + ''' + f.write(cmd) + + name = os.path.join(os.path.dirname(__file__), 'mp_preload_flush.py') + env = {'PYTHONPATH': self._temp_dir} + _, out, err = test.support.script_helper.assert_python_ok(name, **env) + + # Check stderr first, as it is more likely to be useful to see in the + # event of a failure. + self.assertEqual(err.decode().rstrip(), 'stderr') + self.assertEqual(out.decode().rstrip(), 'stdout') + class MiscTestCase(unittest.TestCase): def test__all__(self): @@ -6821,6 +6873,28 @@ class MiscTestCase(unittest.TestCase): self.assertEqual("332833500", out.decode('utf-8').strip()) self.assertFalse(err, msg=err.decode('utf-8')) + def test_forked_thread_not_started(self): + # gh-134381: Ensure that a thread that has not been started yet in + # the parent process can be started within a forked child process. + + if multiprocessing.get_start_method() != "fork": + self.skipTest("fork specific test") + + q = multiprocessing.Queue() + t = threading.Thread(target=lambda: q.put("done"), daemon=True) + + def child(): + t.start() + t.join() + + p = multiprocessing.Process(target=child) + p.start() + p.join(support.SHORT_TIMEOUT) + + self.assertEqual(p.exitcode, 0) + self.assertEqual(q.get_nowait(), "done") + close_queue(q) + # # Mixins |