aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Modules/_zstd/compressor.c
diff options
context:
space:
mode:
Diffstat (limited to 'Modules/_zstd/compressor.c')
-rw-r--r--Modules/_zstd/compressor.c596
1 files changed, 328 insertions, 268 deletions
diff --git a/Modules/_zstd/compressor.c b/Modules/_zstd/compressor.c
index 355a27d2734..bc9e6eff89a 100644
--- a/Modules/_zstd/compressor.c
+++ b/Modules/_zstd/compressor.c
@@ -1,7 +1,4 @@
-/*
-Low level interface to Meta's zstd library for use in the compression.zstd
-Python module.
-*/
+/* Low level interface to the Zstandard algorithm & the zstd library. */
/* ZstdCompressor class definitions */
@@ -19,9 +16,8 @@ class _zstd.ZstdCompressor "ZstdCompressor *" "&zstd_compressor_type_spec"
#include "_zstdmodule.h"
#include "buffer.h"
-#include "zstddict.h"
+#include "internal/pycore_lock.h" // PyMutex_IsLocked
-#include <stdbool.h> // bool
#include <stddef.h> // offsetof()
#include <zstd.h> // ZSTD_*()
@@ -43,107 +39,158 @@ typedef struct {
/* Compression level */
int compression_level;
- /* __init__ has been called, 0 or 1. */
- bool initialized;
+ /* Lock to protect the compression context */
+ PyMutex lock;
} ZstdCompressor;
#define ZstdCompressor_CAST(op) ((ZstdCompressor *)op)
+/*[python input]
+
+class zstd_contentsize_converter(CConverter):
+ type = 'unsigned long long'
+ converter = 'zstd_contentsize_converter'
+
+[python start generated code]*/
+/*[python end generated code: output=da39a3ee5e6b4b0d input=0932c350d633c7de]*/
+
+
+/* Argument Clinic converter for a content size argument.
+
+ None is stored as ZSTD_CONTENTSIZE_UNKNOWN. Any other object is converted
+ with PyLong_AsUnsignedLongLong() and must be < ZSTD_CONTENTSIZE_ERROR.
+ Returns 1 on success. On failure returns 0 with an exception set and *p
+ set to ZSTD_CONTENTSIZE_ERROR. */
+static int
+zstd_contentsize_converter(PyObject *size, unsigned long long *p)
+{
+ // None means the user indicates the size is unknown.
+ if (size == Py_None) {
+ *p = ZSTD_CONTENTSIZE_UNKNOWN;
+ }
+ else {
+ /* ZSTD_CONTENTSIZE_UNKNOWN is 0ULL - 1
+ ZSTD_CONTENTSIZE_ERROR is 0ULL - 2
+ Users should only pass values < ZSTD_CONTENTSIZE_ERROR */
+ unsigned long long pledged_size = PyLong_AsUnsignedLongLong(size);
+ /* Here we check for (unsigned long long)-1 as a sign of an error in
+ PyLong_AsUnsignedLongLong */
+ if (pledged_size == (unsigned long long)-1 && PyErr_Occurred()) {
+ *p = ZSTD_CONTENTSIZE_ERROR;
+ /* An OverflowError is replaced with a ValueError; any other
+ exception is propagated unchanged. */
+ if (PyErr_ExceptionMatches(PyExc_OverflowError)) {
+ /* The limit is an unsigned long long, so it needs %llu. */
+ PyErr_Format(PyExc_ValueError,
+ "size argument should be a positive int less "
+ "than %llu", ZSTD_CONTENTSIZE_ERROR);
+ }
+ return 0;
+ }
+ if (pledged_size >= ZSTD_CONTENTSIZE_ERROR) {
+ *p = ZSTD_CONTENTSIZE_ERROR;
+ PyErr_Format(PyExc_ValueError,
+ "size argument should be a positive int less "
+ "than %llu", ZSTD_CONTENTSIZE_ERROR);
+ return 0;
+ }
+ *p = pledged_size;
+ }
+ return 1;
+}
+
#include "clinic/compressor.c.h"
static int
-_zstd_set_c_parameters(ZstdCompressor *self, PyObject *level_or_options,
- const char *arg_name, const char* arg_type)
+_zstd_set_c_level(ZstdCompressor *self, int level)
{
- size_t zstd_ret;
- _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self));
- if (mod_state == NULL) {
+ /* Set integer compression level */
+ int min_level = ZSTD_minCLevel();
+ int max_level = ZSTD_maxCLevel();
+ if (level < min_level || level > max_level) {
+ PyErr_Format(PyExc_ValueError,
+ "illegal compression level %d; the valid range is [%d, %d]",
+ level, min_level, max_level);
return -1;
}
- /* Integer compression level */
- if (PyLong_Check(level_or_options)) {
- int level = PyLong_AsInt(level_or_options);
- if (level == -1 && PyErr_Occurred()) {
- PyErr_Format(PyExc_ValueError,
- "Compression level should be an int value between %d and %d.",
- ZSTD_minCLevel(), ZSTD_maxCLevel());
- return -1;
- }
+ /* Save for generating ZSTD_CDICT */
+ self->compression_level = level;
- /* Save for generating ZSTD_CDICT */
- self->compression_level = level;
+ /* Set compressionLevel to compression context */
+ size_t zstd_ret = ZSTD_CCtx_setParameter(
+ self->cctx, ZSTD_c_compressionLevel, level);
- /* Set compressionLevel to compression context */
- zstd_ret = ZSTD_CCtx_setParameter(self->cctx,
- ZSTD_c_compressionLevel,
- level);
+ /* Check error */
+ if (ZSTD_isError(zstd_ret)) {
+ _zstd_state* mod_state = PyType_GetModuleState(Py_TYPE(self));
+ set_zstd_error(mod_state, ERR_SET_C_LEVEL, zstd_ret);
+ return -1;
+ }
+ return 0;
+}
- /* Check error */
- if (ZSTD_isError(zstd_ret)) {
- set_zstd_error(mod_state, ERR_SET_C_LEVEL, zstd_ret);
+static int
+_zstd_set_c_parameters(ZstdCompressor *self, PyObject *options)
+{
+ _zstd_state* mod_state = PyType_GetModuleState(Py_TYPE(self));
+ if (mod_state == NULL) {
+ return -1;
+ }
+
+ if (!PyDict_Check(options)) {
+ PyErr_Format(PyExc_TypeError,
+ "ZstdCompressor() argument 'options' must be dict, not %T",
+ options);
+ return -1;
+ }
+
+ Py_ssize_t pos = 0;
+ PyObject *key, *value;
+ while (PyDict_Next(options, &pos, &key, &value)) {
+ /* Check key type */
+ if (Py_TYPE(key) == mod_state->DParameter_type) {
+ PyErr_SetString(PyExc_TypeError,
+ "compression options dictionary key must not be a "
+ "DecompressionParameter attribute");
return -1;
}
- return 0;
- }
- /* Options dict */
- if (PyDict_Check(level_or_options)) {
- PyObject *key, *value;
- Py_ssize_t pos = 0;
+ Py_INCREF(key);
+ Py_INCREF(value);
+ int key_v = PyLong_AsInt(key);
+ Py_DECREF(key);
+ if (key_v == -1 && PyErr_Occurred()) {
+ Py_DECREF(value);
+ return -1;
+ }
- while (PyDict_Next(level_or_options, &pos, &key, &value)) {
- /* Check key type */
- if (Py_TYPE(key) == mod_state->DParameter_type) {
- PyErr_SetString(PyExc_TypeError,
- "Key of compression option dict should "
- "NOT be DecompressionParameter.");
- return -1;
- }
+ int value_v = PyLong_AsInt(value);
+ Py_DECREF(value);
+ if (value_v == -1 && PyErr_Occurred()) {
+ return -1;
+ }
- int key_v = PyLong_AsInt(key);
- if (key_v == -1 && PyErr_Occurred()) {
- PyErr_SetString(PyExc_ValueError,
- "Key of options dict should be a CompressionParameter attribute.");
+ if (key_v == ZSTD_c_compressionLevel) {
+ if (_zstd_set_c_level(self, value_v) < 0) {
return -1;
}
-
- // TODO(emmatyping): check bounds when there is a value error here for better
- // error message?
- int value_v = PyLong_AsInt(value);
- if (value_v == -1 && PyErr_Occurred()) {
- PyErr_SetString(PyExc_ValueError,
- "Value of option dict should be an int.");
- return -1;
+ continue;
+ }
+ if (key_v == ZSTD_c_nbWorkers) {
+ /* From the zstd library docs:
+ 1. When nbWorkers >= 1, triggers asynchronous mode when
+ used with ZSTD_compressStream2().
+ 2, Default value is `0`, aka "single-threaded mode" : no
+ worker is spawned, compression is performed inside
+ caller's thread, all invocations are blocking. */
+ if (value_v != 0) {
+ self->use_multithread = 1;
}
+ }
- if (key_v == ZSTD_c_compressionLevel) {
- /* Save for generating ZSTD_CDICT */
- self->compression_level = value_v;
- }
- else if (key_v == ZSTD_c_nbWorkers) {
- /* From zstd library doc:
- 1. When nbWorkers >= 1, triggers asynchronous mode when
- used with ZSTD_compressStream2().
- 2, Default value is `0`, aka "single-threaded mode" : no
- worker is spawned, compression is performed inside
- caller's thread, all invocations are blocking. */
- if (value_v != 0) {
- self->use_multithread = 1;
- }
- }
+ /* Set parameter to compression context */
+ size_t zstd_ret = ZSTD_CCtx_setParameter(self->cctx, key_v, value_v);
- /* Set parameter to compression context */
- zstd_ret = ZSTD_CCtx_setParameter(self->cctx, key_v, value_v);
- if (ZSTD_isError(zstd_ret)) {
- set_parameter_error(mod_state, 1, key_v, value_v);
- return -1;
- }
+ /* Check error */
+ if (ZSTD_isError(zstd_ret)) {
+ set_parameter_error(1, key_v, value_v);
+ return -1;
}
- return 0;
}
- PyErr_Format(PyExc_TypeError, "Invalid type for %s. Expected %s", arg_name, arg_type);
- return -1;
+ return 0;
}
static void
@@ -156,12 +203,12 @@ capsule_free_cdict(PyObject *capsule)
ZSTD_CDict *
_get_CDict(ZstdDict *self, int compressionLevel)
{
+ assert(PyMutex_IsLocked(&self->lock));
PyObject *level = NULL;
- PyObject *capsule;
+ PyObject *capsule = NULL;
ZSTD_CDict *cdict;
+ int ret;
- // TODO(emmatyping): refactor critical section code into a lock_held function
- Py_BEGIN_CRITICAL_SECTION(self);
/* int level object */
level = PyLong_FromLong(compressionLevel);
@@ -170,27 +217,23 @@ _get_CDict(ZstdDict *self, int compressionLevel)
}
/* Get PyCapsule object from self->c_dicts */
- capsule = PyDict_GetItemWithError(self->c_dicts, level);
+ ret = PyDict_GetItemRef(self->c_dicts, level, &capsule);
+ if (ret < 0) {
+ goto error;
+ }
if (capsule == NULL) {
- if (PyErr_Occurred()) {
- goto error;
- }
-
/* Create ZSTD_CDict instance */
- char *dict_buffer = PyBytes_AS_STRING(self->dict_content);
- Py_ssize_t dict_len = Py_SIZE(self->dict_content);
Py_BEGIN_ALLOW_THREADS
- cdict = ZSTD_createCDict(dict_buffer,
- dict_len,
+ cdict = ZSTD_createCDict(self->dict_buffer, self->dict_len,
compressionLevel);
Py_END_ALLOW_THREADS
if (cdict == NULL) {
- _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self));
+ _zstd_state* mod_state = PyType_GetModuleState(Py_TYPE(self));
if (mod_state != NULL) {
PyErr_SetString(mod_state->ZstdError,
- "Failed to create ZSTD_CDict instance from zstd "
- "dictionary content. Maybe the content is corrupted.");
+ "Failed to create a ZSTD_CDict instance from "
+ "Zstandard dictionary content.");
}
goto error;
}
@@ -203,11 +246,10 @@ _get_CDict(ZstdDict *self, int compressionLevel)
}
/* Add PyCapsule object to self->c_dicts */
- if (PyDict_SetItem(self->c_dicts, level, capsule) < 0) {
- Py_DECREF(capsule);
+ ret = PyDict_SetItem(self->c_dicts, level, capsule);
+ if (ret < 0) {
goto error;
}
- Py_DECREF(capsule);
}
else {
/* ZSTD_CDict instance already exists */
@@ -219,62 +261,15 @@ error:
cdict = NULL;
success:
Py_XDECREF(level);
- Py_END_CRITICAL_SECTION();
+ Py_XDECREF(capsule);
return cdict;
}
static int
-_zstd_load_c_dict(ZstdCompressor *self, PyObject *dict)
+_zstd_load_impl(ZstdCompressor *self, ZstdDict *zd,
+ _zstd_state *mod_state, int type)
{
-
size_t zstd_ret;
- _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self));
- if (mod_state == NULL) {
- return -1;
- }
- ZstdDict *zd;
- int type, ret;
-
- /* Check ZstdDict */
- ret = PyObject_IsInstance(dict, (PyObject*)mod_state->ZstdDict_type);
- if (ret < 0) {
- return -1;
- }
- else if (ret > 0) {
- /* When compressing, use undigested dictionary by default. */
- zd = (ZstdDict*)dict;
- type = DICT_TYPE_UNDIGESTED;
- goto load;
- }
-
- /* Check (ZstdDict, type) */
- if (PyTuple_CheckExact(dict) && PyTuple_GET_SIZE(dict) == 2) {
- /* Check ZstdDict */
- ret = PyObject_IsInstance(PyTuple_GET_ITEM(dict, 0),
- (PyObject*)mod_state->ZstdDict_type);
- if (ret < 0) {
- return -1;
- }
- else if (ret > 0) {
- /* type == -1 may indicate an error. */
- type = PyLong_AsInt(PyTuple_GET_ITEM(dict, 1));
- if (type == DICT_TYPE_DIGESTED ||
- type == DICT_TYPE_UNDIGESTED ||
- type == DICT_TYPE_PREFIX)
- {
- assert(type >= 0);
- zd = (ZstdDict*)PyTuple_GET_ITEM(dict, 0);
- goto load;
- }
- }
- }
-
- /* Wrong type */
- PyErr_SetString(PyExc_TypeError,
- "zstd_dict argument should be ZstdDict object.");
- return -1;
-
-load:
if (type == DICT_TYPE_DIGESTED) {
/* Get ZSTD_CDict */
ZSTD_CDict *c_dict = _get_CDict(zd, self->compression_level);
@@ -283,28 +278,18 @@ load:
}
/* Reference a prepared dictionary.
It overrides some compression context's parameters. */
- Py_BEGIN_CRITICAL_SECTION(self);
zstd_ret = ZSTD_CCtx_refCDict(self->cctx, c_dict);
- Py_END_CRITICAL_SECTION();
}
else if (type == DICT_TYPE_UNDIGESTED) {
/* Load a dictionary.
It doesn't override compression context's parameters. */
- Py_BEGIN_CRITICAL_SECTION2(self, zd);
- zstd_ret = ZSTD_CCtx_loadDictionary(
- self->cctx,
- PyBytes_AS_STRING(zd->dict_content),
- Py_SIZE(zd->dict_content));
- Py_END_CRITICAL_SECTION2();
+ zstd_ret = ZSTD_CCtx_loadDictionary(self->cctx, zd->dict_buffer,
+ zd->dict_len);
}
else if (type == DICT_TYPE_PREFIX) {
/* Load a prefix */
- Py_BEGIN_CRITICAL_SECTION2(self, zd);
- zstd_ret = ZSTD_CCtx_refPrefix(
- self->cctx,
- PyBytes_AS_STRING(zd->dict_content),
- Py_SIZE(zd->dict_content));
- Py_END_CRITICAL_SECTION2();
+ zstd_ret = ZSTD_CCtx_refPrefix(self->cctx, zd->dict_buffer,
+ zd->dict_len);
}
else {
Py_UNREACHABLE();
@@ -318,24 +303,57 @@ load:
return 0;
}
+/* Load the dictionary described by `dict` into self's compression context.
+
+ `dict` is parsed by _Py_parse_zstd_dict(), which also receives a pointer to
+ `type` (initialized to DICT_TYPE_UNDIGESTED). Returns -1 if parsing fails,
+ otherwise the result of _zstd_load_impl(). */
+static int
+_zstd_load_c_dict(ZstdCompressor *self, PyObject *dict)
+{
+ /* NOTE(review): mod_state is not NULL-checked before use here -- confirm
+ that _Py_parse_zstd_dict() tolerates a NULL module state. */
+ _zstd_state* mod_state = PyType_GetModuleState(Py_TYPE(self));
+ /* When compressing, use undigested dictionary by default. */
+ int type = DICT_TYPE_UNDIGESTED;
+ ZstdDict *zd = _Py_parse_zstd_dict(mod_state, dict, &type);
+ if (zd == NULL) {
+ return -1;
+ }
+ int ret;
+ /* Hold the dictionary's lock while _zstd_load_impl() reads from it;
+ _get_CDict() asserts that this lock is held. */
+ PyMutex_Lock(&zd->lock);
+ ret = _zstd_load_impl(self, zd, mod_state, type);
+ PyMutex_Unlock(&zd->lock);
+ return ret;
+}
+
+/*[clinic input]
+@classmethod
+_zstd.ZstdCompressor.__new__ as _zstd_ZstdCompressor_new
+ level: object = None
+ The compression level to use. Defaults to COMPRESSION_LEVEL_DEFAULT.
+ options: object = None
+ A dict object that contains advanced compression parameters.
+ zstd_dict: object = None
+ A ZstdDict object, a pre-trained Zstandard dictionary.
+
+Create a compressor object for compressing data incrementally.
+
+Thread-safe at method level. For one-shot compression, use the compress()
+function instead.
+[clinic start generated code]*/
+
static PyObject *
-_zstd_ZstdCompressor_new(PyTypeObject *type, PyObject *Py_UNUSED(args), PyObject *Py_UNUSED(kwargs))
+_zstd_ZstdCompressor_new_impl(PyTypeObject *type, PyObject *level,
+ PyObject *options, PyObject *zstd_dict)
+/*[clinic end generated code: output=cdef61eafecac3d7 input=92de0211ae20ffdc]*/
{
- ZstdCompressor *self;
- self = PyObject_GC_New(ZstdCompressor, type);
+ ZstdCompressor* self = PyObject_GC_New(ZstdCompressor, type);
if (self == NULL) {
goto error;
}
- self->initialized = 0;
- self->dict = NULL;
self->use_multithread = 0;
-
+ self->dict = NULL;
+ self->lock = (PyMutex){0};
/* Compression context */
self->cctx = ZSTD_createCCtx();
if (self->cctx == NULL) {
- _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self));
+ _zstd_state* mod_state = PyType_GetModuleState(Py_TYPE(self));
if (mod_state != NULL) {
PyErr_SetString(mod_state->ZstdError,
"Unable to create ZSTD_CCtx instance.");
@@ -346,12 +364,56 @@ _zstd_ZstdCompressor_new(PyTypeObject *type, PyObject *Py_UNUSED(args), PyObject
/* Last mode */
self->last_mode = ZSTD_e_end;
+ if (level != Py_None && options != Py_None) {
+ PyErr_SetString(PyExc_TypeError,
+ "Only one of level or options should be used.");
+ goto error;
+ }
+
+ /* Set compression level */
+ if (level != Py_None) {
+ if (!PyLong_Check(level)) {
+ PyErr_SetString(PyExc_TypeError,
+ "invalid type for level, expected int");
+ goto error;
+ }
+ int level_v = PyLong_AsInt(level);
+ if (level_v == -1 && PyErr_Occurred()) {
+ if (PyErr_ExceptionMatches(PyExc_OverflowError)) {
+ PyErr_Format(PyExc_ValueError,
+ "illegal compression level; the valid range is [%d, %d]",
+ ZSTD_minCLevel(), ZSTD_maxCLevel());
+ }
+ goto error;
+ }
+ if (_zstd_set_c_level(self, level_v) < 0) {
+ goto error;
+ }
+ }
+
+ /* Set options dictionary */
+ if (options != Py_None) {
+ if (_zstd_set_c_parameters(self, options) < 0) {
+ goto error;
+ }
+ }
+
+ /* Load Zstandard dictionary to compression context */
+ if (zstd_dict != Py_None) {
+ if (_zstd_load_c_dict(self, zstd_dict) < 0) {
+ goto error;
+ }
+ Py_INCREF(zstd_dict);
+ self->dict = zstd_dict;
+ }
+
+ // We can only start GC tracking once self->dict is set.
+ PyObject_GC_Track(self);
+
return (PyObject*)self;
error:
- if (self != NULL) {
- PyObject_GC_Del(self);
- }
+ Py_XDECREF(self);
return NULL;
}
@@ -363,7 +425,11 @@ ZstdCompressor_dealloc(PyObject *ob)
PyObject_GC_UnTrack(self);
/* Free compression context */
- ZSTD_freeCCtx(self->cctx);
+ if (self->cctx) {
+ ZSTD_freeCCtx(self->cctx);
+ }
+
+ assert(!PyMutex_IsLocked(&self->lock));
/* Py_XDECREF the dict after free the compression context */
Py_CLEAR(self->dict);
@@ -373,71 +439,11 @@ ZstdCompressor_dealloc(PyObject *ob)
Py_DECREF(tp);
}
-/*[clinic input]
-_zstd.ZstdCompressor.__init__
-
- level: object = None
- The compression level to use, defaults to ZSTD_CLEVEL_DEFAULT.
- options: object = None
- A dict object that contains advanced compression parameters.
- zstd_dict: object = None
- A ZstdDict object, a pre-trained zstd dictionary.
-
-Create a compressor object for compressing data incrementally.
-
-Thread-safe at method level. For one-shot compression, use the compress()
-function instead.
-[clinic start generated code]*/
-
-static int
-_zstd_ZstdCompressor___init___impl(ZstdCompressor *self, PyObject *level,
- PyObject *options, PyObject *zstd_dict)
-/*[clinic end generated code: output=215e6c4342732f96 input=9f79b0d8d34c8ef0]*/
-{
- if (self->initialized) {
- PyErr_SetString(PyExc_RuntimeError, "reinitialization not supported");
- return -1;
- }
- self->initialized = 1;
-
- if (level != Py_None && options != Py_None) {
- PyErr_SetString(PyExc_RuntimeError, "Only one of level or options should be used.");
- return -1;
- }
-
- /* Set compressLevel/options to compression context */
- if (level != Py_None) {
- if (_zstd_set_c_parameters(self, level, "level", "int") < 0) {
- return -1;
- }
- }
-
- if (options != Py_None) {
- if (_zstd_set_c_parameters(self, options, "options", "dict") < 0) {
- return -1;
- }
- }
-
- /* Load dictionary to compression context */
- if (zstd_dict != Py_None) {
- if (_zstd_load_c_dict(self, zstd_dict) < 0) {
- return -1;
- }
-
- /* Py_INCREF the dict */
- Py_INCREF(zstd_dict);
- self->dict = zstd_dict;
- }
-
- // We can only start tracking self with the GC once self->dict is set.
- PyObject_GC_Track(self);
- return 0;
-}
-
static PyObject *
-compress_impl(ZstdCompressor *self, Py_buffer *data,
- ZSTD_EndDirective end_directive)
+compress_lock_held(ZstdCompressor *self, Py_buffer *data,
+ ZSTD_EndDirective end_directive)
{
+ assert(PyMutex_IsLocked(&self->lock));
ZSTD_inBuffer in;
ZSTD_outBuffer out;
_BlocksOutputBuffer buffer = {.list = NULL};
@@ -464,12 +470,12 @@ compress_impl(ZstdCompressor *self, Py_buffer *data,
}
if (_OutputBuffer_InitWithSize(&buffer, &out, -1,
- (Py_ssize_t) output_buffer_size) < 0) {
+ (Py_ssize_t) output_buffer_size) < 0) {
goto error;
}
- /* zstd stream compress */
+ /* Zstandard stream compress */
while (1) {
Py_BEGIN_ALLOW_THREADS
zstd_ret = ZSTD_compressStream2(self->cctx, &out, &in, end_directive);
@@ -477,10 +483,8 @@ compress_impl(ZstdCompressor *self, Py_buffer *data,
/* Check error */
if (ZSTD_isError(zstd_ret)) {
- _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self));
- if (mod_state != NULL) {
- set_zstd_error(mod_state, ERR_COMPRESS, zstd_ret);
- }
+ _zstd_state* mod_state = PyType_GetModuleState(Py_TYPE(self));
+ set_zstd_error(mod_state, ERR_COMPRESS, zstd_ret);
goto error;
}
@@ -509,15 +513,18 @@ error:
return NULL;
}
+#ifndef NDEBUG
static inline int
mt_continue_should_break(ZSTD_inBuffer *in, ZSTD_outBuffer *out)
{
return in->size == in->pos && out->size != out->pos;
}
+#endif
static PyObject *
-compress_mt_continue_impl(ZstdCompressor *self, Py_buffer *data)
+compress_mt_continue_lock_held(ZstdCompressor *self, Py_buffer *data)
{
+ assert(PyMutex_IsLocked(&self->lock));
ZSTD_inBuffer in;
ZSTD_outBuffer out;
_BlocksOutputBuffer buffer = {.list = NULL};
@@ -533,24 +540,25 @@ compress_mt_continue_impl(ZstdCompressor *self, Py_buffer *data)
goto error;
}
- /* zstd stream compress */
+ /* Zstandard stream compress */
while (1) {
Py_BEGIN_ALLOW_THREADS
do {
- zstd_ret = ZSTD_compressStream2(self->cctx, &out, &in, ZSTD_e_continue);
- } while (out.pos != out.size && in.pos != in.size && !ZSTD_isError(zstd_ret));
+ zstd_ret = ZSTD_compressStream2(self->cctx, &out, &in,
+ ZSTD_e_continue);
+ } while (out.pos != out.size
+ && in.pos != in.size
+ && !ZSTD_isError(zstd_ret));
Py_END_ALLOW_THREADS
/* Check error */
if (ZSTD_isError(zstd_ret)) {
- _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self));
- if (mod_state != NULL) {
- set_zstd_error(mod_state, ERR_COMPRESS, zstd_ret);
- }
+ _zstd_state* mod_state = PyType_GetModuleState(Py_TYPE(self));
+ set_zstd_error(mod_state, ERR_COMPRESS, zstd_ret);
goto error;
}
- /* Like compress_impl(), output as much as possible. */
+ /* Like compress_lock_held(), output as much as possible. */
if (out.pos == out.size) {
if (_OutputBuffer_Grow(&buffer, &out) < 0) {
goto error;
@@ -609,14 +617,14 @@ _zstd_ZstdCompressor_compress_impl(ZstdCompressor *self, Py_buffer *data,
}
/* Thread-safe code */
- Py_BEGIN_CRITICAL_SECTION(self);
+ PyMutex_Lock(&self->lock);
/* Compress */
if (self->use_multithread && mode == ZSTD_e_continue) {
- ret = compress_mt_continue_impl(self, data);
+ ret = compress_mt_continue_lock_held(self, data);
}
else {
- ret = compress_impl(self, data, mode);
+ ret = compress_lock_held(self, data, mode);
}
if (ret) {
@@ -628,7 +636,7 @@ _zstd_ZstdCompressor_compress_impl(ZstdCompressor *self, Py_buffer *data,
/* Resetting cctx's session never fail */
ZSTD_CCtx_reset(self->cctx, ZSTD_reset_session_only);
}
- Py_END_CRITICAL_SECTION();
+ PyMutex_Unlock(&self->lock);
return ret;
}
@@ -642,14 +650,14 @@ _zstd.ZstdCompressor.flush
Finish the compression process.
-Flush any remaining data left in internal buffers. Since zstd data consists
-of one or more independent frames, the compressor object can still be used
-after this method is called.
+Flush any remaining data left in internal buffers. Since Zstandard data
+consists of one or more independent frames, the compressor object can still
+be used after this method is called.
[clinic start generated code]*/
static PyObject *
_zstd_ZstdCompressor_flush_impl(ZstdCompressor *self, int mode)
-/*[clinic end generated code: output=b7cf2c8d64dcf2e3 input=a766870301932b85]*/
+/*[clinic end generated code: output=b7cf2c8d64dcf2e3 input=0ab19627f323cdbc]*/
{
PyObject *ret;
@@ -663,8 +671,9 @@ _zstd_ZstdCompressor_flush_impl(ZstdCompressor *self, int mode)
}
/* Thread-safe code */
- Py_BEGIN_CRITICAL_SECTION(self);
- ret = compress_impl(self, NULL, mode);
+ PyMutex_Lock(&self->lock);
+
+ ret = compress_lock_held(self, NULL, mode);
if (ret) {
self->last_mode = mode;
@@ -675,26 +684,78 @@ _zstd_ZstdCompressor_flush_impl(ZstdCompressor *self, int mode)
/* Resetting cctx's session never fail */
ZSTD_CCtx_reset(self->cctx, ZSTD_reset_session_only);
}
- Py_END_CRITICAL_SECTION();
+ PyMutex_Unlock(&self->lock);
return ret;
}
+
+/*[clinic input]
+_zstd.ZstdCompressor.set_pledged_input_size
+
+ size: zstd_contentsize
+ The size of the uncompressed data to be provided to the compressor.
+ /
+
+Set the uncompressed content size to be written into the frame header.
+
+This method can be used to ensure the header of the frame about to be written
+includes the size of the data, unless the CompressionParameter.content_size_flag
+is set to False. If last_mode != FLUSH_FRAME, then a RuntimeError is raised.
+
+It is important to ensure that the pledged data size matches the actual data
+size. If they do not match the compressed output data may be corrupted and the
+final chunk written may be lost.
+[clinic start generated code]*/
+
+static PyObject *
+_zstd_ZstdCompressor_set_pledged_input_size_impl(ZstdCompressor *self,
+ unsigned long long size)
+/*[clinic end generated code: output=3a09e55cc0e3b4f9 input=afd8a7d78cff2eb5]*/
+{
+ // Error occurred while converting argument, should be unreachable:
+ // zstd_contentsize_converter() only stores ZSTD_CONTENTSIZE_ERROR when it
+ // fails.
+ assert(size != ZSTD_CONTENTSIZE_ERROR);
+
+ /* Thread-safe code */
+ PyMutex_Lock(&self->lock);
+
+ /* Check the current mode: last_mode must be ZSTD_e_end, otherwise a
+ ValueError is raised and the lock is released before returning. */
+ if (self->last_mode != ZSTD_e_end) {
+ PyErr_SetString(PyExc_ValueError,
+ "set_pledged_input_size() method must be called "
+ "when last_mode == FLUSH_FRAME");
+ PyMutex_Unlock(&self->lock);
+ return NULL;
+ }
+
+ /* Set pledged content size */
+ size_t zstd_ret = ZSTD_CCtx_setPledgedSrcSize(self->cctx, size);
+ /* The lock is released before the error check, which only uses the local
+ zstd_ret and the module state. */
+ PyMutex_Unlock(&self->lock);
+ if (ZSTD_isError(zstd_ret)) {
+ _zstd_state* mod_state = PyType_GetModuleState(Py_TYPE(self));
+ set_zstd_error(mod_state, ERR_SET_PLEDGED_INPUT_SIZE, zstd_ret);
+ return NULL;
+ }
+
+ Py_RETURN_NONE;
+}
+
static PyMethodDef ZstdCompressor_methods[] = {
_ZSTD_ZSTDCOMPRESSOR_COMPRESS_METHODDEF
_ZSTD_ZSTDCOMPRESSOR_FLUSH_METHODDEF
+ _ZSTD_ZSTDCOMPRESSOR_SET_PLEDGED_INPUT_SIZE_METHODDEF
{NULL, NULL}
};
PyDoc_STRVAR(ZstdCompressor_last_mode_doc,
"The last mode used to this compressor object, its value can be .CONTINUE,\n"
".FLUSH_BLOCK, .FLUSH_FRAME. Initialized to .FLUSH_FRAME.\n\n"
-"It can be used to get the current state of a compressor, such as, data flushed,\n"
-"a frame ended.");
+"It can be used to get the current state of a compressor, such as, data\n"
+"flushed, or a frame ended.");
static PyMemberDef ZstdCompressor_members[] = {
{"last_mode", Py_T_INT, offsetof(ZstdCompressor, last_mode),
- Py_READONLY, ZstdCompressor_last_mode_doc},
+ Py_READONLY, ZstdCompressor_last_mode_doc},
{NULL}
};
@@ -717,10 +778,9 @@ ZstdCompressor_clear(PyObject *ob)
static PyType_Slot zstdcompressor_slots[] = {
{Py_tp_new, _zstd_ZstdCompressor_new},
{Py_tp_dealloc, ZstdCompressor_dealloc},
- {Py_tp_init, _zstd_ZstdCompressor___init__},
{Py_tp_methods, ZstdCompressor_methods},
{Py_tp_members, ZstdCompressor_members},
- {Py_tp_doc, (char*)_zstd_ZstdCompressor___init____doc__},
+ {Py_tp_doc, (void *)_zstd_ZstdCompressor_new__doc__},
{Py_tp_traverse, ZstdCompressor_traverse},
{Py_tp_clear, ZstdCompressor_clear},
{0, 0}