Skip to content
This repository has been archived by the owner on Mar 22, 2023. It is now read-only.

Add example with coroutines #24

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/on_pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
matrix:
CONFIG: ["N=1 OS=ubuntu OS_VER=21.04 TYPE=normal CC=gcc COVERAGE=1",
"N=2 OS=ubuntu OS_VER=21.04 TYPE=normal CC=clang PUSH_IMAGE=1",
"N=3 OS=fedora OS_VER=35 TYPE=normal CC=gcc PUSH_IMAGE=1"]
"N=3 OS=fedora OS_VER=35 TYPE=normal CC=gcc PUSH_IMAGE=1 CXX_STANDARD=20 BUILD_CPP_EXAMPLES=1"]
steps:
- name: Clone the git repo
uses: actions/checkout@v1
Expand Down
7 changes: 6 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

cmake_minimum_required(VERSION 3.3)

project(miniasync C)
project(miniasync C CXX)
set(MINIASYNC_ROOT_DIR ${CMAKE_CURRENT_SOURCE_DIR})

set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/out CACHE STRING "")
Expand Down Expand Up @@ -42,6 +42,7 @@ option(BUILD_EXAMPLES "build examples" ON)
option(BUILD_TESTS "build tests" ON)
option(TESTS_USE_VALGRIND "enable tests with valgrind (if found)" ON)
option(COMPILE_DML "compile miniasync dml implementation library" OFF)
option(BUILD_CPP_EXAMPLES "build cpp examples - require c++20 support" OFF)

include(FindPerl)
include(FindThreads)
Expand All @@ -50,6 +51,10 @@ include(CheckCCompilerFlag)
include(GNUInstallDirs)
include(${CMAKE_SOURCE_DIR}/cmake/functions.cmake)

if(BUILD_CPP_EXAMPLES AND "${CMAKE_CXX_STANDARD}" LESS 20)
message(FATAL_ERROR "To build cpp example, CMAKE_CXX_STANDARD must be set to 20 or higher")
endif()

# look for pkg config (use it later for managing valgrind)
if(NOT WIN32)
find_package(PkgConfig REQUIRED)
Expand Down
5 changes: 5 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,8 @@ endfunction()
# add all the examples with a use of the add_example function defined above
add_example(basic basic/basic.c)
add_example(basic-async basic-async/basic-async.c)

if(BUILD_CPP_EXAMPLES)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines")
add_example(coroutine_memcpy coroutine_memcpy/coroutine_memcpy.cpp coroutine_memcpy/miniasync_operation.cpp)
endif()
144 changes: 144 additions & 0 deletions examples/coroutine_memcpy/coroutine_helpers.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/* SPDX-License-Identifier: BSD-3-Clause */
/* Copyright 2021-2022, Intel Corporation */
// SPDX-License-Identifier: MIT
/* Copyright (c) Lewis Baker */

#include <atomic>
#include <coroutine>
#include <functional>
#include <memory>
#include <string>
#include <utility>

#ifndef MINIASYNC_COROUTINE_HELPERS
#define MINIASYNC_COROUTINE_HELPERS

/*
* Helper structures for coroutines, they are heavily inspired by
* https://github.com/lewissbaker/cppcoro
*/

/* This is a generic task which supports continuation. */
struct task {
struct promise_type {
struct final_awaitable {
bool await_ready() const noexcept
{
return false;
}
void await_resume() noexcept
{
}

void await_suspend(std::coroutine_handle<task::promise_type> h) noexcept
{
auto &cont = h.promise().cont;
if (cont)
cont.resume();
}
};

task get_return_object()
{
return task{std::coroutine_handle<task::promise_type>::from_promise(*this)};
}
std::suspend_always initial_suspend()
{
return {};
}
auto final_suspend() noexcept
{
return final_awaitable{};
}
void return_void()
{
}
void unhandled_exception()
{
}

std::coroutine_handle<task::promise_type> cont;
};

void wait()
{
h.resume();
}

std::coroutine_handle<task::promise_type> release() &&
{
return std::exchange(h, nullptr);
}

bool await_ready()
{
return !h || h.done();
}

std::coroutine_handle<> await_suspend(std::coroutine_handle<task::promise_type> aw)
{
h.promise().cont = aw;
return h;
}

void await_resume()
{
}

std::coroutine_handle<task::promise_type> h;
};

namespace detail
{

template <typename Awaitable>
task when_all_task(Awaitable awaitable, std::atomic<int> &counter, std::coroutine_handle<> h)
{
co_await awaitable;

auto cnt = counter.fetch_sub(1);
if (cnt - 1 == 0) {
h.resume();
}
}

template <typename Task>
struct when_all_ready_awaitable {
when_all_ready_awaitable(std::vector<Task> &&tasks) : counter(tasks.size()), tasks(std::move(tasks))
{
}

bool await_ready()
{
return false;
}

void await_suspend(std::coroutine_handle<> h)
{
for (auto &&task : tasks) {
when_all_task(std::move(task), counter, h).h.resume();
}
}

void await_resume()
{
}

std::atomic<int> counter = 0;
std::vector<Task> tasks;
};
} // namespace detail

template <typename A, typename... Awaitables>
auto when_all(A &&aw, Awaitables &&...awaitables)
{
std::vector<std::remove_reference_t<A>> tasks;
tasks.emplace_back(std::move<A>(aw));

for (auto &&a : {awaitables...})
tasks.emplace_back(std::move(a));

return detail::when_all_ready_awaitable(std::move(tasks));
}

#endif
70 changes: 70 additions & 0 deletions examples/coroutine_memcpy/coroutine_memcpy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// SPDX-License-Identifier: BSD-3-Clause
/* Copyright 2021-2022, Intel Corporation */

/*
* coroutine_memcpy.cpp -- example showing miniasync integration with coroutines
*/

#include "libminiasync.h"

#include <cassert>
#include <deque>
#include <iostream>
#include <numeric>
#include <optional>
#include <queue>
#include <unordered_set>

#include "coroutine_helpers.hpp"
#include "executor.hpp"
#include "miniasync_operation.hpp"

task run_async_memcpy(executor_type &executor, char *dst, char *src, size_t n)
{
std::cout << "Before memcpy" << std::endl;
co_await async_memcpy(executor, dst, src, n / 2);
std::cout << "After memcpy " << ((char *)dst) << std::endl;
co_await async_memcpy(executor, dst + n / 2, src + n / 2, n - n / 2);
std::cout << "After second memcpy " << ((char *)dst) << std::endl;

auto a1 = async_memcpy(executor, dst, src, 1);
auto a2 = async_memcpy(executor, dst + 1, src, 1);
auto a3 = async_memcpy(executor, dst + 2, src, 1);

co_await when_all(a1, a2, a3);
std::cout << "After 3 concurrent memcopies " << ((char *)dst) << std::endl;
}

task async_memcpy_print(executor_type &executor, char *dst, char *src, size_t n, std::string to_print)
{
auto a1 = run_async_memcpy(executor, dst, src, n / 2);
auto a2 = run_async_memcpy(executor, dst + n / 2, src + n / 2, n - n / 2);

co_await when_all(a1, a2);

std::cout << to_print << std::endl;
}

int main(int argc, char *argv[])
{
static constexpr size_t nthreads = 3;
static constexpr size_t ringbuf_size = 1024;
executor_type executor(
std::unique_ptr<data_mover_threads, decltype(&data_mover_threads_delete)>(data_mover_threads_new(nthreads, ringbuf_size, FUTURE_NOTIFIER_POLLER), &data_mover_threads_delete));

static constexpr auto buffer_size = 10;
static constexpr auto to_copy = "something";
static constexpr auto to_print = "async print!";

char buffer[buffer_size] = {0};
auto t = async_memcpy_print(executor, buffer, std::string(to_copy).data(), buffer_size, to_print);
executor.submit(std::move(t));

std::cout << "inside main" << std::endl;

executor.run_to_completion();

std::cout << buffer << std::endl;

return 0;
}
94 changes: 94 additions & 0 deletions examples/coroutine_memcpy/executor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/* SPDX-License-Identifier: BSD-3-Clause */
/* Copyright 2021-2022, Intel Corporation */

/*
* executor.hpp -- miniasync-aware coroutines executor.
*/

#include "libminiasync.h"

#include <deque>
#include <memory>
#include <optional>

#include "coroutine_helpers.hpp"
#include "miniasync_operation.hpp"

#ifndef MINIASYNC_EXECUTOR
#define MINIASYNC_EXECUTOR

/* Executor keeps a queue of coroutines to execute. New coroutine can be
* submitted via submit() function. Run_to_completion will loop until all
* coroutines are executed. */
struct executor_type {
executor_type(std::unique_ptr<data_mover_threads, decltype(&data_mover_threads_delete)> &&data_mover) : vdm_(data_mover_threads_get_vdm(data_mover.get())), data_mover(std::move(data_mover))
{
}

void submit(task &&t)
{
auto handle = std::move(t).release();
pending_coro.push_back(handle);
}

void submit(miniasync_operation *operation)
{
pending_miniasync.push_back(operation);
}

void run_to_completion()
{
while (true) {
auto next_miniasync_op = pop_next(pending_miniasync);
auto next_coro = pop_next(pending_coro);
if (!next_miniasync_op && !next_coro)
break;

if (next_miniasync_op)
run_pending(next_miniasync_op.value());
if (next_coro)
run_pending(next_coro.value());
}
}

vdm *get_mover()
{
return vdm_;
}

private:
void run_pending(std::coroutine_handle<> h)
{
if (!h.done())
h.resume();
}

void run_pending(miniasync_operation *operation)
{
if (operation->ready() && !operation->done()) {
operation->resume();
} else {
/* Operation not ready, yet, put it back to the queue. */
pending_miniasync.push_back(operation);
}
}

template <typename Deque>
std::optional<typename Deque::value_type> pop_next(Deque &deque)
{
if (deque.empty())
return std::nullopt;

auto first = deque.front();
deque.pop_front();

return first;
}

std::deque<std::coroutine_handle<>> pending_coro;
std::deque<miniasync_operation *> pending_miniasync;
struct vdm* vdm_;
std::unique_ptr<data_mover_threads, decltype(&data_mover_threads_delete)> data_mover;
};

#endif /* MINIASYNC_EXECUTOR */
Loading