Skip to content

Commit

Permalink
K2: add support for asynchronous callbacks (#1103)
Browse files Browse the repository at this point in the history
Also, this commit adds array_map implementation that supports both sync and async functors
  • Loading branch information
apolyakov authored Sep 17, 2024
1 parent a0a7b38 commit 3ff5b53
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 11 deletions.
4 changes: 4 additions & 0 deletions builtin-functions/kphp-light/array.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
<?php

/** @kphp-extern-func-info interruptible */
function array_map (callable(^2[*] $x):any $callback, $a ::: array) ::: ^1() [];
1 change: 1 addition & 0 deletions builtin-functions/kphp-light/functions.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?php

require_once __DIR__ . '/array.txt';
require_once __DIR__ . '/job-workers.txt';

define('TODO', -1);
Expand Down
1 change: 0 additions & 1 deletion builtin-functions/kphp-light/unsupported/arrays.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ function array_unset (&$a ::: array, any $key) ::: ^1[*];

function array_filter ($a ::: array, callable(^1[*] $x):bool $callback = TODO) ::: ^1;
function array_filter_by_key ($a ::: array, callable(mixed $key):bool $callback) ::: ^1;
function array_map (callable(^2[*] $x):any $callback, $a ::: array) ::: ^1() [];
/** @kphp-extern-func-info cpp_template_call */
function array_reduce ($a ::: array, callable(^3 | ^2() $carry, ^1[*] $item):any $callback, $initial ::: any) ::: ^2() | ^3;
function array_reserve (&$a ::: array, $int_size ::: int, $string_size ::: int, $make_vector_if_possible ::: bool) ::: void;
Expand Down
25 changes: 18 additions & 7 deletions compiler/code-gen/vertex-compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include <unordered_map>

#include "common/wrappers/field_getter.h"

#include "common/wrappers/likely.h"
#include "compiler/code-gen/common.h"
#include "compiler/code-gen/const-globals-batched-mem.h"
Expand All @@ -17,17 +16,18 @@
#include "compiler/code-gen/files/tracing-autogen.h"
#include "compiler/code-gen/naming.h"
#include "compiler/code-gen/raw-data.h"
#include "compiler/compiler-core.h"
#include "compiler/data/class-data.h"
#include "compiler/data/define-data.h"
#include "compiler/data/ffi-data.h"
#include "compiler/data/function-data.h"
#include "compiler/data/src-file.h"
#include "compiler/data/var-data.h"
#include "compiler/type-hint.h"
#include "compiler/inferring/public.h"
#include "compiler/name-gen.h"
#include "compiler/vertex.h"
#include "compiler/type-hint.h"
#include "compiler/vertex-util.h"
#include "compiler/vertex.h"

namespace {

Expand Down Expand Up @@ -1991,7 +1991,12 @@ void compile_callback_of_builtin(VertexAdaptor<op_callback_of_builtin> root, Cod
* Will be transformed to:
* array_map([captured1 = $extern] (auto &&... args) {
* return lambda$xxx(captured1, std::forward<decltype(args)>(args)...);
* }), const_array);
* }, const_array);
* or in K2 mode:
* co_await array_map([captured1 = $extern] (auto &&... args) (-> task_t<lambda's return type>)? {
* (return lambda$xxx(captured1, std::forward<decltype(args)>(args)...);
* | co_return(co_await lambda$xxx(captured1, std::forward<decltype(args)>(args)...));)
* }, const_array);
* Where the body calls a lambda function:
* int lambda$xxx(int $extern, int $x) { return $xx + $extern; }
* Captured vars are not always op_var. For example, [captured1 = check_not_false($extern).val()] after smart casts.
Expand All @@ -2003,13 +2008,19 @@ void compile_callback_of_builtin(VertexAdaptor<op_callback_of_builtin> root, Cod
W << "captured" << (++idx) << " = " << cpp_captured;
}) << "]";

FunctionSignatureGenerator(W) << "(auto &&... args) " << BEGIN;
const bool k2_async_callback = G->is_output_mode_k2() && root->func_id->is_interruptible;

FunctionSignatureGenerator(W) << "(auto &&... args) ";
if (k2_async_callback) { // add explicit return type to make this lambda async
W << "-> task_t<" << TypeName(tinf::get_type(root->func_id, -1)) << "> ";
}
W << BEGIN;

W << "return " << FunctionName(root->func_id) << "(";
W << (k2_async_callback ? "co_return(co_await " : "return ") << FunctionName(root->func_id) << "(";
for (int idx = 1; idx <= root->size(); ++idx) {
W << "captured" << idx << ", ";
}
W << "std::forward<decltype(args)>(args)...);";
W << "std::forward<decltype(args)>(args)...)" << (k2_async_callback ? ")" : "") << ";";

W << NL << END;
W << UnlockComments{};
Expand Down
31 changes: 31 additions & 0 deletions runtime-light/coroutine/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <concepts>
#include <coroutine>
#include <optional>
#include <type_traits>
#include <utility>

#include "common/containers/final_action.h"
Expand Down Expand Up @@ -238,3 +239,33 @@ struct task_t : public task_base_t {
return task_t<U>{std::coroutine_handle<>::from_address(std::exchange(handle_address, nullptr))};
}
};

// === Type traits ================================================================================

template<typename F, typename... Args>
requires std::invocable<F, Args...> inline constexpr bool is_async_function_v = requires {
{static_cast<task_t<void>>(std::declval<std::invoke_result_t<F, Args...>>())};
};

// ================================================================================================

template<typename F, typename... Args>
requires std::invocable<F, Args...> class async_function_unwrapped_return_type {
using return_type = std::invoke_result_t<F, Args...>;

template<typename U>
struct task_inner {
using type = U;
};

template<typename U>
struct task_inner<task_t<U>> {
using type = U;
};

public:
using type = task_inner<return_type>::type;
};

template<typename F, typename... Args>
requires std::invocable<F, Args...> using async_function_unwrapped_return_type_t = async_function_unwrapped_return_type<F, Args...>::type;
24 changes: 21 additions & 3 deletions runtime-light/stdlib/array/array-functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@

#pragma once

#include <concepts>
#include <functional>
#include <utility>

#include "runtime-core/runtime-core.h"
#include "runtime-light/coroutine/task.h"

inline constexpr int64_t SORT_REGULAR = 0;
inline constexpr int64_t SORT_NUMERIC = 1;
Expand Down Expand Up @@ -60,9 +65,22 @@ array<T> f$array_filter_by_key(const array<T> &a, const T1 &callback) noexcept {
php_critical_error("call to unsupported function");
}

template<class T, class CallbackT, class R = typename std::invoke_result_t<std::decay_t<CallbackT>, T>>
array<R> f$array_map(const CallbackT &callback, const array<T> &a) {
php_critical_error("call to unsupported function");
/**
* Currently, array_map is always considered async. Despite we rely on symmetric transfer optimization,
* we need to be careful with such functions. We may want to split such functions into sync and async
* versions in case we face with performance problems.
*/
template<class A, std::invocable<A> F, class R = async_function_unwrapped_return_type_t<F, A>>
task_t<array<R>> f$array_map(F f, array<A> arr) noexcept {
array<R> result{arr.size()};
for (const auto &it : arr) {
if constexpr (is_async_function_v<F, A>) {
result.set_value(it.get_key(), co_await std::invoke(f, it.get_value()));
} else {
result.set_value(it.get_key(), std::invoke(f, it.get_value()));
}
}
co_return result;
}

template<class R, class T, class CallbackT, class InitialT>
Expand Down
31 changes: 31 additions & 0 deletions tests/k2-components/test_async_builtins.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

$async_func_str = "async function result";
$sync_func_str = "sync function result";

/** @kphp-required */
function async_func(string $str): string {
sched_yield_sleep(0.005);
global $async_func_str;
return $async_func_str;
}

/** @kphp-required */
function sync_func(string $str): string {
global $sync_func_str;
return $sync_func_str;
}

$arr = ["1", "2", "3"];

foreach (array_map('async_func', $arr) as $elem) {
if ($elem != $async_func_str) {
critical_error("error");
}
}

foreach (array_map('sync_func', $arr) as $elem) {
if ($elem != $sync_func_str) {
critical_error("error");
}
}

0 comments on commit 3ff5b53

Please sign in to comment.