diff options
Diffstat (limited to 'Modules/_interpqueuesmodule.c')
-rw-r--r-- | Modules/_interpqueuesmodule.c | 177 |
1 files changed, 99 insertions, 78 deletions
diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c index 526249a0e1a..ffc52c8ee74 100644 --- a/Modules/_interpqueuesmodule.c +++ b/Modules/_interpqueuesmodule.c @@ -9,9 +9,11 @@ #include "pycore_crossinterp.h" // _PyXIData_t #define REGISTERS_HEAP_TYPES +#define HAS_FALLBACK #define HAS_UNBOUND_ITEMS #include "_interpreters_common.h" #undef HAS_UNBOUND_ITEMS +#undef HAS_FALLBACK #undef REGISTERS_HEAP_TYPES @@ -401,14 +403,13 @@ typedef struct _queueitem { meaning the interpreter has been destroyed. */ int64_t interpid; _PyXIData_t *data; - int fmt; - int unboundop; + unboundop_t unboundop; struct _queueitem *next; } _queueitem; static void _queueitem_init(_queueitem *item, - int64_t interpid, _PyXIData_t *data, int fmt, int unboundop) + int64_t interpid, _PyXIData_t *data, unboundop_t unboundop) { if (interpid < 0) { interpid = _get_interpid(data); @@ -422,7 +423,6 @@ _queueitem_init(_queueitem *item, *item = (_queueitem){ .interpid = interpid, .data = data, - .fmt = fmt, .unboundop = unboundop, }; } @@ -446,14 +446,14 @@ _queueitem_clear(_queueitem *item) } static _queueitem * -_queueitem_new(int64_t interpid, _PyXIData_t *data, int fmt, int unboundop) +_queueitem_new(int64_t interpid, _PyXIData_t *data, int unboundop) { _queueitem *item = GLOBAL_MALLOC(_queueitem); if (item == NULL) { PyErr_NoMemory(); return NULL; } - _queueitem_init(item, interpid, data, fmt, unboundop); + _queueitem_init(item, interpid, data, unboundop); return item; } @@ -476,10 +476,9 @@ _queueitem_free_all(_queueitem *item) static void _queueitem_popped(_queueitem *item, - _PyXIData_t **p_data, int *p_fmt, int *p_unboundop) + _PyXIData_t **p_data, unboundop_t *p_unboundop) { *p_data = item->data; - *p_fmt = item->fmt; *p_unboundop = item->unboundop; // We clear them here, so they won't be released in _queueitem_clear(). item->data = NULL; @@ -527,16 +526,16 @@ typedef struct _queue { _queueitem *first; _queueitem *last; } items; - struct { - int fmt; + struct _queuedefaults { + xidata_fallback_t fallback; int unboundop; } defaults; } _queue; static int -_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop) +_queue_init(_queue *queue, Py_ssize_t maxsize, struct _queuedefaults defaults) { - assert(check_unbound(unboundop)); + assert(check_unbound(defaults.unboundop)); PyThread_type_lock mutex = PyThread_allocate_lock(); if (mutex == NULL) { return ERR_QUEUE_ALLOC; @@ -547,10 +546,7 @@ _queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop) .items = { .maxsize = maxsize, }, - .defaults = { - .fmt = fmt, - .unboundop = unboundop, - }, + .defaults = defaults, }; return 0; } @@ -631,8 +627,7 @@ _queue_unlock(_queue *queue) } static int -_queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data, - int fmt, int unboundop) +_queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data, int unboundop) { int err = _queue_lock(queue); if (err < 0) { @@ -648,7 +643,7 @@ _queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data, return ERR_QUEUE_FULL; } - _queueitem *item = _queueitem_new(interpid, data, fmt, unboundop); + _queueitem *item = _queueitem_new(interpid, data, unboundop); if (item == NULL) { _queue_unlock(queue); return -1; @@ -668,8 +663,7 @@ _queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data, } static int -_queue_next(_queue *queue, - _PyXIData_t **p_data, int *p_fmt, int *p_unboundop) +_queue_next(_queue *queue, _PyXIData_t **p_data, int *p_unboundop) { int err = _queue_lock(queue); if (err < 0) { @@ -688,7 +682,7 @@ _queue_next(_queue *queue, } queue->items.count -= 1; - _queueitem_popped(item, p_data, p_fmt, p_unboundop); + _queueitem_popped(item, p_data, p_unboundop); _queue_unlock(queue); return 0; @@ -1035,8 +1029,7 @@ finally: struct queue_id_and_info { int64_t id; - int fmt; - int unboundop; + struct _queuedefaults defaults; }; static struct queue_id_and_info * @@ -1053,8 +1046,7 @@ _queues_list_all(_queues *queues, int64_t *p_count) for (int64_t i=0; ref != NULL; ref = ref->next, i++) { ids[i].id = ref->qid; assert(ref->queue != NULL); - ids[i].fmt = ref->queue->defaults.fmt; - ids[i].unboundop = ref->queue->defaults.unboundop; + ids[i].defaults = ref->queue->defaults; } *p_count = queues->count; @@ -1090,13 +1082,14 @@ _queue_free(_queue *queue) // Create a new queue. static int64_t -queue_create(_queues *queues, Py_ssize_t maxsize, int fmt, int unboundop) +queue_create(_queues *queues, Py_ssize_t maxsize, + struct _queuedefaults defaults) { _queue *queue = GLOBAL_MALLOC(_queue); if (queue == NULL) { return ERR_QUEUE_ALLOC; } - int err = _queue_init(queue, maxsize, fmt, unboundop); + int err = _queue_init(queue, maxsize, defaults); if (err < 0) { GLOBAL_FREE(queue); return (int64_t)err; @@ -1125,7 +1118,8 @@ queue_destroy(_queues *queues, int64_t qid) // Push an object onto the queue. static int -queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop) +queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop, + xidata_fallback_t fallback) { PyThreadState *tstate = PyThreadState_Get(); @@ -1138,27 +1132,27 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop) assert(queue != NULL); // Convert the object to cross-interpreter data. - _PyXIData_t *data = _PyXIData_New(); - if (data == NULL) { + _PyXIData_t *xidata = _PyXIData_New(); + if (xidata == NULL) { _queue_unmark_waiter(queue, queues->mutex); return -1; } - if (_PyObject_GetXIData(tstate, obj, data) != 0) { + if (_PyObject_GetXIData(tstate, obj, fallback, xidata) != 0) { _queue_unmark_waiter(queue, queues->mutex); - GLOBAL_FREE(data); + GLOBAL_FREE(xidata); return -1; } - assert(_PyXIData_INTERPID(data) == + assert(_PyXIData_INTERPID(xidata) == PyInterpreterState_GetID(tstate->interp)); // Add the data to the queue. int64_t interpid = -1; // _queueitem_init() will set it. - int res = _queue_add(queue, interpid, data, fmt, unboundop); + int res = _queue_add(queue, interpid, xidata, unboundop); _queue_unmark_waiter(queue, queues->mutex); if (res != 0) { // We may chain an exception here: - (void)_release_xid_data(data, 0); - GLOBAL_FREE(data); + (void)_release_xid_data(xidata, 0); + GLOBAL_FREE(xidata); return res; } @@ -1169,7 +1163,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop) // XXX Support a "wait" mutex? static int queue_get(_queues *queues, int64_t qid, - PyObject **res, int *p_fmt, int *p_unboundop) + PyObject **res, int *p_unboundop) { int err; *res = NULL; @@ -1185,7 +1179,7 @@ queue_get(_queues *queues, int64_t qid, // Pop off the next item from the queue. _PyXIData_t *data = NULL; - err = _queue_next(queue, &data, p_fmt, p_unboundop); + err = _queue_next(queue, &data, p_unboundop); _queue_unmark_waiter(queue, queues->mutex); if (err != 0) { return err; @@ -1217,6 +1211,20 @@ queue_get(_queues *queues, int64_t qid, } static int +queue_get_defaults(_queues *queues, int64_t qid, + struct _queuedefaults *p_defaults) +{ + _queue *queue = NULL; + int err = _queues_lookup(queues, qid, &queue); + if (err != 0) { + return err; + } + *p_defaults = queue->defaults; + _queue_unmark_waiter(queue, queues->mutex); + return 0; +} + +static int queue_get_maxsize(_queues *queues, int64_t qid, Py_ssize_t *p_maxsize) { _queue *queue = NULL; @@ -1270,7 +1278,7 @@ set_external_queue_type(module_state *state, PyTypeObject *queue_type) } // Add and register the new type. - if (ensure_xid_class(queue_type, _queueobj_shared) < 0) { + if (ensure_xid_class(queue_type, GETDATA(_queueobj_shared)) < 0) { return -1; } state->queue_type = (PyTypeObject *)Py_NewRef(queue_type); @@ -1474,22 +1482,28 @@ qidarg_converter(PyObject *arg, void *ptr) static PyObject * queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"maxsize", "fmt", "unboundop", NULL}; + static char *kwlist[] = {"maxsize", "unboundop", "fallback", NULL}; Py_ssize_t maxsize; - int fmt; - int unboundop; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "nii:create", kwlist, - &maxsize, &fmt, &unboundop)) + int unboundarg = -1; + int fallbackarg = -1; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "n|ii:create", kwlist, + &maxsize, &unboundarg, &fallbackarg)) { return NULL; } - if (!check_unbound(unboundop)) { - PyErr_Format(PyExc_ValueError, - "unsupported unboundop %d", unboundop); + struct _queuedefaults defaults = {0}; + if (resolve_unboundop(unboundarg, UNBOUND_REPLACE, + &defaults.unboundop) < 0) + { + return NULL; + } + if (resolve_fallback(fallbackarg, _PyXIDATA_FULL_FALLBACK, + &defaults.fallback) < 0) + { return NULL; } - int64_t qid = queue_create(&_globals.queues, maxsize, fmt, unboundop); + int64_t qid = queue_create(&_globals.queues, maxsize, defaults); if (qid < 0) { (void)handle_queue_error((int)qid, self, qid); return NULL; @@ -1511,7 +1525,7 @@ queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds) } PyDoc_STRVAR(queuesmod_create_doc, -"create(maxsize, fmt, unboundop) -> qid\n\ +"create(maxsize, unboundop, fallback) -> qid\n\ \n\ Create a new cross-interpreter queue and return its unique generated ID.\n\ It is a new reference as though bind() had been called on the queue.\n\ @@ -1560,8 +1574,9 @@ queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) } struct queue_id_and_info *cur = qids; for (int64_t i=0; i < count; cur++, i++) { - PyObject *item = Py_BuildValue("Lii", cur->id, cur->fmt, - cur->unboundop); + PyObject *item = Py_BuildValue("Lii", cur->id, + cur->defaults.unboundop, + cur->defaults.fallback); if (item == NULL) { Py_SETREF(ids, NULL); break; @@ -1575,34 +1590,44 @@ finally: } PyDoc_STRVAR(queuesmod_list_all_doc, -"list_all() -> [(qid, fmt)]\n\ +"list_all() -> [(qid, unboundop, fallback)]\n\ \n\ Return the list of IDs for all queues.\n\ -Each corresponding default format is also included."); +Each corresponding default unbound op and fallback is also included."); static PyObject * queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"qid", "obj", "fmt", "unboundop", NULL}; + static char *kwlist[] = {"qid", "obj", "unboundop", "fallback", NULL}; qidarg_converter_data qidarg = {0}; PyObject *obj; - int fmt; - int unboundop; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oii:put", kwlist, - qidarg_converter, &qidarg, &obj, &fmt, - &unboundop)) + int unboundarg = -1; + int fallbackarg = -1; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|ii$p:put", kwlist, + qidarg_converter, &qidarg, &obj, + &unboundarg, &fallbackarg)) { return NULL; } int64_t qid = qidarg.id; - if (!check_unbound(unboundop)) { - PyErr_Format(PyExc_ValueError, - "unsupported unboundop %d", unboundop); + struct _queuedefaults defaults = {-1, -1}; + if (unboundarg < 0 || fallbackarg < 0) { + int err = queue_get_defaults(&_globals.queues, qid, &defaults); + if (handle_queue_error(err, self, qid)) { + return NULL; + } + } + unboundop_t unboundop; + if (resolve_unboundop(unboundarg, defaults.unboundop, &unboundop) < 0) { + return NULL; + } + xidata_fallback_t fallback; + if (resolve_fallback(fallbackarg, defaults.fallback, &fallback) < 0) { return NULL; } /* Queue up the object. */ - int err = queue_put(&_globals.queues, qid, obj, fmt, unboundop); + int err = queue_put(&_globals.queues, qid, obj, unboundop, fallback); // This is the only place that raises QueueFull. if (handle_queue_error(err, self, qid)) { return NULL; @@ -1612,7 +1637,7 @@ queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds) } PyDoc_STRVAR(queuesmod_put_doc, -"put(qid, obj, fmt)\n\ +"put(qid, obj)\n\ \n\ Add the object's data to the queue."); @@ -1628,27 +1653,26 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds) int64_t qid = qidarg.id; PyObject *obj = NULL; - int fmt = 0; int unboundop = 0; - int err = queue_get(&_globals.queues, qid, &obj, &fmt, &unboundop); + int err = queue_get(&_globals.queues, qid, &obj, &unboundop); // This is the only place that raises QueueEmpty. if (handle_queue_error(err, self, qid)) { return NULL; } if (obj == NULL) { - return Py_BuildValue("Oii", Py_None, fmt, unboundop); + return Py_BuildValue("Oi", Py_None, unboundop); } - PyObject *res = Py_BuildValue("OiO", obj, fmt, Py_None); + PyObject *res = Py_BuildValue("OO", obj, Py_None); Py_DECREF(obj); return res; } PyDoc_STRVAR(queuesmod_get_doc, -"get(qid) -> (obj, fmt)\n\ +"get(qid) -> (obj, unboundop)\n\ \n\ Return a new object from the data at the front of the queue.\n\ -The object's format is also returned.\n\ +The unbound op is also returned.\n\ \n\ If there is nothing to receive then raise QueueEmpty."); @@ -1748,17 +1772,14 @@ queuesmod_get_queue_defaults(PyObject *self, PyObject *args, PyObject *kwds) } int64_t qid = qidarg.id; - _queue *queue = NULL; - int err = _queues_lookup(&_globals.queues, qid, &queue); + struct _queuedefaults defaults = {0}; + int err = queue_get_defaults(&_globals.queues, qid, &defaults); if (handle_queue_error(err, self, qid)) { return NULL; } - int fmt = queue->defaults.fmt; - int unboundop = queue->defaults.unboundop; - _queue_unmark_waiter(queue, _globals.queues.mutex); - PyObject *defaults = Py_BuildValue("ii", fmt, unboundop); - return defaults; + PyObject *res = Py_BuildValue("ii", defaults.unboundop, defaults.fallback); + return res; } PyDoc_STRVAR(queuesmod_get_queue_defaults_doc, |