This repository has been archived by the owner on Mar 22, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 15
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
examples: Add example with coroutines
How it works: When co_await async_memcpy is called, the compiler calls async_memcpy::awaitable::await_suspend() which puts miniasync future and coroutine handle to a shared containers. It also registers miniasync notifier so that other entities can now when the future completes. There is also an executor loop which takes elements from the container and executes them (calls runtime_wait for miniasync future and and then resumes the coroutine).
- Loading branch information
Showing
10 changed files
with
412 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
/* SPDX-License-Identifier: BSD-3-Clause */ | ||
/* Copyright 2021-2022, Intel Corporation */ | ||
// SPDX-License-Identifier: MIT | ||
/* Copyright (c) Lewis Baker */ | ||
|
||
#include <atomic> | ||
#include <coroutine> | ||
#include <functional> | ||
#include <memory> | ||
#include <string> | ||
#include <utility> | ||
|
||
#ifndef MINIASYNC_COROUTINE_HELPERS | ||
#define MINIASYNC_COROUTINE_HELPERS | ||
|
||
/* | ||
* Helper structures for coroutines, they are heavily inspired by | ||
* https://github.com/lewissbaker/cppcoro | ||
*/ | ||
|
||
/* This is a generic task which supports continuation. */ | ||
struct task { | ||
struct promise_type { | ||
struct final_awaitable { | ||
bool await_ready() const noexcept | ||
{ | ||
return false; | ||
} | ||
void await_resume() noexcept | ||
{ | ||
} | ||
|
||
void await_suspend(std::coroutine_handle<task::promise_type> h) noexcept | ||
{ | ||
auto &cont = h.promise().cont; | ||
if (cont) | ||
cont.resume(); | ||
} | ||
}; | ||
|
||
task get_return_object() | ||
{ | ||
return task{std::coroutine_handle<task::promise_type>::from_promise(*this)}; | ||
} | ||
std::suspend_always initial_suspend() | ||
{ | ||
return {}; | ||
} | ||
auto final_suspend() noexcept | ||
{ | ||
return final_awaitable{}; | ||
} | ||
void return_void() | ||
{ | ||
} | ||
void unhandled_exception() | ||
{ | ||
} | ||
|
||
std::coroutine_handle<task::promise_type> cont; | ||
}; | ||
|
||
void wait() | ||
{ | ||
h.resume(); | ||
} | ||
|
||
std::coroutine_handle<task::promise_type> release() && | ||
{ | ||
return std::exchange(h, nullptr); | ||
} | ||
|
||
bool await_ready() | ||
{ | ||
return !h || h.done(); | ||
} | ||
|
||
std::coroutine_handle<> await_suspend(std::coroutine_handle<task::promise_type> aw) | ||
{ | ||
h.promise().cont = aw; | ||
return h; | ||
} | ||
|
||
void await_resume() | ||
{ | ||
} | ||
|
||
std::coroutine_handle<task::promise_type> h; | ||
}; | ||
|
||
namespace detail | ||
{ | ||
|
||
template <typename Awaitable> | ||
task when_all_task(Awaitable awaitable, std::atomic<int> &counter, std::coroutine_handle<> h) | ||
{ | ||
co_await awaitable; | ||
|
||
auto cnt = counter.fetch_sub(1); | ||
if (cnt - 1 == 0) { | ||
h.resume(); | ||
} | ||
} | ||
|
||
template <typename Task> | ||
struct when_all_ready_awaitable { | ||
when_all_ready_awaitable(std::vector<Task> &&tasks) : counter(tasks.size()), tasks(std::move(tasks)) | ||
{ | ||
} | ||
|
||
bool await_ready() | ||
{ | ||
return false; | ||
} | ||
|
||
void await_suspend(std::coroutine_handle<> h) | ||
{ | ||
for (auto &&task : tasks) { | ||
when_all_task(std::move(task), counter, h).h.resume(); | ||
} | ||
} | ||
|
||
void await_resume() | ||
{ | ||
} | ||
|
||
std::atomic<int> counter = 0; | ||
std::vector<Task> tasks; | ||
}; | ||
} // namespace detail | ||
|
||
template <typename A, typename... Awaitables> | ||
auto when_all(A &&aw, Awaitables &&...awaitables) | ||
{ | ||
std::vector<std::remove_reference_t<A>> tasks; | ||
tasks.emplace_back(std::move<A>(aw)); | ||
|
||
for (auto &&a : {awaitables...}) | ||
tasks.emplace_back(std::move(a)); | ||
|
||
return detail::when_all_ready_awaitable(std::move(tasks)); | ||
} | ||
|
||
#endif |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
// SPDX-License-Identifier: BSD-3-Clause | ||
/* Copyright 2021-2022, Intel Corporation */ | ||
|
||
/* | ||
* coroutine_memcpy.cpp -- example showing miniasync integration with coroutines | ||
*/ | ||
|
||
#include "libminiasync.h" | ||
|
||
#include <cassert> | ||
#include <deque> | ||
#include <iostream> | ||
#include <numeric> | ||
#include <optional> | ||
#include <queue> | ||
#include <unordered_set> | ||
|
||
#include "coroutine_helpers.hpp" | ||
#include "executor.hpp" | ||
#include "miniasync_operation.hpp" | ||
|
||
task run_async_memcpy(executor_type &executor, char *dst, char *src, size_t n) | ||
{ | ||
std::cout << "Before memcpy" << std::endl; | ||
co_await async_memcpy(executor, dst, src, n / 2); | ||
std::cout << "After memcpy " << ((char *)dst) << std::endl; | ||
co_await async_memcpy(executor, dst + n / 2, src + n / 2, n - n / 2); | ||
std::cout << "After second memcpy " << ((char *)dst) << std::endl; | ||
|
||
auto a1 = async_memcpy(executor, dst, src, 1); | ||
auto a2 = async_memcpy(executor, dst + 1, src, 1); | ||
auto a3 = async_memcpy(executor, dst + 2, src, 1); | ||
|
||
co_await when_all(a1, a2, a3); | ||
std::cout << "After 3 concurrent memcopies " << ((char *)dst) << std::endl; | ||
} | ||
|
||
task async_memcpy_print(executor_type &executor, char *dst, char *src, size_t n, std::string to_print) | ||
{ | ||
auto a1 = run_async_memcpy(executor, dst, src, n / 2); | ||
auto a2 = run_async_memcpy(executor, dst + n / 2, src + n / 2, n - n / 2); | ||
|
||
co_await when_all(a1, a2); | ||
|
||
std::cout << to_print << std::endl; | ||
} | ||
|
||
int main(int argc, char *argv[]) | ||
{ | ||
static constexpr size_t nthreads = 3; | ||
static constexpr size_t ringbuf_size = 1024; | ||
executor_type executor( | ||
std::unique_ptr<data_mover_threads, decltype(&data_mover_threads_delete)>(data_mover_threads_new(nthreads, ringbuf_size, FUTURE_NOTIFIER_POLLER), &data_mover_threads_delete)); | ||
|
||
static constexpr auto buffer_size = 10; | ||
static constexpr auto to_copy = "something"; | ||
static constexpr auto to_print = "async print!"; | ||
|
||
char buffer[buffer_size] = {0}; | ||
auto t = async_memcpy_print(executor, buffer, std::string(to_copy).data(), buffer_size, to_print); | ||
executor.submit(std::move(t)); | ||
|
||
std::cout << "inside main" << std::endl; | ||
|
||
executor.run_to_completion(); | ||
|
||
std::cout << buffer << std::endl; | ||
|
||
return 0; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
/* SPDX-License-Identifier: BSD-3-Clause */ | ||
/* Copyright 2021-2022, Intel Corporation */ | ||
|
||
/* | ||
* executor.hpp -- miniasync-aware coroutines executor. | ||
*/ | ||
|
||
#include "libminiasync.h" | ||
|
||
#include <deque> | ||
#include <memory> | ||
#include <optional> | ||
|
||
#include "coroutine_helpers.hpp" | ||
#include "miniasync_operation.hpp" | ||
|
||
#ifndef MINIASYNC_EXECUTOR | ||
#define MINIASYNC_EXECUTOR | ||
|
||
/* Executor keeps a queue of coroutines to execute. New coroutine can be | ||
* submitted via submit() function. Run_to_completion will loop until all | ||
* coroutines are executed. */ | ||
struct executor_type { | ||
executor_type(std::unique_ptr<data_mover_threads, decltype(&data_mover_threads_delete)> &&data_mover) : vdm_(data_mover_threads_get_vdm(data_mover.get())), data_mover(std::move(data_mover)) | ||
{ | ||
} | ||
|
||
void submit(task &&t) | ||
{ | ||
auto handle = std::move(t).release(); | ||
pending_coro.push_back(handle); | ||
} | ||
|
||
void submit(miniasync_operation *operation) | ||
{ | ||
pending_miniasync.push_back(operation); | ||
} | ||
|
||
void run_to_completion() | ||
{ | ||
while (true) { | ||
auto next_miniasync_op = pop_next(pending_miniasync); | ||
auto next_coro = pop_next(pending_coro); | ||
if (!next_miniasync_op && !next_coro) | ||
break; | ||
|
||
if (next_miniasync_op) | ||
run_pending(next_miniasync_op.value()); | ||
if (next_coro) | ||
run_pending(next_coro.value()); | ||
} | ||
} | ||
|
||
vdm *get_mover() | ||
{ | ||
return vdm_; | ||
} | ||
|
||
private: | ||
void run_pending(std::coroutine_handle<> h) | ||
{ | ||
if (!h.done()) | ||
h.resume(); | ||
} | ||
|
||
void run_pending(miniasync_operation *operation) | ||
{ | ||
if (operation->ready() && !operation->done()) { | ||
operation->resume(); | ||
} else { | ||
/* Operation not ready, yet, put it back to the queue. */ | ||
pending_miniasync.push_back(operation); | ||
} | ||
} | ||
|
||
template <typename Deque> | ||
std::optional<typename Deque::value_type> pop_next(Deque &deque) | ||
{ | ||
if (deque.empty()) | ||
return std::nullopt; | ||
|
||
auto first = deque.front(); | ||
deque.pop_front(); | ||
|
||
return first; | ||
} | ||
|
||
std::deque<std::coroutine_handle<>> pending_coro; | ||
std::deque<miniasync_operation *> pending_miniasync; | ||
struct vdm* vdm_; | ||
std::unique_ptr<data_mover_threads, decltype(&data_mover_threads_delete)> data_mover; | ||
}; | ||
|
||
#endif /* MINIASYNC_EXECUTOR */ |
Oops, something went wrong.