Skip to content
This repository has been archived by the owner on Mar 22, 2023. It is now read-only.

Commit

Permalink
examples: Add example with coroutines
Browse files Browse the repository at this point in the history
How it works:

When co_await async_memcpy is called, the compiler calls
async_memcpy::awaitable::await_suspend() which puts miniasync
future and coroutine handle to a shared containers. It also
registers miniasync notifier so that other entities can now
when the future completes.

There is also an executor loop which takes elements from the container
and executes them (calls runtime_wait for miniasync future and
and then resumes the coroutine).
  • Loading branch information
igchor committed Mar 22, 2022
1 parent 80370f6 commit c998f3b
Show file tree
Hide file tree
Showing 10 changed files with 410 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/on_pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
matrix:
CONFIG: ["N=1 OS=ubuntu OS_VER=21.04 TYPE=normal CC=gcc COVERAGE=1",
"N=2 OS=ubuntu OS_VER=21.04 TYPE=normal CC=clang PUSH_IMAGE=1",
"N=3 OS=fedora OS_VER=35 TYPE=normal CC=gcc PUSH_IMAGE=1"]
"N=3 OS=fedora OS_VER=35 TYPE=normal CC=gcc PUSH_IMAGE=1 CXX_STANDARD=20 BUILD_CPP_EXAMPLES=1"]
steps:
- name: Clone the git repo
uses: actions/checkout@v1
Expand Down
7 changes: 6 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

cmake_minimum_required(VERSION 3.3)

project(miniasync C)
project(miniasync C CXX)
set(MINIASYNC_ROOT_DIR ${CMAKE_CURRENT_SOURCE_DIR})

set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/out CACHE STRING "")
Expand Down Expand Up @@ -42,6 +42,7 @@ option(BUILD_EXAMPLES "build examples" ON)
option(BUILD_TESTS "build tests" ON)
option(TESTS_USE_VALGRIND "enable tests with valgrind (if found)" ON)
option(COMPILE_DML "compile miniasync dml implementation library" OFF)
option(BUILD_CPP_EXAMPLES "build cpp examples - require c++20 support" OFF)

include(FindPerl)
include(FindThreads)
Expand All @@ -50,6 +51,10 @@ include(CheckCCompilerFlag)
include(GNUInstallDirs)
include(${CMAKE_SOURCE_DIR}/cmake/functions.cmake)

if(BUILD_CPP_EXAMPLES AND "${CMAKE_CXX_STANDARD}" LESS 20)
message(FATAL_ERROR "To build cpp example, CMAKE_CXX_STANDARD must be set to 20 or higher")
endif()

# look for pkg config (use it later for managing valgrind)
if(NOT WIN32)
find_package(PkgConfig REQUIRED)
Expand Down
5 changes: 5 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,8 @@ endfunction()
# add all the examples with a use of the add_example function defined above
add_example(basic basic/basic.c)
add_example(basic-async basic-async/basic-async.c)

if(BUILD_CPP_EXAMPLES)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines")
add_example(coroutine_memcpy coroutine_memcpy/coroutine_memcpy.cpp coroutine_memcpy/miniasync_operation.cpp)
endif()
144 changes: 144 additions & 0 deletions examples/coroutine_memcpy/coroutine_helpers.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/* SPDX-License-Identifier: BSD-3-Clause */
/* Copyright 2021-2022, Intel Corporation */
// SPDX-License-Identifier: MIT
/* Copyright (c) Lewis Baker */

#include <atomic>
#include <coroutine>
#include <functional>
#include <memory>
#include <string>
#include <utility>

#ifndef MINIASYNC_COROUTINE_HELPERS
#define MINIASYNC_COROUTINE_HELPERS

/*
* Helper structures for coroutines, they are heavily inspired by
* https://github.com/lewissbaker/cppcoro
*/

/* This is a generic task which supports continuation. */
struct task {
struct promise_type {
struct final_awaitable {
bool await_ready() const noexcept
{
return false;
}
void await_resume() noexcept
{
}

void await_suspend(std::coroutine_handle<task::promise_type> h) noexcept
{
auto &cont = h.promise().cont;
if (cont)
cont.resume();
}
};

task get_return_object()
{
return task{std::coroutine_handle<task::promise_type>::from_promise(*this)};
}
std::suspend_always initial_suspend()
{
return {};
}
auto final_suspend() noexcept
{
return final_awaitable{};
}
void return_void()
{
}
void unhandled_exception()
{
}

std::coroutine_handle<task::promise_type> cont;
};

void wait()
{
h.resume();
}

std::coroutine_handle<task::promise_type> release() &&
{
return std::exchange(h, nullptr);
}

bool await_ready()
{
return !h || h.done();
}

std::coroutine_handle<> await_suspend(std::coroutine_handle<task::promise_type> aw)
{
h.promise().cont = aw;
return h;
}

void await_resume()
{
}

std::coroutine_handle<task::promise_type> h;
};

namespace detail
{

template <typename Awaitable>
task when_all_task(Awaitable awaitable, std::atomic<int> &counter, std::coroutine_handle<> h)
{
co_await awaitable;

auto cnt = counter.fetch_sub(1);
if (cnt - 1 == 0) {
h.resume();
}
}

template <typename Task>
struct when_all_ready_awaitable {
when_all_ready_awaitable(std::vector<Task> &&tasks) : counter(tasks.size()), tasks(std::move(tasks))
{
}

bool await_ready()
{
return false;
}

void await_suspend(std::coroutine_handle<> h)
{
for (auto &&task : tasks) {
when_all_task(std::move(task), counter, h).h.resume();
}
}

void await_resume()
{
}

std::atomic<int> counter = 0;
std::vector<Task> tasks;
};
} // namespace detail

template <typename A, typename... Awaitables>
auto when_all(A &&aw, Awaitables &&...awaitables)
{
std::vector<std::remove_reference_t<A>> tasks;
tasks.emplace_back(std::move<A>(aw));

for (auto &&a : {awaitables...})
tasks.emplace_back(std::move(a));

return detail::when_all_ready_awaitable(std::move(tasks));
}

#endif
68 changes: 68 additions & 0 deletions examples/coroutine_memcpy/coroutine_memcpy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// SPDX-License-Identifier: BSD-3-Clause
/* Copyright 2021-2022, Intel Corporation */

/*
* coroutine_memcpy.cpp -- example showing miniasync integration with coroutines
*/

#include "libminiasync.h"

#include <cassert>
#include <deque>
#include <iostream>
#include <numeric>
#include <optional>
#include <queue>
#include <unordered_set>

#include "coroutine_helpers.hpp"
#include "executor.hpp"
#include "miniasync_operation.hpp"

task run_async_memcpy(executor_type &executor, char *dst, char *src, size_t n)
{
std::cout << "Before memcpy" << std::endl;
co_await async_memcpy(executor, dst, src, n / 2);
std::cout << "After memcpy " << ((char *)dst) << std::endl;
co_await async_memcpy(executor, dst + n / 2, src + n / 2, n - n / 2);
std::cout << "After second memcpy " << ((char *)dst) << std::endl;

auto a1 = async_memcpy(executor, dst, src, 1);
auto a2 = async_memcpy(executor, dst + 1, src, 1);
auto a3 = async_memcpy(executor, dst + 2, src, 1);

co_await when_all(a1, a2, a3);
std::cout << "After 3 concurrent memcopies " << ((char *)dst) << std::endl;
}

task async_memcpy_print(executor_type &executor, char *dst, char *src, size_t n, std::string to_print)
{
auto a1 = run_async_memcpy(executor, dst, src, n / 2);
auto a2 = run_async_memcpy(executor, dst + n / 2, src + n / 2, n - n / 2);

co_await when_all(a1, a2);

std::cout << to_print << std::endl;
}

int main(int argc, char *argv[])
{
executor_type executor(
std::unique_ptr<data_mover_threads, decltype(&data_mover_threads_delete)>(data_mover_threads_default(), &data_mover_threads_delete));

static constexpr auto buffer_size = 10;
static constexpr auto to_copy = "something";
static constexpr auto to_print = "async print!";

char buffer[buffer_size] = {0};
auto t = async_memcpy_print(executor, buffer, std::string(to_copy).data(), buffer_size, to_print);
executor.submit(std::move(t));

std::cout << "inside main" << std::endl;

executor.run_to_completion();

std::cout << buffer << std::endl;

return 0;
}
94 changes: 94 additions & 0 deletions examples/coroutine_memcpy/executor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/* SPDX-License-Identifier: BSD-3-Clause */
/* Copyright 2021-2022, Intel Corporation */

/*
* executor.hpp -- miniasync-aware coroutines executor.
*/

#include "libminiasync.h"

#include <deque>
#include <memory>
#include <optional>

#include "coroutine_helpers.hpp"
#include "miniasync_operation.hpp"

#ifndef MINIASYNC_EXECUTOR
#define MINIASYNC_EXECUTOR

/* Executor keeps a queue of coroutines to execute. New coroutine can be
* submitted via submit() function. Run_to_completion will loop until all
* coroutines are executed. */
struct executor_type {
executor_type(std::unique_ptr<data_mover_threads, decltype(&data_mover_threads_delete)> &&data_mover) : vdm_(data_mover_threads_get_vdm(data_mover.get())), data_mover(std::move(data_mover))
{
}

void submit(task &&t)
{
auto handle = std::move(t).release();
pending_coro.push_back(handle);
}

void submit(miniasync_operation *operation)
{
pending_miniasync.push_back(operation);
}

void run_to_completion()
{
while (true) {
auto next_miniasync_op = pop_next(pending_miniasync);
auto next_coro = pop_next(pending_coro);
if (!next_miniasync_op && !next_coro)
break;

if (next_miniasync_op)
run_pending(next_miniasync_op.value());
if (next_coro)
run_pending(next_coro.value());
}
}

vdm *get_mover()
{
return vdm_;
}

private:
void run_pending(std::coroutine_handle<> h)
{
if (!h.done())
h.resume();
}

void run_pending(miniasync_operation *operation)
{
if (operation->ready() && !operation->done()) {
operation->resume();
} else {
/* Operation not ready, yet, put it back to the queue. */
pending_miniasync.push_back(operation);
}
}

template <typename Deque>
std::optional<typename Deque::value_type> pop_next(Deque &deque)
{
if (deque.empty())
return std::nullopt;

auto first = deque.front();
deque.pop_front();

return first;
}

std::deque<std::coroutine_handle<>> pending_coro;
std::deque<miniasync_operation *> pending_miniasync;
struct vdm* vdm_;
std::unique_ptr<data_mover_threads, decltype(&data_mover_threads_delete)> data_mover;
};

#endif /* MINIASYNC_EXECUTOR */
43 changes: 43 additions & 0 deletions examples/coroutine_memcpy/miniasync_operation.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// SPDX-License-Identifier: BSD-3-Clause
/* Copyright 2021-2022, Intel Corporation */

/*
* miniasync_operation.cpp - implementation of miniasync_operation
*/

#include "miniasync_operation.hpp"

#include <cassert>

#include "executor.hpp"

void miniasync_operation::await_resume()
{
}

bool miniasync_operation::await_ready()
{
return ready();
}

void miniasync_operation::await_suspend(std::coroutine_handle<> h)
{
this->h = h;
this->executor.submit(this);
}

bool miniasync_operation::ready()
{
assert(notifier.notifier_used == FUTURE_NOTIFIER_POLLER);
return *notifier.poller.ptr_to_monitor == 1;
}

bool miniasync_operation::done()
{
return this->h.done();
}

void miniasync_operation::resume()
{
this->h.resume();
}
Loading

0 comments on commit c998f3b

Please sign in to comment.