diff options
Diffstat (limited to 'Lib/test/test_zstd.py')
-rw-r--r-- | Lib/test/test_zstd.py | 356 |
1 files changed, 294 insertions, 62 deletions
diff --git a/Lib/test/test_zstd.py b/Lib/test/test_zstd.py index bc809603cbc..d4c28aed38e 100644 --- a/Lib/test/test_zstd.py +++ b/Lib/test/test_zstd.py @@ -64,6 +64,10 @@ TRAINED_DICT = None SUPPORT_MULTITHREADING = False +C_INT_MIN = -(2**31) +C_INT_MAX = (2**31) - 1 + + def setUpModule(): global SUPPORT_MULTITHREADING SUPPORT_MULTITHREADING = CompressionParameter.nb_workers.bounds() != (0, 0) @@ -195,14 +199,21 @@ class CompressorTestCase(unittest.TestCase): self.assertRaises(TypeError, ZstdCompressor, zstd_dict=b"abcd1234") self.assertRaises(TypeError, ZstdCompressor, zstd_dict={1: 2, 3: 4}) - with self.assertRaises(ValueError): - ZstdCompressor(2**31) - with self.assertRaises(ValueError): - ZstdCompressor(options={2**31: 100}) + # valid range for compression level is [-(1<<17), 22] + msg = r'illegal compression level {}; the valid range is \[-?\d+, -?\d+\]' + with self.assertRaisesRegex(ValueError, msg.format(C_INT_MAX)): + ZstdCompressor(C_INT_MAX) + with self.assertRaisesRegex(ValueError, msg.format(C_INT_MIN)): + ZstdCompressor(C_INT_MIN) + msg = r'illegal compression level; the valid range is \[-?\d+, -?\d+\]' + with self.assertRaisesRegex(ValueError, msg): + ZstdCompressor(level=-(2**1000)) + with self.assertRaisesRegex(ValueError, msg): + ZstdCompressor(level=2**1000) - with self.assertRaises(ZstdError): + with self.assertRaises(ValueError): ZstdCompressor(options={CompressionParameter.window_log: 100}) - with self.assertRaises(ZstdError): + with self.assertRaises(ValueError): ZstdCompressor(options={3333: 100}) # Method bad arguments @@ -253,43 +264,57 @@ class CompressorTestCase(unittest.TestCase): } ZstdCompressor(options=d) - # larger than signed int, ValueError d1 = d.copy() - d1[CompressionParameter.ldm_bucket_size_log] = 2**31 - self.assertRaises(ValueError, ZstdCompressor, options=d1) + # larger than signed int + d1[CompressionParameter.ldm_bucket_size_log] = C_INT_MAX + with self.assertRaises(ValueError): + ZstdCompressor(options=d1) + # smaller than signed int + d1[CompressionParameter.ldm_bucket_size_log] = C_INT_MIN + with self.assertRaises(ValueError): + ZstdCompressor(options=d1) - # clamp compressionLevel + # out of bounds compression level level_min, level_max = CompressionParameter.compression_level.bounds() - compress(b'', level_max+1) - compress(b'', level_min-1) - - compress(b'', options={CompressionParameter.compression_level:level_max+1}) - compress(b'', options={CompressionParameter.compression_level:level_min-1}) + with self.assertRaises(ValueError): + compress(b'', level_max+1) + with self.assertRaises(ValueError): + compress(b'', level_min-1) + with self.assertRaises(ValueError): + compress(b'', 2**1000) + with self.assertRaises(ValueError): + compress(b'', -(2**1000)) + with self.assertRaises(ValueError): + compress(b'', options={ + CompressionParameter.compression_level: level_max+1}) + with self.assertRaises(ValueError): + compress(b'', options={ + CompressionParameter.compression_level: level_min-1}) # zstd lib doesn't support MT compression if not SUPPORT_MULTITHREADING: - with self.assertRaises(ZstdError): + with self.assertRaises(ValueError): ZstdCompressor(options={CompressionParameter.nb_workers:4}) - with self.assertRaises(ZstdError): + with self.assertRaises(ValueError): ZstdCompressor(options={CompressionParameter.job_size:4}) - with self.assertRaises(ZstdError): + with self.assertRaises(ValueError): ZstdCompressor(options={CompressionParameter.overlap_log:4}) # out of bounds error msg option = {CompressionParameter.window_log:100} - with self.assertRaisesRegex(ZstdError, - (r'Error when setting zstd compression parameter "window_log", ' - r'it should \d+ <= value <= \d+, provided value is 100\. ' - r'\((?:32|64)-bit build\)')): + with self.assertRaisesRegex( + ValueError, + "compression parameter 'window_log' received an illegal value 100; " + r'the valid range is \[-?\d+, -?\d+\]', + ): compress(b'', options=option) def test_unknown_compression_parameter(self): KEY = 100001234 option = {CompressionParameter.compression_level: 10, KEY: 200000000} - pattern = (r'Invalid zstd compression parameter.*?' - fr'"unknown parameter \(key {KEY}\)"') - with self.assertRaisesRegex(ZstdError, pattern): + pattern = rf"invalid compression parameter 'unknown parameter \(key {KEY}\)'" + with self.assertRaisesRegex(ValueError, pattern): ZstdCompressor(options=option) @unittest.skipIf(not SUPPORT_MULTITHREADING, @@ -370,6 +395,115 @@ class CompressorTestCase(unittest.TestCase): c = ZstdCompressor() self.assertNotEqual(c.compress(b'', c.FLUSH_FRAME), b'') + def test_set_pledged_input_size(self): + DAT = DECOMPRESSED_100_PLUS_32KB + CHUNK_SIZE = len(DAT) // 3 + + # wrong value + c = ZstdCompressor() + with self.assertRaisesRegex(ValueError, + r'should be a positive int less than \d+'): + c.set_pledged_input_size(-300) + # overflow + with self.assertRaisesRegex(ValueError, + r'should be a positive int less than \d+'): + c.set_pledged_input_size(2**64) + # ZSTD_CONTENTSIZE_ERROR is invalid + with self.assertRaisesRegex(ValueError, + r'should be a positive int less than \d+'): + c.set_pledged_input_size(2**64-2) + # ZSTD_CONTENTSIZE_UNKNOWN should use None + with self.assertRaisesRegex(ValueError, + r'should be a positive int less than \d+'): + c.set_pledged_input_size(2**64-1) + + # check valid values are settable + c.set_pledged_input_size(2**63) + c.set_pledged_input_size(2**64-3) + + # check that zero means empty frame + c = ZstdCompressor(level=1) + c.set_pledged_input_size(0) + c.compress(b'') + dat = c.flush() + ret = get_frame_info(dat) + self.assertEqual(ret.decompressed_size, 0) + + + # wrong mode + c = ZstdCompressor(level=1) + c.compress(b'123456') + self.assertEqual(c.last_mode, c.CONTINUE) + with self.assertRaisesRegex(ValueError, + r'last_mode == FLUSH_FRAME'): + c.set_pledged_input_size(300) + + # None value + c = ZstdCompressor(level=1) + c.set_pledged_input_size(None) + dat = c.compress(DAT) + c.flush() + + ret = get_frame_info(dat) + self.assertEqual(ret.decompressed_size, None) + + # correct value + c = ZstdCompressor(level=1) + c.set_pledged_input_size(len(DAT)) + + chunks = [] + posi = 0 + while posi < len(DAT): + dat = c.compress(DAT[posi:posi+CHUNK_SIZE]) + posi += CHUNK_SIZE + chunks.append(dat) + + dat = c.flush() + chunks.append(dat) + chunks = b''.join(chunks) + + ret = get_frame_info(chunks) + self.assertEqual(ret.decompressed_size, len(DAT)) + self.assertEqual(decompress(chunks), DAT) + + c.set_pledged_input_size(len(DAT)) # the second frame + dat = c.compress(DAT) + c.flush() + + ret = get_frame_info(dat) + self.assertEqual(ret.decompressed_size, len(DAT)) + self.assertEqual(decompress(dat), DAT) + + # not enough data + c = ZstdCompressor(level=1) + c.set_pledged_input_size(len(DAT)+1) + + for start in range(0, len(DAT), CHUNK_SIZE): + end = min(start+CHUNK_SIZE, len(DAT)) + _dat = c.compress(DAT[start:end]) + + with self.assertRaises(ZstdError): + c.flush() + + # too much data + c = ZstdCompressor(level=1) + c.set_pledged_input_size(len(DAT)) + + for start in range(0, len(DAT), CHUNK_SIZE): + end = min(start+CHUNK_SIZE, len(DAT)) + _dat = c.compress(DAT[start:end]) + + with self.assertRaises(ZstdError): + c.compress(b'extra', ZstdCompressor.FLUSH_FRAME) + + # content size not set if content_size_flag == 0 + c = ZstdCompressor(options={CompressionParameter.content_size_flag: 0}) + c.set_pledged_input_size(10) + dat1 = c.compress(b"hello") + dat2 = c.compress(b"world") + dat3 = c.flush() + frame_data = get_frame_info(dat1 + dat2 + dat3) + self.assertIsNone(frame_data.decompressed_size) + + class DecompressorTestCase(unittest.TestCase): def test_simple_decompress_bad_args(self): @@ -384,12 +518,22 @@ class DecompressorTestCase(unittest.TestCase): self.assertRaises(TypeError, ZstdDecompressor, options=b'abc') with self.assertRaises(ValueError): - ZstdDecompressor(options={2**31 : 100}) + ZstdDecompressor(options={C_INT_MAX: 100}) + with self.assertRaises(ValueError): + ZstdDecompressor(options={C_INT_MIN: 100}) + with self.assertRaises(ValueError): + ZstdDecompressor(options={0: C_INT_MAX}) + with self.assertRaises(OverflowError): + ZstdDecompressor(options={2**1000: 100}) + with self.assertRaises(OverflowError): + ZstdDecompressor(options={-(2**1000): 100}) + with self.assertRaises(OverflowError): + ZstdDecompressor(options={0: -(2**1000)}) - with self.assertRaises(ZstdError): - ZstdDecompressor(options={DecompressionParameter.window_log_max:100}) - with self.assertRaises(ZstdError): - ZstdDecompressor(options={3333 : 100}) + with self.assertRaises(ValueError): + ZstdDecompressor(options={DecompressionParameter.window_log_max: 100}) + with self.assertRaises(ValueError): + ZstdDecompressor(options={3333: 100}) empty = compress(b'') lzd = ZstdDecompressor() @@ -402,26 +546,52 @@ class DecompressorTestCase(unittest.TestCase): d = {DecompressionParameter.window_log_max : 15} ZstdDecompressor(options=d) - # larger than signed int, ValueError d1 = d.copy() - d1[DecompressionParameter.window_log_max] = 2**31 - self.assertRaises(ValueError, ZstdDecompressor, None, d1) + # larger than signed int + d1[DecompressionParameter.window_log_max] = 2**1000 + with self.assertRaises(OverflowError): + ZstdDecompressor(None, d1) + # smaller than signed int + d1[DecompressionParameter.window_log_max] = -(2**1000) + with self.assertRaises(OverflowError): + ZstdDecompressor(None, d1) + + d1[DecompressionParameter.window_log_max] = C_INT_MAX + with self.assertRaises(ValueError): + ZstdDecompressor(None, d1) + d1[DecompressionParameter.window_log_max] = C_INT_MIN + with self.assertRaises(ValueError): + ZstdDecompressor(None, d1) # out of bounds error msg options = {DecompressionParameter.window_log_max:100} - with self.assertRaisesRegex(ZstdError, - (r'Error when setting zstd decompression parameter "window_log_max", ' - r'it should \d+ <= value <= \d+, provided value is 100\. ' - r'\((?:32|64)-bit build\)')): + with self.assertRaisesRegex( + ValueError, + "decompression parameter 'window_log_max' received an illegal value 100; " + r'the valid range is \[-?\d+, -?\d+\]', + ): + decompress(b'', options=options) + + # out of bounds deecompression parameter + options[DecompressionParameter.window_log_max] = C_INT_MAX + with self.assertRaises(ValueError): + decompress(b'', options=options) + options[DecompressionParameter.window_log_max] = C_INT_MIN + with self.assertRaises(ValueError): + decompress(b'', options=options) + options[DecompressionParameter.window_log_max] = 2**1000 + with self.assertRaises(OverflowError): + decompress(b'', options=options) + options[DecompressionParameter.window_log_max] = -(2**1000) + with self.assertRaises(OverflowError): decompress(b'', options=options) def test_unknown_decompression_parameter(self): KEY = 100001234 options = {DecompressionParameter.window_log_max: DecompressionParameter.window_log_max.bounds()[1], KEY: 200000000} - pattern = (r'Invalid zstd decompression parameter.*?' - fr'"unknown parameter \(key {KEY}\)"') - with self.assertRaisesRegex(ZstdError, pattern): + pattern = rf"invalid decompression parameter 'unknown parameter \(key {KEY}\)'" + with self.assertRaisesRegex(ValueError, pattern): ZstdDecompressor(options=options) def test_decompress_epilogue_flags(self): @@ -1077,27 +1247,41 @@ class ZstdDictTestCase(unittest.TestCase): ZstdDecompressor(zd) # wrong type - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): - ZstdCompressor(zstd_dict=(zd, b'123')) - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): + ZstdCompressor(zstd_dict=[zd, 1]) + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): + ZstdCompressor(zstd_dict=(zd, 1.0)) + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): + ZstdCompressor(zstd_dict=(zd,)) + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): ZstdCompressor(zstd_dict=(zd, 1, 2)) - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): ZstdCompressor(zstd_dict=(zd, -1)) - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): ZstdCompressor(zstd_dict=(zd, 3)) - - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): - ZstdDecompressor(zstd_dict=(zd, b'123')) - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + with self.assertRaises(OverflowError): + ZstdCompressor(zstd_dict=(zd, 2**1000)) + with self.assertRaises(OverflowError): + ZstdCompressor(zstd_dict=(zd, -2**1000)) + + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): + ZstdDecompressor(zstd_dict=[zd, 1]) + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): + ZstdDecompressor(zstd_dict=(zd, 1.0)) + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): + ZstdDecompressor((zd,)) + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): ZstdDecompressor((zd, 1, 2)) - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): ZstdDecompressor((zd, -1)) - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): ZstdDecompressor((zd, 3)) + with self.assertRaises(OverflowError): + ZstdDecompressor((zd, 2**1000)) + with self.assertRaises(OverflowError): + ZstdDecompressor((zd, -2**1000)) def test_train_dict(self): - - TRAINED_DICT = train_dict(SAMPLES, DICT_SIZE1) ZstdDict(TRAINED_DICT.dict_content, is_raw=False) @@ -1179,17 +1363,36 @@ class ZstdDictTestCase(unittest.TestCase): with self.assertRaises(TypeError): _zstd.train_dict({}, (), 100) with self.assertRaises(TypeError): + _zstd.train_dict(bytearray(), (), 100) + with self.assertRaises(TypeError): _zstd.train_dict(b'', 99, 100) with self.assertRaises(TypeError): + _zstd.train_dict(b'', [], 100) + with self.assertRaises(TypeError): _zstd.train_dict(b'', (), 100.1) + with self.assertRaises(TypeError): + _zstd.train_dict(b'', (99.1,), 100) + with self.assertRaises(ValueError): + _zstd.train_dict(b'abc', (4, -1), 100) + with self.assertRaises(ValueError): + _zstd.train_dict(b'abc', (2,), 100) + with self.assertRaises(ValueError): + _zstd.train_dict(b'', (99,), 100) # size > size_t with self.assertRaises(ValueError): - _zstd.train_dict(b'', (2**64+1,), 100) + _zstd.train_dict(b'', (2**1000,), 100) + with self.assertRaises(ValueError): + _zstd.train_dict(b'', (-2**1000,), 100) # dict_size <= 0 with self.assertRaises(ValueError): _zstd.train_dict(b'', (), 0) + with self.assertRaises(ValueError): + _zstd.train_dict(b'', (), -1) + + with self.assertRaises(ZstdError): + _zstd.train_dict(b'', (), 1) def test_finalize_dict_c(self): with self.assertRaises(TypeError): @@ -1199,21 +1402,50 @@ class ZstdDictTestCase(unittest.TestCase): with self.assertRaises(TypeError): _zstd.finalize_dict({}, b'', (), 100, 5) with self.assertRaises(TypeError): + _zstd.finalize_dict(bytearray(TRAINED_DICT.dict_content), b'', (), 100, 5) + with self.assertRaises(TypeError): _zstd.finalize_dict(TRAINED_DICT.dict_content, {}, (), 100, 5) with self.assertRaises(TypeError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, bytearray(), (), 100, 5) + with self.assertRaises(TypeError): _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', 99, 100, 5) with self.assertRaises(TypeError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', [], 100, 5) + with self.assertRaises(TypeError): _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100.1, 5) with self.assertRaises(TypeError): _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, 5.1) + with self.assertRaises(ValueError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'abc', (4, -1), 100, 5) + with self.assertRaises(ValueError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'abc', (2,), 100, 5) + with self.assertRaises(ValueError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (99,), 100, 5) + # size > size_t with self.assertRaises(ValueError): - _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (2**64+1,), 100, 5) + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (2**1000,), 100, 5) + with self.assertRaises(ValueError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (-2**1000,), 100, 5) # dict_size <= 0 with self.assertRaises(ValueError): _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 0, 5) + with self.assertRaises(ValueError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), -1, 5) + with self.assertRaises(OverflowError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 2**1000, 5) + with self.assertRaises(OverflowError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), -2**1000, 5) + + with self.assertRaises(OverflowError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, 2**1000) + with self.assertRaises(OverflowError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, -2**1000) + + with self.assertRaises(ZstdError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, 5) def test_train_buffer_protocol_samples(self): def _nbytes(dat): @@ -1424,11 +1656,11 @@ class FileTestCase(unittest.TestCase): ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), "rw") with self.assertRaisesRegex(TypeError, - r"NOT be a CompressionParameter"): + r"not be a CompressionParameter"): ZstdFile(io.BytesIO(), 'rb', options={CompressionParameter.compression_level:5}) with self.assertRaisesRegex(TypeError, - r"NOT be a DecompressionParameter"): + r"not be a DecompressionParameter"): ZstdFile(io.BytesIO(), 'wb', options={DecompressionParameter.window_log_max:21}) @@ -1439,19 +1671,19 @@ class FileTestCase(unittest.TestCase): with self.assertRaises(TypeError): ZstdFile(io.BytesIO(), "w", level='asd') # CHECK_UNKNOWN and anything above CHECK_ID_MAX should be invalid. - with self.assertRaises(ZstdError): + with self.assertRaises(ValueError): ZstdFile(io.BytesIO(), "w", options={999:9999}) - with self.assertRaises(ZstdError): + with self.assertRaises(ValueError): ZstdFile(io.BytesIO(), "w", options={CompressionParameter.window_log:99}) with self.assertRaises(TypeError): ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), "r", options=33) - with self.assertRaises(ValueError): + with self.assertRaises(OverflowError): ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), options={DecompressionParameter.window_log_max:2**31}) - with self.assertRaises(ZstdError): + with self.assertRaises(ValueError): ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), options={444:333}) @@ -1467,7 +1699,7 @@ class FileTestCase(unittest.TestCase): tmp_f.write(DAT_130K_C) filename = tmp_f.name - with self.assertRaises(ValueError): + with self.assertRaises(TypeError): ZstdFile(filename, options={'a':'b'}) # for PyPy |