diff options
Diffstat (limited to 'Modules/_interpchannelsmodule.c')
-rw-r--r-- | Modules/_interpchannelsmodule.c | 176 |
1 files changed, 114 insertions, 62 deletions
diff --git a/Modules/_interpchannelsmodule.c b/Modules/_interpchannelsmodule.c index f9fa1dab291..bfd805bf5e4 100644 --- a/Modules/_interpchannelsmodule.c +++ b/Modules/_interpchannelsmodule.c @@ -20,9 +20,11 @@ #endif #define REGISTERS_HEAP_TYPES +#define HAS_FALLBACK #define HAS_UNBOUND_ITEMS #include "_interpreters_common.h" #undef HAS_UNBOUND_ITEMS +#undef HAS_FALLBACK #undef REGISTERS_HEAP_TYPES @@ -523,7 +525,7 @@ typedef struct _channelitem { int64_t interpid; _PyXIData_t *data; _waiting_t *waiting; - int unboundop; + unboundop_t unboundop; struct _channelitem *next; } _channelitem; @@ -536,7 +538,7 @@ _channelitem_ID(_channelitem *item) static void _channelitem_init(_channelitem *item, int64_t interpid, _PyXIData_t *data, - _waiting_t *waiting, int unboundop) + _waiting_t *waiting, unboundop_t unboundop) { if (interpid < 0) { interpid = _get_interpid(data); @@ -583,7 +585,7 @@ _channelitem_clear(_channelitem *item) static _channelitem * _channelitem_new(int64_t interpid, _PyXIData_t *data, - _waiting_t *waiting, int unboundop) + _waiting_t *waiting, unboundop_t unboundop) { _channelitem *item = GLOBAL_MALLOC(_channelitem); if (item == NULL) { @@ -694,7 +696,7 @@ _channelqueue_free(_channelqueue *queue) static int _channelqueue_put(_channelqueue *queue, int64_t interpid, _PyXIData_t *data, - _waiting_t *waiting, int unboundop) + _waiting_t *waiting, unboundop_t unboundop) { _channelitem *item = _channelitem_new(interpid, data, waiting, unboundop); if (item == NULL) { @@ -798,7 +800,7 @@ _channelqueue_remove(_channelqueue *queue, _channelitem_id_t itemid, } queue->count -= 1; - int unboundop; + unboundop_t unboundop; _channelitem_popped(item, p_data, p_waiting, &unboundop); } @@ -1083,16 +1085,18 @@ typedef struct _channel { PyThread_type_lock mutex; _channelqueue *queue; _channelends *ends; - struct { - int unboundop; + struct _channeldefaults { + unboundop_t unboundop; + xidata_fallback_t fallback; } defaults; int open; struct _channel_closing *closing; } _channel_state; static _channel_state * -_channel_new(PyThread_type_lock mutex, int unboundop) +_channel_new(PyThread_type_lock mutex, struct _channeldefaults defaults) { + assert(check_unbound(defaults.unboundop)); _channel_state *chan = GLOBAL_MALLOC(_channel_state); if (chan == NULL) { return NULL; @@ -1109,7 +1113,7 @@ _channel_new(PyThread_type_lock mutex, int unboundop) GLOBAL_FREE(chan); return NULL; } - chan->defaults.unboundop = unboundop; + chan->defaults = defaults; chan->open = 1; chan->closing = NULL; return chan; @@ -1130,7 +1134,7 @@ _channel_free(_channel_state *chan) static int _channel_add(_channel_state *chan, int64_t interpid, - _PyXIData_t *data, _waiting_t *waiting, int unboundop) + _PyXIData_t *data, _waiting_t *waiting, unboundop_t unboundop) { int res = -1; PyThread_acquire_lock(chan->mutex, WAIT_LOCK); @@ -1611,7 +1615,7 @@ done: struct channel_id_and_info { int64_t id; - int unboundop; + struct _channeldefaults defaults; }; static struct channel_id_and_info * @@ -1628,7 +1632,7 @@ _channels_list_all(_channels *channels, int64_t *count) for (int64_t i=0; ref != NULL; ref = ref->next, i++) { ids[i] = (struct channel_id_and_info){ .id = ref->cid, - .unboundop = ref->chan->defaults.unboundop, + .defaults = ref->chan->defaults, }; } *count = channels->numopen; @@ -1714,13 +1718,13 @@ _channel_finish_closing(_channel_state *chan) { // Create a new channel. static int64_t -channel_create(_channels *channels, int unboundop) +channel_create(_channels *channels, struct _channeldefaults defaults) { PyThread_type_lock mutex = PyThread_allocate_lock(); if (mutex == NULL) { return ERR_CHANNEL_MUTEX_INIT; } - _channel_state *chan = _channel_new(mutex, unboundop); + _channel_state *chan = _channel_new(mutex, defaults); if (chan == NULL) { PyThread_free_lock(mutex); return -1; @@ -1752,7 +1756,7 @@ channel_destroy(_channels *channels, int64_t cid) // Optionally request to be notified when it is received. static int channel_send(_channels *channels, int64_t cid, PyObject *obj, - _waiting_t *waiting, int unboundop) + _waiting_t *waiting, unboundop_t unboundop, xidata_fallback_t fallback) { PyThreadState *tstate = _PyThreadState_GET(); PyInterpreterState *interp = tstate->interp; @@ -1779,7 +1783,7 @@ channel_send(_channels *channels, int64_t cid, PyObject *obj, PyThread_release_lock(mutex); return -1; } - if (_PyObject_GetXIDataNoFallback(tstate, obj, data) != 0) { + if (_PyObject_GetXIData(tstate, obj, fallback, data) != 0) { PyThread_release_lock(mutex); GLOBAL_FREE(data); return -1; @@ -1823,7 +1827,8 @@ channel_clear_sent(_channels *channels, int64_t cid, _waiting_t *waiting) // Like channel_send(), but strictly wait for the object to be received. static int channel_send_wait(_channels *channels, int64_t cid, PyObject *obj, - int unboundop, PY_TIMEOUT_T timeout) + unboundop_t unboundop, PY_TIMEOUT_T timeout, + xidata_fallback_t fallback) { // We use a stack variable here, so we must ensure that &waiting // is not held by any channel item at the point this function exits. @@ -1834,7 +1839,7 @@ channel_send_wait(_channels *channels, int64_t cid, PyObject *obj, } /* Queue up the object. */ - int res = channel_send(channels, cid, obj, &waiting, unboundop); + int res = channel_send(channels, cid, obj, &waiting, unboundop, fallback); if (res < 0) { assert(waiting.status == WAITING_NO_STATUS); goto finally; @@ -2006,6 +2011,20 @@ channel_is_associated(_channels *channels, int64_t cid, int64_t interpid, } static int +channel_get_defaults(_channels *channels, int64_t cid, struct _channeldefaults *defaults) +{ + PyThread_type_lock mutex = NULL; + _channel_state *channel = NULL; + int err = _channels_lookup(channels, cid, &mutex, &channel); + if (err != 0) { + return err; + } + *defaults = channel->defaults; + PyThread_release_lock(mutex); + return 0; +} + +static int _channel_get_count(_channels *channels, int64_t cid, Py_ssize_t *p_count) { PyThread_type_lock mutex = NULL; @@ -2881,20 +2900,27 @@ clear_interpreter(void *data) static PyObject * channelsmod_create(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"unboundop", NULL}; - int unboundop; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "i:create", kwlist, - &unboundop)) + static char *kwlist[] = {"unboundop", "fallback", NULL}; + int unboundarg = -1; + int fallbackarg = -1; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "|ii:create", kwlist, + &unboundarg, &fallbackarg)) { return NULL; } - if (!check_unbound(unboundop)) { - PyErr_Format(PyExc_ValueError, - "unsupported unboundop %d", unboundop); + struct _channeldefaults defaults = {0}; + if (resolve_unboundop(unboundarg, UNBOUND_REPLACE, + &defaults.unboundop) < 0) + { + return NULL; + } + if (resolve_fallback(fallbackarg, _PyXIDATA_FULL_FALLBACK, + &defaults.fallback) < 0) + { return NULL; } - int64_t cid = channel_create(&_globals.channels, unboundop); + int64_t cid = channel_create(&_globals.channels, defaults); if (cid < 0) { (void)handle_channel_error(-1, self, cid); return NULL; @@ -2987,7 +3013,9 @@ channelsmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) } assert(cidobj != NULL); - PyObject *item = Py_BuildValue("Oi", cidobj, cur->unboundop); + PyObject *item = Py_BuildValue("Oii", cidobj, + cur->defaults.unboundop, + cur->defaults.fallback); Py_DECREF(cidobj); if (item == NULL) { Py_SETREF(ids, NULL); @@ -3075,40 +3103,54 @@ receive end."); static PyObject * channelsmod_send(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"cid", "obj", "unboundop", "blocking", "timeout", - NULL}; + static char *kwlist[] = {"cid", "obj", "unboundop", "fallback", + "blocking", "timeout", NULL}; struct channel_id_converter_data cid_data = { .module = self, }; PyObject *obj; - int unboundop = UNBOUND_REPLACE; + int unboundarg = -1; + int fallbackarg = -1; int blocking = 1; PyObject *timeout_obj = NULL; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|i$pO:channel_send", kwlist, + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "O&O|ii$pO:channel_send", kwlist, channel_id_converter, &cid_data, &obj, - &unboundop, &blocking, &timeout_obj)) + &unboundarg, &fallbackarg, + &blocking, &timeout_obj)) { return NULL; } - if (!check_unbound(unboundop)) { - PyErr_Format(PyExc_ValueError, - "unsupported unboundop %d", unboundop); - return NULL; - } - int64_t cid = cid_data.cid; PY_TIMEOUT_T timeout; if (PyThread_ParseTimeoutArg(timeout_obj, blocking, &timeout) < 0) { return NULL; } + struct _channeldefaults defaults = {-1, -1}; + if (unboundarg < 0 || fallbackarg < 0) { + int err = channel_get_defaults(&_globals.channels, cid, &defaults); + if (handle_channel_error(err, self, cid)) { + return NULL; + } + } + unboundop_t unboundop; + if (resolve_unboundop(unboundarg, defaults.unboundop, &unboundop) < 0) { + return NULL; + } + xidata_fallback_t fallback; + if (resolve_fallback(fallbackarg, defaults.fallback, &fallback) < 0) { + return NULL; + } /* Queue up the object. */ int err = 0; if (blocking) { - err = channel_send_wait(&_globals.channels, cid, obj, unboundop, timeout); + err = channel_send_wait( + &_globals.channels, cid, obj, unboundop, timeout, fallback); } else { - err = channel_send(&_globals.channels, cid, obj, NULL, unboundop); + err = channel_send( + &_globals.channels, cid, obj, NULL, unboundop, fallback); } if (handle_channel_error(err, self, cid)) { return NULL; @@ -3126,32 +3168,44 @@ By default this waits for the object to be received."); static PyObject * channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"cid", "obj", "unboundop", "blocking", "timeout", - NULL}; + static char *kwlist[] = {"cid", "obj", "unboundop", "fallback", + "blocking", "timeout", NULL}; struct channel_id_converter_data cid_data = { .module = self, }; PyObject *obj; - int unboundop = UNBOUND_REPLACE; - int blocking = 1; + int unboundarg = -1; + int fallbackarg = -1; + int blocking = -1; PyObject *timeout_obj = NULL; if (!PyArg_ParseTupleAndKeywords(args, kwds, - "O&O|i$pO:channel_send_buffer", kwlist, + "O&O|ii$pO:channel_send_buffer", kwlist, channel_id_converter, &cid_data, &obj, - &unboundop, &blocking, &timeout_obj)) { - return NULL; - } - if (!check_unbound(unboundop)) { - PyErr_Format(PyExc_ValueError, - "unsupported unboundop %d", unboundop); + &unboundarg, &fallbackarg, + &blocking, &timeout_obj)) + { return NULL; } - int64_t cid = cid_data.cid; PY_TIMEOUT_T timeout; if (PyThread_ParseTimeoutArg(timeout_obj, blocking, &timeout) < 0) { return NULL; } + struct _channeldefaults defaults = {-1, -1}; + if (unboundarg < 0 || fallbackarg < 0) { + int err = channel_get_defaults(&_globals.channels, cid, &defaults); + if (handle_channel_error(err, self, cid)) { + return NULL; + } + } + unboundop_t unboundop; + if (resolve_unboundop(unboundarg, defaults.unboundop, &unboundop) < 0) { + return NULL; + } + xidata_fallback_t fallback; + if (resolve_fallback(fallbackarg, defaults.fallback, &fallback) < 0) { + return NULL; + } PyObject *tempobj = PyMemoryView_FromObject(obj); if (tempobj == NULL) { @@ -3162,10 +3216,11 @@ channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) int err = 0; if (blocking) { err = channel_send_wait( - &_globals.channels, cid, tempobj, unboundop, timeout); + &_globals.channels, cid, tempobj, unboundop, timeout, fallback); } else { - err = channel_send(&_globals.channels, cid, tempobj, NULL, unboundop); + err = channel_send( + &_globals.channels, cid, tempobj, NULL, unboundop, fallback); } Py_DECREF(tempobj); if (handle_channel_error(err, self, cid)) { @@ -3197,7 +3252,7 @@ channelsmod_recv(PyObject *self, PyObject *args, PyObject *kwds) cid = cid_data.cid; PyObject *obj = NULL; - int unboundop = 0; + unboundop_t unboundop = 0; int err = channel_recv(&_globals.channels, cid, &obj, &unboundop); if (err == ERR_CHANNEL_EMPTY && dflt != NULL) { // Use the default. @@ -3388,17 +3443,14 @@ channelsmod_get_channel_defaults(PyObject *self, PyObject *args, PyObject *kwds) } int64_t cid = cid_data.cid; - PyThread_type_lock mutex = NULL; - _channel_state *channel = NULL; - int err = _channels_lookup(&_globals.channels, cid, &mutex, &channel); + struct _channeldefaults defaults = {0}; + int err = channel_get_defaults(&_globals.channels, cid, &defaults); if (handle_channel_error(err, self, cid)) { return NULL; } - int unboundop = channel->defaults.unboundop; - PyThread_release_lock(mutex); - PyObject *defaults = Py_BuildValue("i", unboundop); - return defaults; + PyObject *res = Py_BuildValue("ii", defaults.unboundop, defaults.fallback); + return res; } PyDoc_STRVAR(channelsmod_get_channel_defaults_doc, |