Windows - thread panic: integer overflow #81

Open
jvmiert opened this issue Dec 14, 2023 · 8 comments

@jvmiert

jvmiert commented Dec 14, 2023

This is probably user error, since I'm not fully sure what I'm doing. My apologies in advance.

I have multiple threads trying to accept TCP connections on the same listening socket. Based on the documentation, this should be possible as long as each thread has its own loop. However, when I do this, the program panics when multiple TCP connections come in at the same time:

C:\Users\Jeroen\Desktop\libxev\src\backend\iocp.zig:346:29: 0x7ff7974f825c in tick (http.exe.obj)
                self.active -= 1;
                            ^
C:\Users\Jeroen\Desktop\libxev\src\backend\iocp.zig:171:62: 0x7ff7974f9932 in run (http.exe.obj)
            .until_done => while (!self.done()) try self.tick(1),
                                                             ^
C:\Users\Jeroen\Desktop\libxev\http.zig:92:30: 0x7ff7974f9abe in start (http.exe.obj)
            try self.loop.run(.until_done);
                             ^
C:\Users\Jeroen\zig\lib\std\Thread.zig:432:13: 0x7ff7974e1288 in callFn__anon_10500 (http.exe.obj)
            @call(.auto, f, args) catch |err| {
            ^
C:\Users\Jeroen\zig\lib\std\Thread.zig:523:30: 0x7ff7974a807e in entryFn (http.exe.obj)
                return callFn(f, self.fn_args);
                             ^
???:?:?: 0x7ff906dc7343 in ??? (KERNEL32.DLL)
???:?:?: 0x7ff9089e26b0 in ??? (ntdll.dll)

It seems like the completions are somehow added to the same loop even though I init a loop per thread. Am I doing something wrong? Or is this use case currently not supported with the IOCP backend?
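For readers unfamiliar with the panic message: in safe build modes, Zig traps when an integer subtraction goes out of range, which is what `self.active -= 1` in the trace reports. The sketch below reproduces that failure mode with a checked subtraction instead of a panic. It assumes the loop's counter is an unsigned integer; `Counter` and `completeOne` are hypothetical names for illustration and are not part of libxev.

```zig
const std = @import("std");

// Hypothetical stand-in for a loop's count of in-flight completions.
// Assumption: the real counter is unsigned, so it cannot go below zero.
const Counter = struct {
    active: usize = 0,

    // Checked decrement: returns error.Overflow instead of panicking.
    fn completeOne(self: *Counter) !void {
        self.active = try std.math.sub(usize, self.active, 1);
    }
};

pub fn main() void {
    // One operation was submitted to this loop.
    var c = Counter{ .active = 1 };

    // The first completion is the one this loop counted: 1 -> 0.
    c.completeOne() catch unreachable;

    // A second completion that this loop never counted would take the
    // counter below zero. A plain `active -= 1` panics at this point.
    c.completeOne() catch |err| {
        std.debug.print("second decrement failed: {s}\n", .{@errorName(err)});
    };
}
```

If a loop dequeues a completion it never submitted, its counter is decremented once more than it was incremented, which would match the symptom described above.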

My code:

const xev = @import("xev");
const std = @import("std");
const Allocator = std.mem.Allocator;
const Thread = std.Thread;
const Mutex = std.Thread.Mutex;
const log = std.log;

const TCPPool = std.heap.MemoryPool(xev.TCP);
const CompletionPool = std.heap.MemoryPool(xev.Completion);
const BufferPool = std.heap.MemoryPool([4096]u8);

pub fn main() !void {
    const GPA = std.heap.GeneralPurposeAllocator(.{});
    var gpa: GPA = .{};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    var server = try Server.init(alloc);

    try server.start();
}

const Server = struct {
    allocator: Allocator,

    pub fn init(alloc: Allocator) !Server {
        return .{ .allocator = alloc };
    }

    pub fn start(self: *Server) !void {
        const addr = try std.net.Address.parseIp4("0.0.0.0", 8088);
        var socket = try xev.TCP.init(addr);

        try socket.bind(addr);
        try socket.listen(std.os.linux.SOMAXCONN);

        const worker_count = 2;
        log.info("(server) init {d} workers...", .{worker_count});

        const workers = try self.allocator.alloc(Worker, worker_count);
        const threads = try self.allocator.alloc(Thread, worker_count);

        var started: usize = 0;

        defer {
            for (0..started) |i| {
                threads[i].join();
                workers[i].deinit();
            }
            self.allocator.free(workers);
            self.allocator.free(threads);
        }

        for (0..workers.len) |i| {
            workers[i] = try Worker.init(self.allocator, self, &socket, i);
            threads[i] = try Thread.spawn(.{}, Worker.start, .{&workers[i]});
            started += 1;
        }
        var wait: Thread.Condition = .{};

        var mutex = Thread.Mutex{};
        mutex.lock();
        wait.wait(&mutex);
        mutex.unlock();
    }
};

const Worker = struct {
    loop: xev.Loop,
    server: *Server,
    alloc: Allocator,

    socket: *xev.TCP,

    socket_pool: TCPPool,
    completion_pool: CompletionPool,
    buffer_pool: BufferPool,

    worker_id: usize,

    pub fn init(alloc: Allocator, server: *Server, socket: *xev.TCP, worker_id: usize) !Worker {
        return .{ .loop = try xev.Loop.init(.{}), .server = server, .alloc = alloc, .socket = socket, .socket_pool = TCPPool.init(alloc), .buffer_pool = BufferPool.init(alloc), .completion_pool = CompletionPool.init(alloc), .worker_id = worker_id };
    }

    pub fn start(self: *Worker) !void {
        while (true) {
            const c = try self.completion_pool.create();

            log.info("(worker {d}) Wait for accept...", .{self.worker_id});
            self.socket.accept(&self.loop, c, Worker, self, acceptCallback);

            try self.loop.run(.until_done);
        }
    }

    fn acceptCallback(
        self_: ?*Worker,
        l: *xev.Loop,
        c: *xev.Completion,
        r: xev.TCP.AcceptError!xev.TCP,
    ) xev.CallbackAction {
        const self = self_.?;

        const socket = self.socket_pool.create() catch unreachable;
        socket.* = r catch unreachable;

        log.info("(worker {d}) Accept callback [loop: {*}] [completion: {*}]", .{ self.worker_id, l, c });

        const buf = self.buffer_pool.create() catch unreachable;
        _ = buf;

        // socket.read(l, c, .{ .slice = buf }, Worker, self, readCallback);
        return .disarm;
    }

    pub fn deinit(self: *Worker) void {
        self.buffer_pool.deinit();
        self.completion_pool.deinit();
        self.socket_pool.deinit();
        self.loop.deinit();
    }
};

When I make concurrent requests, e.g. curl http://localhost:8088 & curl http://localhost:8088 & curl http://localhost:8088 & curl http://localhost:8088, the crash occurs.

@Corendos
Contributor

Hey there!

I tried to reproduce your issue, but it seems to be working on my side. Which version of libxev did you use, master? I'm asking because the line numbers in the stack trace don't seem to match what is on master.

@Corendos
Contributor

I think I'm starting to see what the issue could be.

Basically, the way IOCP works, you have to register file descriptors to a completion port (https://learn.microsoft.com/en-us/windows/win32/fileio/createiocompletionport).

In your case you are reusing the same socket for both workers, thus adding the same file descriptor to multiple completion ports. That's why you observe that the same loop seems to be used by both workers.

As a workaround (though I don't know if Windows supports it), you can create multiple sockets bound to the same address and port (using the SO_REUSEPORT socket option) and give each loop its own socket.

@jvmiert
Author

jvmiert commented Dec 15, 2023

Thank you for the quick reply, and thank you specifically for adding the IOCP backend!

I was on main; however, I added some logging to see what was going on, so that's why the line numbers don't match up. The issue is occurring here.

What you described in your last comment makes sense. I don't think you can have multiple completion ports, since the idea is that many threads subscribe to a single one, but I will try it nevertheless. Will report back!

@jvmiert
Author

jvmiert commented Dec 26, 2023

Just wanted to share a brief update based on my limited understanding of what the issue is for future reference.

The workaround just ends up associating the last socket that binds to the address.

In io_uring you can create a new ring per thread, and all the work related to that ring will always end up back at that thread. So if you submit work to an SQE, it will always end up back with its corresponding CQE.

For IOCP, the completion is submitted to the most recent waiting thread, regardless of which thread originally submitted the work. So I guess the abstraction that libxev provides doesn't work in my specific use case for IOCP.

@Corendos
Contributor

> The workaround just ends up associating the last socket that binds to the address.

I'm not sure I understand that part.

> In io_uring you can create a new ring per thread, and all the work related to that ring will always end up back at that thread. So if you submit work to an SQE, it will always end up back with its corresponding CQE.

That's also what happens with IOCP. Each xev.Loop creates its own Completion Port and the thread calling the Loop.run(...) method is the only thread registered to the completion port.

> For IOCP, the completion is submitted to the most recent waiting thread, regardless of which thread originally submitted the work. So I guess the abstraction that libxev provides doesn't work in my specific use case for IOCP.

Indeed, but the source of the issue is that the listening socket is associated with two xev.Loop instances, and this isn't something I would recommend.

With a slight variation of the code you posted, the problem disappears:

const xev = @import("xev");
const std = @import("std");
const builtin = @import("builtin");
const Allocator = std.mem.Allocator;
const Thread = std.Thread;
const Mutex = std.Thread.Mutex;
const log = std.log;

const TCPPool = std.heap.MemoryPool(xev.TCP);
const CompletionPool = std.heap.MemoryPool(xev.Completion);
const BufferPool = std.heap.MemoryPool([4096]u8);

pub fn main() !void {
    const GPA = std.heap.GeneralPurposeAllocator(.{});
    var gpa: GPA = .{};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    var server = try Server.init(alloc);

    try server.start();
}

const Server = struct {
    allocator: Allocator,

    pub fn init(alloc: Allocator) !Server {
        return .{ .allocator = alloc };
    }

    pub fn start(self: *Server) !void {
        const addr = try std.net.Address.parseIp4("0.0.0.0", 8088);

        const worker_count = 2;
        log.info("(server) init {d} workers...", .{worker_count});

        const workers = try self.allocator.alloc(Worker, worker_count);
        const threads = try self.allocator.alloc(Thread, worker_count);

        var started: usize = 0;

        defer {
            for (0..started) |i| {
                threads[i].join();
                workers[i].deinit();
            }
            self.allocator.free(workers);
            self.allocator.free(threads);
        }

        for (0..workers.len) |i| {
            workers[i] = try Worker.init(self.allocator, self, addr, i);
            threads[i] = try Thread.spawn(.{}, Worker.start, .{&workers[i]});
            started += 1;
        }
        var wait: Thread.Condition = .{};

        var mutex = Thread.Mutex{};
        mutex.lock();
        wait.wait(&mutex);
        mutex.unlock();
    }
};

const Worker = struct {
    loop: xev.Loop,
    server: *Server,
    alloc: Allocator,

    socket: xev.TCP,

    socket_pool: TCPPool,
    completion_pool: CompletionPool,
    buffer_pool: BufferPool,

    worker_id: usize,

    pub fn init(alloc: Allocator, server: *Server, address: std.net.Address, worker_id: usize) !Worker {
        var socket = try xev.TCP.init(address);

        // On Windows, SO_REUSEADDR implies SO_REUSEPORT
        if (builtin.target.os.tag != .windows) {
            try std.os.setsockopt(socket.fd, std.os.SOL.SOCKET, std.os.SO.REUSEPORT, &std.mem.toBytes(@as(c_int, 1)));
        }
        try socket.bind(address);
        try socket.listen(std.os.linux.SOMAXCONN);
        return .{ .loop = try xev.Loop.init(.{}), .server = server, .alloc = alloc, .socket = socket, .socket_pool = TCPPool.init(alloc), .buffer_pool = BufferPool.init(alloc), .completion_pool = CompletionPool.init(alloc), .worker_id = worker_id };
    }

    pub fn start(self: *Worker) !void {
        while (true) {
            const c = try self.completion_pool.create();

            log.info("(worker {d}) Wait for accept...", .{self.worker_id});
            self.socket.accept(&self.loop, c, Worker, self, acceptCallback);

            try self.loop.run(.until_done);
        }
    }

    fn acceptCallback(
        self_: ?*Worker,
        l: *xev.Loop,
        c: *xev.Completion,
        r: xev.TCP.AcceptError!xev.TCP,
    ) xev.CallbackAction {
        const self = self_.?;

        const socket = self.socket_pool.create() catch unreachable;
        socket.* = r catch unreachable;

        log.info("(worker {d}) Accept callback [loop: {*}] [completion: {*}]", .{ self.worker_id, l, c });

        const buf = self.buffer_pool.create() catch unreachable;
        _ = buf;

        // socket.read(l, c, .{ .slice = buf }, Worker, self, readCallback);
        return .disarm;
    }

    pub fn deinit(self: *Worker) void {
        self.buffer_pool.deinit();
        self.completion_pool.deinit();
        self.socket_pool.deinit();
        self.loop.deinit();
    }
};

This is what I meant when I proposed to use two sockets bound to the same address and port.

@jvmiert
Author

jvmiert commented Dec 28, 2023

> This is what I meant when I proposed to use two sockets bound to the same address and port.

I tried your code, but when I run concurrent requests I only see a single worker receiving requests. Even when I remove the while loop in the start function, I don't see the acceptCallback of the other workers firing. So to me it seemed that two sockets cannot be bound to the same address and port simultaneously?

> That's also what happens with IOCP. Each xev.Loop creates its own Completion Port and the thread calling the Loop.run(...) method is the only thread registered to the completion port.

I don't mean to disagree with you, since you are obviously more knowledgeable than me on this subject. I'm just trying to clarify my understanding. When I init a new xev.Loop in io_uring, I get a new memory address on every init, regardless of whether it's the same socket or not. However, with IOCP I get a xev.Loop at the same memory address even though I init it from multiple different threads. To me this seems like they work differently from each other. Please correct me if I misunderstood something here.

> Indeed, but the source of the issue is the fact that the listening socket is associated with two xev.Loop and this isn't something I would recommend.

Yes, although this seems fine with io_uring, it does not work well with IOCP. Regardless of the backend, this probably does not seem like a good idea.

@Corendos
Contributor

> I tried your code, but when I run concurrent requests I only see a single worker receiving requests. Even when I remove the while loop in the start function, I don't see the acceptCallback of the other workers firing. So to me it seemed that two sockets cannot be bound to the same address and port simultaneously?

Ok, this is weird 😅 It's supposed to be common practice to load-balance across sockets like this. Maybe something is different on Windows (although it shouldn't be, which is why I'm surprised).

> I don't mean to disagree with you, since you are obviously more knowledgeable than me on this subject. I'm just trying to clarify my understanding. When I init a new xev.Loop in io_uring, I get a new memory address on every init, regardless of whether it's the same socket or not. However, with IOCP I get a xev.Loop at the same memory address even though I init it from multiple different threads. To me this seems like they work differently from each other. Please correct me if I misunderstood something here.

You mean that when you run this locally, the following happens?

var loop1 = try xev.Loop.init(.{});
var loop2 = try xev.Loop.init(.{});

std.debug.print("{*} == {*}: {}\n", .{&loop1, &loop2, &loop1 == &loop2}); // Will print {addr1} == {addr2}: true

Because that's not what happens on my Windows machine (and shouldn't on yours either), so I'm a bit confused 😁

I don't want to say that what you are trying to do shouldn't be done (you have your reasons). Generally speaking, the thread accepting connections should hand new sockets off to worker threads and go back to waiting for connections as fast as possible, which reduces the need to load-balance accepts across multiple threads like your original code does.
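As a rough illustration of that hand-off pattern, here is a minimal sketch that uses only the Zig standard library. It is not libxev code: `Conn` is a plain integer standing in for an accepted socket, `Queue` and `worker` are hypothetical names, and the loop in `main` stands in for a single thread that accepts connections and queues each one immediately.

```zig
const std = @import("std");

// Hypothetical stand-in for an accepted socket (e.g. an xev.TCP).
const Conn = u32;

// Fixed-size hand-off queue shared by the acceptor and the workers.
const Queue = struct {
    mutex: std.Thread.Mutex = .{},
    not_empty: std.Thread.Condition = .{},
    items: [64]Conn = undefined,
    head: usize = 0,
    len: usize = 0,
    closed: bool = false,

    // Called by the accepting thread. Returns false if the queue is full.
    fn push(self: *Queue, conn: Conn) bool {
        self.mutex.lock();
        defer self.mutex.unlock();
        if (self.len == self.items.len) return false;
        self.items[(self.head + self.len) % self.items.len] = conn;
        self.len += 1;
        self.not_empty.signal();
        return true;
    }

    // Called by workers. Blocks until a connection is available and
    // returns null once the queue is closed and drained.
    fn pop(self: *Queue) ?Conn {
        self.mutex.lock();
        defer self.mutex.unlock();
        while (self.len == 0) {
            if (self.closed) return null;
            self.not_empty.wait(&self.mutex);
        }
        const conn = self.items[self.head];
        self.head = (self.head + 1) % self.items.len;
        self.len -= 1;
        return conn;
    }

    // Wakes every worker so they can exit after draining the queue.
    fn close(self: *Queue) void {
        self.mutex.lock();
        defer self.mutex.unlock();
        self.closed = true;
        self.not_empty.broadcast();
    }
};

// Each worker counts into its own slot, so no atomics are needed.
fn worker(queue: *Queue, handled: *usize) void {
    while (queue.pop()) |_| {
        // Per-connection work (read, respond, close) would go here.
        handled.* += 1;
    }
}

pub fn main() !void {
    var queue = Queue{};
    var handled = [_]usize{ 0, 0 };
    var threads: [2]std.Thread = undefined;
    for (0..threads.len) |i| {
        threads[i] = try std.Thread.spawn(.{}, worker, .{ &queue, &handled[i] });
    }

    // Stand-in for the accept loop: queue each connection right away,
    // then go back to "accepting".
    var conn: Conn = 0;
    while (conn < 8) : (conn += 1) {
        _ = queue.push(conn);
    }

    queue.close();
    for (threads) |t| t.join();
    std.debug.print("handled {d} connections\n", .{handled[0] + handled[1]});
}
```

With this shape only one thread ever touches the listening socket, so the socket is associated with a single loop, and the workers can each run their own loop for reads and writes on the sockets they receive.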

@jvmiert
Author

jvmiert commented Dec 31, 2023

> Because that's not what happens on my Windows machine (and shouldn't on yours either), so I'm a bit confused 😁

My apologies for the confusion, I mixed up the xev.Loop object with the xev.TCP object. Obviously makes sense that socket would be the same. The loop does indeed return new instances at different addresses.

> I don't want to say that what you are trying to do shouldn't be done (you have your reasons). Generally speaking, the thread accepting connections should hand new sockets off to worker threads and go back to waiting for connections as fast as possible, which reduces the need to load-balance accepts across multiple threads like your original code does.

Thanks again for double checking my work, it's been helpful!
