-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/1.5.8/zenoh utransport impl #65
Feature/1.5.8/zenoh utransport impl #65
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stopping part way into this review. In addition to the comments recorded, the known list of TODO items includes:
- Rework invoke_nonblock_callback call
- Finish all TODO in PR
- Implement cleanupListener
- Finish and add tests for attachmentToUAttributes/uattributesToAttachment
- Implement this for C++ https://github.com/eclipse-uprotocol/up-spec/pull/188/files
- Add all necessary tests
- Switch to zenoh 1.0
#define ZENOHCXX_ZENOHC | ||
#include <zenoh.hxx> | ||
|
||
namespace zenohc { | ||
|
||
class OwnedQuery { | ||
public: | ||
OwnedQuery(const z_query_t& query) : _query(z_query_clone(&query)) {} | ||
|
||
OwnedQuery(const OwnedQuery&) = delete; | ||
OwnedQuery& operator=(const OwnedQuery&) = delete; | ||
|
||
~OwnedQuery() { z_drop(&_query); } | ||
|
||
Query loan() const { return z_loan(_query); } | ||
bool check() const { return z_check(_query); } | ||
|
||
private: | ||
z_owned_query_t _query; | ||
}; | ||
|
||
using OwnedQueryPtr = std::shared_ptr<OwnedQuery>; | ||
|
||
} // namespace zenohc |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be relocated into the .cpp file. It's a detail specific to this implementation and does not need to be exposed externally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is temporary workaround and it should be removed after switching to zenohcpp 1.0
: listener(listener), zenoh_key(zenoh_key) {} | ||
|
||
bool operator==(const ListenerKey& other) const { | ||
return listener == other.listener && zenoh_key == other.zenoh_key; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return listener == other.listener && zenoh_key == other.zenoh_key; | |
return (listener == other.listener) && (zenoh_key == other.zenoh_key); |
This &&
might need to be changed or an additional equality operator for CallableConn
only might need to be added, assuming the intent is for this key to be used as part of the callback cleanup process.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or maybe that's not the intent here - the operator<
sorts on both
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class needs only if the cleanupListener will receive sink/source filters like in rust, but currently it receives only listener, and I still haven't received an answer why this is so.
using RpcCallbackMap = std::map<UuriKey, CallableConn>; | ||
using SubscriberMap = std::map<ListenerKey, zenoh::Subscriber>; | ||
using QueryableMap = std::map<ListenerKey, zenoh::Queryable>; | ||
using QueryMap = std::map<std::string, zenoh::OwnedQueryPtr>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some clarification on what these are would be helpful.
The types are also only used once and not particularly long, so the using
statements could probably be removed.
static std::string toZenohKeyString( | ||
const std::string& default_authority_name, const v1::UUri& source, | ||
const std::optional<v1::UUri>& sink); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be documented.
static v1::UStatus uError(v1::UCode code, std::string_view message); | ||
|
||
static std::vector<std::pair<std::string, std::string>> | ||
uattributesToAttachment(const v1::UAttributes& attributes); | ||
|
||
static v1::UAttributes attachmentToUAttributes( | ||
const zenoh::AttachmentView& attachment); | ||
|
||
static zenoh::Priority mapZenohPriority(v1::UPriority upriority); | ||
|
||
static v1::UMessage sampleToUMessage(const zenoh::Sample& sample); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could probably be in the anonymous namespace of the .cpp file.
// TODO: more efficient way? | ||
res.ParseFromString(std::string(attachment_vec[1].as_string_view())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Improvements should be tracked as issues.
|
||
UMessage ZenohUTransport::sampleToUMessage(const Sample& sample) { | ||
UAttributes attributes; | ||
if (sample.get_attachment().check()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is check()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the check fails, the UMessage
would have blank attributes. That doesn't seem to be valid. The return type should probably be Expected<UMessage, ...>
so errors can be returned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check() means that the attachment exists in the sample
src/ZenohUTransport.cpp
Outdated
message.set_payload(payload); | ||
message.set_allocated_attributes(&attributes); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
set_allocated_attributes
is not safe. Use assignment on the dereferenced mutable attributes:
message.set_payload(payload); | |
message.set_allocated_attributes(&attributes); | |
message.set_payload(std::move(payload)); | |
(*message.mutable_attributes()) = std::move(attributes); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it was already fixed in my PR
session_(expect<Session>(open( | ||
std::move(expect(config_from_file(configFile.string().c_str())))))) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would be easier to read as a function in the anonymous namespace.
session_(expect<Session>(open( | |
std::move(expect(config_from_file(configFile.string().c_str())))))) {} | |
session_(openSession(configFile)) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is the error condition for the expect<Session>
checked? That error should probably be an exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"expect" raises the exception
Most of the basic functionality is present, but there are still several TODO items remaining: * Rework invoke_nonblock_callback call * Finish all TODO comments in code files * Implement cleanupListener * Add and finish tests * Fix anything that fails tests * Switch to Zenoh 1.0 when available
c53133f
to
3975338
Compare
* Remove use to gtest outside of test tree. * Address error message from zenohcpp library. * Switch to our temporary zenohcpp conan recipe.
3975338
to
cefbca4
Compare
This is an early draft of the zenoh utransport implementation. It is known to contain bugs and not be fully functional. We plan on merging this once eclipse-uprotocol/up-cpp#240 has been merged. All existing feedback will be captured as bugs to be addressed after the merge. This will allow multiple contributors to proceed in parallel using this code as a foundation. |
attributes = attachmentToUAttributes(query.get_attachment()); | ||
} | ||
auto id_str = serializer::uuid::AsString().serialize(attributes.id()); | ||
std::unique_lock<std::mutex> lock(query_map_mutex_); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't it be better to take the data structs that need thread safety, and turn them into classes holding the map or whatever and the mutex, and having accesor methods to do the locking? This style would make the main code much more readable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense, but before we should solve the question about cleanupListener signature, because if it will be as is (but not as rust one) we should rework these internals types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sashacmc - I'll summarize what we discussed on cleanupListener
over slack here so its available for everyone.
cleanupListener()
is not a direct equivalent to unregisterListener()
in other language libraries. Since we are using connection objects to represent listener callbacks, by the time cleanupListener()
is called the connection is already broken. Calling a disconnected handle from the transport side will do nothing.
The purpose of cleanupListener()
is purely informational - it exists to inform the transport implementation that a connection to a listener / callback function has been broken and that, depending on the details of the transport's implementation, it may need to schedule a cleanup operation. Since the connection to the callback function is already broken, this cleanup does not need to occur immediately. It can be deferred if that makes the cleanup easier.
Additionally, while the listener handle parameter passed to cleanupListener()
can be compared against other listener handles to find matching handles, that is not strictly necessary. The CallerHandle
class used there has both an isConnected()
method and a boolean conversion operator that will return false if the handle is no longer connected. A transport implementation could just remove all unconnected handles after cleanupListener()
is called.
if (auto resp_callback_it = rpc_callback_map_.find(source_str); | ||
resp_callback_it == rpc_callback_map_.end()) { | ||
return uError(UCode::UNAVAILABLE, "failed to find UUID"); | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In relation to my prior comment about making a collection of thread safe data struct wrappers, this code shows the error object being created inside a locked region when it really shouldn't be. It seems better to me to have a thread-safe RpcCallbackMap class with an accessor like "[[nodiscard]] bool find(CallableConn& thing_to_capture)".
std::string data; | ||
attributes.SerializeToString(&data); | ||
|
||
res.push_back(std::make_pair("", version)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be emplace_back instead of push_back?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree
Updating up-cpp version, rewriting pub/sub test to use L2 APIs, updating the transport to use the revised up-cpp interface from up-cpp#240, and fixing some general bugs. Also adds a zenoh config json for use with the tests.
057e129
to
dab0902
Compare
2cc0f8b
to
78de5e5
Compare
The pre-release up-cpp has critical bugfixes required for this code to build. Also makes sure the test configs are captured in the build artifacts so tests can run in CI.
We know that the current state of this implementaiton does not fully work. Making this change is a compromise - it keeps the tests active and running so we can see the output as we work on resolving issues, but does not prevent us from merging early, incomplete code. Once this implementation is more stable, we will re-enable this check
78de5e5
to
db76fd6
Compare
Fix unittest and code cleanup: * fix some notes from #65 * implement cleanupListener * change listener storing key * fix lambda transmission to zenoh
With patches applied to #54