diff options
Diffstat (limited to 'Lib/test/test_shutil.py')
-rw-r--r-- | Lib/test/test_shutil.py | 443 |
1 files changed, 317 insertions, 126 deletions
diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py index 9bdb7243d3d..a9b4676dfff 100644 --- a/Lib/test/test_shutil.py +++ b/Lib/test/test_shutil.py @@ -7,17 +7,28 @@ import sys import stat import os import os.path +import functools import errno +from test import support +from test.support import TESTFN from os.path import splitdrive from distutils.spawn import find_executable, spawn from shutil import (_make_tarball, _make_zipfile, make_archive, register_archive_format, unregister_archive_format, - get_archive_formats) + get_archive_formats, Error, unpack_archive, + register_unpack_format, RegistryError, + unregister_unpack_format, get_unpack_formats) import tarfile import warnings -from test import test_support -from test.test_support import TESTFN, check_warnings, captured_stdout +from test import support +from test.support import TESTFN, check_warnings, captured_stdout + +try: + import bz2 + BZ2_SUPPORTED = True +except ImportError: + BZ2_SUPPORTED = False TESTFN2 = TESTFN + "2" @@ -39,6 +50,21 @@ try: except ImportError: ZIP_SUPPORT = find_executable('zip') +def _fake_rename(*args, **kwargs): + # Pretend the destination path is on a different filesystem. + raise OSError() + +def mock_rename(func): + @functools.wraps(func) + def wrap(*args, **kwargs): + try: + builtin_rename = os.rename + os.rename = _fake_rename + return func(*args, **kwargs) + finally: + os.rename = builtin_rename + return wrap + class TestShutil(unittest.TestCase): def setUp(self): @@ -73,10 +99,60 @@ class TestShutil(unittest.TestCase): d = tempfile.mkdtemp() self.tempdirs.append(d) return d + + @support.skip_unless_symlink + def test_rmtree_fails_on_symlink(self): + tmp = self.mkdtemp() + dir_ = os.path.join(tmp, 'dir') + os.mkdir(dir_) + link = os.path.join(tmp, 'link') + os.symlink(dir_, link) + self.assertRaises(OSError, shutil.rmtree, link) + self.assertTrue(os.path.exists(dir_)) + self.assertTrue(os.path.lexists(link)) + errors = [] + def onerror(*args): + errors.append(args) + shutil.rmtree(link, onerror=onerror) + self.assertEqual(len(errors), 1) + self.assertIs(errors[0][0], os.path.islink) + self.assertEqual(errors[0][1], link) + self.assertIsInstance(errors[0][2][1], OSError) + def test_rmtree_errors(self): # filename is guaranteed not to exist filename = tempfile.mktemp() self.assertRaises(OSError, shutil.rmtree, filename) + # test that ignore_errors option is honoured + shutil.rmtree(filename, ignore_errors=True) + + # existing file + tmpdir = self.mkdtemp() + self.write_file((tmpdir, "tstfile"), "") + filename = os.path.join(tmpdir, "tstfile") + with self.assertRaises(OSError) as cm: + shutil.rmtree(filename) + # The reason for this rather odd construct is that Windows sprinkles + # a \*.* at the end of file names. But only sometimes on some buildbots + possible_args = [filename, os.path.join(filename, '*.*')] + self.assertIn(cm.exception.filename, possible_args) + self.assertTrue(os.path.exists(filename)) + # test that ignore_errors option is honored + shutil.rmtree(filename, ignore_errors=True) + self.assertTrue(os.path.exists(filename)) + errors = [] + def onerror(*args): + errors.append(args) + shutil.rmtree(filename, onerror=onerror) + self.assertEqual(len(errors), 2) + self.assertIs(errors[0][0], os.listdir) + self.assertEqual(errors[0][1], filename) + self.assertIsInstance(errors[0][2][1], OSError) + self.assertIn(errors[0][2][1].filename, possible_args) + self.assertIs(errors[1][0], os.rmdir) + self.assertEqual(errors[1][1], filename) + self.assertIsInstance(errors[1][2][1], OSError) + self.assertIn(errors[1][2][1].filename, possible_args) # See bug #1071513 for why we don't run this on cygwin # and bug #1076467 for why we don't run this as root. @@ -139,11 +215,12 @@ class TestShutil(unittest.TestCase): self.assertRaises(OSError, shutil.rmtree, path) os.remove(path) + def _write_data(self, path, data): + f = open(path, "w") + f.write(data) + f.close() + def test_copytree_simple(self): - def write_data(path, data): - f = open(path, "w") - f.write(data) - f.close() def read_data(path): f = open(path) @@ -153,11 +230,9 @@ class TestShutil(unittest.TestCase): src_dir = tempfile.mkdtemp() dst_dir = os.path.join(tempfile.mkdtemp(), 'destination') - - write_data(os.path.join(src_dir, 'test.txt'), '123') - + self._write_data(os.path.join(src_dir, 'test.txt'), '123') os.mkdir(os.path.join(src_dir, 'test_dir')) - write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456') + self._write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456') try: shutil.copytree(src_dir, dst_dir) @@ -186,11 +261,6 @@ class TestShutil(unittest.TestCase): def test_copytree_with_exclude(self): - def write_data(path, data): - f = open(path, "w") - f.write(data) - f.close() - def read_data(path): f = open(path) data = f.read() @@ -203,16 +273,18 @@ class TestShutil(unittest.TestCase): src_dir = tempfile.mkdtemp() try: dst_dir = join(tempfile.mkdtemp(), 'destination') - write_data(join(src_dir, 'test.txt'), '123') - write_data(join(src_dir, 'test.tmp'), '123') + self._write_data(join(src_dir, 'test.txt'), '123') + self._write_data(join(src_dir, 'test.tmp'), '123') os.mkdir(join(src_dir, 'test_dir')) - write_data(join(src_dir, 'test_dir', 'test.txt'), '456') + self._write_data(join(src_dir, 'test_dir', 'test.txt'), '456') os.mkdir(join(src_dir, 'test_dir2')) - write_data(join(src_dir, 'test_dir2', 'test.txt'), '456') + self._write_data(join(src_dir, 'test_dir2', 'test.txt'), '456') os.mkdir(join(src_dir, 'test_dir2', 'subdir')) os.mkdir(join(src_dir, 'test_dir2', 'subdir2')) - write_data(join(src_dir, 'test_dir2', 'subdir', 'test.txt'), '456') - write_data(join(src_dir, 'test_dir2', 'subdir2', 'test.py'), '456') + self._write_data(join(src_dir, 'test_dir2', 'subdir', 'test.txt'), + '456') + self._write_data(join(src_dir, 'test_dir2', 'subdir2', 'test.py'), + '456') # testing glob-like patterns @@ -265,48 +337,87 @@ class TestShutil(unittest.TestCase): shutil.rmtree(src_dir) shutil.rmtree(os.path.dirname(dst_dir)) - if hasattr(os, "symlink"): - def test_dont_copy_file_onto_link_to_itself(self): - # bug 851123. - os.mkdir(TESTFN) - src = os.path.join(TESTFN, 'cheese') - dst = os.path.join(TESTFN, 'shop') - try: - f = open(src, 'w') + @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link') + def test_dont_copy_file_onto_link_to_itself(self): + # Temporarily disable test on Windows. + if os.name == 'nt': + return + # bug 851123. + os.mkdir(TESTFN) + src = os.path.join(TESTFN, 'cheese') + dst = os.path.join(TESTFN, 'shop') + try: + with open(src, 'w') as f: f.write('cheddar') - f.close() - - os.link(src, dst) - self.assertRaises(shutil.Error, shutil.copyfile, src, dst) - with open(src, 'r') as f: - self.assertEqual(f.read(), 'cheddar') - os.remove(dst) - - # Using `src` here would mean we end up with a symlink pointing - # to TESTFN/TESTFN/cheese, while it should point at - # TESTFN/cheese. - os.symlink('cheese', dst) - self.assertRaises(shutil.Error, shutil.copyfile, src, dst) - with open(src, 'r') as f: - self.assertEqual(f.read(), 'cheddar') - os.remove(dst) - finally: - try: - shutil.rmtree(TESTFN) - except OSError: - pass + os.link(src, dst) + self.assertRaises(shutil.Error, shutil.copyfile, src, dst) + with open(src, 'r') as f: + self.assertEqual(f.read(), 'cheddar') + os.remove(dst) + finally: + shutil.rmtree(TESTFN, ignore_errors=True) - def test_rmtree_on_symlink(self): - # bug 1669. - os.mkdir(TESTFN) - try: - src = os.path.join(TESTFN, 'cheese') - dst = os.path.join(TESTFN, 'shop') - os.mkdir(src) - os.symlink(src, dst) - self.assertRaises(OSError, shutil.rmtree, dst) - finally: - shutil.rmtree(TESTFN, ignore_errors=True) + @unittest.skipUnless(hasattr(os, 'chflags') and + hasattr(errno, 'EOPNOTSUPP') and + hasattr(errno, 'ENOTSUP'), + "requires os.chflags, EOPNOTSUPP & ENOTSUP") + def test_copystat_handles_harmless_chflags_errors(self): + tmpdir = self.mkdtemp() + file1 = os.path.join(tmpdir, 'file1') + file2 = os.path.join(tmpdir, 'file2') + self.write_file(file1, 'xxx') + self.write_file(file2, 'xxx') + + def make_chflags_raiser(err): + ex = OSError() + + def _chflags_raiser(path, flags): + ex.errno = err + raise ex + return _chflags_raiser + old_chflags = os.chflags + try: + for err in errno.EOPNOTSUPP, errno.ENOTSUP: + os.chflags = make_chflags_raiser(err) + shutil.copystat(file1, file2) + # assert others errors break it + os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP) + self.assertRaises(OSError, shutil.copystat, file1, file2) + finally: + os.chflags = old_chflags + + @support.skip_unless_symlink + def test_dont_copy_file_onto_symlink_to_itself(self): + # bug 851123. + os.mkdir(TESTFN) + src = os.path.join(TESTFN, 'cheese') + dst = os.path.join(TESTFN, 'shop') + try: + with open(src, 'w') as f: + f.write('cheddar') + # Using `src` here would mean we end up with a symlink pointing + # to TESTFN/TESTFN/cheese, while it should point at + # TESTFN/cheese. + os.symlink('cheese', dst) + self.assertRaises(shutil.Error, shutil.copyfile, src, dst) + with open(src, 'r') as f: + self.assertEqual(f.read(), 'cheddar') + os.remove(dst) + finally: + shutil.rmtree(TESTFN, ignore_errors=True) + + @support.skip_unless_symlink + def test_rmtree_on_symlink(self): + # bug 1669. + os.mkdir(TESTFN) + try: + src = os.path.join(TESTFN, 'cheese') + dst = os.path.join(TESTFN, 'shop') + os.mkdir(src) + os.symlink(src, dst) + self.assertRaises(OSError, shutil.rmtree, dst) + finally: + shutil.rmtree(TESTFN, ignore_errors=True) if hasattr(os, "mkfifo"): # Issue #3002: copyfile and copytree block indefinitely on named pipes @@ -320,6 +431,7 @@ class TestShutil(unittest.TestCase): finally: os.remove(TESTFN) + @support.skip_unless_symlink def test_copytree_named_pipe(self): os.mkdir(TESTFN) try: @@ -340,34 +452,76 @@ class TestShutil(unittest.TestCase): shutil.rmtree(TESTFN, ignore_errors=True) shutil.rmtree(TESTFN2, ignore_errors=True) - @unittest.skipUnless(hasattr(os, 'chflags') and - hasattr(errno, 'EOPNOTSUPP') and - hasattr(errno, 'ENOTSUP'), - "requires os.chflags, EOPNOTSUPP & ENOTSUP") - def test_copystat_handles_harmless_chflags_errors(self): - tmpdir = self.mkdtemp() - file1 = os.path.join(tmpdir, 'file1') - file2 = os.path.join(tmpdir, 'file2') - self.write_file(file1, 'xxx') - self.write_file(file2, 'xxx') + def test_copytree_special_func(self): - def make_chflags_raiser(err): - ex = OSError() + src_dir = self.mkdtemp() + dst_dir = os.path.join(self.mkdtemp(), 'destination') + self._write_data(os.path.join(src_dir, 'test.txt'), '123') + os.mkdir(os.path.join(src_dir, 'test_dir')) + self._write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456') - def _chflags_raiser(path, flags): - ex.errno = err - raise ex - return _chflags_raiser - old_chflags = os.chflags - try: - for err in errno.EOPNOTSUPP, errno.ENOTSUP: - os.chflags = make_chflags_raiser(err) - shutil.copystat(file1, file2) - # assert others errors break it - os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP) - self.assertRaises(OSError, shutil.copystat, file1, file2) - finally: - os.chflags = old_chflags + copied = [] + def _copy(src, dst): + copied.append((src, dst)) + + shutil.copytree(src_dir, dst_dir, copy_function=_copy) + self.assertEqual(len(copied), 2) + + @support.skip_unless_symlink + def test_copytree_dangling_symlinks(self): + + # a dangling symlink raises an error at the end + src_dir = self.mkdtemp() + dst_dir = os.path.join(self.mkdtemp(), 'destination') + os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt')) + os.mkdir(os.path.join(src_dir, 'test_dir')) + self._write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456') + self.assertRaises(Error, shutil.copytree, src_dir, dst_dir) + + # a dangling symlink is ignored with the proper flag + dst_dir = os.path.join(self.mkdtemp(), 'destination2') + shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True) + self.assertNotIn('test.txt', os.listdir(dst_dir)) + + # a dangling symlink is copied if symlinks=True + dst_dir = os.path.join(self.mkdtemp(), 'destination3') + shutil.copytree(src_dir, dst_dir, symlinks=True) + self.assertIn('test.txt', os.listdir(dst_dir)) + + def _copy_file(self, method): + fname = 'test.txt' + tmpdir = self.mkdtemp() + self.write_file([tmpdir, fname]) + file1 = os.path.join(tmpdir, fname) + tmpdir2 = self.mkdtemp() + method(file1, tmpdir2) + file2 = os.path.join(tmpdir2, fname) + return (file1, file2) + + @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod') + def test_copy(self): + # Ensure that the copied file exists and has the same mode bits. + file1, file2 = self._copy_file(shutil.copy) + self.assertTrue(os.path.exists(file2)) + self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode) + + @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod') + @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime') + def test_copy2(self): + # Ensure that the copied file exists and has the same mode and + # modification time bits. + file1, file2 = self._copy_file(shutil.copy2) + self.assertTrue(os.path.exists(file2)) + file1_stat = os.stat(file1) + file2_stat = os.stat(file2) + self.assertEqual(file1_stat.st_mode, file2_stat.st_mode) + for attr in 'st_atime', 'st_mtime': + # The modification times may be truncated in the new file. + self.assertLessEqual(getattr(file1_stat, attr), + getattr(file2_stat, attr) + 1) + if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'): + self.assertEqual(getattr(file1_stat, 'st_flags'), + getattr(file2_stat, 'st_flags')) @unittest.skipUnless(zlib, "requires zlib") def test_make_tarball(self): @@ -538,6 +692,7 @@ class TestShutil(unittest.TestCase): owner='kjhkjhkjg', group='oihohoh') self.assertTrue(os.path.exists(res)) + @unittest.skipUnless(zlib, "Requires zlib") @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support") def test_tarfile_root_owner(self): @@ -595,6 +750,69 @@ class TestShutil(unittest.TestCase): formats = [name for name, params in get_archive_formats()] self.assertNotIn('xxx', formats) + def _compare_dirs(self, dir1, dir2): + # check that dir1 and dir2 are equivalent, + # return the diff + diff = [] + for root, dirs, files in os.walk(dir1): + for file_ in files: + path = os.path.join(root, file_) + target_path = os.path.join(dir2, os.path.split(path)[-1]) + if not os.path.exists(target_path): + diff.append(file_) + return diff + + @unittest.skipUnless(zlib, "Requires zlib") + def test_unpack_archive(self): + formats = ['tar', 'gztar', 'zip'] + if BZ2_SUPPORTED: + formats.append('bztar') + + for format in formats: + tmpdir = self.mkdtemp() + base_dir, root_dir, base_name = self._create_files() + tmpdir2 = self.mkdtemp() + filename = make_archive(base_name, format, root_dir, base_dir) + + # let's try to unpack it now + unpack_archive(filename, tmpdir2) + diff = self._compare_dirs(tmpdir, tmpdir2) + self.assertEqual(diff, []) + + # and again, this time with the format specified + tmpdir3 = self.mkdtemp() + unpack_archive(filename, tmpdir3, format=format) + diff = self._compare_dirs(tmpdir, tmpdir3) + self.assertEqual(diff, []) + self.assertRaises(shutil.ReadError, unpack_archive, TESTFN) + self.assertRaises(ValueError, unpack_archive, TESTFN, format='xxx') + + def test_unpack_registery(self): + + formats = get_unpack_formats() + + def _boo(filename, extract_dir, extra): + self.assertEqual(extra, 1) + self.assertEqual(filename, 'stuff.boo') + self.assertEqual(extract_dir, 'xx') + + register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)]) + unpack_archive('stuff.boo', 'xx') + + # trying to register a .boo unpacker again + self.assertRaises(RegistryError, register_unpack_format, 'Boo2', + ['.boo'], _boo) + + # should work now + unregister_unpack_format('Boo') + register_unpack_format('Boo2', ['.boo'], _boo) + self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats()) + self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats()) + + # let's leave a clean state + unregister_unpack_format('Boo2') + self.assertEqual(get_unpack_formats(), formats) + class TestMove(unittest.TestCase): @@ -604,20 +822,11 @@ class TestMove(unittest.TestCase): self.dst_dir = tempfile.mkdtemp() self.src_file = os.path.join(self.src_dir, filename) self.dst_file = os.path.join(self.dst_dir, filename) - # Try to create a dir in the current directory, hoping that it is - # not located on the same filesystem as the system tmp dir. - try: - self.dir_other_fs = tempfile.mkdtemp( - dir=os.path.dirname(__file__)) - self.file_other_fs = os.path.join(self.dir_other_fs, - filename) - except OSError: - self.dir_other_fs = None with open(self.src_file, "wb") as f: - f.write("spam") + f.write(b"spam") def tearDown(self): - for d in (self.src_dir, self.dst_dir, self.dir_other_fs): + for d in (self.src_dir, self.dst_dir): try: if d: shutil.rmtree(d) @@ -646,21 +855,15 @@ class TestMove(unittest.TestCase): # Move a file inside an existing dir on the same filesystem. self._check_move_file(self.src_file, self.dst_dir, self.dst_file) + @mock_rename def test_move_file_other_fs(self): # Move a file to an existing dir on another filesystem. - if not self.dir_other_fs: - # skip - return - self._check_move_file(self.src_file, self.file_other_fs, - self.file_other_fs) + self.test_move_file() + @mock_rename def test_move_file_to_dir_other_fs(self): # Move a file to another location on another filesystem. - if not self.dir_other_fs: - # skip - return - self._check_move_file(self.src_file, self.dir_other_fs, - self.file_other_fs) + self.test_move_file_to_dir() def test_move_dir(self): # Move a dir to another location on the same filesystem. @@ -673,32 +876,20 @@ class TestMove(unittest.TestCase): except: pass + @mock_rename def test_move_dir_other_fs(self): # Move a dir to another location on another filesystem. - if not self.dir_other_fs: - # skip - return - dst_dir = tempfile.mktemp(dir=self.dir_other_fs) - try: - self._check_move_dir(self.src_dir, dst_dir, dst_dir) - finally: - try: - shutil.rmtree(dst_dir) - except: - pass + self.test_move_dir() def test_move_dir_to_dir(self): # Move a dir inside an existing dir on the same filesystem. self._check_move_dir(self.src_dir, self.dst_dir, os.path.join(self.dst_dir, os.path.basename(self.src_dir))) + @mock_rename def test_move_dir_to_dir_other_fs(self): # Move a dir inside an existing dir on another filesystem. - if not self.dir_other_fs: - # skip - return - self._check_move_dir(self.src_dir, self.dir_other_fs, - os.path.join(self.dir_other_fs, os.path.basename(self.src_dir))) + self.test_move_dir_to_dir() def test_existing_file_inside_dest_dir(self): # A file with the same name inside the destination dir already exists. @@ -859,7 +1050,7 @@ class TestCopyFile(unittest.TestCase): def test_main(): - test_support.run_unittest(TestShutil, TestMove, TestCopyFile) + support.run_unittest(TestShutil, TestMove, TestCopyFile) if __name__ == '__main__': test_main() |