diff options
Diffstat (limited to 'Lib/test/test_os.py')
-rw-r--r-- | Lib/test/test_os.py | 481 |
1 files changed, 424 insertions, 57 deletions
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 3dc96a3fcee..ab0fe94bd07 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -4,6 +4,7 @@ import os import errno +import getpass import unittest import warnings import sys @@ -40,9 +41,35 @@ try: import fcntl except ImportError: fcntl = None +try: + import _winapi +except ImportError: + _winapi = None +try: + import grp + groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem] + if hasattr(os, 'getgid'): + process_gid = os.getgid() + if process_gid not in groups: + groups.append(process_gid) +except ImportError: + groups = [] +try: + import pwd + all_users = [u.pw_uid for u in pwd.getpwall()] +except ImportError: + all_users = [] +try: + from _testcapi import INT_MAX, PY_SSIZE_T_MAX +except ImportError: + INT_MAX = PY_SSIZE_T_MAX = sys.maxsize from test.script_helper import assert_python_ok +root_in_posix = False +if hasattr(os, 'geteuid'): + root_in_posix = (os.geteuid() == 0) + with warnings.catch_warnings(): warnings.simplefilter("ignore", DeprecationWarning) os.stat_float_times(True) @@ -116,6 +143,26 @@ class FileTests(unittest.TestCase): self.assertEqual(type(s), bytes) self.assertEqual(s, b"spam") + @support.cpython_only + # Skip the test on 32-bit platforms: the number of bytes must fit in a + # Py_ssize_t type + @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX, + "needs INT_MAX < PY_SSIZE_T_MAX") + @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False) + def test_large_read(self, size): + with open(support.TESTFN, "wb") as fp: + fp.write(b'test') + self.addCleanup(support.unlink, support.TESTFN) + + # Issue #21932: Make sure that os.read() does not raise an + # OverflowError for size larger than INT_MAX + with open(support.TESTFN, "rb") as fp: + data = os.read(fp.fileno(), size) + + # The test does not try to read more than 2 GB at once because the + # operating system is free to return less bytes than requested. + self.assertEqual(data, b'test') + def test_write(self): # os.write() accepts bytes- and buffer-like objects but not strings fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY) @@ -533,6 +580,28 @@ class StatAttributeTests(unittest.TestCase): os.stat(r) self.assertEqual(ctx.exception.errno, errno.EBADF) + def check_file_attributes(self, result): + self.assertTrue(hasattr(result, 'st_file_attributes')) + self.assertTrue(isinstance(result.st_file_attributes, int)) + self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF) + + @unittest.skipUnless(sys.platform == "win32", + "st_file_attributes is Win32 specific") + def test_file_attributes(self): + # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set) + result = os.stat(self.fname) + self.check_file_attributes(result) + self.assertEqual( + result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY, + 0) + + # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set) + result = os.stat(support.TESTFN) + self.check_file_attributes(result) + self.assertEqual( + result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY, + stat.FILE_ATTRIBUTE_DIRECTORY) + from test import mapping_tests class EnvironTests(mapping_tests.BasicTestMappingProtocol): @@ -945,17 +1014,6 @@ class MakedirTests(unittest.TestCase): os.makedirs(path, mode=mode, exist_ok=True) os.umask(old_mask) - @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown') - def test_chown_uid_gid_arguments_must_be_index(self): - stat = os.stat(support.TESTFN) - uid = stat.st_uid - gid = stat.st_gid - for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)): - self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid) - self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value) - self.assertIsNone(os.chown(support.TESTFN, uid, gid)) - self.assertIsNone(os.chown(support.TESTFN, -1, -1)) - def test_exist_ok_s_isgid_directory(self): path = os.path.join(support.TESTFN, 'dir1') S_ISGID = stat.S_ISGID @@ -1006,6 +1064,58 @@ class MakedirTests(unittest.TestCase): os.removedirs(path) +@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown") +class ChownFileTests(unittest.TestCase): + + def setUpClass(): + os.mkdir(support.TESTFN) + + def test_chown_uid_gid_arguments_must_be_index(self): + stat = os.stat(support.TESTFN) + uid = stat.st_uid + gid = stat.st_gid + for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)): + self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid) + self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value) + self.assertIsNone(os.chown(support.TESTFN, uid, gid)) + self.assertIsNone(os.chown(support.TESTFN, -1, -1)) + + @unittest.skipUnless(len(groups) > 1, "test needs more than one group") + def test_chown(self): + gid_1, gid_2 = groups[:2] + uid = os.stat(support.TESTFN).st_uid + os.chown(support.TESTFN, uid, gid_1) + gid = os.stat(support.TESTFN).st_gid + self.assertEqual(gid, gid_1) + os.chown(support.TESTFN, uid, gid_2) + gid = os.stat(support.TESTFN).st_gid + self.assertEqual(gid, gid_2) + + @unittest.skipUnless(root_in_posix and len(all_users) > 1, + "test needs root privilege and more than one user") + def test_chown_with_root(self): + uid_1, uid_2 = all_users[:2] + gid = os.stat(support.TESTFN).st_gid + os.chown(support.TESTFN, uid_1, gid) + uid = os.stat(support.TESTFN).st_uid + self.assertEqual(uid, uid_1) + os.chown(support.TESTFN, uid_2, gid) + uid = os.stat(support.TESTFN).st_uid + self.assertEqual(uid, uid_2) + + @unittest.skipUnless(not root_in_posix and len(all_users) > 1, + "test needs non-root account and more than one user") + def test_chown_without_permission(self): + uid_1, uid_2 = all_users[:2] + gid = os.stat(support.TESTFN).st_gid + with self.assertRaises(PermissionError): + os.chown(support.TESTFN, uid_1, gid) + os.chown(support.TESTFN, uid_2, gid) + + def tearDownClass(): + os.rmdir(support.TESTFN) + + class RemoveDirsTests(unittest.TestCase): def setUp(self): os.makedirs(support.TESTFN) @@ -1122,8 +1232,10 @@ class URandomFDTests(unittest.TestCase): code = """if 1: import os import sys + import test.support os.urandom(4) - os.closerange(3, 256) + with test.support.SuppressCrashReport(): + os.closerange(3, 256) sys.stdout.buffer.write(os.urandom(4)) """ rc, out, err = assert_python_ok('-Sc', code) @@ -1137,16 +1249,18 @@ class URandomFDTests(unittest.TestCase): code = """if 1: import os import sys + import test.support os.urandom(4) - for fd in range(3, 256): - try: - os.close(fd) - except OSError: - pass - else: - # Found the urandom fd (XXX hopefully) - break - os.closerange(3, 256) + with test.support.SuppressCrashReport(): + for fd in range(3, 256): + try: + os.close(fd) + except OSError: + pass + else: + # Found the urandom fd (XXX hopefully) + break + os.closerange(3, 256) with open({TESTFN!r}, 'rb') as f: os.dup2(f.fileno(), fd) sys.stdout.buffer.write(os.urandom(4)) @@ -1372,6 +1486,16 @@ class TestInvalidFD(unittest.TestCase): def test_writev(self): self.check(os.writev, [b'abc']) + def test_inheritable(self): + self.check(os.get_inheritable) + self.check(os.set_inheritable, True) + + @unittest.skipUnless(hasattr(os, 'get_blocking'), + 'needs os.get_blocking() and os.set_blocking()') + def test_blocking(self): + self.check(os.get_blocking) + self.check(os.set_blocking, True) + class LinkTests(unittest.TestCase): def setUp(self): @@ -1819,6 +1943,37 @@ class Win32SymlinkTests(unittest.TestCase): shutil.rmtree(level1) +@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") +class Win32JunctionTests(unittest.TestCase): + junction = 'junctiontest' + junction_target = os.path.dirname(os.path.abspath(__file__)) + + def setUp(self): + assert os.path.exists(self.junction_target) + assert not os.path.exists(self.junction) + + def tearDown(self): + if os.path.exists(self.junction): + # os.rmdir delegates to Windows' RemoveDirectoryW, + # which removes junction points safely. + os.rmdir(self.junction) + + def test_create_junction(self): + _winapi.CreateJunction(self.junction_target, self.junction) + self.assertTrue(os.path.exists(self.junction)) + self.assertTrue(os.path.isdir(self.junction)) + + # Junctions are not recognized as links. + self.assertFalse(os.path.islink(self.junction)) + + def test_unlink_removes_junction(self): + _winapi.CreateJunction(self.junction_target, self.junction) + self.assertTrue(os.path.exists(self.junction)) + + os.unlink(self.junction) + self.assertFalse(os.path.exists(self.junction)) + + @support.skip_unless_symlink class NonLocalSymlinkTests(unittest.TestCase): @@ -2025,11 +2180,13 @@ class TestSendfile(unittest.TestCase): @classmethod def setUpClass(cls): + cls.key = support.threading_setup() with open(support.TESTFN, "wb") as f: f.write(cls.DATA) @classmethod def tearDownClass(cls): + support.threading_cleanup(*cls.key) support.unlink(support.TESTFN) def setUp(self): @@ -2556,41 +2713,251 @@ class FDInheritanceTests(unittest.TestCase): self.assertEqual(os.get_inheritable(slave_fd), False) -@support.reap_threads -def test_main(): - support.run_unittest( - FileTests, - StatAttributeTests, - EnvironTests, - WalkTests, - FwalkTests, - MakedirTests, - DevNullTests, - URandomTests, - ExecTests, - Win32ErrorTests, - TestInvalidFD, - PosixUidGidTests, - Pep383Tests, - Win32KillTests, - Win32ListdirTests, - Win32SymlinkTests, - NonLocalSymlinkTests, - FSEncodingTests, - DeviceEncodingTests, - PidTests, - LoginTests, - LinkTests, - TestSendfile, - ProgramPriorityTests, - ExtendedAttributeTests, - Win32DeprecatedBytesAPI, - TermsizeTests, - OSErrorTests, - RemoveDirsTests, - CPUCountTests, - FDInheritanceTests, - ) +@unittest.skipUnless(hasattr(os, 'get_blocking'), + 'needs os.get_blocking() and os.set_blocking()') +class BlockingTests(unittest.TestCase): + def test_blocking(self): + fd = os.open(__file__, os.O_RDONLY) + self.addCleanup(os.close, fd) + self.assertEqual(os.get_blocking(fd), True) + + os.set_blocking(fd, False) + self.assertEqual(os.get_blocking(fd), False) + + os.set_blocking(fd, True) + self.assertEqual(os.get_blocking(fd), True) + + + +class ExportsTests(unittest.TestCase): + def test_os_all(self): + self.assertIn('open', os.__all__) + self.assertIn('walk', os.__all__) + + +class TestScandir(unittest.TestCase): + def setUp(self): + self.path = os.path.realpath(support.TESTFN) + self.addCleanup(support.rmtree, self.path) + os.mkdir(self.path) + + def create_file(self, name="file.txt"): + filename = os.path.join(self.path, name) + with open(filename, "wb") as fp: + fp.write(b'python') + return filename + + def get_entries(self, names): + entries = dict((entry.name, entry) + for entry in os.scandir(self.path)) + self.assertEqual(sorted(entries.keys()), names) + return entries + + def assert_stat_equal(self, stat1, stat2, skip_fields): + if skip_fields: + for attr in dir(stat1): + if not attr.startswith("st_"): + continue + if attr in ("st_dev", "st_ino", "st_nlink"): + continue + self.assertEqual(getattr(stat1, attr), + getattr(stat2, attr), + (stat1, stat2, attr)) + else: + self.assertEqual(stat1, stat2) + + def check_entry(self, entry, name, is_dir, is_file, is_symlink): + self.assertEqual(entry.name, name) + self.assertEqual(entry.path, os.path.join(self.path, name)) + self.assertEqual(entry.inode(), + os.stat(entry.path, follow_symlinks=False).st_ino) + + entry_stat = os.stat(entry.path) + self.assertEqual(entry.is_dir(), + stat.S_ISDIR(entry_stat.st_mode)) + self.assertEqual(entry.is_file(), + stat.S_ISREG(entry_stat.st_mode)) + self.assertEqual(entry.is_symlink(), + os.path.islink(entry.path)) + + entry_lstat = os.stat(entry.path, follow_symlinks=False) + self.assertEqual(entry.is_dir(follow_symlinks=False), + stat.S_ISDIR(entry_lstat.st_mode)) + self.assertEqual(entry.is_file(follow_symlinks=False), + stat.S_ISREG(entry_lstat.st_mode)) + + self.assert_stat_equal(entry.stat(), + entry_stat, + os.name == 'nt' and not is_symlink) + self.assert_stat_equal(entry.stat(follow_symlinks=False), + entry_lstat, + os.name == 'nt') + + def test_attributes(self): + link = hasattr(os, 'link') + symlink = support.can_symlink() + + dirname = os.path.join(self.path, "dir") + os.mkdir(dirname) + filename = self.create_file("file.txt") + if link: + os.link(filename, os.path.join(self.path, "link_file.txt")) + if symlink: + os.symlink(dirname, os.path.join(self.path, "symlink_dir"), + target_is_directory=True) + os.symlink(filename, os.path.join(self.path, "symlink_file.txt")) + + names = ['dir', 'file.txt'] + if link: + names.append('link_file.txt') + if symlink: + names.extend(('symlink_dir', 'symlink_file.txt')) + entries = self.get_entries(names) + + entry = entries['dir'] + self.check_entry(entry, 'dir', True, False, False) + + entry = entries['file.txt'] + self.check_entry(entry, 'file.txt', False, True, False) + + if link: + entry = entries['link_file.txt'] + self.check_entry(entry, 'link_file.txt', False, True, False) + + if symlink: + entry = entries['symlink_dir'] + self.check_entry(entry, 'symlink_dir', True, False, True) + + entry = entries['symlink_file.txt'] + self.check_entry(entry, 'symlink_file.txt', False, True, True) + + def get_entry(self, name): + entries = list(os.scandir(self.path)) + self.assertEqual(len(entries), 1) + + entry = entries[0] + self.assertEqual(entry.name, name) + return entry + + def create_file_entry(self): + filename = self.create_file() + return self.get_entry(os.path.basename(filename)) + + def test_current_directory(self): + filename = self.create_file() + old_dir = os.getcwd() + try: + os.chdir(self.path) + + # call scandir() without parameter: it must list the content + # of the current directory + entries = dict((entry.name, entry) for entry in os.scandir()) + self.assertEqual(sorted(entries.keys()), + [os.path.basename(filename)]) + finally: + os.chdir(old_dir) + + def test_repr(self): + entry = self.create_file_entry() + self.assertEqual(repr(entry), "<DirEntry 'file.txt'>") + + def test_removed_dir(self): + path = os.path.join(self.path, 'dir') + + os.mkdir(path) + entry = self.get_entry('dir') + os.rmdir(path) + + # On POSIX, is_dir() result depends if scandir() filled d_type or not + if os.name == 'nt': + self.assertTrue(entry.is_dir()) + self.assertFalse(entry.is_file()) + self.assertFalse(entry.is_symlink()) + if os.name == 'nt': + self.assertRaises(FileNotFoundError, entry.inode) + # don't fail + entry.stat() + entry.stat(follow_symlinks=False) + else: + self.assertGreater(entry.inode(), 0) + self.assertRaises(FileNotFoundError, entry.stat) + self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False) + + def test_removed_file(self): + entry = self.create_file_entry() + os.unlink(entry.path) + + self.assertFalse(entry.is_dir()) + # On POSIX, is_dir() result depends if scandir() filled d_type or not + if os.name == 'nt': + self.assertTrue(entry.is_file()) + self.assertFalse(entry.is_symlink()) + if os.name == 'nt': + self.assertRaises(FileNotFoundError, entry.inode) + # don't fail + entry.stat() + entry.stat(follow_symlinks=False) + else: + self.assertGreater(entry.inode(), 0) + self.assertRaises(FileNotFoundError, entry.stat) + self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False) + + def test_broken_symlink(self): + if not support.can_symlink(): + return self.skipTest('cannot create symbolic link') + + filename = self.create_file("file.txt") + os.symlink(filename, + os.path.join(self.path, "symlink.txt")) + entries = self.get_entries(['file.txt', 'symlink.txt']) + entry = entries['symlink.txt'] + os.unlink(filename) + + self.assertGreater(entry.inode(), 0) + self.assertFalse(entry.is_dir()) + self.assertFalse(entry.is_file()) # broken symlink returns False + self.assertFalse(entry.is_dir(follow_symlinks=False)) + self.assertFalse(entry.is_file(follow_symlinks=False)) + self.assertTrue(entry.is_symlink()) + self.assertRaises(FileNotFoundError, entry.stat) + # don't fail + entry.stat(follow_symlinks=False) + + def test_bytes(self): + if os.name == "nt": + # On Windows, os.scandir(bytes) must raise an exception + self.assertRaises(TypeError, os.scandir, b'.') + return + + self.create_file("file.txt") + + path_bytes = os.fsencode(self.path) + entries = list(os.scandir(path_bytes)) + self.assertEqual(len(entries), 1, entries) + entry = entries[0] + + self.assertEqual(entry.name, b'file.txt') + self.assertEqual(entry.path, + os.fsencode(os.path.join(self.path, 'file.txt'))) + + def test_empty_path(self): + self.assertRaises(FileNotFoundError, os.scandir, '') + + def test_consume_iterator_twice(self): + self.create_file("file.txt") + iterator = os.scandir(self.path) + + entries = list(iterator) + self.assertEqual(len(entries), 1, entries) + + # check than consuming the iterator twice doesn't raise exception + entries2 = list(iterator) + self.assertEqual(len(entries2), 0, entries2) + + def test_bad_path_type(self): + for obj in [1234, 1.234, {}, []]: + self.assertRaises(TypeError, os.scandir, obj) + if __name__ == "__main__": - test_main() + unittest.main() |