diff --git a/CHANGELOG.md b/CHANGELOG.md
index 67c633e..054b292 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,5 @@
+- Add `#prep_timeout` and `AsyncOp`
+
 # 2024-11-14 Version 0.5
 
 - Add `#waitpid`
diff --git a/ext/um/um.c b/ext/um/um.c
index cff803c..a4877b9 100644
--- a/ext/um/um.c
+++ b/ext/um/um.c
@@ -75,7 +75,7 @@ static inline void um_process_cqe(struct um *machine, struct io_uring_cqe *cqe)
   if (unlikely((cqe->res == -ECANCELED) && (op->flags & OP_F_IGNORE_CANCELED))) return;
 
   op->flags |= OP_F_COMPLETED;
-  if (unlikely(op->flags & OP_F_TRANSIENT))
+  if (op->flags & OP_F_TRANSIENT)
     um_op_transient_remove(machine, op);
 
   if (op->flags & OP_F_MULTISHOT) {
@@ -88,7 +88,8 @@ static inline void um_process_cqe(struct um *machine, struct io_uring_cqe *cqe)
     op->result.flags = cqe->flags;
   }
 
-  um_runqueue_push(machine, op);
+  if (!(op->flags & OP_F_ASYNC))
+    um_runqueue_push(machine, op);
 }
 
 // copied from liburing/queue.c
@@ -180,7 +181,7 @@ inline VALUE um_fiber_switch(struct um *machine) {
   }
 }
 
-static inline void um_submit_cancel_op(struct um *machine, struct um_op *op) {
+void um_submit_cancel_op(struct um *machine, struct um_op *op) {
   struct io_uring_sqe *sqe = um_get_sqe(machine, NULL);
   io_uring_prep_cancel64(sqe, (long long)op, 0);
 }
@@ -260,7 +261,7 @@ VALUE um_timeout(struct um *machine, VALUE interval, VALUE class) {
   static ID ID_new = 0;
   if (!ID_new) ID_new = rb_intern("new");
 
-  struct um_op *op = malloc(sizeof(struct um_op));
+  struct um_op *op = um_op_alloc(machine);
   um_prep_op(machine, op, OP_TIMEOUT);
   op->ts = um_double_to_timespec(NUM2DBL(interval));
   RB_OBJ_WRITE(machine->self, &op->fiber, rb_fiber_current());
diff --git a/ext/um/um.h b/ext/um/um.h
index 22f0e5c..cf5edb0 100644
--- a/ext/um/um.h
+++ b/ext/um/um.h
@@ -48,8 +48,9 @@ enum op_kind {
 
 #define OP_F_COMPLETED (1U << 0)
 #define OP_F_TRANSIENT (1U << 1)
-#define OP_F_IGNORE_CANCELED (1U << 2)
-#define OP_F_MULTISHOT (1U << 3)
+#define OP_F_ASYNC (1U << 2)
+#define OP_F_IGNORE_CANCELED (1U << 3)
+#define OP_F_MULTISHOT (1U << 4)
 
 struct um_op_result {
   __s32 res;
@@ -66,6 +67,7 @@ struct um_op {
 
   VALUE fiber;
   VALUE value;
+  VALUE async_op;
 
   struct um_op_result result;
   struct um_op_result *multishot_result_tail;
@@ -136,10 +138,19 @@ struct um_queue {
   uint32_t count;
 };
 
+struct um_async_op {
+  VALUE self;
+
+  struct um *machine;
+  struct um_op *op;
+};
+
 extern VALUE cUM;
 extern VALUE cMutex;
 extern VALUE cQueue;
+extern VALUE cAsyncOp;
 
+struct um *um_get_machine(VALUE self);
 void um_setup(VALUE self, struct um *machine);
 void um_teardown(struct um *machine);
 
@@ -178,6 +189,7 @@ struct io_uring_sqe *um_get_sqe(struct um *machine, struct um_op *op);
 
 VALUE um_fiber_switch(struct um *machine);
 VALUE um_await(struct um *machine);
+void um_submit_cancel_op(struct um *machine, struct um_op *op);
 void um_cancel_and_wait(struct um *machine, struct um_op *op);
 int um_check_completion(struct um *machine, struct um_op *op);
 
@@ -206,6 +218,12 @@ VALUE um_listen(struct um *machine, int fd, int backlog);
 
 VALUE um_getsockopt(struct um *machine, int fd, int level, int opt);
 VALUE um_setsockopt(struct um *machine, int fd, int level, int opt, int value);
 
+void um_async_op_set(VALUE self, struct um *machine, struct um_op *op);
+VALUE um_async_op_await(struct um_async_op *async_op);
+void um_async_op_cancel(struct um_async_op *async_op);
+
+VALUE um_prep_timeout(struct um *machine, double interval);
+
 struct um_mutex *Mutex_data(VALUE self);
 struct um_queue *Queue_data(VALUE self);
diff --git a/ext/um/um_async_op.c b/ext/um/um_async_op.c
new file mode 100644
index 0000000..48b1763
--- /dev/null
+++ b/ext/um/um_async_op.c
@@ -0,0 +1,40 @@
+#include "um.h"
+#include <stdlib.h>
+
+VALUE um_prep_timeout(struct um *machine, double interval) {
+  static ID ID_new = 0;
+  if (!ID_new) ID_new = rb_intern("new");
+
+  struct um_op *op = malloc(sizeof(struct um_op));
+  um_prep_op(machine, op, OP_TIMEOUT);
+  op->ts = um_double_to_timespec(interval);
+  op->flags = OP_F_TRANSIENT | OP_F_ASYNC;
+
+  VALUE obj = rb_funcall(cAsyncOp, rb_intern_const("new"), 0);
+  um_async_op_set(obj, machine, op);
+
+  RB_OBJ_WRITE(machine->self, &op->async_op, obj);
+
+  struct io_uring_sqe *sqe = um_get_sqe(machine, op);
+  io_uring_prep_timeout(sqe, &op->ts, 0, 0);
+
+  um_op_transient_add(machine, op);
+
+  return obj;
+}
+
+VALUE um_async_op_await(struct um_async_op *async_op) {
+  RB_OBJ_WRITE(async_op->machine->self, &async_op->op->fiber, rb_fiber_current());
+  async_op->op->flags &= ~OP_F_ASYNC;
+
+  VALUE ret = um_fiber_switch(async_op->machine);
+  if (!um_op_completed_p(async_op->op))
+    um_cancel_and_wait(async_op->machine, async_op->op);
+
+  raise_if_exception(ret);
+  return INT2NUM(async_op->op->result.res);
+}
+
+void um_async_op_cancel(struct um_async_op *async_op) {
+  um_submit_cancel_op(async_op->machine, async_op->op);
+}
diff --git a/ext/um/um_async_op_class.c b/ext/um/um_async_op_class.c
new file mode 100644
index 0000000..0dcc61c
--- /dev/null
+++ b/ext/um/um_async_op_class.c
@@ -0,0 +1,136 @@
+#include "um.h"
+#include <stdlib.h>
+
+VALUE cAsyncOp;
+
+VALUE SYM_timeout;
+
+static void AsyncOp_mark(void *ptr) {
+  struct um_async_op *async_op = ptr;
+  rb_gc_mark_movable(async_op->self);
+  rb_gc_mark_movable(async_op->machine->self);
+}
+
+static void AsyncOp_compact(void *ptr) {
+  struct um_async_op *async_op = ptr;
+  async_op->self = rb_gc_location(async_op->self);
+}
+
+static size_t AsyncOp_size(const void *ptr) {
+  return sizeof(struct um_async_op);
+}
+
+static void AsyncOp_free(void *ptr) {
+  struct um_async_op *async_op = ptr;
+  um_op_free(async_op->machine, async_op->op);
+  free(ptr);
+}
+
+static const rb_data_type_t AsyncOp_type = {
+  "UringMachine::AsyncOp",
+  {AsyncOp_mark, AsyncOp_free, AsyncOp_size, AsyncOp_compact},
+  0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED
+};
+
+static VALUE AsyncOp_allocate(VALUE klass) {
+  struct um_async_op *async_op = malloc(sizeof(struct um_async_op));
+  return TypedData_Wrap_Struct(klass, &AsyncOp_type, async_op);
+}
+
+inline struct um_async_op *AsyncOp_data(VALUE self) {
+  return RTYPEDDATA_DATA(self);
+}
+
+VALUE AsyncOp_initialize(VALUE self) {
+  struct um_async_op *async_op = AsyncOp_data(self);
+  memset(async_op, 0, sizeof(struct um_async_op));
+  async_op->self = self;
+  return self;
+}
+
+void um_async_op_set(VALUE self, struct um *machine, struct um_op *op) {
+  struct um_async_op *async_op = AsyncOp_data(self);
+  async_op->machine = machine;
+  async_op->op = op;
+}
+
+inline void raise_on_missing_op(struct um_async_op *async_op) {
+  if (!async_op->op)
+    rb_raise(rb_eRuntimeError, "Missing op");
+}
+
+inline int async_op_is_done(struct um_async_op *async_op) {
+  return (async_op->op->flags & OP_F_COMPLETED);
+}
+
+VALUE AsyncOp_kind(VALUE self) {
+  struct um_async_op *async_op = AsyncOp_data(self);
+  raise_on_missing_op(async_op);
+
+  switch(async_op->op->kind) {
+    case OP_TIMEOUT:
+      return SYM_timeout;
+    default:
+      rb_raise(rb_eRuntimeError, "Invalid op kind");
+  }
+}
+
+VALUE AsyncOp_done_p(VALUE self) {
+  struct um_async_op *async_op = AsyncOp_data(self);
+  raise_on_missing_op(async_op);
+
+  return async_op_is_done(async_op) ? Qtrue : Qfalse;
+}
+
+VALUE AsyncOp_result(VALUE self) {
+  struct um_async_op *async_op = AsyncOp_data(self);
+  raise_on_missing_op(async_op);
+
+  return async_op_is_done(async_op) ? INT2NUM(async_op->op->result.res) : Qnil;
+}
+
+VALUE AsyncOp_cancelled_p(VALUE self) {
+  struct um_async_op *async_op = AsyncOp_data(self);
+  raise_on_missing_op(async_op);
+
+  if (!async_op_is_done(async_op)) return Qnil;
+
+  return (async_op->op->result.res == -ECANCELED) ? Qtrue : Qfalse;
+}
+
+VALUE AsyncOp_await(VALUE self) {
+  struct um_async_op *async_op = AsyncOp_data(self);
+  raise_on_missing_op(async_op);
+
+  if (async_op_is_done(async_op))
+    return INT2NUM(async_op->op->result.res);
+
+  return um_async_op_await(async_op);
+}
+
+VALUE AsyncOp_cancel(VALUE self) {
+  struct um_async_op *async_op = AsyncOp_data(self);
+  raise_on_missing_op(async_op);
+
+  if (!async_op_is_done(async_op))
+    um_async_op_cancel(async_op);
+
+  return self;
+}
+
+void Init_AsyncOp(void) {
+  cAsyncOp = rb_define_class_under(cUM, "AsyncOp", rb_cObject);
+  rb_define_alloc_func(cAsyncOp, AsyncOp_allocate);
+
+  rb_define_method(cAsyncOp, "initialize", AsyncOp_initialize, 0);
+  rb_define_method(cAsyncOp, "kind", AsyncOp_kind, 0);
+  rb_define_method(cAsyncOp, "done?", AsyncOp_done_p, 0);
+  rb_define_method(cAsyncOp, "result", AsyncOp_result, 0);
+  rb_define_method(cAsyncOp, "cancelled?", AsyncOp_cancelled_p, 0);
+
+  rb_define_method(cAsyncOp, "await", AsyncOp_await, 0);
+  rb_define_method(cAsyncOp, "join", AsyncOp_await, 0);
+  rb_define_method(cAsyncOp, "cancel", AsyncOp_cancel, 0);
+
+  SYM_timeout = ID2SYM(rb_intern("timeout"));
+}
diff --git a/ext/um/um_class.c b/ext/um/um_class.c
index 8c62b1c..1ec3178 100644
--- a/ext/um/um_class.c
+++ b/ext/um/um_class.c
@@ -40,7 +40,7 @@ static VALUE UM_allocate(VALUE klass) {
   return TypedData_Wrap_Struct(klass, &UM_type, machine);
 }
 
-inline struct um *get_machine(VALUE self) {
+inline struct um *um_get_machine(VALUE self) {
   struct um *machine = RTYPEDDATA_DATA(self);
   if (!machine->ring_initialized)
     rb_raise(rb_eRuntimeError, "Machine not initialized");
@@ -54,45 +54,45 @@ VALUE UM_initialize(VALUE self) {
 }
 
 VALUE UM_setup_buffer_ring(VALUE self, VALUE size, VALUE count) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   int bgid = um_setup_buffer_ring(machine, NUM2UINT(size), NUM2UINT(count));
   return INT2NUM(bgid);
 }
 
 VALUE UM_pending_count(VALUE self) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   return INT2NUM(machine->pending_count);
 }
 
 VALUE UM_snooze(VALUE self) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   um_schedule(machine, rb_fiber_current(), Qnil);
   return um_await(machine);
 }
 
 VALUE UM_yield(VALUE self) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   return um_await(machine);
 }
 
 VALUE UM_schedule(VALUE self, VALUE fiber, VALUE value) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   um_schedule(machine, fiber, value);
   return self;
 }
 
 VALUE UM_timeout(VALUE self, VALUE interval, VALUE class) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   return um_timeout(machine, interval, class);
 }
 
 VALUE UM_sleep(VALUE self, VALUE duration) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   return um_sleep(machine, NUM2DBL(duration));
 }
 
 VALUE UM_read(int argc, VALUE *argv, VALUE self) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   VALUE fd;
   VALUE buffer;
   VALUE maxlen;
@@ -107,7 +107,7 @@ VALUE UM_read(int argc, VALUE *argv, VALUE self) {
 
 VALUE UM_read_each(VALUE self, VALUE fd, VALUE bgid) {
 #ifdef HAVE_IO_URING_PREP_READ_MULTISHOT
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   return um_read_each(machine, NUM2INT(fd), NUM2INT(bgid));
 #else
   rb_raise(rb_eRuntimeError, "Not supported by kernel");
@@ -115,7 +115,7 @@ VALUE UM_read_each(VALUE self, VALUE fd, VALUE bgid) {
 }
 
 VALUE UM_write(int argc, VALUE *argv, VALUE self) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   VALUE fd;
   VALUE buffer;
   VALUE len;
@@ -126,27 +126,27 @@ VALUE UM_write(int argc, VALUE *argv, VALUE self) {
 }
 
 VALUE UM_close(VALUE self, VALUE fd) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   return um_close(machine, NUM2INT(fd));
 }
 
 VALUE UM_accept(VALUE self, VALUE fd) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   return um_accept(machine, NUM2INT(fd));
 }
 
 VALUE UM_accept_each(VALUE self, VALUE fd) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   return um_accept_each(machine, NUM2INT(fd));
 }
 
 VALUE UM_socket(VALUE self, VALUE domain, VALUE type, VALUE protocol, VALUE flags) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   return um_socket(machine, NUM2INT(domain), NUM2INT(type), NUM2INT(protocol), NUM2UINT(flags));
 }
 
 VALUE UM_connect(VALUE self, VALUE fd, VALUE host, VALUE port) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   struct sockaddr_in addr;
   memset(&addr, 0, sizeof(addr));
 
@@ -158,17 +158,17 @@ VALUE UM_connect(VALUE self, VALUE fd, VALUE host, VALUE port) {
 }
 
 VALUE UM_send(VALUE self, VALUE fd, VALUE buffer, VALUE len, VALUE flags) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   return um_send(machine, NUM2INT(fd), buffer, NUM2INT(len), NUM2INT(flags));
 }
 
 VALUE UM_recv(VALUE self, VALUE fd, VALUE buffer, VALUE maxlen, VALUE flags) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   return um_recv(machine, NUM2INT(fd), buffer, NUM2INT(maxlen), NUM2INT(flags));
 }
 
 VALUE UM_recv_each(VALUE self, VALUE fd, VALUE bgid, VALUE flags) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   return um_recv_each(machine, NUM2INT(fd), NUM2INT(bgid), NUM2INT(flags));
 }
 
@@ -180,7 +180,7 @@ VALUE UM_bind(VALUE self, VALUE fd, VALUE host, VALUE port) {
   addr.sin_port = htons(NUM2INT(port));
 
#ifdef HAVE_IO_URING_PREP_BIND
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   return um_bind(machine, NUM2INT(fd), (struct sockaddr *)&addr, sizeof(addr));
 #else
   int res = bind(NUM2INT(fd), (struct sockaddr *)&addr, sizeof(addr));
@@ -192,7 +192,7 @@ VALUE UM_bind(VALUE self, VALUE fd, VALUE host, VALUE port) {
 
 VALUE UM_listen(VALUE self, VALUE fd, VALUE backlog) {
 #ifdef HAVE_IO_URING_PREP_LISTEN
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   return um_listen(machine, NUM2INT(fd), NUM2INT(backlog));
 #else
   int res = listen(NUM2INT(fd), NUM2INT(backlog));
@@ -214,43 +214,43 @@ static inline int numeric_value(VALUE value) {
 }
 
 VALUE UM_getsockopt(VALUE self, VALUE fd, VALUE level, VALUE opt) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   return um_getsockopt(machine, NUM2INT(fd), NUM2INT(level), NUM2INT(opt));
 }
 
 VALUE UM_setsockopt(VALUE self, VALUE fd, VALUE level, VALUE opt, VALUE value) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   return um_setsockopt(machine, NUM2INT(fd), NUM2INT(level), NUM2INT(opt), numeric_value(value));
 }
 
 #ifdef HAVE_IO_URING_PREP_FUTEX
 
 VALUE UM_mutex_synchronize(VALUE self, VALUE mutex) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   struct um_mutex *mutex_data = Mutex_data(mutex);
   return um_mutex_synchronize(machine, &mutex_data->state);
 }
 
 VALUE UM_queue_push(VALUE self, VALUE queue, VALUE value) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   struct um_queue *que = Queue_data(queue);
   return um_queue_push(machine, que, value);
 }
 
 VALUE UM_queue_pop(VALUE self, VALUE queue) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   struct um_queue *que = Queue_data(queue);
   return um_queue_pop(machine, que);
 }
 
 VALUE UM_queue_unshift(VALUE self, VALUE queue, VALUE value) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   struct um_queue *que = Queue_data(queue);
   return um_queue_unshift(machine, que, value);
 }
 
 VALUE UM_queue_shift(VALUE self, VALUE queue) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   struct um_queue *que = Queue_data(queue);
   return um_queue_shift(machine, que);
 }
@@ -269,7 +269,7 @@ VALUE UM_open_ensure(VALUE arg) {
 }
 
 VALUE UM_open(VALUE self, VALUE pathname, VALUE flags) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   // TODO: take optional perm (mode) arg
   VALUE fd = um_open(machine, pathname, NUM2INT(flags), 0666);
   if (rb_block_given_p()) {
@@ -281,10 +281,15 @@ VALUE UM_open(VALUE self, VALUE pathname, VALUE flags) {
 }
 
 VALUE UM_waitpid(VALUE self, VALUE pid, VALUE options) {
-  struct um *machine = get_machine(self);
+  struct um *machine = um_get_machine(self);
   return um_waitpid(machine, NUM2INT(pid), NUM2INT(options));
 }
 
+VALUE UM_prep_timeout(VALUE self, VALUE interval) {
+  struct um *machine = um_get_machine(self);
+  return um_prep_timeout(machine, NUM2DBL(interval));
+}
+
 VALUE UM_pipe(VALUE self) {
   int fds[2];
   int ret = pipe(fds);
@@ -340,6 +345,8 @@ void Init_UM(void) {
   rb_define_method(cUM, "setsockopt", UM_setsockopt, 4);
   rb_define_method(cUM, "socket", UM_socket, 4);
 
+  rb_define_method(cUM, "prep_timeout", UM_prep_timeout, 1);
+
 #ifdef HAVE_IO_URING_PREP_FUTEX
   rb_define_method(cUM, "pop", UM_queue_pop, 1);
   rb_define_method(cUM, "push", UM_queue_push, 2);
diff --git a/ext/um/um_const.c b/ext/um/um_const.c
index 2aeb1f1..c371842 100644
--- a/ext/um/um_const.c
+++ b/ext/um/um_const.c
@@ -215,4 +215,135 @@ void um_define_net_constants(VALUE mod) {
 
   DEF_CONST_INT(mod, IF_NAMESIZE);
   DEF_CONST_INT(mod, SOMAXCONN);
+
+  DEF_CONST_INT(mod, EPERM);
+  DEF_CONST_INT(mod, ENOENT);
+  DEF_CONST_INT(mod, ESRCH);
+  DEF_CONST_INT(mod, EINTR);
+  DEF_CONST_INT(mod, EIO);
+  DEF_CONST_INT(mod, ENXIO);
+  DEF_CONST_INT(mod, E2BIG);
+  DEF_CONST_INT(mod, ENOEXEC);
+  DEF_CONST_INT(mod, EBADF);
+  DEF_CONST_INT(mod, ECHILD);
+  DEF_CONST_INT(mod, EAGAIN);
+  DEF_CONST_INT(mod, ENOMEM);
+  DEF_CONST_INT(mod, EACCES);
+  DEF_CONST_INT(mod, EFAULT);
+  DEF_CONST_INT(mod, ENOTBLK);
+  DEF_CONST_INT(mod, EBUSY);
+  DEF_CONST_INT(mod, EEXIST);
+  DEF_CONST_INT(mod, EXDEV);
+  DEF_CONST_INT(mod, ENODEV);
+  DEF_CONST_INT(mod, ENOTDIR);
+  DEF_CONST_INT(mod, EISDIR);
+  DEF_CONST_INT(mod, EINVAL);
+  DEF_CONST_INT(mod, ENFILE);
+  DEF_CONST_INT(mod, EMFILE);
+  DEF_CONST_INT(mod, ENOTTY);
+  DEF_CONST_INT(mod, ETXTBSY);
+  DEF_CONST_INT(mod, EFBIG);
+  DEF_CONST_INT(mod, ENOSPC);
+  DEF_CONST_INT(mod, ESPIPE);
+  DEF_CONST_INT(mod, EROFS);
+  DEF_CONST_INT(mod, EMLINK);
+  DEF_CONST_INT(mod, EPIPE);
+  DEF_CONST_INT(mod, EDOM);
+  DEF_CONST_INT(mod, ERANGE);
+  DEF_CONST_INT(mod, EDEADLK);
+  DEF_CONST_INT(mod, ENAMETOOLONG);
+  DEF_CONST_INT(mod, ENOLCK);
+  DEF_CONST_INT(mod, ENOSYS);
+  DEF_CONST_INT(mod, ENOTEMPTY);
+  DEF_CONST_INT(mod, ELOOP);
+  DEF_CONST_INT(mod, ENOMSG);
+  DEF_CONST_INT(mod, EIDRM);
+  DEF_CONST_INT(mod, ECHRNG);
+  DEF_CONST_INT(mod, EL2NSYNC);
+  DEF_CONST_INT(mod, EL3HLT);
+  DEF_CONST_INT(mod, EL3RST);
+  DEF_CONST_INT(mod, ELNRNG);
+  DEF_CONST_INT(mod, EUNATCH);
+  DEF_CONST_INT(mod, ENOCSI);
+  DEF_CONST_INT(mod, EL2HLT);
+  DEF_CONST_INT(mod, EBADE);
+  DEF_CONST_INT(mod, EBADR);
+  DEF_CONST_INT(mod, EXFULL);
+  DEF_CONST_INT(mod, ENOANO);
+  DEF_CONST_INT(mod, EBADRQC);
+  DEF_CONST_INT(mod, EBADSLT);
+  DEF_CONST_INT(mod, EBFONT);
+  DEF_CONST_INT(mod, ENOSTR);
+  DEF_CONST_INT(mod, ENODATA);
+  DEF_CONST_INT(mod, ETIME);
+  DEF_CONST_INT(mod, ENOSR);
+  DEF_CONST_INT(mod, ENONET);
+  DEF_CONST_INT(mod, ENOPKG);
+  DEF_CONST_INT(mod, EREMOTE);
+  DEF_CONST_INT(mod, ENOLINK);
+  DEF_CONST_INT(mod, EADV);
+  DEF_CONST_INT(mod, ESRMNT);
+  DEF_CONST_INT(mod, ECOMM);
+  DEF_CONST_INT(mod, EPROTO);
+  DEF_CONST_INT(mod, EMULTIHOP);
+  DEF_CONST_INT(mod, EDOTDOT);
+  DEF_CONST_INT(mod, EBADMSG);
+  DEF_CONST_INT(mod, EOVERFLOW);
+  DEF_CONST_INT(mod, ENOTUNIQ);
+  DEF_CONST_INT(mod, EBADFD);
+  DEF_CONST_INT(mod, EREMCHG);
+  DEF_CONST_INT(mod, ELIBACC);
+  DEF_CONST_INT(mod, ELIBBAD);
+  DEF_CONST_INT(mod, ELIBSCN);
+  DEF_CONST_INT(mod, ELIBMAX);
+  DEF_CONST_INT(mod, ELIBEXEC);
+  DEF_CONST_INT(mod, EILSEQ);
+  DEF_CONST_INT(mod, ERESTART);
+  DEF_CONST_INT(mod, ESTRPIPE);
+  DEF_CONST_INT(mod, EUSERS);
+  DEF_CONST_INT(mod, ENOTSOCK);
+  DEF_CONST_INT(mod, EDESTADDRREQ);
+  DEF_CONST_INT(mod, EMSGSIZE);
+  DEF_CONST_INT(mod, EPROTOTYPE);
+  DEF_CONST_INT(mod, ENOPROTOOPT);
+  DEF_CONST_INT(mod, EPROTONOSUPPORT);
+  DEF_CONST_INT(mod, ESOCKTNOSUPPORT);
+  DEF_CONST_INT(mod, EOPNOTSUPP);
+  DEF_CONST_INT(mod, EPFNOSUPPORT);
+  DEF_CONST_INT(mod, EAFNOSUPPORT);
+  DEF_CONST_INT(mod, EADDRINUSE);
+  DEF_CONST_INT(mod, EADDRNOTAVAIL);
+  DEF_CONST_INT(mod, ENETDOWN);
+  DEF_CONST_INT(mod, ENETUNREACH);
+  DEF_CONST_INT(mod, ENETRESET);
+  DEF_CONST_INT(mod, ECONNABORTED);
+  DEF_CONST_INT(mod, ECONNRESET);
+  DEF_CONST_INT(mod, ENOBUFS);
+  DEF_CONST_INT(mod, EISCONN);
+  DEF_CONST_INT(mod, ENOTCONN);
+  DEF_CONST_INT(mod, ESHUTDOWN);
+  DEF_CONST_INT(mod, ETOOMANYREFS);
+  DEF_CONST_INT(mod, ETIMEDOUT);
+  DEF_CONST_INT(mod, ECONNREFUSED);
+  DEF_CONST_INT(mod, EHOSTDOWN);
+  DEF_CONST_INT(mod, EHOSTUNREACH);
+  DEF_CONST_INT(mod, EALREADY);
+  DEF_CONST_INT(mod, EINPROGRESS);
+  DEF_CONST_INT(mod, ESTALE);
+  DEF_CONST_INT(mod, EUCLEAN);
+  DEF_CONST_INT(mod, ENOTNAM);
+  DEF_CONST_INT(mod, ENAVAIL);
+  DEF_CONST_INT(mod, EISNAM);
+  DEF_CONST_INT(mod, EREMOTEIO);
+  DEF_CONST_INT(mod, EDQUOT);
+  DEF_CONST_INT(mod, ENOMEDIUM);
+  DEF_CONST_INT(mod, EMEDIUMTYPE);
+  DEF_CONST_INT(mod, ECANCELED);
+  DEF_CONST_INT(mod, ENOKEY);
+  DEF_CONST_INT(mod, EKEYEXPIRED);
+  DEF_CONST_INT(mod, EKEYREVOKED);
+  DEF_CONST_INT(mod, EKEYREJECTED);
+  DEF_CONST_INT(mod, EOWNERDEAD);
+  DEF_CONST_INT(mod, ENOTRECOVERABLE);
+
 }
diff --git a/ext/um/um_ext.c b/ext/um/um_ext.c
index 63b93ee..cfa1ef8 100644
--- a/ext/um/um_ext.c
+++ b/ext/um/um_ext.c
@@ -1,11 +1,13 @@
 void Init_UM();
 void Init_Mutex();
 void Init_Queue();
+void Init_AsyncOp();
 void Init_SSL();
 
 void Init_um_ext(void) {
   Init_UM();
   Init_Mutex();
   Init_Queue();
+  Init_AsyncOp();
   // Init_SSL();
 }
diff --git a/ext/um/um_op.c b/ext/um/um_op.c
index af334ff..92f0feb 100644
--- a/ext/um/um_op.c
+++ b/ext/um/um_op.c
@@ -2,8 +2,9 @@
 
 inline void um_op_clear(struct um *machine, struct um_op *op) {
   memset(op, 0, sizeof(struct um_op));
-  RB_OBJ_WRITE(machine->self, &op->fiber, Qnil);
-  RB_OBJ_WRITE(machine->self, &op->value, Qnil);
+  op->fiber = Qnil;
+  op->value = Qnil;
+  op->async_op = Qnil;
 }
 
 inline void um_op_transient_add(struct um *machine, struct um_op *op) {
@@ -50,6 +51,7 @@ inline void um_op_list_mark(struct um *machine, struct um_op *head) {
     struct um_op *next = head->next;
     rb_gc_mark_movable(head->fiber);
     rb_gc_mark_movable(head->value);
+    rb_gc_mark_movable(head->async_op);
     head = next;
   }
 }
@@ -59,6 +61,7 @@ inline void um_op_list_compact(struct um *machine, struct um_op *head) {
     struct um_op *next = head->next;
     head->fiber = rb_gc_location(head->fiber);
     head->value = rb_gc_location(head->value);
+    head->async_op = rb_gc_location(head->async_op);
     head = next;
   }
 }
diff --git a/test/helper.rb b/test/helper.rb
index e569c1e..221a0d9 100644
--- a/test/helper.rb
+++ b/test/helper.rb
@@ -51,6 +51,12 @@ def assert_in_range exp_range, act
 end
 
 class UMBaseTest < Minitest::Test
+  # pull in UM constants
+  UM.constants.each do |c|
+    v = UM.const_get(c)
+    const_set(c, v) if v.is_a?(Integer)
+  end
+
   attr_accessor :machine
 
   def setup
diff --git a/test/test_async_op.rb b/test/test_async_op.rb
new file mode 100644
index 0000000..1394a3f
--- /dev/null
+++ b/test/test_async_op.rb
@@ -0,0 +1,97 @@
+# frozen_string_literal: true
+
+require_relative 'helper'
+require 'socket'
+
+class AsyncOpTest < UMBaseTest
+  def setup
+    super
+    @t0 = monotonic_clock
+    @op = machine.prep_timeout(0.05)
+  end
+
+  def test_async_op_await
+    assert_equal 1, machine.pending_count
+    res = @op.await
+    t1 = monotonic_clock
+    assert_in_range 0.04..0.08, t1 - @t0
+    assert_equal 0, machine.pending_count
+    assert_equal (-ETIME), res
+    assert_equal true, @op.done?
+    assert_equal false, @op.cancelled?
+  end
+
+  def test_async_op_join
+    assert_equal 1, machine.pending_count
+    res = @op.join
+    t1 = monotonic_clock
+    assert_in_range 0.04..0.08, t1 - @t0
+    assert_equal 0, machine.pending_count
+    assert_equal (-ETIME), res
+    assert_equal true, @op.done?
+    assert_equal false, @op.cancelled?
+  end
+
+  def test_async_op_cancel
+    machine.sleep(0.01)
+    assert_equal 1, machine.pending_count
+    @op.cancel
+    assert_equal false, @op.done?
+
+    machine.sleep(0.01)
+
+    assert_equal 0, machine.pending_count
+    assert_equal true, @op.done?
+    assert_equal (-ECANCELED), @op.result
+    assert_equal true, @op.cancelled?
+  end
+
+  def test_async_op_await_with_cancel
+    machine.spin do
+      @op.cancel
+    end
+
+    res = @op.await
+
+    assert_equal 0, machine.pending_count
+    assert_equal true, @op.done?
+    assert_equal (-ECANCELED), res
+    assert_equal true, @op.cancelled?
+  end
+
+  class TOError < RuntimeError; end
+
+  def test_async_op_await_with_timeout
+    e = nil
+
+    begin
+      machine.timeout(0.01, TOError) do
+        @op.await
+      end
+    rescue => e
+    end
+
+    assert_equal 0, machine.pending_count
+    assert_kind_of TOError, e
+    assert_equal true, @op.done?
+    assert_equal (-ECANCELED), @op.result
+    assert_equal true, @op.cancelled?
+  end
+end
+
+class PrepTimeoutTest < UMBaseTest
+  def test_prep_timeout
+    op = machine.prep_timeout(0.03)
+    assert_kind_of UM::AsyncOp, op
+    assert_equal :timeout, op.kind
+
+    assert_equal false, op.done?
+    assert_nil op.result
+
+    machine.sleep(0.05)
+
+    assert_equal true, op.done?
+    assert_equal (-ETIME), op.result
+    assert_equal false, op.cancelled?
+  end
+end
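
Usage sketch (not part of the patch): based on the tests above, the new API would be exercised roughly as follows. `UringMachine.new` as the constructor and the `UM::ETIME` / `UM::ECANCELED` constants are assumptions drawn from the surrounding code and the errno constants added in um_const.c, not something this diff states explicitly.

  require 'uringmachine'

  machine = UringMachine.new   # assumed constructor

  # #prep_timeout submits the timeout SQE right away and returns a
  # UM::AsyncOp without blocking the calling fiber.
  op = machine.prep_timeout(0.05)
  op.kind      #=> :timeout
  op.done?     #=> false until the CQE is processed
  op.result    #=> nil while still pending

  # #await (aliased as #join) switches fibers until the op completes and
  # returns the raw CQE result: -ETIME when the timeout fires normally.
  res = op.await
  res == -UM::ETIME        #=> true (assumes UM exposes ETIME)

  # A pending op can be cancelled; once the machine processes the
  # cancellation, #result returns -ECANCELED and #cancelled? is true.
  op2 = machine.prep_timeout(10)
  op2.cancel
  machine.sleep(0.01)
  op2.cancelled?           #=> true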