Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Force resumable functions sync #1018

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion builtin-functions/_functions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,7 @@ function rpc_send (\RpcConnection $rpc_conn, $timeout ::: float = -1.0) ::: int;
function rpc_send_noflush (\RpcConnection $rpc_conn, $timeout ::: float = -1.0) ::: int;
function rpc_flush () ::: void;
/** @kphp-extern-func-info resumable */
function rpc_get ($request_id ::: int) ::: string | false;
function rpc_get ($request_id ::: int, $is_sync ::: bool = false) ::: string | false;
function rpc_get_synchronously ($request_id ::: int) ::: string | false;
/** @kphp-extern-func-info resumable */
function rpc_get_and_parse ($request_id ::: int) ::: bool;
Expand Down
7 changes: 7 additions & 0 deletions compiler/code-gen/vertex-compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,10 @@ void compile_fork(VertexAdaptor<op_fork> root, CodeGenerator &W) {
compile_func_call(root->func_call(), W, func_call_mode::fork_call);
}

void compile_force_sync(VertexAdaptor<op_force_sync> root, CodeGenerator &W) {
compile_func_call(root->func_call(), W, func_call_mode::async_call);
}

void compile_async(VertexAdaptor<op_async> root, CodeGenerator &W) {
auto func_call = root->func_call();
if (root->has_save_var()) {
Expand Down Expand Up @@ -2161,6 +2165,9 @@ void compile_common_op(VertexPtr root, CodeGenerator &W) {
case op_fork:
compile_fork(root.as<op_fork>(), W);
break;
case op_force_sync:
compile_force_sync(root.as<op_force_sync>(), W);
break;
case op_async:
compile_async(root.as<op_async>(), W);
break;
Expand Down
1 change: 1 addition & 0 deletions compiler/compiler.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ prepend(KPHP_COMPILER_PIPES_SOURCES pipes/
filter-only-actually-used.cpp
final-check.cpp
fix-returns.cpp
force-sync.cpp
gen-tree-postprocess.cpp
generate-virtual-methods.cpp
inline-defines-usages.cpp
Expand Down
2 changes: 2 additions & 0 deletions compiler/compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "compiler/pipes/check-classes.h"
#include "compiler/pipes/check-conversions.h"
#include "compiler/pipes/check-func-calls-and-vararg.h"
#include "compiler/pipes/force-sync.h"
#include "compiler/pipes/check-modifications-of-const-vars.h"
#include "compiler/pipes/check-nested-foreach.h"
#include "compiler/pipes/wait-for-all-classes.h"
Expand Down Expand Up @@ -259,6 +260,7 @@ bool compiler_execute(CompilerSettings *settings) {
>> SyncC<GenerateVirtualMethodsF>{}
>> PipeC<ConvertInvokeToFuncCallF>{}
>> PassC<CheckFuncCallsAndVarargPass>{}
>> PassC<ForceSyncPass>{}
>> PassC<InstantiateFFIOperationsPass>{}
>> PipeC<CheckAbstractFunctionDefaults>{}
>> PipeC<CalcEmptyFunctions>{}
Expand Down
3 changes: 3 additions & 0 deletions compiler/pipes/calc-rl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ void rl_func_call_calc(VertexPtr root, RLValueType expected_rl_type) {
case op_fork:
rl_calc_all<val_none>(root);
return;
case op_force_sync:
rl_calc_all<val_none>(root);
return;
case op_array: //TODO: in fact it is wrong
case op_tuple:
case op_shape:
Expand Down
4 changes: 4 additions & 0 deletions compiler/pipes/cfg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,10 @@ void CFG::create_cfg(VertexPtr tree_node, Node *res_start, Node *res_finish, boo
create_cfg(tree_node.as<op_fork>()->func_call(), res_start, res_finish);
break;
}
case op_force_sync: {
create_cfg(tree_node.as<op_force_sync>()->func_call(), res_start, res_finish);
break;
}
case op_return: {
auto return_op = tree_node.as<op_return>();
if (return_op->has_expr()) {
Expand Down
10 changes: 10 additions & 0 deletions compiler/pipes/check-func-calls-and-vararg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ VertexPtr CheckFuncCallsAndVarargPass::on_enter_vertex(VertexPtr root) {
return on_func_call(root.as<op_func_call>());
} else if (root->type() == op_fork) {
return on_fork(root.as<op_fork>());
} else if (root->type() == op_force_sync) {
return on_force_sync(root.as<op_force_sync>());
}

return root;
Expand Down Expand Up @@ -317,3 +319,11 @@ VertexPtr CheckFuncCallsAndVarargPass::on_fork(VertexAdaptor<op_fork> v_fork) {
"Invalid fork() usage: it must be called with exactly one func call inside, e.g. fork(f(...))");
return v_fork;
}

// check that force_sync(...) is called correctly, as force_sync(f())
// note, that we do this after replacing op_invoke_call with op_func_call, not earlier
VertexPtr CheckFuncCallsAndVarargPass::on_force_sync(VertexAdaptor<op_force_sync> v_force_sync) {
kphp_error(v_force_sync->size() == 1 && (*v_force_sync->begin())->type() == op_func_call,
"Invalid force_sync() usage: it must be called with exactly one func call inside, e.g. force_sync(f(...))");
return v_force_sync;
}
1 change: 1 addition & 0 deletions compiler/pipes/check-func-calls-and-vararg.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ class CheckFuncCallsAndVarargPass final : public FunctionPassBase {

VertexPtr on_func_call(VertexAdaptor<op_func_call> call);
VertexPtr on_fork(VertexAdaptor<op_fork> v_fork);
VertexPtr on_force_sync(VertexAdaptor<op_force_sync> v_force_sync);
};
2 changes: 1 addition & 1 deletion compiler/pipes/extract-async.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class ExtractAsyncPass final : public FunctionPassBase {
VertexPtr on_exit_vertex(VertexPtr vertex) override;

bool user_recursion(VertexPtr vertex) override {
return vertex->type() == op_fork;
return vertex->type() == op_fork || vertex->type() == op_force_sync;
}

private:
Expand Down
25 changes: 25 additions & 0 deletions compiler/pipes/force-sync.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#include "compiler/pipes/force-sync.h"

#include "compiler/compiler-core.h"
#include "compiler/inferring/public.h"
#include "compiler/data/class-data.h"
#include "compiler/function-pass.h"

VertexPtr ForceSyncPass::on_enter_vertex(VertexPtr v) {
return v;
}

bool ForceSyncPass::user_recursion(VertexPtr v) {
if (auto force_sync_v = v.try_as<op_force_sync>()) {
inside_force_sync++;
run_function_pass(force_sync_v->func_call_ref(), this);
inside_force_sync--;
return true;
}

return false;
}
23 changes: 23 additions & 0 deletions compiler/pipes/force-sync.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@

#pragma once

#include <string>

#include "compiler/function-pass.h"
#include "compiler/threading/data-stream.h"

class ForceSyncPass final : public FunctionPassBase {
private:
int inside_force_sync = 0;

public:
std::string get_description() override {
return "Process force_sync() calls";
}

VertexPtr on_enter_vertex(VertexPtr v) override;

bool user_recursion(VertexPtr v) override;

};

1 change: 1 addition & 0 deletions compiler/pipes/gen-tree-postprocess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ GenTreePostprocessPass::builtin_fun GenTreePostprocessPass::get_builtin_function
{"not_false", {op_conv_drop_false, 1}},
{"not_null", {op_conv_drop_null, 1}},
{"fork", {op_fork, 1}},
{"force_sync", {op_force_sync, 1}},
{"pow", {op_pow, 2}}
};
auto it = functions.find(name);
Expand Down
17 changes: 17 additions & 0 deletions compiler/vertex-desc.json
Original file line number Diff line number Diff line change
Expand Up @@ -1920,6 +1920,23 @@
"str": "fork"
}
},
{
"comment": "artificial op for a force_sync call; func_call() contains a force_sync argument",
"sons": {
"func_call": {
"id": 0,
"type": "op_func_call"
}
},
"name": "op_force_sync",
"base_name": "meta_op_base",
"props": {
"rl": "rl_func",
"cnst": "cnst_nonconst_func",
"type": "common_op",
"str": "force_sync"
}
},
{
"comment": "artificial op that wraps async function call, optionally with save_var() for LHS",
"sons": {
Expand Down
3 changes: 2 additions & 1 deletion runtime/rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,8 @@ bool drop_tl_query_info(int64_t query_id) {
return true;
}

Optional<string> f$rpc_get(int64_t request_id, double timeout) {
Optional<string> f$rpc_get(int64_t request_id, double timeout, bool is_sync) {
(void)is_sync;
if (!drop_tl_query_info(request_id)) {
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion runtime/rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ int64_t f$rpc_send_noflush(const class_instance<C$RpcConnection> &conn, double t

void f$rpc_flush();

Optional<string> f$rpc_get(int64_t request_id, double timeout = -1.0);
Optional<string> f$rpc_get(int64_t request_id, double timeout = -1.0, bool is_sync = false);

Optional<string> f$rpc_get_synchronously(int64_t request_id);

Expand Down
Loading