diff options
Diffstat (limited to 'Lib/test/test_zlib.py')
-rw-r--r-- | Lib/test/test_zlib.py | 298 |
1 files changed, 176 insertions, 122 deletions
diff --git a/Lib/test/test_zlib.py b/Lib/test/test_zlib.py index f3dffd6f71d..4661c1d6135 100644 --- a/Lib/test/test_zlib.py +++ b/Lib/test/test_zlib.py @@ -1,75 +1,76 @@ import unittest -from test.test_support import TESTFN, run_unittest, import_module, unlink, requires +from test import support import binascii import random -from test.test_support import precisionbigmemtest, _1G, _4G import sys +from test.support import bigmemtest, _1G, _4G + +zlib = support.import_module('zlib') try: import mmap except ImportError: mmap = None -zlib = import_module('zlib') - class ChecksumTestCase(unittest.TestCase): # checksum test cases def test_crc32start(self): - self.assertEqual(zlib.crc32(""), zlib.crc32("", 0)) - self.assertTrue(zlib.crc32("abc", 0xffffffff)) + self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0)) + self.assertTrue(zlib.crc32(b"abc", 0xffffffff)) def test_crc32empty(self): - self.assertEqual(zlib.crc32("", 0), 0) - self.assertEqual(zlib.crc32("", 1), 1) - self.assertEqual(zlib.crc32("", 432), 432) + self.assertEqual(zlib.crc32(b"", 0), 0) + self.assertEqual(zlib.crc32(b"", 1), 1) + self.assertEqual(zlib.crc32(b"", 432), 432) def test_adler32start(self): - self.assertEqual(zlib.adler32(""), zlib.adler32("", 1)) - self.assertTrue(zlib.adler32("abc", 0xffffffff)) + self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1)) + self.assertTrue(zlib.adler32(b"abc", 0xffffffff)) def test_adler32empty(self): - self.assertEqual(zlib.adler32("", 0), 0) - self.assertEqual(zlib.adler32("", 1), 1) - self.assertEqual(zlib.adler32("", 432), 432) + self.assertEqual(zlib.adler32(b"", 0), 0) + self.assertEqual(zlib.adler32(b"", 1), 1) + self.assertEqual(zlib.adler32(b"", 432), 432) def assertEqual32(self, seen, expected): # 32-bit values masked -- checksums on 32- vs 64- bit machines # This is important if bit 31 (0x08000000L) is set. - self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL) + self.assertEqual(seen & 0x0FFFFFFFF, expected & 0x0FFFFFFFF) def test_penguins(self): - self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L) - self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94) - self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6) - self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7) + self.assertEqual32(zlib.crc32(b"penguin", 0), 0x0e5c1a120) + self.assertEqual32(zlib.crc32(b"penguin", 1), 0x43b6aa94) + self.assertEqual32(zlib.adler32(b"penguin", 0), 0x0bcf02f6) + self.assertEqual32(zlib.adler32(b"penguin", 1), 0x0bd602f7) - self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0)) - self.assertEqual(zlib.adler32("penguin"),zlib.adler32("penguin",1)) + self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0)) + self.assertEqual(zlib.adler32(b"penguin"),zlib.adler32(b"penguin",1)) - def test_abcdefghijklmnop(self): - """test issue1202 compliance: signed crc32, adler32 in 2.x""" - foo = 'abcdefghijklmnop' + def test_crc32_adler32_unsigned(self): + foo = b'abcdefghijklmnop' # explicitly test signed behavior - self.assertEqual(zlib.crc32(foo), -1808088941) - self.assertEqual(zlib.crc32('spam'), 1138425661) - self.assertEqual(zlib.adler32(foo+foo), -721416943) - self.assertEqual(zlib.adler32('spam'), 72286642) + self.assertEqual(zlib.crc32(foo), 2486878355) + self.assertEqual(zlib.crc32(b'spam'), 1138425661) + self.assertEqual(zlib.adler32(foo+foo), 3573550353) + self.assertEqual(zlib.adler32(b'spam'), 72286642) def test_same_as_binascii_crc32(self): - foo = 'abcdefghijklmnop' - self.assertEqual(binascii.crc32(foo), zlib.crc32(foo)) - self.assertEqual(binascii.crc32('spam'), zlib.crc32('spam')) + foo = b'abcdefghijklmnop' + crc = 2486878355 + self.assertEqual(binascii.crc32(foo), crc) + self.assertEqual(zlib.crc32(foo), crc) + self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam')) + - def test_negative_crc_iv_input(self): - # The range of valid input values for the crc state should be - # -2**31 through 2**32-1 to allow inputs artifically constrained - # to a signed 32-bit integer. - self.assertEqual(zlib.crc32('ham', -1), zlib.crc32('ham', 0xffffffffL)) - self.assertEqual(zlib.crc32('spam', -3141593), - zlib.crc32('spam', 0xffd01027L)) - self.assertEqual(zlib.crc32('spam', -(2**31)), - zlib.crc32('spam', (2**31))) +# Issue #10276 - check that inputs >=4GB are handled correctly. +class ChecksumBigBufferTestCase(unittest.TestCase): + + @bigmemtest(size=_4G + 4, memuse=1, dry_run=False) + def test_big_buffer(self, size): + data = b"nyan" * (_1G + 1) + self.assertEqual(zlib.crc32(data), 1044521549) + self.assertEqual(zlib.adler32(data), 2256789997) class ExceptionTestCase(unittest.TestCase): @@ -78,7 +79,18 @@ class ExceptionTestCase(unittest.TestCase): # specifying compression level out of range causes an error # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib # accepts 0 too) - self.assertRaises(zlib.error, zlib.compress, 'ERROR', 10) + self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10) + + def test_badargs(self): + self.assertRaises(TypeError, zlib.adler32) + self.assertRaises(TypeError, zlib.crc32) + self.assertRaises(TypeError, zlib.compress) + self.assertRaises(TypeError, zlib.decompress) + for arg in (42, None, '', 'abc', (), []): + self.assertRaises(TypeError, zlib.adler32, arg) + self.assertRaises(TypeError, zlib.crc32, arg) + self.assertRaises(TypeError, zlib.compress, arg) + self.assertRaises(TypeError, zlib.decompress, arg) def test_badcompressobj(self): # verify failure on building compress object with bad params @@ -104,7 +116,7 @@ class BaseCompressTestCase(object): # Generate 10MB worth of random, and expand it by repeating it. # The assumption is that zlib's memory is not big enough to exploit # such spread out redundancy. - data = ''.join([binascii.a2b_hex(fmt % random.getrandbits(8 * _1M)) + data = b''.join([random.getrandbits(8 * _1M).to_bytes(_1M, 'little') for i in range(10)]) data = data * (size // len(data) + 1) try: @@ -114,7 +126,7 @@ class BaseCompressTestCase(object): data = None def check_big_decompress_buffer(self, size, decompress_func): - data = 'x' * size + data = b'x' * size try: compressed = zlib.compress(data, 1) finally: @@ -124,7 +136,7 @@ class BaseCompressTestCase(object): # Sanity check try: self.assertEqual(len(data), size) - self.assertEqual(len(data.strip('x')), 0) + self.assertEqual(len(data.strip(b'x')), 0) finally: data = None @@ -139,40 +151,60 @@ class CompressTestCase(BaseCompressTestCase, unittest.TestCase): # compress more data data = HAMLET_SCENE * 128 x = zlib.compress(data) - self.assertEqual(zlib.decompress(x), data) + self.assertEqual(zlib.compress(bytearray(data)), x) + for ob in x, bytearray(x): + self.assertEqual(zlib.decompress(ob), data) def test_incomplete_stream(self): # An useful error message is given x = zlib.compress(HAMLET_SCENE) - self.assertRaisesRegexp(zlib.error, + self.assertRaisesRegex(zlib.error, "Error -5 while decompressing data: incomplete or truncated stream", zlib.decompress, x[:-1]) # Memory use of the following functions takes into account overallocation - @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3) + @bigmemtest(size=_1G + 1024 * 1024, memuse=3) def test_big_compress_buffer(self, size): compress = lambda s: zlib.compress(s, 1) self.check_big_compress_buffer(size, compress) - @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2) + @bigmemtest(size=_1G + 1024 * 1024, memuse=2) def test_big_decompress_buffer(self, size): self.check_big_decompress_buffer(size, zlib.decompress) + @bigmemtest(size=_4G + 100, memuse=1) + def test_length_overflow(self, size): + if size < _4G + 100: + self.skipTest("not enough free memory, need at least 4 GB") + data = b'x' * size + try: + self.assertRaises(OverflowError, zlib.compress, data, 1) + self.assertRaises(OverflowError, zlib.decompress, data) + finally: + data = None + class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): # Test compression object def test_pair(self): # straightforward compress/decompress objects - data = HAMLET_SCENE * 128 - co = zlib.compressobj() - x1 = co.compress(data) - x2 = co.flush() - self.assertRaises(zlib.error, co.flush) # second flush should not work - dco = zlib.decompressobj() - y1 = dco.decompress(x1 + x2) - y2 = dco.flush() - self.assertEqual(data, y1 + y2) + datasrc = HAMLET_SCENE * 128 + datazip = zlib.compress(datasrc) + # should compress both bytes and bytearray data + for data in (datasrc, bytearray(datasrc)): + co = zlib.compressobj() + x1 = co.compress(data) + x2 = co.flush() + self.assertRaises(zlib.error, co.flush) # second flush should not work + self.assertEqual(x1 + x2, datazip) + for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))): + dco = zlib.decompressobj() + y1 = dco.decompress(v1 + v2) + y2 = dco.flush() + self.assertEqual(data, y1 + y2) + self.assertIsInstance(dco.unconsumed_tail, bytes) + self.assertIsInstance(dco.unused_data, bytes) def test_compressoptions(self): # specify lots of options to compressobj() @@ -197,10 +229,10 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): for i in range(0, len(data), 256): bufs.append(co.compress(data[i:i+256])) bufs.append(co.flush()) - combuf = ''.join(bufs) + combuf = b''.join(bufs) dco = zlib.decompressobj() - y1 = dco.decompress(''.join(bufs)) + y1 = dco.decompress(b''.join(bufs)) y2 = dco.flush() self.assertEqual(data, y1 + y2) @@ -213,30 +245,36 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): for i in range(0, len(data), cx): bufs.append(co.compress(data[i:i+cx])) bufs.append(co.flush()) - combuf = ''.join(bufs) + combuf = b''.join(bufs) + + decombuf = zlib.decompress(combuf) + # Test type of return value + self.assertIsInstance(decombuf, bytes) - self.assertEqual(data, zlib.decompress(combuf)) + self.assertEqual(data, decombuf) dco = zlib.decompressobj() bufs = [] for i in range(0, len(combuf), dcx): bufs.append(dco.decompress(combuf[i:i+dcx])) - self.assertEqual('', dco.unconsumed_tail, ######## - "(A) uct should be '': not %d long" % + self.assertEqual(b'', dco.unconsumed_tail, ######## + "(A) uct should be b'': not %d long" % len(dco.unconsumed_tail)) + self.assertEqual(b'', dco.unused_data) if flush: bufs.append(dco.flush()) else: while True: - chunk = dco.decompress('') + chunk = dco.decompress(b'') if chunk: bufs.append(chunk) else: break - self.assertEqual('', dco.unconsumed_tail, ######## - "(B) uct should be '': not %d long" % + self.assertEqual(b'', dco.unconsumed_tail, ######## + "(B) uct should be b'': not %d long" % len(dco.unconsumed_tail)) - self.assertEqual(data, ''.join(bufs)) + self.assertEqual(b'', dco.unused_data) + self.assertEqual(data, b''.join(bufs)) # Failure means: "decompressobj with init options failed" def test_decompincflush(self): @@ -252,7 +290,7 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): for i in range(0, len(data), cx): bufs.append(co.compress(data[i:i+cx])) bufs.append(co.flush()) - combuf = ''.join(bufs) + combuf = b''.join(bufs) self.assertEqual(data, zlib.decompress(combuf), 'compressed data failure') @@ -267,7 +305,7 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): bufs.append(chunk) cb = dco.unconsumed_tail bufs.append(dco.flush()) - self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') + self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved') def test_decompressmaxlen(self, flush=False): # Check a decompression object with max_length specified @@ -277,7 +315,7 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): for i in range(0, len(data), 256): bufs.append(co.compress(data[i:i+256])) bufs.append(co.flush()) - combuf = ''.join(bufs) + combuf = b''.join(bufs) self.assertEqual(data, zlib.decompress(combuf), 'compressed data failure') @@ -295,11 +333,11 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): bufs.append(dco.flush()) else: while chunk: - chunk = dco.decompress('', max_length) + chunk = dco.decompress(b'', max_length) self.assertFalse(len(chunk) > max_length, 'chunk too big (%d>%d)' % (len(chunk),max_length)) bufs.append(chunk) - self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') + self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved') def test_decompressmaxlenflush(self): self.test_decompressmaxlen(flush=True) @@ -307,17 +345,17 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): def test_maxlenmisc(self): # Misc tests of max_length dco = zlib.decompressobj() - self.assertRaises(ValueError, dco.decompress, "", -1) - self.assertEqual('', dco.unconsumed_tail) + self.assertRaises(ValueError, dco.decompress, b"", -1) + self.assertEqual(b'', dco.unconsumed_tail) def test_clear_unconsumed_tail(self): # Issue #12050: calling decompress() without providing max_length # should clear the unconsumed_tail attribute. - cdata = "x\x9cKLJ\x06\x00\x02M\x01" # "abc" + cdata = b"x\x9cKLJ\x06\x00\x02M\x01" # "abc" dco = zlib.decompressobj() ddata = dco.decompress(cdata, 1) ddata += dco.decompress(dco.unconsumed_tail) - self.assertEqual(dco.unconsumed_tail, "") + self.assertEqual(dco.unconsumed_tail, b"") def test_flushes(self): # Test flush() with the various options, using all the @@ -334,7 +372,7 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): b = obj.flush( sync ) c = obj.compress( data[3000:] ) d = obj.flush() - self.assertEqual(zlib.decompress(''.join([a,b,c,d])), + self.assertEqual(zlib.decompress(b''.join([a,b,c,d])), data, ("Decompress failed: flush " "mode=%i, level=%i") % (sync, level)) del obj @@ -381,26 +419,51 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) self.assertTrue(co.flush()) # Returns a zlib header dco = zlib.decompressobj() - self.assertEqual(dco.flush(), "") # Returns nothing + self.assertEqual(dco.flush(), b"") # Returns nothing def test_decompress_incomplete_stream(self): # This is 'foo', deflated - x = 'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' + x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' # For the record - self.assertEqual(zlib.decompress(x), 'foo') + self.assertEqual(zlib.decompress(x), b'foo') self.assertRaises(zlib.error, zlib.decompress, x[:-5]) # Omitting the stream end works with decompressor objects # (see issue #8672). dco = zlib.decompressobj() y = dco.decompress(x[:-5]) y += dco.flush() - self.assertEqual(y, 'foo') + self.assertEqual(y, b'foo') + + def test_decompress_unused_data(self): + # Repeated calls to decompress() after EOF should accumulate data in + # dco.unused_data, instead of just storing the arg to the last call. + source = b'abcdefghijklmnopqrstuvwxyz' + remainder = b'0123456789' + y = zlib.compress(source) + x = y + remainder + for maxlen in 0, 1000: + for step in 1, 2, len(y), len(x): + dco = zlib.decompressobj() + data = b'' + for i in range(0, len(x), step): + if i < len(y): + self.assertEqual(dco.unused_data, b'') + if maxlen == 0: + data += dco.decompress(x[i : i + step]) + self.assertEqual(dco.unconsumed_tail, b'') + else: + data += dco.decompress( + dco.unconsumed_tail + x[i : i + step], maxlen) + data += dco.flush() + self.assertEqual(data, source) + self.assertEqual(dco.unconsumed_tail, b'') + self.assertEqual(dco.unused_data, remainder) def test_flush_with_freed_input(self): # Issue #16411: decompressor accesses input to last decompress() call # in flush(), even if this object has been freed in the meanwhile. - input1 = 'abcdefghijklmnopqrstuvwxyz' - input2 = 'QWERTYUIOPASDFGHJKLZXCVBNM' + input1 = b'abcdefghijklmnopqrstuvwxyz' + input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM' data = zlib.compress(input1) dco = zlib.decompressobj() dco.decompress(data, 1) @@ -412,7 +475,7 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): def test_compresscopy(self): # Test copying a compression object data0 = HAMLET_SCENE - data1 = HAMLET_SCENE.swapcase() + data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii") c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION) bufs0 = [] bufs0.append(c0.compress(data0)) @@ -422,11 +485,11 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): bufs0.append(c0.compress(data0)) bufs0.append(c0.flush()) - s0 = ''.join(bufs0) + s0 = b''.join(bufs0) bufs1.append(c1.compress(data1)) bufs1.append(c1.flush()) - s1 = ''.join(bufs1) + s1 = b''.join(bufs1) self.assertEqual(zlib.decompress(s0),data0+data0) self.assertEqual(zlib.decompress(s1),data0+data1) @@ -438,36 +501,13 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): c.flush() self.assertRaises(ValueError, c.copy) - def test_decompress_unused_data(self): - # Repeated calls to decompress() after EOF should accumulate data in - # dco.unused_data, instead of just storing the arg to the last call. - source = b'abcdefghijklmnopqrstuvwxyz' - remainder = b'0123456789' - y = zlib.compress(source) - x = y + remainder - for maxlen in 0, 1000: - for step in 1, 2, len(y), len(x): - dco = zlib.decompressobj() - data = b'' - for i in range(0, len(x), step): - if i < len(y): - self.assertEqual(dco.unused_data, b'') - if maxlen == 0: - data += dco.decompress(x[i : i + step]) - self.assertEqual(dco.unconsumed_tail, b'') - else: - data += dco.decompress( - dco.unconsumed_tail + x[i : i + step], maxlen) - data += dco.flush() - self.assertEqual(data, source) - self.assertEqual(dco.unconsumed_tail, b'') - self.assertEqual(dco.unused_data, remainder) - if hasattr(zlib.decompressobj(), "copy"): def test_decompresscopy(self): # Test copying a decompression object data = HAMLET_SCENE comp = zlib.compress(data) + # Test type of return value + self.assertIsInstance(comp, bytes) d0 = zlib.decompressobj() bufs0 = [] @@ -477,10 +517,10 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): bufs1 = bufs0[:] bufs0.append(d0.decompress(comp[32:])) - s0 = ''.join(bufs0) + s0 = b''.join(bufs0) bufs1.append(d1.decompress(comp[32:])) - s1 = ''.join(bufs1) + s1 = b''.join(bufs1) self.assertEqual(s0,s1) self.assertEqual(s0,data) @@ -495,18 +535,31 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): # Memory use of the following functions takes into account overallocation - @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3) + @bigmemtest(size=_1G + 1024 * 1024, memuse=3) def test_big_compress_buffer(self, size): c = zlib.compressobj(1) compress = lambda s: c.compress(s) + c.flush() self.check_big_compress_buffer(size, compress) - @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2) + @bigmemtest(size=_1G + 1024 * 1024, memuse=2) def test_big_decompress_buffer(self, size): d = zlib.decompressobj() decompress = lambda s: d.decompress(s) + d.flush() self.check_big_decompress_buffer(size, decompress) + @bigmemtest(size=_4G + 100, memuse=1) + def test_length_overflow(self, size): + if size < _4G + 100: + self.skipTest("not enough free memory, need at least 4 GB") + data = b'x' * size + c = zlib.compressobj(1) + d = zlib.decompressobj() + try: + self.assertRaises(OverflowError, c.compress, data) + self.assertRaises(OverflowError, d.decompress, data) + finally: + data = None + def genblock(seed, length, step=1024, generator=random): """length-byte stream of random data from a seed (in step-byte blocks).""" @@ -515,11 +568,10 @@ def genblock(seed, length, step=1024, generator=random): randint = generator.randint if length < step or step < 2: step = length - blocks = [] + blocks = bytes() for i in range(0, length, step): - blocks.append(''.join([chr(randint(0,255)) - for x in range(step)])) - return ''.join(blocks)[:length] + blocks += bytes(randint(0, 255) for x in range(step)) + return blocks @@ -532,7 +584,7 @@ def choose_lines(source, number, seed=None, generator=random): -HAMLET_SCENE = """ +HAMLET_SCENE = b""" LAERTES O, fear me not. @@ -598,12 +650,14 @@ LAERTES def test_main(): - run_unittest( + support.run_unittest( ChecksumTestCase, + ChecksumBigBufferTestCase, ExceptionTestCase, CompressTestCase, CompressObjectTestCase ) if __name__ == "__main__": - test_main() + unittest.main() # XXX + ###test_main() |