diff --git a/include/up-transport-zenoh-cpp/ZenohUTransport.h b/include/up-transport-zenoh-cpp/ZenohUTransport.h index 2cc7dbb..38074a4 100644 --- a/include/up-transport-zenoh-cpp/ZenohUTransport.h +++ b/include/up-transport-zenoh-cpp/ZenohUTransport.h @@ -113,35 +113,16 @@ struct ZenohUTransport : public UTransport { static v1::UMessage sampleToUMessage(const zenoh::Sample& sample); static v1::UMessage queryToUMessage(const zenoh::Query& query); - v1::UStatus registerRequestListener_(const std::string& zenoh_key, - CallableConn listener); - - v1::UStatus registerResponseListener_(const std::string& zenoh_key, - CallableConn listener); - v1::UStatus registerPublishNotificationListener_( const std::string& zenoh_key, CallableConn listener); - v1::UStatus sendRequest_(const std::string& zenoh_key, - const std::string& payload, - const v1::UAttributes& attributes); - - v1::UStatus sendResponse_(const std::string& payload, - const v1::UAttributes& attributes); - v1::UStatus sendPublishNotification_(const std::string& zenoh_key, const std::string& payload, const v1::UAttributes& attributes); zenoh::Session session_; - ThreadSafeMap rpc_callback_map_; - ThreadSafeMap> subscriber_map_; - - ThreadSafeMap> queryable_map_; - - ThreadSafeMap> query_map_; }; } // namespace uprotocol::transport diff --git a/src/ZenohUTransport.cpp b/src/ZenohUTransport.cpp index 38890b7..d733975 100644 --- a/src/ZenohUTransport.cpp +++ b/src/ZenohUTransport.cpp @@ -173,46 +173,6 @@ ZenohUTransport::ZenohUTransport(const v1::UUri& defaultUri, spdlog::info("ZenohUTransport init"); } -v1::UStatus ZenohUTransport::registerRequestListener_( - const std::string& zenoh_key, CallableConn listener) { - spdlog::info("registerRequestListener_: {}", zenoh_key); - - // NOTE: listener is captured by copy here so that it does not go out - // of scope when this function returns. - auto on_query = [this, listener](const zenoh::Query& query) mutable { - auto attributes = attachmentToUAttributes(query.get_attachment()); - auto id_str = - datamodel::serializer::uuid::AsString().serialize(attributes.id()); - - // TODO(sashacmc): Replace this workaround with `query.clone()` - // after zenohcpp 1.0.0-rc6 release - auto cloned_query = std::make_shared(nullptr); - z_query_clone(zenoh::detail::as_owned_c_ptr(*cloned_query), - zenoh::detail::loan(query)); - - query_map_.emplace(std::move(id_str), std::move(cloned_query)); - listener(queryToUMessage(query)); - }; - - auto on_drop = []() {}; - - auto queryable = session_.declare_queryable(zenoh_key, std::move(on_query), - std::move(on_drop)); - - queryable_map_.emplace(listener, std::move(queryable)); - - return v1::UStatus(); -} - -v1::UStatus ZenohUTransport::registerResponseListener_( - const std::string& zenoh_key, CallableConn listener) { - spdlog::info("registerResponseListener_: {}", zenoh_key); - - rpc_callback_map_.emplace(zenoh_key, listener); - - return v1::UStatus(); -} - v1::UStatus ZenohUTransport::registerPublishNotificationListener_( const std::string& zenoh_key, CallableConn listener) { spdlog::info("registerPublishNotificationListener_: {}", zenoh_key); @@ -231,81 +191,6 @@ v1::UStatus ZenohUTransport::registerPublishNotificationListener_( return v1::UStatus(); } -v1::UStatus ZenohUTransport::sendRequest_(const std::string& zenoh_key, - const std::string& payload, - const v1::UAttributes& attributes) { - spdlog::debug("sendRequest_: {}: {}", zenoh_key, payload); - zenoh::KeyExpr ke(zenoh_key); - auto ke_search = [&](const std::pair& pair) { - return zenoh::KeyExpr(pair.first).intersects(ke); - }; - - CallableConn resp_callback; - - if (auto resp_callback_opt = rpc_callback_map_.find_if(ke_search); - resp_callback_opt) { - spdlog::debug("sendRequest_: found callback for '{}'", zenoh_key); - resp_callback = *resp_callback_opt; - } else { - spdlog::error("sendRequest_: failed to find response callback for '{}'", - zenoh_key); - return uError(v1::UCode::UNAVAILABLE, - "failed to find response callback"); - } - auto on_reply = [=](const zenoh::Reply& reply) mutable { - spdlog::debug("on_reply for {}", zenoh_key); - if (reply.is_ok()) { - const auto& sample = reply.get_ok(); - spdlog::debug("resp_callback: {}", - sample.get_payload().deserialize()); - resp_callback(sampleToUMessage(sample)); - spdlog::debug("resp_callback: done"); - } else { - spdlog::error( - "on_reply got en error: {}", - reply.get_err().get_payload().deserialize()); - // TODO: error report - } - }; - - auto attachment = uattributesToAttachment(attributes); - - auto on_done = []() {}; - - try { - session_.get(zenoh_key, "", std::move(on_reply), std::move(on_done), - {.target = Z_QUERY_TARGET_BEST_MATCHING, - .payload = zenoh::Bytes::serialize(payload), - .attachment = zenoh::Bytes::serialize(attachment)}); - } catch (const zenoh::ZException& e) { - return uError(v1::UCode::INTERNAL, e.what()); - } - - return v1::UStatus(); -} - -v1::UStatus ZenohUTransport::sendResponse_(const std::string& payload, - const v1::UAttributes& attributes) { - auto reqid_str = - datamodel::serializer::uuid::AsString().serialize(attributes.reqid()); - spdlog::debug("sendResponse_: {}: {}", reqid_str, payload); - std::shared_ptr query(nullptr); - if (auto query_opt = query_map_.find(reqid_str); query_opt) { - query = *query_opt; - } else { - spdlog::error("sendResponse_: query doesn't exist"); - return uError(v1::UCode::INTERNAL, "query doesn't exist"); - } - - spdlog::debug("sendResponse_ to query: {}", - query->get_keyexpr().as_string_view()); - auto attachment = uattributesToAttachment(attributes); - query->reply(query->get_keyexpr(), payload, - {.attachment = zenoh::Bytes::serialize(attachment)}); - - return v1::UStatus(); -} - v1::UStatus ZenohUTransport::sendPublishNotification_( const std::string& zenoh_key, const std::string& payload, const v1::UAttributes& attributes) { @@ -343,25 +228,7 @@ v1::UStatus ZenohUTransport::sendImpl(const v1::UMessage& message) { attributes.source(), attributes.sink()); } - switch (attributes.type()) { - case v1::UMessageType::UMESSAGE_TYPE_PUBLISH: { - return sendPublishNotification_(zenoh_key, payload, attributes); - } - case v1::UMessageType::UMESSAGE_TYPE_NOTIFICATION: { - return sendPublishNotification_(zenoh_key, payload, attributes); - } - case v1::UMessageType::UMESSAGE_TYPE_REQUEST: { - return sendRequest_(zenoh_key, payload, attributes); - } - case v1::UMessageType::UMESSAGE_TYPE_RESPONSE: { - return sendResponse_(payload, attributes); - } - default: { - return uError(v1::UCode::INVALID_ARGUMENT, - "Wrong Message type in v1::UAttributes"); - } - } - return v1::UStatus(); + return sendPublishNotification_(zenoh_key, payload, attributes); } v1::UStatus ZenohUTransport::registerListenerImpl( @@ -369,37 +236,12 @@ v1::UStatus ZenohUTransport::registerListenerImpl( std::optional&& sink_filter) { std::string zenoh_key = toZenohKeyString(getEntityUri().authority_name(), source_filter, sink_filter); - if (!sink_filter) { - // When only a single filter is provided, this signals that the - // listener is for a pub/sub-like communication mode where then - // messages are expected to only have a source address. - registerPublishNotificationListener_(zenoh_key, listener); - } else { - // Otherwise, the filters could be for any communication mode. - // We can't use the UUri validators to determine what mode they - // are for because a) there is overlap in allowed values between - // modes and b) any filter is allowed to have wildcards present. - registerRequestListener_(zenoh_key, listener); - registerPublishNotificationListener_(zenoh_key, listener); - - if (sink_filter.has_value()) { - // zenoh_key for response listener should be in revert order - std::string zenoh_response_key = toZenohKeyString( - getEntityUri().authority_name(), *sink_filter, source_filter); - registerResponseListener_(zenoh_response_key, listener); - } - } - v1::UStatus status; - status.set_code(v1::UCode::OK); - return status; + return registerPublishNotificationListener_(zenoh_key, listener); } void ZenohUTransport::cleanupListener(CallableConn listener) { - if (subscriber_map_.erase(listener) > 0) { - return; - } - queryable_map_.erase(listener); + subscriber_map_.erase(listener); } } // namespace uprotocol::transport