diff options
Diffstat (limited to 'py/stream.c')
-rw-r--r-- | py/stream.c | 103 |
1 files changed, 72 insertions, 31 deletions
diff --git a/py/stream.c b/py/stream.c index a3df1b8fdd..9b1d5fd2de 100644 --- a/py/stream.c +++ b/py/stream.c @@ -49,6 +49,48 @@ STATIC mp_obj_t stream_readall(mp_obj_t self_in); #define STREAM_CONTENT_TYPE(stream) (((stream)->is_text) ? &mp_type_str : &mp_type_bytes) +// Returns error condition in *errcode, if non-zero, return value is number of bytes written +// before error condition occured. If *errcode == 0, returns total bytes written (which will +// be equal to input size). +mp_uint_t mp_stream_rw(mp_obj_t stream, void *buf_, mp_uint_t size, int *errcode, byte flags) { + byte *buf = buf_; + mp_obj_base_t* s = (mp_obj_base_t*)MP_OBJ_TO_PTR(stream); + typedef mp_uint_t (*io_func_t)(mp_obj_t obj, void *buf, mp_uint_t size, int *errcode); + io_func_t io_func; + if (flags & MP_STREAM_RW_WRITE) { + io_func = (io_func_t)s->type->stream_p->write; + } else { + io_func = s->type->stream_p->read; + } + + *errcode = 0; + mp_uint_t done = 0; + while (size > 0) { + mp_uint_t out_sz = io_func(stream, buf, size, errcode); + // For read, out_sz == 0 means EOF. For write, it's unspecified + // what it means, but we don't make any progress, so returning + // is still the best option. + if (out_sz == 0) { + return done; + } + if (out_sz == MP_STREAM_ERROR) { + // If we read something before getting EAGAIN, don't leak it + if (mp_is_nonblocking_error(*errcode) && done != 0) { + *errcode = 0; + } + return done; + } + if (flags & MP_STREAM_RW_ONCE) { + return out_sz; + } + + buf += out_sz; + size -= out_sz; + done += out_sz; + } + return done; +} + const mp_stream_p_t *mp_get_stream_raise(mp_obj_t self_in, int flags) { mp_obj_base_t *o = (mp_obj_base_t*)MP_OBJ_TO_PTR(self_in); const mp_stream_p_t *stream_p = o->type->stream_p; @@ -62,7 +104,7 @@ const mp_stream_p_t *mp_get_stream_raise(mp_obj_t self_in, int flags) { return stream_p; } -STATIC mp_obj_t stream_read(size_t n_args, const mp_obj_t *args) { +STATIC mp_obj_t stream_read_generic(size_t n_args, const mp_obj_t *args, byte flags) { const mp_stream_p_t *stream_p = mp_get_stream_raise(args[0], MP_STREAM_OP_READ); // What to do if sz < -1? Python docs don't specify this case. @@ -94,8 +136,8 @@ STATIC mp_obj_t stream_read(size_t n_args, const mp_obj_t *args) { nlr_raise(mp_obj_new_exception_msg_varg(&mp_type_MemoryError, "out of memory")); } int error; - mp_uint_t out_sz = stream_p->read(args[0], p, more_bytes, &error); - if (out_sz == MP_STREAM_ERROR) { + mp_uint_t out_sz = mp_stream_read_exactly(args[0], p, more_bytes, &error); + if (error != 0) { vstr_cut_tail_bytes(&vstr, more_bytes); if (mp_is_nonblocking_error(error)) { // With non-blocking streams, we read as much as we can. @@ -165,8 +207,8 @@ STATIC mp_obj_t stream_read(size_t n_args, const mp_obj_t *args) { vstr_t vstr; vstr_init_len(&vstr, sz); int error; - mp_uint_t out_sz = stream_p->read(args[0], vstr.buf, sz, &error); - if (out_sz == MP_STREAM_ERROR) { + mp_uint_t out_sz = mp_stream_rw(args[0], vstr.buf, sz, &error, flags); + if (error != 0) { vstr_clear(&vstr); if (mp_is_nonblocking_error(error)) { // https://docs.python.org/3.4/library/io.html#io.RawIOBase.read @@ -182,20 +224,27 @@ STATIC mp_obj_t stream_read(size_t n_args, const mp_obj_t *args) { return mp_obj_new_str_from_vstr(STREAM_CONTENT_TYPE(stream_p), &vstr); } } + +STATIC mp_obj_t stream_read(size_t n_args, const mp_obj_t *args) { + return stream_read_generic(n_args, args, MP_STREAM_RW_READ); +} MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(mp_stream_read_obj, 1, 2, stream_read); -mp_obj_t mp_stream_write(mp_obj_t self_in, const void *buf, size_t len) { - const mp_stream_p_t *stream_p = mp_get_stream_raise(self_in, MP_STREAM_OP_WRITE); +STATIC mp_obj_t stream_read1(size_t n_args, const mp_obj_t *args) { + return stream_read_generic(n_args, args, MP_STREAM_RW_READ | MP_STREAM_RW_ONCE); +} +MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(mp_stream_read1_obj, 1, 2, stream_read1); + +mp_obj_t mp_stream_write(mp_obj_t self_in, const void *buf, size_t len, byte flags) { + mp_get_stream_raise(self_in, MP_STREAM_OP_WRITE); int error; - mp_uint_t out_sz = stream_p->write(self_in, buf, len, &error); - if (out_sz == MP_STREAM_ERROR) { + mp_uint_t out_sz = mp_stream_rw(self_in, (void*)buf, len, &error, flags); + if (error != 0) { if (mp_is_nonblocking_error(error)) { // http://docs.python.org/3/library/io.html#io.RawIOBase.write // "None is returned if the raw stream is set not to block and // no single byte could be readily written to it." - // This is for consistency with read() behavior, still weird, - // see abobe. return mp_const_none; } nlr_raise(mp_obj_new_exception_arg1(&mp_type_OSError, MP_OBJ_NEW_SMALL_INT(error))); @@ -206,33 +255,25 @@ mp_obj_t mp_stream_write(mp_obj_t self_in, const void *buf, size_t len) { // XXX hack void mp_stream_write_adaptor(void *self, const char *buf, size_t len) { - mp_stream_write(MP_OBJ_FROM_PTR(self), buf, len); -} - -// Works only with blocking streams -mp_uint_t mp_stream_writeall(mp_obj_t stream, const byte *buf, mp_uint_t size, int *errcode) { - mp_obj_base_t* s = (mp_obj_base_t*)MP_OBJ_TO_PTR(stream); - mp_uint_t org_size = size; - while (size > 0) { - mp_uint_t out_sz = s->type->stream_p->write(stream, buf, size, errcode); - if (out_sz == MP_STREAM_ERROR) { - return MP_STREAM_ERROR; - } - buf += out_sz; - size -= out_sz; - } - return org_size; + mp_stream_write(MP_OBJ_FROM_PTR(self), buf, len, MP_STREAM_RW_WRITE); } STATIC mp_obj_t stream_write_method(mp_obj_t self_in, mp_obj_t arg) { mp_buffer_info_t bufinfo; mp_get_buffer_raise(arg, &bufinfo, MP_BUFFER_READ); - return mp_stream_write(self_in, bufinfo.buf, bufinfo.len); + return mp_stream_write(self_in, bufinfo.buf, bufinfo.len, MP_STREAM_RW_WRITE); } MP_DEFINE_CONST_FUN_OBJ_2(mp_stream_write_obj, stream_write_method); +STATIC mp_obj_t stream_write1_method(mp_obj_t self_in, mp_obj_t arg) { + mp_buffer_info_t bufinfo; + mp_get_buffer_raise(arg, &bufinfo, MP_BUFFER_READ); + return mp_stream_write(self_in, bufinfo.buf, bufinfo.len, MP_STREAM_RW_WRITE | MP_STREAM_RW_ONCE); +} +MP_DEFINE_CONST_FUN_OBJ_2(mp_stream_write1_obj, stream_write1_method); + STATIC mp_obj_t stream_readinto(size_t n_args, const mp_obj_t *args) { - const mp_stream_p_t *stream_p = mp_get_stream_raise(args[0], MP_STREAM_OP_READ); + mp_get_stream_raise(args[0], MP_STREAM_OP_READ); mp_buffer_info_t bufinfo; mp_get_buffer_raise(args[1], &bufinfo, MP_BUFFER_WRITE); @@ -248,8 +289,8 @@ STATIC mp_obj_t stream_readinto(size_t n_args, const mp_obj_t *args) { } int error; - mp_uint_t out_sz = stream_p->read(args[0], bufinfo.buf, len, &error); - if (out_sz == MP_STREAM_ERROR) { + mp_uint_t out_sz = mp_stream_read_exactly(args[0], bufinfo.buf, len, &error); + if (error != 0) { if (mp_is_nonblocking_error(error)) { return mp_const_none; } |