Skip to content
This repository has been archived by the owner on Mar 22, 2023. It is now read-only.

Commit

Permalink
Coroutines v2 - allow nesting async operations
Browse files Browse the repository at this point in the history
TODO:
- how to better pass/get future* from/to memcpy_task
- current approach would not allow to implement when_all(),
  we could just resume coroutines from wait() method instead
  of relying on miniasync_runtime to do that (no chaining)
  • Loading branch information
igchor committed Dec 7, 2021
1 parent e88c304 commit bbd5363
Showing 1 changed file with 86 additions and 66 deletions.
152 changes: 86 additions & 66 deletions examples/basic_cpp/basic.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
// SPDX-License-Identifier: BSD-3-Clause
/* Copyright 2021, Intel Corporation */

///////////////////////////////////////////////////////////////////////////////
// Copyright (c) Lewis Baker
// Licensed under MIT license.
///////////////////////////////////////////////////////////////////////////////

/*
* basic.cpp -- example showing miniasync integration with coroutines
*/
Expand All @@ -12,23 +17,50 @@

#include "libminiasync.h"

struct simple_future {
struct promise_type;
using handle_type = std::coroutine_handle<promise_type>;

simple_future(handle_type h): h(h) {}
~simple_future();

handle_type h;

void wait(struct runtime *r);
/* Similar to https://github.com/lewissbaker/cppcoro/blob/master/include/cppcoro/task.hpp */
struct task {
struct promise_type {
struct final_awaitable
{
bool await_ready() const noexcept { return false; }
void await_resume() noexcept {}

std::coroutine_handle<> await_suspend(std::coroutine_handle<task::promise_type> h) noexcept {
auto &cont = h.promise().cont;
return cont ? cont : std::noop_coroutine();
}
};


task get_return_object() { return task{std::coroutine_handle<task::promise_type>::from_promise(*this)}; }
std::suspend_always initial_suspend() { return {}; }
auto final_suspend() noexcept { return final_awaitable{}; }
void return_void() {}
void unhandled_exception() {}

std::coroutine_handle<> cont;
};

void wait() {
h.resume();
}

bool await_ready() { return !h || h.done();}
std::coroutine_handle<> await_suspend(std::coroutine_handle<> aw) {
h.promise().cont = aw;
return h;
}
void await_resume() {}

std::coroutine_handle<task::promise_type> h;
};

struct coroutine_data {
std::coroutine_handle<simple_future::promise_type> handle;
std::coroutine_handle<> handle;
};

struct coroutine_output {
uint64_t b;
};

FUTURE(coroutine_future, struct coroutine_data, struct coroutine_output);
Expand All @@ -39,33 +71,12 @@ struct async_memcpy_resume_data {
};

struct async_memcpy_resume_output {
uint64_t a;
};

FUTURE(async_memcpy_resume_fut, struct async_memcpy_resume_data,
struct async_memcpy_resume_output);

struct simple_future::promise_type {
simple_future get_return_object() { return simple_future(handle_type::from_promise(*this)); }
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() {}

struct vdm *pthread_mover;
struct async_memcpy_resume_fut fut;
};

void simple_future::wait(struct runtime *r)
{
runtime_wait(r, FUTURE_AS_RUNNABLE(&h.promise().fut));
}

simple_future::~simple_future() {
if (h) {
vdm_delete(h.promise().pthread_mover);
}
}

static enum future_state
resume_impl(struct future_context *ctx, struct future_waker waker)
{
Expand All @@ -78,7 +89,7 @@ resume_impl(struct future_context *ctx, struct future_waker waker)
}

static struct coroutine_future
resume_coroutine(std::coroutine_handle<simple_future::promise_type> h)
resume_coroutine(std::coroutine_handle<> h)
{
struct coroutine_future fut;
fut.data.handle = h;
Expand All @@ -88,42 +99,49 @@ resume_coroutine(std::coroutine_handle<simple_future::promise_type> h)
return fut;
}

auto async_memcpy(void *dst, void *src, size_t n)
struct memcpy_task
{
struct awaitable {
awaitable(void *dst, void *src, size_t n): dst(dst), src(src), n(n)
{
}

bool await_ready() { return false; /* always suspend (call await_suspend) */ }
void await_suspend(std::coroutine_handle<simple_future::promise_type> h) {
auto pthread_mover = vdm_new(vdm_descriptor_pthreads());
auto &chain = h.promise().fut;
memcpy_task(void *dst, void *src, size_t n, struct async_memcpy_resume_fut *fut): fut(fut) {
auto *pthread_mover = vdm_new(vdm_descriptor_pthreads()); // XXX - lifetime
FUTURE_CHAIN_ENTRY_INIT(&fut->data.memcpy,
vdm_memcpy(pthread_mover, dst, src, n),
NULL, NULL);
}

h.promise().pthread_mover = pthread_mover;
bool await_ready()
{
return false;
}

FUTURE_CHAIN_ENTRY_INIT(&chain.data.memcpy,
vdm_memcpy(pthread_mover, dst, src, n),
NULL, NULL);
FUTURE_CHAIN_ENTRY_INIT(&chain.data.resume, resume_coroutine(h),
NULL, NULL);
void await_suspend(std::coroutine_handle<> h)
{
FUTURE_CHAIN_ENTRY_INIT(&fut->data.resume, resume_coroutine(h),
NULL, NULL);
FUTURE_CHAIN_INIT(fut);
}

FUTURE_CHAIN_INIT(&chain);
}
void await_resume() {}

void await_resume() {}
struct async_memcpy_resume_fut *fut;
};

void *dst;
void *src;
size_t n;
};
// XXX - soe similar as in cpp coro for when_all + when_all_task:
// memcpy_task is a separate task with it's own promise and start() method
// which takes appropriate argumentes (maybe handle to coroutine?)
// then on top of that there is extra awaiter -> something like when_all_ready_awaitable
// which is responsible for starting the task and waiting for runtime to finish
// AND IT COULD ALSO RESUME the coroutine manuallY (not using chaining?)

return awaitable(dst, src, n);
task async_mempcy(void *dst, void *src, size_t n, struct async_memcpy_resume_fut *fut)
{
std::cout << "Before memcpy" << std::endl;
co_await memcpy_task{dst, src, n, fut};
std::cout << "After memcpy" << std::endl;
}

simple_future async_memcpy_print(std::string to_copy, char *buffer, const std::string &to_print)
task async_memcpy_print(std::string to_copy, char *buffer, const std::string &to_print, struct async_memcpy_resume_fut *fut)
{
co_await async_memcpy(reinterpret_cast<void*>(buffer), reinterpret_cast<void*>(to_copy.data()), to_copy.size());
co_await async_mempcy(reinterpret_cast<void*>(buffer), reinterpret_cast<void*>(to_copy.data()), to_copy.size(), fut);
std::cout << to_print << std::endl;
}

Expand All @@ -136,17 +154,19 @@ main(int argc, char *argv[])
static constexpr auto to_copy = "something";
static constexpr auto to_print = "async print!";

char buffer[buffer_size];
for (auto &c : buffer)
c = 0;

char buffer[buffer_size] = {0};
{
auto future = async_memcpy_print(to_copy, buffer, to_print);
struct async_memcpy_resume_fut fut;
auto future = async_memcpy_print(to_copy, buffer, to_print, &fut);

std::cout << "inside main" << std::endl;

// actually executes future on runtime r
future.wait(r);
// XXX - make it a single function
future.wait();
runtime_wait(r, FUTURE_AS_RUNNABLE(&fut));

std::cout << buffer << std::endl;
}

runtime_delete(r);
Expand Down

0 comments on commit bbd5363

Please sign in to comment.