aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_shutil.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_shutil.py')
-rw-r--r--Lib/test/test_shutil.py443
1 files changed, 317 insertions, 126 deletions
diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py
index 9bdb7243d3d..a9b4676dfff 100644
--- a/Lib/test/test_shutil.py
+++ b/Lib/test/test_shutil.py
@@ -7,17 +7,28 @@ import sys
import stat
import os
import os.path
+import functools
import errno
+from test import support
+from test.support import TESTFN
from os.path import splitdrive
from distutils.spawn import find_executable, spawn
from shutil import (_make_tarball, _make_zipfile, make_archive,
register_archive_format, unregister_archive_format,
- get_archive_formats)
+ get_archive_formats, Error, unpack_archive,
+ register_unpack_format, RegistryError,
+ unregister_unpack_format, get_unpack_formats)
import tarfile
import warnings
-from test import test_support
-from test.test_support import TESTFN, check_warnings, captured_stdout
+from test import support
+from test.support import TESTFN, check_warnings, captured_stdout
+
+try:
+ import bz2
+ BZ2_SUPPORTED = True
+except ImportError:
+ BZ2_SUPPORTED = False
TESTFN2 = TESTFN + "2"
@@ -39,6 +50,21 @@ try:
except ImportError:
ZIP_SUPPORT = find_executable('zip')
+def _fake_rename(*args, **kwargs):
+ # Pretend the destination path is on a different filesystem.
+ raise OSError()
+
+def mock_rename(func):
+ @functools.wraps(func)
+ def wrap(*args, **kwargs):
+ try:
+ builtin_rename = os.rename
+ os.rename = _fake_rename
+ return func(*args, **kwargs)
+ finally:
+ os.rename = builtin_rename
+ return wrap
+
class TestShutil(unittest.TestCase):
def setUp(self):
@@ -73,10 +99,60 @@ class TestShutil(unittest.TestCase):
d = tempfile.mkdtemp()
self.tempdirs.append(d)
return d
+
+ @support.skip_unless_symlink
+ def test_rmtree_fails_on_symlink(self):
+ tmp = self.mkdtemp()
+ dir_ = os.path.join(tmp, 'dir')
+ os.mkdir(dir_)
+ link = os.path.join(tmp, 'link')
+ os.symlink(dir_, link)
+ self.assertRaises(OSError, shutil.rmtree, link)
+ self.assertTrue(os.path.exists(dir_))
+ self.assertTrue(os.path.lexists(link))
+ errors = []
+ def onerror(*args):
+ errors.append(args)
+ shutil.rmtree(link, onerror=onerror)
+ self.assertEqual(len(errors), 1)
+ self.assertIs(errors[0][0], os.path.islink)
+ self.assertEqual(errors[0][1], link)
+ self.assertIsInstance(errors[0][2][1], OSError)
+
def test_rmtree_errors(self):
# filename is guaranteed not to exist
filename = tempfile.mktemp()
self.assertRaises(OSError, shutil.rmtree, filename)
+ # test that ignore_errors option is honoured
+ shutil.rmtree(filename, ignore_errors=True)
+
+ # existing file
+ tmpdir = self.mkdtemp()
+ self.write_file((tmpdir, "tstfile"), "")
+ filename = os.path.join(tmpdir, "tstfile")
+ with self.assertRaises(OSError) as cm:
+ shutil.rmtree(filename)
+ # The reason for this rather odd construct is that Windows sprinkles
+ # a \*.* at the end of file names. But only sometimes on some buildbots
+ possible_args = [filename, os.path.join(filename, '*.*')]
+ self.assertIn(cm.exception.filename, possible_args)
+ self.assertTrue(os.path.exists(filename))
+ # test that ignore_errors option is honored
+ shutil.rmtree(filename, ignore_errors=True)
+ self.assertTrue(os.path.exists(filename))
+ errors = []
+ def onerror(*args):
+ errors.append(args)
+ shutil.rmtree(filename, onerror=onerror)
+ self.assertEqual(len(errors), 2)
+ self.assertIs(errors[0][0], os.listdir)
+ self.assertEqual(errors[0][1], filename)
+ self.assertIsInstance(errors[0][2][1], OSError)
+ self.assertIn(errors[0][2][1].filename, possible_args)
+ self.assertIs(errors[1][0], os.rmdir)
+ self.assertEqual(errors[1][1], filename)
+ self.assertIsInstance(errors[1][2][1], OSError)
+ self.assertIn(errors[1][2][1].filename, possible_args)
# See bug #1071513 for why we don't run this on cygwin
# and bug #1076467 for why we don't run this as root.
@@ -139,11 +215,12 @@ class TestShutil(unittest.TestCase):
self.assertRaises(OSError, shutil.rmtree, path)
os.remove(path)
+ def _write_data(self, path, data):
+ f = open(path, "w")
+ f.write(data)
+ f.close()
+
def test_copytree_simple(self):
- def write_data(path, data):
- f = open(path, "w")
- f.write(data)
- f.close()
def read_data(path):
f = open(path)
@@ -153,11 +230,9 @@ class TestShutil(unittest.TestCase):
src_dir = tempfile.mkdtemp()
dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
-
- write_data(os.path.join(src_dir, 'test.txt'), '123')
-
+ self._write_data(os.path.join(src_dir, 'test.txt'), '123')
os.mkdir(os.path.join(src_dir, 'test_dir'))
- write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456')
+ self._write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456')
try:
shutil.copytree(src_dir, dst_dir)
@@ -186,11 +261,6 @@ class TestShutil(unittest.TestCase):
def test_copytree_with_exclude(self):
- def write_data(path, data):
- f = open(path, "w")
- f.write(data)
- f.close()
-
def read_data(path):
f = open(path)
data = f.read()
@@ -203,16 +273,18 @@ class TestShutil(unittest.TestCase):
src_dir = tempfile.mkdtemp()
try:
dst_dir = join(tempfile.mkdtemp(), 'destination')
- write_data(join(src_dir, 'test.txt'), '123')
- write_data(join(src_dir, 'test.tmp'), '123')
+ self._write_data(join(src_dir, 'test.txt'), '123')
+ self._write_data(join(src_dir, 'test.tmp'), '123')
os.mkdir(join(src_dir, 'test_dir'))
- write_data(join(src_dir, 'test_dir', 'test.txt'), '456')
+ self._write_data(join(src_dir, 'test_dir', 'test.txt'), '456')
os.mkdir(join(src_dir, 'test_dir2'))
- write_data(join(src_dir, 'test_dir2', 'test.txt'), '456')
+ self._write_data(join(src_dir, 'test_dir2', 'test.txt'), '456')
os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
- write_data(join(src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
- write_data(join(src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
+ self._write_data(join(src_dir, 'test_dir2', 'subdir', 'test.txt'),
+ '456')
+ self._write_data(join(src_dir, 'test_dir2', 'subdir2', 'test.py'),
+ '456')
# testing glob-like patterns
@@ -265,48 +337,87 @@ class TestShutil(unittest.TestCase):
shutil.rmtree(src_dir)
shutil.rmtree(os.path.dirname(dst_dir))
- if hasattr(os, "symlink"):
- def test_dont_copy_file_onto_link_to_itself(self):
- # bug 851123.
- os.mkdir(TESTFN)
- src = os.path.join(TESTFN, 'cheese')
- dst = os.path.join(TESTFN, 'shop')
- try:
- f = open(src, 'w')
+ @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
+ def test_dont_copy_file_onto_link_to_itself(self):
+ # Temporarily disable test on Windows.
+ if os.name == 'nt':
+ return
+ # bug 851123.
+ os.mkdir(TESTFN)
+ src = os.path.join(TESTFN, 'cheese')
+ dst = os.path.join(TESTFN, 'shop')
+ try:
+ with open(src, 'w') as f:
f.write('cheddar')
- f.close()
-
- os.link(src, dst)
- self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
- with open(src, 'r') as f:
- self.assertEqual(f.read(), 'cheddar')
- os.remove(dst)
-
- # Using `src` here would mean we end up with a symlink pointing
- # to TESTFN/TESTFN/cheese, while it should point at
- # TESTFN/cheese.
- os.symlink('cheese', dst)
- self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
- with open(src, 'r') as f:
- self.assertEqual(f.read(), 'cheddar')
- os.remove(dst)
- finally:
- try:
- shutil.rmtree(TESTFN)
- except OSError:
- pass
+ os.link(src, dst)
+ self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
+ with open(src, 'r') as f:
+ self.assertEqual(f.read(), 'cheddar')
+ os.remove(dst)
+ finally:
+ shutil.rmtree(TESTFN, ignore_errors=True)
- def test_rmtree_on_symlink(self):
- # bug 1669.
- os.mkdir(TESTFN)
- try:
- src = os.path.join(TESTFN, 'cheese')
- dst = os.path.join(TESTFN, 'shop')
- os.mkdir(src)
- os.symlink(src, dst)
- self.assertRaises(OSError, shutil.rmtree, dst)
- finally:
- shutil.rmtree(TESTFN, ignore_errors=True)
+ @unittest.skipUnless(hasattr(os, 'chflags') and
+ hasattr(errno, 'EOPNOTSUPP') and
+ hasattr(errno, 'ENOTSUP'),
+ "requires os.chflags, EOPNOTSUPP & ENOTSUP")
+ def test_copystat_handles_harmless_chflags_errors(self):
+ tmpdir = self.mkdtemp()
+ file1 = os.path.join(tmpdir, 'file1')
+ file2 = os.path.join(tmpdir, 'file2')
+ self.write_file(file1, 'xxx')
+ self.write_file(file2, 'xxx')
+
+ def make_chflags_raiser(err):
+ ex = OSError()
+
+ def _chflags_raiser(path, flags):
+ ex.errno = err
+ raise ex
+ return _chflags_raiser
+ old_chflags = os.chflags
+ try:
+ for err in errno.EOPNOTSUPP, errno.ENOTSUP:
+ os.chflags = make_chflags_raiser(err)
+ shutil.copystat(file1, file2)
+ # assert others errors break it
+ os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
+ self.assertRaises(OSError, shutil.copystat, file1, file2)
+ finally:
+ os.chflags = old_chflags
+
+ @support.skip_unless_symlink
+ def test_dont_copy_file_onto_symlink_to_itself(self):
+ # bug 851123.
+ os.mkdir(TESTFN)
+ src = os.path.join(TESTFN, 'cheese')
+ dst = os.path.join(TESTFN, 'shop')
+ try:
+ with open(src, 'w') as f:
+ f.write('cheddar')
+ # Using `src` here would mean we end up with a symlink pointing
+ # to TESTFN/TESTFN/cheese, while it should point at
+ # TESTFN/cheese.
+ os.symlink('cheese', dst)
+ self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
+ with open(src, 'r') as f:
+ self.assertEqual(f.read(), 'cheddar')
+ os.remove(dst)
+ finally:
+ shutil.rmtree(TESTFN, ignore_errors=True)
+
+ @support.skip_unless_symlink
+ def test_rmtree_on_symlink(self):
+ # bug 1669.
+ os.mkdir(TESTFN)
+ try:
+ src = os.path.join(TESTFN, 'cheese')
+ dst = os.path.join(TESTFN, 'shop')
+ os.mkdir(src)
+ os.symlink(src, dst)
+ self.assertRaises(OSError, shutil.rmtree, dst)
+ finally:
+ shutil.rmtree(TESTFN, ignore_errors=True)
if hasattr(os, "mkfifo"):
# Issue #3002: copyfile and copytree block indefinitely on named pipes
@@ -320,6 +431,7 @@ class TestShutil(unittest.TestCase):
finally:
os.remove(TESTFN)
+ @support.skip_unless_symlink
def test_copytree_named_pipe(self):
os.mkdir(TESTFN)
try:
@@ -340,34 +452,76 @@ class TestShutil(unittest.TestCase):
shutil.rmtree(TESTFN, ignore_errors=True)
shutil.rmtree(TESTFN2, ignore_errors=True)
- @unittest.skipUnless(hasattr(os, 'chflags') and
- hasattr(errno, 'EOPNOTSUPP') and
- hasattr(errno, 'ENOTSUP'),
- "requires os.chflags, EOPNOTSUPP & ENOTSUP")
- def test_copystat_handles_harmless_chflags_errors(self):
- tmpdir = self.mkdtemp()
- file1 = os.path.join(tmpdir, 'file1')
- file2 = os.path.join(tmpdir, 'file2')
- self.write_file(file1, 'xxx')
- self.write_file(file2, 'xxx')
+ def test_copytree_special_func(self):
- def make_chflags_raiser(err):
- ex = OSError()
+ src_dir = self.mkdtemp()
+ dst_dir = os.path.join(self.mkdtemp(), 'destination')
+ self._write_data(os.path.join(src_dir, 'test.txt'), '123')
+ os.mkdir(os.path.join(src_dir, 'test_dir'))
+ self._write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456')
- def _chflags_raiser(path, flags):
- ex.errno = err
- raise ex
- return _chflags_raiser
- old_chflags = os.chflags
- try:
- for err in errno.EOPNOTSUPP, errno.ENOTSUP:
- os.chflags = make_chflags_raiser(err)
- shutil.copystat(file1, file2)
- # assert others errors break it
- os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
- self.assertRaises(OSError, shutil.copystat, file1, file2)
- finally:
- os.chflags = old_chflags
+ copied = []
+ def _copy(src, dst):
+ copied.append((src, dst))
+
+ shutil.copytree(src_dir, dst_dir, copy_function=_copy)
+ self.assertEqual(len(copied), 2)
+
+ @support.skip_unless_symlink
+ def test_copytree_dangling_symlinks(self):
+
+ # a dangling symlink raises an error at the end
+ src_dir = self.mkdtemp()
+ dst_dir = os.path.join(self.mkdtemp(), 'destination')
+ os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
+ os.mkdir(os.path.join(src_dir, 'test_dir'))
+ self._write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456')
+ self.assertRaises(Error, shutil.copytree, src_dir, dst_dir)
+
+ # a dangling symlink is ignored with the proper flag
+ dst_dir = os.path.join(self.mkdtemp(), 'destination2')
+ shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
+ self.assertNotIn('test.txt', os.listdir(dst_dir))
+
+ # a dangling symlink is copied if symlinks=True
+ dst_dir = os.path.join(self.mkdtemp(), 'destination3')
+ shutil.copytree(src_dir, dst_dir, symlinks=True)
+ self.assertIn('test.txt', os.listdir(dst_dir))
+
+ def _copy_file(self, method):
+ fname = 'test.txt'
+ tmpdir = self.mkdtemp()
+ self.write_file([tmpdir, fname])
+ file1 = os.path.join(tmpdir, fname)
+ tmpdir2 = self.mkdtemp()
+ method(file1, tmpdir2)
+ file2 = os.path.join(tmpdir2, fname)
+ return (file1, file2)
+
+ @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
+ def test_copy(self):
+ # Ensure that the copied file exists and has the same mode bits.
+ file1, file2 = self._copy_file(shutil.copy)
+ self.assertTrue(os.path.exists(file2))
+ self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)
+
+ @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
+ @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
+ def test_copy2(self):
+ # Ensure that the copied file exists and has the same mode and
+ # modification time bits.
+ file1, file2 = self._copy_file(shutil.copy2)
+ self.assertTrue(os.path.exists(file2))
+ file1_stat = os.stat(file1)
+ file2_stat = os.stat(file2)
+ self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
+ for attr in 'st_atime', 'st_mtime':
+ # The modification times may be truncated in the new file.
+ self.assertLessEqual(getattr(file1_stat, attr),
+ getattr(file2_stat, attr) + 1)
+ if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
+ self.assertEqual(getattr(file1_stat, 'st_flags'),
+ getattr(file2_stat, 'st_flags'))
@unittest.skipUnless(zlib, "requires zlib")
def test_make_tarball(self):
@@ -538,6 +692,7 @@ class TestShutil(unittest.TestCase):
owner='kjhkjhkjg', group='oihohoh')
self.assertTrue(os.path.exists(res))
+
@unittest.skipUnless(zlib, "Requires zlib")
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
def test_tarfile_root_owner(self):
@@ -595,6 +750,69 @@ class TestShutil(unittest.TestCase):
formats = [name for name, params in get_archive_formats()]
self.assertNotIn('xxx', formats)
+ def _compare_dirs(self, dir1, dir2):
+ # check that dir1 and dir2 are equivalent,
+ # return the diff
+ diff = []
+ for root, dirs, files in os.walk(dir1):
+ for file_ in files:
+ path = os.path.join(root, file_)
+ target_path = os.path.join(dir2, os.path.split(path)[-1])
+ if not os.path.exists(target_path):
+ diff.append(file_)
+ return diff
+
+ @unittest.skipUnless(zlib, "Requires zlib")
+ def test_unpack_archive(self):
+ formats = ['tar', 'gztar', 'zip']
+ if BZ2_SUPPORTED:
+ formats.append('bztar')
+
+ for format in formats:
+ tmpdir = self.mkdtemp()
+ base_dir, root_dir, base_name = self._create_files()
+ tmpdir2 = self.mkdtemp()
+ filename = make_archive(base_name, format, root_dir, base_dir)
+
+ # let's try to unpack it now
+ unpack_archive(filename, tmpdir2)
+ diff = self._compare_dirs(tmpdir, tmpdir2)
+ self.assertEqual(diff, [])
+
+ # and again, this time with the format specified
+ tmpdir3 = self.mkdtemp()
+ unpack_archive(filename, tmpdir3, format=format)
+ diff = self._compare_dirs(tmpdir, tmpdir3)
+ self.assertEqual(diff, [])
+ self.assertRaises(shutil.ReadError, unpack_archive, TESTFN)
+ self.assertRaises(ValueError, unpack_archive, TESTFN, format='xxx')
+
+ def test_unpack_registery(self):
+
+ formats = get_unpack_formats()
+
+ def _boo(filename, extract_dir, extra):
+ self.assertEqual(extra, 1)
+ self.assertEqual(filename, 'stuff.boo')
+ self.assertEqual(extract_dir, 'xx')
+
+ register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
+ unpack_archive('stuff.boo', 'xx')
+
+ # trying to register a .boo unpacker again
+ self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
+ ['.boo'], _boo)
+
+ # should work now
+ unregister_unpack_format('Boo')
+ register_unpack_format('Boo2', ['.boo'], _boo)
+ self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
+ self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
+
+ # let's leave a clean state
+ unregister_unpack_format('Boo2')
+ self.assertEqual(get_unpack_formats(), formats)
+
class TestMove(unittest.TestCase):
@@ -604,20 +822,11 @@ class TestMove(unittest.TestCase):
self.dst_dir = tempfile.mkdtemp()
self.src_file = os.path.join(self.src_dir, filename)
self.dst_file = os.path.join(self.dst_dir, filename)
- # Try to create a dir in the current directory, hoping that it is
- # not located on the same filesystem as the system tmp dir.
- try:
- self.dir_other_fs = tempfile.mkdtemp(
- dir=os.path.dirname(__file__))
- self.file_other_fs = os.path.join(self.dir_other_fs,
- filename)
- except OSError:
- self.dir_other_fs = None
with open(self.src_file, "wb") as f:
- f.write("spam")
+ f.write(b"spam")
def tearDown(self):
- for d in (self.src_dir, self.dst_dir, self.dir_other_fs):
+ for d in (self.src_dir, self.dst_dir):
try:
if d:
shutil.rmtree(d)
@@ -646,21 +855,15 @@ class TestMove(unittest.TestCase):
# Move a file inside an existing dir on the same filesystem.
self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
+ @mock_rename
def test_move_file_other_fs(self):
# Move a file to an existing dir on another filesystem.
- if not self.dir_other_fs:
- # skip
- return
- self._check_move_file(self.src_file, self.file_other_fs,
- self.file_other_fs)
+ self.test_move_file()
+ @mock_rename
def test_move_file_to_dir_other_fs(self):
# Move a file to another location on another filesystem.
- if not self.dir_other_fs:
- # skip
- return
- self._check_move_file(self.src_file, self.dir_other_fs,
- self.file_other_fs)
+ self.test_move_file_to_dir()
def test_move_dir(self):
# Move a dir to another location on the same filesystem.
@@ -673,32 +876,20 @@ class TestMove(unittest.TestCase):
except:
pass
+ @mock_rename
def test_move_dir_other_fs(self):
# Move a dir to another location on another filesystem.
- if not self.dir_other_fs:
- # skip
- return
- dst_dir = tempfile.mktemp(dir=self.dir_other_fs)
- try:
- self._check_move_dir(self.src_dir, dst_dir, dst_dir)
- finally:
- try:
- shutil.rmtree(dst_dir)
- except:
- pass
+ self.test_move_dir()
def test_move_dir_to_dir(self):
# Move a dir inside an existing dir on the same filesystem.
self._check_move_dir(self.src_dir, self.dst_dir,
os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
+ @mock_rename
def test_move_dir_to_dir_other_fs(self):
# Move a dir inside an existing dir on another filesystem.
- if not self.dir_other_fs:
- # skip
- return
- self._check_move_dir(self.src_dir, self.dir_other_fs,
- os.path.join(self.dir_other_fs, os.path.basename(self.src_dir)))
+ self.test_move_dir_to_dir()
def test_existing_file_inside_dest_dir(self):
# A file with the same name inside the destination dir already exists.
@@ -859,7 +1050,7 @@ class TestCopyFile(unittest.TestCase):
def test_main():
- test_support.run_unittest(TestShutil, TestMove, TestCopyFile)
+ support.run_unittest(TestShutil, TestMove, TestCopyFile)
if __name__ == '__main__':
test_main()