diff options
Diffstat (limited to 'Lib/test/test_zstd.py')
-rw-r--r-- | Lib/test/test_zstd.py | 2507 |
1 files changed, 2507 insertions, 0 deletions
diff --git a/Lib/test/test_zstd.py b/Lib/test/test_zstd.py new file mode 100644 index 00000000000..713294c4c27 --- /dev/null +++ b/Lib/test/test_zstd.py @@ -0,0 +1,2507 @@ +import array +import gc +import io +import pathlib +import random +import re +import os +import unittest +import tempfile +import threading + +from test.support.import_helper import import_module +from test.support import threading_helper +from test.support import _1M +from test.support import Py_GIL_DISABLED + +_zstd = import_module("_zstd") +zstd = import_module("compression.zstd") + +from compression.zstd import ( + open, + compress, + decompress, + ZstdCompressor, + ZstdDecompressor, + ZstdDict, + ZstdError, + zstd_version, + zstd_version_info, + COMPRESSION_LEVEL_DEFAULT, + get_frame_info, + get_frame_size, + finalize_dict, + train_dict, + CompressionParameter, + DecompressionParameter, + Strategy, + ZstdFile, +) + +_1K = 1024 +_130_1K = 130 * _1K +DICT_SIZE1 = 3*_1K + +DAT_130K_D = None +DAT_130K_C = None + +DECOMPRESSED_DAT = None +COMPRESSED_DAT = None + +DECOMPRESSED_100_PLUS_32KB = None +COMPRESSED_100_PLUS_32KB = None + +SKIPPABLE_FRAME = None + +THIS_FILE_BYTES = None +THIS_FILE_STR = None +COMPRESSED_THIS_FILE = None + +COMPRESSED_BOGUS = None + +SAMPLES = None + +TRAINED_DICT = None + +SUPPORT_MULTITHREADING = False + +def setUpModule(): + global SUPPORT_MULTITHREADING + SUPPORT_MULTITHREADING = CompressionParameter.nb_workers.bounds() != (0, 0) + # uncompressed size 130KB, more than a zstd block. + # with a frame epilogue, 4 bytes checksum. + global DAT_130K_D + DAT_130K_D = bytes([random.randint(0, 127) for _ in range(130*_1K)]) + + global DAT_130K_C + DAT_130K_C = compress(DAT_130K_D, options={CompressionParameter.checksum_flag:1}) + + global DECOMPRESSED_DAT + DECOMPRESSED_DAT = b'abcdefg123456' * 1000 + + global COMPRESSED_DAT + COMPRESSED_DAT = compress(DECOMPRESSED_DAT) + + global DECOMPRESSED_100_PLUS_32KB + DECOMPRESSED_100_PLUS_32KB = b'a' * (100 + 32*_1K) + + global COMPRESSED_100_PLUS_32KB + COMPRESSED_100_PLUS_32KB = compress(DECOMPRESSED_100_PLUS_32KB) + + global SKIPPABLE_FRAME + SKIPPABLE_FRAME = (0x184D2A50).to_bytes(4, byteorder='little') + \ + (32*_1K).to_bytes(4, byteorder='little') + \ + b'a' * (32*_1K) + + global THIS_FILE_BYTES, THIS_FILE_STR + with io.open(os.path.abspath(__file__), 'rb') as f: + THIS_FILE_BYTES = f.read() + THIS_FILE_BYTES = re.sub(rb'\r?\n', rb'\n', THIS_FILE_BYTES) + THIS_FILE_STR = THIS_FILE_BYTES.decode('utf-8') + + global COMPRESSED_THIS_FILE + COMPRESSED_THIS_FILE = compress(THIS_FILE_BYTES) + + global COMPRESSED_BOGUS + COMPRESSED_BOGUS = DECOMPRESSED_DAT + + # dict data + words = [b'red', b'green', b'yellow', b'black', b'withe', b'blue', + b'lilac', b'purple', b'navy', b'glod', b'silver', b'olive', + b'dog', b'cat', b'tiger', b'lion', b'fish', b'bird'] + lst = [] + for i in range(300): + sample = [b'%s = %d' % (random.choice(words), random.randrange(100)) + for j in range(20)] + sample = b'\n'.join(sample) + + lst.append(sample) + global SAMPLES + SAMPLES = lst + assert len(SAMPLES) > 10 + + global TRAINED_DICT + TRAINED_DICT = train_dict(SAMPLES, 3*_1K) + assert len(TRAINED_DICT.dict_content) <= 3*_1K + + +class FunctionsTestCase(unittest.TestCase): + + def test_version(self): + s = ".".join((str(i) for i in zstd_version_info)) + self.assertEqual(s, zstd_version) + + def test_compressionLevel_values(self): + min, max = CompressionParameter.compression_level.bounds() + self.assertIs(type(COMPRESSION_LEVEL_DEFAULT), int) + self.assertIs(type(min), int) + self.assertIs(type(max), int) + self.assertLess(min, max) + + def test_roundtrip_default(self): + raw_dat = THIS_FILE_BYTES[: len(THIS_FILE_BYTES) // 6] + dat1 = compress(raw_dat) + dat2 = decompress(dat1) + self.assertEqual(dat2, raw_dat) + + def test_roundtrip_level(self): + raw_dat = THIS_FILE_BYTES[: len(THIS_FILE_BYTES) // 6] + level_min, level_max = CompressionParameter.compression_level.bounds() + + for level in range(max(-20, level_min), level_max + 1): + dat1 = compress(raw_dat, level) + dat2 = decompress(dat1) + self.assertEqual(dat2, raw_dat) + + def test_get_frame_info(self): + # no dict + info = get_frame_info(COMPRESSED_100_PLUS_32KB[:20]) + self.assertEqual(info.decompressed_size, 32 * _1K + 100) + self.assertEqual(info.dictionary_id, 0) + + # use dict + dat = compress(b"a" * 345, zstd_dict=TRAINED_DICT) + info = get_frame_info(dat) + self.assertEqual(info.decompressed_size, 345) + self.assertEqual(info.dictionary_id, TRAINED_DICT.dict_id) + + with self.assertRaisesRegex(ZstdError, "not less than the frame header"): + get_frame_info(b"aaaaaaaaaaaaaa") + + def test_get_frame_size(self): + size = get_frame_size(COMPRESSED_100_PLUS_32KB) + self.assertEqual(size, len(COMPRESSED_100_PLUS_32KB)) + + with self.assertRaisesRegex(ZstdError, "not less than this complete frame"): + get_frame_size(b"aaaaaaaaaaaaaa") + + def test_decompress_2x130_1K(self): + decompressed_size = get_frame_info(DAT_130K_C).decompressed_size + self.assertEqual(decompressed_size, _130_1K) + + dat = decompress(DAT_130K_C + DAT_130K_C) + self.assertEqual(len(dat), 2 * _130_1K) + + +class CompressorTestCase(unittest.TestCase): + + def test_simple_compress_bad_args(self): + # ZstdCompressor + self.assertRaises(TypeError, ZstdCompressor, []) + self.assertRaises(TypeError, ZstdCompressor, level=3.14) + self.assertRaises(TypeError, ZstdCompressor, level="abc") + self.assertRaises(TypeError, ZstdCompressor, options=b"abc") + + self.assertRaises(TypeError, ZstdCompressor, zstd_dict=123) + self.assertRaises(TypeError, ZstdCompressor, zstd_dict=b"abcd1234") + self.assertRaises(TypeError, ZstdCompressor, zstd_dict={1: 2, 3: 4}) + + with self.assertRaises(ValueError): + ZstdCompressor(2**31) + with self.assertRaises(ValueError): + ZstdCompressor(options={2**31: 100}) + + with self.assertRaises(ZstdError): + ZstdCompressor(options={CompressionParameter.window_log: 100}) + with self.assertRaises(ZstdError): + ZstdCompressor(options={3333: 100}) + + # Method bad arguments + zc = ZstdCompressor() + self.assertRaises(TypeError, zc.compress) + self.assertRaises((TypeError, ValueError), zc.compress, b"foo", b"bar") + self.assertRaises(TypeError, zc.compress, "str") + self.assertRaises((TypeError, ValueError), zc.flush, b"foo") + self.assertRaises(TypeError, zc.flush, b"blah", 1) + + self.assertRaises(ValueError, zc.compress, b'', -1) + self.assertRaises(ValueError, zc.compress, b'', 3) + self.assertRaises(ValueError, zc.flush, zc.CONTINUE) # 0 + self.assertRaises(ValueError, zc.flush, 3) + + zc.compress(b'') + zc.compress(b'', zc.CONTINUE) + zc.compress(b'', zc.FLUSH_BLOCK) + zc.compress(b'', zc.FLUSH_FRAME) + empty = zc.flush() + zc.flush(zc.FLUSH_BLOCK) + zc.flush(zc.FLUSH_FRAME) + + def test_compress_parameters(self): + d = {CompressionParameter.compression_level : 10, + + CompressionParameter.window_log : 12, + CompressionParameter.hash_log : 10, + CompressionParameter.chain_log : 12, + CompressionParameter.search_log : 12, + CompressionParameter.min_match : 4, + CompressionParameter.target_length : 12, + CompressionParameter.strategy : Strategy.lazy, + + CompressionParameter.enable_long_distance_matching : 1, + CompressionParameter.ldm_hash_log : 12, + CompressionParameter.ldm_min_match : 11, + CompressionParameter.ldm_bucket_size_log : 5, + CompressionParameter.ldm_hash_rate_log : 12, + + CompressionParameter.content_size_flag : 1, + CompressionParameter.checksum_flag : 1, + CompressionParameter.dict_id_flag : 0, + + CompressionParameter.nb_workers : 2 if SUPPORT_MULTITHREADING else 0, + CompressionParameter.job_size : 5*_1M if SUPPORT_MULTITHREADING else 0, + CompressionParameter.overlap_log : 9 if SUPPORT_MULTITHREADING else 0, + } + ZstdCompressor(options=d) + + # larger than signed int, ValueError + d1 = d.copy() + d1[CompressionParameter.ldm_bucket_size_log] = 2**31 + self.assertRaises(ValueError, ZstdCompressor, options=d1) + + # clamp compressionLevel + level_min, level_max = CompressionParameter.compression_level.bounds() + compress(b'', level_max+1) + compress(b'', level_min-1) + + compress(b'', options={CompressionParameter.compression_level:level_max+1}) + compress(b'', options={CompressionParameter.compression_level:level_min-1}) + + # zstd lib doesn't support MT compression + if not SUPPORT_MULTITHREADING: + with self.assertRaises(ZstdError): + ZstdCompressor(options={CompressionParameter.nb_workers:4}) + with self.assertRaises(ZstdError): + ZstdCompressor(options={CompressionParameter.job_size:4}) + with self.assertRaises(ZstdError): + ZstdCompressor(options={CompressionParameter.overlap_log:4}) + + # out of bounds error msg + option = {CompressionParameter.window_log:100} + with self.assertRaisesRegex(ZstdError, + (r'Error when setting zstd compression parameter "window_log", ' + r'it should \d+ <= value <= \d+, provided value is 100\. ' + r'\((?:32|64)-bit build\)')): + compress(b'', options=option) + + def test_unknown_compression_parameter(self): + KEY = 100001234 + option = {CompressionParameter.compression_level: 10, + KEY: 200000000} + pattern = r'Zstd compression parameter.*?"unknown parameter \(key %d\)"' \ + % KEY + with self.assertRaisesRegex(ZstdError, pattern): + ZstdCompressor(options=option) + + @unittest.skipIf(not SUPPORT_MULTITHREADING, + "zstd build doesn't support multi-threaded compression") + def test_zstd_multithread_compress(self): + size = 40*_1M + b = THIS_FILE_BYTES * (size // len(THIS_FILE_BYTES)) + + options = {CompressionParameter.compression_level : 4, + CompressionParameter.nb_workers : 2} + + # compress() + dat1 = compress(b, options=options) + dat2 = decompress(dat1) + self.assertEqual(dat2, b) + + # ZstdCompressor + c = ZstdCompressor(options=options) + dat1 = c.compress(b, c.CONTINUE) + dat2 = c.compress(b, c.FLUSH_BLOCK) + dat3 = c.compress(b, c.FLUSH_FRAME) + dat4 = decompress(dat1+dat2+dat3) + self.assertEqual(dat4, b * 3) + + # ZstdFile + with ZstdFile(io.BytesIO(), 'w', options=options) as f: + f.write(b) + + def test_compress_flushblock(self): + point = len(THIS_FILE_BYTES) // 2 + + c = ZstdCompressor() + self.assertEqual(c.last_mode, c.FLUSH_FRAME) + dat1 = c.compress(THIS_FILE_BYTES[:point]) + self.assertEqual(c.last_mode, c.CONTINUE) + dat1 += c.compress(THIS_FILE_BYTES[point:], c.FLUSH_BLOCK) + self.assertEqual(c.last_mode, c.FLUSH_BLOCK) + dat2 = c.flush() + pattern = "Compressed data ended before the end-of-stream marker" + with self.assertRaisesRegex(ZstdError, pattern): + decompress(dat1) + + dat3 = decompress(dat1 + dat2) + + self.assertEqual(dat3, THIS_FILE_BYTES) + + def test_compress_flushframe(self): + # test compress & decompress + point = len(THIS_FILE_BYTES) // 2 + + c = ZstdCompressor() + + dat1 = c.compress(THIS_FILE_BYTES[:point]) + self.assertEqual(c.last_mode, c.CONTINUE) + + dat1 += c.compress(THIS_FILE_BYTES[point:], c.FLUSH_FRAME) + self.assertEqual(c.last_mode, c.FLUSH_FRAME) + + nt = get_frame_info(dat1) + self.assertEqual(nt.decompressed_size, None) # no content size + + dat2 = decompress(dat1) + + self.assertEqual(dat2, THIS_FILE_BYTES) + + # single .FLUSH_FRAME mode has content size + c = ZstdCompressor() + dat = c.compress(THIS_FILE_BYTES, mode=c.FLUSH_FRAME) + self.assertEqual(c.last_mode, c.FLUSH_FRAME) + + nt = get_frame_info(dat) + self.assertEqual(nt.decompressed_size, len(THIS_FILE_BYTES)) + + def test_compress_empty(self): + # output empty content frame + self.assertNotEqual(compress(b''), b'') + + c = ZstdCompressor() + self.assertNotEqual(c.compress(b'', c.FLUSH_FRAME), b'') + +class DecompressorTestCase(unittest.TestCase): + + def test_simple_decompress_bad_args(self): + # ZstdDecompressor + self.assertRaises(TypeError, ZstdDecompressor, ()) + self.assertRaises(TypeError, ZstdDecompressor, zstd_dict=123) + self.assertRaises(TypeError, ZstdDecompressor, zstd_dict=b'abc') + self.assertRaises(TypeError, ZstdDecompressor, zstd_dict={1:2, 3:4}) + + self.assertRaises(TypeError, ZstdDecompressor, options=123) + self.assertRaises(TypeError, ZstdDecompressor, options='abc') + self.assertRaises(TypeError, ZstdDecompressor, options=b'abc') + + with self.assertRaises(ValueError): + ZstdDecompressor(options={2**31 : 100}) + + with self.assertRaises(ZstdError): + ZstdDecompressor(options={DecompressionParameter.window_log_max:100}) + with self.assertRaises(ZstdError): + ZstdDecompressor(options={3333 : 100}) + + empty = compress(b'') + lzd = ZstdDecompressor() + self.assertRaises(TypeError, lzd.decompress) + self.assertRaises(TypeError, lzd.decompress, b"foo", b"bar") + self.assertRaises(TypeError, lzd.decompress, "str") + lzd.decompress(empty) + + def test_decompress_parameters(self): + d = {DecompressionParameter.window_log_max : 15} + ZstdDecompressor(options=d) + + # larger than signed int, ValueError + d1 = d.copy() + d1[DecompressionParameter.window_log_max] = 2**31 + self.assertRaises(ValueError, ZstdDecompressor, None, d1) + + # out of bounds error msg + options = {DecompressionParameter.window_log_max:100} + with self.assertRaisesRegex(ZstdError, + (r'Error when setting zstd decompression parameter "window_log_max", ' + r'it should \d+ <= value <= \d+, provided value is 100\. ' + r'\((?:32|64)-bit build\)')): + decompress(b'', options=options) + + def test_unknown_decompression_parameter(self): + KEY = 100001234 + options = {DecompressionParameter.window_log_max: DecompressionParameter.window_log_max.bounds()[1], + KEY: 200000000} + pattern = r'Zstd decompression parameter.*?"unknown parameter \(key %d\)"' \ + % KEY + with self.assertRaisesRegex(ZstdError, pattern): + ZstdDecompressor(options=options) + + def test_decompress_epilogue_flags(self): + # DAT_130K_C has a 4 bytes checksum at frame epilogue + + # full unlimited + d = ZstdDecompressor() + dat = d.decompress(DAT_130K_C) + self.assertEqual(len(dat), _130_1K) + self.assertFalse(d.needs_input) + + with self.assertRaises(EOFError): + dat = d.decompress(b'') + + # full limited + d = ZstdDecompressor() + dat = d.decompress(DAT_130K_C, _130_1K) + self.assertEqual(len(dat), _130_1K) + self.assertFalse(d.needs_input) + + with self.assertRaises(EOFError): + dat = d.decompress(b'', 0) + + # [:-4] unlimited + d = ZstdDecompressor() + dat = d.decompress(DAT_130K_C[:-4]) + self.assertEqual(len(dat), _130_1K) + self.assertTrue(d.needs_input) + + dat = d.decompress(b'') + self.assertEqual(len(dat), 0) + self.assertTrue(d.needs_input) + + # [:-4] limited + d = ZstdDecompressor() + dat = d.decompress(DAT_130K_C[:-4], _130_1K) + self.assertEqual(len(dat), _130_1K) + self.assertFalse(d.needs_input) + + dat = d.decompress(b'', 0) + self.assertEqual(len(dat), 0) + self.assertFalse(d.needs_input) + + # [:-3] unlimited + d = ZstdDecompressor() + dat = d.decompress(DAT_130K_C[:-3]) + self.assertEqual(len(dat), _130_1K) + self.assertTrue(d.needs_input) + + dat = d.decompress(b'') + self.assertEqual(len(dat), 0) + self.assertTrue(d.needs_input) + + # [:-3] limited + d = ZstdDecompressor() + dat = d.decompress(DAT_130K_C[:-3], _130_1K) + self.assertEqual(len(dat), _130_1K) + self.assertFalse(d.needs_input) + + dat = d.decompress(b'', 0) + self.assertEqual(len(dat), 0) + self.assertFalse(d.needs_input) + + # [:-1] unlimited + d = ZstdDecompressor() + dat = d.decompress(DAT_130K_C[:-1]) + self.assertEqual(len(dat), _130_1K) + self.assertTrue(d.needs_input) + + dat = d.decompress(b'') + self.assertEqual(len(dat), 0) + self.assertTrue(d.needs_input) + + # [:-1] limited + d = ZstdDecompressor() + dat = d.decompress(DAT_130K_C[:-1], _130_1K) + self.assertEqual(len(dat), _130_1K) + self.assertFalse(d.needs_input) + + dat = d.decompress(b'', 0) + self.assertEqual(len(dat), 0) + self.assertFalse(d.needs_input) + + def test_decompressor_arg(self): + zd = ZstdDict(b'12345678', True) + + with self.assertRaises(TypeError): + d = ZstdDecompressor(zstd_dict={}) + + with self.assertRaises(TypeError): + d = ZstdDecompressor(options=zd) + + ZstdDecompressor() + ZstdDecompressor(zd, {}) + ZstdDecompressor(zstd_dict=zd, options={DecompressionParameter.window_log_max:25}) + + def test_decompressor_1(self): + # empty + d = ZstdDecompressor() + dat = d.decompress(b'') + + self.assertEqual(dat, b'') + self.assertFalse(d.eof) + + # 130_1K full + d = ZstdDecompressor() + dat = d.decompress(DAT_130K_C) + + self.assertEqual(len(dat), _130_1K) + self.assertTrue(d.eof) + self.assertFalse(d.needs_input) + + # 130_1K full, limit output + d = ZstdDecompressor() + dat = d.decompress(DAT_130K_C, _130_1K) + + self.assertEqual(len(dat), _130_1K) + self.assertTrue(d.eof) + self.assertFalse(d.needs_input) + + # 130_1K, without 4 bytes checksum + d = ZstdDecompressor() + dat = d.decompress(DAT_130K_C[:-4]) + + self.assertEqual(len(dat), _130_1K) + self.assertFalse(d.eof) + self.assertTrue(d.needs_input) + + # above, limit output + d = ZstdDecompressor() + dat = d.decompress(DAT_130K_C[:-4], _130_1K) + + self.assertEqual(len(dat), _130_1K) + self.assertFalse(d.eof) + self.assertFalse(d.needs_input) + + # full, unused_data + TRAIL = b'89234893abcd' + d = ZstdDecompressor() + dat = d.decompress(DAT_130K_C + TRAIL, _130_1K) + + self.assertEqual(len(dat), _130_1K) + self.assertTrue(d.eof) + self.assertFalse(d.needs_input) + self.assertEqual(d.unused_data, TRAIL) + + def test_decompressor_chunks_read_300(self): + TRAIL = b'89234893abcd' + DAT = DAT_130K_C + TRAIL + d = ZstdDecompressor() + + bi = io.BytesIO(DAT) + lst = [] + while True: + if d.needs_input: + dat = bi.read(300) + if not dat: + break + else: + raise Exception('should not get here') + + ret = d.decompress(dat) + lst.append(ret) + if d.eof: + break + + ret = b''.join(lst) + + self.assertEqual(len(ret), _130_1K) + self.assertTrue(d.eof) + self.assertFalse(d.needs_input) + self.assertEqual(d.unused_data + bi.read(), TRAIL) + + def test_decompressor_chunks_read_3(self): + TRAIL = b'89234893' + DAT = DAT_130K_C + TRAIL + d = ZstdDecompressor() + + bi = io.BytesIO(DAT) + lst = [] + while True: + if d.needs_input: + dat = bi.read(3) + if not dat: + break + else: + dat = b'' + + ret = d.decompress(dat, 1) + lst.append(ret) + if d.eof: + break + + ret = b''.join(lst) + + self.assertEqual(len(ret), _130_1K) + self.assertTrue(d.eof) + self.assertFalse(d.needs_input) + self.assertEqual(d.unused_data + bi.read(), TRAIL) + + + def test_decompress_empty(self): + with self.assertRaises(ZstdError): + decompress(b'') + + d = ZstdDecompressor() + self.assertEqual(d.decompress(b''), b'') + self.assertFalse(d.eof) + + def test_decompress_empty_content_frame(self): + DAT = compress(b'') + # decompress + self.assertGreaterEqual(len(DAT), 4) + self.assertEqual(decompress(DAT), b'') + + with self.assertRaises(ZstdError): + decompress(DAT[:-1]) + + # ZstdDecompressor + d = ZstdDecompressor() + dat = d.decompress(DAT) + self.assertEqual(dat, b'') + self.assertTrue(d.eof) + self.assertFalse(d.needs_input) + self.assertEqual(d.unused_data, b'') + self.assertEqual(d.unused_data, b'') # twice + + d = ZstdDecompressor() + dat = d.decompress(DAT[:-1]) + self.assertEqual(dat, b'') + self.assertFalse(d.eof) + self.assertTrue(d.needs_input) + self.assertEqual(d.unused_data, b'') + self.assertEqual(d.unused_data, b'') # twice + +class DecompressorFlagsTestCase(unittest.TestCase): + + @classmethod + def setUpClass(cls): + options = {CompressionParameter.checksum_flag:1} + c = ZstdCompressor(options=options) + + cls.DECOMPRESSED_42 = b'a'*42 + cls.FRAME_42 = c.compress(cls.DECOMPRESSED_42, c.FLUSH_FRAME) + + cls.DECOMPRESSED_60 = b'a'*60 + cls.FRAME_60 = c.compress(cls.DECOMPRESSED_60, c.FLUSH_FRAME) + + cls.FRAME_42_60 = cls.FRAME_42 + cls.FRAME_60 + cls.DECOMPRESSED_42_60 = cls.DECOMPRESSED_42 + cls.DECOMPRESSED_60 + + cls._130_1K = 130*_1K + + c = ZstdCompressor() + cls.UNKNOWN_FRAME_42 = c.compress(cls.DECOMPRESSED_42) + c.flush() + cls.UNKNOWN_FRAME_60 = c.compress(cls.DECOMPRESSED_60) + c.flush() + cls.UNKNOWN_FRAME_42_60 = cls.UNKNOWN_FRAME_42 + cls.UNKNOWN_FRAME_60 + + cls.TRAIL = b'12345678abcdefg!@#$%^&*()_+|' + + def test_function_decompress(self): + + self.assertEqual(len(decompress(COMPRESSED_100_PLUS_32KB)), 100+32*_1K) + + # 1 frame + self.assertEqual(decompress(self.FRAME_42), self.DECOMPRESSED_42) + + self.assertEqual(decompress(self.UNKNOWN_FRAME_42), self.DECOMPRESSED_42) + + pattern = r"Compressed data ended before the end-of-stream marker" + with self.assertRaisesRegex(ZstdError, pattern): + decompress(self.FRAME_42[:1]) + + with self.assertRaisesRegex(ZstdError, pattern): + decompress(self.FRAME_42[:-4]) + + with self.assertRaisesRegex(ZstdError, pattern): + decompress(self.FRAME_42[:-1]) + + # 2 frames + self.assertEqual(decompress(self.FRAME_42_60), self.DECOMPRESSED_42_60) + + self.assertEqual(decompress(self.UNKNOWN_FRAME_42_60), self.DECOMPRESSED_42_60) + + self.assertEqual(decompress(self.FRAME_42 + self.UNKNOWN_FRAME_60), + self.DECOMPRESSED_42_60) + + self.assertEqual(decompress(self.UNKNOWN_FRAME_42 + self.FRAME_60), + self.DECOMPRESSED_42_60) + + with self.assertRaisesRegex(ZstdError, pattern): + decompress(self.FRAME_42_60[:-4]) + + with self.assertRaisesRegex(ZstdError, pattern): + decompress(self.UNKNOWN_FRAME_42_60[:-1]) + + # 130_1K + self.assertEqual(decompress(DAT_130K_C), DAT_130K_D) + + with self.assertRaisesRegex(ZstdError, pattern): + decompress(DAT_130K_C[:-4]) + + with self.assertRaisesRegex(ZstdError, pattern): + decompress(DAT_130K_C[:-1]) + + # Unknown frame descriptor + with self.assertRaisesRegex(ZstdError, "Unknown frame descriptor"): + decompress(b'aaaaaaaaa') + + with self.assertRaisesRegex(ZstdError, "Unknown frame descriptor"): + decompress(self.FRAME_42 + b'aaaaaaaaa') + + with self.assertRaisesRegex(ZstdError, "Unknown frame descriptor"): + decompress(self.UNKNOWN_FRAME_42_60 + b'aaaaaaaaa') + + # doesn't match checksum + checksum = DAT_130K_C[-4:] + if checksum[0] == 255: + wrong_checksum = bytes([254]) + checksum[1:] + else: + wrong_checksum = bytes([checksum[0]+1]) + checksum[1:] + + dat = DAT_130K_C[:-4] + wrong_checksum + + with self.assertRaisesRegex(ZstdError, "doesn't match checksum"): + decompress(dat) + + def test_function_skippable(self): + self.assertEqual(decompress(SKIPPABLE_FRAME), b'') + self.assertEqual(decompress(SKIPPABLE_FRAME + SKIPPABLE_FRAME), b'') + + # 1 frame + 2 skippable + self.assertEqual(len(decompress(SKIPPABLE_FRAME + SKIPPABLE_FRAME + DAT_130K_C)), + self._130_1K) + + self.assertEqual(len(decompress(DAT_130K_C + SKIPPABLE_FRAME + SKIPPABLE_FRAME)), + self._130_1K) + + self.assertEqual(len(decompress(SKIPPABLE_FRAME + DAT_130K_C + SKIPPABLE_FRAME)), + self._130_1K) + + # unknown size + self.assertEqual(decompress(SKIPPABLE_FRAME + self.UNKNOWN_FRAME_60), + self.DECOMPRESSED_60) + + self.assertEqual(decompress(self.UNKNOWN_FRAME_60 + SKIPPABLE_FRAME), + self.DECOMPRESSED_60) + + # 2 frames + 1 skippable + self.assertEqual(decompress(self.FRAME_42 + SKIPPABLE_FRAME + self.FRAME_60), + self.DECOMPRESSED_42_60) + + self.assertEqual(decompress(SKIPPABLE_FRAME + self.FRAME_42_60), + self.DECOMPRESSED_42_60) + + self.assertEqual(decompress(self.UNKNOWN_FRAME_42_60 + SKIPPABLE_FRAME), + self.DECOMPRESSED_42_60) + + # incomplete + with self.assertRaises(ZstdError): + decompress(SKIPPABLE_FRAME[:1]) + + with self.assertRaises(ZstdError): + decompress(SKIPPABLE_FRAME[:-1]) + + with self.assertRaises(ZstdError): + decompress(self.FRAME_42 + SKIPPABLE_FRAME[:-1]) + + # Unknown frame descriptor + with self.assertRaisesRegex(ZstdError, "Unknown frame descriptor"): + decompress(b'aaaaaaaaa' + SKIPPABLE_FRAME) + + with self.assertRaisesRegex(ZstdError, "Unknown frame descriptor"): + decompress(SKIPPABLE_FRAME + b'aaaaaaaaa') + + with self.assertRaisesRegex(ZstdError, "Unknown frame descriptor"): + decompress(SKIPPABLE_FRAME + SKIPPABLE_FRAME + b'aaaaaaaaa') + + def test_decompressor_1(self): + # empty 1 + d = ZstdDecompressor() + + dat = d.decompress(b'') + self.assertEqual(dat, b'') + self.assertFalse(d.eof) + self.assertTrue(d.needs_input) + self.assertEqual(d.unused_data, b'') + self.assertEqual(d.unused_data, b'') # twice + + dat = d.decompress(b'', 0) + self.assertEqual(dat, b'') + self.assertFalse(d.eof) + self.assertFalse(d.needs_input) + self.assertEqual(d.unused_data, b'') + self.assertEqual(d.unused_data, b'') # twice + + dat = d.decompress(COMPRESSED_100_PLUS_32KB + b'a') + self.assertEqual(dat, DECOMPRESSED_100_PLUS_32KB) + self.assertTrue(d.eof) + self.assertFalse(d.needs_input) + self.assertEqual(d.unused_data, b'a') + self.assertEqual(d.unused_data, b'a') # twice + + # empty 2 + d = ZstdDecompressor() + + dat = d.decompress(b'', 0) + self.assertEqual(dat, b'') + self.assertFalse(d.eof) + self.assertFalse(d.needs_input) + self.assertEqual(d.unused_data, b'') + self.assertEqual(d.unused_data, b'') # twice + + dat = d.decompress(b'') + self.assertEqual(dat, b'') + self.assertFalse(d.eof) + self.assertTrue(d.needs_input) + self.assertEqual(d.unused_data, b'') + self.assertEqual(d.unused_data, b'') # twice + + dat = d.decompress(COMPRESSED_100_PLUS_32KB + b'a') + self.assertEqual(dat, DECOMPRESSED_100_PLUS_32KB) + self.assertTrue(d.eof) + self.assertFalse(d.needs_input) + self.assertEqual(d.unused_data, b'a') + self.assertEqual(d.unused_data, b'a') # twice + + # 1 frame + d = ZstdDecompressor() + dat = d.decompress(self.FRAME_42) + + self.assertEqual(dat, self.DECOMPRESSED_42) + self.assertTrue(d.eof) + self.assertFalse(d.needs_input) + self.assertEqual(d.unused_data, b'') + self.assertEqual(d.unused_data, b'') # twice + + with self.assertRaises(EOFError): + d.decompress(b'') + + # 1 frame, trail + d = ZstdDecompressor() + dat = d.decompress(self.FRAME_42 + self.TRAIL) + + self.assertEqual(dat, self.DECOMPRESSED_42) + self.assertTrue(d.eof) + self.assertFalse(d.needs_input) + self.assertEqual(d.unused_data, self.TRAIL) + self.assertEqual(d.unused_data, self.TRAIL) # twice + + # 1 frame, 32_1K + temp = compress(b'a'*(32*_1K)) + d = ZstdDecompressor() + dat = d.decompress(temp, 32*_1K) + + self.assertEqual(dat, b'a'*(32*_1K)) + self.assertTrue(d.eof) + self.assertFalse(d.needs_input) + self.assertEqual(d.unused_data, b'') + self.assertEqual(d.unused_data, b'') # twice + + with self.assertRaises(EOFError): + d.decompress(b'') + + # 1 frame, 32_1K+100, trail + d = ZstdDecompressor() + dat = d.decompress(COMPRESSED_100_PLUS_32KB+self.TRAIL, 100) # 100 bytes + + self.assertEqual(len(dat), 100) + self.assertFalse(d.eof) + self.assertFalse(d.needs_input) + self.assertEqual(d.unused_data, b'') + + dat = d.decompress(b'') # 32_1K + + self.assertEqual(len(dat), 32*_1K) + self.assertTrue(d.eof) + self.assertFalse(d.needs_input) + self.assertEqual(d.unused_data, self.TRAIL) + self.assertEqual(d.unused_data, self.TRAIL) # twice + + with self.assertRaises(EOFError): + d.decompress(b'') + + # incomplete 1 + d = ZstdDecompressor() + dat = d.decompress(self.FRAME_60[:1]) + + self.assertFalse(d.eof) + self.assertTrue(d.needs_input) + self.assertEqual(d.unused_data, b'') + self.assertEqual(d.unused_data, b'') # twice + + # incomplete 2 + d = ZstdDecompressor() + + dat = d.decompress(self.FRAME_60[:-4]) + self.assertEqual(dat, self.DECOMPRESSED_60) + self.assertFalse(d.eof) + self.assertTrue(d.needs_input) + self.assertEqual(d.unused_data, b'') + self.assertEqual(d.unused_data, b'') # twice + + # incomplete 3 + d = ZstdDecompressor() + + dat = d.decompress(self.FRAME_60[:-1]) + self.assertEqual(dat, self.DECOMPRESSED_60) + self.assertFalse(d.eof) + self.assertTrue(d.needs_input) + self.assertEqual(d.unused_data, b'') + + # incomplete 4 + d = ZstdDecompressor() + + dat = d.decompress(self.FRAME_60[:-4], 60) + self.assertEqual(dat, self.DECOMPRESSED_60) + self.assertFalse(d.eof) + self.assertFalse(d.needs_input) + self.assertEqual(d.unused_data, b'') + self.assertEqual(d.unused_data, b'') # twice + + dat = d.decompress(b'') + self.assertEqual(dat, b'') + self.assertFalse(d.eof) + self.assertTrue(d.needs_input) + self.assertEqual(d.unused_data, b'') + self.assertEqual(d.unused_data, b'') # twice + + # Unknown frame descriptor + d = ZstdDecompressor() + with self.assertRaisesRegex(ZstdError, "Unknown frame descriptor"): + d.decompress(b'aaaaaaaaa') + + def test_decompressor_skippable(self): + # 1 skippable + d = ZstdDecompressor() + dat = d.decompress(SKIPPABLE_FRAME) + + self.assertEqual(dat, b'') + self.assertTrue(d.eof) + self.assertFalse(d.needs_input) + self.assertEqual(d.unused_data, b'') + self.assertEqual(d.unused_data, b'') # twice + + # 1 skippable, max_length=0 + d = ZstdDecompressor() + dat = d.decompress(SKIPPABLE_FRAME, 0) + + self.assertEqual(dat, b'') + self.assertTrue(d.eof) + self.assertFalse(d.needs_input) + self.assertEqual(d.unused_data, b'') + self.assertEqual(d.unused_data, b'') # twice + + # 1 skippable, trail + d = ZstdDecompressor() + dat = d.decompress(SKIPPABLE_FRAME + self.TRAIL) + + self.assertEqual(dat, b'') + self.assertTrue(d.eof) + self.assertFalse(d.needs_input) + self.assertEqual(d.unused_data, self.TRAIL) + self.assertEqual(d.unused_data, self.TRAIL) # twice + + # incomplete + d = ZstdDecompressor() + dat = d.decompress(SKIPPABLE_FRAME[:-1]) + + self.assertEqual(dat, b'') + self.assertFalse(d.eof) + self.assertTrue(d.needs_input) + self.assertEqual(d.unused_data, b'') + self.assertEqual(d.unused_data, b'') # twice + + # incomplete + d = ZstdDecompressor() + dat = d.decompress(SKIPPABLE_FRAME[:-1], 0) + + self.assertEqual(dat, b'') + self.assertFalse(d.eof) + self.assertFalse(d.needs_input) + self.assertEqual(d.unused_data, b'') + self.assertEqual(d.unused_data, b'') # twice + + dat = d.decompress(b'') + + self.assertEqual(dat, b'') + self.assertFalse(d.eof) + self.assertTrue(d.needs_input) + self.assertEqual(d.unused_data, b'') + self.assertEqual(d.unused_data, b'') # twice + + + +class ZstdDictTestCase(unittest.TestCase): + + def test_is_raw(self): + # content < 8 + b = b'1234567' + with self.assertRaises(ValueError): + ZstdDict(b) + + # content == 8 + b = b'12345678' + zd = ZstdDict(b, is_raw=True) + self.assertEqual(zd.dict_id, 0) + + temp = compress(b'aaa12345678', level=3, zstd_dict=zd) + self.assertEqual(b'aaa12345678', decompress(temp, zd)) + + # is_raw == False + b = b'12345678abcd' + with self.assertRaises(ValueError): + ZstdDict(b) + + # read only attributes + with self.assertRaises(AttributeError): + zd.dict_content = b + + with self.assertRaises(AttributeError): + zd.dict_id = 10000 + + # ZstdDict arguments + zd = ZstdDict(TRAINED_DICT.dict_content, is_raw=False) + self.assertNotEqual(zd.dict_id, 0) + + zd = ZstdDict(TRAINED_DICT.dict_content, is_raw=True) + self.assertNotEqual(zd.dict_id, 0) # note this assertion + + with self.assertRaises(TypeError): + ZstdDict("12345678abcdef", is_raw=True) + with self.assertRaises(TypeError): + ZstdDict(TRAINED_DICT) + + # invalid parameter + with self.assertRaises(TypeError): + ZstdDict(desk333=345) + + def test_invalid_dict(self): + DICT_MAGIC = 0xEC30A437.to_bytes(4, byteorder='little') + dict_content = DICT_MAGIC + b'abcdefghighlmnopqrstuvwxyz' + + # corrupted + zd = ZstdDict(dict_content, is_raw=False) + with self.assertRaisesRegex(ZstdError, r'ZSTD_CDict.*?corrupted'): + ZstdCompressor(zstd_dict=zd.as_digested_dict) + with self.assertRaisesRegex(ZstdError, r'ZSTD_DDict.*?corrupted'): + ZstdDecompressor(zd) + + # wrong type + with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + ZstdCompressor(zstd_dict=(zd, b'123')) + with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + ZstdCompressor(zstd_dict=(zd, 1, 2)) + with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + ZstdCompressor(zstd_dict=(zd, -1)) + with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + ZstdCompressor(zstd_dict=(zd, 3)) + + with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + ZstdDecompressor(zstd_dict=(zd, b'123')) + with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + ZstdDecompressor((zd, 1, 2)) + with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + ZstdDecompressor((zd, -1)) + with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + ZstdDecompressor((zd, 3)) + + def test_train_dict(self): + + + TRAINED_DICT = train_dict(SAMPLES, DICT_SIZE1) + ZstdDict(TRAINED_DICT.dict_content, False) + + self.assertNotEqual(TRAINED_DICT.dict_id, 0) + self.assertGreater(len(TRAINED_DICT.dict_content), 0) + self.assertLessEqual(len(TRAINED_DICT.dict_content), DICT_SIZE1) + self.assertTrue(re.match(r'^<ZstdDict dict_id=\d+ dict_size=\d+>$', str(TRAINED_DICT))) + + # compress/decompress + c = ZstdCompressor(zstd_dict=TRAINED_DICT) + for sample in SAMPLES: + dat1 = compress(sample, zstd_dict=TRAINED_DICT) + dat2 = decompress(dat1, TRAINED_DICT) + self.assertEqual(sample, dat2) + + dat1 = c.compress(sample) + dat1 += c.flush() + dat2 = decompress(dat1, TRAINED_DICT) + self.assertEqual(sample, dat2) + + def test_finalize_dict(self): + DICT_SIZE2 = 200*_1K + C_LEVEL = 6 + + try: + dic2 = finalize_dict(TRAINED_DICT, SAMPLES, DICT_SIZE2, C_LEVEL) + except NotImplementedError: + # < v1.4.5 at compile-time, >= v.1.4.5 at run-time + return + + self.assertNotEqual(dic2.dict_id, 0) + self.assertGreater(len(dic2.dict_content), 0) + self.assertLessEqual(len(dic2.dict_content), DICT_SIZE2) + + # compress/decompress + c = ZstdCompressor(C_LEVEL, zstd_dict=dic2) + for sample in SAMPLES: + dat1 = compress(sample, C_LEVEL, zstd_dict=dic2) + dat2 = decompress(dat1, dic2) + self.assertEqual(sample, dat2) + + dat1 = c.compress(sample) + dat1 += c.flush() + dat2 = decompress(dat1, dic2) + self.assertEqual(sample, dat2) + + # dict mismatch + self.assertNotEqual(TRAINED_DICT.dict_id, dic2.dict_id) + + dat1 = compress(SAMPLES[0], zstd_dict=TRAINED_DICT) + with self.assertRaises(ZstdError): + decompress(dat1, dic2) + + def test_train_dict_arguments(self): + with self.assertRaises(ValueError): + train_dict([], 100*_1K) + + with self.assertRaises(ValueError): + train_dict(SAMPLES, -100) + + with self.assertRaises(ValueError): + train_dict(SAMPLES, 0) + + def test_finalize_dict_arguments(self): + with self.assertRaises(TypeError): + finalize_dict({1:2}, (b'aaa', b'bbb'), 100*_1K, 2) + + with self.assertRaises(ValueError): + finalize_dict(TRAINED_DICT, [], 100*_1K, 2) + + with self.assertRaises(ValueError): + finalize_dict(TRAINED_DICT, SAMPLES, -100, 2) + + with self.assertRaises(ValueError): + finalize_dict(TRAINED_DICT, SAMPLES, 0, 2) + + def test_train_dict_c(self): + # argument wrong type + with self.assertRaises(TypeError): + _zstd.train_dict({}, (), 100) + with self.assertRaises(TypeError): + _zstd.train_dict(b'', 99, 100) + with self.assertRaises(TypeError): + _zstd.train_dict(b'', (), 100.1) + + # size > size_t + with self.assertRaises(ValueError): + _zstd.train_dict(b'', (2**64+1,), 100) + + # dict_size <= 0 + with self.assertRaises(ValueError): + _zstd.train_dict(b'', (), 0) + + def test_finalize_dict_c(self): + with self.assertRaises(TypeError): + _zstd.finalize_dict(1, 2, 3, 4, 5) + + # argument wrong type + with self.assertRaises(TypeError): + _zstd.finalize_dict({}, b'', (), 100, 5) + with self.assertRaises(TypeError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, {}, (), 100, 5) + with self.assertRaises(TypeError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', 99, 100, 5) + with self.assertRaises(TypeError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100.1, 5) + with self.assertRaises(TypeError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, 5.1) + + # size > size_t + with self.assertRaises(ValueError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (2**64+1,), 100, 5) + + # dict_size <= 0 + with self.assertRaises(ValueError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 0, 5) + + def test_train_buffer_protocol_samples(self): + def _nbytes(dat): + if isinstance(dat, (bytes, bytearray)): + return len(dat) + return memoryview(dat).nbytes + + # prepare samples + chunk_lst = [] + wrong_size_lst = [] + correct_size_lst = [] + for _ in range(300): + arr = array.array('Q', [random.randint(0, 20) for i in range(20)]) + chunk_lst.append(arr) + correct_size_lst.append(_nbytes(arr)) + wrong_size_lst.append(len(arr)) + concatenation = b''.join(chunk_lst) + + # wrong size list + with self.assertRaisesRegex(ValueError, + "The samples size tuple doesn't match the concatenation's size"): + _zstd.train_dict(concatenation, tuple(wrong_size_lst), 100*_1K) + + # correct size list + _zstd.train_dict(concatenation, tuple(correct_size_lst), 3*_1K) + + # wrong size list + with self.assertRaisesRegex(ValueError, + "The samples size tuple doesn't match the concatenation's size"): + _zstd.finalize_dict(TRAINED_DICT.dict_content, + concatenation, tuple(wrong_size_lst), 300*_1K, 5) + + # correct size list + _zstd.finalize_dict(TRAINED_DICT.dict_content, + concatenation, tuple(correct_size_lst), 300*_1K, 5) + + def test_as_prefix(self): + # V1 + V1 = THIS_FILE_BYTES + zd = ZstdDict(V1, True) + + # V2 + mid = len(V1) // 2 + V2 = V1[:mid] + \ + (b'a' if V1[mid] != int.from_bytes(b'a') else b'b') + \ + V1[mid+1:] + + # compress + dat = compress(V2, zstd_dict=zd.as_prefix) + self.assertEqual(get_frame_info(dat).dictionary_id, 0) + + # decompress + self.assertEqual(decompress(dat, zd.as_prefix), V2) + + # use wrong prefix + zd2 = ZstdDict(SAMPLES[0], True) + try: + decompressed = decompress(dat, zd2.as_prefix) + except ZstdError: # expected + pass + else: + self.assertNotEqual(decompressed, V2) + + # read only attribute + with self.assertRaises(AttributeError): + zd.as_prefix = b'1234' + + def test_as_digested_dict(self): + zd = TRAINED_DICT + + # test .as_digested_dict + dat = compress(SAMPLES[0], zstd_dict=zd.as_digested_dict) + self.assertEqual(decompress(dat, zd.as_digested_dict), SAMPLES[0]) + with self.assertRaises(AttributeError): + zd.as_digested_dict = b'1234' + + # test .as_undigested_dict + dat = compress(SAMPLES[0], zstd_dict=zd.as_undigested_dict) + self.assertEqual(decompress(dat, zd.as_undigested_dict), SAMPLES[0]) + with self.assertRaises(AttributeError): + zd.as_undigested_dict = b'1234' + + def test_advanced_compression_parameters(self): + options = {CompressionParameter.compression_level: 6, + CompressionParameter.window_log: 20, + CompressionParameter.enable_long_distance_matching: 1} + + # automatically select + dat = compress(SAMPLES[0], options=options, zstd_dict=TRAINED_DICT) + self.assertEqual(decompress(dat, TRAINED_DICT), SAMPLES[0]) + + # explicitly select + dat = compress(SAMPLES[0], options=options, zstd_dict=TRAINED_DICT.as_digested_dict) + self.assertEqual(decompress(dat, TRAINED_DICT), SAMPLES[0]) + + def test_len(self): + self.assertEqual(len(TRAINED_DICT), len(TRAINED_DICT.dict_content)) + self.assertIn(str(len(TRAINED_DICT)), str(TRAINED_DICT)) + +class FileTestCase(unittest.TestCase): + def setUp(self): + self.DECOMPRESSED_42 = b'a'*42 + self.FRAME_42 = compress(self.DECOMPRESSED_42) + + def test_init(self): + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) as f: + pass + with ZstdFile(io.BytesIO(), "w") as f: + pass + with ZstdFile(io.BytesIO(), "x") as f: + pass + with ZstdFile(io.BytesIO(), "a") as f: + pass + + with ZstdFile(io.BytesIO(), "w", level=12) as f: + pass + with ZstdFile(io.BytesIO(), "w", options={CompressionParameter.checksum_flag:1}) as f: + pass + with ZstdFile(io.BytesIO(), "w", options={}) as f: + pass + with ZstdFile(io.BytesIO(), "w", level=20, zstd_dict=TRAINED_DICT) as f: + pass + + with ZstdFile(io.BytesIO(), "r", options={DecompressionParameter.window_log_max:25}) as f: + pass + with ZstdFile(io.BytesIO(), "r", options={}, zstd_dict=TRAINED_DICT) as f: + pass + + def test_init_with_PathLike_filename(self): + with tempfile.NamedTemporaryFile(delete=False) as tmp_f: + filename = pathlib.Path(tmp_f.name) + + with ZstdFile(filename, "a") as f: + f.write(DECOMPRESSED_100_PLUS_32KB) + with ZstdFile(filename) as f: + self.assertEqual(f.read(), DECOMPRESSED_100_PLUS_32KB) + + with ZstdFile(filename, "a") as f: + f.write(DECOMPRESSED_100_PLUS_32KB) + with ZstdFile(filename) as f: + self.assertEqual(f.read(), DECOMPRESSED_100_PLUS_32KB * 2) + + os.remove(filename) + + def test_init_with_filename(self): + with tempfile.NamedTemporaryFile(delete=False) as tmp_f: + filename = pathlib.Path(tmp_f.name) + + with ZstdFile(filename) as f: + pass + with ZstdFile(filename, "w") as f: + pass + with ZstdFile(filename, "a") as f: + pass + + os.remove(filename) + + def test_init_mode(self): + bi = io.BytesIO() + + with ZstdFile(bi, "r"): + pass + with ZstdFile(bi, "rb"): + pass + with ZstdFile(bi, "w"): + pass + with ZstdFile(bi, "wb"): + pass + with ZstdFile(bi, "a"): + pass + with ZstdFile(bi, "ab"): + pass + + def test_init_with_x_mode(self): + with tempfile.NamedTemporaryFile() as tmp_f: + filename = pathlib.Path(tmp_f.name) + + for mode in ("x", "xb"): + with ZstdFile(filename, mode): + pass + with self.assertRaises(FileExistsError): + with ZstdFile(filename, mode): + pass + os.remove(filename) + + def test_init_bad_mode(self): + with self.assertRaises(ValueError): + ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), (3, "x")) + with self.assertRaises(ValueError): + ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), "") + with self.assertRaises(ValueError): + ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), "xt") + with self.assertRaises(ValueError): + ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), "x+") + with self.assertRaises(ValueError): + ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), "rx") + with self.assertRaises(ValueError): + ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), "wx") + with self.assertRaises(ValueError): + ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), "rt") + with self.assertRaises(ValueError): + ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), "r+") + with self.assertRaises(ValueError): + ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), "wt") + with self.assertRaises(ValueError): + ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), "w+") + with self.assertRaises(ValueError): + ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), "rw") + + with self.assertRaisesRegex(TypeError, r"NOT be CompressionParameter"): + ZstdFile(io.BytesIO(), 'rb', + options={CompressionParameter.compression_level:5}) + with self.assertRaisesRegex(TypeError, + r"NOT be DecompressionParameter"): + ZstdFile(io.BytesIO(), 'wb', + options={DecompressionParameter.window_log_max:21}) + + with self.assertRaises(TypeError): + ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), "r", level=12) + + def test_init_bad_check(self): + with self.assertRaises(TypeError): + ZstdFile(io.BytesIO(), "w", level='asd') + # CHECK_UNKNOWN and anything above CHECK_ID_MAX should be invalid. + with self.assertRaises(ZstdError): + ZstdFile(io.BytesIO(), "w", options={999:9999}) + with self.assertRaises(ZstdError): + ZstdFile(io.BytesIO(), "w", options={CompressionParameter.window_log:99}) + + with self.assertRaises(TypeError): + ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), "r", options=33) + + with self.assertRaises(ValueError): + ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), + options={DecompressionParameter.window_log_max:2**31}) + + with self.assertRaises(ZstdError): + ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), + options={444:333}) + + with self.assertRaises(TypeError): + ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), zstd_dict={1:2}) + + with self.assertRaises(TypeError): + ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), zstd_dict=b'dict123456') + + def test_init_close_fp(self): + # get a temp file name + with tempfile.NamedTemporaryFile(delete=False) as tmp_f: + tmp_f.write(DAT_130K_C) + filename = tmp_f.name + + with self.assertRaises(ValueError): + ZstdFile(filename, options={'a':'b'}) + + # for PyPy + gc.collect() + + os.remove(filename) + + def test_close(self): + with io.BytesIO(COMPRESSED_100_PLUS_32KB) as src: + f = ZstdFile(src) + f.close() + # ZstdFile.close() should not close the underlying file object. + self.assertFalse(src.closed) + # Try closing an already-closed ZstdFile. + f.close() + self.assertFalse(src.closed) + + # Test with a real file on disk, opened directly by ZstdFile. + with tempfile.NamedTemporaryFile(delete=False) as tmp_f: + filename = pathlib.Path(tmp_f.name) + + f = ZstdFile(filename) + fp = f._fp + f.close() + # Here, ZstdFile.close() *should* close the underlying file object. + self.assertTrue(fp.closed) + # Try closing an already-closed ZstdFile. + f.close() + + os.remove(filename) + + def test_closed(self): + f = ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) + try: + self.assertFalse(f.closed) + f.read() + self.assertFalse(f.closed) + finally: + f.close() + self.assertTrue(f.closed) + + f = ZstdFile(io.BytesIO(), "w") + try: + self.assertFalse(f.closed) + finally: + f.close() + self.assertTrue(f.closed) + + def test_fileno(self): + # 1 + f = ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) + try: + self.assertRaises(io.UnsupportedOperation, f.fileno) + finally: + f.close() + self.assertRaises(ValueError, f.fileno) + + # 2 + with tempfile.NamedTemporaryFile(delete=False) as tmp_f: + filename = pathlib.Path(tmp_f.name) + + f = ZstdFile(filename) + try: + self.assertEqual(f.fileno(), f._fp.fileno()) + self.assertIsInstance(f.fileno(), int) + finally: + f.close() + self.assertRaises(ValueError, f.fileno) + + os.remove(filename) + + # 3, no .fileno() method + class C: + def read(self, size=-1): + return b'123' + with ZstdFile(C(), 'rb') as f: + with self.assertRaisesRegex(AttributeError, r'fileno'): + f.fileno() + + def test_name(self): + # 1 + f = ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) + try: + with self.assertRaises(AttributeError): + f.name + finally: + f.close() + with self.assertRaises(ValueError): + f.name + + # 2 + with tempfile.NamedTemporaryFile(delete=False) as tmp_f: + filename = pathlib.Path(tmp_f.name) + + f = ZstdFile(filename) + try: + self.assertEqual(f.name, f._fp.name) + self.assertIsInstance(f.name, str) + finally: + f.close() + with self.assertRaises(ValueError): + f.name + + os.remove(filename) + + # 3, no .filename property + class C: + def read(self, size=-1): + return b'123' + with ZstdFile(C(), 'rb') as f: + with self.assertRaisesRegex(AttributeError, r'name'): + f.name + + def test_seekable(self): + f = ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) + try: + self.assertTrue(f.seekable()) + f.read() + self.assertTrue(f.seekable()) + finally: + f.close() + self.assertRaises(ValueError, f.seekable) + + f = ZstdFile(io.BytesIO(), "w") + try: + self.assertFalse(f.seekable()) + finally: + f.close() + self.assertRaises(ValueError, f.seekable) + + src = io.BytesIO(COMPRESSED_100_PLUS_32KB) + src.seekable = lambda: False + f = ZstdFile(src) + try: + self.assertFalse(f.seekable()) + finally: + f.close() + self.assertRaises(ValueError, f.seekable) + + def test_readable(self): + f = ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) + try: + self.assertTrue(f.readable()) + f.read() + self.assertTrue(f.readable()) + finally: + f.close() + self.assertRaises(ValueError, f.readable) + + f = ZstdFile(io.BytesIO(), "w") + try: + self.assertFalse(f.readable()) + finally: + f.close() + self.assertRaises(ValueError, f.readable) + + def test_writable(self): + f = ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) + try: + self.assertFalse(f.writable()) + f.read() + self.assertFalse(f.writable()) + finally: + f.close() + self.assertRaises(ValueError, f.writable) + + f = ZstdFile(io.BytesIO(), "w") + try: + self.assertTrue(f.writable()) + finally: + f.close() + self.assertRaises(ValueError, f.writable) + + def test_read_0(self): + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) as f: + self.assertEqual(f.read(0), b"") + self.assertEqual(f.read(), DECOMPRESSED_100_PLUS_32KB) + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), + options={DecompressionParameter.window_log_max:20}) as f: + self.assertEqual(f.read(0), b"") + + # empty file + with ZstdFile(io.BytesIO(b'')) as f: + self.assertEqual(f.read(0), b"") + with self.assertRaises(EOFError): + f.read(10) + + with ZstdFile(io.BytesIO(b'')) as f: + with self.assertRaises(EOFError): + f.read(10) + + def test_read_10(self): + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) as f: + chunks = [] + while True: + result = f.read(10) + if not result: + break + self.assertLessEqual(len(result), 10) + chunks.append(result) + self.assertEqual(b"".join(chunks), DECOMPRESSED_100_PLUS_32KB) + + def test_read_multistream(self): + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB * 5)) as f: + self.assertEqual(f.read(), DECOMPRESSED_100_PLUS_32KB * 5) + + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB + SKIPPABLE_FRAME)) as f: + self.assertEqual(f.read(), DECOMPRESSED_100_PLUS_32KB) + + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB + COMPRESSED_DAT)) as f: + self.assertEqual(f.read(), DECOMPRESSED_100_PLUS_32KB + DECOMPRESSED_DAT) + + def test_read_incomplete(self): + with ZstdFile(io.BytesIO(DAT_130K_C[:-200])) as f: + self.assertRaises(EOFError, f.read) + + # Trailing data isn't a valid compressed stream + with ZstdFile(io.BytesIO(self.FRAME_42 + b'12345')) as f: + self.assertRaises(ZstdError, f.read) + + with ZstdFile(io.BytesIO(SKIPPABLE_FRAME + b'12345')) as f: + self.assertRaises(ZstdError, f.read) + + def test_read_truncated(self): + # Drop stream epilogue: 4 bytes checksum + truncated = DAT_130K_C[:-4] + with ZstdFile(io.BytesIO(truncated)) as f: + self.assertRaises(EOFError, f.read) + + with ZstdFile(io.BytesIO(truncated)) as f: + # this is an important test, make sure it doesn't raise EOFError. + self.assertEqual(f.read(130*_1K), DAT_130K_D) + with self.assertRaises(EOFError): + f.read(1) + + # Incomplete header + for i in range(1, 20): + with ZstdFile(io.BytesIO(truncated[:i])) as f: + self.assertRaises(EOFError, f.read, 1) + + def test_read_bad_args(self): + f = ZstdFile(io.BytesIO(COMPRESSED_DAT)) + f.close() + self.assertRaises(ValueError, f.read) + with ZstdFile(io.BytesIO(), "w") as f: + self.assertRaises(ValueError, f.read) + with ZstdFile(io.BytesIO(COMPRESSED_DAT)) as f: + self.assertRaises(TypeError, f.read, float()) + + def test_read_bad_data(self): + with ZstdFile(io.BytesIO(COMPRESSED_BOGUS)) as f: + self.assertRaises(ZstdError, f.read) + + def test_read_exception(self): + class C: + def read(self, size=-1): + raise OSError + with ZstdFile(C()) as f: + with self.assertRaises(OSError): + f.read(10) + + def test_read1(self): + with ZstdFile(io.BytesIO(DAT_130K_C)) as f: + blocks = [] + while True: + result = f.read1() + if not result: + break + blocks.append(result) + self.assertEqual(b"".join(blocks), DAT_130K_D) + self.assertEqual(f.read1(), b"") + + def test_read1_0(self): + with ZstdFile(io.BytesIO(COMPRESSED_DAT)) as f: + self.assertEqual(f.read1(0), b"") + + def test_read1_10(self): + with ZstdFile(io.BytesIO(COMPRESSED_DAT)) as f: + blocks = [] + while True: + result = f.read1(10) + if not result: + break + blocks.append(result) + self.assertEqual(b"".join(blocks), DECOMPRESSED_DAT) + self.assertEqual(f.read1(), b"") + + def test_read1_multistream(self): + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB * 5)) as f: + blocks = [] + while True: + result = f.read1() + if not result: + break + blocks.append(result) + self.assertEqual(b"".join(blocks), DECOMPRESSED_100_PLUS_32KB * 5) + self.assertEqual(f.read1(), b"") + + def test_read1_bad_args(self): + f = ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) + f.close() + self.assertRaises(ValueError, f.read1) + with ZstdFile(io.BytesIO(), "w") as f: + self.assertRaises(ValueError, f.read1) + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) as f: + self.assertRaises(TypeError, f.read1, None) + + def test_readinto(self): + arr = array.array("I", range(100)) + self.assertEqual(len(arr), 100) + self.assertEqual(len(arr) * arr.itemsize, 400) + ba = bytearray(300) + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) as f: + # 0 length output buffer + self.assertEqual(f.readinto(ba[0:0]), 0) + + # use correct length for buffer protocol object + self.assertEqual(f.readinto(arr), 400) + self.assertEqual(arr.tobytes(), DECOMPRESSED_100_PLUS_32KB[:400]) + + # normal readinto + self.assertEqual(f.readinto(ba), 300) + self.assertEqual(ba, DECOMPRESSED_100_PLUS_32KB[400:700]) + + def test_peek(self): + with ZstdFile(io.BytesIO(DAT_130K_C)) as f: + result = f.peek() + self.assertGreater(len(result), 0) + self.assertTrue(DAT_130K_D.startswith(result)) + self.assertEqual(f.read(), DAT_130K_D) + with ZstdFile(io.BytesIO(DAT_130K_C)) as f: + result = f.peek(10) + self.assertGreater(len(result), 0) + self.assertTrue(DAT_130K_D.startswith(result)) + self.assertEqual(f.read(), DAT_130K_D) + + def test_peek_bad_args(self): + with ZstdFile(io.BytesIO(), "w") as f: + self.assertRaises(ValueError, f.peek) + + def test_iterator(self): + with io.BytesIO(THIS_FILE_BYTES) as f: + lines = f.readlines() + compressed = compress(THIS_FILE_BYTES) + + # iter + with ZstdFile(io.BytesIO(compressed)) as f: + self.assertListEqual(list(iter(f)), lines) + + # readline + with ZstdFile(io.BytesIO(compressed)) as f: + for line in lines: + self.assertEqual(f.readline(), line) + self.assertEqual(f.readline(), b'') + self.assertEqual(f.readline(), b'') + + # readlines + with ZstdFile(io.BytesIO(compressed)) as f: + self.assertListEqual(f.readlines(), lines) + + def test_decompress_limited(self): + _ZSTD_DStreamInSize = 128*_1K + 3 + + bomb = compress(b'\0' * int(2e6), level=10) + self.assertLess(len(bomb), _ZSTD_DStreamInSize) + + decomp = ZstdFile(io.BytesIO(bomb)) + self.assertEqual(decomp.read(1), b'\0') + + # BufferedReader uses 128 KiB buffer in __init__.py + max_decomp = 128*_1K + self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp, + "Excessive amount of data was decompressed") + + def test_write(self): + raw_data = THIS_FILE_BYTES[: len(THIS_FILE_BYTES) // 6] + with io.BytesIO() as dst: + with ZstdFile(dst, "w") as f: + f.write(raw_data) + + comp = ZstdCompressor() + expected = comp.compress(raw_data) + comp.flush() + self.assertEqual(dst.getvalue(), expected) + + with io.BytesIO() as dst: + with ZstdFile(dst, "w", level=12) as f: + f.write(raw_data) + + comp = ZstdCompressor(12) + expected = comp.compress(raw_data) + comp.flush() + self.assertEqual(dst.getvalue(), expected) + + with io.BytesIO() as dst: + with ZstdFile(dst, "w", options={CompressionParameter.checksum_flag:1}) as f: + f.write(raw_data) + + comp = ZstdCompressor(options={CompressionParameter.checksum_flag:1}) + expected = comp.compress(raw_data) + comp.flush() + self.assertEqual(dst.getvalue(), expected) + + with io.BytesIO() as dst: + options = {CompressionParameter.compression_level:-5, + CompressionParameter.checksum_flag:1} + with ZstdFile(dst, "w", + options=options) as f: + f.write(raw_data) + + comp = ZstdCompressor(options=options) + expected = comp.compress(raw_data) + comp.flush() + self.assertEqual(dst.getvalue(), expected) + + def test_write_empty_frame(self): + # .FLUSH_FRAME generates an empty content frame + c = ZstdCompressor() + self.assertNotEqual(c.flush(c.FLUSH_FRAME), b'') + self.assertNotEqual(c.flush(c.FLUSH_FRAME), b'') + + # don't generate empty content frame + bo = io.BytesIO() + with ZstdFile(bo, 'w') as f: + pass + self.assertEqual(bo.getvalue(), b'') + + bo = io.BytesIO() + with ZstdFile(bo, 'w') as f: + f.flush(f.FLUSH_FRAME) + self.assertEqual(bo.getvalue(), b'') + + # if .write(b''), generate empty content frame + bo = io.BytesIO() + with ZstdFile(bo, 'w') as f: + f.write(b'') + self.assertNotEqual(bo.getvalue(), b'') + + # has an empty content frame + bo = io.BytesIO() + with ZstdFile(bo, 'w') as f: + f.flush(f.FLUSH_BLOCK) + self.assertNotEqual(bo.getvalue(), b'') + + def test_write_empty_block(self): + # If no internal data, .FLUSH_BLOCK return b''. + c = ZstdCompressor() + self.assertEqual(c.flush(c.FLUSH_BLOCK), b'') + self.assertNotEqual(c.compress(b'123', c.FLUSH_BLOCK), + b'') + self.assertEqual(c.flush(c.FLUSH_BLOCK), b'') + self.assertEqual(c.compress(b''), b'') + self.assertEqual(c.compress(b''), b'') + self.assertEqual(c.flush(c.FLUSH_BLOCK), b'') + + # mode = .last_mode + bo = io.BytesIO() + with ZstdFile(bo, 'w') as f: + f.write(b'123') + f.flush(f.FLUSH_BLOCK) + fp_pos = f._fp.tell() + self.assertNotEqual(fp_pos, 0) + f.flush(f.FLUSH_BLOCK) + self.assertEqual(f._fp.tell(), fp_pos) + + # mode != .last_mode + bo = io.BytesIO() + with ZstdFile(bo, 'w') as f: + f.flush(f.FLUSH_BLOCK) + self.assertEqual(f._fp.tell(), 0) + f.write(b'') + f.flush(f.FLUSH_BLOCK) + self.assertEqual(f._fp.tell(), 0) + + def test_write_101(self): + with io.BytesIO() as dst: + with ZstdFile(dst, "w") as f: + for start in range(0, len(THIS_FILE_BYTES), 101): + f.write(THIS_FILE_BYTES[start:start+101]) + + comp = ZstdCompressor() + expected = comp.compress(THIS_FILE_BYTES) + comp.flush() + self.assertEqual(dst.getvalue(), expected) + + def test_write_append(self): + def comp(data): + comp = ZstdCompressor() + return comp.compress(data) + comp.flush() + + part1 = THIS_FILE_BYTES[:_1K] + part2 = THIS_FILE_BYTES[_1K:1536] + part3 = THIS_FILE_BYTES[1536:] + expected = b"".join(comp(x) for x in (part1, part2, part3)) + with io.BytesIO() as dst: + with ZstdFile(dst, "w") as f: + f.write(part1) + with ZstdFile(dst, "a") as f: + f.write(part2) + with ZstdFile(dst, "a") as f: + f.write(part3) + self.assertEqual(dst.getvalue(), expected) + + def test_write_bad_args(self): + f = ZstdFile(io.BytesIO(), "w") + f.close() + self.assertRaises(ValueError, f.write, b"foo") + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), "r") as f: + self.assertRaises(ValueError, f.write, b"bar") + with ZstdFile(io.BytesIO(), "w") as f: + self.assertRaises(TypeError, f.write, None) + self.assertRaises(TypeError, f.write, "text") + self.assertRaises(TypeError, f.write, 789) + + def test_writelines(self): + def comp(data): + comp = ZstdCompressor() + return comp.compress(data) + comp.flush() + + with io.BytesIO(THIS_FILE_BYTES) as f: + lines = f.readlines() + with io.BytesIO() as dst: + with ZstdFile(dst, "w") as f: + f.writelines(lines) + expected = comp(THIS_FILE_BYTES) + self.assertEqual(dst.getvalue(), expected) + + def test_seek_forward(self): + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) as f: + f.seek(555) + self.assertEqual(f.read(), DECOMPRESSED_100_PLUS_32KB[555:]) + + def test_seek_forward_across_streams(self): + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB * 2)) as f: + f.seek(len(DECOMPRESSED_100_PLUS_32KB) + 123) + self.assertEqual(f.read(), DECOMPRESSED_100_PLUS_32KB[123:]) + + def test_seek_forward_relative_to_current(self): + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) as f: + f.read(100) + f.seek(1236, 1) + self.assertEqual(f.read(), DECOMPRESSED_100_PLUS_32KB[1336:]) + + def test_seek_forward_relative_to_end(self): + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) as f: + f.seek(-555, 2) + self.assertEqual(f.read(), DECOMPRESSED_100_PLUS_32KB[-555:]) + + def test_seek_backward(self): + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) as f: + f.read(1001) + f.seek(211) + self.assertEqual(f.read(), DECOMPRESSED_100_PLUS_32KB[211:]) + + def test_seek_backward_across_streams(self): + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB * 2)) as f: + f.read(len(DECOMPRESSED_100_PLUS_32KB) + 333) + f.seek(737) + self.assertEqual(f.read(), + DECOMPRESSED_100_PLUS_32KB[737:] + DECOMPRESSED_100_PLUS_32KB) + + def test_seek_backward_relative_to_end(self): + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) as f: + f.seek(-150, 2) + self.assertEqual(f.read(), DECOMPRESSED_100_PLUS_32KB[-150:]) + + def test_seek_past_end(self): + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) as f: + f.seek(len(DECOMPRESSED_100_PLUS_32KB) + 9001) + self.assertEqual(f.tell(), len(DECOMPRESSED_100_PLUS_32KB)) + self.assertEqual(f.read(), b"") + + def test_seek_past_start(self): + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) as f: + f.seek(-88) + self.assertEqual(f.tell(), 0) + self.assertEqual(f.read(), DECOMPRESSED_100_PLUS_32KB) + + def test_seek_bad_args(self): + f = ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) + f.close() + self.assertRaises(ValueError, f.seek, 0) + with ZstdFile(io.BytesIO(), "w") as f: + self.assertRaises(ValueError, f.seek, 0) + with ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) as f: + self.assertRaises(ValueError, f.seek, 0, 3) + # io.BufferedReader raises TypeError instead of ValueError + self.assertRaises((TypeError, ValueError), f.seek, 9, ()) + self.assertRaises(TypeError, f.seek, None) + self.assertRaises(TypeError, f.seek, b"derp") + + def test_seek_not_seekable(self): + class C(io.BytesIO): + def seekable(self): + return False + obj = C(COMPRESSED_100_PLUS_32KB) + with ZstdFile(obj, 'r') as f: + d = f.read(1) + self.assertFalse(f.seekable()) + with self.assertRaisesRegex(io.UnsupportedOperation, + 'File or stream is not seekable'): + f.seek(0) + d += f.read() + self.assertEqual(d, DECOMPRESSED_100_PLUS_32KB) + + def test_tell(self): + with ZstdFile(io.BytesIO(DAT_130K_C)) as f: + pos = 0 + while True: + self.assertEqual(f.tell(), pos) + result = f.read(random.randint(171, 189)) + if not result: + break + pos += len(result) + self.assertEqual(f.tell(), len(DAT_130K_D)) + with ZstdFile(io.BytesIO(), "w") as f: + for pos in range(0, len(DAT_130K_D), 143): + self.assertEqual(f.tell(), pos) + f.write(DAT_130K_D[pos:pos+143]) + self.assertEqual(f.tell(), len(DAT_130K_D)) + + def test_tell_bad_args(self): + f = ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB)) + f.close() + self.assertRaises(ValueError, f.tell) + + def test_file_dict(self): + # default + bi = io.BytesIO() + with ZstdFile(bi, 'w', zstd_dict=TRAINED_DICT) as f: + f.write(SAMPLES[0]) + bi.seek(0) + with ZstdFile(bi, zstd_dict=TRAINED_DICT) as f: + dat = f.read() + self.assertEqual(dat, SAMPLES[0]) + + # .as_(un)digested_dict + bi = io.BytesIO() + with ZstdFile(bi, 'w', zstd_dict=TRAINED_DICT.as_digested_dict) as f: + f.write(SAMPLES[0]) + bi.seek(0) + with ZstdFile(bi, zstd_dict=TRAINED_DICT.as_undigested_dict) as f: + dat = f.read() + self.assertEqual(dat, SAMPLES[0]) + + def test_file_prefix(self): + bi = io.BytesIO() + with ZstdFile(bi, 'w', zstd_dict=TRAINED_DICT.as_prefix) as f: + f.write(SAMPLES[0]) + bi.seek(0) + with ZstdFile(bi, zstd_dict=TRAINED_DICT.as_prefix) as f: + dat = f.read() + self.assertEqual(dat, SAMPLES[0]) + + def test_UnsupportedOperation(self): + # 1 + with ZstdFile(io.BytesIO(), 'r') as f: + with self.assertRaises(io.UnsupportedOperation): + f.write(b'1234') + + # 2 + class T: + def read(self, size): + return b'a' * size + + with self.assertRaises(TypeError): # on creation + with ZstdFile(T(), 'w') as f: + pass + + # 3 + with ZstdFile(io.BytesIO(), 'w') as f: + with self.assertRaises(io.UnsupportedOperation): + f.read(100) + with self.assertRaises(io.UnsupportedOperation): + f.seek(100) + self.assertEqual(f.closed, True) + with self.assertRaises(ValueError): + f.readable() + with self.assertRaises(ValueError): + f.tell() + with self.assertRaises(ValueError): + f.read(100) + + def test_read_readinto_readinto1(self): + lst = [] + with ZstdFile(io.BytesIO(COMPRESSED_THIS_FILE*5)) as f: + while True: + method = random.randint(0, 2) + size = random.randint(0, 300) + + if method == 0: + dat = f.read(size) + if not dat and size: + break + lst.append(dat) + elif method == 1: + ba = bytearray(size) + read_size = f.readinto(ba) + if read_size == 0 and size: + break + lst.append(bytes(ba[:read_size])) + elif method == 2: + ba = bytearray(size) + read_size = f.readinto1(ba) + if read_size == 0 and size: + break + lst.append(bytes(ba[:read_size])) + self.assertEqual(b''.join(lst), THIS_FILE_BYTES*5) + + def test_zstdfile_flush(self): + # closed + f = ZstdFile(io.BytesIO(), 'w') + f.close() + with self.assertRaises(ValueError): + f.flush() + + # read + with ZstdFile(io.BytesIO(), 'r') as f: + # does nothing for read-only stream + f.flush() + + # write + DAT = b'abcd' + bi = io.BytesIO() + with ZstdFile(bi, 'w') as f: + self.assertEqual(f.write(DAT), len(DAT)) + self.assertEqual(f.tell(), len(DAT)) + self.assertEqual(bi.tell(), 0) # not enough for a block + + self.assertEqual(f.flush(), None) + self.assertEqual(f.tell(), len(DAT)) + self.assertGreater(bi.tell(), 0) # flushed + + # write, no .flush() method + class C: + def write(self, b): + return len(b) + with ZstdFile(C(), 'w') as f: + self.assertEqual(f.write(DAT), len(DAT)) + self.assertEqual(f.tell(), len(DAT)) + + self.assertEqual(f.flush(), None) + self.assertEqual(f.tell(), len(DAT)) + + def test_zstdfile_flush_mode(self): + self.assertEqual(ZstdFile.FLUSH_BLOCK, ZstdCompressor.FLUSH_BLOCK) + self.assertEqual(ZstdFile.FLUSH_FRAME, ZstdCompressor.FLUSH_FRAME) + with self.assertRaises(AttributeError): + ZstdFile.CONTINUE + + bo = io.BytesIO() + with ZstdFile(bo, 'w') as f: + # flush block + self.assertEqual(f.write(b'123'), 3) + self.assertIsNone(f.flush(f.FLUSH_BLOCK)) + p1 = bo.tell() + # mode == .last_mode, should return + self.assertIsNone(f.flush()) + p2 = bo.tell() + self.assertEqual(p1, p2) + # flush frame + self.assertEqual(f.write(b'456'), 3) + self.assertIsNone(f.flush(mode=f.FLUSH_FRAME)) + # flush frame + self.assertEqual(f.write(b'789'), 3) + self.assertIsNone(f.flush(f.FLUSH_FRAME)) + p1 = bo.tell() + # mode == .last_mode, should return + self.assertIsNone(f.flush(f.FLUSH_FRAME)) + p2 = bo.tell() + self.assertEqual(p1, p2) + self.assertEqual(decompress(bo.getvalue()), b'123456789') + + bo = io.BytesIO() + with ZstdFile(bo, 'w') as f: + f.write(b'123') + with self.assertRaisesRegex(ValueError, r'\.FLUSH_.*?\.FLUSH_'): + f.flush(ZstdCompressor.CONTINUE) + with self.assertRaises(ValueError): + f.flush(-1) + with self.assertRaises(ValueError): + f.flush(123456) + with self.assertRaises(TypeError): + f.flush(node=ZstdCompressor.CONTINUE) + with self.assertRaises((TypeError, ValueError)): + f.flush('FLUSH_FRAME') + with self.assertRaises(TypeError): + f.flush(b'456', f.FLUSH_BLOCK) + + def test_zstdfile_truncate(self): + with ZstdFile(io.BytesIO(), 'w') as f: + with self.assertRaises(io.UnsupportedOperation): + f.truncate(200) + + def test_zstdfile_iter_issue45475(self): + lines = [l for l in ZstdFile(io.BytesIO(COMPRESSED_THIS_FILE))] + self.assertGreater(len(lines), 0) + + def test_append_new_file(self): + with tempfile.NamedTemporaryFile(delete=True) as tmp_f: + filename = tmp_f.name + + with ZstdFile(filename, 'a') as f: + pass + self.assertTrue(os.path.isfile(filename)) + + os.remove(filename) + +class OpenTestCase(unittest.TestCase): + + def test_binary_modes(self): + with open(io.BytesIO(COMPRESSED_100_PLUS_32KB), "rb") as f: + self.assertEqual(f.read(), DECOMPRESSED_100_PLUS_32KB) + with io.BytesIO() as bio: + with open(bio, "wb") as f: + f.write(DECOMPRESSED_100_PLUS_32KB) + file_data = decompress(bio.getvalue()) + self.assertEqual(file_data, DECOMPRESSED_100_PLUS_32KB) + with open(bio, "ab") as f: + f.write(DECOMPRESSED_100_PLUS_32KB) + file_data = decompress(bio.getvalue()) + self.assertEqual(file_data, DECOMPRESSED_100_PLUS_32KB * 2) + + def test_text_modes(self): + # empty input + with self.assertRaises(EOFError): + with open(io.BytesIO(b''), "rt", encoding="utf-8", newline='\n') as reader: + for _ in reader: + pass + + # read + uncompressed = THIS_FILE_STR.replace(os.linesep, "\n") + with open(io.BytesIO(COMPRESSED_THIS_FILE), "rt", encoding="utf-8") as f: + self.assertEqual(f.read(), uncompressed) + + with io.BytesIO() as bio: + # write + with open(bio, "wt", encoding="utf-8") as f: + f.write(uncompressed) + file_data = decompress(bio.getvalue()).decode("utf-8") + self.assertEqual(file_data.replace(os.linesep, "\n"), uncompressed) + # append + with open(bio, "at", encoding="utf-8") as f: + f.write(uncompressed) + file_data = decompress(bio.getvalue()).decode("utf-8") + self.assertEqual(file_data.replace(os.linesep, "\n"), uncompressed * 2) + + def test_bad_params(self): + with tempfile.NamedTemporaryFile(delete=False) as tmp_f: + TESTFN = pathlib.Path(tmp_f.name) + + with self.assertRaises(ValueError): + open(TESTFN, "") + with self.assertRaises(ValueError): + open(TESTFN, "rbt") + with self.assertRaises(ValueError): + open(TESTFN, "rb", encoding="utf-8") + with self.assertRaises(ValueError): + open(TESTFN, "rb", errors="ignore") + with self.assertRaises(ValueError): + open(TESTFN, "rb", newline="\n") + + os.remove(TESTFN) + + def test_option(self): + options = {DecompressionParameter.window_log_max:25} + with open(io.BytesIO(COMPRESSED_100_PLUS_32KB), "rb", options=options) as f: + self.assertEqual(f.read(), DECOMPRESSED_100_PLUS_32KB) + + options = {CompressionParameter.compression_level:12} + with io.BytesIO() as bio: + with open(bio, "wb", options=options) as f: + f.write(DECOMPRESSED_100_PLUS_32KB) + file_data = decompress(bio.getvalue()) + self.assertEqual(file_data, DECOMPRESSED_100_PLUS_32KB) + + def test_encoding(self): + uncompressed = THIS_FILE_STR.replace(os.linesep, "\n") + + with io.BytesIO() as bio: + with open(bio, "wt", encoding="utf-16-le") as f: + f.write(uncompressed) + file_data = decompress(bio.getvalue()).decode("utf-16-le") + self.assertEqual(file_data.replace(os.linesep, "\n"), uncompressed) + bio.seek(0) + with open(bio, "rt", encoding="utf-16-le") as f: + self.assertEqual(f.read().replace(os.linesep, "\n"), uncompressed) + + def test_encoding_error_handler(self): + with io.BytesIO(compress(b"foo\xffbar")) as bio: + with open(bio, "rt", encoding="ascii", errors="ignore") as f: + self.assertEqual(f.read(), "foobar") + + def test_newline(self): + # Test with explicit newline (universal newline mode disabled). + text = THIS_FILE_STR.replace(os.linesep, "\n") + with io.BytesIO() as bio: + with open(bio, "wt", encoding="utf-8", newline="\n") as f: + f.write(text) + bio.seek(0) + with open(bio, "rt", encoding="utf-8", newline="\r") as f: + self.assertEqual(f.readlines(), [text]) + + def test_x_mode(self): + with tempfile.NamedTemporaryFile(delete=False) as tmp_f: + TESTFN = pathlib.Path(tmp_f.name) + + for mode in ("x", "xb", "xt"): + os.remove(TESTFN) + + if mode == "xt": + encoding = "utf-8" + else: + encoding = None + with open(TESTFN, mode, encoding=encoding): + pass + with self.assertRaises(FileExistsError): + with open(TESTFN, mode): + pass + + os.remove(TESTFN) + + def test_open_dict(self): + # default + bi = io.BytesIO() + with open(bi, 'w', zstd_dict=TRAINED_DICT) as f: + f.write(SAMPLES[0]) + bi.seek(0) + with open(bi, zstd_dict=TRAINED_DICT) as f: + dat = f.read() + self.assertEqual(dat, SAMPLES[0]) + + # .as_(un)digested_dict + bi = io.BytesIO() + with open(bi, 'w', zstd_dict=TRAINED_DICT.as_digested_dict) as f: + f.write(SAMPLES[0]) + bi.seek(0) + with open(bi, zstd_dict=TRAINED_DICT.as_undigested_dict) as f: + dat = f.read() + self.assertEqual(dat, SAMPLES[0]) + + # invalid dictionary + bi = io.BytesIO() + with self.assertRaisesRegex(TypeError, 'zstd_dict'): + open(bi, 'w', zstd_dict={1:2, 2:3}) + + with self.assertRaisesRegex(TypeError, 'zstd_dict'): + open(bi, 'w', zstd_dict=b'1234567890') + + def test_open_prefix(self): + bi = io.BytesIO() + with open(bi, 'w', zstd_dict=TRAINED_DICT.as_prefix) as f: + f.write(SAMPLES[0]) + bi.seek(0) + with open(bi, zstd_dict=TRAINED_DICT.as_prefix) as f: + dat = f.read() + self.assertEqual(dat, SAMPLES[0]) + + def test_buffer_protocol(self): + # don't use len() for buffer protocol objects + arr = array.array("i", range(1000)) + LENGTH = len(arr) * arr.itemsize + + with open(io.BytesIO(), "wb") as f: + self.assertEqual(f.write(arr), LENGTH) + self.assertEqual(f.tell(), LENGTH) + +class FreeThreadingMethodTests(unittest.TestCase): + + @unittest.skipUnless(Py_GIL_DISABLED, 'this test can only possibly fail with GIL disabled') + @threading_helper.reap_threads + @threading_helper.requires_working_threading() + def test_compress_locking(self): + input = b'a'* (16*_1K) + num_threads = 8 + + comp = ZstdCompressor() + parts = [] + for _ in range(num_threads): + res = comp.compress(input, ZstdCompressor.FLUSH_BLOCK) + if res: + parts.append(res) + rest1 = comp.flush() + expected = b''.join(parts) + rest1 + + comp = ZstdCompressor() + output = [] + def run_method(method, input_data, output_data): + res = method(input_data, ZstdCompressor.FLUSH_BLOCK) + if res: + output_data.append(res) + threads = [] + + for i in range(num_threads): + thread = threading.Thread(target=run_method, args=(comp.compress, input, output)) + + threads.append(thread) + + with threading_helper.start_threads(threads): + pass + + rest2 = comp.flush() + self.assertEqual(rest1, rest2) + actual = b''.join(output) + rest2 + self.assertEqual(expected, actual) + + @unittest.skipUnless(Py_GIL_DISABLED, 'this test can only possibly fail with GIL disabled') + @threading_helper.reap_threads + @threading_helper.requires_working_threading() + def test_decompress_locking(self): + input = compress(b'a'* (16*_1K)) + num_threads = 8 + # to ensure we decompress over multiple calls, set maxsize + window_size = _1K * 16//num_threads + + decomp = ZstdDecompressor() + parts = [] + for _ in range(num_threads): + res = decomp.decompress(input, window_size) + if res: + parts.append(res) + expected = b''.join(parts) + + comp = ZstdDecompressor() + output = [] + def run_method(method, input_data, output_data): + res = method(input_data, window_size) + if res: + output_data.append(res) + threads = [] + + for i in range(num_threads): + thread = threading.Thread(target=run_method, args=(comp.decompress, input, output)) + + threads.append(thread) + + with threading_helper.start_threads(threads): + pass + + actual = b''.join(output) + self.assertEqual(expected, actual) + + + +if __name__ == "__main__": + unittest.main() |