aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Modules/_interpchannelsmodule.c
diff options
context:
space:
mode:
Diffstat (limited to 'Modules/_interpchannelsmodule.c')
-rw-r--r--Modules/_interpchannelsmodule.c176
1 files changed, 114 insertions, 62 deletions
diff --git a/Modules/_interpchannelsmodule.c b/Modules/_interpchannelsmodule.c
index f9fa1dab291..bfd805bf5e4 100644
--- a/Modules/_interpchannelsmodule.c
+++ b/Modules/_interpchannelsmodule.c
@@ -20,9 +20,11 @@
#endif
#define REGISTERS_HEAP_TYPES
+#define HAS_FALLBACK
#define HAS_UNBOUND_ITEMS
#include "_interpreters_common.h"
#undef HAS_UNBOUND_ITEMS
+#undef HAS_FALLBACK
#undef REGISTERS_HEAP_TYPES
@@ -523,7 +525,7 @@ typedef struct _channelitem {
int64_t interpid;
_PyXIData_t *data;
_waiting_t *waiting;
- int unboundop;
+ unboundop_t unboundop;
struct _channelitem *next;
} _channelitem;
@@ -536,7 +538,7 @@ _channelitem_ID(_channelitem *item)
static void
_channelitem_init(_channelitem *item,
int64_t interpid, _PyXIData_t *data,
- _waiting_t *waiting, int unboundop)
+ _waiting_t *waiting, unboundop_t unboundop)
{
if (interpid < 0) {
interpid = _get_interpid(data);
@@ -583,7 +585,7 @@ _channelitem_clear(_channelitem *item)
static _channelitem *
_channelitem_new(int64_t interpid, _PyXIData_t *data,
- _waiting_t *waiting, int unboundop)
+ _waiting_t *waiting, unboundop_t unboundop)
{
_channelitem *item = GLOBAL_MALLOC(_channelitem);
if (item == NULL) {
@@ -694,7 +696,7 @@ _channelqueue_free(_channelqueue *queue)
static int
_channelqueue_put(_channelqueue *queue,
int64_t interpid, _PyXIData_t *data,
- _waiting_t *waiting, int unboundop)
+ _waiting_t *waiting, unboundop_t unboundop)
{
_channelitem *item = _channelitem_new(interpid, data, waiting, unboundop);
if (item == NULL) {
@@ -798,7 +800,7 @@ _channelqueue_remove(_channelqueue *queue, _channelitem_id_t itemid,
}
queue->count -= 1;
- int unboundop;
+ unboundop_t unboundop;
_channelitem_popped(item, p_data, p_waiting, &unboundop);
}
@@ -1083,16 +1085,18 @@ typedef struct _channel {
PyThread_type_lock mutex;
_channelqueue *queue;
_channelends *ends;
- struct {
- int unboundop;
+ struct _channeldefaults {
+ unboundop_t unboundop;
+ xidata_fallback_t fallback;
} defaults;
int open;
struct _channel_closing *closing;
} _channel_state;
static _channel_state *
-_channel_new(PyThread_type_lock mutex, int unboundop)
+_channel_new(PyThread_type_lock mutex, struct _channeldefaults defaults)
{
+ assert(check_unbound(defaults.unboundop));
_channel_state *chan = GLOBAL_MALLOC(_channel_state);
if (chan == NULL) {
return NULL;
@@ -1109,7 +1113,7 @@ _channel_new(PyThread_type_lock mutex, int unboundop)
GLOBAL_FREE(chan);
return NULL;
}
- chan->defaults.unboundop = unboundop;
+ chan->defaults = defaults;
chan->open = 1;
chan->closing = NULL;
return chan;
@@ -1130,7 +1134,7 @@ _channel_free(_channel_state *chan)
static int
_channel_add(_channel_state *chan, int64_t interpid,
- _PyXIData_t *data, _waiting_t *waiting, int unboundop)
+ _PyXIData_t *data, _waiting_t *waiting, unboundop_t unboundop)
{
int res = -1;
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
@@ -1611,7 +1615,7 @@ done:
struct channel_id_and_info {
int64_t id;
- int unboundop;
+ struct _channeldefaults defaults;
};
static struct channel_id_and_info *
@@ -1628,7 +1632,7 @@ _channels_list_all(_channels *channels, int64_t *count)
for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
ids[i] = (struct channel_id_and_info){
.id = ref->cid,
- .unboundop = ref->chan->defaults.unboundop,
+ .defaults = ref->chan->defaults,
};
}
*count = channels->numopen;
@@ -1714,13 +1718,13 @@ _channel_finish_closing(_channel_state *chan) {
// Create a new channel.
static int64_t
-channel_create(_channels *channels, int unboundop)
+channel_create(_channels *channels, struct _channeldefaults defaults)
{
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
return ERR_CHANNEL_MUTEX_INIT;
}
- _channel_state *chan = _channel_new(mutex, unboundop);
+ _channel_state *chan = _channel_new(mutex, defaults);
if (chan == NULL) {
PyThread_free_lock(mutex);
return -1;
@@ -1752,7 +1756,7 @@ channel_destroy(_channels *channels, int64_t cid)
// Optionally request to be notified when it is received.
static int
channel_send(_channels *channels, int64_t cid, PyObject *obj,
- _waiting_t *waiting, int unboundop)
+ _waiting_t *waiting, unboundop_t unboundop, xidata_fallback_t fallback)
{
PyThreadState *tstate = _PyThreadState_GET();
PyInterpreterState *interp = tstate->interp;
@@ -1779,7 +1783,7 @@ channel_send(_channels *channels, int64_t cid, PyObject *obj,
PyThread_release_lock(mutex);
return -1;
}
- if (_PyObject_GetXIDataNoFallback(tstate, obj, data) != 0) {
+ if (_PyObject_GetXIData(tstate, obj, fallback, data) != 0) {
PyThread_release_lock(mutex);
GLOBAL_FREE(data);
return -1;
@@ -1823,7 +1827,8 @@ channel_clear_sent(_channels *channels, int64_t cid, _waiting_t *waiting)
// Like channel_send(), but strictly wait for the object to be received.
static int
channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
- int unboundop, PY_TIMEOUT_T timeout)
+ unboundop_t unboundop, PY_TIMEOUT_T timeout,
+ xidata_fallback_t fallback)
{
// We use a stack variable here, so we must ensure that &waiting
// is not held by any channel item at the point this function exits.
@@ -1834,7 +1839,7 @@ channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
}
/* Queue up the object. */
- int res = channel_send(channels, cid, obj, &waiting, unboundop);
+ int res = channel_send(channels, cid, obj, &waiting, unboundop, fallback);
if (res < 0) {
assert(waiting.status == WAITING_NO_STATUS);
goto finally;
@@ -2006,6 +2011,20 @@ channel_is_associated(_channels *channels, int64_t cid, int64_t interpid,
}
static int
+channel_get_defaults(_channels *channels, int64_t cid, struct _channeldefaults *defaults)
+{
+ PyThread_type_lock mutex = NULL;
+ _channel_state *channel = NULL;
+ int err = _channels_lookup(channels, cid, &mutex, &channel);
+ if (err != 0) {
+ return err;
+ }
+ *defaults = channel->defaults;
+ PyThread_release_lock(mutex);
+ return 0;
+}
+
+static int
_channel_get_count(_channels *channels, int64_t cid, Py_ssize_t *p_count)
{
PyThread_type_lock mutex = NULL;
@@ -2881,20 +2900,27 @@ clear_interpreter(void *data)
static PyObject *
channelsmod_create(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"unboundop", NULL};
- int unboundop;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "i:create", kwlist,
- &unboundop))
+ static char *kwlist[] = {"unboundop", "fallback", NULL};
+ int unboundarg = -1;
+ int fallbackarg = -1;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "|ii:create", kwlist,
+ &unboundarg, &fallbackarg))
{
return NULL;
}
- if (!check_unbound(unboundop)) {
- PyErr_Format(PyExc_ValueError,
- "unsupported unboundop %d", unboundop);
+ struct _channeldefaults defaults = {0};
+ if (resolve_unboundop(unboundarg, UNBOUND_REPLACE,
+ &defaults.unboundop) < 0)
+ {
+ return NULL;
+ }
+ if (resolve_fallback(fallbackarg, _PyXIDATA_FULL_FALLBACK,
+ &defaults.fallback) < 0)
+ {
return NULL;
}
- int64_t cid = channel_create(&_globals.channels, unboundop);
+ int64_t cid = channel_create(&_globals.channels, defaults);
if (cid < 0) {
(void)handle_channel_error(-1, self, cid);
return NULL;
@@ -2987,7 +3013,9 @@ channelsmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
}
assert(cidobj != NULL);
- PyObject *item = Py_BuildValue("Oi", cidobj, cur->unboundop);
+ PyObject *item = Py_BuildValue("Oii", cidobj,
+ cur->defaults.unboundop,
+ cur->defaults.fallback);
Py_DECREF(cidobj);
if (item == NULL) {
Py_SETREF(ids, NULL);
@@ -3075,40 +3103,54 @@ receive end.");
static PyObject *
channelsmod_send(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"cid", "obj", "unboundop", "blocking", "timeout",
- NULL};
+ static char *kwlist[] = {"cid", "obj", "unboundop", "fallback",
+ "blocking", "timeout", NULL};
struct channel_id_converter_data cid_data = {
.module = self,
};
PyObject *obj;
- int unboundop = UNBOUND_REPLACE;
+ int unboundarg = -1;
+ int fallbackarg = -1;
int blocking = 1;
PyObject *timeout_obj = NULL;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|i$pO:channel_send", kwlist,
+ if (!PyArg_ParseTupleAndKeywords(args, kwds,
+ "O&O|ii$pO:channel_send", kwlist,
channel_id_converter, &cid_data, &obj,
- &unboundop, &blocking, &timeout_obj))
+ &unboundarg, &fallbackarg,
+ &blocking, &timeout_obj))
{
return NULL;
}
- if (!check_unbound(unboundop)) {
- PyErr_Format(PyExc_ValueError,
- "unsupported unboundop %d", unboundop);
- return NULL;
- }
-
int64_t cid = cid_data.cid;
PY_TIMEOUT_T timeout;
if (PyThread_ParseTimeoutArg(timeout_obj, blocking, &timeout) < 0) {
return NULL;
}
+ struct _channeldefaults defaults = {-1, -1};
+ if (unboundarg < 0 || fallbackarg < 0) {
+ int err = channel_get_defaults(&_globals.channels, cid, &defaults);
+ if (handle_channel_error(err, self, cid)) {
+ return NULL;
+ }
+ }
+ unboundop_t unboundop;
+ if (resolve_unboundop(unboundarg, defaults.unboundop, &unboundop) < 0) {
+ return NULL;
+ }
+ xidata_fallback_t fallback;
+ if (resolve_fallback(fallbackarg, defaults.fallback, &fallback) < 0) {
+ return NULL;
+ }
/* Queue up the object. */
int err = 0;
if (blocking) {
- err = channel_send_wait(&_globals.channels, cid, obj, unboundop, timeout);
+ err = channel_send_wait(
+ &_globals.channels, cid, obj, unboundop, timeout, fallback);
}
else {
- err = channel_send(&_globals.channels, cid, obj, NULL, unboundop);
+ err = channel_send(
+ &_globals.channels, cid, obj, NULL, unboundop, fallback);
}
if (handle_channel_error(err, self, cid)) {
return NULL;
@@ -3126,32 +3168,44 @@ By default this waits for the object to be received.");
static PyObject *
channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"cid", "obj", "unboundop", "blocking", "timeout",
- NULL};
+ static char *kwlist[] = {"cid", "obj", "unboundop", "fallback",
+ "blocking", "timeout", NULL};
struct channel_id_converter_data cid_data = {
.module = self,
};
PyObject *obj;
- int unboundop = UNBOUND_REPLACE;
- int blocking = 1;
+ int unboundarg = -1;
+ int fallbackarg = -1;
+ int blocking = -1;
PyObject *timeout_obj = NULL;
if (!PyArg_ParseTupleAndKeywords(args, kwds,
- "O&O|i$pO:channel_send_buffer", kwlist,
+ "O&O|ii$pO:channel_send_buffer", kwlist,
channel_id_converter, &cid_data, &obj,
- &unboundop, &blocking, &timeout_obj)) {
- return NULL;
- }
- if (!check_unbound(unboundop)) {
- PyErr_Format(PyExc_ValueError,
- "unsupported unboundop %d", unboundop);
+ &unboundarg, &fallbackarg,
+ &blocking, &timeout_obj))
+ {
return NULL;
}
-
int64_t cid = cid_data.cid;
PY_TIMEOUT_T timeout;
if (PyThread_ParseTimeoutArg(timeout_obj, blocking, &timeout) < 0) {
return NULL;
}
+ struct _channeldefaults defaults = {-1, -1};
+ if (unboundarg < 0 || fallbackarg < 0) {
+ int err = channel_get_defaults(&_globals.channels, cid, &defaults);
+ if (handle_channel_error(err, self, cid)) {
+ return NULL;
+ }
+ }
+ unboundop_t unboundop;
+ if (resolve_unboundop(unboundarg, defaults.unboundop, &unboundop) < 0) {
+ return NULL;
+ }
+ xidata_fallback_t fallback;
+ if (resolve_fallback(fallbackarg, defaults.fallback, &fallback) < 0) {
+ return NULL;
+ }
PyObject *tempobj = PyMemoryView_FromObject(obj);
if (tempobj == NULL) {
@@ -3162,10 +3216,11 @@ channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
int err = 0;
if (blocking) {
err = channel_send_wait(
- &_globals.channels, cid, tempobj, unboundop, timeout);
+ &_globals.channels, cid, tempobj, unboundop, timeout, fallback);
}
else {
- err = channel_send(&_globals.channels, cid, tempobj, NULL, unboundop);
+ err = channel_send(
+ &_globals.channels, cid, tempobj, NULL, unboundop, fallback);
}
Py_DECREF(tempobj);
if (handle_channel_error(err, self, cid)) {
@@ -3197,7 +3252,7 @@ channelsmod_recv(PyObject *self, PyObject *args, PyObject *kwds)
cid = cid_data.cid;
PyObject *obj = NULL;
- int unboundop = 0;
+ unboundop_t unboundop = 0;
int err = channel_recv(&_globals.channels, cid, &obj, &unboundop);
if (err == ERR_CHANNEL_EMPTY && dflt != NULL) {
// Use the default.
@@ -3388,17 +3443,14 @@ channelsmod_get_channel_defaults(PyObject *self, PyObject *args, PyObject *kwds)
}
int64_t cid = cid_data.cid;
- PyThread_type_lock mutex = NULL;
- _channel_state *channel = NULL;
- int err = _channels_lookup(&_globals.channels, cid, &mutex, &channel);
+ struct _channeldefaults defaults = {0};
+ int err = channel_get_defaults(&_globals.channels, cid, &defaults);
if (handle_channel_error(err, self, cid)) {
return NULL;
}
- int unboundop = channel->defaults.unboundop;
- PyThread_release_lock(mutex);
- PyObject *defaults = Py_BuildValue("i", unboundop);
- return defaults;
+ PyObject *res = Py_BuildValue("ii", defaults.unboundop, defaults.fallback);
+ return res;
}
PyDoc_STRVAR(channelsmod_get_channel_defaults_doc,