Skip to content

Commit

Permalink
Removing all modes other than pub/sub
Browse files Browse the repository at this point in the history
  • Loading branch information
gregmedd committed Aug 14, 2024
1 parent ee0a112 commit 1d3fc4d
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 182 deletions.
19 changes: 0 additions & 19 deletions include/up-transport-zenoh-cpp/ZenohUTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,35 +113,16 @@ struct ZenohUTransport : public UTransport {
static v1::UMessage sampleToUMessage(const zenoh::Sample& sample);
static v1::UMessage queryToUMessage(const zenoh::Query& query);

v1::UStatus registerRequestListener_(const std::string& zenoh_key,
CallableConn listener);

v1::UStatus registerResponseListener_(const std::string& zenoh_key,
CallableConn listener);

v1::UStatus registerPublishNotificationListener_(
const std::string& zenoh_key, CallableConn listener);

v1::UStatus sendRequest_(const std::string& zenoh_key,
const std::string& payload,
const v1::UAttributes& attributes);

v1::UStatus sendResponse_(const std::string& payload,
const v1::UAttributes& attributes);

v1::UStatus sendPublishNotification_(const std::string& zenoh_key,
const std::string& payload,
const v1::UAttributes& attributes);

zenoh::Session session_;

ThreadSafeMap<UuriKey, CallableConn> rpc_callback_map_;

ThreadSafeMap<CallableConn, zenoh::Subscriber<void>> subscriber_map_;

ThreadSafeMap<CallableConn, zenoh::Queryable<void>> queryable_map_;

ThreadSafeMap<std::string, std::shared_ptr<zenoh::Query>> query_map_;
};

} // namespace uprotocol::transport
Expand Down
166 changes: 3 additions & 163 deletions src/ZenohUTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,46 +173,6 @@ ZenohUTransport::ZenohUTransport(const v1::UUri& defaultUri,
spdlog::info("ZenohUTransport init");
}

v1::UStatus ZenohUTransport::registerRequestListener_(
const std::string& zenoh_key, CallableConn listener) {
spdlog::info("registerRequestListener_: {}", zenoh_key);

// NOTE: listener is captured by copy here so that it does not go out
// of scope when this function returns.
auto on_query = [this, listener](const zenoh::Query& query) mutable {
auto attributes = attachmentToUAttributes(query.get_attachment());
auto id_str =
datamodel::serializer::uuid::AsString().serialize(attributes.id());

// TODO(sashacmc): Replace this workaround with `query.clone()`
// after zenohcpp 1.0.0-rc6 release
auto cloned_query = std::make_shared<zenoh::Query>(nullptr);
z_query_clone(zenoh::detail::as_owned_c_ptr(*cloned_query),
zenoh::detail::loan(query));

query_map_.emplace(std::move(id_str), std::move(cloned_query));
listener(queryToUMessage(query));
};

auto on_drop = []() {};

auto queryable = session_.declare_queryable(zenoh_key, std::move(on_query),
std::move(on_drop));

queryable_map_.emplace(listener, std::move(queryable));

return v1::UStatus();
}

v1::UStatus ZenohUTransport::registerResponseListener_(
const std::string& zenoh_key, CallableConn listener) {
spdlog::info("registerResponseListener_: {}", zenoh_key);

rpc_callback_map_.emplace(zenoh_key, listener);

return v1::UStatus();
}

v1::UStatus ZenohUTransport::registerPublishNotificationListener_(
const std::string& zenoh_key, CallableConn listener) {
spdlog::info("registerPublishNotificationListener_: {}", zenoh_key);
Expand All @@ -231,83 +191,6 @@ v1::UStatus ZenohUTransport::registerPublishNotificationListener_(
return v1::UStatus();
}

v1::UStatus ZenohUTransport::sendRequest_(const std::string& zenoh_key,
const std::string& payload,
const v1::UAttributes& attributes) {
spdlog::debug("sendRequest_: {}: {}", zenoh_key, payload);
zenoh::KeyExpr ke(zenoh_key);
auto ke_search = [&](const std::pair<std::string, CallableConn>& pair) {
return zenoh::KeyExpr(pair.first).intersects(ke);
};

CallableConn resp_callback;

if (auto resp_callback_opt = rpc_callback_map_.find_if(ke_search);
resp_callback_opt) {
spdlog::debug("sendRequest_: found callback for '{}'", zenoh_key);
resp_callback = *resp_callback_opt;
} else {
spdlog::error("sendRequest_: failed to find response callback for '{}'",
zenoh_key);
return uError(v1::UCode::UNAVAILABLE,
"failed to find response callback");
}
auto on_reply = [=](const zenoh::Reply& reply) mutable {
spdlog::debug("on_reply for {}", zenoh_key);
if (reply.is_ok()) {
const auto& sample = reply.get_ok();
spdlog::debug("resp_callback: {}",
sample.get_payload().deserialize<std::string>());
resp_callback(sampleToUMessage(sample));
spdlog::debug("resp_callback: done");
} else {
spdlog::error(
"on_reply got en error: {}",
reply.get_err().get_payload().deserialize<std::string>());
// TODO: error report
}
};

auto attachment = uattributesToAttachment(attributes);

auto on_done = []() {};

try {
session_.get(zenoh_key, "", std::move(on_reply), std::move(on_done),
{.target = Z_QUERY_TARGET_BEST_MATCHING,
.consolidation = zenoh::QueryConsolidation(
{.mode = Z_CONSOLIDATION_MODE_NONE}),
.payload = zenoh::Bytes::serialize(payload),
.attachment = zenoh::Bytes::serialize(attachment)});
} catch (const zenoh::ZException& e) {
return uError(v1::UCode::INTERNAL, e.what());
}

return v1::UStatus();
}

v1::UStatus ZenohUTransport::sendResponse_(const std::string& payload,
const v1::UAttributes& attributes) {
auto reqid_str =
datamodel::serializer::uuid::AsString().serialize(attributes.reqid());
spdlog::debug("sendResponse_: {}: {}", reqid_str, payload);
std::shared_ptr<zenoh::Query> query(nullptr);
if (auto query_opt = query_map_.find(reqid_str); query_opt) {
query = *query_opt;
} else {
spdlog::error("sendResponse_: query doesn't exist");
return uError(v1::UCode::INTERNAL, "query doesn't exist");
}

spdlog::debug("sendResponse_ to query: {}",
query->get_keyexpr().as_string_view());
auto attachment = uattributesToAttachment(attributes);
query->reply(query->get_keyexpr(), payload,
{.attachment = zenoh::Bytes::serialize(attachment)});

return v1::UStatus();
}

v1::UStatus ZenohUTransport::sendPublishNotification_(
const std::string& zenoh_key, const std::string& payload,
const v1::UAttributes& attributes) {
Expand Down Expand Up @@ -345,63 +228,20 @@ v1::UStatus ZenohUTransport::sendImpl(const v1::UMessage& message) {
attributes.source(), attributes.sink());
}

switch (attributes.type()) {
case v1::UMessageType::UMESSAGE_TYPE_PUBLISH: {
return sendPublishNotification_(zenoh_key, payload, attributes);
}
case v1::UMessageType::UMESSAGE_TYPE_NOTIFICATION: {
return sendPublishNotification_(zenoh_key, payload, attributes);
}
case v1::UMessageType::UMESSAGE_TYPE_REQUEST: {
return sendRequest_(zenoh_key, payload, attributes);
}
case v1::UMessageType::UMESSAGE_TYPE_RESPONSE: {
return sendResponse_(payload, attributes);
}
default: {
return uError(v1::UCode::INVALID_ARGUMENT,
"Wrong Message type in v1::UAttributes");
}
}
return v1::UStatus();
return sendPublishNotification_(zenoh_key, payload, attributes);
}

v1::UStatus ZenohUTransport::registerListenerImpl(
CallableConn&& listener, const v1::UUri& source_filter,
std::optional<v1::UUri>&& sink_filter) {
std::string zenoh_key = toZenohKeyString(getEntityUri().authority_name(),
source_filter, sink_filter);
if (!sink_filter) {
// When only a single filter is provided, this signals that the
// listener is for a pub/sub-like communication mode where then
// messages are expected to only have a source address.
registerPublishNotificationListener_(zenoh_key, listener);
} else {
// Otherwise, the filters could be for any communication mode.
// We can't use the UUri validators to determine what mode they
// are for because a) there is overlap in allowed values between
// modes and b) any filter is allowed to have wildcards present.
registerRequestListener_(zenoh_key, listener);
registerPublishNotificationListener_(zenoh_key, listener);

if (sink_filter.has_value()) {
// zenoh_key for response listener should be in revert order
std::string zenoh_response_key = toZenohKeyString(
getEntityUri().authority_name(), *sink_filter, source_filter);
registerResponseListener_(zenoh_response_key, listener);
}
}

v1::UStatus status;
status.set_code(v1::UCode::OK);
return status;
return registerPublishNotificationListener_(zenoh_key, listener);
}

void ZenohUTransport::cleanupListener(CallableConn listener) {
if (subscriber_map_.erase(listener) > 0) {
return;
}
queryable_map_.erase(listener);
subscriber_map_.erase(listener);
}

} // namespace uprotocol::transport

0 comments on commit 1d3fc4d

Please sign in to comment.