Skip to content

Commit

Permalink
Add #prep_timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
noteflakes committed Nov 21, 2024
1 parent 17a22ed commit b4d0dc8
Show file tree
Hide file tree
Showing 11 changed files with 481 additions and 38 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
- Add `#prep_timeout` and `AsyncOp`

# 2024-11-14 Version 0.5

- Add `#waitpid`
Expand Down
9 changes: 5 additions & 4 deletions ext/um/um.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ static inline void um_process_cqe(struct um *machine, struct io_uring_cqe *cqe)
if (unlikely((cqe->res == -ECANCELED) && (op->flags & OP_F_IGNORE_CANCELED))) return;

op->flags |= OP_F_COMPLETED;
if (unlikely(op->flags & OP_F_TRANSIENT))
if (op->flags & OP_F_TRANSIENT)
um_op_transient_remove(machine, op);

if (op->flags & OP_F_MULTISHOT) {
Expand All @@ -88,7 +88,8 @@ static inline void um_process_cqe(struct um *machine, struct io_uring_cqe *cqe)
op->result.flags = cqe->flags;
}

um_runqueue_push(machine, op);
if (!(op->flags & OP_F_ASYNC))
um_runqueue_push(machine, op);
}

// copied from liburing/queue.c
Expand Down Expand Up @@ -180,7 +181,7 @@ inline VALUE um_fiber_switch(struct um *machine) {
}
}

static inline void um_submit_cancel_op(struct um *machine, struct um_op *op) {
void um_submit_cancel_op(struct um *machine, struct um_op *op) {
struct io_uring_sqe *sqe = um_get_sqe(machine, NULL);
io_uring_prep_cancel64(sqe, (long long)op, 0);
}
Expand Down Expand Up @@ -260,7 +261,7 @@ VALUE um_timeout(struct um *machine, VALUE interval, VALUE class) {
static ID ID_new = 0;
if (!ID_new) ID_new = rb_intern("new");

struct um_op *op = malloc(sizeof(struct um_op));
struct um_op *op = um_op_alloc(machine);
um_prep_op(machine, op, OP_TIMEOUT);
op->ts = um_double_to_timespec(NUM2DBL(interval));
RB_OBJ_WRITE(machine->self, &op->fiber, rb_fiber_current());
Expand Down
22 changes: 20 additions & 2 deletions ext/um/um.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ enum op_kind {

#define OP_F_COMPLETED (1U << 0)
#define OP_F_TRANSIENT (1U << 1)
#define OP_F_IGNORE_CANCELED (1U << 2)
#define OP_F_MULTISHOT (1U << 3)
#define OP_F_ASYNC (1U << 2)
#define OP_F_IGNORE_CANCELED (1U << 3)
#define OP_F_MULTISHOT (1U << 4)

struct um_op_result {
__s32 res;
Expand All @@ -66,6 +67,7 @@ struct um_op {

VALUE fiber;
VALUE value;
VALUE async_op;

struct um_op_result result;
struct um_op_result *multishot_result_tail;
Expand Down Expand Up @@ -136,10 +138,19 @@ struct um_queue {
uint32_t count;
};

struct um_async_op {
VALUE self;

struct um *machine;
struct um_op *op;
};

extern VALUE cUM;
extern VALUE cMutex;
extern VALUE cQueue;
extern VALUE cAsyncOp;

struct um *um_get_machine(VALUE self);
void um_setup(VALUE self, struct um *machine);
void um_teardown(struct um *machine);

Expand Down Expand Up @@ -178,6 +189,7 @@ struct io_uring_sqe *um_get_sqe(struct um *machine, struct um_op *op);

VALUE um_fiber_switch(struct um *machine);
VALUE um_await(struct um *machine);
void um_submit_cancel_op(struct um *machine, struct um_op *op);
void um_cancel_and_wait(struct um *machine, struct um_op *op);
int um_check_completion(struct um *machine, struct um_op *op);

Expand Down Expand Up @@ -206,6 +218,12 @@ VALUE um_listen(struct um *machine, int fd, int backlog);
VALUE um_getsockopt(struct um *machine, int fd, int level, int opt);
VALUE um_setsockopt(struct um *machine, int fd, int level, int opt, int value);

void um_async_op_set(VALUE self, struct um *machine, struct um_op *op);
VALUE um_async_op_await(struct um_async_op *async_op);
void um_async_op_cancel(struct um_async_op *async_op);

VALUE um_prep_timeout(struct um *machine, double interval);

struct um_mutex *Mutex_data(VALUE self);
struct um_queue *Queue_data(VALUE self);

Expand Down
40 changes: 40 additions & 0 deletions ext/um/um_async_op.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#include "um.h"
#include <stdlib.h>

VALUE um_prep_timeout(struct um *machine, double interval) {
static ID ID_new = 0;
if (!ID_new) ID_new = rb_intern("new");

struct um_op *op = malloc(sizeof(struct um_op));
um_prep_op(machine, op, OP_TIMEOUT);
op->ts = um_double_to_timespec(interval);
op->flags = OP_F_TRANSIENT | OP_F_ASYNC;

VALUE obj = rb_funcall(cAsyncOp, rb_intern_const("new"), 0);
um_async_op_set(obj, machine, op);

RB_OBJ_WRITE(machine->self, &op->async_op, obj);

struct io_uring_sqe *sqe = um_get_sqe(machine, op);
io_uring_prep_timeout(sqe, &op->ts, 0, 0);

um_op_transient_add(machine, op);

return obj;
}

VALUE um_async_op_await(struct um_async_op *async_op) {
RB_OBJ_WRITE(async_op->machine->self, &async_op->op->fiber, rb_fiber_current());
async_op->op->flags &= ~OP_F_ASYNC;

VALUE ret = um_fiber_switch(async_op->machine);
if (!um_op_completed_p(async_op->op))
um_cancel_and_wait(async_op->machine, async_op->op);

raise_if_exception(ret);
return INT2NUM(async_op->op->result.res);
}

void um_async_op_cancel(struct um_async_op *async_op) {
um_submit_cancel_op(async_op->machine, async_op->op);
}
136 changes: 136 additions & 0 deletions ext/um/um_async_op_class.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
#include "um.h"
#include <stdlib.h>

VALUE cAsyncOp;

VALUE SYM_timeout;

static void AsyncOp_mark(void *ptr) {
struct um_async_op *async_op = ptr;
rb_gc_mark_movable(async_op->self);
rb_gc_mark_movable(async_op->machine->self);
}

static void AsyncOp_compact(void *ptr) {
struct um_async_op *async_op = ptr;
async_op->self = rb_gc_location(async_op->self);
}

static size_t AsyncOp_size(const void *ptr) {
return sizeof(struct um_async_op);
}

static void AsyncOp_free(void *ptr) {
struct um_async_op *async_op = ptr;
um_op_free(async_op->machine, async_op->op);
free(ptr);
}

static const rb_data_type_t AsyncOp_type = {
"UringMachine::AsyncOp",
{AsyncOp_mark, AsyncOp_free, AsyncOp_size, AsyncOp_compact},
0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED
};

static VALUE AsyncOp_allocate(VALUE klass) {
struct um_async_op *async_op = malloc(sizeof(struct um_async_op));
return TypedData_Wrap_Struct(klass, &AsyncOp_type, async_op);
}

inline struct um_async_op *AsyncOp_data(VALUE self) {
return RTYPEDDATA_DATA(self);
}

VALUE AsyncOp_initialize(VALUE self) {
struct um_async_op *async_op = AsyncOp_data(self);
memset(async_op, 0, sizeof(struct um_async_op));
async_op->self = self;
return self;
}

void um_async_op_set(VALUE self, struct um *machine, struct um_op *op) {
struct um_async_op *async_op = AsyncOp_data(self);
async_op->machine = machine;
async_op->op = op;
}

inline void raise_on_missing_op(struct um_async_op *async_op) {
if (!async_op->op)
rb_raise(rb_eRuntimeError, "Missing op");
}

inline int async_op_is_done(struct um_async_op *async_op) {
return (async_op->op->flags & OP_F_COMPLETED);
}

VALUE AsyncOp_kind(VALUE self) {
struct um_async_op *async_op = AsyncOp_data(self);
raise_on_missing_op(async_op);

switch(async_op->op->kind) {
case OP_TIMEOUT:
return SYM_timeout;
default:
rb_raise(rb_eRuntimeError, "Invalid op kind");
}
}

VALUE AsyncOp_done_p(VALUE self) {
struct um_async_op *async_op = AsyncOp_data(self);
raise_on_missing_op(async_op);

return async_op_is_done(async_op) ? Qtrue : Qfalse;
}

VALUE AsyncOp_result(VALUE self) {
struct um_async_op *async_op = AsyncOp_data(self);
raise_on_missing_op(async_op);

return async_op_is_done(async_op) ? INT2NUM(async_op->op->result.res) : Qnil;
}

VALUE AsyncOp_cancelled_p(VALUE self) {
struct um_async_op *async_op = AsyncOp_data(self);
raise_on_missing_op(async_op);

if (!async_op_is_done(async_op)) return Qnil;

return (async_op->op->result.res == -ECANCELED) ? Qtrue : Qfalse;
}

VALUE AsyncOp_await(VALUE self) {
struct um_async_op *async_op = AsyncOp_data(self);
raise_on_missing_op(async_op);

if (async_op_is_done(async_op))
return INT2NUM(async_op->op->result.res);

return um_async_op_await(async_op);
}

VALUE AsyncOp_cancel(VALUE self) {
struct um_async_op *async_op = AsyncOp_data(self);
raise_on_missing_op(async_op);

if (!async_op_is_done(async_op))
um_async_op_cancel(async_op);

return self;
}

void Init_AsyncOp(void) {
cAsyncOp = rb_define_class_under(cUM, "AsyncOp", rb_cObject);
rb_define_alloc_func(cAsyncOp, AsyncOp_allocate);

rb_define_method(cAsyncOp, "initialize", AsyncOp_initialize, 0);
rb_define_method(cAsyncOp, "kind", AsyncOp_kind, 0);
rb_define_method(cAsyncOp, "done?", AsyncOp_done_p, 0);
rb_define_method(cAsyncOp, "result", AsyncOp_result, 0);
rb_define_method(cAsyncOp, "cancelled?", AsyncOp_cancelled_p, 0);

rb_define_method(cAsyncOp, "await", AsyncOp_await, 0);
rb_define_method(cAsyncOp, "join", AsyncOp_await, 0);
rb_define_method(cAsyncOp, "cancel", AsyncOp_cancel, 0);

SYM_timeout = ID2SYM(rb_intern("timeout"));
}
Loading

0 comments on commit b4d0dc8

Please sign in to comment.