diff --git a/CMakeLists.txt b/CMakeLists.txt index 814a691..2eee4d3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,6 +34,8 @@ option(TESTS_USE_VALGRIND "enable tests with valgrind (if found)" ON) set(TEST_DIR ${CMAKE_CURRENT_BINARY_DIR}/test CACHE STRING "working directory for tests") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines") + include(FindPerl) include(FindThreads) include(CMakePackageConfigHelpers) diff --git a/examples/basic_cpp/basic.cpp b/examples/basic_cpp/basic.cpp index 25b7dd7..b22313e 100644 --- a/examples/basic_cpp/basic.cpp +++ b/examples/basic_cpp/basic.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include "libminiasync.h" @@ -55,57 +56,56 @@ struct task { std::coroutine_handle h; }; -struct coroutine_data { - std::coroutine_handle<> handle; -}; - -struct coroutine_output { - uint64_t b; -}; - -FUTURE(coroutine_future, struct coroutine_data, struct coroutine_output); +std::queue, std::coroutine_handle<>>> futures; -struct async_memcpy_resume_data { - FUTURE_CHAIN_ENTRY(struct vdm_memcpy_future, memcpy); - FUTURE_CHAIN_ENTRY(struct coroutine_future, resume); -}; +struct memcpy_task +{ + memcpy_task(void *dst, void *src, size_t n) { + auto *pthread_mover = vdm_new(vdm_descriptor_pthreads()); // XXX - lifetime + fut = vdm_memcpy(pthread_mover, dst, src, n); + } -struct async_memcpy_resume_output { - uint64_t a; -}; + bool await_ready() + { + return false; + } -FUTURE(async_memcpy_resume_fut, struct async_memcpy_resume_data, - struct async_memcpy_resume_output); + void await_suspend(std::coroutine_handle<> h) + { + futures.emplace(std::vector{fut}, h); + } -static enum future_state -resume_impl(struct future_context *ctx, struct future_waker waker) -{ - struct coroutine_data *data = reinterpret_cast(future_context_get_data(ctx)); + // custom function + struct vdm_memcpy_future manual_await() + { + return fut; + } - // Resume coroutine - data->handle(); + void await_resume() {} - return FUTURE_STATE_COMPLETE; -} + struct vdm_memcpy_future fut; +}; -static struct coroutine_future -resume_coroutine(std::coroutine_handle<> h) +void wait(struct runtime *r) { - struct coroutine_future fut; - fut.data.handle = h; + while (futures.size()) { + auto &p = futures.front(); - FUTURE_INIT(&fut, resume_impl); + std::vector futs; + for (auto &f : p.first) + futs.emplace_back(FUTURE_AS_RUNNABLE(&f)); - return fut; + runtime_wait_multiple(r, futs.data(), futs.size()); + p.second(); // resume coroutine + futures.pop(); + } } -struct memcpy_task +struct when_all { - memcpy_task(void *dst, void *src, size_t n, struct async_memcpy_resume_fut *fut): fut(fut) { - auto *pthread_mover = vdm_new(vdm_descriptor_pthreads()); // XXX - lifetime - FUTURE_CHAIN_ENTRY_INIT(&fut->data.memcpy, - vdm_memcpy(pthread_mover, dst, src, n), - NULL, NULL); + // XXX - passing vector here leads to internal complier error... + when_all(memcpy_task a1, memcpy_task a2, memcpy_task a3): a1(a1), a2(a2), a3(a3) + { } bool await_ready() @@ -115,33 +115,38 @@ struct memcpy_task void await_suspend(std::coroutine_handle<> h) { - FUTURE_CHAIN_ENTRY_INIT(&fut->data.resume, resume_coroutine(h), - NULL, NULL); - FUTURE_CHAIN_INIT(fut); + std::vector v; + v.emplace_back(a1.manual_await()); + v.emplace_back(a2.manual_await()); + v.emplace_back(a3.manual_await()); + + futures.emplace(v, h); } void await_resume() {} - struct async_memcpy_resume_fut *fut; + memcpy_task a1, a2, a3; }; -// XXX - soe similar as in cpp coro for when_all + when_all_task: -// memcpy_task is a separate task with it's own promise and start() method -// which takes appropriate argumentes (maybe handle to coroutine?) -// then on top of that there is extra awaiter -> something like when_all_ready_awaitable -// which is responsible for starting the task and waiting for runtime to finish -// AND IT COULD ALSO RESUME the coroutine manuallY (not using chaining?) - -task async_mempcy(void *dst, void *src, size_t n, struct async_memcpy_resume_fut *fut) +task async_mempcy(void *dst, void *src, size_t n) { std::cout << "Before memcpy" << std::endl; - co_await memcpy_task{dst, src, n, fut}; - std::cout << "After memcpy" << std::endl; + co_await memcpy_task{dst, src, n/2}; + std::cout << "After memcpy " << ((char*) dst) << std::endl; + co_await memcpy_task{dst + n/2, src + n/2, n - n/2}; + std::cout << "After second memcpy " << ((char*) dst) << std::endl; + + auto a1 = memcpy_task{dst, src, 1}; + auto a2 = memcpy_task{dst + 1, src, 1}; + auto a3 = memcpy_task{dst + 2, src, 1}; + + co_await when_all(a1, a2, a3); + std::cout << "After 3 concurrent memcopies " << ((char*) dst) << std::endl; } -task async_memcpy_print(std::string to_copy, char *buffer, const std::string &to_print, struct async_memcpy_resume_fut *fut) +task async_memcpy_print(std::string to_copy, char *buffer, const std::string &to_print) { - co_await async_mempcy(reinterpret_cast(buffer), reinterpret_cast(to_copy.data()), to_copy.size(), fut); + co_await async_mempcy(reinterpret_cast(buffer), reinterpret_cast(to_copy.data()), to_copy.size()); std::cout << to_print << std::endl; } @@ -156,15 +161,12 @@ main(int argc, char *argv[]) char buffer[buffer_size] = {0}; { - struct async_memcpy_resume_fut fut; - auto future = async_memcpy_print(to_copy, buffer, to_print, &fut); + auto future = async_memcpy_print(to_copy, buffer, to_print); std::cout << "inside main" << std::endl; - // actually executes future on runtime r - // XXX - make it a single function - future.wait(); - runtime_wait(r, FUTURE_AS_RUNNABLE(&fut)); + future.h.resume(); + wait(r); std::cout << buffer << std::endl; }