Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZenohUTransport implementaton #54

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ find_package(spdlog REQUIRED)
find_package(up-core-api REQUIRED)
find_package(up-cpp REQUIRED)
find_package(zenohcpp REQUIRED)
find_package(GTest REQUIRED)
include(GoogleTest)

# TODO NEEDED?
#add_definitions(-DSPDLOG_FMT_EXTERNAL)
Expand Down Expand Up @@ -55,6 +57,7 @@ target_link_libraries(${PROJECT_NAME}
up-cpp::up-cpp
up-core-api::up-core-api
protobuf::libprotobuf
GTest::gtest_main
spdlog::spdlog)

enable_testing()
Expand Down
4 changes: 2 additions & 2 deletions conanfile.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[requires]
up-cpp/0.2.0
zenohcpp/0.11.0
up-cpp/[>=1.1.0]
zenohcpp/0.11.0.3
# Should result in using the packages from up-cpp
spdlog/[>=1.13.0]
up-core-api/[>=1.5.8]
Expand Down
103 changes: 103 additions & 0 deletions include/up-transport-zenoh-cpp/ZenohUTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,34 @@
#include <up-cpp/transport/UTransport.h>

#include <filesystem>
#include <mutex>
#include <optional>
#include <unordered_map>

#define ZENOHCXX_ZENOHC
#include <zenoh.hxx>

namespace zenohc {

class OwnedQuery {
public:
OwnedQuery(const z_query_t& query) : _query(z_query_clone(&query)) {}

OwnedQuery(const OwnedQuery&) = delete;
OwnedQuery& operator=(const OwnedQuery&) = delete;

~OwnedQuery() { z_drop(&_query); }

Query loan() const { return z_loan(_query); }
bool check() const { return z_check(_query); }

private:
z_owned_query_t _query;
};

using OwnedQueryPtr = std::shared_ptr<OwnedQuery>;

} // namespace zenohc

namespace uprotocol::transport {

Expand Down Expand Up @@ -61,6 +88,31 @@ struct ZenohUTransport : public UTransport {

/// @brief Represents the callable end of a callback connection.
using CallableConn = typename UTransport::CallableConn;
using UuriKey = std::string;

struct ListenerKey {
CallableConn listener;
std::string zenoh_key;

ListenerKey(CallableConn listener, const std::string& zenoh_key)
: listener(listener), zenoh_key(zenoh_key) {}

bool operator==(const ListenerKey& other) const {
return listener == other.listener && zenoh_key == other.zenoh_key;
}

bool operator<(const ListenerKey& other) const {
if (listener == other.listener) {
return zenoh_key < other.zenoh_key;
}
return listener < other.listener;
}
};

using RpcCallbackMap = std::map<UuriKey, CallableConn>;
using SubscriberMap = std::map<ListenerKey, zenoh::Subscriber>;
using QueryableMap = std::map<ListenerKey, zenoh::Queryable>;
using QueryMap = std::map<std::string, zenoh::OwnedQueryPtr>;

/// @brief Register listener to be called when UMessage is received
/// for the given URI.
Expand Down Expand Up @@ -94,7 +146,58 @@ struct ZenohUTransport : public UTransport {
/// @param listener shared_ptr of the Connection that has been broken.
virtual void cleanupListener(CallableConn listener) override;

static std::string toZenohKeyString(
const std::string& default_authority_name, const v1::UUri& source,
const std::optional<v1::UUri>& sink);

private:
static v1::UStatus uError(v1::UCode code, std::string_view message);

static std::vector<std::pair<std::string, std::string>>
uattributesToAttachment(const v1::UAttributes& attributes);

static v1::UAttributes attachmentToUAttributes(
const zenoh::AttachmentView& attachment);

static zenoh::Priority mapZenohPriority(v1::UPriority upriority);

static v1::UMessage sampleToUMessage(const zenoh::Sample& sample);

static v1::UMessage queryToUMessage(const zenoh::Query& query);

v1::UStatus registerRequestListener_(const std::string& zenoh_key,
CallableConn listener);

v1::UStatus registerResponseListener_(const std::string& zenoh_key,
CallableConn listener);

v1::UStatus registerPublishNotificationListener_(
const std::string& zenoh_key, CallableConn listener);

v1::UStatus sendRequest_(const std::string& zenoh_key,
const std::string& payload,
const v1::UAttributes& attributes);

v1::UStatus sendResponse_(const std::string& payload,
const v1::UAttributes& attributes);

v1::UStatus sendPublishNotification_(const std::string& zenoh_key,
const std::string& payload,
const v1::UAttributes& attributes);

zenoh::Session session_;

RpcCallbackMap rpc_callback_map_;
std::mutex rpc_callback_map_mutex_;

SubscriberMap subscriber_map_;
std::mutex subscriber_map_mutex_;

QueryableMap queryable_map_;
std::mutex queryable_map_mutex_;

QueryMap query_map_;
std::mutex query_map_mutex_;
};

} // namespace uprotocol::transport
Expand Down
Loading
Loading