Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backend/epoll: implement eventfd wakeup notification #128

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions src/ThreadPool.zig
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ pub noinline fn shutdown(self: *ThreadPool) void {
// Wake up any threads sleeping on the idle_event.
// TODO: I/O polling notification here.
if (sync.idle > 0) self.idle_event.shutdown();
if (sync.spawned == 0) self.join_event.notify();
return;
});
}
Expand Down Expand Up @@ -337,6 +338,7 @@ fn join(self: *ThreadPool) void {
// Wait for the thread pool to be shutdown() then for all threads to enter a joinable state
self.join_event.wait();
const sync: Sync = @bitCast(self.sync.load(.monotonic));

assert(sync.state == .shutdown);
assert(sync.spawned == 0);

Expand Down
79 changes: 73 additions & 6 deletions src/backend/epoll.zig
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ pub const Loop = struct {

fd: posix.fd_t,

/// The eventfd that this epoll queue always has a filter for. Writing
/// an empty message to this eventfd can be used to wake up the loop
/// at any time. Waking up the loop via this eventfd won't trigger any
/// particular completion, it just forces tick to cycle.
eventfd: xev.Async,

/// The number of active completions. This DOES NOT include completions that
/// are queued in the submissions queue.
active: usize = 0,
Expand Down Expand Up @@ -56,8 +62,12 @@ pub const Loop = struct {
} = .{},

pub fn init(options: xev.Options) !Loop {
var eventfd = try xev.Async.init();
errdefer eventfd.deinit();

var res: Loop = .{
.fd = try posix.epoll_create1(std.os.linux.EPOLL.CLOEXEC),
.eventfd = eventfd,
.thread_pool = options.thread_pool,
.thread_pool_completions = undefined,
.cached_now = undefined,
Expand All @@ -68,6 +78,7 @@ pub const Loop = struct {

pub fn deinit(self: *Loop) void {
posix.close(self.fd);
self.eventfd.deinit();
}

/// Run the event loop. See RunMode documentation for details on modes.
Expand Down Expand Up @@ -262,20 +273,51 @@ pub const Loop = struct {
// Initialize
if (!self.flags.init) {
self.flags.init = true;

if (self.thread_pool != null) {
self.thread_pool_completions.init();
}

var ev: linux.epoll_event = .{
.events = linux.EPOLL.IN | linux.EPOLL.RDHUP,
.data = .{ .fd = self.eventfd.fd },
};
posix.epoll_ctl(
self.fd,
linux.EPOLL.CTL_ADD,
self.eventfd.fd,
&ev,
) catch |err| {
// We reset initialization because we can't do anything
// safely unless we get this mach port registered!
self.flags.init = false;
return err;
};
}

// Submit all the submissions. We copy the submission queue so that
// any resubmits don't cause an infinite loop.
var wait_rem: usize = @intCast(wait);
var queued = self.submissions;
self.submissions = .{};
while (queued.pop()) |c| {
// We ignore any completions that aren't in the adding state.
// This usually means that we switched them to be deleted or
// something.
if (c.flags.state != .adding) continue;

// These operations happen synchronously. Ensure they are
// decremented from wait_rem.
switch (c.op) {
.cancel,
// should noop be counted?
// .noop,
.shutdown,
.timer,
=> wait_rem -|= 1,
else => {},
}

self.start(c);
}

Expand All @@ -294,7 +336,6 @@ pub const Loop = struct {

// Wait and process events. We only do this if we have any active.
var events: [1024]linux.epoll_event = undefined;
var wait_rem: usize = @intCast(wait);
while (self.active > 0 and (wait == 0 or wait_rem > 0)) {
self.update_now();
const now_timer: Operation.Timer = .{ .next = self.cached_now };
Expand Down Expand Up @@ -347,9 +388,7 @@ pub const Loop = struct {
const timeout: i32 = if (wait_rem == 0) 0 else timeout: {
// If we have a timer, we want to set the timeout to our next
// timer value. If we have no timer, we wait forever.
// TODO: do not wait 100ms here, use an eventfd for our
// thread pool to wake us up.
const t = self.timers.peek() orelse break :timeout 100;
const t = self.timers.peek() orelse break :timeout -1;

// Determine the time in milliseconds.
const ms_now = @as(u64, @intCast(self.cached_now.tv_sec)) * std.time.ms_per_s +
Expand All @@ -369,6 +408,13 @@ pub const Loop = struct {

// Process all our events and invoke their completion handlers
for (events[0..n]) |ev| {
// Handle wakeup eventfd
if (ev.data.fd == self.eventfd.fd) {
var buffer: u64 = undefined;
_ = posix.read(self.eventfd.fd, std.mem.asBytes(&buffer)) catch {};
continue;
}

const c: *Completion = @ptrFromInt(@as(usize, @intCast(ev.data.ptr)));

// We get the fd and mark this as in progress we can properly
Expand Down Expand Up @@ -415,6 +461,7 @@ pub const Loop = struct {
const pool = self.thread_pool orelse return error.ThreadPoolRequired;

// Setup our completion state so that thread_perform can do stuff
c.task_loop = self;
c.task_completions = &self.thread_pool_completions;
c.task = .{ .callback = Loop.thread_perform };

Expand All @@ -436,6 +483,14 @@ pub const Loop = struct {

// Add to our completion queue
c.task_completions.push(c);

// Wake up our main loop
c.task_loop.wakeup() catch {};
}

/// Sends an empty message to this loop's eventfd so that it wakes up.
fn wakeup(self: *Loop) !void {
try self.eventfd.notify();
}

fn start(self: *Loop, completion: *Completion) void {
Expand Down Expand Up @@ -658,6 +713,13 @@ pub const Loop = struct {
},

.close => |v| res: {
if (completion.flags.threadpool) {
if (self.thread_schedule(completion)) |_|
return
else |err|
break :res .{ .close = err };
}

posix.close(v.fd);
break :res .{ .close = {} };
},
Expand Down Expand Up @@ -800,6 +862,7 @@ pub const Completion = struct {
/// reliable way to get access to the loop and shouldn't be used
/// except internally.
task: ThreadPool.Task = undefined,
task_loop: *Loop = undefined,
task_completions: *Loop.TaskCompletionQueue = undefined,
task_result: Result = undefined,

Expand Down Expand Up @@ -866,7 +929,6 @@ pub const Completion = struct {
// This should never happen because we always do these synchronously
// or in another location.
.cancel,
.close,
.noop,
.shutdown,
.timer,
Expand Down Expand Up @@ -972,6 +1034,11 @@ pub const Completion = struct {
err,
};
},

.close => |*op| res: {
posix.close(op.fd);
break :res .{ .close = {} };
},
};
}

Expand Down Expand Up @@ -1234,7 +1301,7 @@ pub const AcceptError = posix.EpollCtlError || error{
Unknown,
};

pub const CloseError = posix.EpollCtlError || error{
pub const CloseError = posix.EpollCtlError || ThreadPoolError || error{
Unknown,
};

Expand Down
9 changes: 8 additions & 1 deletion src/backend/kqueue.zig
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,9 @@ pub const Loop = struct {
// Only resubmit if we aren't already active (in the queue)
.rearm => if (!c_active) self.submissions.push(c),
}

// If we filled the events slice, we break to avoid overflow.
if (changes == events.len) break;
}

// Determine our next timeout based on the timers
Expand Down Expand Up @@ -1099,7 +1102,6 @@ pub const Completion = struct {
fn perform(self: *Completion, ev_: ?*const Kevent) Result {
return switch (self.op) {
.cancel,
.close,
.noop,
.timer,
.shutdown,
Expand Down Expand Up @@ -1229,6 +1231,11 @@ pub const Completion = struct {

break :res .{ .proc = 0 };
},

.close => |*op| res: {
posix.close(op.fd);
break :res .{ .close = {} };
},
};
}

Expand Down
16 changes: 16 additions & 0 deletions src/watcher/stream.zig
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,22 @@ pub fn Closeable(comptime xev: type, comptime T: type, comptime options: Options
}).callback,
};

// If we're dup-ing, then we ask the backend to manage the fd.
switch (xev.backend) {
.io_uring,
.wasi_poll,
.iocp,
=> {},

.epoll => {
c.flags.threadpool = true;
},

.kqueue => {
c.flags.threadpool = true;
},
}

loop.add(c);
}
};
Expand Down