diff options
Diffstat (limited to 'Lib/test/test_tempfile.py')
-rw-r--r-- | Lib/test/test_tempfile.py | 325 |
1 files changed, 275 insertions, 50 deletions
diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py index 2ddc04cedae..50cf3b49cb8 100644 --- a/Lib/test/test_tempfile.py +++ b/Lib/test/test_tempfile.py @@ -7,11 +7,8 @@ import re import warnings import unittest -from test import test_support +from test import support -warnings.filterwarnings("ignore", - category=RuntimeWarning, - message="mktemp", module=__name__) if hasattr(os, 'stat'): import stat @@ -38,6 +35,16 @@ class TC(unittest.TestCase): str_check = re.compile(r"[a-zA-Z0-9_-]{6}$") + def setUp(self): + self._warnings_manager = support.check_warnings() + self._warnings_manager.__enter__() + warnings.filterwarnings("ignore", category=RuntimeWarning, + message="mktemp", module=__name__) + + def tearDown(self): + self._warnings_manager.__exit__(None, None, None) + + def failOnException(self, what, ei=None): if ei is None: ei = sys.exc_info() @@ -79,7 +86,8 @@ class test_exports(TC): "gettempdir" : 1, "tempdir" : 1, "template" : 1, - "SpooledTemporaryFile" : 1 + "SpooledTemporaryFile" : 1, + "TemporaryDirectory" : 1, } unexp = [] @@ -97,10 +105,11 @@ class test__RandomNameSequence(TC): def setUp(self): self.r = tempfile._RandomNameSequence() + super().setUp() def test_get_six_char_str(self): # _RandomNameSequence returns a six-character string - s = self.r.next() + s = next(self.r) self.nameCheck(s, '', '', '') def test_many(self): @@ -108,13 +117,13 @@ class test__RandomNameSequence(TC): dict = {} r = self.r - for i in xrange(TEST_FILES): - s = r.next() + for i in range(TEST_FILES): + s = next(r) self.nameCheck(s, '', '', '') self.assertNotIn(s, dict) dict[s] = 1 - def test_supports_iter(self): + def supports_iter(self): # _RandomNameSequence supports the iterator protocol i = 0 @@ -171,13 +180,13 @@ class test__candidate_tempdir_list(TC): self.assertFalse(len(cand) == 0) for c in cand: - self.assertIsInstance(c, basestring) + self.assertIsInstance(c, str) def test_wanted_dirs(self): # _candidate_tempdir_list contains the expected directories # Make sure the interesting environment variables are all set. - with test_support.EnvironmentVarGuard() as env: + with support.EnvironmentVarGuard() as env: for envname in 'TMPDIR', 'TEMP', 'TMP': dirname = os.getenv(envname) if not dirname: @@ -259,15 +268,15 @@ class test__mkstemp_inner(TC): def test_basic(self): # _mkstemp_inner can create files - self.do_create().write("blat") - self.do_create(pre="a").write("blat") - self.do_create(suf="b").write("blat") - self.do_create(pre="a", suf="b").write("blat") - self.do_create(pre="aa", suf=".txt").write("blat") + self.do_create().write(b"blat") + self.do_create(pre="a").write(b"blat") + self.do_create(suf="b").write(b"blat") + self.do_create(pre="a", suf="b").write(b"blat") + self.do_create(pre="aa", suf=".txt").write(b"blat") def test_basic_many(self): # _mkstemp_inner can create many files (stochastic) - extant = range(TEST_FILES) + extant = list(range(TEST_FILES)) for i in extant: extant[i] = self.do_create(pre="aa") @@ -275,7 +284,7 @@ class test__mkstemp_inner(TC): # _mkstemp_inner can create files in a user-selected directory dir = tempfile.mkdtemp() try: - self.do_create(dir=dir).write("blat") + self.do_create(dir=dir).write(b"blat") finally: os.rmdir(dir) @@ -286,7 +295,7 @@ class test__mkstemp_inner(TC): file = self.do_create() mode = stat.S_IMODE(os.stat(file.name).st_mode) - expected = 0600 + expected = 0o600 if sys.platform in ('win32', 'os2emx'): # There's no distinction among 'user', 'group' and 'world'; # replicate the 'user' bits. @@ -299,7 +308,7 @@ class test__mkstemp_inner(TC): if not has_spawnl: return # ugh, can't use SkipTest. - if test_support.verbose: + if support.verbose: v="v" else: v="q" @@ -337,8 +346,12 @@ class test__mkstemp_inner(TC): if not has_textmode: return # ugh, can't use SkipTest. - self.do_create(bin=0).write("blat\n") - # XXX should test that the file really is a text file + # A text file is truncated at the first Ctrl+Z byte + f = self.do_create(bin=0) + f.write(b"blat\x1a") + f.write(b"extra\n") + os.lseek(f.fd, 0, os.SEEK_SET) + self.assertEqual(os.read(f.fd, 20), b"blat") test_classes.append(test__mkstemp_inner) @@ -350,7 +363,7 @@ class test_gettempprefix(TC): # gettempprefix returns a nonempty prefix string p = tempfile.gettempprefix() - self.assertIsInstance(p, basestring) + self.assertIsInstance(p, str) self.assertTrue(len(p) > 0) def test_usable_template(self): @@ -395,7 +408,7 @@ class test_gettempdir(TC): # gettempdir. try: file = tempfile.NamedTemporaryFile() - file.write("blat") + file.write(b"blat") file.close() except: self.failOnException("create file in %s" % tempfile.gettempdir()) @@ -479,13 +492,13 @@ class test_mkdtemp(TC): def test_basic_many(self): # mkdtemp can create many directories (stochastic) - extant = range(TEST_FILES) + extant = list(range(TEST_FILES)) try: for i in extant: extant[i] = self.do_create(pre="aa") finally: for i in extant: - if(isinstance(i, basestring)): + if(isinstance(i, str)): os.rmdir(i) def test_choose_directory(self): @@ -504,8 +517,8 @@ class test_mkdtemp(TC): dir = self.do_create() try: mode = stat.S_IMODE(os.stat(dir).st_mode) - mode &= 0777 # Mask off sticky bits inherited from /tmp - expected = 0700 + mode &= 0o777 # Mask off sticky bits inherited from /tmp + expected = 0o700 if sys.platform in ('win32', 'os2emx'): # There's no distinction among 'user', 'group' and 'world'; # replicate the 'user' bits. @@ -525,11 +538,13 @@ class test_mktemp(TC): # We must also suppress the RuntimeWarning it generates. def setUp(self): self.dir = tempfile.mkdtemp() + super().setUp() def tearDown(self): if self.dir: os.rmdir(self.dir) self.dir = None + super().tearDown() class mktemped: _unlink = os.unlink @@ -539,7 +554,7 @@ class test_mktemp(TC): self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf) # Create the file. This will raise an exception if it's # mysteriously appeared in the meanwhile. - os.close(os.open(self.name, self._bflags, 0600)) + os.close(os.open(self.name, self._bflags, 0o600)) def __del__(self): self._unlink(self.name) @@ -563,7 +578,7 @@ class test_mktemp(TC): def test_many(self): # mktemp can choose many usable file names (stochastic) - extant = range(TEST_FILES) + extant = list(range(TEST_FILES)) for i in extant: extant[i] = self.do_create(pre="aa") @@ -616,7 +631,7 @@ class test_NamedTemporaryFile(TC): dir = tempfile.mkdtemp() try: f = tempfile.NamedTemporaryFile(dir=dir) - f.write('blat') + f.write(b'blat') f.close() self.assertFalse(os.path.exists(f.name), "NamedTemporaryFile %s exists after close" % f.name) @@ -630,7 +645,7 @@ class test_NamedTemporaryFile(TC): try: f = tempfile.NamedTemporaryFile(dir=dir, delete=False) tmp = f.name - f.write('blat') + f.write(b'blat') f.close() self.assertTrue(os.path.exists(f.name), "NamedTemporaryFile %s missing after close" % f.name) @@ -642,7 +657,7 @@ class test_NamedTemporaryFile(TC): def test_multiple_close(self): # A NamedTemporaryFile can be closed many times without error f = tempfile.NamedTemporaryFile() - f.write('abc\n') + f.write(b'abc\n') f.close() try: f.close() @@ -691,11 +706,11 @@ class test_SpooledTemporaryFile(TC): try: f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir) self.assertFalse(f._rolled) - f.write('blat ' * 5) + f.write(b'blat ' * 5) self.assertTrue(f._rolled) filename = f.name f.close() - self.assertFalse(os.path.exists(filename), + self.assertFalse(isinstance(filename, str) and os.path.exists(filename), "SpooledTemporaryFile %s exists after close" % filename) finally: os.rmdir(dir) @@ -706,7 +721,7 @@ class test_SpooledTemporaryFile(TC): self.assertFalse(f._rolled) for i in range(5): f.seek(0, 0) - f.write('x' * 20) + f.write(b'x' * 20) self.assertFalse(f._rolled) def test_write_sequential(self): @@ -714,11 +729,11 @@ class test_SpooledTemporaryFile(TC): # over afterward f = self.do_create(max_size=30) self.assertFalse(f._rolled) - f.write('x' * 20) + f.write(b'x' * 20) self.assertFalse(f._rolled) - f.write('x' * 10) + f.write(b'x' * 10) self.assertFalse(f._rolled) - f.write('x') + f.write(b'x') self.assertTrue(f._rolled) def test_writelines(self): @@ -745,7 +760,7 @@ class test_SpooledTemporaryFile(TC): self.assertFalse(f._rolled) f.seek(100, 0) self.assertFalse(f._rolled) - f.write('x') + f.write(b'x') self.assertTrue(f._rolled) def test_fileno(self): @@ -758,7 +773,7 @@ class test_SpooledTemporaryFile(TC): def test_multiple_close_before_rollover(self): # A SpooledTemporaryFile can be closed many times without error f = tempfile.SpooledTemporaryFile() - f.write('abc\n') + f.write(b'abc\n') self.assertFalse(f._rolled) f.close() try: @@ -770,7 +785,7 @@ class test_SpooledTemporaryFile(TC): def test_multiple_close_after_rollover(self): # A SpooledTemporaryFile can be closed many times without error f = tempfile.SpooledTemporaryFile(max_size=1) - f.write('abc\n') + f.write(b'abc\n') self.assertTrue(f._rolled) f.close() try: @@ -788,10 +803,41 @@ class test_SpooledTemporaryFile(TC): write = f.write seek = f.seek - write("a" * 35) - write("b" * 35) + write(b"a" * 35) + write(b"b" * 35) seek(0, 0) - self.assertTrue(read(70) == 'a'*35 + 'b'*35) + self.assertEqual(read(70), b'a'*35 + b'b'*35) + + def test_text_mode(self): + # Creating a SpooledTemporaryFile with a text mode should produce + # a file object reading and writing (Unicode) text strings. + f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10) + f.write("abc\n") + f.seek(0) + self.assertEqual(f.read(), "abc\n") + f.write("def\n") + f.seek(0) + self.assertEqual(f.read(), "abc\ndef\n") + f.write("xyzzy\n") + f.seek(0) + self.assertEqual(f.read(), "abc\ndef\nxyzzy\n") + # Check that Ctrl+Z doesn't truncate the file + f.write("foo\x1abar\n") + f.seek(0) + self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n") + + def test_text_newline_and_encoding(self): + f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10, + newline='', encoding='utf-8') + f.write("\u039B\r\n") + f.seek(0) + self.assertEqual(f.read(), "\u039B\r\n") + self.assertFalse(f._rolled) + + f.write("\u039B" * 20 + "\r\n") + f.seek(0) + self.assertEqual(f.read(), "\u039B\r\n" + ("\u039B" * 20) + "\r\n") + self.assertTrue(f._rolled) def test_context_manager_before_rollover(self): # A SpooledTemporaryFile can be used as a context manager @@ -808,7 +854,7 @@ class test_SpooledTemporaryFile(TC): # A SpooledTemporaryFile can be used as a context manager with tempfile.SpooledTemporaryFile(max_size=1) as f: self.assertFalse(f._rolled) - f.write('abc\n') + f.write(b'abc\n') f.flush() self.assertTrue(f._rolled) self.assertFalse(f.closed) @@ -821,7 +867,7 @@ class test_SpooledTemporaryFile(TC): def test_context_manager_after_rollover(self): # A SpooledTemporaryFile can be used as a context manager f = tempfile.SpooledTemporaryFile(max_size=1) - f.write('abc\n') + f.write(b'abc\n') f.flush() self.assertTrue(f._rolled) with f: @@ -851,7 +897,7 @@ class test_TemporaryFile(TC): # TemporaryFile creates files with no names (on this system) dir = tempfile.mkdtemp() f = tempfile.TemporaryFile(dir=dir) - f.write('blat') + f.write(b'blat') # Sneaky: because this file has no name, it should not prevent # us from removing the directory it was created in. @@ -867,7 +913,7 @@ class test_TemporaryFile(TC): def test_multiple_close(self): # A TemporaryFile can be closed many times without error f = tempfile.TemporaryFile() - f.write('abc\n') + f.write(b'abc\n') f.close() try: f.close() @@ -876,13 +922,192 @@ class test_TemporaryFile(TC): self.failOnException("close") # How to test the mode and bufsize parameters? + def test_mode_and_encoding(self): + + def roundtrip(input, *args, **kwargs): + with tempfile.TemporaryFile(*args, **kwargs) as fileobj: + fileobj.write(input) + fileobj.seek(0) + self.assertEqual(input, fileobj.read()) + + roundtrip(b"1234", "w+b") + roundtrip("abdc\n", "w+") + roundtrip("\u039B", "w+", encoding="utf-16") + roundtrip("foo\r\n", "w+", newline="") if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile: test_classes.append(test_TemporaryFile) + +# Helper for test_del_on_shutdown +class NulledModules: + def __init__(self, *modules): + self.refs = [mod.__dict__ for mod in modules] + self.contents = [ref.copy() for ref in self.refs] + + def __enter__(self): + for d in self.refs: + for key in d: + d[key] = None + + def __exit__(self, *exc_info): + for d, c in zip(self.refs, self.contents): + d.clear() + d.update(c) + +class test_TemporaryDirectory(TC): + """Test TemporaryDirectory().""" + + def do_create(self, dir=None, pre="", suf="", recurse=1): + if dir is None: + dir = tempfile.gettempdir() + try: + tmp = tempfile.TemporaryDirectory(dir=dir, prefix=pre, suffix=suf) + except: + self.failOnException("TemporaryDirectory") + self.nameCheck(tmp.name, dir, pre, suf) + # Create a subdirectory and some files + if recurse: + self.do_create(tmp.name, pre, suf, recurse-1) + with open(os.path.join(tmp.name, "test.txt"), "wb") as f: + f.write(b"Hello world!") + return tmp + + def test_mkdtemp_failure(self): + # Check no additional exception if mkdtemp fails + # Previously would raise AttributeError instead + # (noted as part of Issue #10188) + with tempfile.TemporaryDirectory() as nonexistent: + pass + with self.assertRaises(os.error): + tempfile.TemporaryDirectory(dir=nonexistent) + + def test_explicit_cleanup(self): + # A TemporaryDirectory is deleted when cleaned up + dir = tempfile.mkdtemp() + try: + d = self.do_create(dir=dir) + self.assertTrue(os.path.exists(d.name), + "TemporaryDirectory %s does not exist" % d.name) + d.cleanup() + self.assertFalse(os.path.exists(d.name), + "TemporaryDirectory %s exists after cleanup" % d.name) + finally: + os.rmdir(dir) + + @support.skip_unless_symlink + def test_cleanup_with_symlink_to_a_directory(self): + # cleanup() should not follow symlinks to directories (issue #12464) + d1 = self.do_create() + d2 = self.do_create() + + # Symlink d1/foo -> d2 + os.symlink(d2.name, os.path.join(d1.name, "foo")) + + # This call to cleanup() should not follow the "foo" symlink + d1.cleanup() + + self.assertFalse(os.path.exists(d1.name), + "TemporaryDirectory %s exists after cleanup" % d1.name) + self.assertTrue(os.path.exists(d2.name), + "Directory pointed to by a symlink was deleted") + self.assertEqual(os.listdir(d2.name), ['test.txt'], + "Contents of the directory pointed to by a symlink " + "were deleted") + d2.cleanup() + + @support.cpython_only + def test_del_on_collection(self): + # A TemporaryDirectory is deleted when garbage collected + dir = tempfile.mkdtemp() + try: + d = self.do_create(dir=dir) + name = d.name + del d # Rely on refcounting to invoke __del__ + self.assertFalse(os.path.exists(name), + "TemporaryDirectory %s exists after __del__" % name) + finally: + os.rmdir(dir) + + @unittest.expectedFailure # See issue #10188 + def test_del_on_shutdown(self): + # A TemporaryDirectory may be cleaned up during shutdown + # Make sure it works with the relevant modules nulled out + with self.do_create() as dir: + d = self.do_create(dir=dir) + # Mimic the nulling out of modules that + # occurs during system shutdown + modules = [os, os.path] + if has_stat: + modules.append(stat) + # Currently broken, so suppress the warning + # that is otherwise emitted on stdout + with support.captured_stderr() as err: + with NulledModules(*modules): + d.cleanup() + # Currently broken, so stop spurious exception by + # indicating the object has already been closed + d._closed = True + # And this assert will fail, as expected by the + # unittest decorator... + self.assertFalse(os.path.exists(d.name), + "TemporaryDirectory %s exists after cleanup" % d.name) + + def test_warnings_on_cleanup(self): + # Two kinds of warning on shutdown + # Issue 10888: may write to stderr if modules are nulled out + # ResourceWarning will be triggered by __del__ + with self.do_create() as dir: + if os.sep != '\\': + # Embed a backslash in order to make sure string escaping + # in the displayed error message is dealt with correctly + suffix = '\\check_backslash_handling' + else: + suffix = '' + d = self.do_create(dir=dir, suf=suffix) + + #Check for the Issue 10888 message + modules = [os, os.path] + if has_stat: + modules.append(stat) + with support.captured_stderr() as err: + with NulledModules(*modules): + d.cleanup() + message = err.getvalue().replace('\\\\', '\\') + self.assertIn("while cleaning up", message) + self.assertIn(d.name, message) + + # Check for the resource warning + with support.check_warnings(('Implicitly', ResourceWarning), quiet=False): + warnings.filterwarnings("always", category=ResourceWarning) + d.__del__() + self.assertFalse(os.path.exists(d.name), + "TemporaryDirectory %s exists after __del__" % d.name) + + def test_multiple_close(self): + # Can be cleaned-up many times without error + d = self.do_create() + d.cleanup() + try: + d.cleanup() + d.cleanup() + except: + self.failOnException("cleanup") + + def test_context_manager(self): + # Can be used as a context manager + d = self.do_create() + with d as name: + self.assertTrue(os.path.exists(name)) + self.assertEqual(name, d.name) + self.assertFalse(os.path.exists(name)) + + +test_classes.append(test_TemporaryDirectory) + def test_main(): - test_support.run_unittest(*test_classes) + support.run_unittest(*test_classes) if __name__ == "__main__": test_main() |