Skip to content

Commit

Permalink
Add job workers support to K2 runtime (#1097)
Browse files Browse the repository at this point in the history
  • Loading branch information
apolyakov authored Sep 12, 2024
1 parent 15dd7c4 commit e8da89f
Show file tree
Hide file tree
Showing 32 changed files with 707 additions and 145 deletions.
3 changes: 2 additions & 1 deletion builtin-functions/kphp-light/functions.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<?php

require_once __DIR__ . '/job-workers.txt';

define('TODO', -1);
define('TODO_OVERLOAD', -1);

Expand Down Expand Up @@ -138,7 +140,6 @@ function rpc_tl_query_result($query_ids ::: array) ::: mixed[][];
/** @kphp-extern-func-info interruptible */
function typed_rpc_tl_query_result(int[] $query_ids) ::: @tl\RpcResponse[];


// === Component ==================================================================================

class ComponentQuery {
Expand Down
49 changes: 49 additions & 0 deletions builtin-functions/kphp-light/job-workers.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

// === Job Worker =================================================================================

/** @kphp-immutable-class */
interface KphpJobWorkerSharedMemoryPiece {}

interface KphpJobWorkerRequest {}
interface KphpJobWorkerResponse {}

class KphpJobWorkerResponseError implements KphpJobWorkerResponse {
// Job script execution errors:
const JOB_MEMORY_LIMIT_ERROR = -101;
const JOB_TIMEOUT_ERROR = -102;
const JOB_EXCEPTION_ERROR = -103;
const JOB_STACK_OVERFLOW_ERROR = -104;
const JOB_PHP_ASSERT_ERROR = -105;

const JOB_CLIENT_MEMORY_LIMIT_ERROR = -1001; // client doesn't have enough memory to accept job response
const JOB_NOTHING_REPLIED_ERROR = -2001; // kphp_job_worker_store_response() was not succeeded

const JOB_STORE_RESPONSE_INCORRECT_CALL_ERROR = -3000;
const JOB_STORE_RESPONSE_NOT_ENOUGH_SHARED_MESSAGES_ERROR = -3001;
const JOB_STORE_RESPONSE_TOO_BIG_ERROR = -3002;
const JOB_STORE_RESPONSE_CANT_SEND_ERROR = -3003;

public function getError() ::: string;
public function getErrorCode() ::: int; // returns one of listed above error codes
}

/** @kphp-extern-func-info interruptible */
function kphp_job_worker_start(string $request, float $timeout): future<string> | false;

/** @kphp-extern-func-info interruptible */
function kphp_job_worker_start_no_reply(string $request, float $timeout): bool;

/** @kphp-extern-func-info interruptible */
function kphp_job_worker_start_multi(string[] $request, float $timeout): (future<string> | false)[];

/** @kphp-extern-func-info interruptible */
function kphp_job_worker_fetch_request(): string;

/** @kphp-extern-func-info interruptible */
function kphp_job_worker_store_response(string $response): int;

function is_kphp_job_workers_enabled(): bool;

function get_job_workers_number(): int;

1 change: 0 additions & 1 deletion builtin-functions/kphp-light/unsupported-functions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require_once __DIR__ . '/unsupported/error.txt';
require_once __DIR__ . '/unsupported/file.txt';
require_once __DIR__ . '/unsupported/fork.txt';
require_once __DIR__ . '/unsupported/hash.txt';
require_once __DIR__ . '/unsupported/job-worker.txt';
require_once __DIR__ . '/unsupported/kml.txt';
require_once __DIR__ . '/unsupported/kphp-toggles.txt';
require_once __DIR__ . '/unsupported/kphp-tracing.txt';
Expand Down
52 changes: 0 additions & 52 deletions builtin-functions/kphp-light/unsupported/job-worker.txt

This file was deleted.

9 changes: 5 additions & 4 deletions runtime-core/class-instance/refcountable-php-classes.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class abstract_refcountable_php_interface : public ScriptAllocatorManaged {
virtual void *get_instance_data_raw_ptr() noexcept = 0;
};

template<class ...Bases>
template<class... Bases>
class refcountable_polymorphic_php_classes : public Bases... {
public:
void add_ref() noexcept final {
Expand Down Expand Up @@ -55,7 +55,7 @@ class refcountable_polymorphic_php_classes : public Bases... {
uint32_t refcnt{0};
};

template<class ...Interfaces>
template<class... Interfaces>
class refcountable_polymorphic_php_classes_virt : public virtual abstract_refcountable_php_interface, public Interfaces... {
public:
refcountable_polymorphic_php_classes_virt() __attribute__((always_inline)) = default;
Expand Down Expand Up @@ -98,7 +98,7 @@ class refcountable_polymorphic_php_classes_virt<> : public virtual abstract_refc
};

template<class Derived>
class refcountable_php_classes : public ScriptAllocatorManaged {
class refcountable_php_classes : public ScriptAllocatorManaged {
public:
void add_ref() noexcept {
if (refcnt < ExtraRefCnt::for_global_const) {
Expand Down Expand Up @@ -133,6 +133,7 @@ class refcountable_php_classes : public ScriptAllocatorManaged {
void *get_instance_data_raw_ptr() noexcept {
return this;
}

private:
uint32_t refcnt{0};
};
Expand All @@ -144,6 +145,6 @@ class refcountable_empty_php_classes {
};

struct may_be_mixed_base : public virtual abstract_refcountable_php_interface {
virtual ~may_be_mixed_base() = default;
~may_be_mixed_base() override = default;
virtual const char *get_class() const noexcept = 0;
};
2 changes: 1 addition & 1 deletion runtime-light/component/component.cmake
Original file line number Diff line number Diff line change
@@ -1 +1 @@
prepend(RUNTIME_COMPONENT_SRC component/ component.cpp)
prepend(RUNTIME_COMPONENT_SRC component/ component.cpp init-functions.cpp)
42 changes: 8 additions & 34 deletions runtime-light/component/component.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,49 +10,19 @@
#include <utility>

#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/component/init-functions.h"
#include "runtime-light/core/globals/php-init-scripts.h"
#include "runtime-light/coroutine/awaitable.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/header.h"
#include "runtime-light/scheduler/scheduler.h"
#include "runtime-light/stdlib/job-worker/job-worker-context.h"
#include "runtime-light/streams/streams.h"
#include "runtime-light/utils/context.h"
#include "runtime-light/utils/json-functions.h"

namespace {

constexpr uint32_t K2_INVOKE_HTTP_MAGIC = 0xd909efe8;
constexpr uint32_t K2_INVOKE_JOB_WORKER_MAGIC = 0x437d7312;

void init_http_superglobals(const string &http_query) noexcept {
int32_t merge_output_buffers() noexcept {
auto &component_ctx{*get_component_context()};
component_ctx.php_script_mutable_globals_singleton.get_superglobals().v$_SERVER.set_value(string{"QUERY_TYPE"}, string{"http"});
component_ctx.php_script_mutable_globals_singleton.get_superglobals().v$_POST = f$json_decode(http_query, true);
}

task_t<uint64_t> init_kphp_cli_component() noexcept {
co_return co_await wait_for_incoming_stream_t{};
}

task_t<uint64_t> init_kphp_server_component() noexcept {
uint32_t magic{};
const auto stream_d{co_await wait_for_incoming_stream_t{}};
const auto read{co_await read_exact_from_stream(stream_d, reinterpret_cast<char *>(std::addressof(magic)), sizeof(uint32_t))};
php_assert(read == sizeof(uint32_t));
if (magic == K2_INVOKE_HTTP_MAGIC) {
const auto [buffer, size]{co_await read_all_from_stream(stream_d)};
init_http_superglobals(string{buffer, static_cast<string::size_type>(size)});
get_platform_context()->allocator.free(buffer);
} else if (magic == K2_INVOKE_JOB_WORKER_MAGIC) {
php_error("not implemented");
} else {
php_error("server got unexpected type of request: 0x%x", magic);
}

co_return stream_d;
}

int32_t merge_output_buffers(ComponentState &component_ctx) noexcept {
Response &response{component_ctx.response};
php_assert(response.current_buffer >= 0);

Expand Down Expand Up @@ -95,12 +65,16 @@ task_t<void> ComponentState::run_component_epilogue() noexcept {
if (component_kind_ == ComponentKind::Oneshot || component_kind_ == ComponentKind::Multishot) {
co_return;
}
// do not flush output buffers if we are in job worker
if (JobWorkerServerComponentContext::get().kind != JobWorkerServerComponentContext::Kind::Invalid) {
co_return;
}
if (standard_stream() == INVALID_PLATFORM_DESCRIPTOR) {
poll_status = PollStatus::PollFinishedError;
co_return;
}

const auto &buffer{response.output_buffers[merge_output_buffers(*this)]};
const auto &buffer{response.output_buffers[merge_output_buffers()]};
if ((co_await write_all_to_stream(standard_stream(), buffer.buffer(), buffer.size())) != buffer.size()) {
php_warning("can't write component result to stream %" PRIu64, standard_stream());
}
Expand Down
7 changes: 7 additions & 0 deletions runtime-light/component/component.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "runtime-light/header.h"
#include "runtime-light/scheduler/scheduler.h"
#include "runtime-light/stdlib/fork/fork-context.h"
#include "runtime-light/stdlib/job-worker/job-worker-context.h"
#include "runtime-light/stdlib/output/output-buffer.h"
#include "runtime-light/stdlib/regex/regex-functions.h"
#include "runtime-light/stdlib/curl/curl.h"
Expand Down Expand Up @@ -63,6 +64,10 @@ struct ComponentState {

task_t<void> run_component_epilogue() noexcept;

ComponentKind component_kind() const noexcept {
return component_kind_;
}

void process_platform_updates() noexcept;

bool stream_updated(uint64_t stream_d) const noexcept {
Expand Down Expand Up @@ -94,6 +99,8 @@ struct ComponentState {

KphpCoreContext kphp_core_context;
RpcComponentContext rpc_component_context;
JobWorkerClientComponentContext job_worker_client_component_context{};
JobWorkerServerComponentContext job_worker_server_component_context{};

RegexComponentState regex_component_context;
CurlComponentState curl_component_state;
Expand Down
1 change: 0 additions & 1 deletion runtime-light/component/image.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#pragma once

#include "runtime-light/header.h"
#include "runtime-light/stdlib/rpc/rpc-context.h"

struct ImageState {
Expand Down
69 changes: 69 additions & 0 deletions runtime-light/component/init-functions.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#include "runtime-light/component/init-functions.h"

#include <cstdint>

#include "runtime-core/runtime-core.h"
#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/component/component.h"
#include "runtime-light/coroutine/awaitable.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/header.h"
#include "runtime-light/stdlib/job-worker/job-worker-context.h"
#include "runtime-light/streams/streams.h"
#include "runtime-light/tl/tl-core.h"
#include "runtime-light/tl/tl-functions.h"
#include "runtime-light/utils/context.h"

namespace {

void process_k2_invoke_job_worker(tl::TLBuffer &tlb) noexcept {
tl::K2InvokeJobWorker invoke_jw{};
if (!invoke_jw.fetch(tlb)) {
php_error("erroneous job worker request");
}
php_assert(invoke_jw.image_id == vk_k2_describe()->build_timestamp); // ensure that we got the request from ourselves

auto &jw_server_ctx{JobWorkerServerComponentContext::get()};
jw_server_ctx.kind = invoke_jw.ignore_answer ? JobWorkerServerComponentContext::Kind::NoReply : JobWorkerServerComponentContext::Kind::Regular;
jw_server_ctx.state = JobWorkerServerComponentContext::State::Working;
jw_server_ctx.job_id = invoke_jw.job_id;
jw_server_ctx.body = std::move(invoke_jw.body);
get_component_context()->php_script_mutable_globals_singleton.get_superglobals().v$_SERVER.set_value(string{"JOB_ID"}, invoke_jw.job_id);
}

void process_k2_invoke_http([[maybe_unused]] tl::TLBuffer &tlb) noexcept {}

} // namespace

task_t<uint64_t> init_kphp_server_component() noexcept {
auto stream_d{co_await wait_for_incoming_stream_t{}};
const auto [buffer, size]{co_await read_all_from_stream(stream_d)};
php_assert(size >= sizeof(uint32_t)); // check that we can fetch at least magic
tl::TLBuffer tlb{};
tlb.store_bytes(buffer, static_cast<size_t>(size));
get_platform_context()->allocator.free(buffer);

switch (const auto magic{*reinterpret_cast<const uint32_t *>(tlb.data())}) { // lookup magic
case tl::K2_INVOKE_HTTP_MAGIC: {
process_k2_invoke_http(tlb);
break;
}
case tl::K2_INVOKE_JOB_WORKER_MAGIC: {
process_k2_invoke_job_worker(tlb);
// release standard stream in case of a no reply job worker since we don't need that stream anymore
if (JobWorkerServerComponentContext::get().kind == JobWorkerServerComponentContext::Kind::NoReply) {
get_component_context()->release_stream(stream_d);
stream_d = INVALID_PLATFORM_DESCRIPTOR;
}
break;
}
default: {
php_error("unexpected magic: 0x%x", magic);
}
}
co_return stream_d;
}
16 changes: 16 additions & 0 deletions runtime-light/component/init-functions.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#pragma once

#include "runtime-light/coroutine/awaitable.h"
#include "runtime-light/coroutine/task.h"

// Returns a stream descriptor that is supposed to be a stream to stdout
inline task_t<uint64_t> init_kphp_cli_component() noexcept {
co_return co_await wait_for_incoming_stream_t{};
}

// Performs some initialization and returns a stream descriptor we need to write server response into
task_t<uint64_t> init_kphp_server_component() noexcept;
Loading

0 comments on commit e8da89f

Please sign in to comment.