Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removing all modes other than pub/sub #110

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 0 additions & 19 deletions include/up-transport-zenoh-cpp/ZenohUTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,35 +113,16 @@ struct ZenohUTransport : public UTransport {
static v1::UMessage sampleToUMessage(const zenoh::Sample& sample);
static v1::UMessage queryToUMessage(const zenoh::Query& query);

v1::UStatus registerRequestListener_(const std::string& zenoh_key,
CallableConn listener);

v1::UStatus registerResponseListener_(const std::string& zenoh_key,
CallableConn listener);

v1::UStatus registerPublishNotificationListener_(
const std::string& zenoh_key, CallableConn listener);

v1::UStatus sendRequest_(const std::string& zenoh_key,
const std::string& payload,
const v1::UAttributes& attributes);

v1::UStatus sendResponse_(const std::string& payload,
const v1::UAttributes& attributes);

v1::UStatus sendPublishNotification_(const std::string& zenoh_key,
const std::string& payload,
const v1::UAttributes& attributes);

zenoh::Session session_;

ThreadSafeMap<UuriKey, CallableConn> rpc_callback_map_;

ThreadSafeMap<CallableConn, zenoh::Subscriber<void>> subscriber_map_;

ThreadSafeMap<CallableConn, zenoh::Queryable<void>> queryable_map_;

ThreadSafeMap<std::string, std::shared_ptr<zenoh::Query>> query_map_;
};

} // namespace uprotocol::transport
Expand Down
189 changes: 3 additions & 186 deletions src/ZenohUTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,46 +185,6 @@ ZenohUTransport::ZenohUTransport(const v1::UUri& defaultUri,
spdlog::info("ZenohUTransport init");
}

v1::UStatus ZenohUTransport::registerRequestListener_(
const std::string& zenoh_key, CallableConn listener) {
spdlog::info("registerRequestListener_: {}", zenoh_key);

// NOTE: listener is captured by copy here so that it does not go out
// of scope when this function returns.
auto on_query = [this, listener](const zenoh::Query& query) mutable {
auto attributes = attachmentToUAttributes(query.get_attachment());
auto id_str =
datamodel::serializer::uuid::AsString().serialize(attributes.id());

// TODO(sashacmc): Replace this workaround with `query.clone()`
// after zenohcpp 1.0.0-rc6 release
auto cloned_query = std::make_shared<zenoh::Query>(nullptr);
z_query_clone(zenoh::detail::as_owned_c_ptr(*cloned_query),
zenoh::detail::loan(query));

query_map_.emplace(std::move(id_str), std::move(cloned_query));
listener(queryToUMessage(query));
};

auto on_drop = []() {};

auto queryable = session_.declare_queryable(zenoh_key, std::move(on_query),
std::move(on_drop));

queryable_map_.emplace(listener, std::move(queryable));

return v1::UStatus();
}

v1::UStatus ZenohUTransport::registerResponseListener_(
const std::string& zenoh_key, CallableConn listener) {
spdlog::info("registerResponseListener_: {}", zenoh_key);

rpc_callback_map_.emplace(zenoh_key, listener);

return v1::UStatus();
}

v1::UStatus ZenohUTransport::registerPublishNotificationListener_(
const std::string& zenoh_key, CallableConn listener) {
spdlog::info("registerPublishNotificationListener_: {}", zenoh_key);
Expand All @@ -243,93 +203,6 @@ v1::UStatus ZenohUTransport::registerPublishNotificationListener_(
return v1::UStatus();
}

v1::UStatus ZenohUTransport::sendRequest_(const std::string& zenoh_key,
const std::string& payload,
const v1::UAttributes& attributes) {
spdlog::debug("sendRequest_: {}: {}", zenoh_key, payload);
zenoh::KeyExpr ke(zenoh_key);
auto ke_search = [&](const std::pair<std::string, CallableConn>& pair) {
return zenoh::KeyExpr(pair.first).intersects(ke);
};

CallableConn resp_callback;

if (auto resp_callback_opt = rpc_callback_map_.find_if(ke_search);
resp_callback_opt) {
spdlog::debug("sendRequest_: found callback for '{}'", zenoh_key);
resp_callback = *resp_callback_opt;
} else {
spdlog::error("sendRequest_: failed to find response callback for '{}'",
zenoh_key);
return uError(v1::UCode::UNAVAILABLE,
"failed to find response callback");
}
auto on_reply = [=](const zenoh::Reply& reply) mutable {
spdlog::debug("on_reply for {}", zenoh_key);
if (reply.is_ok()) {
const auto& sample = reply.get_ok();
spdlog::debug("resp_callback: {}",
sample.get_payload().deserialize<std::string>());
resp_callback(sampleToUMessage(sample));
spdlog::debug("resp_callback: done");
} else {
spdlog::error(
"on_reply got en error: {}",
reply.get_err().get_payload().deserialize<std::string>());
// TODO: error report
}
};

auto attachment = uattributesToAttachment(attributes);

auto on_done = []() {};

try {
// -Wpedantic disallows named member initialization until C++20,
// so GetOptions needs to be explicitly created and passed with
// std::move()
zenoh::Session::GetOptions options;
options.target = Z_QUERY_TARGET_BEST_MATCHING;
options.consolidation =
zenoh::QueryConsolidation(Z_CONSOLIDATION_MODE_NONE);
options.payload = zenoh::Bytes::serialize(payload);
options.attachment = zenoh::Bytes::serialize(attachment);
session_.get(zenoh_key, "", std::move(on_reply), std::move(on_done),
std::move(options));
} catch (const zenoh::ZException& e) {
return uError(v1::UCode::INTERNAL, e.what());
}

return v1::UStatus();
}

v1::UStatus ZenohUTransport::sendResponse_(const std::string& payload,
const v1::UAttributes& attributes) {
auto reqid_str =
datamodel::serializer::uuid::AsString().serialize(attributes.reqid());
spdlog::debug("sendResponse_: {}: {}", reqid_str, payload);
std::shared_ptr<zenoh::Query> query(nullptr);
if (auto query_opt = query_map_.find(reqid_str); query_opt) {
query = *query_opt;
} else {
spdlog::error("sendResponse_: query doesn't exist");
return uError(v1::UCode::INTERNAL, "query doesn't exist");
}

spdlog::debug("sendResponse_ to query: {}",
query->get_keyexpr().as_string_view());
auto attachment = uattributesToAttachment(attributes);
// -Wpedantic disallows named member initialization until C++20,
// so PutOptions needs to be explicitly created and passed with
// std::move()
zenoh::Query::ReplyOptions options =
zenoh::Query::ReplyOptions::create_default();
options.attachment = zenoh::Bytes::serialize(attachment);
query->reply(query->get_keyexpr(), payload, std::move(options));

return v1::UStatus();
}

v1::UStatus ZenohUTransport::sendPublishNotification_(
const std::string& zenoh_key, const std::string& payload,
const v1::UAttributes& attributes) {
Expand Down Expand Up @@ -371,76 +244,20 @@ v1::UStatus ZenohUTransport::sendImpl(const v1::UMessage& message) {
attributes.source(), attributes.sink());
}

switch (attributes.type()) {
case v1::UMessageType::UMESSAGE_TYPE_PUBLISH: {
return sendPublishNotification_(zenoh_key, payload, attributes);
}
case v1::UMessageType::UMESSAGE_TYPE_NOTIFICATION: {
return sendPublishNotification_(zenoh_key, payload, attributes);
}
case v1::UMessageType::UMESSAGE_TYPE_REQUEST: {
return sendRequest_(zenoh_key, payload, attributes);
}
case v1::UMessageType::UMESSAGE_TYPE_RESPONSE: {
return sendResponse_(payload, attributes);
}
// These sentinel values come from the protobuf compiler.
// They are illegal for the enum, but cause linting problems.
// In order to suppress the linting error, they need to
// be included in the switch-case statement.
// It is deemed acceptable to use an exception here because
// it is in the sending code. An exception would not be
// acceptable in receiving code. The correct strategy wopuld be
// to drop the message.
case v1::UMessageType::UMessageType_INT_MIN_SENTINEL_DO_NOT_USE_:
case v1::UMessageType::UMessageType_INT_MAX_SENTINEL_DO_NOT_USE_:
throw std::runtime_error(
"Sentinel values detected in attribute type switch-case");
case v1::UMessageType::UMESSAGE_TYPE_UNSPECIFIED:
default: {
return uError(v1::UCode::INVALID_ARGUMENT,
"Wrong Message type in v1::UAttributes");
}
}
return v1::UStatus();
return sendPublishNotification_(zenoh_key, payload, attributes);
}

v1::UStatus ZenohUTransport::registerListenerImpl(
CallableConn&& listener, const v1::UUri& source_filter,
std::optional<v1::UUri>&& sink_filter) {
std::string zenoh_key = toZenohKeyString(getEntityUri().authority_name(),
source_filter, sink_filter);
if (!sink_filter) {
// When only a single filter is provided, this signals that the
// listener is for a pub/sub-like communication mode where then
// messages are expected to only have a source address.
registerPublishNotificationListener_(zenoh_key, listener);
} else {
// Otherwise, the filters could be for any communication mode.
// We can't use the UUri validators to determine what mode they
// are for because a) there is overlap in allowed values between
// modes and b) any filter is allowed to have wildcards present.
registerRequestListener_(zenoh_key, listener);
registerPublishNotificationListener_(zenoh_key, listener);

if (sink_filter.has_value()) {
// zenoh_key for response listener should be in revert order
std::string zenoh_response_key = toZenohKeyString(
getEntityUri().authority_name(), *sink_filter, source_filter);
registerResponseListener_(zenoh_response_key, listener);
}
}

v1::UStatus status;
status.set_code(v1::UCode::OK);
return status;
return registerPublishNotificationListener_(zenoh_key, listener);
}

void ZenohUTransport::cleanupListener(CallableConn listener) {
if (subscriber_map_.erase(listener) > 0) {
return;
}
queryable_map_.erase(listener);
subscriber_map_.erase(listener);
}

} // namespace uprotocol::transport