diff --git a/src/tateyama/endpoint/common/endpoint_proto_utils.h b/src/tateyama/endpoint/common/endpoint_proto_utils.h index c9981d9a..e9086da4 100644 --- a/src/tateyama/endpoint/common/endpoint_proto_utils.h +++ b/src/tateyama/endpoint/common/endpoint_proto_utils.h @@ -65,13 +65,13 @@ inline bool append_response_header(std::stringstream& ss, std::string_view body, hdr.set_payload_type(type); if(input.blobs_ && type == ::tateyama::proto::framework::response::Header::SERVICE_RESULT) { if (!(input.blobs_)->empty()) { - auto* blobs = hdr.mutable_blobs(); + auto* mutable_blobs = hdr.mutable_blobs(); for(auto&& e: *input.blobs_) { - auto* blob = blobs->add_blobs(); + auto* mutable_blob = mutable_blobs->add_blobs(); auto cn = e->channel_name(); - blob->set_channel_name(cn.data(), cn.length()); - blob->set_path((e->path()).string()); - blob->set_temporary(e->is_temporary()); + mutable_blob->set_channel_name(cn.data(), cn.length()); + mutable_blob->set_path((e->path()).string()); + mutable_blob->set_temporary(e->is_temporary()); } } } diff --git a/test/tateyama/endpoint/header_utils.h b/test/tateyama/endpoint/header_utils.h index 2d316fb0..0bdc060b 100644 --- a/test/tateyama/endpoint/header_utils.h +++ b/test/tateyama/endpoint/header_utils.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2025 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,24 +14,43 @@ * limitations under the License. */ #pragma once + +#include +#include +#include +#include + #include #include #include #include - -// #include +#include +#include namespace tateyama::endpoint { struct request_header_content { std::size_t session_id_{}; std::size_t service_id_{}; + std::set>* blobs_{}; }; inline bool append_request_header(std::stringstream& ss, std::string_view body, request_header_content input) { ::tateyama::proto::framework::request::Header hdr{}; hdr.set_session_id(input.session_id_); hdr.set_service_id(input.service_id_); + if (input.blobs_) { + if (!(input.blobs_)->empty()) { + ::tateyama::proto::framework::common::RepeatedBlobInfo blobs{}; + auto* mutable_blobs = hdr.mutable_blobs(); + for (auto&& e: *input.blobs_) { + auto* mutable_blob = mutable_blobs->add_blobs(); + mutable_blob->set_channel_name(std::get<0>(e)); + mutable_blob->set_path(std::get<1>(e)); + mutable_blob->set_temporary(std::get<2>(e)); + } + } + } if(auto res = utils::SerializeDelimitedToOstream(hdr, std::addressof(ss)); ! res) { return false; } diff --git a/test/tateyama/endpoint/ipc/ipc_client.cpp b/test/tateyama/endpoint/ipc/ipc_client.cpp index 166e55b7..ce5dc618 100644 --- a/test/tateyama/endpoint/ipc/ipc_client.cpp +++ b/test/tateyama/endpoint/ipc/ipc_client.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2024 Project Tsurugi. + * Copyright 2018-2025 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,25 +55,22 @@ void ipc_client::send(const std::size_t tag, const std::string &message, std::si ipc_test_index + index_offset); } -/* - * see parse_header() in tateyama/endpoint/common/endpoint_proto_utils.h - */ -struct parse_response_result { - std::size_t session_id_ { }; - tateyama::proto::framework::response::Header::PayloadType payload_type_{}; - std::string_view payload_ { }; -}; - -bool parse_response_header(std::string_view input, parse_response_result &result) { - result = { }; - ::tateyama::proto::framework::response::Header hdr { }; +void ipc_client::send(const std::size_t tag, const std::string &message, std::set>& blobs, std::size_t index_offset) { + request_header_content hdr { session_id_, tag, &blobs }; + std::stringstream ss { }; + append_request_header(ss, message, hdr); + auto request_message = ss.str(); + request_wire_->write(reinterpret_cast(request_message.data()), + request_message.length(), + ipc_test_index + index_offset); +} + +bool parse_response_header(std::string_view input, tateyama::proto::framework::response::Header& hdr, std::string_view &payload) { google::protobuf::io::ArrayInputStream in { input.data(), static_cast(input.size()) }; if (auto res = utils::ParseDelimitedFromZeroCopyStream(std::addressof(hdr), std::addressof(in), nullptr); !res) { return false; } - result.session_id_ = hdr.session_id(); - result.payload_type_ = hdr.payload_type(); - return utils::GetDelimitedBodyFromZeroCopyStream(std::addressof(in), nullptr, result.payload_); + return utils::GetDelimitedBodyFromZeroCopyStream(std::addressof(in), nullptr, payload); } void ipc_client::receive(std::string &message) { @@ -110,12 +107,12 @@ void ipc_client::receive(std::string &message, tateyama::proto::framework::respo r_msg.resize(header.get_length()); response_wire_->read(reinterpret_cast(r_msg.data())); // - parse_response_result result; - parse_response_header(r_msg, result); + std::string_view payload{}; + parse_response_header(r_msg, hdr_, payload); // ASSERT_TRUE(parse_response_header(r_msg, result)); // EXPECT_EQ(session_id_, result.session_id_); - message = result.payload_; - type = result.payload_type_; + message = payload; + type = hdr_.payload_type(); } resultset_wires_container* ipc_client::create_resultset_wires() { diff --git a/test/tateyama/endpoint/ipc/ipc_client.h b/test/tateyama/endpoint/ipc/ipc_client.h index a048e19a..d7a7ab07 100644 --- a/test/tateyama/endpoint/ipc/ipc_client.h +++ b/test/tateyama/endpoint/ipc/ipc_client.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2025 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,6 +15,8 @@ */ #pragma once +#include + #include "ipc_test_utils.h" // FIXME: temporary header handling - start @@ -41,6 +43,7 @@ class ipc_client { ~ipc_client() { disconnect(); } void send(const std::size_t tag, const std::string &message, std::size_t index_offset = 0); + void send(const std::size_t tag, const std::string &message, std::set>& blobs, std::size_t index_offset = 0); void receive(std::string &message); void receive(std::string &message, tateyama::proto::framework::response::Header::PayloadType& type); void disconnect() { @@ -51,7 +54,6 @@ class ipc_client { } resultset_wires_container* create_resultset_wires(); void dispose_resultset_wires(resultset_wires_container *rwc); - std::size_t session_id() const noexcept { return session_id_; } @@ -59,6 +61,9 @@ class ipc_client { std::string session_name() const noexcept { return session_name_; } + tateyama::proto::framework::response::Header& framework_response_header() { + return hdr_; + } // tateyama/src/tateyama/endpoint/ipc/bootstrap/server_wires_impl.h // - private server_wire_container_impl::resultset_buffer_size = 64KB @@ -79,6 +84,7 @@ class ipc_client { tsubakuro::common::wire::session_wire_container::wire_container *request_wire_ { }; tsubakuro::common::wire::session_wire_container::response_wire_container *response_wire_ { }; tateyama::proto::endpoint::request::Handshake default_endpoint_handshake_{ }; + tateyama::proto::framework::response::Header hdr_ { }; bool disconnected_{ }; void handshake(); diff --git a/test/tateyama/endpoint/ipc/ipc_lob_test.cpp b/test/tateyama/endpoint/ipc/ipc_lob_test.cpp new file mode 100644 index 00000000..5bab2fa7 --- /dev/null +++ b/test/tateyama/endpoint/ipc/ipc_lob_test.cpp @@ -0,0 +1,212 @@ +/* + * Copyright 2018-2025 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "tateyama/endpoint/ipc/bootstrap/ipc_worker.h" +#include "tateyama/endpoint/header_utils.h" +#include +#include "tateyama/status/resource/database_info_impl.h" +#include "ipc_client.h" + +#include + +namespace tateyama::server { +class ipc_listener_for_test { +public: + static void run(tateyama::endpoint::ipc::bootstrap::ipc_worker& worker) { + worker.invoke([&]{ + worker.run(); + worker.delete_hook(); + }); + } + static void wait(tateyama::endpoint::ipc::bootstrap::ipc_worker& worker) { + while (!worker.is_terminated()); + } +}; +} // namespace tateyama::server + +namespace tateyama::endpoint::ipc { + +class blob_info_for_test : public tateyama::api::server::blob_info { +public: + blob_info_for_test(std::string_view channel_name, std::filesystem::path path, bool temporary) + : channel_name_(channel_name), path_(std::move(path)), temporary_(temporary) { + } + [[nodiscard]] std::string_view channel_name() const noexcept override { + return channel_name_; + } + [[nodiscard]] std::filesystem::path path() const noexcept override { + return path_; + } + [[nodiscard]] bool is_temporary() const noexcept override { + return temporary_; + } + void dispose() override { + } +private: + const std::string channel_name_{}; + const std::filesystem::path path_{}; + const bool temporary_{}; +}; + +static constexpr std::size_t my_session_id = 123; + +static constexpr std::string_view database_name = "ipc_lob_test"; +static constexpr std::string_view label = "label_fot_test"; +static constexpr std::string_view application_name = "application_name_fot_test"; +static constexpr std::size_t datachannel_buffer_size = 64 * 1024; +static constexpr tateyama::common::wire::message_header::index_type index_ = 1; +static constexpr std::string_view response_test_message = "opqrstuvwxyz"; +static constexpr std::string_view request_test_message = "abcdefgh"; +static constexpr std::size_t service_id_of_lob_service = 102; + +class lob_service : public tateyama::framework::routing_service { +public: + bool setup(tateyama::framework::environment&) { return true; } + bool start(tateyama::framework::environment&) { return true; } + bool shutdown(tateyama::framework::environment&) { return true; } + std::string_view label() const noexcept { return __func__; } + + id_type id() const noexcept { return service_id_of_lob_service; } + bool operator ()(std::shared_ptr req, + std::shared_ptr res) override { + req_ = req; + for (auto&& e: blobs_) { + res->add_blob(std::make_unique(std::get<0>(e), std::get<1>(e), std::get<2>(e))); + } + res->body(response_test_message); + return true; + } + + tateyama::api::server::request* request() { + return req_.get(); + } + + void push_blob(const std::string& name, const std::string& path, const bool temporary) { + blobs_.emplace(name, std::filesystem::path(path), temporary); + } + +private: + std::shared_ptr req_{}; + std::set> blobs_{}; +}; + +class ipc_lob_test : public ::testing::Test { + virtual void SetUp() { + auto rv = system("if [ -f /dev/shm/ipc_lob_test ]; then rm -f /dev/shm/ipc_lob_test; fi"); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + // server part + std::string session_name{database_name}; + session_name += "-"; + session_name += std::to_string(my_session_id); + wire_ = std::make_shared(session_name, "dummy_mutex_file_name", datachannel_buffer_size, 16); + const tateyama::endpoint::common::worker_common::configuration conf(tateyama::endpoint::common::worker_common::connection_type::ipc); + worker_ = std::make_unique(service_, conf, my_session_id, wire_, database_info_); + tateyama::server::ipc_listener_for_test::run(*worker_); + + // client part + tateyama::proto::endpoint::request::ClientInformation cci{}; + cci.set_connection_label(std::string(label)); + cci.set_application_name(std::string(application_name)); + tateyama::proto::endpoint::request::Credential cred{}; + // FIXME handle userName when a credential specification is fixed. + cci.set_allocated_credential(&cred); + tateyama::proto::endpoint::request::Handshake hs{}; + hs.set_allocated_client_information(&cci); + client_ = std::make_unique(database_name, my_session_id, hs); + cci.release_credential(); + hs.release_client_information(); + } + + virtual void TearDown() { + worker_->terminate(tateyama::session::shutdown_request_type::forceful); + tateyama::server::ipc_listener_for_test::wait(*worker_); + + auto rv = system("if [ -f /dev/shm/ipc_lob_test ]; then rm -f /dev/shm/ipc_lob_test; fi"); + blobs_.clear(); + } + +public: + tateyama::status_info::resource::database_info_impl database_info_{database_name}; + std::set> blobs_{}; + +protected: + std::shared_ptr wire_{}; + std::unique_ptr worker_{}; + std::unique_ptr client_{}; + lob_service service_{}; +}; + + +TEST_F(ipc_lob_test, receive) { + blobs_.emplace("BlobChannel-123-456", "/tmp/BlobFile", false); + blobs_.emplace("ClobChannel-123-789", "/tmp/ClobFile", true); + + // we do not care service_id nor request message here + client_->send(service_id_of_lob_service, std::string(request_test_message), blobs_); + std::string res{}; + client_->receive(res); + + // server part + auto* request = service_.request(); + + // test for blob_info + // blob + EXPECT_TRUE(request->has_blob("BlobChannel-123-456")); + auto& blob = request->get_blob("BlobChannel-123-456"); + EXPECT_EQ(blob.channel_name(), "BlobChannel-123-456"); + EXPECT_EQ(blob.path().string(), "/tmp/BlobFile"); + EXPECT_EQ(blob.is_temporary(), false); + + // clob + EXPECT_TRUE(request->has_blob("ClobChannel-123-789")); + auto& clob = request->get_blob("ClobChannel-123-789"); + EXPECT_EQ(clob.channel_name(), "ClobChannel-123-789"); + EXPECT_EQ(clob.path().string(), "/tmp/ClobFile"); + EXPECT_EQ(clob.is_temporary(),true); + + // blob, clob that does not exist + EXPECT_FALSE(request->has_blob("BlobChannel-987-654")); + EXPECT_THROW(auto& blob_not_find = request->get_blob("BlobChannel-987-654"), std::runtime_error); + EXPECT_FALSE(request->has_blob("ClobChannel-654-321")); + EXPECT_THROW(auto& clob_not_find = request->get_blob("ClobChannel-654-321"), std::runtime_error); +} + +TEST_F(ipc_lob_test, send) { + service_.push_blob("BlobChannel-123-456", "/tmp/BlobFile", false); + service_.push_blob("ClobChannel-123-789", "/tmp/ClobFile", true); + + // we do not care service_id nor request message here + client_->send(service_id_of_lob_service, std::string(request_test_message), blobs_); + std::string res{}; + client_->receive(res); + + ::tateyama::proto::framework::response::Header& header = client_->framework_response_header(); + ASSERT_TRUE(header.has_blobs()); + for (auto&& e: header.blobs().blobs()) { + if (e.channel_name().compare("BlobChannel-123-456") == 0) { + EXPECT_EQ(e.channel_name(), "BlobChannel-123-456"); + EXPECT_EQ(e.path(), "/tmp/BlobFile"); + EXPECT_EQ(e.temporary(), false); + } else if (e.channel_name().compare("ClobChannel-123-789") == 0) { + EXPECT_EQ(e.channel_name(), "ClobChannel-123-789"); + EXPECT_EQ(e.path(), "/tmp/ClobFile"); + EXPECT_EQ(e.temporary(), true); + } + } +} + +} // namespace tateyama::endpoint::ipc