aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_os.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_os.py')
-rw-r--r--Lib/test/test_os.py329
1 files changed, 313 insertions, 16 deletions
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
index d70a0aeaf84..50240939d8a 100644
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -24,6 +24,9 @@ import itertools
import stat
import locale
import codecs
+import decimal
+import fractions
+import pickle
try:
import threading
except ImportError:
@@ -32,6 +35,10 @@ try:
import resource
except ImportError:
resource = None
+try:
+ import fcntl
+except ImportError:
+ fcntl = None
from test.script_helper import assert_python_ok
@@ -256,6 +263,13 @@ class StatAttributeTests(unittest.TestCase):
warnings.simplefilter("ignore", DeprecationWarning)
self.check_stat_attributes(fname)
+ def test_stat_result_pickle(self):
+ result = os.stat(self.fname)
+ p = pickle.dumps(result)
+ self.assertIn(b'\x03cos\nstat_result\n', p)
+ unpickled = pickle.loads(p)
+ self.assertEqual(result, unpickled)
+
@unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
def test_statvfs_attributes(self):
try:
@@ -263,7 +277,7 @@ class StatAttributeTests(unittest.TestCase):
except OSError as e:
# On AtheOS, glibc always returns ENOSYS
if e.errno == errno.ENOSYS:
- self.skipTest('glibc always returns ENOSYS on AtheOS')
+ self.skipTest('os.statvfs() failed with ENOSYS')
# Make sure direct access works
self.assertEqual(result.f_bfree, result[3])
@@ -300,6 +314,21 @@ class StatAttributeTests(unittest.TestCase):
except TypeError:
pass
+ @unittest.skipUnless(hasattr(os, 'statvfs'),
+ "need os.statvfs()")
+ def test_statvfs_result_pickle(self):
+ try:
+ result = os.statvfs(self.fname)
+ except OSError as e:
+ # On AtheOS, glibc always returns ENOSYS
+ if e.errno == errno.ENOSYS:
+ self.skipTest('os.statvfs() failed with ENOSYS')
+
+ p = pickle.dumps(result)
+ self.assertIn(b'\x03cos\nstatvfs_result\n', p)
+ unpickled = pickle.loads(p)
+ self.assertEqual(result, unpickled)
+
def test_utime_dir(self):
delta = 1000000
st = os.stat(support.TESTFN)
@@ -478,9 +507,9 @@ class StatAttributeTests(unittest.TestCase):
# Verify that an open file can be stat'ed
try:
os.stat(r"c:\pagefile.sys")
- except WindowsError as e:
- if e.errno == 2: # file does not exist; cannot run test
- self.skipTest(r'c:\pagefile.sys does not exist')
+ except FileNotFoundError:
+ self.skipTest(r'c:\pagefile.sys does not exist')
+ except OSError as e:
self.fail("Could not stat pagefile.sys")
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@@ -876,6 +905,17 @@ class MakedirTests(unittest.TestCase):
os.makedirs(path, mode=mode, exist_ok=True)
os.umask(old_mask)
+ @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown')
+ def test_chown_uid_gid_arguments_must_be_index(self):
+ stat = os.stat(support.TESTFN)
+ uid = stat.st_uid
+ gid = stat.st_gid
+ for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
+ self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
+ self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
+ self.assertIsNone(os.chown(support.TESTFN, uid, gid))
+ self.assertIsNone(os.chown(support.TESTFN, -1, -1))
+
def test_exist_ok_s_isgid_directory(self):
path = os.path.join(support.TESTFN, 'dir1')
S_ISGID = stat.S_ISGID
@@ -1133,27 +1173,27 @@ class ExecTests(unittest.TestCase):
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
def test_rename(self):
- self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
+ self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
def test_remove(self):
- self.assertRaises(WindowsError, os.remove, support.TESTFN)
+ self.assertRaises(OSError, os.remove, support.TESTFN)
def test_chdir(self):
- self.assertRaises(WindowsError, os.chdir, support.TESTFN)
+ self.assertRaises(OSError, os.chdir, support.TESTFN)
def test_mkdir(self):
f = open(support.TESTFN, "w")
try:
- self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
+ self.assertRaises(OSError, os.mkdir, support.TESTFN)
finally:
f.close()
os.unlink(support.TESTFN)
def test_utime(self):
- self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
+ self.assertRaises(OSError, os.utime, support.TESTFN, None)
def test_chmod(self):
- self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
+ self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
class TestInvalidFD(unittest.TestCase):
singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
@@ -1287,41 +1327,57 @@ class PosixUidGidTests(unittest.TestCase):
@unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
def test_setuid(self):
if os.getuid() != 0:
- self.assertRaises(os.error, os.setuid, 0)
+ self.assertRaises(OSError, os.setuid, 0)
self.assertRaises(OverflowError, os.setuid, 1<<32)
@unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
def test_setgid(self):
if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
- self.assertRaises(os.error, os.setgid, 0)
+ self.assertRaises(OSError, os.setgid, 0)
self.assertRaises(OverflowError, os.setgid, 1<<32)
@unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
def test_seteuid(self):
if os.getuid() != 0:
- self.assertRaises(os.error, os.seteuid, 0)
+ self.assertRaises(OSError, os.seteuid, 0)
self.assertRaises(OverflowError, os.seteuid, 1<<32)
@unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
def test_setegid(self):
if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
- self.assertRaises(os.error, os.setegid, 0)
+ self.assertRaises(OSError, os.setegid, 0)
self.assertRaises(OverflowError, os.setegid, 1<<32)
@unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
def test_setreuid(self):
if os.getuid() != 0:
- self.assertRaises(os.error, os.setreuid, 0, 0)
+ self.assertRaises(OSError, os.setreuid, 0, 0)
self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
+ @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
+ def test_setreuid_neg1(self):
+ # Needs to accept -1. We run this in a subprocess to avoid
+ # altering the test runner's process state (issue8045).
+ subprocess.check_call([
+ sys.executable, '-c',
+ 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
+
@unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
def test_setregid(self):
if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
- self.assertRaises(os.error, os.setregid, 0, 0)
+ self.assertRaises(OSError, os.setregid, 0, 0)
self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
+ @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
+ def test_setregid_neg1(self):
+ # Needs to accept -1. We run this in a subprocess to avoid
+ # altering the test runner's process state (issue8045).
+ subprocess.check_call([
+ sys.executable, '-c',
+ 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
+
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
def setUp(self):
@@ -1511,6 +1567,52 @@ class Win32KillTests(unittest.TestCase):
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
+class Win32ListdirTests(unittest.TestCase):
+ """Test listdir on Windows."""
+
+ def setUp(self):
+ self.created_paths = []
+ for i in range(2):
+ dir_name = 'SUB%d' % i
+ dir_path = os.path.join(support.TESTFN, dir_name)
+ file_name = 'FILE%d' % i
+ file_path = os.path.join(support.TESTFN, file_name)
+ os.makedirs(dir_path)
+ with open(file_path, 'w') as f:
+ f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
+ self.created_paths.extend([dir_name, file_name])
+ self.created_paths.sort()
+
+ def tearDown(self):
+ shutil.rmtree(support.TESTFN)
+
+ def test_listdir_no_extended_path(self):
+ """Test when the path is not an "extended" path."""
+ # unicode
+ self.assertEqual(
+ sorted(os.listdir(support.TESTFN)),
+ self.created_paths)
+ # bytes
+ self.assertEqual(
+ sorted(os.listdir(os.fsencode(support.TESTFN))),
+ [os.fsencode(path) for path in self.created_paths])
+
+ def test_listdir_extended_path(self):
+ """Test when the path starts with '\\\\?\\'."""
+ # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
+ # unicode
+ path = '\\\\?\\' + os.path.abspath(support.TESTFN)
+ self.assertEqual(
+ sorted(os.listdir(path)),
+ self.created_paths)
+ # bytes
+ path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
+ self.assertEqual(
+ sorted(os.listdir(path)),
+ [os.fsencode(path) for path in self.created_paths])
+
+
+@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
filelink = 'filelinktest'
@@ -2175,6 +2277,197 @@ class TermsizeTests(unittest.TestCase):
self.assertEqual(expected, actual)
+class OSErrorTests(unittest.TestCase):
+ def setUp(self):
+ class Str(str):
+ pass
+
+ self.bytes_filenames = []
+ self.unicode_filenames = []
+ if support.TESTFN_UNENCODABLE is not None:
+ decoded = support.TESTFN_UNENCODABLE
+ else:
+ decoded = support.TESTFN
+ self.unicode_filenames.append(decoded)
+ self.unicode_filenames.append(Str(decoded))
+ if support.TESTFN_UNDECODABLE is not None:
+ encoded = support.TESTFN_UNDECODABLE
+ else:
+ encoded = os.fsencode(support.TESTFN)
+ self.bytes_filenames.append(encoded)
+ self.bytes_filenames.append(memoryview(encoded))
+
+ self.filenames = self.bytes_filenames + self.unicode_filenames
+
+ def test_oserror_filename(self):
+ funcs = [
+ (self.filenames, os.chdir,),
+ (self.filenames, os.chmod, 0o777),
+ (self.filenames, os.lstat,),
+ (self.filenames, os.open, os.O_RDONLY),
+ (self.filenames, os.rmdir,),
+ (self.filenames, os.stat,),
+ (self.filenames, os.unlink,),
+ ]
+ if sys.platform == "win32":
+ funcs.extend((
+ (self.bytes_filenames, os.rename, b"dst"),
+ (self.bytes_filenames, os.replace, b"dst"),
+ (self.unicode_filenames, os.rename, "dst"),
+ (self.unicode_filenames, os.replace, "dst"),
+ # Issue #16414: Don't test undecodable names with listdir()
+ # because of a Windows bug.
+ #
+ # With the ANSI code page 932, os.listdir(b'\xe7') return an
+ # empty list (instead of failing), whereas os.listdir(b'\xff')
+ # raises a FileNotFoundError. It looks like a Windows bug:
+ # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
+ # fails with ERROR_FILE_NOT_FOUND (2), instead of
+ # ERROR_PATH_NOT_FOUND (3).
+ (self.unicode_filenames, os.listdir,),
+ ))
+ else:
+ funcs.extend((
+ (self.filenames, os.listdir,),
+ (self.filenames, os.rename, "dst"),
+ (self.filenames, os.replace, "dst"),
+ ))
+ if hasattr(os, "chown"):
+ funcs.append((self.filenames, os.chown, 0, 0))
+ if hasattr(os, "lchown"):
+ funcs.append((self.filenames, os.lchown, 0, 0))
+ if hasattr(os, "truncate"):
+ funcs.append((self.filenames, os.truncate, 0))
+ if hasattr(os, "chflags"):
+ funcs.append((self.filenames, os.chflags, 0))
+ if hasattr(os, "lchflags"):
+ funcs.append((self.filenames, os.lchflags, 0))
+ if hasattr(os, "chroot"):
+ funcs.append((self.filenames, os.chroot,))
+ if hasattr(os, "link"):
+ if sys.platform == "win32":
+ funcs.append((self.bytes_filenames, os.link, b"dst"))
+ funcs.append((self.unicode_filenames, os.link, "dst"))
+ else:
+ funcs.append((self.filenames, os.link, "dst"))
+ if hasattr(os, "listxattr"):
+ funcs.extend((
+ (self.filenames, os.listxattr,),
+ (self.filenames, os.getxattr, "user.test"),
+ (self.filenames, os.setxattr, "user.test", b'user'),
+ (self.filenames, os.removexattr, "user.test"),
+ ))
+ if hasattr(os, "lchmod"):
+ funcs.append((self.filenames, os.lchmod, 0o777))
+ if hasattr(os, "readlink"):
+ if sys.platform == "win32":
+ funcs.append((self.unicode_filenames, os.readlink,))
+ else:
+ funcs.append((self.filenames, os.readlink,))
+
+ for filenames, func, *func_args in funcs:
+ for name in filenames:
+ try:
+ func(name, *func_args)
+ except OSError as err:
+ self.assertIs(err.filename, name)
+ else:
+ self.fail("No exception thrown by {}".format(func))
+
+class CPUCountTests(unittest.TestCase):
+ def test_cpu_count(self):
+ cpus = os.cpu_count()
+ if cpus is not None:
+ self.assertIsInstance(cpus, int)
+ self.assertGreater(cpus, 0)
+ else:
+ self.skipTest("Could not determine the number of CPUs")
+
+
+class FDInheritanceTests(unittest.TestCase):
+ def test_get_set_inheritable(self):
+ fd = os.open(__file__, os.O_RDONLY)
+ self.addCleanup(os.close, fd)
+ self.assertEqual(os.get_inheritable(fd), False)
+
+ os.set_inheritable(fd, True)
+ self.assertEqual(os.get_inheritable(fd), True)
+
+ @unittest.skipIf(fcntl is None, "need fcntl")
+ def test_get_inheritable_cloexec(self):
+ fd = os.open(__file__, os.O_RDONLY)
+ self.addCleanup(os.close, fd)
+ self.assertEqual(os.get_inheritable(fd), False)
+
+ # clear FD_CLOEXEC flag
+ flags = fcntl.fcntl(fd, fcntl.F_GETFD)
+ flags &= ~fcntl.FD_CLOEXEC
+ fcntl.fcntl(fd, fcntl.F_SETFD, flags)
+
+ self.assertEqual(os.get_inheritable(fd), True)
+
+ @unittest.skipIf(fcntl is None, "need fcntl")
+ def test_set_inheritable_cloexec(self):
+ fd = os.open(__file__, os.O_RDONLY)
+ self.addCleanup(os.close, fd)
+ self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
+ fcntl.FD_CLOEXEC)
+
+ os.set_inheritable(fd, True)
+ self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
+ 0)
+
+ def test_open(self):
+ fd = os.open(__file__, os.O_RDONLY)
+ self.addCleanup(os.close, fd)
+ self.assertEqual(os.get_inheritable(fd), False)
+
+ @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
+ def test_pipe(self):
+ rfd, wfd = os.pipe()
+ self.addCleanup(os.close, rfd)
+ self.addCleanup(os.close, wfd)
+ self.assertEqual(os.get_inheritable(rfd), False)
+ self.assertEqual(os.get_inheritable(wfd), False)
+
+ def test_dup(self):
+ fd1 = os.open(__file__, os.O_RDONLY)
+ self.addCleanup(os.close, fd1)
+
+ fd2 = os.dup(fd1)
+ self.addCleanup(os.close, fd2)
+ self.assertEqual(os.get_inheritable(fd2), False)
+
+ @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
+ def test_dup2(self):
+ fd = os.open(__file__, os.O_RDONLY)
+ self.addCleanup(os.close, fd)
+
+ # inheritable by default
+ fd2 = os.open(__file__, os.O_RDONLY)
+ try:
+ os.dup2(fd, fd2)
+ self.assertEqual(os.get_inheritable(fd2), True)
+ finally:
+ os.close(fd2)
+
+ # force non-inheritable
+ fd3 = os.open(__file__, os.O_RDONLY)
+ try:
+ os.dup2(fd, fd3, inheritable=False)
+ self.assertEqual(os.get_inheritable(fd3), False)
+ finally:
+ os.close(fd3)
+
+ @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
+ def test_openpty(self):
+ master_fd, slave_fd = os.openpty()
+ self.addCleanup(os.close, master_fd)
+ self.addCleanup(os.close, slave_fd)
+ self.assertEqual(os.get_inheritable(master_fd), False)
+ self.assertEqual(os.get_inheritable(slave_fd), False)
+
+
@support.reap_threads
def test_main():
support.run_unittest(
@@ -2192,6 +2485,7 @@ def test_main():
PosixUidGidTests,
Pep383Tests,
Win32KillTests,
+ Win32ListdirTests,
Win32SymlinkTests,
NonLocalSymlinkTests,
FSEncodingTests,
@@ -2204,7 +2498,10 @@ def test_main():
ExtendedAttributeTests,
Win32DeprecatedBytesAPI,
TermsizeTests,
+ OSErrorTests,
RemoveDirsTests,
+ CPUCountTests,
+ FDInheritanceTests,
)
if __name__ == "__main__":