aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Modules/_zstd/compressor.c
diff options
context:
space:
mode:
Diffstat (limited to 'Modules/_zstd/compressor.c')
-rw-r--r--Modules/_zstd/compressor.c707
1 files changed, 707 insertions, 0 deletions
diff --git a/Modules/_zstd/compressor.c b/Modules/_zstd/compressor.c
new file mode 100644
index 00000000000..d0f677be821
--- /dev/null
+++ b/Modules/_zstd/compressor.c
@@ -0,0 +1,707 @@
+/*
+Low level interface to Meta's zstd library for use in the compression.zstd
+Python module.
+*/
+
+/* ZstdCompressor class definitions */
+
+/*[clinic input]
+module _zstd
+class _zstd.ZstdCompressor "ZstdCompressor *" "clinic_state()->ZstdCompressor_type"
+[clinic start generated code]*/
+/*[clinic end generated code: output=da39a3ee5e6b4b0d input=875bf614798f80cb]*/
+
+
+#ifndef Py_BUILD_CORE_BUILTIN
+# define Py_BUILD_CORE_MODULE 1
+#endif
+
+#include "_zstdmodule.h"
+
+#include "buffer.h"
+
+#include <stddef.h> // offsetof()
+
+
+/* Cast an object pointer to ZstdCompressor *.  The argument is parenthesized
+   so that an expression argument is cast as a whole. */
+#define ZstdCompressor_CAST(op) ((ZstdCompressor *)(op))
+
+/* Apply a compression level or an options dict to self->cctx.
+
+   level_or_options is either an int (the compression level) or a dict whose
+   keys and values are convertible to C int (parameter id -> value).
+   arg_name and arg_type are only used to build the TypeError message raised
+   when level_or_options is neither an int nor a dict.
+
+   Side effects: updates self->compression_level when the compression level
+   is given, and sets self->use_multithread when a non-zero nbWorkers value
+   is given.
+
+   Returns 0 on success, -1 on failure. */
+int
+_PyZstd_set_c_parameters(ZstdCompressor *self, PyObject *level_or_options,
+                         const char *arg_name, const char* arg_type)
+{
+    size_t zstd_ret;
+    _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self));
+    if (mod_state == NULL) {
+        return -1;
+    }
+
+    /* Integer compression level */
+    if (PyLong_Check(level_or_options)) {
+        int level = PyLong_AsInt(level_or_options);
+        if (level == -1 && PyErr_Occurred()) {
+            PyErr_Format(PyExc_ValueError,
+                         "Compression level should be an int value between %d and %d.",
+                         ZSTD_minCLevel(), ZSTD_maxCLevel());
+            return -1;
+        }
+
+        /* Save for generating ZSTD_CDICT */
+        self->compression_level = level;
+
+        /* Set compressionLevel to compression context */
+        zstd_ret = ZSTD_CCtx_setParameter(self->cctx,
+                                          ZSTD_c_compressionLevel,
+                                          level);
+
+        /* Check error */
+        if (ZSTD_isError(zstd_ret)) {
+            set_zstd_error(mod_state, ERR_SET_C_LEVEL, zstd_ret);
+            return -1;
+        }
+        return 0;
+    }
+
+    /* Options dict */
+    if (PyDict_Check(level_or_options)) {
+        PyObject *key, *value;
+        Py_ssize_t pos = 0;
+
+        while (PyDict_Next(level_or_options, &pos, &key, &value)) {
+            /* Check key type */
+            if (Py_TYPE(key) == mod_state->DParameter_type) {
+                PyErr_SetString(PyExc_TypeError,
+                                "Key of compression option dict should "
+                                "NOT be DParameter.");
+                return -1;
+            }
+
+            /* PyDict_Next() returns borrowed references, and PyLong_AsInt()
+               may call __index__() on a non-int object, i.e. run arbitrary
+               code that could mutate the dict and release key/value.  Hold
+               strong references while converting. */
+            Py_INCREF(key);
+            Py_INCREF(value);
+
+            int key_v = PyLong_AsInt(key);
+            Py_DECREF(key);
+            if (key_v == -1 && PyErr_Occurred()) {
+                Py_DECREF(value);
+                PyErr_SetString(PyExc_ValueError,
+                                "Key of options dict should be a CParameter attribute.");
+                return -1;
+            }
+
+            // TODO(emmatyping): check bounds when there is a value error here for better
+            // error message?
+            int value_v = PyLong_AsInt(value);
+            Py_DECREF(value);
+            if (value_v == -1 && PyErr_Occurred()) {
+                PyErr_SetString(PyExc_ValueError,
+                                "Value of option dict should be an int.");
+                return -1;
+            }
+
+            if (key_v == ZSTD_c_compressionLevel) {
+                /* Save for generating ZSTD_CDICT */
+                self->compression_level = value_v;
+            }
+            else if (key_v == ZSTD_c_nbWorkers) {
+                /* From zstd library doc:
+                   1. When nbWorkers >= 1, triggers asynchronous mode when
+                      used with ZSTD_compressStream2().
+                   2. Default value is `0`, aka "single-threaded mode" : no
+                      worker is spawned, compression is performed inside
+                      caller's thread, all invocations are blocking. */
+                if (value_v != 0) {
+                    self->use_multithread = 1;
+                }
+            }
+
+            /* Set parameter to compression context */
+            zstd_ret = ZSTD_CCtx_setParameter(self->cctx, key_v, value_v);
+            if (ZSTD_isError(zstd_ret)) {
+                set_parameter_error(mod_state, 1, key_v, value_v);
+                return -1;
+            }
+        }
+        return 0;
+    }
+
+    /* Neither an int nor a dict */
+    PyErr_Format(PyExc_TypeError, "Invalid type for %s. Expected %s", arg_name, arg_type);
+    return -1;
+}
+
+/* PyCapsule destructor: free the ZSTD_CDict stored in the (unnamed) capsule. */
+static void
+capsule_free_cdict(PyObject *capsule)
+{
+    ZSTD_freeCDict((ZSTD_CDict *)PyCapsule_GetPointer(capsule, NULL));
+}
+
+/* Return the ZSTD_CDict of `self` for the given compression level.
+
+   Digested dictionaries are cached in self->c_dicts, keyed by the level as
+   a Python int, each one wrapped in a PyCapsule whose destructor is
+   capsule_free_cdict().  On a cache miss a new ZSTD_CDict is created from
+   self->dict_content and added to the cache.
+
+   The returned pointer is not owned by the caller: it belongs to the capsule
+   stored in self->c_dicts.  Returns NULL on failure. */
+ZSTD_CDict *
+_get_CDict(ZstdDict *self, int compressionLevel)
+{
+ PyObject *level = NULL;
+ PyObject *capsule;
+ ZSTD_CDict *cdict;
+
+ // TODO(emmatyping): refactor critical section code into a lock_held function
+ Py_BEGIN_CRITICAL_SECTION(self);
+
+ /* int level object */
+ level = PyLong_FromLong(compressionLevel);
+ if (level == NULL) {
+ goto error;
+ }
+
+ /* Get PyCapsule object from self->c_dicts */
+ capsule = PyDict_GetItemWithError(self->c_dicts, level);
+ if (capsule == NULL) {
+ /* NULL means either a lookup error or a cache miss */
+ if (PyErr_Occurred()) {
+ goto error;
+ }
+
+ /* Create ZSTD_CDict instance */
+ char *dict_buffer = PyBytes_AS_STRING(self->dict_content);
+ Py_ssize_t dict_len = Py_SIZE(self->dict_content);
+ /* The dictionary is digested between Py_BEGIN_ALLOW_THREADS and
+    Py_END_ALLOW_THREADS; no Python API is called in between. */
+ Py_BEGIN_ALLOW_THREADS
+ cdict = ZSTD_createCDict(dict_buffer,
+ dict_len,
+ compressionLevel);
+ Py_END_ALLOW_THREADS
+
+ if (cdict == NULL) {
+ _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self));
+ if (mod_state != NULL) {
+ PyErr_SetString(mod_state->ZstdError,
+ "Failed to create ZSTD_CDict instance from zstd "
+ "dictionary content. Maybe the content is corrupted.");
+ }
+ goto error;
+ }
+
+ /* Put ZSTD_CDict instance into PyCapsule object */
+ capsule = PyCapsule_New(cdict, NULL, capsule_free_cdict);
+ if (capsule == NULL) {
+ /* No capsule owns cdict yet, so free it directly */
+ ZSTD_freeCDict(cdict);
+ goto error;
+ }
+
+ /* Add PyCapsule object to self->c_dicts */
+ if (PyDict_SetItem(self->c_dicts, level, capsule) < 0) {
+ /* Dropping the capsule reference releases cdict through
+    capsule_free_cdict() */
+ Py_DECREF(capsule);
+ goto error;
+ }
+ /* self->c_dicts now holds the capsule; drop the local reference.
+    cdict stays valid as long as the capsule is in the cache. */
+ Py_DECREF(capsule);
+ }
+ else {
+ /* ZSTD_CDict instance already exists */
+ cdict = PyCapsule_GetPointer(capsule, NULL);
+ }
+ goto success;
+
+/* `error` falls through into `success` so both paths share the cleanup and
+   leave the critical section in one place. */
+error:
+ cdict = NULL;
+success:
+ Py_XDECREF(level);
+ Py_END_CRITICAL_SECTION();
+ return cdict;
+}
+
+/* Attach a zstd dictionary to the compression context of `self`.
+
+   `dict` is either a ZstdDict instance (loaded as an undigested dictionary)
+   or an exact 2-tuple (ZstdDict, type) where type is one of
+   DICT_TYPE_DIGESTED, DICT_TYPE_UNDIGESTED or DICT_TYPE_PREFIX.  Anything
+   else raises TypeError.
+
+   Returns 0 on success, -1 on failure. */
+int
+_PyZstd_load_c_dict(ZstdCompressor *self, PyObject *dict)
+{
+    _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self));
+    if (mod_state == NULL) {
+        return -1;
+    }
+
+    PyObject *zstd_dict_type = (PyObject *)mod_state->ZstdDict_type;
+    ZstdDict *zd = NULL;    /* stays NULL until the argument is accepted */
+    /* When compressing, use undigested dictionary by default. */
+    int type = DICT_TYPE_UNDIGESTED;
+
+    int is_zstd_dict = PyObject_IsInstance(dict, zstd_dict_type);
+    if (is_zstd_dict < 0) {
+        return -1;
+    }
+    if (is_zstd_dict > 0) {
+        /* Plain ZstdDict */
+        zd = (ZstdDict *)dict;
+    }
+    else if (PyTuple_CheckExact(dict) && PyTuple_GET_SIZE(dict) == 2) {
+        /* Candidate for (ZstdDict, type) */
+        PyObject *first = PyTuple_GET_ITEM(dict, 0);
+
+        is_zstd_dict = PyObject_IsInstance(first, zstd_dict_type);
+        if (is_zstd_dict < 0) {
+            return -1;
+        }
+        if (is_zstd_dict > 0) {
+            /* requested == -1 may indicate an error; it then matches none of
+               the known types and the TypeError below replaces it. */
+            int requested = PyLong_AsInt(PyTuple_GET_ITEM(dict, 1));
+            if (requested == DICT_TYPE_DIGESTED ||
+                requested == DICT_TYPE_UNDIGESTED ||
+                requested == DICT_TYPE_PREFIX)
+            {
+                assert(requested >= 0);
+                type = requested;
+                zd = (ZstdDict *)first;
+            }
+        }
+    }
+
+    if (zd == NULL) {
+        /* Wrong type */
+        PyErr_SetString(PyExc_TypeError,
+                        "zstd_dict argument should be ZstdDict object.");
+        return -1;
+    }
+
+    size_t zstd_ret;
+    if (type == DICT_TYPE_DIGESTED) {
+        /* Get ZSTD_CDict */
+        ZSTD_CDict *c_dict = _get_CDict(zd, self->compression_level);
+        if (c_dict == NULL) {
+            return -1;
+        }
+        /* Reference a prepared dictionary.
+           It overrides some compression context's parameters. */
+        Py_BEGIN_CRITICAL_SECTION(self);
+        zstd_ret = ZSTD_CCtx_refCDict(self->cctx, c_dict);
+        Py_END_CRITICAL_SECTION();
+    }
+    else if (type == DICT_TYPE_UNDIGESTED) {
+        /* Load a dictionary.
+           It doesn't override compression context's parameters. */
+        Py_BEGIN_CRITICAL_SECTION2(self, zd);
+        zstd_ret = ZSTD_CCtx_loadDictionary(
+                            self->cctx,
+                            PyBytes_AS_STRING(zd->dict_content),
+                            Py_SIZE(zd->dict_content));
+        Py_END_CRITICAL_SECTION2();
+    }
+    else if (type == DICT_TYPE_PREFIX) {
+        /* Load a prefix */
+        Py_BEGIN_CRITICAL_SECTION2(self, zd);
+        zstd_ret = ZSTD_CCtx_refPrefix(
+                            self->cctx,
+                            PyBytes_AS_STRING(zd->dict_content),
+                            Py_SIZE(zd->dict_content));
+        Py_END_CRITICAL_SECTION2();
+    }
+    else {
+        Py_UNREACHABLE();
+    }
+
+    /* Check error */
+    if (ZSTD_isError(zstd_ret)) {
+        set_zstd_error(mod_state, ERR_LOAD_C_DICT, zstd_ret);
+        return -1;
+    }
+    return 0;
+}
+
+#define clinic_state() (get_zstd_state_from_type(type))
+#include "clinic/compressor.c.h"
+#undef clinic_state
+
+/* tp_new for ZstdCompressor: allocate the object and its ZSTD_CCtx.
+
+   The positional and keyword arguments are ignored here; they are handled
+   by __init__.  The new object is NOT tracked by the GC yet: tracking is
+   started by __init__ once self->dict has its final value.
+
+   Returns a new reference, or NULL on failure. */
+static PyObject *
+_zstd_ZstdCompressor_new(PyTypeObject *type, PyObject *Py_UNUSED(args), PyObject *Py_UNUSED(kwargs))
+{
+    ZstdCompressor *self = PyObject_GC_New(ZstdCompressor, type);
+    if (self == NULL) {
+        return NULL;
+    }
+
+    /* PyObject_GC_New() does not initialize the fields after the object
+       header, so every field that may be read later is set explicitly. */
+    self->inited = 0;
+    self->dict = NULL;
+    self->use_multithread = 0;
+    /* Read by _PyZstd_load_c_dict() for digested dictionaries even when
+       neither level nor options was given.  0 asks zstd for its default
+       compression level. */
+    self->compression_level = 0;
+    /* Last mode */
+    self->last_mode = ZSTD_e_end;
+
+    /* Compression context */
+    self->cctx = ZSTD_createCCtx();
+    if (self->cctx == NULL) {
+        _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self));
+        if (mod_state != NULL) {
+            PyErr_SetString(mod_state->ZstdError,
+                            "Unable to create ZSTD_CCtx instance.");
+        }
+        /* Instances of a heap type own a reference to their type (see the
+           matching Py_DECREF in ZstdCompressor_dealloc); release it together
+           with the memory so the type is not leaked. */
+        PyObject_GC_Del(self);
+        Py_DECREF(type);
+        return NULL;
+    }
+
+    return (PyObject*)self;
+}
+
+/* tp_dealloc for ZstdCompressor: release the compression context, the
+   referenced dictionary, the object memory and the reference to the type. */
+static void
+ZstdCompressor_dealloc(PyObject *ob)
+{
+    ZstdCompressor *compressor = ZstdCompressor_CAST(ob);
+    PyTypeObject *type = Py_TYPE(ob);
+
+    PyObject_GC_UnTrack(ob);
+
+    /* The compression context is freed before the dict it may reference */
+    ZSTD_freeCCtx(compressor->cctx);
+    Py_CLEAR(compressor->dict);
+
+    PyObject_GC_Del(ob);
+    Py_DECREF(type);
+}
+
+/*[clinic input]
+_zstd.ZstdCompressor.__init__
+
+ level: object = None
+ The compression level to use, defaults to ZSTD_CLEVEL_DEFAULT.
+ options: object = None
+ A dict object that contains advanced compression parameters.
+ zstd_dict: object = None
+ A ZstdDict object, a pre-trained zstd dictionary.
+
+Create a compressor object for compressing data incrementally.
+
+Thread-safe at method level. For one-shot compression, use the compress()
+function instead.
+[clinic start generated code]*/
+
+static int
+_zstd_ZstdCompressor___init___impl(ZstdCompressor *self, PyObject *level,
+                                   PyObject *options, PyObject *zstd_dict)
+/*[clinic end generated code: output=215e6c4342732f96 input=9f79b0d8d34c8ef0]*/
+{
+    const int has_level = (level != Py_None);
+    const int has_options = (options != Py_None);
+
+    /* __init__ may only run once per object; the flag is set before any
+       validation, so a failed first call also counts. */
+    if (self->inited) {
+        PyErr_SetString(PyExc_RuntimeError, init_twice_msg);
+        return -1;
+    }
+    self->inited = 1;
+
+    /* level and options are mutually exclusive */
+    if (has_level && has_options) {
+        PyErr_SetString(PyExc_RuntimeError, "Only one of level or options should be used.");
+        return -1;
+    }
+
+    /* Set compressLevel/options to compression context */
+    if (has_level
+        && _PyZstd_set_c_parameters(self, level, "level", "int") < 0)
+    {
+        return -1;
+    }
+    if (has_options
+        && _PyZstd_set_c_parameters(self, options, "options", "dict") < 0)
+    {
+        return -1;
+    }
+
+    /* Load dictionary to compression context, then keep it alive */
+    if (zstd_dict != Py_None) {
+        if (_PyZstd_load_c_dict(self, zstd_dict) < 0) {
+            return -1;
+        }
+        Py_INCREF(zstd_dict);
+        self->dict = zstd_dict;
+    }
+
+    // We can only start tracking self with the GC once self->dict is set.
+    PyObject_GC_Track(self);
+    return 0;
+}
+
+/* Run ZSTD_compressStream2() with `end_directive` until zstd reports that
+   nothing is left to flush, and return the produced bytes.
+
+   `data` may be NULL (as flush() does), in which case an empty input is fed
+   to the compressor.  Returns a new bytes object, or NULL on failure. */
+PyObject *
+compress_impl(ZstdCompressor *self, Py_buffer *data,
+              ZSTD_EndDirective end_directive)
+{
+    _BlocksOutputBuffer buffer = {.list = NULL};
+    ZSTD_outBuffer out;
+    ZSTD_inBuffer in;
+    PyObject *result;
+    size_t zstd_ret;
+
+    /* Prepare the input buffer */
+    in.pos = 0;
+    if (data == NULL) {
+        /* Empty input: any non-NULL address will do as source */
+        in.src = &in;
+        in.size = 0;
+    }
+    else {
+        in.src = data->buf;
+        in.size = data->len;
+    }
+
+    /* Size the first output block from zstd's bound for this input */
+    const size_t bound = ZSTD_compressBound(in.size);
+    if (bound > (size_t) PY_SSIZE_T_MAX) {
+        PyErr_NoMemory();
+        goto error;
+    }
+    if (_OutputBuffer_InitWithSize(&buffer, &out, -1,
+                                   (Py_ssize_t) bound) < 0) {
+        goto error;
+    }
+
+    /* zstd stream compress */
+    for (;;) {
+        Py_BEGIN_ALLOW_THREADS
+        zstd_ret = ZSTD_compressStream2(self->cctx, &out, &in, end_directive);
+        Py_END_ALLOW_THREADS
+
+        if (ZSTD_isError(zstd_ret)) {
+            _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self));
+            if (mod_state != NULL) {
+                set_zstd_error(mod_state, ERR_COMPRESS, zstd_ret);
+            }
+            goto error;
+        }
+
+        /* 0 means everything has been written out */
+        if (zstd_ret == 0) {
+            break;
+        }
+
+        /* Output buffer should be exhausted, grow the buffer. */
+        assert(out.pos == out.size);
+        if (out.pos == out.size && _OutputBuffer_Grow(&buffer, &out) < 0) {
+            goto error;
+        }
+    }
+
+    /* Return a bytes object; on failure fall through to the cleanup */
+    result = _OutputBuffer_Finish(&buffer, &out);
+    if (result != NULL) {
+        return result;
+    }
+
+error:
+    _OutputBuffer_OnError(&buffer);
+    return NULL;
+}
+
+/* Compress `data` with ZSTD_e_continue for a compressor whose
+   use_multithread flag is set (the only caller in this file is
+   ZstdCompressor.compress() with mode == ZSTD_e_continue).
+
+   Unlike compress_impl(), the loop does not look at the flush hint returned
+   by zstd: it stops as soon as all input has been consumed and the output
+   buffer still has room.
+
+   Returns a new bytes object, or NULL on failure. */
+static PyObject *
+compress_mt_continue_impl(ZstdCompressor *self, Py_buffer *data)
+{
+ ZSTD_inBuffer in;
+ ZSTD_outBuffer out;
+ _BlocksOutputBuffer buffer = {.list = NULL};
+ size_t zstd_ret;
+ PyObject *ret;
+
+ /* Prepare input & output buffers */
+ in.src = data->buf;
+ in.size = data->len;
+ in.pos = 0;
+
+ if (_OutputBuffer_InitAndGrow(&buffer, &out, -1) < 0) {
+ goto error;
+ }
+
+ /* zstd stream compress */
+ while (1) {
+ Py_BEGIN_ALLOW_THREADS
+ /* Keep calling zstd until the output block is full, the input is
+    fully consumed, or an error is reported. */
+ do {
+ zstd_ret = ZSTD_compressStream2(self->cctx, &out, &in, ZSTD_e_continue);
+ } while (out.pos != out.size && in.pos != in.size && !ZSTD_isError(zstd_ret));
+ Py_END_ALLOW_THREADS
+
+ /* Check error */
+ if (ZSTD_isError(zstd_ret)) {
+ _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self));
+ if (mod_state != NULL) {
+ set_zstd_error(mod_state, ERR_COMPRESS, zstd_ret);
+ }
+ goto error;
+ }
+
+ /* Like compress_impl(), output as much as possible. */
+ if (out.pos == out.size) {
+ /* Output block full: add a block and go around again, even if the
+    input is already consumed. */
+ if (_OutputBuffer_Grow(&buffer, &out) < 0) {
+ goto error;
+ }
+ }
+ else if (in.pos == in.size) {
+ /* Finished */
+ /* NOTE(review): mt_continue_should_break() is defined outside this
+    file; presumably it re-checks this same exit condition. */
+ assert(mt_continue_should_break(&in, &out));
+ break;
+ }
+ }
+
+ /* Return a bytes object */
+ ret = _OutputBuffer_Finish(&buffer, &out);
+ if (ret != NULL) {
+ return ret;
+ }
+
+ /* A failed _OutputBuffer_Finish() falls through to the cleanup below */
+error:
+ _OutputBuffer_OnError(&buffer);
+ return NULL;
+}
+
+/*[clinic input]
+_zstd.ZstdCompressor.compress
+
+ data: Py_buffer
+ mode: int(c_default="ZSTD_e_continue") = ZstdCompressor.CONTINUE
+ Can be these 3 values ZstdCompressor.CONTINUE,
+ ZstdCompressor.FLUSH_BLOCK, ZstdCompressor.FLUSH_FRAME
+
+Provide data to the compressor object.
+
+Return a chunk of compressed data if possible, or b'' otherwise. When you have
+finished providing data to the compressor, call the flush() method to finish
+the compression process.
+[clinic start generated code]*/
+
+static PyObject *
+_zstd_ZstdCompressor_compress_impl(ZstdCompressor *self, Py_buffer *data,
+                                   int mode)
+/*[clinic end generated code: output=ed7982d1cf7b4f98 input=ac2c21d180f579ea]*/
+{
+    PyObject *ret;
+    const int mode_is_valid = (mode == ZSTD_e_continue
+                               || mode == ZSTD_e_flush
+                               || mode == ZSTD_e_end);
+
+    /* Reject unknown modes before touching the compressor */
+    if (!mode_is_valid) {
+        PyErr_SetString(PyExc_ValueError,
+                        "mode argument wrong value, it should be one of "
+                        "ZstdCompressor.CONTINUE, ZstdCompressor.FLUSH_BLOCK, "
+                        "ZstdCompressor.FLUSH_FRAME.");
+        return NULL;
+    }
+
+    /* Thread-safe code */
+    Py_BEGIN_CRITICAL_SECTION(self);
+
+    /* CONTINUE on a multi-threaded context has its own loop; everything
+       else goes through the generic implementation. */
+    ret = (self->use_multithread && mode == ZSTD_e_continue)
+              ? compress_mt_continue_impl(self, data)
+              : compress_impl(self, data, mode);
+
+    if (ret == NULL) {
+        /* Failure: report "frame ended" and drop the unfinished session */
+        self->last_mode = ZSTD_e_end;
+
+        /* Resetting cctx's session never fail */
+        ZSTD_CCtx_reset(self->cctx, ZSTD_reset_session_only);
+    }
+    else {
+        self->last_mode = mode;
+    }
+    Py_END_CRITICAL_SECTION();
+
+    return ret;
+}
+
+/*[clinic input]
+_zstd.ZstdCompressor.flush
+
+ mode: int(c_default="ZSTD_e_end") = ZstdCompressor.FLUSH_FRAME
+ Can be these 2 values ZstdCompressor.FLUSH_FRAME,
+ ZstdCompressor.FLUSH_BLOCK
+
+Finish the compression process.
+
+Flush any remaining data left in internal buffers. Since zstd data consists
+of one or more independent frames, the compressor object can still be used
+after this method is called.
+[clinic start generated code]*/
+
+static PyObject *
+_zstd_ZstdCompressor_flush_impl(ZstdCompressor *self, int mode)
+/*[clinic end generated code: output=b7cf2c8d64dcf2e3 input=a766870301932b85]*/
+{
+    PyObject *ret;
+    const int mode_is_valid = (mode == ZSTD_e_end || mode == ZSTD_e_flush);
+
+    /* Only the two flushing modes are accepted here */
+    if (!mode_is_valid) {
+        PyErr_SetString(PyExc_ValueError,
+                        "mode argument wrong value, it should be "
+                        "ZstdCompressor.FLUSH_FRAME or "
+                        "ZstdCompressor.FLUSH_BLOCK.");
+        return NULL;
+    }
+
+    /* Thread-safe code */
+    Py_BEGIN_CRITICAL_SECTION(self);
+
+    /* No new input: only drain what the compressor still holds */
+    ret = compress_impl(self, NULL, mode);
+
+    if (ret == NULL) {
+        /* Failure: report "frame ended" and drop the unfinished session */
+        self->last_mode = ZSTD_e_end;
+
+        /* Resetting cctx's session never fail */
+        ZSTD_CCtx_reset(self->cctx, ZSTD_reset_session_only);
+    }
+    else {
+        self->last_mode = mode;
+    }
+    Py_END_CRITICAL_SECTION();
+
+    return ret;
+}
+
+/* Method table of ZstdCompressor: compress() and flush().
+   NOTE(review): the *_METHODDEF macros are not defined in this file;
+   presumably they come from the included clinic/compressor.c.h. */
+static PyMethodDef ZstdCompressor_methods[] = {
+ _ZSTD_ZSTDCOMPRESSOR_COMPRESS_METHODDEF
+ _ZSTD_ZSTDCOMPRESSOR_FLUSH_METHODDEF
+
+ /* Sentinel */
+ {0}
+};
+
+/* Docstring of the read-only `last_mode` attribute */
+PyDoc_STRVAR(ZstdCompressor_last_mode_doc,
+"The last mode used to this compressor object, its value can be .CONTINUE,\n"
+".FLUSH_BLOCK, .FLUSH_FRAME. Initialized to .FLUSH_FRAME.\n\n"
+"It can be used to get the current state of a compressor, such as, data flushed,\n"
+"a frame ended.");
+
+/* Member table of ZstdCompressor: exposes the C field `last_mode` as a
+   read-only int attribute.  The field is updated by compress() and flush()
+   and is set back to ZSTD_e_end when either of them fails. */
+static PyMemberDef ZstdCompressor_members[] = {
+ {"last_mode", Py_T_INT, offsetof(ZstdCompressor, last_mode),
+ Py_READONLY, ZstdCompressor_last_mode_doc},
+ /* Sentinel */
+ {0}
+};
+
+/* tp_traverse for ZstdCompressor: report every object reference owned by
+   the instance to the cyclic garbage collector. */
+static int
+ZstdCompressor_traverse(PyObject *ob, visitproc visit, void *arg)
+{
+    ZstdCompressor *self = ZstdCompressor_CAST(ob);
+
+    /* The type is created from a PyType_Spec, so it is a heap type and each
+       instance owns a reference to it (released in ZstdCompressor_dealloc).
+       Heap types are expected to visit their type here; otherwise a
+       reference cycle going through the type can never be collected. */
+    Py_VISIT(Py_TYPE(ob));
+    Py_VISIT(self->dict);
+    return 0;
+}
+
+/* tp_clear for ZstdCompressor: drop the reference to the dictionary so the
+   garbage collector can break reference cycles. */
+static int
+ZstdCompressor_clear(PyObject *ob)
+{
+    ZstdCompressor *compressor = ZstdCompressor_CAST(ob);
+
+    Py_CLEAR(compressor->dict);
+    return 0;
+}
+
+/* Type slots of ZstdCompressor, referenced by zstdcompressor_type_spec.
+   NOTE(review): _zstd_ZstdCompressor___init__ and its __doc__ string are not
+   defined in this file; presumably they are generated into
+   clinic/compressor.c.h. */
+static PyType_Slot zstdcompressor_slots[] = {
+ {Py_tp_new, _zstd_ZstdCompressor_new},
+ {Py_tp_dealloc, ZstdCompressor_dealloc},
+ {Py_tp_init, _zstd_ZstdCompressor___init__},
+ {Py_tp_methods, ZstdCompressor_methods},
+ {Py_tp_members, ZstdCompressor_members},
+ {Py_tp_doc, (char*)_zstd_ZstdCompressor___init____doc__},
+ {Py_tp_traverse, ZstdCompressor_traverse},
+ {Py_tp_clear, ZstdCompressor_clear},
+ /* Sentinel */
+ {0}
+};
+
+/* Type specification of _zstd.ZstdCompressor.  It has external linkage so
+   that code outside this file can create the type from it.  The flags make
+   the type subclassable (Py_TPFLAGS_BASETYPE) and GC-aware
+   (Py_TPFLAGS_HAVE_GC). */
+PyType_Spec zstdcompressor_type_spec = {
+ .name = "_zstd.ZstdCompressor",
+ .basicsize = sizeof(ZstdCompressor),
+ .flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_GC,
+ .slots = zstdcompressor_slots,
+};