Skip to content

Commit

Permalink
Merge pull request #201 from anarthal/feature/type-erased-response
Browse files Browse the repository at this point in the history
Adds a type-erased response adapter to the public API
  • Loading branch information
mzimbres authored Dec 22, 2024
2 parents 26197e8 + 5089eef commit d910557
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 14 deletions.
80 changes: 80 additions & 0 deletions include/boost/redis/adapter/any_adapter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/* Copyright (c) 2018-2023 Marcelo Zimbres Silva ([email protected])
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/

#ifndef BOOST_REDIS_ANY_ADAPTER_HPP
#define BOOST_REDIS_ANY_ADAPTER_HPP


#include <boost/redis/resp3/node.hpp>
#include <boost/redis/adapter/adapt.hpp>
#include <boost/system/error_code.hpp>
#include <cstddef>
#include <functional>
#include <string_view>
#include <type_traits>

namespace boost::redis {

namespace detail {

// Forward decl
template <class Executor>
class connection_base;

}

/** @brief A type-erased reference to a response.
* @ingroup high-level-api
*
* A type-erased response adapter. It can be executed using @ref connection::async_exec.
* Using this type instead of raw response references enables separate compilation.
*
* Given a response object `resp` that can be passed to `async_exec`, the following two
* statements have the same effect:
* ```
* co_await conn.async_exec(req, resp);
* co_await conn.async_exec(req, any_response(resp));
* ```
*/
class any_adapter
{
using fn_type = std::function<void(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)>;

struct impl_t {
fn_type adapt_fn;
std::size_t supported_response_size;
} impl_;

template <class T>
static auto create_impl(T& resp) -> impl_t
{
using namespace boost::redis::adapter;
auto adapter = boost_redis_adapt(resp);
std::size_t size = adapter.get_supported_response_size();
return { std::move(adapter), size };
}

template <class Executor>
friend class detail::connection_base;

public:
/**
* @brief Constructor.
*
* Creates a type-erased response adapter from `resp` by calling
* `boost_redis_adapt`. `T` must be a valid Redis response type.
* Any type passed to @ref connection::async_exec qualifies.
*
* This object stores a reference to `resp`, which must be kept alive
* while `*this` is being used.
*/
template <class T, class = std::enable_if_t<!std::is_same_v<T, any_adapter>>>
explicit any_adapter(T& resp) : impl_(create_impl(resp)) {}
};

}

#endif
42 changes: 38 additions & 4 deletions include/boost/redis/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef BOOST_REDIS_CONNECTION_HPP
#define BOOST_REDIS_CONNECTION_HPP

#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/detail/connection_base.hpp>
#include <boost/redis/logger.hpp>
#include <boost/redis/config.hpp>
Expand All @@ -17,7 +18,7 @@
#include <boost/asio/any_completion_handler.hpp>

#include <chrono>
#include <memory>
#include <cstddef>
#include <limits>

namespace boost::redis {
Expand Down Expand Up @@ -256,7 +257,22 @@ class basic_connection {
Response& resp = ignore,
CompletionToken&& token = CompletionToken{})
{
return impl_.async_exec(req, resp, std::forward<CompletionToken>(token));
return impl_.async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
}

/** @copydoc async_exec
*
* @details This function uses the type-erased @ref any_adapter class, which
* encapsulates a reference to a response object.
*/
template <class CompletionToken = asio::default_completion_token_t<executor_type>>
auto
async_exec(
request const& req,
any_adapter adapter,
CompletionToken&& token = CompletionToken{})
{
return impl_.async_exec(req, std::move(adapter), std::forward<CompletionToken>(token));
}

/** @brief Cancel operations.
Expand Down Expand Up @@ -392,9 +408,21 @@ class connection {

/// Calls `boost::redis::basic_connection::async_exec`.
template <class Response, class CompletionToken>
auto async_exec(request const& req, Response& resp, CompletionToken token)
auto async_exec(request const& req, Response& resp, CompletionToken&& token)
{
return impl_.async_exec(req, resp, std::move(token));
return async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
}

/// Calls `boost::redis::basic_connection::async_exec`.
template <class CompletionToken>
auto async_exec(request const& req, any_adapter adapter, CompletionToken&& token)
{
return asio::async_initiate<
CompletionToken, void(boost::system::error_code, std::size_t)>(
[](auto handler, connection* self, request const* req, any_adapter&& adapter)
{
self->async_exec_impl(*req, std::move(adapter), std::move(handler));
}, token, this, &req, std::move(adapter));
}

/// Calls `boost::redis::basic_connection::cancel`.
Expand Down Expand Up @@ -435,6 +463,12 @@ class connection {
config const& cfg,
logger l,
asio::any_completion_handler<void(boost::system::error_code)> token);

void
async_exec_impl(
request const& req,
any_adapter&& adapter,
asio::any_completion_handler<void(boost::system::error_code, std::size_t)> token);

basic_connection<executor_type> impl_;
};
Expand Down
19 changes: 11 additions & 8 deletions include/boost/redis/detail/connection_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef BOOST_REDIS_CONNECTION_BASE_HPP
#define BOOST_REDIS_CONNECTION_BASE_HPP

#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/adapter/adapt.hpp>
#include <boost/redis/detail/helper.hpp>
#include <boost/redis/error.hpp>
Expand All @@ -30,15 +31,16 @@
#include <boost/asio/read_until.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/associated_immediate_executor.hpp>

#include <algorithm>
#include <array>
#include <chrono>
#include <deque>
#include <memory>
#include <string_view>
#include <type_traits>
#include <functional>
#include <utility>

namespace boost::redis::detail
{
Expand Down Expand Up @@ -121,7 +123,9 @@ struct exec_op {
// be stablished.
if (info_->req_->get_config().cancel_if_not_connected && !conn_->is_open()) {
BOOST_ASIO_CORO_YIELD
asio::post(std::move(self));
asio::dispatch(
asio::get_associated_immediate_executor(self, self.get_io_executor()),
std::move(self));
return self.complete(error::not_connected, 0);
}

Expand Down Expand Up @@ -440,14 +444,13 @@ class connection_base {
cancel_impl(op);
}

template <class Response, class CompletionToken>
auto async_exec(request const& req, Response& resp, CompletionToken token)
template <class CompletionToken>
auto async_exec(request const& req, any_adapter&& adapter, CompletionToken&& token)
{
using namespace boost::redis::adapter;
auto f = boost_redis_adapt(resp);
BOOST_ASSERT_MSG(req.get_expected_responses() <= f.get_supported_response_size(), "Request and response have incompatible sizes.");
auto& adapter_impl = adapter.impl_;
BOOST_ASSERT_MSG(req.get_expected_responses() <= adapter_impl.supported_response_size, "Request and response have incompatible sizes.");

auto info = std::make_shared<req_info>(req, f, get_executor());
auto info = std::make_shared<req_info>(req, std::move(adapter_impl.adapt_fn), get_executor());

return asio::async_compose
< CompletionToken
Expand Down
3 changes: 2 additions & 1 deletion include/boost/redis/detail/health_checker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <boost/redis/request.hpp>
#include <boost/redis/response.hpp>
#include <boost/redis/operation.hpp>
#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/detail/helper.hpp>
#include <boost/redis/config.hpp>
#include <boost/asio/steady_timer.hpp>
Expand Down Expand Up @@ -44,7 +45,7 @@ class ping_op {
}

BOOST_ASIO_CORO_YIELD
conn_->async_exec(checker_->req_, checker_->resp_, std::move(self));
conn_->async_exec(checker_->req_, any_adapter(checker_->resp_), std::move(self));
if (ec || is_cancelled(self)) {
logger_.trace("ping_op: error/cancelled (1).");
checker_->wait_timer_.cancel();
Expand Down
3 changes: 2 additions & 1 deletion include/boost/redis/detail/runner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef BOOST_REDIS_RUNNER_HPP
#define BOOST_REDIS_RUNNER_HPP

#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/detail/health_checker.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/response.hpp>
Expand Down Expand Up @@ -47,7 +48,7 @@ struct hello_op {
runner_->add_hello();

BOOST_ASIO_CORO_YIELD
conn_->async_exec(runner_->hello_req_, runner_->hello_resp_, std::move(self));
conn_->async_exec(runner_->hello_req_, any_adapter(runner_->hello_resp_), std::move(self));
logger_.on_hello(ec, runner_->hello_resp_);

if (ec || runner_->has_error_in_response() || is_cancelled(self)) {
Expand Down
10 changes: 10 additions & 0 deletions include/boost/redis/impl/connection.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

#include <boost/redis/connection.hpp>
#include <cstddef>

namespace boost::redis {

Expand All @@ -31,6 +32,15 @@ connection::async_run_impl(
impl_.async_run(cfg, l, std::move(token));
}

void
connection::async_exec_impl(
request const& req,
any_adapter&& adapter,
asio::any_completion_handler<void(boost::system::error_code, std::size_t)> token)
{
impl_.async_exec(req, std::move(adapter), std::move(token));
}

void connection::cancel(operation op)
{
impl_.cancel(op);
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ make_test(test_conn_exec_cancel 20)
make_test(test_conn_exec_cancel2 20)
make_test(test_conn_echo_stress 20)
make_test(test_conn_run_cancel 20)
make_test(test_any_adapter 17)
make_test(test_issue_50 20)
make_test(test_issue_181 17)

Expand Down
1 change: 1 addition & 0 deletions test/Jamfile
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ local tests =
test_low_level
test_request
test_run
test_any_adapter
;

# Build and run the tests
Expand Down
49 changes: 49 additions & 0 deletions test/test_any_adapter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva ([email protected])
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/

#include <boost/redis/ignore.hpp>
#include <boost/redis/response.hpp>
#include <boost/redis/adapter/any_adapter.hpp>
#include <string>
#define BOOST_TEST_MODULE any_adapter
#include <boost/test/included/unit_test.hpp>

using boost::redis::generic_response;
using boost::redis::response;
using boost::redis::ignore;
using boost::redis::any_adapter;

BOOST_AUTO_TEST_CASE(any_adapter_response_types)
{
// any_adapter can be used with any supported responses
response<int> r1;
response<int, std::string> r2;
generic_response r3;

BOOST_CHECK_NO_THROW(any_adapter{r1});
BOOST_CHECK_NO_THROW(any_adapter{r2});
BOOST_CHECK_NO_THROW(any_adapter{r3});
BOOST_CHECK_NO_THROW(any_adapter{ignore});
}

BOOST_AUTO_TEST_CASE(any_adapter_copy_move)
{
// any_adapter can be copied/moved
response<int, std::string> r;
any_adapter ad1 {r};

// copy constructor
any_adapter ad2 {ad1};

// move constructor
any_adapter ad3 {std::move(ad2)};

// copy assignment
BOOST_CHECK_NO_THROW(ad2 = ad1);

// move assignment
BOOST_CHECK_NO_THROW(ad2 = std::move(ad1));
}
24 changes: 24 additions & 0 deletions test/test_conn_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
* accompanying file LICENSE.txt)
*/

#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/connection.hpp>
#include <boost/system/errc.hpp>
#include <boost/asio/detached.hpp>
#include <string>
#define BOOST_TEST_MODULE conn-exec
#include <boost/test/included/unit_test.hpp>
#include <iostream>
Expand Down Expand Up @@ -191,3 +193,25 @@ BOOST_AUTO_TEST_CASE(large_number_of_concurrent_requests_issue_170)
BOOST_CHECK_EQUAL(counter, repeat);
}

BOOST_AUTO_TEST_CASE(exec_any_adapter)
{
// Executing an any_adapter object works
request req;
req.push("PING", "PONG");
response<std::string> res;

net::io_context ioc;

auto conn = std::make_shared<connection>(ioc);

conn->async_exec(req, boost::redis::any_adapter(res), [&](auto ec, auto){
BOOST_TEST(!ec);
conn->cancel();
});

run(conn);
ioc.run();

BOOST_TEST(std::get<0>(res).value() == "PONG");
}

0 comments on commit d910557

Please sign in to comment.