This repository has been archived by the owner on Mar 22, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 15
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
examples: Add example with coroutines
How it works: When co_await async_memcpy is called, the compiler calls async_memcpy::awaitable::await_suspend() which puts miniasync future and coroutine handle to a shared containers. It also registers miniasync notifier so that other entities can now when the future completes. There is also an executor loop which takes elements from the container and executes them (calls runtime_wait for miniasync future and and then resumes the coroutine).
- Loading branch information
Showing
8 changed files
with
306 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
// SPDX-License-Identifier: BSD-3-Clause | ||
/* Copyright 2021, Intel Corporation */ | ||
// SPDX-License-Identifier: MIT | ||
/* Copyright (c) Lewis Baker */ | ||
|
||
#include <coroutine> | ||
#include <utility> | ||
#include <string> | ||
#include <atomic> | ||
#include <functional> | ||
#include <memory> | ||
|
||
#ifndef MINIASYNC_COROUTINE_HELPERS | ||
#define MINIASYNC_COROUTINE_HELPERS | ||
|
||
/* | ||
* Helper structures for coroutines, they are heavily inspired by | ||
* https://github.com/lewissbaker/cppcoro | ||
*/ | ||
|
||
/* This is a generic task which supports continuation. */ | ||
struct task { | ||
struct promise_type { | ||
struct final_awaitable | ||
{ | ||
bool await_ready() const noexcept { return false; } | ||
void await_resume() noexcept {} | ||
|
||
void await_suspend(std::coroutine_handle<task::promise_type> h) noexcept { | ||
auto &cont = h.promise().cont; | ||
if (cont) | ||
cont.resume(); | ||
} | ||
}; | ||
|
||
task get_return_object() { return task{std::coroutine_handle<task::promise_type>::from_promise(*this)}; } | ||
std::suspend_always initial_suspend() { return {}; } | ||
auto final_suspend() noexcept { return final_awaitable{}; } | ||
void return_void() {} | ||
void unhandled_exception() {} | ||
|
||
using future_type = std::pair<struct future_poller, std::coroutine_handle<>>; | ||
|
||
std::vector<future_type> futures; | ||
std::coroutine_handle<task::promise_type> cont; | ||
}; | ||
|
||
void | ||
wait() { | ||
h.resume(); | ||
} | ||
|
||
bool | ||
await_ready() { return !h || h.done();} | ||
std::coroutine_handle<> await_suspend(std::coroutine_handle<task::promise_type> aw) { | ||
h.promise().cont = aw; | ||
return h; | ||
} | ||
|
||
void | ||
await_resume() {} | ||
|
||
std::coroutine_handle<task::promise_type> h; | ||
}; | ||
|
||
namespace detail { | ||
|
||
template <typename Awaitable> | ||
task when_all_task(Awaitable awaitable, std::atomic<int> &counter, std::coroutine_handle<> h) | ||
{ | ||
co_await awaitable; | ||
|
||
auto cnt = counter.fetch_sub(1); | ||
if (cnt - 1 == 0) { | ||
h.resume(); | ||
} | ||
} | ||
|
||
template <typename Task> | ||
struct when_all_ready_awaitable | ||
{ | ||
when_all_ready_awaitable(std::vector<Task>&& tasks): counter(tasks.size()), tasks(std::move(tasks)) | ||
{ | ||
} | ||
|
||
bool await_ready() | ||
{ | ||
return false; | ||
} | ||
|
||
void await_suspend(std::coroutine_handle<> h) | ||
{ | ||
for (auto&& task : tasks) | ||
{ | ||
when_all_task(std::move(task), counter, h).h.resume(); | ||
} | ||
} | ||
|
||
void await_resume() {} | ||
|
||
std::atomic<int> counter = 0; | ||
std::vector<Task> tasks; | ||
}; | ||
} | ||
|
||
template <typename A, typename... Awaitables> | ||
auto when_all(A&& aw, Awaitables&&... awaitables) | ||
{ | ||
std::vector<std::remove_reference_t<A>> tasks; | ||
tasks.emplace_back(std::move<A>(aw)); | ||
|
||
for (auto &&a : {awaitables...}) | ||
tasks.emplace_back(std::move(a)); | ||
|
||
return detail::when_all_ready_awaitable(std::move(tasks)); | ||
} | ||
|
||
#endif |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
// SPDX-License-Identifier: BSD-3-Clause | ||
/* Copyright 2021, Intel Corporation */ | ||
|
||
/* | ||
* coroutine_memcpy.cpp -- example showing miniasync integration with coroutines | ||
*/ | ||
|
||
#include "libminiasync.h" | ||
|
||
#include <iostream> | ||
#include <queue> | ||
#include <unordered_set> | ||
#include <numeric> | ||
#include <cassert> | ||
|
||
#include "coroutine_helpers.hpp" | ||
#include "miniaync_awaitable.hpp" | ||
|
||
/* Executor loop */ | ||
void wait(runner_state &rs, task& t) | ||
{ | ||
t.h.resume(); | ||
|
||
while (!t.h.done()) { | ||
// XXX - optimize this for single future case, | ||
// it's not optimal to allocate new vector each time | ||
auto awaitables_snapshot = std::move(rs.awaitables); | ||
std::vector<bool> done(awaitables_snapshot.size(), false); | ||
int done_cnt = 0; | ||
|
||
do { | ||
// XXX: can use umwait | ||
for (int i = 0; i < awaitables_snapshot.size(); i++) { | ||
auto &f = awaitables_snapshot[i]; | ||
if (f->is_done() && !done[i]) { | ||
done_cnt++; | ||
done[i] = true; | ||
f->resume(); | ||
} | ||
} | ||
} while (done_cnt != awaitables_snapshot.size()); | ||
} | ||
} | ||
|
||
task run_async_memcpy(runner_state &rs, char *dst, char *src, size_t n) | ||
{ | ||
std::cout << "Before memcpy" << std::endl; | ||
co_await async_memcpy(rs, dst, src, n/2); | ||
std::cout << "After memcpy " << ((char*) dst) << std::endl; | ||
co_await async_memcpy(rs, dst + n/2, src + n/2, n - n/2); | ||
std::cout << "After second memcpy " << ((char*) dst) << std::endl; | ||
|
||
auto a1 = async_memcpy(rs, dst, src, 1); | ||
auto a2 = async_memcpy(rs, dst + 1, src, 1); | ||
auto a3 = async_memcpy(rs, dst + 2, src, 1); | ||
|
||
co_await when_all(a1, a2, a3); | ||
std::cout << "After 3 concurrent memcopies " << ((char*) dst) << std::endl; | ||
} | ||
|
||
task async_memcpy_print(runner_state &rs, char *dst, char *src, size_t n, std::string to_print) | ||
{ | ||
auto a1 = run_async_memcpy(rs, dst, src, n/2); | ||
auto a2 = run_async_memcpy(rs, dst + n/2, src + n/2, n - n/2); | ||
|
||
co_await when_all(a1, a2); | ||
|
||
std::cout << to_print << std::endl; | ||
} | ||
|
||
int | ||
main(int argc, char *argv[]) | ||
{ | ||
runner_state rs = { | ||
.mover = std::unique_ptr<vdm, decltype(&vdm_delete)>( | ||
vdm_new(vdm_descriptor_pthreads_polled()), &vdm_delete) | ||
}; | ||
|
||
static constexpr auto buffer_size = 10; | ||
static constexpr auto to_copy = "something"; | ||
static constexpr auto to_print = "async print!"; | ||
|
||
char buffer[buffer_size] = {0}; | ||
{ | ||
auto future = async_memcpy_print(rs, buffer, std::string(to_copy).data(), buffer_size, to_print); | ||
|
||
std::cout << "inside main" << std::endl; | ||
|
||
wait(rs, future); | ||
|
||
std::cout << buffer << std::endl; | ||
} | ||
|
||
return 0; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
// SPDX-License-Identifier: BSD-3-Clause | ||
/* Copyright 2021, Intel Corporation */ | ||
|
||
/* | ||
* miniasync_awaitable.hpp - base class for all miniasync based awaitbales | ||
*/ | ||
|
||
#include "libminiasync.h" | ||
|
||
#include <coroutine> | ||
|
||
struct miniasync_awaitable_base | ||
{ | ||
bool await_ready(); | ||
void await_resume(); | ||
|
||
bool is_done(); | ||
void resume(); | ||
|
||
protected: | ||
struct future_notifier notifier; | ||
std::coroutine_handle<> cont; | ||
}; | ||
|
||
bool miniasync_awaitable_base::await_ready() | ||
{ | ||
return false; | ||
} | ||
|
||
void miniasync_awaitable_base::await_resume() | ||
{ | ||
} | ||
|
||
bool miniasync_awaitable_base::is_done() | ||
{ | ||
assert(notifier.notifier_used == FUTURE_NOTIFIER_POLLER); | ||
return *notifier.poller.ptr_to_monitor; | ||
} | ||
|
||
void miniasync_awaitable_base::resume() | ||
{ | ||
cont.resume(); | ||
} | ||
|
||
struct runner_state { | ||
std::unique_ptr<vdm, decltype(&vdm_delete)> mover; | ||
std::vector<miniasync_awaitable_base*> awaitables; | ||
}; | ||
|
||
template <typename Future> | ||
struct miniasync_awaitable : public miniasync_awaitable_base | ||
{ | ||
miniasync_awaitable(runner_state &rs, Future future); | ||
void await_suspend(std::coroutine_handle<> cont); | ||
|
||
runner_state &rs; | ||
Future future; | ||
}; | ||
|
||
template <typename Future> | ||
miniasync_awaitable<Future>::miniasync_awaitable(runner_state &rs, Future future): rs(rs), future(future) { | ||
} | ||
|
||
template <typename Future> | ||
void miniasync_awaitable<Future>::await_suspend(std::coroutine_handle<> cont) | ||
{ | ||
this->cont = cont; | ||
|
||
future_poll(FUTURE_AS_RUNNABLE(&future), ¬ifier); | ||
|
||
rs.awaitables.push_back(static_cast<miniasync_awaitable_base*>(this)); | ||
} | ||
|
||
miniasync_awaitable<vdm_memcpy_future> async_memcpy(runner_state &rs, void *dst, void *src, size_t n) | ||
{ | ||
return miniasync_awaitable<vdm_memcpy_future>(rs, vdm_memcpy(rs.mover.get(), dst, src, n, 0)); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters