aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Modules/_interpqueuesmodule.c
diff options
context:
space:
mode:
Diffstat (limited to 'Modules/_interpqueuesmodule.c')
-rw-r--r--Modules/_interpqueuesmodule.c177
1 files changed, 99 insertions, 78 deletions
diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c
index 526249a0e1a..ffc52c8ee74 100644
--- a/Modules/_interpqueuesmodule.c
+++ b/Modules/_interpqueuesmodule.c
@@ -9,9 +9,11 @@
#include "pycore_crossinterp.h" // _PyXIData_t
#define REGISTERS_HEAP_TYPES
+#define HAS_FALLBACK
#define HAS_UNBOUND_ITEMS
#include "_interpreters_common.h"
#undef HAS_UNBOUND_ITEMS
+#undef HAS_FALLBACK
#undef REGISTERS_HEAP_TYPES
@@ -401,14 +403,13 @@ typedef struct _queueitem {
meaning the interpreter has been destroyed. */
int64_t interpid;
_PyXIData_t *data;
- int fmt;
- int unboundop;
+ unboundop_t unboundop;
struct _queueitem *next;
} _queueitem;
static void
_queueitem_init(_queueitem *item,
- int64_t interpid, _PyXIData_t *data, int fmt, int unboundop)
+ int64_t interpid, _PyXIData_t *data, unboundop_t unboundop)
{
if (interpid < 0) {
interpid = _get_interpid(data);
@@ -422,7 +423,6 @@ _queueitem_init(_queueitem *item,
*item = (_queueitem){
.interpid = interpid,
.data = data,
- .fmt = fmt,
.unboundop = unboundop,
};
}
@@ -446,14 +446,14 @@ _queueitem_clear(_queueitem *item)
}
static _queueitem *
-_queueitem_new(int64_t interpid, _PyXIData_t *data, int fmt, int unboundop)
+_queueitem_new(int64_t interpid, _PyXIData_t *data, int unboundop)
{
_queueitem *item = GLOBAL_MALLOC(_queueitem);
if (item == NULL) {
PyErr_NoMemory();
return NULL;
}
- _queueitem_init(item, interpid, data, fmt, unboundop);
+ _queueitem_init(item, interpid, data, unboundop);
return item;
}
@@ -476,10 +476,9 @@ _queueitem_free_all(_queueitem *item)
static void
_queueitem_popped(_queueitem *item,
- _PyXIData_t **p_data, int *p_fmt, int *p_unboundop)
+ _PyXIData_t **p_data, unboundop_t *p_unboundop)
{
*p_data = item->data;
- *p_fmt = item->fmt;
*p_unboundop = item->unboundop;
// We clear them here, so they won't be released in _queueitem_clear().
item->data = NULL;
@@ -527,16 +526,16 @@ typedef struct _queue {
_queueitem *first;
_queueitem *last;
} items;
- struct {
- int fmt;
+ struct _queuedefaults {
+ xidata_fallback_t fallback;
int unboundop;
} defaults;
} _queue;
static int
-_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop)
+_queue_init(_queue *queue, Py_ssize_t maxsize, struct _queuedefaults defaults)
{
- assert(check_unbound(unboundop));
+ assert(check_unbound(defaults.unboundop));
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
return ERR_QUEUE_ALLOC;
@@ -547,10 +546,7 @@ _queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop)
.items = {
.maxsize = maxsize,
},
- .defaults = {
- .fmt = fmt,
- .unboundop = unboundop,
- },
+ .defaults = defaults,
};
return 0;
}
@@ -631,8 +627,7 @@ _queue_unlock(_queue *queue)
}
static int
-_queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data,
- int fmt, int unboundop)
+_queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data, int unboundop)
{
int err = _queue_lock(queue);
if (err < 0) {
@@ -648,7 +643,7 @@ _queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data,
return ERR_QUEUE_FULL;
}
- _queueitem *item = _queueitem_new(interpid, data, fmt, unboundop);
+ _queueitem *item = _queueitem_new(interpid, data, unboundop);
if (item == NULL) {
_queue_unlock(queue);
return -1;
@@ -668,8 +663,7 @@ _queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data,
}
static int
-_queue_next(_queue *queue,
- _PyXIData_t **p_data, int *p_fmt, int *p_unboundop)
+_queue_next(_queue *queue, _PyXIData_t **p_data, int *p_unboundop)
{
int err = _queue_lock(queue);
if (err < 0) {
@@ -688,7 +682,7 @@ _queue_next(_queue *queue,
}
queue->items.count -= 1;
- _queueitem_popped(item, p_data, p_fmt, p_unboundop);
+ _queueitem_popped(item, p_data, p_unboundop);
_queue_unlock(queue);
return 0;
@@ -1035,8 +1029,7 @@ finally:
struct queue_id_and_info {
int64_t id;
- int fmt;
- int unboundop;
+ struct _queuedefaults defaults;
};
static struct queue_id_and_info *
@@ -1053,8 +1046,7 @@ _queues_list_all(_queues *queues, int64_t *p_count)
for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
ids[i].id = ref->qid;
assert(ref->queue != NULL);
- ids[i].fmt = ref->queue->defaults.fmt;
- ids[i].unboundop = ref->queue->defaults.unboundop;
+ ids[i].defaults = ref->queue->defaults;
}
*p_count = queues->count;
@@ -1090,13 +1082,14 @@ _queue_free(_queue *queue)
// Create a new queue.
static int64_t
-queue_create(_queues *queues, Py_ssize_t maxsize, int fmt, int unboundop)
+queue_create(_queues *queues, Py_ssize_t maxsize,
+ struct _queuedefaults defaults)
{
_queue *queue = GLOBAL_MALLOC(_queue);
if (queue == NULL) {
return ERR_QUEUE_ALLOC;
}
- int err = _queue_init(queue, maxsize, fmt, unboundop);
+ int err = _queue_init(queue, maxsize, defaults);
if (err < 0) {
GLOBAL_FREE(queue);
return (int64_t)err;
@@ -1125,7 +1118,8 @@ queue_destroy(_queues *queues, int64_t qid)
// Push an object onto the queue.
static int
-queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop)
+queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop,
+ xidata_fallback_t fallback)
{
PyThreadState *tstate = PyThreadState_Get();
@@ -1138,27 +1132,27 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop)
assert(queue != NULL);
// Convert the object to cross-interpreter data.
- _PyXIData_t *data = _PyXIData_New();
- if (data == NULL) {
+ _PyXIData_t *xidata = _PyXIData_New();
+ if (xidata == NULL) {
_queue_unmark_waiter(queue, queues->mutex);
return -1;
}
- if (_PyObject_GetXIData(tstate, obj, data) != 0) {
+ if (_PyObject_GetXIData(tstate, obj, fallback, xidata) != 0) {
_queue_unmark_waiter(queue, queues->mutex);
- GLOBAL_FREE(data);
+ GLOBAL_FREE(xidata);
return -1;
}
- assert(_PyXIData_INTERPID(data) ==
+ assert(_PyXIData_INTERPID(xidata) ==
PyInterpreterState_GetID(tstate->interp));
// Add the data to the queue.
int64_t interpid = -1; // _queueitem_init() will set it.
- int res = _queue_add(queue, interpid, data, fmt, unboundop);
+ int res = _queue_add(queue, interpid, xidata, unboundop);
_queue_unmark_waiter(queue, queues->mutex);
if (res != 0) {
// We may chain an exception here:
- (void)_release_xid_data(data, 0);
- GLOBAL_FREE(data);
+ (void)_release_xid_data(xidata, 0);
+ GLOBAL_FREE(xidata);
return res;
}
@@ -1169,7 +1163,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop)
// XXX Support a "wait" mutex?
static int
queue_get(_queues *queues, int64_t qid,
- PyObject **res, int *p_fmt, int *p_unboundop)
+ PyObject **res, int *p_unboundop)
{
int err;
*res = NULL;
@@ -1185,7 +1179,7 @@ queue_get(_queues *queues, int64_t qid,
// Pop off the next item from the queue.
_PyXIData_t *data = NULL;
- err = _queue_next(queue, &data, p_fmt, p_unboundop);
+ err = _queue_next(queue, &data, p_unboundop);
_queue_unmark_waiter(queue, queues->mutex);
if (err != 0) {
return err;
@@ -1217,6 +1211,20 @@ queue_get(_queues *queues, int64_t qid,
}
static int
+queue_get_defaults(_queues *queues, int64_t qid,
+ struct _queuedefaults *p_defaults)
+{
+ _queue *queue = NULL;
+ int err = _queues_lookup(queues, qid, &queue);
+ if (err != 0) {
+ return err;
+ }
+ *p_defaults = queue->defaults;
+ _queue_unmark_waiter(queue, queues->mutex);
+ return 0;
+}
+
+static int
queue_get_maxsize(_queues *queues, int64_t qid, Py_ssize_t *p_maxsize)
{
_queue *queue = NULL;
@@ -1270,7 +1278,7 @@ set_external_queue_type(module_state *state, PyTypeObject *queue_type)
}
// Add and register the new type.
- if (ensure_xid_class(queue_type, _queueobj_shared) < 0) {
+ if (ensure_xid_class(queue_type, GETDATA(_queueobj_shared)) < 0) {
return -1;
}
state->queue_type = (PyTypeObject *)Py_NewRef(queue_type);
@@ -1474,22 +1482,28 @@ qidarg_converter(PyObject *arg, void *ptr)
static PyObject *
queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"maxsize", "fmt", "unboundop", NULL};
+ static char *kwlist[] = {"maxsize", "unboundop", "fallback", NULL};
Py_ssize_t maxsize;
- int fmt;
- int unboundop;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "nii:create", kwlist,
- &maxsize, &fmt, &unboundop))
+ int unboundarg = -1;
+ int fallbackarg = -1;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "n|ii:create", kwlist,
+ &maxsize, &unboundarg, &fallbackarg))
{
return NULL;
}
- if (!check_unbound(unboundop)) {
- PyErr_Format(PyExc_ValueError,
- "unsupported unboundop %d", unboundop);
+ struct _queuedefaults defaults = {0};
+ if (resolve_unboundop(unboundarg, UNBOUND_REPLACE,
+ &defaults.unboundop) < 0)
+ {
+ return NULL;
+ }
+ if (resolve_fallback(fallbackarg, _PyXIDATA_FULL_FALLBACK,
+ &defaults.fallback) < 0)
+ {
return NULL;
}
- int64_t qid = queue_create(&_globals.queues, maxsize, fmt, unboundop);
+ int64_t qid = queue_create(&_globals.queues, maxsize, defaults);
if (qid < 0) {
(void)handle_queue_error((int)qid, self, qid);
return NULL;
@@ -1511,7 +1525,7 @@ queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
}
PyDoc_STRVAR(queuesmod_create_doc,
-"create(maxsize, fmt, unboundop) -> qid\n\
+"create(maxsize, unboundop, fallback) -> qid\n\
\n\
Create a new cross-interpreter queue and return its unique generated ID.\n\
It is a new reference as though bind() had been called on the queue.\n\
@@ -1560,8 +1574,9 @@ queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
}
struct queue_id_and_info *cur = qids;
for (int64_t i=0; i < count; cur++, i++) {
- PyObject *item = Py_BuildValue("Lii", cur->id, cur->fmt,
- cur->unboundop);
+ PyObject *item = Py_BuildValue("Lii", cur->id,
+ cur->defaults.unboundop,
+ cur->defaults.fallback);
if (item == NULL) {
Py_SETREF(ids, NULL);
break;
@@ -1575,34 +1590,44 @@ finally:
}
PyDoc_STRVAR(queuesmod_list_all_doc,
-"list_all() -> [(qid, fmt)]\n\
+"list_all() -> [(qid, unboundop, fallback)]\n\
\n\
Return the list of IDs for all queues.\n\
-Each corresponding default format is also included.");
+Each corresponding default unbound op and fallback is also included.");
static PyObject *
queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"qid", "obj", "fmt", "unboundop", NULL};
+ static char *kwlist[] = {"qid", "obj", "unboundop", "fallback", NULL};
qidarg_converter_data qidarg = {0};
PyObject *obj;
- int fmt;
- int unboundop;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oii:put", kwlist,
- qidarg_converter, &qidarg, &obj, &fmt,
- &unboundop))
+ int unboundarg = -1;
+ int fallbackarg = -1;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|ii$p:put", kwlist,
+ qidarg_converter, &qidarg, &obj,
+ &unboundarg, &fallbackarg))
{
return NULL;
}
int64_t qid = qidarg.id;
- if (!check_unbound(unboundop)) {
- PyErr_Format(PyExc_ValueError,
- "unsupported unboundop %d", unboundop);
+ struct _queuedefaults defaults = {-1, -1};
+ if (unboundarg < 0 || fallbackarg < 0) {
+ int err = queue_get_defaults(&_globals.queues, qid, &defaults);
+ if (handle_queue_error(err, self, qid)) {
+ return NULL;
+ }
+ }
+ unboundop_t unboundop;
+ if (resolve_unboundop(unboundarg, defaults.unboundop, &unboundop) < 0) {
+ return NULL;
+ }
+ xidata_fallback_t fallback;
+ if (resolve_fallback(fallbackarg, defaults.fallback, &fallback) < 0) {
return NULL;
}
/* Queue up the object. */
- int err = queue_put(&_globals.queues, qid, obj, fmt, unboundop);
+ int err = queue_put(&_globals.queues, qid, obj, unboundop, fallback);
// This is the only place that raises QueueFull.
if (handle_queue_error(err, self, qid)) {
return NULL;
@@ -1612,7 +1637,7 @@ queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds)
}
PyDoc_STRVAR(queuesmod_put_doc,
-"put(qid, obj, fmt)\n\
+"put(qid, obj)\n\
\n\
Add the object's data to the queue.");
@@ -1628,27 +1653,26 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds)
int64_t qid = qidarg.id;
PyObject *obj = NULL;
- int fmt = 0;
int unboundop = 0;
- int err = queue_get(&_globals.queues, qid, &obj, &fmt, &unboundop);
+ int err = queue_get(&_globals.queues, qid, &obj, &unboundop);
// This is the only place that raises QueueEmpty.
if (handle_queue_error(err, self, qid)) {
return NULL;
}
if (obj == NULL) {
- return Py_BuildValue("Oii", Py_None, fmt, unboundop);
+ return Py_BuildValue("Oi", Py_None, unboundop);
}
- PyObject *res = Py_BuildValue("OiO", obj, fmt, Py_None);
+ PyObject *res = Py_BuildValue("OO", obj, Py_None);
Py_DECREF(obj);
return res;
}
PyDoc_STRVAR(queuesmod_get_doc,
-"get(qid) -> (obj, fmt)\n\
+"get(qid) -> (obj, unboundop)\n\
\n\
Return a new object from the data at the front of the queue.\n\
-The object's format is also returned.\n\
+The unbound op is also returned.\n\
\n\
If there is nothing to receive then raise QueueEmpty.");
@@ -1748,17 +1772,14 @@ queuesmod_get_queue_defaults(PyObject *self, PyObject *args, PyObject *kwds)
}
int64_t qid = qidarg.id;
- _queue *queue = NULL;
- int err = _queues_lookup(&_globals.queues, qid, &queue);
+ struct _queuedefaults defaults = {0};
+ int err = queue_get_defaults(&_globals.queues, qid, &defaults);
if (handle_queue_error(err, self, qid)) {
return NULL;
}
- int fmt = queue->defaults.fmt;
- int unboundop = queue->defaults.unboundop;
- _queue_unmark_waiter(queue, _globals.queues.mutex);
- PyObject *defaults = Py_BuildValue("ii", fmt, unboundop);
- return defaults;
+ PyObject *res = Py_BuildValue("ii", defaults.unboundop, defaults.fallback);
+ return res;
}
PyDoc_STRVAR(queuesmod_get_queue_defaults_doc,