diff options
Diffstat (limited to 'Lib/test/test_tarfile.py')
-rw-r--r-- | Lib/test/test_tarfile.py | 160 |
1 files changed, 107 insertions, 53 deletions
diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index f32eb8c438d..c7bf774b426 100644 --- a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -21,6 +21,10 @@ try: import bz2 except ImportError: bz2 = None +try: + import lzma +except ImportError: + lzma = None def md5sum(data): return md5(data).hexdigest() @@ -29,6 +33,7 @@ TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir" tarname = support.findfile("testtar.tar") gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2") +xzname = os.path.join(TEMPDIR, "testtar.tar.xz") tmpname = os.path.join(TEMPDIR, "tmp.tar") md5_regtype = "65f477c818ad9e15f7feab0c6d37742f" @@ -51,13 +56,10 @@ class UstarReadTest(ReadTest): def test_fileobj_regular_file(self): tarinfo = self.tar.getmember("ustar/regtype") - fobj = self.tar.extractfile(tarinfo) - try: + with self.tar.extractfile(tarinfo) as fobj: data = fobj.read() self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), "regular file extraction failed") - finally: - fobj.close() def test_fileobj_readlines(self): self.tar.extract("ustar/regtype", TEMPDIR) @@ -65,8 +67,7 @@ class UstarReadTest(ReadTest): with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: lines1 = fobj1.readlines() - fobj = self.tar.extractfile(tarinfo) - try: + with self.tar.extractfile(tarinfo) as fobj: fobj2 = io.TextIOWrapper(fobj) lines2 = fobj2.readlines() self.assertTrue(lines1 == lines2, @@ -76,21 +77,16 @@ class UstarReadTest(ReadTest): self.assertTrue(lines2[83] == "I will gladly admit that Python is not the fastest running scripting language.\n", "fileobj.readlines() failed") - finally: - fobj.close() def test_fileobj_iter(self): self.tar.extract("ustar/regtype", TEMPDIR) tarinfo = self.tar.getmember("ustar/regtype") - with open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") as fobj1: + with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: lines1 = fobj1.readlines() - fobj2 = self.tar.extractfile(tarinfo) - try: + with self.tar.extractfile(tarinfo) as fobj2: lines2 = list(io.TextIOWrapper(fobj2)) self.assertTrue(lines1 == lines2, "fileobj.__iter__() failed") - finally: - fobj2.close() def test_fileobj_seek(self): self.tar.extract("ustar/regtype", TEMPDIR) @@ -142,17 +138,24 @@ class UstarReadTest(ReadTest): "read() after readline() failed") fobj.close() + def test_fileobj_text(self): + with self.tar.extractfile("ustar/regtype") as fobj: + fobj = io.TextIOWrapper(fobj) + data = fobj.read().encode("iso8859-1") + self.assertEqual(md5sum(data), md5_regtype) + try: + fobj.seek(100) + except AttributeError: + # Issue #13815: seek() complained about a missing + # flush() method. + self.fail("seeking failed in text mode") + # Test if symbolic and hard links are resolved by extractfile(). The # test link members each point to a regular member whose data is # supposed to be exported. def _test_fileobj_link(self, lnktype, regtype): - a = self.tar.extractfile(lnktype) - b = self.tar.extractfile(regtype) - try: + with self.tar.extractfile(lnktype) as a, self.tar.extractfile(regtype) as b: self.assertEqual(a.name, b.name) - finally: - a.close() - b.close() def test_fileobj_link1(self): self._test_fileobj_link("ustar/lnktype", "ustar/regtype") @@ -204,13 +207,15 @@ class CommonReadTest(ReadTest): _open = gzip.GzipFile elif self.mode.endswith(":bz2"): _open = bz2.BZ2File + elif self.mode.endswith(":xz"): + _open = lzma.LZMAFile else: - _open = open + _open = io.FileIO for char in (b'\0', b'a'): # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') # are ignored correctly. - with _open(tmpname, "wb") as fobj: + with _open(tmpname, "w") as fobj: fobj.write(char * 1024) fobj.write(tarfile.TarInfo("foo").tobuf()) @@ -225,6 +230,10 @@ class CommonReadTest(ReadTest): class MiscReadTest(CommonReadTest): def test_no_name_argument(self): + if self.mode.endswith(("bz2", "xz")): + # BZ2File and LZMAFile have no name attribute. + self.skipTest("no name attribute") + with open(self.tarname, "rb") as fobj: tar = tarfile.open(fileobj=fobj, mode=self.mode) self.assertEqual(tar.name, os.path.abspath(fobj.name)) @@ -254,9 +263,8 @@ class MiscReadTest(CommonReadTest): t = tar.next() name = t.name offset = t.offset - f = tar.extractfile(t) - data = f.read() - f.close() + with tar.extractfile(t) as f: + data = f.read() finally: tar.close() @@ -265,10 +273,12 @@ class MiscReadTest(CommonReadTest): _open = gzip.GzipFile elif self.mode.endswith(":bz2"): _open = bz2.BZ2File + elif self.mode.endswith(":xz"): + _open = lzma.LZMAFile else: - _open = open - fobj = _open(self.tarname, "rb") - try: + _open = io.FileIO + + with _open(self.tarname) as fobj: fobj.seek(offset) # Test if the tarfile starts with the second member. @@ -281,8 +291,6 @@ class MiscReadTest(CommonReadTest): self.assertEqual(tar.extractfile(t).read(), data, "seek back did not work") tar.close() - finally: - fobj.close() def test_fail_comp(self): # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. @@ -428,27 +436,26 @@ class StreamReadTest(CommonReadTest): for tarinfo in self.tar: if not tarinfo.isreg(): continue - fobj = self.tar.extractfile(tarinfo) - while True: - try: - buf = fobj.read(512) - except tarfile.StreamError: - self.fail("simple read-through using TarFile.extractfile() failed") - if not buf: - break - fobj.close() + with self.tar.extractfile(tarinfo) as fobj: + while True: + try: + buf = fobj.read(512) + except tarfile.StreamError: + self.fail("simple read-through using TarFile.extractfile() failed") + if not buf: + break def test_fileobj_regular_file(self): tarinfo = self.tar.next() # get "regtype" (can't use getmember) - fobj = self.tar.extractfile(tarinfo) - data = fobj.read() + with self.tar.extractfile(tarinfo) as fobj: + data = fobj.read() self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), "regular file extraction failed") def test_provoke_stream_error(self): tarinfos = self.tar.getmembers() - f = self.tar.extractfile(tarinfos[0]) # read the first member - self.assertRaises(tarfile.StreamError, f.read) + with self.tar.extractfile(tarinfos[0]) as f: # read the first member + self.assertRaises(tarfile.StreamError, f.read) def test_compare_members(self): tar1 = tarfile.open(tarname, encoding="iso8859-1") @@ -526,6 +533,18 @@ class DetectReadTest(unittest.TestCase): testfunc(bz2name, "r|*") testfunc(bz2name, "r|bz2") + if lzma: + self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:xz") + self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|xz") + self.assertRaises(tarfile.ReadError, tarfile.open, xzname, mode="r:") + self.assertRaises(tarfile.ReadError, tarfile.open, xzname, mode="r|") + + testfunc(xzname, "r") + testfunc(xzname, "r:*") + testfunc(xzname, "r:xz") + testfunc(xzname, "r|*") + testfunc(xzname, "r|xz") + def test_detect_file(self): self._test_modes(self._testfunc_file) @@ -723,7 +742,7 @@ class GNUReadTest(LongnameTest): # Return True if the platform knows the st_blocks stat attribute and # uses st_blocks units of 512 bytes, and if the filesystem is able to # store holes in files. - if sys.platform == "linux2": + if sys.platform.startswith("linux"): # Linux evidentially has 512 byte st_blocks units. name = os.path.join(TEMPDIR, "sparse-test") with open(name, "wb") as fobj: @@ -913,7 +932,7 @@ class WriteTest(WriteTestBase): try: for name in ("foo", "bar", "baz"): name = os.path.join(tempdir, name) - open(name, "wb").close() + support.create_empty_file(name) exclude = os.path.isfile @@ -940,7 +959,7 @@ class WriteTest(WriteTestBase): try: for name in ("foo", "bar", "baz"): name = os.path.join(tempdir, name) - open(name, "wb").close() + support.create_empty_file(name) def filter(tarinfo): if os.path.basename(tarinfo.name) == "bar": @@ -979,7 +998,7 @@ class WriteTest(WriteTestBase): # and compare the stored name with the original. foo = os.path.join(TEMPDIR, "foo") if not dir: - open(foo, "w").close() + support.create_empty_file(foo) else: os.mkdir(foo) @@ -1096,6 +1115,9 @@ class StreamWriteTest(WriteTestBase): data = dec.decompress(data) self.assertTrue(len(dec.unused_data) == 0, "found trailing data") + elif self.mode.endswith("xz"): + with lzma.LZMAFile(tmpname) as fobj: + data = fobj.read() else: with open(tmpname, "rb") as fobj: data = fobj.read() @@ -1339,7 +1361,7 @@ class UstarUnicodeTest(unittest.TestCase): self._test_unicode_filename("utf7") def test_utf8_filename(self): - self._test_unicode_filename("utf8") + self._test_unicode_filename("utf-8") def _test_unicode_filename(self, encoding): tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict") @@ -1418,7 +1440,7 @@ class GNUUnicodeTest(UstarUnicodeTest): def test_bad_pax_header(self): # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields # without a hdrcharset=BINARY header. - for encoding, name in (("utf8", "pax/bad-pax-\udce4\udcf6\udcfc"), + for encoding, name in (("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): with tarfile.open(tarname, encoding=encoding, errors="surrogateescape") as tar: try: @@ -1433,7 +1455,7 @@ class PAXUnicodeTest(UstarUnicodeTest): def test_binary_header(self): # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. - for encoding, name in (("utf8", "pax/hdrcharset-\udce4\udcf6\udcfc"), + for encoding, name in (("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): with tarfile.open(tarname, encoding=encoding, errors="surrogateescape") as tar: try: @@ -1458,12 +1480,9 @@ class AppendTest(unittest.TestCase): with tarfile.open(tarname, encoding="iso8859-1") as src: t = src.getmember("ustar/regtype") t.name = "foo" - f = src.extractfile(t) - try: + with src.extractfile(t) as f: with tarfile.open(self.tarname, mode) as tar: tar.addfile(t, f) - finally: - f.close() def _test(self, names=["bar"], fileobj=None): with tarfile.open(self.tarname, fileobj=fileobj) as tar: @@ -1510,6 +1529,12 @@ class AppendTest(unittest.TestCase): self._create_testtar("w:bz2") self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") + def test_append_lzma(self): + if lzma is None: + self.skipTest("lzma module not available") + self._create_testtar("w:xz") + self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") + # Append mode is supposed to fail if the tarfile to append to # does not end with a zero block. def _test_error(self, data): @@ -1788,6 +1813,21 @@ class Bz2PartialReadTest(unittest.TestCase): self._test_partial_input("r:bz2") +class LzmaMiscReadTest(MiscReadTest): + tarname = xzname + mode = "r:xz" +class LzmaUstarReadTest(UstarReadTest): + tarname = xzname + mode = "r:xz" +class LzmaStreamReadTest(StreamReadTest): + tarname = xzname + mode = "r|xz" +class LzmaWriteTest(WriteTest): + mode = "w:xz" +class LzmaStreamWriteTest(StreamWriteTest): + mode = "w|xz" + + def test_main(): support.unlink(TEMPDIR) os.makedirs(TEMPDIR) @@ -1850,6 +1890,20 @@ def test_main(): Bz2PartialReadTest, ] + if lzma: + # Create testtar.tar.xz and add lzma-specific tests. + support.unlink(xzname) + with lzma.LZMAFile(xzname, "w") as tar: + tar.write(data) + + tests += [ + LzmaMiscReadTest, + LzmaUstarReadTest, + LzmaStreamReadTest, + LzmaWriteTest, + LzmaStreamWriteTest, + ] + try: support.run_unittest(*tests) finally: |