aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_zstd.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_zstd.py')
-rw-r--r--Lib/test/test_zstd.py61
1 files changed, 56 insertions, 5 deletions
diff --git a/Lib/test/test_zstd.py b/Lib/test/test_zstd.py
index 53ca592ea38..34c7c721b1a 100644
--- a/Lib/test/test_zstd.py
+++ b/Lib/test/test_zstd.py
@@ -1424,11 +1424,12 @@ class FileTestCase(unittest.TestCase):
with self.assertRaises(ValueError):
ZstdFile(io.BytesIO(COMPRESSED_100_PLUS_32KB), "rw")
- with self.assertRaisesRegex(TypeError, r"NOT be CompressionParameter"):
+ with self.assertRaisesRegex(TypeError,
+ r"NOT be a CompressionParameter"):
ZstdFile(io.BytesIO(), 'rb',
options={CompressionParameter.compression_level:5})
with self.assertRaisesRegex(TypeError,
- r"NOT be DecompressionParameter"):
+ r"NOT be a DecompressionParameter"):
ZstdFile(io.BytesIO(), 'wb',
options={DecompressionParameter.window_log_max:21})
@@ -2430,10 +2431,8 @@ class OpenTestCase(unittest.TestCase):
self.assertEqual(f.write(arr), LENGTH)
self.assertEqual(f.tell(), LENGTH)
-@unittest.skip("it fails for now, see gh-133885")
class FreeThreadingMethodTests(unittest.TestCase):
- @unittest.skipUnless(Py_GIL_DISABLED, 'this test can only possibly fail with GIL disabled')
@threading_helper.reap_threads
@threading_helper.requires_working_threading()
def test_compress_locking(self):
@@ -2470,7 +2469,6 @@ class FreeThreadingMethodTests(unittest.TestCase):
actual = b''.join(output) + rest2
self.assertEqual(expected, actual)
- @unittest.skipUnless(Py_GIL_DISABLED, 'this test can only possibly fail with GIL disabled')
@threading_helper.reap_threads
@threading_helper.requires_working_threading()
def test_decompress_locking(self):
@@ -2506,6 +2504,59 @@ class FreeThreadingMethodTests(unittest.TestCase):
actual = b''.join(output)
self.assertEqual(expected, actual)
+ @threading_helper.reap_threads
+ @threading_helper.requires_working_threading()
+ def test_compress_shared_dict(self):
+ num_threads = 8
+
+ def run_method(b):
+ level = threading.get_ident() % 4
+ # sync threads to increase chance of contention on
+ # capsule storing dictionary levels
+ b.wait()
+ ZstdCompressor(level=level,
+ zstd_dict=TRAINED_DICT.as_digested_dict)
+ b.wait()
+ ZstdCompressor(level=level,
+ zstd_dict=TRAINED_DICT.as_undigested_dict)
+ b.wait()
+ ZstdCompressor(level=level,
+ zstd_dict=TRAINED_DICT.as_prefix)
+ threads = []
+
+ b = threading.Barrier(num_threads)
+ for i in range(num_threads):
+ thread = threading.Thread(target=run_method, args=(b,))
+
+ threads.append(thread)
+
+ with threading_helper.start_threads(threads):
+ pass
+
+ @threading_helper.reap_threads
+ @threading_helper.requires_working_threading()
+ def test_decompress_shared_dict(self):
+ num_threads = 8
+
+ def run_method(b):
+ # sync threads to increase chance of contention on
+ # decompression dictionary
+ b.wait()
+ ZstdDecompressor(zstd_dict=TRAINED_DICT.as_digested_dict)
+ b.wait()
+ ZstdDecompressor(zstd_dict=TRAINED_DICT.as_undigested_dict)
+ b.wait()
+ ZstdDecompressor(zstd_dict=TRAINED_DICT.as_prefix)
+ threads = []
+
+ b = threading.Barrier(num_threads)
+ for i in range(num_threads):
+ thread = threading.Thread(target=run_method, args=(b,))
+
+ threads.append(thread)
+
+ with threading_helper.start_threads(threads):
+ pass
if __name__ == "__main__":