diff options
Diffstat (limited to 'Lib/test/test_zstd.py')
-rw-r--r-- | Lib/test/test_zstd.py | 207 |
1 files changed, 189 insertions, 18 deletions
diff --git a/Lib/test/test_zstd.py b/Lib/test/test_zstd.py index 014634e450e..d4c28aed38e 100644 --- a/Lib/test/test_zstd.py +++ b/Lib/test/test_zstd.py @@ -293,11 +293,11 @@ class CompressorTestCase(unittest.TestCase): # zstd lib doesn't support MT compression if not SUPPORT_MULTITHREADING: - with self.assertRaises(ZstdError): + with self.assertRaises(ValueError): ZstdCompressor(options={CompressionParameter.nb_workers:4}) - with self.assertRaises(ZstdError): + with self.assertRaises(ValueError): ZstdCompressor(options={CompressionParameter.job_size:4}) - with self.assertRaises(ZstdError): + with self.assertRaises(ValueError): ZstdCompressor(options={CompressionParameter.overlap_log:4}) # out of bounds error msg @@ -395,6 +395,115 @@ class CompressorTestCase(unittest.TestCase): c = ZstdCompressor() self.assertNotEqual(c.compress(b'', c.FLUSH_FRAME), b'') + def test_set_pledged_input_size(self): + DAT = DECOMPRESSED_100_PLUS_32KB + CHUNK_SIZE = len(DAT) // 3 + + # wrong value + c = ZstdCompressor() + with self.assertRaisesRegex(ValueError, + r'should be a positive int less than \d+'): + c.set_pledged_input_size(-300) + # overflow + with self.assertRaisesRegex(ValueError, + r'should be a positive int less than \d+'): + c.set_pledged_input_size(2**64) + # ZSTD_CONTENTSIZE_ERROR is invalid + with self.assertRaisesRegex(ValueError, + r'should be a positive int less than \d+'): + c.set_pledged_input_size(2**64-2) + # ZSTD_CONTENTSIZE_UNKNOWN should use None + with self.assertRaisesRegex(ValueError, + r'should be a positive int less than \d+'): + c.set_pledged_input_size(2**64-1) + + # check valid values are settable + c.set_pledged_input_size(2**63) + c.set_pledged_input_size(2**64-3) + + # check that zero means empty frame + c = ZstdCompressor(level=1) + c.set_pledged_input_size(0) + c.compress(b'') + dat = c.flush() + ret = get_frame_info(dat) + self.assertEqual(ret.decompressed_size, 0) + + + # wrong mode + c = ZstdCompressor(level=1) + c.compress(b'123456') + self.assertEqual(c.last_mode, c.CONTINUE) + with self.assertRaisesRegex(ValueError, + r'last_mode == FLUSH_FRAME'): + c.set_pledged_input_size(300) + + # None value + c = ZstdCompressor(level=1) + c.set_pledged_input_size(None) + dat = c.compress(DAT) + c.flush() + + ret = get_frame_info(dat) + self.assertEqual(ret.decompressed_size, None) + + # correct value + c = ZstdCompressor(level=1) + c.set_pledged_input_size(len(DAT)) + + chunks = [] + posi = 0 + while posi < len(DAT): + dat = c.compress(DAT[posi:posi+CHUNK_SIZE]) + posi += CHUNK_SIZE + chunks.append(dat) + + dat = c.flush() + chunks.append(dat) + chunks = b''.join(chunks) + + ret = get_frame_info(chunks) + self.assertEqual(ret.decompressed_size, len(DAT)) + self.assertEqual(decompress(chunks), DAT) + + c.set_pledged_input_size(len(DAT)) # the second frame + dat = c.compress(DAT) + c.flush() + + ret = get_frame_info(dat) + self.assertEqual(ret.decompressed_size, len(DAT)) + self.assertEqual(decompress(dat), DAT) + + # not enough data + c = ZstdCompressor(level=1) + c.set_pledged_input_size(len(DAT)+1) + + for start in range(0, len(DAT), CHUNK_SIZE): + end = min(start+CHUNK_SIZE, len(DAT)) + _dat = c.compress(DAT[start:end]) + + with self.assertRaises(ZstdError): + c.flush() + + # too much data + c = ZstdCompressor(level=1) + c.set_pledged_input_size(len(DAT)) + + for start in range(0, len(DAT), CHUNK_SIZE): + end = min(start+CHUNK_SIZE, len(DAT)) + _dat = c.compress(DAT[start:end]) + + with self.assertRaises(ZstdError): + c.compress(b'extra', ZstdCompressor.FLUSH_FRAME) + + # content size not set if content_size_flag == 0 + c = ZstdCompressor(options={CompressionParameter.content_size_flag: 0}) + c.set_pledged_input_size(10) + dat1 = c.compress(b"hello") + dat2 = c.compress(b"world") + dat3 = c.flush() + frame_data = get_frame_info(dat1 + dat2 + dat3) + self.assertIsNone(frame_data.decompressed_size) + + class DecompressorTestCase(unittest.TestCase): def test_simple_decompress_bad_args(self): @@ -1138,27 +1247,41 @@ class ZstdDictTestCase(unittest.TestCase): ZstdDecompressor(zd) # wrong type - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): - ZstdCompressor(zstd_dict=(zd, b'123')) - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): + ZstdCompressor(zstd_dict=[zd, 1]) + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): + ZstdCompressor(zstd_dict=(zd, 1.0)) + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): + ZstdCompressor(zstd_dict=(zd,)) + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): ZstdCompressor(zstd_dict=(zd, 1, 2)) - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): ZstdCompressor(zstd_dict=(zd, -1)) - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): ZstdCompressor(zstd_dict=(zd, 3)) - - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): - ZstdDecompressor(zstd_dict=(zd, b'123')) - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + with self.assertRaises(OverflowError): + ZstdCompressor(zstd_dict=(zd, 2**1000)) + with self.assertRaises(OverflowError): + ZstdCompressor(zstd_dict=(zd, -2**1000)) + + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): + ZstdDecompressor(zstd_dict=[zd, 1]) + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): + ZstdDecompressor(zstd_dict=(zd, 1.0)) + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): + ZstdDecompressor((zd,)) + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): ZstdDecompressor((zd, 1, 2)) - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): ZstdDecompressor((zd, -1)) - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): ZstdDecompressor((zd, 3)) + with self.assertRaises(OverflowError): + ZstdDecompressor((zd, 2**1000)) + with self.assertRaises(OverflowError): + ZstdDecompressor((zd, -2**1000)) def test_train_dict(self): - - TRAINED_DICT = train_dict(SAMPLES, DICT_SIZE1) ZstdDict(TRAINED_DICT.dict_content, is_raw=False) @@ -1240,17 +1363,36 @@ class ZstdDictTestCase(unittest.TestCase): with self.assertRaises(TypeError): _zstd.train_dict({}, (), 100) with self.assertRaises(TypeError): + _zstd.train_dict(bytearray(), (), 100) + with self.assertRaises(TypeError): _zstd.train_dict(b'', 99, 100) with self.assertRaises(TypeError): + _zstd.train_dict(b'', [], 100) + with self.assertRaises(TypeError): _zstd.train_dict(b'', (), 100.1) + with self.assertRaises(TypeError): + _zstd.train_dict(b'', (99.1,), 100) + with self.assertRaises(ValueError): + _zstd.train_dict(b'abc', (4, -1), 100) + with self.assertRaises(ValueError): + _zstd.train_dict(b'abc', (2,), 100) + with self.assertRaises(ValueError): + _zstd.train_dict(b'', (99,), 100) # size > size_t with self.assertRaises(ValueError): - _zstd.train_dict(b'', (2**64+1,), 100) + _zstd.train_dict(b'', (2**1000,), 100) + with self.assertRaises(ValueError): + _zstd.train_dict(b'', (-2**1000,), 100) # dict_size <= 0 with self.assertRaises(ValueError): _zstd.train_dict(b'', (), 0) + with self.assertRaises(ValueError): + _zstd.train_dict(b'', (), -1) + + with self.assertRaises(ZstdError): + _zstd.train_dict(b'', (), 1) def test_finalize_dict_c(self): with self.assertRaises(TypeError): @@ -1260,21 +1402,50 @@ class ZstdDictTestCase(unittest.TestCase): with self.assertRaises(TypeError): _zstd.finalize_dict({}, b'', (), 100, 5) with self.assertRaises(TypeError): + _zstd.finalize_dict(bytearray(TRAINED_DICT.dict_content), b'', (), 100, 5) + with self.assertRaises(TypeError): _zstd.finalize_dict(TRAINED_DICT.dict_content, {}, (), 100, 5) with self.assertRaises(TypeError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, bytearray(), (), 100, 5) + with self.assertRaises(TypeError): _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', 99, 100, 5) with self.assertRaises(TypeError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', [], 100, 5) + with self.assertRaises(TypeError): _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100.1, 5) with self.assertRaises(TypeError): _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, 5.1) + with self.assertRaises(ValueError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'abc', (4, -1), 100, 5) + with self.assertRaises(ValueError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'abc', (2,), 100, 5) + with self.assertRaises(ValueError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (99,), 100, 5) + # size > size_t with self.assertRaises(ValueError): - _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (2**64+1,), 100, 5) + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (2**1000,), 100, 5) + with self.assertRaises(ValueError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (-2**1000,), 100, 5) # dict_size <= 0 with self.assertRaises(ValueError): _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 0, 5) + with self.assertRaises(ValueError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), -1, 5) + with self.assertRaises(OverflowError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 2**1000, 5) + with self.assertRaises(OverflowError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), -2**1000, 5) + + with self.assertRaises(OverflowError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, 2**1000) + with self.assertRaises(OverflowError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, -2**1000) + + with self.assertRaises(ZstdError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, 5) def test_train_buffer_protocol_samples(self): def _nbytes(dat): |