Skip to content
This repository has been archived by the owner on Mar 22, 2023. It is now read-only.

Commit

Permalink
coroutines v3
Browse files Browse the repository at this point in the history
  • Loading branch information
igchor committed Dec 7, 2021
1 parent bbd5363 commit 7062eb2
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 60 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ option(TESTS_USE_VALGRIND "enable tests with valgrind (if found)" ON)
set(TEST_DIR ${CMAKE_CURRENT_BINARY_DIR}/test
CACHE STRING "working directory for tests")

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines")

include(FindPerl)
include(FindThreads)
include(CMakePackageConfigHelpers)
Expand Down
122 changes: 62 additions & 60 deletions examples/basic_cpp/basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <utility>
#include <string>
#include <iostream>
#include <queue>

#include "libminiasync.h"

Expand Down Expand Up @@ -55,57 +56,56 @@ struct task {
std::coroutine_handle<task::promise_type> h;
};

struct coroutine_data {
std::coroutine_handle<> handle;
};

struct coroutine_output {
uint64_t b;
};

FUTURE(coroutine_future, struct coroutine_data, struct coroutine_output);
std::queue<std::pair<std::vector<struct vdm_memcpy_future>, std::coroutine_handle<>>> futures;

struct async_memcpy_resume_data {
FUTURE_CHAIN_ENTRY(struct vdm_memcpy_future, memcpy);
FUTURE_CHAIN_ENTRY(struct coroutine_future, resume);
};
struct memcpy_task
{
memcpy_task(void *dst, void *src, size_t n) {
auto *pthread_mover = vdm_new(vdm_descriptor_pthreads()); // XXX - lifetime
fut = vdm_memcpy(pthread_mover, dst, src, n);
}

struct async_memcpy_resume_output {
uint64_t a;
};
bool await_ready()
{
return false;
}

FUTURE(async_memcpy_resume_fut, struct async_memcpy_resume_data,
struct async_memcpy_resume_output);
void await_suspend(std::coroutine_handle<> h)
{
futures.emplace(std::vector<struct vdm_memcpy_future>{fut}, h);
}

static enum future_state
resume_impl(struct future_context *ctx, struct future_waker waker)
{
struct coroutine_data *data = reinterpret_cast<struct coroutine_data *>(future_context_get_data(ctx));
// custom function
struct vdm_memcpy_future manual_await()
{
return fut;
}

// Resume coroutine
data->handle();
void await_resume() {}

return FUTURE_STATE_COMPLETE;
}
struct vdm_memcpy_future fut;
};

static struct coroutine_future
resume_coroutine(std::coroutine_handle<> h)
void wait(struct runtime *r)
{
struct coroutine_future fut;
fut.data.handle = h;
while (futures.size()) {
auto &p = futures.front();

FUTURE_INIT(&fut, resume_impl);
std::vector<future*> futs;
for (auto &f : p.first)
futs.emplace_back(FUTURE_AS_RUNNABLE(&f));

return fut;
runtime_wait_multiple(r, futs.data(), futs.size());
p.second(); // resume coroutine
futures.pop();
}
}

struct memcpy_task
struct when_all
{
memcpy_task(void *dst, void *src, size_t n, struct async_memcpy_resume_fut *fut): fut(fut) {
auto *pthread_mover = vdm_new(vdm_descriptor_pthreads()); // XXX - lifetime
FUTURE_CHAIN_ENTRY_INIT(&fut->data.memcpy,
vdm_memcpy(pthread_mover, dst, src, n),
NULL, NULL);
// XXX - passing vector here leads to internal complier error...
when_all(memcpy_task a1, memcpy_task a2, memcpy_task a3): a1(a1), a2(a2), a3(a3)
{
}

bool await_ready()
Expand All @@ -115,33 +115,38 @@ struct memcpy_task

void await_suspend(std::coroutine_handle<> h)
{
FUTURE_CHAIN_ENTRY_INIT(&fut->data.resume, resume_coroutine(h),
NULL, NULL);
FUTURE_CHAIN_INIT(fut);
std::vector<struct vdm_memcpy_future> v;
v.emplace_back(a1.manual_await());
v.emplace_back(a2.manual_await());
v.emplace_back(a3.manual_await());

futures.emplace(v, h);
}

void await_resume() {}

struct async_memcpy_resume_fut *fut;
memcpy_task a1, a2, a3;
};

// XXX - soe similar as in cpp coro for when_all + when_all_task:
// memcpy_task is a separate task with it's own promise and start() method
// which takes appropriate argumentes (maybe handle to coroutine?)
// then on top of that there is extra awaiter -> something like when_all_ready_awaitable
// which is responsible for starting the task and waiting for runtime to finish
// AND IT COULD ALSO RESUME the coroutine manuallY (not using chaining?)

task async_mempcy(void *dst, void *src, size_t n, struct async_memcpy_resume_fut *fut)
task async_mempcy(void *dst, void *src, size_t n)
{
std::cout << "Before memcpy" << std::endl;
co_await memcpy_task{dst, src, n, fut};
std::cout << "After memcpy" << std::endl;
co_await memcpy_task{dst, src, n/2};
std::cout << "After memcpy " << ((char*) dst) << std::endl;
co_await memcpy_task{dst + n/2, src + n/2, n - n/2};
std::cout << "After second memcpy " << ((char*) dst) << std::endl;

auto a1 = memcpy_task{dst, src, 1};
auto a2 = memcpy_task{dst + 1, src, 1};
auto a3 = memcpy_task{dst + 2, src, 1};

co_await when_all(a1, a2, a3);
std::cout << "After 3 concurrent memcopies " << ((char*) dst) << std::endl;
}

task async_memcpy_print(std::string to_copy, char *buffer, const std::string &to_print, struct async_memcpy_resume_fut *fut)
task async_memcpy_print(std::string to_copy, char *buffer, const std::string &to_print)
{
co_await async_mempcy(reinterpret_cast<void*>(buffer), reinterpret_cast<void*>(to_copy.data()), to_copy.size(), fut);
co_await async_mempcy(reinterpret_cast<void*>(buffer), reinterpret_cast<void*>(to_copy.data()), to_copy.size());
std::cout << to_print << std::endl;
}

Expand All @@ -156,15 +161,12 @@ main(int argc, char *argv[])

char buffer[buffer_size] = {0};
{
struct async_memcpy_resume_fut fut;
auto future = async_memcpy_print(to_copy, buffer, to_print, &fut);
auto future = async_memcpy_print(to_copy, buffer, to_print);

std::cout << "inside main" << std::endl;

// actually executes future on runtime r
// XXX - make it a single function
future.wait();
runtime_wait(r, FUTURE_AS_RUNNABLE(&fut));
future.h.resume();
wait(r);

std::cout << buffer << std::endl;
}
Expand Down

0 comments on commit 7062eb2

Please sign in to comment.