aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/_test_multiprocessing.py
diff options
context:
space:
mode:
authorAntoine Pitrou <pitrou@free.fr>2017-06-24 19:22:23 +0200
committerGitHub <noreply@github.com>2017-06-24 19:22:23 +0200
commit13e96cc596d158b98996db3fa291086ea4afecd9 (patch)
treee5d5abb7f5364b484ca4396ff99986ccac16ed0c /Lib/test/_test_multiprocessing.py
parent0ee32c148119031e19c79359f5c4789ee69fa355 (diff)
downloadcpython-13e96cc596d158b98996db3fa291086ea4afecd9.tar.gz
cpython-13e96cc596d158b98996db3fa291086ea4afecd9.zip
Fix bpo-30596: Add close() method to multiprocessing.Process (#2010)
* Fix bpo-30596: Add close() method to multiprocessing.Process * Raise ValueError if close() is called before the Process is finished running * Add docs * Add NEWS blurb
Diffstat (limited to 'Lib/test/_test_multiprocessing.py')
-rw-r--r--Lib/test/_test_multiprocessing.py36
1 files changed, 36 insertions, 0 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 7148ea49489..d4a461dc5e6 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -403,6 +403,42 @@ class _TestProcess(BaseTestCase):
p.join()
self.assertTrue(wait_for_handle(sentinel, timeout=1))
+ @classmethod
+ def _test_close(cls, rc=0, q=None):
+ if q is not None:
+ q.get()
+ sys.exit(rc)
+
+ def test_close(self):
+ if self.TYPE == "threads":
+ self.skipTest('test not appropriate for {}'.format(self.TYPE))
+ q = self.Queue()
+ p = self.Process(target=self._test_close, kwargs={'q': q})
+ p.daemon = True
+ p.start()
+ self.assertEqual(p.is_alive(), True)
+ # Child is still alive, cannot close
+ with self.assertRaises(ValueError):
+ p.close()
+
+ q.put(None)
+ p.join()
+ self.assertEqual(p.is_alive(), False)
+ self.assertEqual(p.exitcode, 0)
+ p.close()
+ with self.assertRaises(ValueError):
+ p.is_alive()
+ with self.assertRaises(ValueError):
+ p.join()
+ with self.assertRaises(ValueError):
+ p.terminate()
+ p.close()
+
+ wr = weakref.ref(p)
+ del p
+ gc.collect()
+ self.assertIs(wr(), None)
+
def test_many_processes(self):
if self.TYPE == 'threads':
self.skipTest('test not appropriate for {}'.format(self.TYPE))