aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_subprocess.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_subprocess.py')
-rw-r--r--Lib/test/test_subprocess.py1222
1 files changed, 948 insertions, 274 deletions
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py
index 38844888fdb..75eb852cc97 100644
--- a/Lib/test/test_subprocess.py
+++ b/Lib/test/test_subprocess.py
@@ -1,14 +1,21 @@
import unittest
-from test import test_support
+from test import script_helper
+from test import support
import subprocess
import sys
import signal
+import io
+import locale
import os
import errno
import tempfile
import time
import re
import sysconfig
+import warnings
+import select
+import shutil
+import gc
try:
import resource
@@ -42,7 +49,7 @@ class BaseTestCase(unittest.TestCase):
def setUp(self):
# Try to minimize the number of children we have so this test
# doesn't crash on some buildbots (Alphas in particular).
- test_support.reap_children()
+ support.reap_children()
def tearDown(self):
for inst in subprocess._active:
@@ -54,7 +61,7 @@ class BaseTestCase(unittest.TestCase):
# In a debug build, stuff like "[6580 refs]" is printed to stderr at
# shutdown time. That frustrates tests trying to check stderr produced
# from a spawned Python process.
- actual = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)
+ actual = support.strip_python_stderr(stderr)
self.assertEqual(actual, expected, msg)
@@ -94,8 +101,8 @@ class ProcessTestCase(BaseTestCase):
def test_check_output(self):
# check_output() function with zero return code
output = subprocess.check_output(
- [sys.executable, "-c", "print 'BDFL'"])
- self.assertIn('BDFL', output)
+ [sys.executable, "-c", "print('BDFL')"])
+ self.assertIn(b'BDFL', output)
def test_check_output_nonzero(self):
# check_call() function with non-zero return code
@@ -109,13 +116,13 @@ class ProcessTestCase(BaseTestCase):
output = subprocess.check_output(
[sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],
stderr=subprocess.STDOUT)
- self.assertIn('BDFL', output)
+ self.assertIn(b'BDFL', output)
def test_check_output_stdout_arg(self):
# check_output() function stderr redirected to stdout
with self.assertRaises(ValueError) as c:
output = subprocess.check_output(
- [sys.executable, "-c", "print 'will not be run'"],
+ [sys.executable, "-c", "print('will not be run')"],
stdout=sys.stdout)
self.fail("Expected ValueError when stdout arg supplied.")
self.assertIn('stdout', c.exception.args[0])
@@ -133,7 +140,7 @@ class ProcessTestCase(BaseTestCase):
def test_invalid_args(self):
# Popen() called with invalid arguments should raise TypeError
# but Popen.__del__ should not complain (issue #12085)
- with test_support.captured_stderr() as s:
+ with support.captured_stderr() as s:
self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1)
argcount = subprocess.Popen.__init__.__code__.co_argcount
too_many_args = [0] * (argcount + 1)
@@ -142,7 +149,7 @@ class ProcessTestCase(BaseTestCase):
def test_stdin_none(self):
# .stdin is None when not redirected
- p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
+ p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
@@ -152,9 +159,9 @@ class ProcessTestCase(BaseTestCase):
def test_stdout_none(self):
# .stdout is None when not redirected
p = subprocess.Popen([sys.executable, "-c",
- 'print " this bit of output is from a '
+ 'print(" this bit of output is from a '
'test of stdout in a different '
- 'process ..."'],
+ 'process ...")'],
stdin=subprocess.PIPE, stderr=subprocess.PIPE)
self.addCleanup(p.stdin.close)
self.addCleanup(p.stderr.close)
@@ -163,38 +170,122 @@ class ProcessTestCase(BaseTestCase):
def test_stderr_none(self):
# .stderr is None when not redirected
- p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
+ p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
self.addCleanup(p.stdout.close)
self.addCleanup(p.stdin.close)
p.wait()
self.assertEqual(p.stderr, None)
- def test_executable_with_cwd(self):
- python_dir = os.path.dirname(os.path.realpath(sys.executable))
- p = subprocess.Popen(["somethingyoudonthave", "-c",
- "import sys; sys.exit(47)"],
- executable=sys.executable, cwd=python_dir)
+ # For use in the test_cwd* tests below.
+ def _normalize_cwd(self, cwd):
+ # Normalize an expected cwd (for Tru64 support).
+ # We can't use os.path.realpath since it doesn't expand Tru64 {memb}
+ # strings. See bug #1063571.
+ original_cwd = os.getcwd()
+ os.chdir(cwd)
+ cwd = os.getcwd()
+ os.chdir(original_cwd)
+ return cwd
+
+ # For use in the test_cwd* tests below.
+ def _split_python_path(self):
+ # Return normalized (python_dir, python_base).
+ python_path = os.path.realpath(sys.executable)
+ return os.path.split(python_path)
+
+ # For use in the test_cwd* tests below.
+ def _assert_cwd(self, expected_cwd, python_arg, **kwargs):
+ # Invoke Python via Popen, and assert that (1) the call succeeds,
+ # and that (2) the current working directory of the child process
+ # matches *expected_cwd*.
+ p = subprocess.Popen([python_arg, "-c",
+ "import os, sys; "
+ "sys.stdout.write(os.getcwd()); "
+ "sys.exit(47)"],
+ stdout=subprocess.PIPE,
+ **kwargs)
+ self.addCleanup(p.stdout.close)
p.wait()
- self.assertEqual(p.returncode, 47)
+ self.assertEqual(47, p.returncode)
+ normcase = os.path.normcase
+ self.assertEqual(normcase(expected_cwd),
+ normcase(p.stdout.read().decode("utf-8")))
+
+ def test_cwd(self):
+ # Check that cwd changes the cwd for the child process.
+ temp_dir = tempfile.gettempdir()
+ temp_dir = self._normalize_cwd(temp_dir)
+ self._assert_cwd(temp_dir, sys.executable, cwd=temp_dir)
+
+ @unittest.skipIf(mswindows, "pending resolution of issue #15533")
+ def test_cwd_with_relative_arg(self):
+ # Check that Popen looks for args[0] relative to cwd if args[0]
+ # is relative.
+ python_dir, python_base = self._split_python_path()
+ rel_python = os.path.join(os.curdir, python_base)
+ with support.temp_cwd() as wrong_dir:
+ # Before calling with the correct cwd, confirm that the call fails
+ # without cwd and with the wrong cwd.
+ self.assertRaises(OSError, subprocess.Popen,
+ [rel_python])
+ self.assertRaises(OSError, subprocess.Popen,
+ [rel_python], cwd=wrong_dir)
+ python_dir = self._normalize_cwd(python_dir)
+ self._assert_cwd(python_dir, rel_python, cwd=python_dir)
+
+ @unittest.skipIf(mswindows, "pending resolution of issue #15533")
+ def test_cwd_with_relative_executable(self):
+ # Check that Popen looks for executable relative to cwd if executable
+ # is relative (and that executable takes precedence over args[0]).
+ python_dir, python_base = self._split_python_path()
+ rel_python = os.path.join(os.curdir, python_base)
+ doesntexist = "somethingyoudonthave"
+ with support.temp_cwd() as wrong_dir:
+ # Before calling with the correct cwd, confirm that the call fails
+ # without cwd and with the wrong cwd.
+ self.assertRaises(OSError, subprocess.Popen,
+ [doesntexist], executable=rel_python)
+ self.assertRaises(OSError, subprocess.Popen,
+ [doesntexist], executable=rel_python,
+ cwd=wrong_dir)
+ python_dir = self._normalize_cwd(python_dir)
+ self._assert_cwd(python_dir, doesntexist, executable=rel_python,
+ cwd=python_dir)
+
+ def test_cwd_with_absolute_arg(self):
+ # Check that Popen can find the executable when the cwd is wrong
+ # if args[0] is an absolute path.
+ python_dir, python_base = self._split_python_path()
+ abs_python = os.path.join(python_dir, python_base)
+ rel_python = os.path.join(os.curdir, python_base)
+ with script_helper.temp_dir() as wrong_dir:
+ # Before calling with an absolute path, confirm that using a
+ # relative path fails.
+ self.assertRaises(OSError, subprocess.Popen,
+ [rel_python], cwd=wrong_dir)
+ wrong_dir = self._normalize_cwd(wrong_dir)
+ self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir)
+
+ def test_executable_with_cwd(self):
+ python_dir, python_base = self._split_python_path()
+ python_dir = self._normalize_cwd(python_dir)
+ self._assert_cwd(python_dir, "somethingyoudonthave",
+ executable=sys.executable, cwd=python_dir)
@unittest.skipIf(sysconfig.is_python_build(),
"need an installed Python. See #7774")
def test_executable_without_cwd(self):
# For a normal installation, it should work without 'cwd'
# argument. For test runs in the build directory, see #7774.
- p = subprocess.Popen(["somethingyoudonthave", "-c",
- "import sys; sys.exit(47)"],
- executable=sys.executable)
- p.wait()
- self.assertEqual(p.returncode, 47)
+ self._assert_cwd('', "somethingyoudonthave", executable=sys.executable)
def test_stdin_pipe(self):
# stdin redirection
p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.exit(sys.stdin.read() == "pear")'],
stdin=subprocess.PIPE)
- p.stdin.write("pear")
+ p.stdin.write(b"pear")
p.stdin.close()
p.wait()
self.assertEqual(p.returncode, 1)
@@ -202,8 +293,9 @@ class ProcessTestCase(BaseTestCase):
def test_stdin_filedes(self):
# stdin is set to open file descriptor
tf = tempfile.TemporaryFile()
+ self.addCleanup(tf.close)
d = tf.fileno()
- os.write(d, "pear")
+ os.write(d, b"pear")
os.lseek(d, 0, 0)
p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.exit(sys.stdin.read() == "pear")'],
@@ -214,7 +306,8 @@ class ProcessTestCase(BaseTestCase):
def test_stdin_fileobj(self):
# stdin is set to open file object
tf = tempfile.TemporaryFile()
- tf.write("pear")
+ self.addCleanup(tf.close)
+ tf.write(b"pear")
tf.seek(0)
p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.exit(sys.stdin.read() == "pear")'],
@@ -228,28 +321,30 @@ class ProcessTestCase(BaseTestCase):
'import sys; sys.stdout.write("orange")'],
stdout=subprocess.PIPE)
self.addCleanup(p.stdout.close)
- self.assertEqual(p.stdout.read(), "orange")
+ self.assertEqual(p.stdout.read(), b"orange")
def test_stdout_filedes(self):
# stdout is set to open file descriptor
tf = tempfile.TemporaryFile()
+ self.addCleanup(tf.close)
d = tf.fileno()
p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.stdout.write("orange")'],
stdout=d)
p.wait()
os.lseek(d, 0, 0)
- self.assertEqual(os.read(d, 1024), "orange")
+ self.assertEqual(os.read(d, 1024), b"orange")
def test_stdout_fileobj(self):
# stdout is set to open file object
tf = tempfile.TemporaryFile()
+ self.addCleanup(tf.close)
p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.stdout.write("orange")'],
stdout=tf)
p.wait()
tf.seek(0)
- self.assertEqual(tf.read(), "orange")
+ self.assertEqual(tf.read(), b"orange")
def test_stderr_pipe(self):
# stderr redirection
@@ -257,95 +352,101 @@ class ProcessTestCase(BaseTestCase):
'import sys; sys.stderr.write("strawberry")'],
stderr=subprocess.PIPE)
self.addCleanup(p.stderr.close)
- self.assertStderrEqual(p.stderr.read(), "strawberry")
+ self.assertStderrEqual(p.stderr.read(), b"strawberry")
def test_stderr_filedes(self):
# stderr is set to open file descriptor
tf = tempfile.TemporaryFile()
+ self.addCleanup(tf.close)
d = tf.fileno()
p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.stderr.write("strawberry")'],
stderr=d)
p.wait()
os.lseek(d, 0, 0)
- self.assertStderrEqual(os.read(d, 1024), "strawberry")
+ self.assertStderrEqual(os.read(d, 1024), b"strawberry")
def test_stderr_fileobj(self):
# stderr is set to open file object
tf = tempfile.TemporaryFile()
+ self.addCleanup(tf.close)
p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.stderr.write("strawberry")'],
stderr=tf)
p.wait()
tf.seek(0)
- self.assertStderrEqual(tf.read(), "strawberry")
+ self.assertStderrEqual(tf.read(), b"strawberry")
def test_stdout_stderr_pipe(self):
# capture stdout and stderr to the same pipe
p = subprocess.Popen([sys.executable, "-c",
- 'import sys;'
- 'sys.stdout.write("apple");'
- 'sys.stdout.flush();'
- 'sys.stderr.write("orange")'],
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
+ 'import sys;'
+ 'sys.stdout.write("apple");'
+ 'sys.stdout.flush();'
+ 'sys.stderr.write("orange")'],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
self.addCleanup(p.stdout.close)
- self.assertStderrEqual(p.stdout.read(), "appleorange")
+ self.assertStderrEqual(p.stdout.read(), b"appleorange")
def test_stdout_stderr_file(self):
# capture stdout and stderr to the same open file
tf = tempfile.TemporaryFile()
+ self.addCleanup(tf.close)
p = subprocess.Popen([sys.executable, "-c",
- 'import sys;'
- 'sys.stdout.write("apple");'
- 'sys.stdout.flush();'
- 'sys.stderr.write("orange")'],
- stdout=tf,
- stderr=tf)
+ 'import sys;'
+ 'sys.stdout.write("apple");'
+ 'sys.stdout.flush();'
+ 'sys.stderr.write("orange")'],
+ stdout=tf,
+ stderr=tf)
p.wait()
tf.seek(0)
- self.assertStderrEqual(tf.read(), "appleorange")
+ self.assertStderrEqual(tf.read(), b"appleorange")
def test_stdout_filedes_of_stdout(self):
# stdout is set to 1 (#1531862).
- cmd = r"import sys, os; sys.exit(os.write(sys.stdout.fileno(), '.\n'))"
+ cmd = r"import sys, os; sys.exit(os.write(sys.stdout.fileno(), b'.\n'))"
rc = subprocess.call([sys.executable, "-c", cmd], stdout=1)
self.assertEqual(rc, 2)
- def test_cwd(self):
- tmpdir = tempfile.gettempdir()
- # We cannot use os.path.realpath to canonicalize the path,
- # since it doesn't expand Tru64 {memb} strings. See bug 1063571.
- cwd = os.getcwd()
- os.chdir(tmpdir)
- tmpdir = os.getcwd()
- os.chdir(cwd)
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys,os;'
- 'sys.stdout.write(os.getcwd())'],
- stdout=subprocess.PIPE,
- cwd=tmpdir)
- self.addCleanup(p.stdout.close)
- normcase = os.path.normcase
- self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))
-
def test_env(self):
newenv = os.environ.copy()
newenv["FRUIT"] = "orange"
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys,os;'
- 'sys.stdout.write(os.getenv("FRUIT"))'],
- stdout=subprocess.PIPE,
- env=newenv)
- self.addCleanup(p.stdout.close)
- self.assertEqual(p.stdout.read(), "orange")
+ with subprocess.Popen([sys.executable, "-c",
+ 'import sys,os;'
+ 'sys.stdout.write(os.getenv("FRUIT"))'],
+ stdout=subprocess.PIPE,
+ env=newenv) as p:
+ stdout, stderr = p.communicate()
+ self.assertEqual(stdout, b"orange")
+
+ # Windows requires at least the SYSTEMROOT environment variable to start
+ # Python
+ @unittest.skipIf(sys.platform == 'win32',
+ 'cannot test an empty env on Windows')
+ @unittest.skipIf(sysconfig.get_config_var('Py_ENABLE_SHARED') is not None,
+ 'the python library cannot be loaded '
+ 'with an empty environment')
+ def test_empty_env(self):
+ with subprocess.Popen([sys.executable, "-c",
+ 'import os; '
+ 'print(list(os.environ.keys()))'],
+ stdout=subprocess.PIPE,
+ env={}) as p:
+ stdout, stderr = p.communicate()
+ self.assertIn(stdout.strip(),
+ (b"[]",
+ # Mac OS X adds __CF_USER_TEXT_ENCODING variable to an empty
+ # environment
+ b"['__CF_USER_TEXT_ENCODING']"))
def test_communicate_stdin(self):
p = subprocess.Popen([sys.executable, "-c",
'import sys;'
'sys.exit(sys.stdin.read() == "pear")'],
stdin=subprocess.PIPE)
- p.communicate("pear")
+ p.communicate(b"pear")
self.assertEqual(p.returncode, 1)
def test_communicate_stdout(self):
@@ -353,7 +454,7 @@ class ProcessTestCase(BaseTestCase):
'import sys; sys.stdout.write("pineapple")'],
stdout=subprocess.PIPE)
(stdout, stderr) = p.communicate()
- self.assertEqual(stdout, "pineapple")
+ self.assertEqual(stdout, b"pineapple")
self.assertEqual(stderr, None)
def test_communicate_stderr(self):
@@ -362,39 +463,45 @@ class ProcessTestCase(BaseTestCase):
stderr=subprocess.PIPE)
(stdout, stderr) = p.communicate()
self.assertEqual(stdout, None)
- self.assertStderrEqual(stderr, "pineapple")
+ self.assertStderrEqual(stderr, b"pineapple")
def test_communicate(self):
p = subprocess.Popen([sys.executable, "-c",
- 'import sys,os;'
- 'sys.stderr.write("pineapple");'
- 'sys.stdout.write(sys.stdin.read())'],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ 'import sys,os;'
+ 'sys.stderr.write("pineapple");'
+ 'sys.stdout.write(sys.stdin.read())'],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
self.addCleanup(p.stdin.close)
- (stdout, stderr) = p.communicate("banana")
- self.assertEqual(stdout, "banana")
- self.assertStderrEqual(stderr, "pineapple")
+ (stdout, stderr) = p.communicate(b"banana")
+ self.assertEqual(stdout, b"banana")
+ self.assertStderrEqual(stderr, b"pineapple")
- # This test is Linux specific for simplicity to at least have
- # some coverage. It is not a platform specific bug.
- @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
- "Linux specific")
# Test for the fd leak reported in http://bugs.python.org/issue2791.
def test_communicate_pipe_fd_leak(self):
- fd_directory = '/proc/%d/fd' % os.getpid()
- num_fds_before_popen = len(os.listdir(fd_directory))
- p = subprocess.Popen([sys.executable, "-c", "print()"],
- stdout=subprocess.PIPE)
- p.communicate()
- num_fds_after_communicate = len(os.listdir(fd_directory))
- del p
- num_fds_after_destruction = len(os.listdir(fd_directory))
- self.assertEqual(num_fds_before_popen, num_fds_after_destruction)
- self.assertEqual(num_fds_before_popen, num_fds_after_communicate)
+ for stdin_pipe in (False, True):
+ for stdout_pipe in (False, True):
+ for stderr_pipe in (False, True):
+ options = {}
+ if stdin_pipe:
+ options['stdin'] = subprocess.PIPE
+ if stdout_pipe:
+ options['stdout'] = subprocess.PIPE
+ if stderr_pipe:
+ options['stderr'] = subprocess.PIPE
+ if not options:
+ continue
+ p = subprocess.Popen((sys.executable, "-c", "pass"), **options)
+ p.communicate()
+ if p.stdin is not None:
+ self.assertTrue(p.stdin.closed)
+ if p.stdout is not None:
+ self.assertTrue(p.stdout.closed)
+ if p.stderr is not None:
+ self.assertTrue(p.stderr.closed)
def test_communicate_returns(self):
# communicate() should return None if no redirection is active
@@ -416,91 +523,154 @@ class ProcessTestCase(BaseTestCase):
os.close(x)
os.close(y)
p = subprocess.Popen([sys.executable, "-c",
- 'import sys,os;'
- 'sys.stdout.write(sys.stdin.read(47));'
- 'sys.stderr.write("xyz"*%d);'
- 'sys.stdout.write(sys.stdin.read())' % pipe_buf],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ 'import sys,os;'
+ 'sys.stdout.write(sys.stdin.read(47));'
+ 'sys.stderr.write("xyz"*%d);'
+ 'sys.stdout.write(sys.stdin.read())' % pipe_buf],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
self.addCleanup(p.stdin.close)
- string_to_write = "abc"*pipe_buf
+ string_to_write = b"abc"*pipe_buf
(stdout, stderr) = p.communicate(string_to_write)
self.assertEqual(stdout, string_to_write)
def test_writes_before_communicate(self):
# stdin.write before communicate()
p = subprocess.Popen([sys.executable, "-c",
- 'import sys,os;'
- 'sys.stdout.write(sys.stdin.read())'],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ 'import sys,os;'
+ 'sys.stdout.write(sys.stdin.read())'],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
self.addCleanup(p.stdin.close)
- p.stdin.write("banana")
- (stdout, stderr) = p.communicate("split")
- self.assertEqual(stdout, "bananasplit")
- self.assertStderrEqual(stderr, "")
+ p.stdin.write(b"banana")
+ (stdout, stderr) = p.communicate(b"split")
+ self.assertEqual(stdout, b"bananasplit")
+ self.assertStderrEqual(stderr, b"")
def test_universal_newlines(self):
p = subprocess.Popen([sys.executable, "-c",
- 'import sys,os;' + SETBINARY +
- 'sys.stdout.write("line1\\n");'
- 'sys.stdout.flush();'
- 'sys.stdout.write("line2\\r");'
- 'sys.stdout.flush();'
- 'sys.stdout.write("line3\\r\\n");'
- 'sys.stdout.flush();'
- 'sys.stdout.write("line4\\r");'
- 'sys.stdout.flush();'
- 'sys.stdout.write("\\nline5");'
- 'sys.stdout.flush();'
- 'sys.stdout.write("\\nline6");'],
- stdout=subprocess.PIPE,
- universal_newlines=1)
+ 'import sys,os;' + SETBINARY +
+ 'buf = sys.stdout.buffer;'
+ 'buf.write(sys.stdin.readline().encode());'
+ 'buf.flush();'
+ 'buf.write(b"line2\\n");'
+ 'buf.flush();'
+ 'buf.write(sys.stdin.read().encode());'
+ 'buf.flush();'
+ 'buf.write(b"line4\\n");'
+ 'buf.flush();'
+ 'buf.write(b"line5\\r\\n");'
+ 'buf.flush();'
+ 'buf.write(b"line6\\r");'
+ 'buf.flush();'
+ 'buf.write(b"\\nline7");'
+ 'buf.flush();'
+ 'buf.write(b"\\nline8");'],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ universal_newlines=1)
+ p.stdin.write("line1\n")
+ self.assertEqual(p.stdout.readline(), "line1\n")
+ p.stdin.write("line3\n")
+ p.stdin.close()
self.addCleanup(p.stdout.close)
- stdout = p.stdout.read()
- if hasattr(file, 'newlines'):
- # Interpreter with universal newline support
- self.assertEqual(stdout,
- "line1\nline2\nline3\nline4\nline5\nline6")
- else:
- # Interpreter without universal newline support
- self.assertEqual(stdout,
- "line1\nline2\rline3\r\nline4\r\nline5\nline6")
+ self.assertEqual(p.stdout.readline(),
+ "line2\n")
+ self.assertEqual(p.stdout.read(6),
+ "line3\n")
+ self.assertEqual(p.stdout.read(),
+ "line4\nline5\nline6\nline7\nline8")
def test_universal_newlines_communicate(self):
# universal newlines through communicate()
p = subprocess.Popen([sys.executable, "-c",
- 'import sys,os;' + SETBINARY +
- 'sys.stdout.write("line1\\n");'
- 'sys.stdout.flush();'
- 'sys.stdout.write("line2\\r");'
- 'sys.stdout.flush();'
- 'sys.stdout.write("line3\\r\\n");'
- 'sys.stdout.flush();'
- 'sys.stdout.write("line4\\r");'
- 'sys.stdout.flush();'
- 'sys.stdout.write("\\nline5");'
- 'sys.stdout.flush();'
- 'sys.stdout.write("\\nline6");'],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- universal_newlines=1)
+ 'import sys,os;' + SETBINARY +
+ 'buf = sys.stdout.buffer;'
+ 'buf.write(b"line2\\n");'
+ 'buf.flush();'
+ 'buf.write(b"line4\\n");'
+ 'buf.flush();'
+ 'buf.write(b"line5\\r\\n");'
+ 'buf.flush();'
+ 'buf.write(b"line6\\r");'
+ 'buf.flush();'
+ 'buf.write(b"\\nline7");'
+ 'buf.flush();'
+ 'buf.write(b"\\nline8");'],
+ stderr=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ universal_newlines=1)
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
+ # BUG: can't give a non-empty stdin because it breaks both the
+ # select- and poll-based communicate() implementations.
(stdout, stderr) = p.communicate()
- if hasattr(file, 'newlines'):
- # Interpreter with universal newline support
- self.assertEqual(stdout,
- "line1\nline2\nline3\nline4\nline5\nline6")
- else:
- # Interpreter without universal newline support
- self.assertEqual(stdout,
- "line1\nline2\rline3\r\nline4\r\nline5\nline6")
+ self.assertEqual(stdout,
+ "line2\nline4\nline5\nline6\nline7\nline8")
+
+ def test_universal_newlines_communicate_stdin(self):
+ # universal newlines through communicate(), with only stdin
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os;' + SETBINARY + '''\nif True:
+ s = sys.stdin.readline()
+ assert s == "line1\\n", repr(s)
+ s = sys.stdin.read()
+ assert s == "line3\\n", repr(s)
+ '''],
+ stdin=subprocess.PIPE,
+ universal_newlines=1)
+ (stdout, stderr) = p.communicate("line1\nline3\n")
+ self.assertEqual(p.returncode, 0)
+
+ def test_universal_newlines_communicate_input_none(self):
+ # Test communicate(input=None) with universal newlines.
+ #
+ # We set stdout to PIPE because, as of this writing, a different
+ # code path is tested when the number of pipes is zero or one.
+ p = subprocess.Popen([sys.executable, "-c", "pass"],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ universal_newlines=True)
+ p.communicate()
+ self.assertEqual(p.returncode, 0)
+
+ def test_universal_newlines_communicate_encodings(self):
+ # Check that universal newlines mode works for various encodings,
+ # in particular for encodings in the UTF-16 and UTF-32 families.
+ # See issue #15595.
+ #
+ # UTF-16 and UTF-32-BE are sufficient to check both with BOM and
+ # without, and UTF-16 and UTF-32.
+ for encoding in ['utf-16', 'utf-32-be']:
+ old_getpreferredencoding = locale.getpreferredencoding
+ # Indirectly via io.TextIOWrapper, Popen() defaults to
+ # locale.getpreferredencoding(False) and earlier in Python 3.2 to
+ # locale.getpreferredencoding().
+ def getpreferredencoding(do_setlocale=True):
+ return encoding
+ code = ("import sys; "
+ r"sys.stdout.buffer.write('1\r\n2\r3\n4'.encode('%s'))" %
+ encoding)
+ args = [sys.executable, '-c', code]
+ try:
+ locale.getpreferredencoding = getpreferredencoding
+ # We set stdin to be non-None because, as of this writing,
+ # a different code path is used when the number of pipes is
+ # zero or one.
+ popen = subprocess.Popen(args, universal_newlines=True,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE)
+ stdout, stderr = popen.communicate(input='')
+ finally:
+ locale.getpreferredencoding = old_getpreferredencoding
+
+ self.assertEqual(stdout, '1\n2\n3\n4')
def test_no_leaking(self):
# Make sure we leak no resources
@@ -509,11 +679,12 @@ class ProcessTestCase(BaseTestCase):
else:
max_handles = 2050 # too much for (at least some) Windows setups
handles = []
+ tmpdir = tempfile.mkdtemp()
try:
for i in range(max_handles):
try:
- handles.append(os.open(test_support.TESTFN,
- os.O_WRONLY | os.O_CREAT))
+ tmpfile = os.path.join(tmpdir, support.TESTFN)
+ handles.append(os.open(tmpfile, os.O_WRONLY|os.O_CREAT))
except OSError as e:
if e.errno != errno.EMFILE:
raise
@@ -538,7 +709,7 @@ class ProcessTestCase(BaseTestCase):
finally:
for h in handles:
os.close(h)
- test_support.unlink(test_support.TESTFN)
+ shutil.rmtree(tmpdir)
def test_list2cmdline(self):
self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
@@ -589,6 +760,14 @@ class ProcessTestCase(BaseTestCase):
with self.assertRaises(TypeError):
subprocess.Popen([sys.executable, "-c", "pass"], "orange")
+ def test_bufsize_is_none(self):
+ # bufsize=None should be the same as bufsize=0.
+ p = subprocess.Popen([sys.executable, "-c", "pass"], None)
+ self.assertEqual(p.wait(), 0)
+ # Again with keyword arg
+ p = subprocess.Popen([sys.executable, "-c", "pass"], bufsize=None)
+ self.assertEqual(p.wait(), 0)
+
def test_leaking_fds_on_error(self):
# see bug #5179: Popen leaks file descriptors to PIPEs if
# the child fails to execute; this will eventually exhaust
@@ -605,6 +784,17 @@ class ProcessTestCase(BaseTestCase):
if c.exception.errno not in (errno.ENOENT, errno.EACCES):
raise c.exception
+ def test_issue8780(self):
+ # Ensure that stdout is inherited from the parent
+ # if stdout=PIPE is not used
+ code = ';'.join((
+ 'import subprocess, sys',
+ 'retcode = subprocess.call('
+ "[sys.executable, '-c', 'print(\"Hello World!\")'])",
+ 'assert retcode == 0'))
+ output = subprocess.check_output([sys.executable, '-c', code])
+ self.assertTrue(output.startswith(b'Hello World!'), ascii(output))
+
def test_handles_closed_on_exception(self):
# If CreateProcess exits with an error, ensure the
# duplicate output handles are released
@@ -634,7 +824,7 @@ class ProcessTestCase(BaseTestCase):
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
self.addCleanup(p.stdin.close)
- p.communicate("x" * 2**20)
+ p.communicate(b"x" * 2**20)
def test_communicate_epipe_only_stdin(self):
# Issue 10963: communicate() should hide EPIPE
@@ -642,7 +832,26 @@ class ProcessTestCase(BaseTestCase):
stdin=subprocess.PIPE)
self.addCleanup(p.stdin.close)
time.sleep(2)
- p.communicate("x" * 2**20)
+ p.communicate(b"x" * 2**20)
+
+ @unittest.skipUnless(hasattr(signal, 'SIGALRM'),
+ "Requires signal.SIGALRM")
+ def test_communicate_eintr(self):
+ # Issue #12493: communicate() should handle EINTR
+ def handler(signum, frame):
+ pass
+ old_handler = signal.signal(signal.SIGALRM, handler)
+ self.addCleanup(signal.signal, signal.SIGALRM, old_handler)
+
+ # the process is running for 2 seconds
+ args = [sys.executable, "-c", 'import time; time.sleep(2)']
+ for stream in ('stdout', 'stderr'):
+ kw = {stream: subprocess.PIPE}
+ with subprocess.Popen(args, **kw) as process:
+ signal.alarm(1)
+ # communicate() will be interrupted by SIGALRM
+ process.communicate()
+
# This test is Linux-ish specific for simplicity to at least have
# some coverage. It is not a platform specific bug.
@@ -690,7 +899,8 @@ class _SuppressCoreFiles(object):
'com.apple.CrashReporter', 'DialogType'],
stdout=subprocess.PIPE).communicate()[0]
if value.strip() == b'developer':
- print "this tests triggers the Crash Reporter, that is intentional"
+ print("this tests triggers the Crash Reporter, "
+ "that is intentional", end='')
sys.stdout.flush()
def __exit__(self, *args):
@@ -703,53 +913,127 @@ class _SuppressCoreFiles(object):
except (ValueError, resource.error):
pass
- @unittest.skipUnless(hasattr(signal, 'SIGALRM'),
- "Requires signal.SIGALRM")
- def test_communicate_eintr(self):
- # Issue #12493: communicate() should handle EINTR
- def handler(signum, frame):
- pass
- old_handler = signal.signal(signal.SIGALRM, handler)
- self.addCleanup(signal.signal, signal.SIGALRM, old_handler)
-
- # the process is running for 2 seconds
- args = [sys.executable, "-c", 'import time; time.sleep(2)']
- for stream in ('stdout', 'stderr'):
- kw = {stream: subprocess.PIPE}
- with subprocess.Popen(args, **kw) as process:
- signal.alarm(1)
- # communicate() will be interrupted by SIGALRM
- process.communicate()
-
@unittest.skipIf(mswindows, "POSIX specific tests")
class POSIXProcessTestCase(BaseTestCase):
- def test_exceptions(self):
- # caught & re-raised exceptions
- with self.assertRaises(OSError) as c:
+ def setUp(self):
+ super().setUp()
+ self._nonexistent_dir = "/_this/pa.th/does/not/exist"
+
+ def _get_chdir_exception(self):
+ try:
+ os.chdir(self._nonexistent_dir)
+ except OSError as e:
+ # This avoids hard coding the errno value or the OS perror()
+ # string and instead capture the exception that we want to see
+ # below for comparison.
+ desired_exception = e
+ desired_exception.strerror += ': ' + repr(self._nonexistent_dir)
+ else:
+ self.fail("chdir to nonexistant directory %s succeeded." %
+ self._nonexistent_dir)
+ return desired_exception
+
+ def test_exception_cwd(self):
+ """Test error in the child raised in the parent for a bad cwd."""
+ desired_exception = self._get_chdir_exception()
+ try:
p = subprocess.Popen([sys.executable, "-c", ""],
- cwd="/this/path/does/not/exist")
- # The attribute child_traceback should contain "os.chdir" somewhere.
- self.assertIn("os.chdir", c.exception.child_traceback)
+ cwd=self._nonexistent_dir)
+ except OSError as e:
+ # Test that the child process chdir failure actually makes
+ # it up to the parent process as the correct exception.
+ self.assertEqual(desired_exception.errno, e.errno)
+ self.assertEqual(desired_exception.strerror, e.strerror)
+ else:
+ self.fail("Expected OSError: %s" % desired_exception)
+
+ def test_exception_bad_executable(self):
+ """Test error in the child raised in the parent for a bad executable."""
+ desired_exception = self._get_chdir_exception()
+ try:
+ p = subprocess.Popen([sys.executable, "-c", ""],
+ executable=self._nonexistent_dir)
+ except OSError as e:
+ # Test that the child process exec failure actually makes
+ # it up to the parent process as the correct exception.
+ self.assertEqual(desired_exception.errno, e.errno)
+ self.assertEqual(desired_exception.strerror, e.strerror)
+ else:
+ self.fail("Expected OSError: %s" % desired_exception)
+
+ def test_exception_bad_args_0(self):
+ """Test error in the child raised in the parent for a bad args[0]."""
+ desired_exception = self._get_chdir_exception()
+ try:
+ p = subprocess.Popen([self._nonexistent_dir, "-c", ""])
+ except OSError as e:
+ # Test that the child process exec failure actually makes
+ # it up to the parent process as the correct exception.
+ self.assertEqual(desired_exception.errno, e.errno)
+ self.assertEqual(desired_exception.strerror, e.strerror)
+ else:
+ self.fail("Expected OSError: %s" % desired_exception)
+
+ def test_restore_signals(self):
+ # Code coverage for both values of restore_signals to make sure it
+ # at least does not blow up.
+ # A test for behavior would be complex. Contributions welcome.
+ subprocess.call([sys.executable, "-c", ""], restore_signals=True)
+ subprocess.call([sys.executable, "-c", ""], restore_signals=False)
+
+ def test_start_new_session(self):
+ # For code coverage of calling setsid(). We don't care if we get an
+ # EPERM error from it depending on the test execution environment, that
+ # still indicates that it was called.
+ try:
+ output = subprocess.check_output(
+ [sys.executable, "-c",
+ "import os; print(os.getpgid(os.getpid()))"],
+ start_new_session=True)
+ except OSError as e:
+ if e.errno != errno.EPERM:
+ raise
+ else:
+ parent_pgid = os.getpgid(os.getpid())
+ child_pgid = int(output)
+ self.assertNotEqual(parent_pgid, child_pgid)
def test_run_abort(self):
# returncode handles signal termination
with _SuppressCoreFiles():
p = subprocess.Popen([sys.executable, "-c",
- "import os; os.abort()"])
+ 'import os; os.abort()'])
p.wait()
self.assertEqual(-p.returncode, signal.SIGABRT)
def test_preexec(self):
- # preexec function
+ # DISCLAIMER: Setting environment variables is *not* a good use
+ # of a preexec_fn. This is merely a test.
p = subprocess.Popen([sys.executable, "-c",
- "import sys, os;"
- "sys.stdout.write(os.getenv('FRUIT'))"],
+ 'import sys,os;'
+ 'sys.stdout.write(os.getenv("FRUIT"))'],
stdout=subprocess.PIPE,
preexec_fn=lambda: os.putenv("FRUIT", "apple"))
self.addCleanup(p.stdout.close)
- self.assertEqual(p.stdout.read(), "apple")
+ self.assertEqual(p.stdout.read(), b"apple")
+
+ def test_preexec_exception(self):
+ def raise_it():
+ raise ValueError("What if two swallows carried a coconut?")
+ try:
+ p = subprocess.Popen([sys.executable, "-c", ""],
+ preexec_fn=raise_it)
+ except RuntimeError as e:
+ self.assertTrue(
+ subprocess._posixsubprocess,
+ "Expected a ValueError from the preexec_fn")
+ except ValueError as e:
+ self.assertIn("coconut", e.args[0])
+ else:
+ self.fail("Exception raised by preexec_fn did not make it "
+ "to the parent process.")
class _TestExecuteChildPopen(subprocess.Popen):
"""Used to test behavior at the end of _execute_child."""
@@ -757,20 +1041,9 @@ class POSIXProcessTestCase(BaseTestCase):
self._testcase = testcase
subprocess.Popen.__init__(self, *args, **kwargs)
- def _execute_child(
- self, args, executable, preexec_fn, close_fds, cwd, env,
- universal_newlines, startupinfo, creationflags, shell,
- p2cread, p2cwrite,
- c2pread, c2pwrite,
- errread, errwrite):
+ def _execute_child(self, *args, **kwargs):
try:
- subprocess.Popen._execute_child(
- self, args, executable, preexec_fn, close_fds,
- cwd, env, universal_newlines,
- startupinfo, creationflags, shell,
- p2cread, p2cwrite,
- c2pread, c2pwrite,
- errread, errwrite)
+ subprocess.Popen._execute_child(self, *args, **kwargs)
finally:
# Open a bunch of file descriptors and verify that
# none of them are the same as the ones the Popen
@@ -780,7 +1053,9 @@ class POSIXProcessTestCase(BaseTestCase):
try:
for fd in devzero_fds:
self._testcase.assertNotIn(
- fd, (p2cwrite, c2pread, errread))
+ fd, (self.stdin.fileno(), self.stdout.fileno(),
+ self.stderr.fileno()),
+ msg="At least one fd was closed early.")
finally:
map(os.close, devzero_fds)
@@ -793,17 +1068,55 @@ class POSIXProcessTestCase(BaseTestCase):
with self.assertRaises(RuntimeError):
self._TestExecuteChildPopen(
- self, [sys.executable, "-c", "pass"],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE, preexec_fn=raise_it)
+ self, [sys.executable, "-c", "pass"],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, preexec_fn=raise_it)
+
+ def test_preexec_gc_module_failure(self):
+ # This tests the code that disables garbage collection if the child
+ # process will execute any Python.
+ def raise_runtime_error():
+ raise RuntimeError("this shouldn't escape")
+ enabled = gc.isenabled()
+ orig_gc_disable = gc.disable
+ orig_gc_isenabled = gc.isenabled
+ try:
+ gc.disable()
+ self.assertFalse(gc.isenabled())
+ subprocess.call([sys.executable, '-c', ''],
+ preexec_fn=lambda: None)
+ self.assertFalse(gc.isenabled(),
+ "Popen enabled gc when it shouldn't.")
+
+ gc.enable()
+ self.assertTrue(gc.isenabled())
+ subprocess.call([sys.executable, '-c', ''],
+ preexec_fn=lambda: None)
+ self.assertTrue(gc.isenabled(), "Popen left gc disabled.")
+
+ gc.disable = raise_runtime_error
+ self.assertRaises(RuntimeError, subprocess.Popen,
+ [sys.executable, '-c', ''],
+ preexec_fn=lambda: None)
+
+ del gc.isenabled # force an AttributeError
+ self.assertRaises(AttributeError, subprocess.Popen,
+ [sys.executable, '-c', ''],
+ preexec_fn=lambda: None)
+ finally:
+ gc.disable = orig_gc_disable
+ gc.isenabled = orig_gc_isenabled
+ if not enabled:
+ gc.disable()
def test_args_string(self):
# args is a string
- f, fname = mkstemp()
- os.write(f, "#!/bin/sh\n")
- os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
- sys.executable)
- os.close(f)
+ fd, fname = mkstemp()
+ # reopen in text mode
+ with open(fd, "w", errors="surrogateescape") as fobj:
+ fobj.write("#!/bin/sh\n")
+ fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
+ sys.executable)
os.chmod(fname, 0o700)
p = subprocess.Popen(fname)
p.wait()
@@ -829,7 +1142,7 @@ class POSIXProcessTestCase(BaseTestCase):
stdout=subprocess.PIPE,
env=newenv)
self.addCleanup(p.stdout.close)
- self.assertEqual(p.stdout.read().strip(), "apple")
+ self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
def test_shell_string(self):
# Run command through the shell (string)
@@ -839,16 +1152,17 @@ class POSIXProcessTestCase(BaseTestCase):
stdout=subprocess.PIPE,
env=newenv)
self.addCleanup(p.stdout.close)
- self.assertEqual(p.stdout.read().strip(), "apple")
+ self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
def test_call_string(self):
# call() function with string argument on UNIX
- f, fname = mkstemp()
- os.write(f, "#!/bin/sh\n")
- os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
- sys.executable)
- os.close(f)
- os.chmod(fname, 0700)
+ fd, fname = mkstemp()
+ # reopen in text mode
+ with open(fd, "w", errors="surrogateescape") as fobj:
+ fobj.write("#!/bin/sh\n")
+ fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
+ sys.executable)
+ os.chmod(fname, 0o700)
rc = subprocess.call(fname)
os.remove(fname)
self.assertEqual(rc, 47)
@@ -871,7 +1185,7 @@ class POSIXProcessTestCase(BaseTestCase):
p = subprocess.Popen("echo $0", executable=sh, shell=True,
stdout=subprocess.PIPE)
self.addCleanup(p.stdout.close)
- self.assertEqual(p.stdout.read().strip(), sh)
+ self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii'))
def _kill_process(self, method, *args):
# Do not inherit file handles from the parent.
@@ -916,19 +1230,19 @@ class POSIXProcessTestCase(BaseTestCase):
def test_send_signal(self):
p = self._kill_process('send_signal', signal.SIGINT)
_, stderr = p.communicate()
- self.assertIn('KeyboardInterrupt', stderr)
+ self.assertIn(b'KeyboardInterrupt', stderr)
self.assertNotEqual(p.wait(), 0)
def test_kill(self):
p = self._kill_process('kill')
_, stderr = p.communicate()
- self.assertStderrEqual(stderr, '')
+ self.assertStderrEqual(stderr, b'')
self.assertEqual(p.wait(), -signal.SIGKILL)
def test_terminate(self):
p = self._kill_process('terminate')
_, stderr = p.communicate()
- self.assertStderrEqual(stderr, '')
+ self.assertStderrEqual(stderr, b'')
self.assertEqual(p.wait(), -signal.SIGTERM)
def test_send_signal_dead(self):
@@ -964,7 +1278,7 @@ class POSIXProcessTestCase(BaseTestCase):
stdin=stdin,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE).communicate()
- err = test_support.strip_python_stderr(err)
+ err = support.strip_python_stderr(err)
self.assertEqual((out, err), (b'apple', b'orange'))
finally:
for b, a in zip(newfds, fds):
@@ -995,6 +1309,54 @@ class POSIXProcessTestCase(BaseTestCase):
# all standard fds closed.
self.check_close_std_fds([0, 1, 2])
+ def test_remapping_std_fds(self):
+ # open up some temporary files
+ temps = [mkstemp() for i in range(3)]
+ try:
+ temp_fds = [fd for fd, fname in temps]
+
+ # unlink the files -- we won't need to reopen them
+ for fd, fname in temps:
+ os.unlink(fname)
+
+ # write some data to what will become stdin, and rewind
+ os.write(temp_fds[1], b"STDIN")
+ os.lseek(temp_fds[1], 0, 0)
+
+ # move the standard file descriptors out of the way
+ saved_fds = [os.dup(fd) for fd in range(3)]
+ try:
+ # duplicate the file objects over the standard fd's
+ for fd, temp_fd in enumerate(temp_fds):
+ os.dup2(temp_fd, fd)
+
+ # now use those files in the "wrong" order, so that subprocess
+ # has to rearrange them in the child
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; got = sys.stdin.read();'
+ 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
+ stdin=temp_fds[1],
+ stdout=temp_fds[2],
+ stderr=temp_fds[0])
+ p.wait()
+ finally:
+ # restore the original fd's underneath sys.stdin, etc.
+ for std, saved in enumerate(saved_fds):
+ os.dup2(saved, std)
+ os.close(saved)
+
+ for fd in temp_fds:
+ os.lseek(fd, 0, 0)
+
+ out = os.read(temp_fds[2], 1024)
+ err = support.strip_python_stderr(os.read(temp_fds[0], 1024))
+ self.assertEqual(out, b"got STDIN")
+ self.assertEqual(err, b"err")
+
+ finally:
+ for fd in temp_fds:
+ os.close(fd)
+
def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
# open up some temporary files
temps = [mkstemp() for i in range(3)]
@@ -1029,7 +1391,7 @@ class POSIXProcessTestCase(BaseTestCase):
os.lseek(fd, 0, 0)
out = os.read(stdout_no, 1024)
- err = test_support.strip_python_stderr(os.read(stderr_no, 1024))
+ err = support.strip_python_stderr(os.read(stderr_no, 1024))
finally:
for std, saved in enumerate(saved_fds):
os.dup2(saved, std)
@@ -1053,15 +1415,263 @@ class POSIXProcessTestCase(BaseTestCase):
self.check_swap_fds(2, 0, 1)
self.check_swap_fds(2, 1, 0)
+ def test_surrogates_error_message(self):
+ def prepare():
+ raise ValueError("surrogate:\uDCff")
+
+ try:
+ subprocess.call(
+ [sys.executable, "-c", "pass"],
+ preexec_fn=prepare)
+ except ValueError as err:
+ # Pure Python implementations keeps the message
+ self.assertIsNone(subprocess._posixsubprocess)
+ self.assertEqual(str(err), "surrogate:\uDCff")
+ except RuntimeError as err:
+ # _posixsubprocess uses a default message
+ self.assertIsNotNone(subprocess._posixsubprocess)
+ self.assertEqual(str(err), "Exception occurred in preexec_fn.")
+ else:
+ self.fail("Expected ValueError or RuntimeError")
+
+ def test_undecodable_env(self):
+ for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')):
+ # test str with surrogates
+ script = "import os; print(ascii(os.getenv(%s)))" % repr(key)
+ env = os.environ.copy()
+ env[key] = value
+ # Use C locale to get ascii for the locale encoding to force
+ # surrogate-escaping of \xFF in the child process; otherwise it can
+ # be decoded as-is if the default locale is latin-1.
+ env['LC_ALL'] = 'C'
+ stdout = subprocess.check_output(
+ [sys.executable, "-c", script],
+ env=env)
+ stdout = stdout.rstrip(b'\n\r')
+ self.assertEqual(stdout.decode('ascii'), ascii(value))
+
+ # test bytes
+ key = key.encode("ascii", "surrogateescape")
+ value = value.encode("ascii", "surrogateescape")
+ script = "import os; print(ascii(os.getenvb(%s)))" % repr(key)
+ env = os.environ.copy()
+ env[key] = value
+ stdout = subprocess.check_output(
+ [sys.executable, "-c", script],
+ env=env)
+ stdout = stdout.rstrip(b'\n\r')
+ self.assertEqual(stdout.decode('ascii'), ascii(value))
+
+ def test_bytes_program(self):
+ abs_program = os.fsencode(sys.executable)
+ path, program = os.path.split(sys.executable)
+ program = os.fsencode(program)
+
+ # absolute bytes path
+ exitcode = subprocess.call([abs_program, "-c", "pass"])
+ self.assertEqual(exitcode, 0)
+
+ # bytes program, unicode PATH
+ env = os.environ.copy()
+ env["PATH"] = path
+ exitcode = subprocess.call([program, "-c", "pass"], env=env)
+ self.assertEqual(exitcode, 0)
+
+ # bytes program, bytes PATH
+ envb = os.environb.copy()
+ envb[b"PATH"] = os.fsencode(path)
+ exitcode = subprocess.call([program, "-c", "pass"], env=envb)
+ self.assertEqual(exitcode, 0)
+
+ def test_pipe_cloexec(self):
+ sleeper = support.findfile("input_reader.py", subdir="subprocessdata")
+ fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
+
+ p1 = subprocess.Popen([sys.executable, sleeper],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, close_fds=False)
+
+ self.addCleanup(p1.communicate, b'')
+
+ p2 = subprocess.Popen([sys.executable, fd_status],
+ stdout=subprocess.PIPE, close_fds=False)
+
+ output, error = p2.communicate()
+ result_fds = set(map(int, output.split(b',')))
+ unwanted_fds = set([p1.stdin.fileno(), p1.stdout.fileno(),
+ p1.stderr.fileno()])
+
+ self.assertFalse(result_fds & unwanted_fds,
+ "Expected no fds from %r to be open in child, "
+ "found %r" %
+ (unwanted_fds, result_fds & unwanted_fds))
+
+ def test_pipe_cloexec_real_tools(self):
+ qcat = support.findfile("qcat.py", subdir="subprocessdata")
+ qgrep = support.findfile("qgrep.py", subdir="subprocessdata")
+
+ subdata = b'zxcvbn'
+ data = subdata * 4 + b'\n'
+
+ p1 = subprocess.Popen([sys.executable, qcat],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ close_fds=False)
+
+ p2 = subprocess.Popen([sys.executable, qgrep, subdata],
+ stdin=p1.stdout, stdout=subprocess.PIPE,
+ close_fds=False)
+
+ self.addCleanup(p1.wait)
+ self.addCleanup(p2.wait)
+ def kill_p1():
+ try:
+ p1.terminate()
+ except ProcessLookupError:
+ pass
+ def kill_p2():
+ try:
+ p2.terminate()
+ except ProcessLookupError:
+ pass
+ self.addCleanup(kill_p1)
+ self.addCleanup(kill_p2)
+
+ p1.stdin.write(data)
+ p1.stdin.close()
+
+ readfiles, ignored1, ignored2 = select.select([p2.stdout], [], [], 10)
+
+ self.assertTrue(readfiles, "The child hung")
+ self.assertEqual(p2.stdout.read(), data)
+
+ p1.stdout.close()
+ p2.stdout.close()
+
+ def test_close_fds(self):
+ fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
+
+ fds = os.pipe()
+ self.addCleanup(os.close, fds[0])
+ self.addCleanup(os.close, fds[1])
+
+ open_fds = set(fds)
+ # add a bunch more fds
+ for _ in range(9):
+ fd = os.open("/dev/null", os.O_RDONLY)
+ self.addCleanup(os.close, fd)
+ open_fds.add(fd)
+
+ p = subprocess.Popen([sys.executable, fd_status],
+ stdout=subprocess.PIPE, close_fds=False)
+ output, ignored = p.communicate()
+ remaining_fds = set(map(int, output.split(b',')))
+
+ self.assertEqual(remaining_fds & open_fds, open_fds,
+ "Some fds were closed")
+
+ p = subprocess.Popen([sys.executable, fd_status],
+ stdout=subprocess.PIPE, close_fds=True)
+ output, ignored = p.communicate()
+ remaining_fds = set(map(int, output.split(b',')))
+
+ self.assertFalse(remaining_fds & open_fds,
+ "Some fds were left open")
+ self.assertIn(1, remaining_fds, "Subprocess failed")
+
+ # Keep some of the fd's we opened open in the subprocess.
+ # This tests _posixsubprocess.c's proper handling of fds_to_keep.
+ fds_to_keep = set(open_fds.pop() for _ in range(8))
+ p = subprocess.Popen([sys.executable, fd_status],
+ stdout=subprocess.PIPE, close_fds=True,
+ pass_fds=())
+ output, ignored = p.communicate()
+ remaining_fds = set(map(int, output.split(b',')))
+
+ self.assertFalse(remaining_fds & fds_to_keep & open_fds,
+ "Some fds not in pass_fds were left open")
+ self.assertIn(1, remaining_fds, "Subprocess failed")
+
+ # Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file
+ # descriptor of a pipe closed in the parent process is valid in the
+ # child process according to fstat(), but the mode of the file
+ # descriptor is invalid, and read or write raise an error.
+ @support.requires_mac_ver(10, 5)
+ def test_pass_fds(self):
+ fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
+
+ open_fds = set()
+
+ for x in range(5):
+ fds = os.pipe()
+ self.addCleanup(os.close, fds[0])
+ self.addCleanup(os.close, fds[1])
+ open_fds.update(fds)
+
+ for fd in open_fds:
+ p = subprocess.Popen([sys.executable, fd_status],
+ stdout=subprocess.PIPE, close_fds=True,
+ pass_fds=(fd, ))
+ output, ignored = p.communicate()
+
+ remaining_fds = set(map(int, output.split(b',')))
+ to_be_closed = open_fds - {fd}
+
+ self.assertIn(fd, remaining_fds, "fd to be passed not passed")
+ self.assertFalse(remaining_fds & to_be_closed,
+ "fd to be closed passed")
+
+ # pass_fds overrides close_fds with a warning.
+ with self.assertWarns(RuntimeWarning) as context:
+ self.assertFalse(subprocess.call(
+ [sys.executable, "-c", "import sys; sys.exit(0)"],
+ close_fds=False, pass_fds=(fd, )))
+ self.assertIn('overriding close_fds', str(context.warning))
+
+ def test_stdout_stdin_are_single_inout_fd(self):
+ with io.open(os.devnull, "r+") as inout:
+ p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
+ stdout=inout, stdin=inout)
+ p.wait()
+
+ def test_stdout_stderr_are_single_inout_fd(self):
+ with io.open(os.devnull, "r+") as inout:
+ p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
+ stdout=inout, stderr=inout)
+ p.wait()
+
+ def test_stderr_stdin_are_single_inout_fd(self):
+ with io.open(os.devnull, "r+") as inout:
+ p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
+ stderr=inout, stdin=inout)
+ p.wait()
+
def test_wait_when_sigchild_ignored(self):
# NOTE: sigchild_ignore.py may not be an effective test on all OSes.
- sigchild_ignore = test_support.findfile("sigchild_ignore.py",
- subdir="subprocessdata")
+ sigchild_ignore = support.findfile("sigchild_ignore.py",
+ subdir="subprocessdata")
p = subprocess.Popen([sys.executable, sigchild_ignore],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
- " non-zero with this error:\n%s" % stderr)
+ " non-zero with this error:\n%s" %
+ stderr.decode('utf8'))
+
+ def test_select_unbuffered(self):
+ # Issue #11459: bufsize=0 should really set the pipes as
+ # unbuffered (and therefore let select() work properly).
+ select = support.import_module("select")
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys;'
+ 'sys.stdout.write("apple")'],
+ stdout=subprocess.PIPE,
+ bufsize=0)
+ f = p.stdout
+ self.addCleanup(f.close)
+ try:
+ self.assertEqual(f.read(4), b"appl")
+ self.assertIn(f, select.select([f], [], [], 0.0)[0])
+ finally:
+ p.wait()
def test_zombie_fast_process_del(self):
# Issue #12650: on Unix, if Popen.__del__() was called before the
@@ -1113,37 +1723,6 @@ class POSIXProcessTestCase(BaseTestCase):
self.assertRaises(OSError, os.waitpid, pid, 0)
self.assertNotIn(ident, [id(o) for o in subprocess._active])
- def test_pipe_cloexec(self):
- # Issue 12786: check that the communication pipes' FDs are set CLOEXEC,
- # and are not inherited by another child process.
- p1 = subprocess.Popen([sys.executable, "-c",
- 'import os;'
- 'os.read(0, 1)'
- ],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
-
- p2 = subprocess.Popen([sys.executable, "-c", """if True:
- import os, errno, sys
- for fd in %r:
- try:
- os.close(fd)
- except OSError as e:
- if e.errno != errno.EBADF:
- raise
- else:
- sys.exit(1)
- sys.exit(0)
- """ % [f.fileno() for f in (p1.stdin, p1.stdout,
- p1.stderr)]
- ],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE, close_fds=False)
- p1.communicate('foo')
- _, stderr = p2.communicate()
-
- self.assertEqual(p2.returncode, 0, "Unexpected error: " + repr(stderr))
-
@unittest.skipUnless(mswindows, "Windows specific tests")
class Win32ProcessTestCase(BaseTestCase):
@@ -1198,7 +1777,7 @@ class Win32ProcessTestCase(BaseTestCase):
stdout=subprocess.PIPE,
env=newenv)
self.addCleanup(p.stdout.close)
- self.assertIn("physalis", p.stdout.read())
+ self.assertIn(b"physalis", p.stdout.read())
def test_shell_string(self):
# Run command through the shell (string)
@@ -1208,7 +1787,7 @@ class Win32ProcessTestCase(BaseTestCase):
stdout=subprocess.PIPE,
env=newenv)
self.addCleanup(p.stdout.close)
- self.assertIn("physalis", p.stdout.read())
+ self.assertIn(b"physalis", p.stdout.read())
def test_call_string(self):
# call() function with string argument on Windows
@@ -1235,7 +1814,7 @@ class Win32ProcessTestCase(BaseTestCase):
p.stdout.read(1)
getattr(p, method)(*args)
_, stderr = p.communicate()
- self.assertStderrEqual(stderr, '')
+ self.assertStderrEqual(stderr, b'')
returncode = p.wait()
self.assertNotEqual(returncode, 0)
@@ -1283,6 +1862,33 @@ class Win32ProcessTestCase(BaseTestCase):
self._kill_dead_process('terminate')
+# The module says:
+# "NB This only works (and is only relevant) for UNIX."
+#
+# Actually, getoutput should work on any platform with an os.popen, but
+# I'll take the comment as given, and skip this suite.
+@unittest.skipUnless(os.name == 'posix', "only relevant for UNIX")
+class CommandTests(unittest.TestCase):
+ def test_getoutput(self):
+ self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
+ self.assertEqual(subprocess.getstatusoutput('echo xyzzy'),
+ (0, 'xyzzy'))
+
+ # we use mkdtemp in the next line to create an empty directory
+ # under our exclusive control; from that, we can invent a pathname
+ # that we _know_ won't exist. This is guaranteed to fail.
+ dir = None
+ try:
+ dir = tempfile.mkdtemp()
+ name = os.path.join(dir, "foo")
+
+ status, output = subprocess.getstatusoutput('cat ' + name)
+ self.assertNotEqual(status, 0)
+ finally:
+ if dir is not None:
+ os.rmdir(dir)
+
+
@unittest.skipUnless(getattr(subprocess, '_has_poll', False),
"poll system call not supported")
class ProcessTestCaseNoPoll(ProcessTestCase):
@@ -1295,6 +1901,28 @@ class ProcessTestCaseNoPoll(ProcessTestCase):
ProcessTestCase.tearDown(self)
+@unittest.skipUnless(getattr(subprocess, '_posixsubprocess', False),
+ "_posixsubprocess extension module not found.")
+class ProcessTestCasePOSIXPurePython(ProcessTestCase, POSIXProcessTestCase):
+ @classmethod
+ def setUpClass(cls):
+ global subprocess
+ assert subprocess._posixsubprocess
+ # Reimport subprocess while forcing _posixsubprocess to not exist.
+ with support.check_warnings(('.*_posixsubprocess .* not being used.*',
+ RuntimeWarning)):
+ subprocess = support.import_fresh_module(
+ 'subprocess', blocked=['_posixsubprocess'])
+ assert not subprocess._posixsubprocess
+
+ @classmethod
+ def tearDownClass(cls):
+ global subprocess
+ # Reimport subprocess as it should be, restoring order to the universe.
+ subprocess = support.import_fresh_module('subprocess')
+ assert subprocess._posixsubprocess
+
+
class HelperFunctionTests(unittest.TestCase):
@unittest.skipIf(mswindows, "errno and EINTR make no sense on windows")
def test_eintr_retry_call(self):
@@ -1313,11 +1941,12 @@ class HelperFunctionTests(unittest.TestCase):
subprocess._eintr_retry_call(fake_os_func, 666))
self.assertEqual([(256, 999), (666,), (666,)], record_calls)
-@unittest.skipUnless(mswindows, "mswindows only")
+
+@unittest.skipUnless(mswindows, "Windows-specific tests")
class CommandsWithSpaces (BaseTestCase):
def setUp(self):
- super(CommandsWithSpaces, self).setUp()
+ super().setUp()
f, fname = mkstemp(".py", "te st")
self.fname = fname.lower ()
os.write(f, b"import sys;"
@@ -1327,7 +1956,7 @@ class CommandsWithSpaces (BaseTestCase):
def tearDown(self):
os.remove(self.fname)
- super(CommandsWithSpaces, self).tearDown()
+ super().tearDown()
def with_spaces(self, *args, **kwargs):
kwargs['stdout'] = subprocess.PIPE
@@ -1356,16 +1985,61 @@ class CommandsWithSpaces (BaseTestCase):
# call() function with sequence argument with spaces on Windows
self.with_spaces([sys.executable, self.fname, "ab cd"])
+
+class ContextManagerTests(BaseTestCase):
+
+ def test_pipe(self):
+ with subprocess.Popen([sys.executable, "-c",
+ "import sys;"
+ "sys.stdout.write('stdout');"
+ "sys.stderr.write('stderr');"],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE) as proc:
+ self.assertEqual(proc.stdout.read(), b"stdout")
+ self.assertStderrEqual(proc.stderr.read(), b"stderr")
+
+ self.assertTrue(proc.stdout.closed)
+ self.assertTrue(proc.stderr.closed)
+
+ def test_returncode(self):
+ with subprocess.Popen([sys.executable, "-c",
+ "import sys; sys.exit(100)"]) as proc:
+ pass
+ # __exit__ calls wait(), so the returncode should be set
+ self.assertEqual(proc.returncode, 100)
+
+ def test_communicate_stdin(self):
+ with subprocess.Popen([sys.executable, "-c",
+ "import sys;"
+ "sys.exit(sys.stdin.read() == 'context')"],
+ stdin=subprocess.PIPE) as proc:
+ proc.communicate(b"context")
+ self.assertEqual(proc.returncode, 1)
+
+ def test_invalid_args(self):
+ with self.assertRaises(EnvironmentError) as c:
+ with subprocess.Popen(['nonexisting_i_hope'],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE) as proc:
+ pass
+
+ self.assertEqual(c.exception.errno, errno.ENOENT)
+
+
def test_main():
unit_tests = (ProcessTestCase,
POSIXProcessTestCase,
Win32ProcessTestCase,
+ ProcessTestCasePOSIXPurePython,
+ CommandTests,
ProcessTestCaseNoPoll,
HelperFunctionTests,
- CommandsWithSpaces)
+ CommandsWithSpaces,
+ ContextManagerTests,
+ )
- test_support.run_unittest(*unit_tests)
- test_support.reap_children()
+ support.run_unittest(*unit_tests)
+ support.reap_children()
if __name__ == "__main__":
- test_main()
+ unittest.main()