Question: How to use spawn with not trivial payload? #5

Open
voroskoi opened this issue Dec 22, 2024 · 4 comments

@voroskoi

Hi,

This is probably just me being lame, but I cannot figure out even the simplest use cases. I would like to add elements to an ArrayList with tardy. It may not make much sense, but I thought this would be a good starting point.

const std = @import("std");
const Tardy = @import("tardy").Tardy(.auto);
const Runtime = @import("tardy").Runtime;

fn notmain() !void {
    var t = try Tardy.init(.{ .allocator = std.heap.page_allocator });
    defer t.deinit();

    // XXX: this stack is not visible in init: add to init params!
    // var stack = std.ArrayList(usize).init(std.heap.page_allocator);
    // defer stack.deinit();

    // XXX: This struct fails with "unable to unwrap null" at utils.zig:27
    // because it needs to be `packed` to have a backing_integer field.
    // OTOH, if I add `packed` it becomes a u128, which fails at utils.zig:71.
    const StackContext = struct {
        stack: *std.ArrayList(usize),
        item: usize,

        fn append(_: *Runtime, _: void, context: @This()) !void {
            try context.stack.append(context.item);
        }
    };

    try t.entry(
        {},
        struct {
            fn init(rt: *Runtime, _: void) !void {
                var stack = std.ArrayList(usize).init(std.heap.page_allocator);
                defer stack.deinit();

                try rt.spawn(void, StackContext{ .stack = &stack, .item = 3 }, StackContext.append);
            }
        }.init,
        {},
        struct {
            fn deinit(_: *Runtime, _: void) !void {}
        }.deinit,
    );
}

pub fn main() !void {
    var t = try Tardy.init(.{ .allocator = std.heap.page_allocator });
    defer t.deinit();

    var stack = std.ArrayList(usize).init(std.heap.page_allocator);
    defer stack.deinit();

    try t.entry(
        &stack,
        struct {
            fn append(_: *Runtime, st: *std.ArrayList(usize), item: usize) !void {
                try st.append(item);
            }
            fn init(rt: *Runtime, _: *std.ArrayList(usize)) !void {
                try rt.spawn(void, @as(usize, 3), append);
            }
        }.init,
        {},
        struct {
            fn deinit(_: *Runtime, _: void) !void {}
        }.deinit,
    );
}

The notmain function is my first try, but I could not figure out how to pass StackContext to Tardy. It fails one way or the other. See my comment above StackContext.

The main function is my second try. Here I pass stack as an init parameter (where I do not need it), but I would like to use it in append, where I am still unable to do so.

So in this simple example how can I pass both the ArrayList and the value I would like to add to each thread?

@mookums
Owner

mookums commented Dec 23, 2024

I took a look at your example and put together this version that should work.

Passing the context failed because the largest payload that can be passed through is a usize. This means you can pass:

  1. a type that is natively no larger than a usize, like a u8.
  2. a struct that is packed to be no larger than a usize.
  3. a pointer to any arbitrary type, since a pointer is always usize-sized.

const std = @import("std");
const log = std.log.scoped(.@"issue5/example");

const Tardy = @import("tardy").Tardy(.auto);
const Runtime = @import("tardy").Runtime;

const StackContext = struct {
    mutex: std.Thread.Mutex = .{},
    stack: std.ArrayList(usize),
    item: usize,

    fn init(allocator: std.mem.Allocator) StackContext {
        return .{
            .stack = std.ArrayList(usize).init(allocator),
            .item = 0,
        };
    }

    pub fn deinit(self: *const StackContext) void {
        self.stack.deinit();
    }

    fn append_task(_: *Runtime, _: void, context: *StackContext) !void {
        context.mutex.lock();
        defer context.mutex.unlock();

        context.item += 1;
        log.debug("appending {d} to stack", .{context.item});
        try context.stack.append(context.item);
    }
};

pub fn main() !void {
    const allocator = std.heap.page_allocator;

    var t = try Tardy.init(.{
        .allocator = allocator,
        .threading = .{ .multi = 10 },
    });
    defer t.deinit();

    var stack_context = StackContext.init(allocator);
    defer stack_context.deinit();

    try t.entry(
        &stack_context,
        struct {
            // this will be run in every runtime.
            fn init(rt: *Runtime, context: *StackContext) !void {
                try rt.spawn(void, context, StackContext.append_task);
            }
        }.init,
        {},
        struct {
            fn deinit(_: *Runtime, _: void) !void {}
        }.deinit,
    );
}
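
The three cases in the list above can be checked with a plain size comparison. This is a minimal sketch, assuming Zig 0.13 (the version shown in the logs in this thread). `Small`, `Big`, and `fitsInUsize` are hypothetical names made up for illustration and are not part of tardy; how tardy itself performs the check may differ.

```zig
const std = @import("std");

// Hypothetical packed payload: 32 bits in total, so it fits in a usize
// on 32-bit and 64-bit targets.
const Small = packed struct(u32) {
    index: u16,
    flags: u16,
};

// Hypothetical payload shaped like the first attempt in this thread:
// a pointer plus a usize is twice the size of a usize.
const Big = struct {
    stack: *std.ArrayList(usize),
    item: usize,
};

/// Returns true when a value of type T occupies no more bytes than a usize.
fn fitsInUsize(comptime T: type) bool {
    return @sizeOf(T) <= @sizeOf(usize);
}

pub fn main() void {
    std.debug.print("u8 fits: {}\n", .{fitsInUsize(u8)});
    std.debug.print("Small fits: {}\n", .{fitsInUsize(Small)});
    std.debug.print("Big fits: {}\n", .{fitsInUsize(Big)});
    // A pointer to the large struct is pointer-sized, whatever it points at.
    std.debug.print("*Big fits: {}\n", .{fitsInUsize(*Big)});
}
```

Only the by-value `Big` is too large, which is why passing `*StackContext` works where passing `StackContext` did not.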

@voroskoi
Author

Hi,

Thank you very much for the quick and helpful reply. I was able to come up with a working version based on your feedback. However, I ran into some surprises that I would like to share.

Running your code gives me this:

voroskoi ~/code/tardy-test HEAD (e457c6b) [!] via ↯ v0.13.0 ❯ zig build run
debug(tardy): aio backend: io_uring
info(tardy): thread count: 10
debug(issue5/example): appending 1 to stack
warning(tardy/runtime): no more tasks
debug(issue5/example): appending 2 to stack
warning(tardy/runtime): no more tasks

Sometimes I also see "appending 3 to stack", but it should only run once, shouldn't it?

However, sometimes I get this:

voroskoi ~/code/tardy-test HEAD (e457c6b) [!] via ↯ v0.13.0 ❯ zig build run
debug(tardy): aio backend: io_uring
info(tardy): thread count: 10
debug(issue5/example): appending 1 to stack
warning(tardy/runtime): no more tasks
thread 13335 panic: reached unreachable code
debug(issue5/example): appending 2 to stack
warning(tardy/runtime): no more tasks
/home/voroskoi/.zig-linux-aarch64-0.13.0/lib/std/debug.zig:412:14: 0x10503fb in assert (tardy-test)
    if (!ok) unreachable; // assertion failure
             ^
/home/voroskoi/.cache/zig/p/1220c3c3e6dd2fc6e61645dff2d22d46cb3eca3fb5c4c14a0015dab881d0a5af3976/src/aio/apis/io_uring.zig:78:23: 0x1052f1f in init (tardy-test)
                assert(parent_uring.inner.fd >= 0);
                      ^
/home/voroskoi/.cache/zig/p/1220c3c3e6dd2fc6e61645dff2d22d46cb3eca3fb5c4c14a0015dab881d0a5af3976/src/lib.zig:102:45: 0x10545ab in spawn_runtime (tardy-test)
                io.* = try AioInnerType.init(self.options.allocator, options);
                                            ^
/home/voroskoi/.cache/zig/p/1220c3c3e6dd2fc6e61645dff2d22d46cb3eca3fb5c4c14a0015dab881d0a5af3976/src/lib.zig:178:60: 0x10a00cf in thread_init (tardy-test)
                        var thread_rt = tardy.spawn_runtime(.{
                                                           ^
/home/voroskoi/.zig-linux-aarch64-0.13.0/lib/std/Thread.zig:408:13: 0x108fe6b in callFn__anon_8988 (tardy-test)
            @call(.auto, f, args);
            ^
/home/voroskoi/.zig-linux-aarch64-0.13.0/lib/std/Thread.zig:1226:30: 0x1061197 in entryFn (tardy-test)
                return callFn(f, self.fn_args);
                             ^
/home/voroskoi/.zig-linux-aarch64-0.13.0/lib/c.zig:269:13: 0x10e1d1f in clone (c)
            asm volatile (
            ^
/home/voroskoi/.cache/zig/p/1220c3c3e6dd2fc6e61645dff2d22d46cb3eca3fb5c4c14a0015dab881d0a5af3976/src/aio/lib.zig:186:26: 0x106159b in deinit (tardy-test)
        @call(.auto, self._deinit, .{ self, allocator });
                         ^
run
└─ run tardy-test failure
error: the following command terminated unexpectedly:
/home/voroskoi/code/tardy-test/zig-out/bin/tardy-test
Build Summary: 3/5 steps succeeded; 1 failed (disable with --summary none)
run transitive failure
└─ run tardy-test failure
error: the following build command failed with exit code 1:
/home/voroskoi/code/tardy-test/.zig-cache/o/5d058b28317d6a8b993d86151d88a919/build /home/voroskoi/.zig-linux-aarch64-0.13.0/zig /home/voroskoi/code/tardy-test /home/voroskoi/code/tardy-test/.zig-cache /home/voroskoi/.cache/zig --seed 0x64329e6 -Zc69e6ce75f43905b run

My final version is this:

const std = @import("std");
const log = std.log.scoped(.@"issue5/example");

const Tardy = @import("tardy").Tardy(.auto);
const Runtime = @import("tardy").Runtime;

const StackContext = struct {
    mutex: std.Thread.Mutex = .{},
    stack: *std.ArrayList(usize),
    item: usize,

    fn init(allocator: std.mem.Allocator) !StackContext {
        const st = try allocator.create(std.ArrayList(usize));
        st.* = std.ArrayList(usize).init(allocator);
        return .{
            .stack = st,
            .item = undefined,
        };
    }

    pub fn deinit(self: *const StackContext) void {
        self.stack.deinit();
    }

    fn append_task(_: *Runtime, _: void, context: *StackContext) !void {
        context.mutex.lock();
        defer context.mutex.unlock();

        log.debug("appending {d} to stack", .{context.item});
        try context.stack.append(context.item);
        log.debug("items: {any}\n", .{context.stack.items});
    }
};

pub fn main() !void {
    const allocator = std.heap.page_allocator;

    var t = try Tardy.init(.{
        .allocator = allocator,
        .threading = .{ .multi = 1 },
    });
    defer t.deinit();

    var stack_context = try StackContext.init(allocator);
    defer stack_context.deinit();

    try t.entry(
        &stack_context,
        struct {
            // this will be run in every runtime.
            fn init(rt: *Runtime, context: *StackContext) !void {
                var ctx1 = StackContext{
                    .mutex = context.mutex,
                    .stack = context.stack,
                    .item = 3,
                };
                try rt.spawn(void, &ctx1, StackContext.append_task);
                var ctx2 = StackContext{
                    .mutex = context.mutex,
                    .stack = context.stack,
                    .item = 4,
                };
                try rt.spawn(void, &ctx2, StackContext.append_task);
                var ctx3 = StackContext{
                    .mutex = context.mutex,
                    .stack = context.stack,
                    .item = 5,
                };
                try rt.spawn(void, &ctx3, StackContext.append_task);
            }
        }.init,
        {},
        struct {
            fn deinit(_: *Runtime, _: void) !void {}
        }.deinit,
    );

    log.debug("Final items: {any}\n", .{stack_context.stack.items});
}

I had to make two changes:

  1. Allocate the ArrayList header with the allocator so I can pass it around. This way I do not lose the added elements when the runtime finishes.
  2. Create a separate StackContext for every spawn call; otherwise only the last item (5) gets inserted every time.

This works fine with .threading = .{ .multi = 1 }, but multi = 2 produces the array 3 4 5 3 4 5, which is unexpected, and I also hit the assertion failure at io_uring.zig line 78 shown above.
I also wrote a loop that adds the items 0..20 to the array. With multi >= 2 it produces a 40-item array (I would still expect 20 items), and increasing the thread count further does not seem to make any difference.

Thanks,
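
One detail in the code above may be worth checking: each per-spawn context is built with `.mutex = context.mutex`, which copies the mutex by value. The sketch below, assuming Zig 0.13 and using only the standard library, suggests that such a copy is a separate lock and no longer guards against the original. `Ctx` and `locksAreIndependent` are hypothetical names for illustration, not tardy API.

```zig
const std = @import("std");

// Hypothetical context type with the same shape as the one in the post.
const Ctx = struct {
    mutex: std.Thread.Mutex = .{},
    item: usize,
};

/// Locks `a`, then reports whether `b` can still be acquired.
/// Returns true when the two mutexes do not exclude each other.
fn locksAreIndependent(a: *std.Thread.Mutex, b: *std.Thread.Mutex) bool {
    a.lock();
    defer a.unlock();
    if (b.tryLock()) {
        b.unlock();
        return true;
    }
    return false;
}

pub fn main() void {
    var shared = Ctx{ .item = 0 };
    // Copying the field by value creates a second, unrelated mutex.
    var copy = Ctx{ .mutex = shared.mutex, .item = 3 };
    std.debug.print(
        "copy is independent: {}\n",
        .{locksAreIndependent(&shared.mutex, &copy.mutex)},
    );
}
```

If the intent is for all tasks to share one lock, storing a `*std.Thread.Mutex` in the context (or passing a pointer to a single shared context) would be one way to keep them on the same mutex.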

@mookums
Owner

mookums commented Dec 23, 2024

  1. The weird assertion bug is something I ran into and fixed yesterday; it should go away if you use the latest main (c2851c1).

  2. The entry fn runs on every single runtime that gets spawned (one per thread). Each runtime runs entirely within the context of the thread it belongs to, and no operations (except for the experimental channels) cross these thread boundaries. This means that seeing $THREAD_COUNT times as many additions to the stack is expected behavior, as the entry fn is running on each runtime.

  3. Your example is spawning 3 tasks on every runtime. Each one of these runs and appends to the stack, and then the runtime terminates as it has no more tasks available. This is expected behavior given what the init method contains.

  4. I think you want processing to happen across threads, which is not a standard case for Tardy since things are very thread-local. You can try using the experimental channels (Broadcast, which would be SPMC) to pass data across thread boundaries. I've done this before with a quicksort example that unfortunately I don't have anymore.
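
The "entry runs once per runtime" behavior from points 2 and 3 can be modeled without tardy. The sketch below is only an analogy, assuming Zig 0.13: plain `std.Thread` workers stand in for runtimes, and `Shared`, `entry`, and `runOnEveryThread` are hypothetical names. Every worker runs the same entry body and appends 3, 4, 5 to one shared list, so the list grows by three items per thread.

```zig
const std = @import("std");

// Hypothetical shared state, standing in for the context passed to entry.
const Shared = struct {
    mutex: std.Thread.Mutex = .{},
    list: std.ArrayList(usize),
};

/// Stand-in for an entry function: every worker runs this same body.
fn entry(shared: *Shared) void {
    for ([_]usize{ 3, 4, 5 }) |item| {
        shared.mutex.lock();
        defer shared.mutex.unlock();
        shared.list.append(item) catch @panic("out of memory");
    }
}

/// Runs `entry` once on each of `thread_count` threads and returns
/// how many items ended up in the shared list.
fn runOnEveryThread(allocator: std.mem.Allocator, comptime thread_count: usize) !usize {
    var shared = Shared{ .list = std.ArrayList(usize).init(allocator) };
    defer shared.list.deinit();

    var threads: [thread_count]std.Thread = undefined;
    {
        var started: usize = 0;
        // Join whatever was started, even if a later spawn fails.
        defer {
            for (threads[0..started]) |t| t.join();
        }
        while (started < thread_count) : (started += 1) {
            threads[started] = try std.Thread.spawn(.{}, entry, .{&shared});
        }
    }
    // All workers have been joined at this point.
    return shared.list.items.len;
}

pub fn main() !void {
    const total = try runOnEveryThread(std.heap.page_allocator, 2);
    std.debug.print("items appended by 2 threads: {d}\n", .{total});
}
```

With two workers the list holds six items, which matches the 3 4 5 3 4 5 result reported for multi = 2.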

@voroskoi
Author

Hi,

Thanks again for your help!

  1. The latest main indeed fixes that, thanks.
  2. I had a similar feeling, but then why did 3 threads not create 60 entries as expected? This is also fixed in the latest main.
  3. See the previous point.
  4. Actually, I just want a thread pool, but with green threads. If I use it with multi = 1, then spawn creates the tasks and waits for them to finish, doesn't it?

I will check out channels anyway. Thanks!
