Skip to content

Commit

Permalink
Implement #recv_each
Browse files Browse the repository at this point in the history
  • Loading branch information
noteflakes committed Oct 10, 2024
1 parent 8338c37 commit 81bc6bc
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 9 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
- Add `#recv_each`
- Add `#getsockopt`, `#setsockopt`
- Simplify and improve op management

# 2024-10-06 Version 0.4

- Add socket constants
Expand Down
15 changes: 15 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
- send_bundle / recv_bundle (kernel >= 6.10)
- splice
- sendto
- recvfrom
- poll
- tee
- open / openat
- fsync
- mkdir / mkdirat
- statx
- link / linkat / unlink / unlinkat / symlink
- rename / renameat
- waitid
- fadvise
- madvise
- getxattr / setxattr
- shutdown
- futex wait wake

- queues
46 changes: 37 additions & 9 deletions ext/um/um.c
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ struct op_ensure_ctx {
void *read_buf;
int read_maxlen;
struct __kernel_timespec ts;
int flags;
};

VALUE um_timeout_ensure(VALUE arg) {
Expand Down Expand Up @@ -390,7 +391,7 @@ int um_read_each_safe_loop_singleshot(struct op_ensure_ctx *ctx, int total) {
}
}

int um_read_each_multishot_process_results(struct op_ensure_ctx *ctx, int *total) {
int read_each_multishot_process_results(struct op_ensure_ctx *ctx, int *total) {
__s32 result = 0;
__u32 flags = 0;
__s32 bad_result = 0;
Expand Down Expand Up @@ -441,7 +442,7 @@ VALUE um_read_each_safe_loop(VALUE arg) {
if (!ctx->op->list_results.head)
rb_raise(rb_eRuntimeError, "no result found!\n");

if (!um_read_each_multishot_process_results(ctx, &total))
if (!read_each_multishot_process_results(ctx, &total))
return INT2NUM(total);
}
}
Expand Down Expand Up @@ -539,7 +540,6 @@ VALUE um_socket(struct um *machine, int domain, int type, int protocol, uint fla
int result = 0;

io_uring_prep_socket(sqe, domain, type, protocol, flags);

um_await_op(machine, op, &result, NULL);

um_raise_on_error_result(result);
Expand All @@ -552,7 +552,6 @@ VALUE um_connect(struct um *machine, int fd, const struct sockaddr *addr, sockle
int result = 0;

io_uring_prep_connect(sqe, fd, addr, addrlen);

um_await_op(machine, op, &result, NULL);

um_raise_on_error_result(result);
Expand All @@ -565,7 +564,6 @@ VALUE um_send(struct um *machine, int fd, VALUE buffer, int len, int flags) {
int result = 0;

io_uring_prep_send(sqe, fd, RSTRING_PTR(buffer), len, flags);

um_await_op(machine, op, &result, NULL);

um_raise_on_error_result(result);
Expand All @@ -579,21 +577,52 @@ VALUE um_recv(struct um *machine, int fd, VALUE buffer, int maxlen, int flags) {

void *ptr = um_prepare_read_buffer(buffer, maxlen, 0);
io_uring_prep_recv(sqe, fd, ptr, maxlen, flags);

um_await_op(machine, op, &result, NULL);

um_raise_on_error_result(result);
um_update_read_buffer(machine, buffer, 0, result, flags);
return INT2NUM(result);
}

static inline void recv_each_prepare_op(struct op_ensure_ctx *ctx) {
struct um_op *op = um_op_idle_checkout(ctx->machine, OP_RECV_MULTISHOT);
struct io_uring_sqe *sqe = um_get_sqe(ctx->machine, op);
io_uring_prep_recv_multishot(sqe, ctx->fd, 0, -1, ctx->bgid);
sqe->buf_group = ctx->bgid;
sqe->flags |= IOSQE_BUFFER_SELECT;
op->flags |= OP_F_MULTISHOT;
ctx->op = op;
}

VALUE recv_each_safe_loop(VALUE arg) {
struct op_ensure_ctx *ctx = (struct op_ensure_ctx *)arg;
int total = 0;
recv_each_prepare_op(ctx);

while (1) {
um_await_op(ctx->machine, ctx->op, NULL, NULL);
if (!ctx->op->aux)
rb_raise(rb_eRuntimeError, "no associated schedule op found");
ctx->op->aux = NULL;
if (!ctx->op->list_results.head)
rb_raise(rb_eRuntimeError, "no result found!\n");

if (!read_each_multishot_process_results(ctx, &total))
return INT2NUM(total);
}
}

VALUE um_recv_each(struct um *machine, int fd, int bgid, int flags) {
struct op_ensure_ctx ctx = { .machine = machine, .fd = fd, .bgid = bgid, .read_buf = NULL, .flags = flags };
return rb_ensure(recv_each_safe_loop, (VALUE)&ctx, um_multishot_ensure, (VALUE)&ctx);
}

VALUE um_bind(struct um *machine, int fd, struct sockaddr *addr, socklen_t addrlen) {
struct um_op *op = um_op_idle_checkout(machine, OP_BIND);
struct io_uring_sqe *sqe = um_get_sqe(machine, op);
int result = 0;

io_uring_prep_bind(sqe, fd, addr, addrlen);

um_await_op(machine, op, &result, NULL);

um_raise_on_error_result(result);
Expand All @@ -606,7 +635,6 @@ VALUE um_listen(struct um *machine, int fd, int backlog) {
int result = 0;

io_uring_prep_listen(sqe, fd, backlog);

um_await_op(machine, op, &result, NULL);

um_raise_on_error_result(result);
Expand Down Expand Up @@ -660,4 +688,4 @@ VALUE um_debug(struct um *machine) {
printf("scheduled head %p tail %p\n", machine->list_scheduled.head, machine->list_scheduled.tail);
printf("\n");
return machine->self;
}
}
1 change: 1 addition & 0 deletions ext/um/um.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ VALUE um_socket(struct um *machine, int domain, int type, int protocol, uint fla
VALUE um_connect(struct um *machine, int fd, const struct sockaddr *addr, socklen_t addrlen);
VALUE um_send(struct um *machine, int fd, VALUE buffer, int len, int flags);
VALUE um_recv(struct um *machine, int fd, VALUE buffer, int maxlen, int flags);
VALUE um_recv_each(struct um *machine, int fd, int bgid, int flags);
VALUE um_bind(struct um *machine, int fd, struct sockaddr *addr, socklen_t addrlen);
VALUE um_listen(struct um *machine, int fd, int backlog);
VALUE um_getsockopt(struct um *machine, int fd, int level, int opt);
Expand Down
6 changes: 6 additions & 0 deletions ext/um/um_class.c
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ VALUE UM_recv(VALUE self, VALUE fd, VALUE buffer, VALUE maxlen, VALUE flags) {
return um_recv(machine, NUM2INT(fd), buffer, NUM2INT(maxlen), NUM2INT(flags));
}

VALUE UM_recv_each(VALUE self, VALUE fd, VALUE bgid, VALUE flags) {
struct um *machine = get_machine(self);
return um_recv_each(machine, NUM2INT(fd), NUM2INT(bgid), NUM2INT(flags));
}

VALUE UM_bind(VALUE self, VALUE fd, VALUE host, VALUE port) {
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
Expand Down Expand Up @@ -250,6 +255,7 @@ void Init_UM(void) {
rb_define_method(cUM, "connect", UM_connect, 3);
rb_define_method(cUM, "send", UM_send, 4);
rb_define_method(cUM, "recv", UM_recv, 4);
rb_define_method(cUM, "recv_each", UM_recv_each, 3);
rb_define_method(cUM, "bind", UM_bind, 3);
rb_define_method(cUM, "listen", UM_listen, 2);
rb_define_method(cUM, "getsockopt", UM_getsockopt, 3);
Expand Down
43 changes: 43 additions & 0 deletions test/test_um.rb
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,49 @@ def test_recv
end
end

class RecvEachTest < UMBaseTest
def setup
super
@port = assign_port
@server = TCPServer.open('127.0.0.1', @port)
end

def teardown
@server&.close
super
end

def test_recv_each
t = Thread.new do
conn = @server.accept
conn.write('abc')
sleep 0.01
conn.write('def')
sleep 0.01
conn.write('ghi')
sleep 0.01
conn.close
sleep
end

fd = machine.socket(UM::AF_INET, UM::SOCK_STREAM, 0, 0)
res = machine.connect(fd, '127.0.0.1', @port)
assert_equal 0, res

bgid = machine.setup_buffer_ring(4096, 1024)
assert_equal 0, bgid

bufs = []

machine.recv_each(fd, bgid, 0) do |buf|
bufs << buf
end
assert_equal ['abc', 'def', 'ghi'], bufs
ensure
t&.kill
end
end

class BindTest < UMBaseTest
def setup
super
Expand Down

0 comments on commit 81bc6bc

Please sign in to comment.