diff options
Diffstat (limited to 'Modules/_zstd/compressor.c')
-rw-r--r-- | Modules/_zstd/compressor.c | 596 |
1 files changed, 328 insertions, 268 deletions
diff --git a/Modules/_zstd/compressor.c b/Modules/_zstd/compressor.c index 355a27d2734..bc9e6eff89a 100644 --- a/Modules/_zstd/compressor.c +++ b/Modules/_zstd/compressor.c @@ -1,7 +1,4 @@ -/* -Low level interface to Meta's zstd library for use in the compression.zstd -Python module. -*/ +/* Low level interface to the Zstandard algorthm & the zstd library. */ /* ZstdCompressor class definitions */ @@ -19,9 +16,8 @@ class _zstd.ZstdCompressor "ZstdCompressor *" "&zstd_compressor_type_spec" #include "_zstdmodule.h" #include "buffer.h" -#include "zstddict.h" +#include "internal/pycore_lock.h" // PyMutex_IsLocked -#include <stdbool.h> // bool #include <stddef.h> // offsetof() #include <zstd.h> // ZSTD_*() @@ -43,107 +39,158 @@ typedef struct { /* Compression level */ int compression_level; - /* __init__ has been called, 0 or 1. */ - bool initialized; + /* Lock to protect the compression context */ + PyMutex lock; } ZstdCompressor; #define ZstdCompressor_CAST(op) ((ZstdCompressor *)op) +/*[python input] + +class zstd_contentsize_converter(CConverter): + type = 'unsigned long long' + converter = 'zstd_contentsize_converter' + +[python start generated code]*/ +/*[python end generated code: output=da39a3ee5e6b4b0d input=0932c350d633c7de]*/ + + +static int +zstd_contentsize_converter(PyObject *size, unsigned long long *p) +{ + // None means the user indicates the size is unknown. + if (size == Py_None) { + *p = ZSTD_CONTENTSIZE_UNKNOWN; + } + else { + /* ZSTD_CONTENTSIZE_UNKNOWN is 0ULL - 1 + ZSTD_CONTENTSIZE_ERROR is 0ULL - 2 + Users should only pass values < ZSTD_CONTENTSIZE_ERROR */ + unsigned long long pledged_size = PyLong_AsUnsignedLongLong(size); + /* Here we check for (unsigned long long)-1 as a sign of an error in + PyLong_AsUnsignedLongLong */ + if (pledged_size == (unsigned long long)-1 && PyErr_Occurred()) { + *p = ZSTD_CONTENTSIZE_ERROR; + if (PyErr_ExceptionMatches(PyExc_OverflowError)) { + PyErr_Format(PyExc_ValueError, + "size argument should be a positive int less " + "than %ull", ZSTD_CONTENTSIZE_ERROR); + return 0; + } + return 0; + } + if (pledged_size >= ZSTD_CONTENTSIZE_ERROR) { + *p = ZSTD_CONTENTSIZE_ERROR; + PyErr_Format(PyExc_ValueError, + "size argument should be a positive int less " + "than %ull", ZSTD_CONTENTSIZE_ERROR); + return 0; + } + *p = pledged_size; + } + return 1; +} + #include "clinic/compressor.c.h" static int -_zstd_set_c_parameters(ZstdCompressor *self, PyObject *level_or_options, - const char *arg_name, const char* arg_type) +_zstd_set_c_level(ZstdCompressor *self, int level) { - size_t zstd_ret; - _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self)); - if (mod_state == NULL) { + /* Set integer compression level */ + int min_level = ZSTD_minCLevel(); + int max_level = ZSTD_maxCLevel(); + if (level < min_level || level > max_level) { + PyErr_Format(PyExc_ValueError, + "illegal compression level %d; the valid range is [%d, %d]", + level, min_level, max_level); return -1; } - /* Integer compression level */ - if (PyLong_Check(level_or_options)) { - int level = PyLong_AsInt(level_or_options); - if (level == -1 && PyErr_Occurred()) { - PyErr_Format(PyExc_ValueError, - "Compression level should be an int value between %d and %d.", - ZSTD_minCLevel(), ZSTD_maxCLevel()); - return -1; - } + /* Save for generating ZSTD_CDICT */ + self->compression_level = level; - /* Save for generating ZSTD_CDICT */ - self->compression_level = level; + /* Set compressionLevel to compression context */ + size_t zstd_ret = ZSTD_CCtx_setParameter( + self->cctx, ZSTD_c_compressionLevel, level); - /* Set compressionLevel to compression context */ - zstd_ret = ZSTD_CCtx_setParameter(self->cctx, - ZSTD_c_compressionLevel, - level); + /* Check error */ + if (ZSTD_isError(zstd_ret)) { + _zstd_state* mod_state = PyType_GetModuleState(Py_TYPE(self)); + set_zstd_error(mod_state, ERR_SET_C_LEVEL, zstd_ret); + return -1; + } + return 0; +} - /* Check error */ - if (ZSTD_isError(zstd_ret)) { - set_zstd_error(mod_state, ERR_SET_C_LEVEL, zstd_ret); +static int +_zstd_set_c_parameters(ZstdCompressor *self, PyObject *options) +{ + _zstd_state* mod_state = PyType_GetModuleState(Py_TYPE(self)); + if (mod_state == NULL) { + return -1; + } + + if (!PyDict_Check(options)) { + PyErr_Format(PyExc_TypeError, + "ZstdCompressor() argument 'options' must be dict, not %T", + options); + return -1; + } + + Py_ssize_t pos = 0; + PyObject *key, *value; + while (PyDict_Next(options, &pos, &key, &value)) { + /* Check key type */ + if (Py_TYPE(key) == mod_state->DParameter_type) { + PyErr_SetString(PyExc_TypeError, + "compression options dictionary key must not be a " + "DecompressionParameter attribute"); return -1; } - return 0; - } - /* Options dict */ - if (PyDict_Check(level_or_options)) { - PyObject *key, *value; - Py_ssize_t pos = 0; + Py_INCREF(key); + Py_INCREF(value); + int key_v = PyLong_AsInt(key); + Py_DECREF(key); + if (key_v == -1 && PyErr_Occurred()) { + Py_DECREF(value); + return -1; + } - while (PyDict_Next(level_or_options, &pos, &key, &value)) { - /* Check key type */ - if (Py_TYPE(key) == mod_state->DParameter_type) { - PyErr_SetString(PyExc_TypeError, - "Key of compression option dict should " - "NOT be DecompressionParameter."); - return -1; - } + int value_v = PyLong_AsInt(value); + Py_DECREF(value); + if (value_v == -1 && PyErr_Occurred()) { + return -1; + } - int key_v = PyLong_AsInt(key); - if (key_v == -1 && PyErr_Occurred()) { - PyErr_SetString(PyExc_ValueError, - "Key of options dict should be a CompressionParameter attribute."); + if (key_v == ZSTD_c_compressionLevel) { + if (_zstd_set_c_level(self, value_v) < 0) { return -1; } - - // TODO(emmatyping): check bounds when there is a value error here for better - // error message? - int value_v = PyLong_AsInt(value); - if (value_v == -1 && PyErr_Occurred()) { - PyErr_SetString(PyExc_ValueError, - "Value of option dict should be an int."); - return -1; + continue; + } + if (key_v == ZSTD_c_nbWorkers) { + /* From the zstd library docs: + 1. When nbWorkers >= 1, triggers asynchronous mode when + used with ZSTD_compressStream2(). + 2, Default value is `0`, aka "single-threaded mode" : no + worker is spawned, compression is performed inside + caller's thread, all invocations are blocking. */ + if (value_v != 0) { + self->use_multithread = 1; } + } - if (key_v == ZSTD_c_compressionLevel) { - /* Save for generating ZSTD_CDICT */ - self->compression_level = value_v; - } - else if (key_v == ZSTD_c_nbWorkers) { - /* From zstd library doc: - 1. When nbWorkers >= 1, triggers asynchronous mode when - used with ZSTD_compressStream2(). - 2, Default value is `0`, aka "single-threaded mode" : no - worker is spawned, compression is performed inside - caller's thread, all invocations are blocking. */ - if (value_v != 0) { - self->use_multithread = 1; - } - } + /* Set parameter to compression context */ + size_t zstd_ret = ZSTD_CCtx_setParameter(self->cctx, key_v, value_v); - /* Set parameter to compression context */ - zstd_ret = ZSTD_CCtx_setParameter(self->cctx, key_v, value_v); - if (ZSTD_isError(zstd_ret)) { - set_parameter_error(mod_state, 1, key_v, value_v); - return -1; - } + /* Check error */ + if (ZSTD_isError(zstd_ret)) { + set_parameter_error(1, key_v, value_v); + return -1; } - return 0; } - PyErr_Format(PyExc_TypeError, "Invalid type for %s. Expected %s", arg_name, arg_type); - return -1; + return 0; } static void @@ -156,12 +203,12 @@ capsule_free_cdict(PyObject *capsule) ZSTD_CDict * _get_CDict(ZstdDict *self, int compressionLevel) { + assert(PyMutex_IsLocked(&self->lock)); PyObject *level = NULL; - PyObject *capsule; + PyObject *capsule = NULL; ZSTD_CDict *cdict; + int ret; - // TODO(emmatyping): refactor critical section code into a lock_held function - Py_BEGIN_CRITICAL_SECTION(self); /* int level object */ level = PyLong_FromLong(compressionLevel); @@ -170,27 +217,23 @@ _get_CDict(ZstdDict *self, int compressionLevel) } /* Get PyCapsule object from self->c_dicts */ - capsule = PyDict_GetItemWithError(self->c_dicts, level); + ret = PyDict_GetItemRef(self->c_dicts, level, &capsule); + if (ret < 0) { + goto error; + } if (capsule == NULL) { - if (PyErr_Occurred()) { - goto error; - } - /* Create ZSTD_CDict instance */ - char *dict_buffer = PyBytes_AS_STRING(self->dict_content); - Py_ssize_t dict_len = Py_SIZE(self->dict_content); Py_BEGIN_ALLOW_THREADS - cdict = ZSTD_createCDict(dict_buffer, - dict_len, + cdict = ZSTD_createCDict(self->dict_buffer, self->dict_len, compressionLevel); Py_END_ALLOW_THREADS if (cdict == NULL) { - _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self)); + _zstd_state* mod_state = PyType_GetModuleState(Py_TYPE(self)); if (mod_state != NULL) { PyErr_SetString(mod_state->ZstdError, - "Failed to create ZSTD_CDict instance from zstd " - "dictionary content. Maybe the content is corrupted."); + "Failed to create a ZSTD_CDict instance from " + "Zstandard dictionary content."); } goto error; } @@ -203,11 +246,10 @@ _get_CDict(ZstdDict *self, int compressionLevel) } /* Add PyCapsule object to self->c_dicts */ - if (PyDict_SetItem(self->c_dicts, level, capsule) < 0) { - Py_DECREF(capsule); + ret = PyDict_SetItem(self->c_dicts, level, capsule); + if (ret < 0) { goto error; } - Py_DECREF(capsule); } else { /* ZSTD_CDict instance already exists */ @@ -219,62 +261,15 @@ error: cdict = NULL; success: Py_XDECREF(level); - Py_END_CRITICAL_SECTION(); + Py_XDECREF(capsule); return cdict; } static int -_zstd_load_c_dict(ZstdCompressor *self, PyObject *dict) +_zstd_load_impl(ZstdCompressor *self, ZstdDict *zd, + _zstd_state *mod_state, int type) { - size_t zstd_ret; - _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self)); - if (mod_state == NULL) { - return -1; - } - ZstdDict *zd; - int type, ret; - - /* Check ZstdDict */ - ret = PyObject_IsInstance(dict, (PyObject*)mod_state->ZstdDict_type); - if (ret < 0) { - return -1; - } - else if (ret > 0) { - /* When compressing, use undigested dictionary by default. */ - zd = (ZstdDict*)dict; - type = DICT_TYPE_UNDIGESTED; - goto load; - } - - /* Check (ZstdDict, type) */ - if (PyTuple_CheckExact(dict) && PyTuple_GET_SIZE(dict) == 2) { - /* Check ZstdDict */ - ret = PyObject_IsInstance(PyTuple_GET_ITEM(dict, 0), - (PyObject*)mod_state->ZstdDict_type); - if (ret < 0) { - return -1; - } - else if (ret > 0) { - /* type == -1 may indicate an error. */ - type = PyLong_AsInt(PyTuple_GET_ITEM(dict, 1)); - if (type == DICT_TYPE_DIGESTED || - type == DICT_TYPE_UNDIGESTED || - type == DICT_TYPE_PREFIX) - { - assert(type >= 0); - zd = (ZstdDict*)PyTuple_GET_ITEM(dict, 0); - goto load; - } - } - } - - /* Wrong type */ - PyErr_SetString(PyExc_TypeError, - "zstd_dict argument should be ZstdDict object."); - return -1; - -load: if (type == DICT_TYPE_DIGESTED) { /* Get ZSTD_CDict */ ZSTD_CDict *c_dict = _get_CDict(zd, self->compression_level); @@ -283,28 +278,18 @@ load: } /* Reference a prepared dictionary. It overrides some compression context's parameters. */ - Py_BEGIN_CRITICAL_SECTION(self); zstd_ret = ZSTD_CCtx_refCDict(self->cctx, c_dict); - Py_END_CRITICAL_SECTION(); } else if (type == DICT_TYPE_UNDIGESTED) { /* Load a dictionary. It doesn't override compression context's parameters. */ - Py_BEGIN_CRITICAL_SECTION2(self, zd); - zstd_ret = ZSTD_CCtx_loadDictionary( - self->cctx, - PyBytes_AS_STRING(zd->dict_content), - Py_SIZE(zd->dict_content)); - Py_END_CRITICAL_SECTION2(); + zstd_ret = ZSTD_CCtx_loadDictionary(self->cctx, zd->dict_buffer, + zd->dict_len); } else if (type == DICT_TYPE_PREFIX) { /* Load a prefix */ - Py_BEGIN_CRITICAL_SECTION2(self, zd); - zstd_ret = ZSTD_CCtx_refPrefix( - self->cctx, - PyBytes_AS_STRING(zd->dict_content), - Py_SIZE(zd->dict_content)); - Py_END_CRITICAL_SECTION2(); + zstd_ret = ZSTD_CCtx_refPrefix(self->cctx, zd->dict_buffer, + zd->dict_len); } else { Py_UNREACHABLE(); @@ -318,24 +303,57 @@ load: return 0; } +static int +_zstd_load_c_dict(ZstdCompressor *self, PyObject *dict) +{ + _zstd_state* mod_state = PyType_GetModuleState(Py_TYPE(self)); + /* When compressing, use undigested dictionary by default. */ + int type = DICT_TYPE_UNDIGESTED; + ZstdDict *zd = _Py_parse_zstd_dict(mod_state, dict, &type); + if (zd == NULL) { + return -1; + } + int ret; + PyMutex_Lock(&zd->lock); + ret = _zstd_load_impl(self, zd, mod_state, type); + PyMutex_Unlock(&zd->lock); + return ret; +} + +/*[clinic input] +@classmethod +_zstd.ZstdCompressor.__new__ as _zstd_ZstdCompressor_new + level: object = None + The compression level to use. Defaults to COMPRESSION_LEVEL_DEFAULT. + options: object = None + A dict object that contains advanced compression parameters. + zstd_dict: object = None + A ZstdDict object, a pre-trained Zstandard dictionary. + +Create a compressor object for compressing data incrementally. + +Thread-safe at method level. For one-shot compression, use the compress() +function instead. +[clinic start generated code]*/ + static PyObject * -_zstd_ZstdCompressor_new(PyTypeObject *type, PyObject *Py_UNUSED(args), PyObject *Py_UNUSED(kwargs)) +_zstd_ZstdCompressor_new_impl(PyTypeObject *type, PyObject *level, + PyObject *options, PyObject *zstd_dict) +/*[clinic end generated code: output=cdef61eafecac3d7 input=92de0211ae20ffdc]*/ { - ZstdCompressor *self; - self = PyObject_GC_New(ZstdCompressor, type); + ZstdCompressor* self = PyObject_GC_New(ZstdCompressor, type); if (self == NULL) { goto error; } - self->initialized = 0; - self->dict = NULL; self->use_multithread = 0; - + self->dict = NULL; + self->lock = (PyMutex){0}; /* Compression context */ self->cctx = ZSTD_createCCtx(); if (self->cctx == NULL) { - _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self)); + _zstd_state* mod_state = PyType_GetModuleState(Py_TYPE(self)); if (mod_state != NULL) { PyErr_SetString(mod_state->ZstdError, "Unable to create ZSTD_CCtx instance."); @@ -346,12 +364,56 @@ _zstd_ZstdCompressor_new(PyTypeObject *type, PyObject *Py_UNUSED(args), PyObject /* Last mode */ self->last_mode = ZSTD_e_end; + if (level != Py_None && options != Py_None) { + PyErr_SetString(PyExc_TypeError, + "Only one of level or options should be used."); + goto error; + } + + /* Set compression level */ + if (level != Py_None) { + if (!PyLong_Check(level)) { + PyErr_SetString(PyExc_TypeError, + "invalid type for level, expected int"); + goto error; + } + int level_v = PyLong_AsInt(level); + if (level_v == -1 && PyErr_Occurred()) { + if (PyErr_ExceptionMatches(PyExc_OverflowError)) { + PyErr_Format(PyExc_ValueError, + "illegal compression level; the valid range is [%d, %d]", + ZSTD_minCLevel(), ZSTD_maxCLevel()); + } + goto error; + } + if (_zstd_set_c_level(self, level_v) < 0) { + goto error; + } + } + + /* Set options dictionary */ + if (options != Py_None) { + if (_zstd_set_c_parameters(self, options) < 0) { + goto error; + } + } + + /* Load Zstandard dictionary to compression context */ + if (zstd_dict != Py_None) { + if (_zstd_load_c_dict(self, zstd_dict) < 0) { + goto error; + } + Py_INCREF(zstd_dict); + self->dict = zstd_dict; + } + + // We can only start GC tracking once self->dict is set. + PyObject_GC_Track(self); + return (PyObject*)self; error: - if (self != NULL) { - PyObject_GC_Del(self); - } + Py_XDECREF(self); return NULL; } @@ -363,7 +425,11 @@ ZstdCompressor_dealloc(PyObject *ob) PyObject_GC_UnTrack(self); /* Free compression context */ - ZSTD_freeCCtx(self->cctx); + if (self->cctx) { + ZSTD_freeCCtx(self->cctx); + } + + assert(!PyMutex_IsLocked(&self->lock)); /* Py_XDECREF the dict after free the compression context */ Py_CLEAR(self->dict); @@ -373,71 +439,11 @@ ZstdCompressor_dealloc(PyObject *ob) Py_DECREF(tp); } -/*[clinic input] -_zstd.ZstdCompressor.__init__ - - level: object = None - The compression level to use, defaults to ZSTD_CLEVEL_DEFAULT. - options: object = None - A dict object that contains advanced compression parameters. - zstd_dict: object = None - A ZstdDict object, a pre-trained zstd dictionary. - -Create a compressor object for compressing data incrementally. - -Thread-safe at method level. For one-shot compression, use the compress() -function instead. -[clinic start generated code]*/ - -static int -_zstd_ZstdCompressor___init___impl(ZstdCompressor *self, PyObject *level, - PyObject *options, PyObject *zstd_dict) -/*[clinic end generated code: output=215e6c4342732f96 input=9f79b0d8d34c8ef0]*/ -{ - if (self->initialized) { - PyErr_SetString(PyExc_RuntimeError, "reinitialization not supported"); - return -1; - } - self->initialized = 1; - - if (level != Py_None && options != Py_None) { - PyErr_SetString(PyExc_RuntimeError, "Only one of level or options should be used."); - return -1; - } - - /* Set compressLevel/options to compression context */ - if (level != Py_None) { - if (_zstd_set_c_parameters(self, level, "level", "int") < 0) { - return -1; - } - } - - if (options != Py_None) { - if (_zstd_set_c_parameters(self, options, "options", "dict") < 0) { - return -1; - } - } - - /* Load dictionary to compression context */ - if (zstd_dict != Py_None) { - if (_zstd_load_c_dict(self, zstd_dict) < 0) { - return -1; - } - - /* Py_INCREF the dict */ - Py_INCREF(zstd_dict); - self->dict = zstd_dict; - } - - // We can only start tracking self with the GC once self->dict is set. - PyObject_GC_Track(self); - return 0; -} - static PyObject * -compress_impl(ZstdCompressor *self, Py_buffer *data, - ZSTD_EndDirective end_directive) +compress_lock_held(ZstdCompressor *self, Py_buffer *data, + ZSTD_EndDirective end_directive) { + assert(PyMutex_IsLocked(&self->lock)); ZSTD_inBuffer in; ZSTD_outBuffer out; _BlocksOutputBuffer buffer = {.list = NULL}; @@ -464,12 +470,12 @@ compress_impl(ZstdCompressor *self, Py_buffer *data, } if (_OutputBuffer_InitWithSize(&buffer, &out, -1, - (Py_ssize_t) output_buffer_size) < 0) { + (Py_ssize_t) output_buffer_size) < 0) { goto error; } - /* zstd stream compress */ + /* Zstandard stream compress */ while (1) { Py_BEGIN_ALLOW_THREADS zstd_ret = ZSTD_compressStream2(self->cctx, &out, &in, end_directive); @@ -477,10 +483,8 @@ compress_impl(ZstdCompressor *self, Py_buffer *data, /* Check error */ if (ZSTD_isError(zstd_ret)) { - _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self)); - if (mod_state != NULL) { - set_zstd_error(mod_state, ERR_COMPRESS, zstd_ret); - } + _zstd_state* mod_state = PyType_GetModuleState(Py_TYPE(self)); + set_zstd_error(mod_state, ERR_COMPRESS, zstd_ret); goto error; } @@ -509,15 +513,18 @@ error: return NULL; } +#ifndef NDEBUG static inline int mt_continue_should_break(ZSTD_inBuffer *in, ZSTD_outBuffer *out) { return in->size == in->pos && out->size != out->pos; } +#endif static PyObject * -compress_mt_continue_impl(ZstdCompressor *self, Py_buffer *data) +compress_mt_continue_lock_held(ZstdCompressor *self, Py_buffer *data) { + assert(PyMutex_IsLocked(&self->lock)); ZSTD_inBuffer in; ZSTD_outBuffer out; _BlocksOutputBuffer buffer = {.list = NULL}; @@ -533,24 +540,25 @@ compress_mt_continue_impl(ZstdCompressor *self, Py_buffer *data) goto error; } - /* zstd stream compress */ + /* Zstandard stream compress */ while (1) { Py_BEGIN_ALLOW_THREADS do { - zstd_ret = ZSTD_compressStream2(self->cctx, &out, &in, ZSTD_e_continue); - } while (out.pos != out.size && in.pos != in.size && !ZSTD_isError(zstd_ret)); + zstd_ret = ZSTD_compressStream2(self->cctx, &out, &in, + ZSTD_e_continue); + } while (out.pos != out.size + && in.pos != in.size + && !ZSTD_isError(zstd_ret)); Py_END_ALLOW_THREADS /* Check error */ if (ZSTD_isError(zstd_ret)) { - _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self)); - if (mod_state != NULL) { - set_zstd_error(mod_state, ERR_COMPRESS, zstd_ret); - } + _zstd_state* mod_state = PyType_GetModuleState(Py_TYPE(self)); + set_zstd_error(mod_state, ERR_COMPRESS, zstd_ret); goto error; } - /* Like compress_impl(), output as much as possible. */ + /* Like compress_lock_held(), output as much as possible. */ if (out.pos == out.size) { if (_OutputBuffer_Grow(&buffer, &out) < 0) { goto error; @@ -609,14 +617,14 @@ _zstd_ZstdCompressor_compress_impl(ZstdCompressor *self, Py_buffer *data, } /* Thread-safe code */ - Py_BEGIN_CRITICAL_SECTION(self); + PyMutex_Lock(&self->lock); /* Compress */ if (self->use_multithread && mode == ZSTD_e_continue) { - ret = compress_mt_continue_impl(self, data); + ret = compress_mt_continue_lock_held(self, data); } else { - ret = compress_impl(self, data, mode); + ret = compress_lock_held(self, data, mode); } if (ret) { @@ -628,7 +636,7 @@ _zstd_ZstdCompressor_compress_impl(ZstdCompressor *self, Py_buffer *data, /* Resetting cctx's session never fail */ ZSTD_CCtx_reset(self->cctx, ZSTD_reset_session_only); } - Py_END_CRITICAL_SECTION(); + PyMutex_Unlock(&self->lock); return ret; } @@ -642,14 +650,14 @@ _zstd.ZstdCompressor.flush Finish the compression process. -Flush any remaining data left in internal buffers. Since zstd data consists -of one or more independent frames, the compressor object can still be used -after this method is called. +Flush any remaining data left in internal buffers. Since Zstandard data +consists of one or more independent frames, the compressor object can still +be used after this method is called. [clinic start generated code]*/ static PyObject * _zstd_ZstdCompressor_flush_impl(ZstdCompressor *self, int mode) -/*[clinic end generated code: output=b7cf2c8d64dcf2e3 input=a766870301932b85]*/ +/*[clinic end generated code: output=b7cf2c8d64dcf2e3 input=0ab19627f323cdbc]*/ { PyObject *ret; @@ -663,8 +671,9 @@ _zstd_ZstdCompressor_flush_impl(ZstdCompressor *self, int mode) } /* Thread-safe code */ - Py_BEGIN_CRITICAL_SECTION(self); - ret = compress_impl(self, NULL, mode); + PyMutex_Lock(&self->lock); + + ret = compress_lock_held(self, NULL, mode); if (ret) { self->last_mode = mode; @@ -675,26 +684,78 @@ _zstd_ZstdCompressor_flush_impl(ZstdCompressor *self, int mode) /* Resetting cctx's session never fail */ ZSTD_CCtx_reset(self->cctx, ZSTD_reset_session_only); } - Py_END_CRITICAL_SECTION(); + PyMutex_Unlock(&self->lock); return ret; } + +/*[clinic input] +_zstd.ZstdCompressor.set_pledged_input_size + + size: zstd_contentsize + The size of the uncompressed data to be provided to the compressor. + / + +Set the uncompressed content size to be written into the frame header. + +This method can be used to ensure the header of the frame about to be written +includes the size of the data, unless the CompressionParameter.content_size_flag +is set to False. If last_mode != FLUSH_FRAME, then a RuntimeError is raised. + +It is important to ensure that the pledged data size matches the actual data +size. If they do not match the compressed output data may be corrupted and the +final chunk written may be lost. +[clinic start generated code]*/ + +static PyObject * +_zstd_ZstdCompressor_set_pledged_input_size_impl(ZstdCompressor *self, + unsigned long long size) +/*[clinic end generated code: output=3a09e55cc0e3b4f9 input=afd8a7d78cff2eb5]*/ +{ + // Error occured while converting argument, should be unreachable + assert(size != ZSTD_CONTENTSIZE_ERROR); + + /* Thread-safe code */ + PyMutex_Lock(&self->lock); + + /* Check the current mode */ + if (self->last_mode != ZSTD_e_end) { + PyErr_SetString(PyExc_ValueError, + "set_pledged_input_size() method must be called " + "when last_mode == FLUSH_FRAME"); + PyMutex_Unlock(&self->lock); + return NULL; + } + + /* Set pledged content size */ + size_t zstd_ret = ZSTD_CCtx_setPledgedSrcSize(self->cctx, size); + PyMutex_Unlock(&self->lock); + if (ZSTD_isError(zstd_ret)) { + _zstd_state* mod_state = PyType_GetModuleState(Py_TYPE(self)); + set_zstd_error(mod_state, ERR_SET_PLEDGED_INPUT_SIZE, zstd_ret); + return NULL; + } + + Py_RETURN_NONE; +} + static PyMethodDef ZstdCompressor_methods[] = { _ZSTD_ZSTDCOMPRESSOR_COMPRESS_METHODDEF _ZSTD_ZSTDCOMPRESSOR_FLUSH_METHODDEF + _ZSTD_ZSTDCOMPRESSOR_SET_PLEDGED_INPUT_SIZE_METHODDEF {NULL, NULL} }; PyDoc_STRVAR(ZstdCompressor_last_mode_doc, "The last mode used to this compressor object, its value can be .CONTINUE,\n" ".FLUSH_BLOCK, .FLUSH_FRAME. Initialized to .FLUSH_FRAME.\n\n" -"It can be used to get the current state of a compressor, such as, data flushed,\n" -"a frame ended."); +"It can be used to get the current state of a compressor, such as, data\n" +"flushed, or a frame ended."); static PyMemberDef ZstdCompressor_members[] = { {"last_mode", Py_T_INT, offsetof(ZstdCompressor, last_mode), - Py_READONLY, ZstdCompressor_last_mode_doc}, + Py_READONLY, ZstdCompressor_last_mode_doc}, {NULL} }; @@ -717,10 +778,9 @@ ZstdCompressor_clear(PyObject *ob) static PyType_Slot zstdcompressor_slots[] = { {Py_tp_new, _zstd_ZstdCompressor_new}, {Py_tp_dealloc, ZstdCompressor_dealloc}, - {Py_tp_init, _zstd_ZstdCompressor___init__}, {Py_tp_methods, ZstdCompressor_methods}, {Py_tp_members, ZstdCompressor_members}, - {Py_tp_doc, (char*)_zstd_ZstdCompressor___init____doc__}, + {Py_tp_doc, (void *)_zstd_ZstdCompressor_new__doc__}, {Py_tp_traverse, ZstdCompressor_traverse}, {Py_tp_clear, ZstdCompressor_clear}, {0, 0} |