-
Notifications
You must be signed in to change notification settings - Fork 592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor dispatch pipeline in C++ #1923
Changes from 1 commit
1cd1e0e
f66a15e
6d7c7af
281d5d4
2af6e74
a296d14
64423eb
6d9abc5
464269a
fdfa8e1
c4618eb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
// | ||
// Copyright (c) ZeroC, Inc. All rights reserved. | ||
// | ||
|
||
#ifndef ICE_ASYNC_RESPONSE_HANDLER_H | ||
#define ICE_ASYNC_RESPONSE_HANDLER_H | ||
|
||
#include "Current.h" | ||
#include "OutgoingResponse.h" | ||
#include "LocalException.h" | ||
|
||
#include <atomic> | ||
|
||
namespace IceInternal | ||
{ | ||
// This class helps with the implementation of the AMD response and exception callbacks. It allows the dispatch | ||
// thread and these two callbacks to share the same sendResponse and Current objects, and ensures sendResponse is | ||
// called exactly once. | ||
class AsyncResponseHandler final | ||
{ | ||
public: | ||
// This class typically holds a _copy_ of the incoming request current object. | ||
AsyncResponseHandler(std::function<void(Ice::OutgoingResponse)> sendResponse, Ice::Current current) | ||
: _sendResponse(std::move(sendResponse)), | ||
_current(std::move(current)) | ||
{ | ||
} | ||
|
||
void sendEmptyResponse() noexcept | ||
{ | ||
if (!_responseSent.test_and_set()) | ||
{ | ||
_sendResponse(makeEmptyOutgoingResponse(_current)); | ||
} | ||
// else we ignore this call. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we should we debug assert these to make sure this never happens since it's otherwise silent. |
||
} | ||
|
||
void sendResponse(Ice::MarshaledResult marshaledResult) noexcept | ||
{ | ||
if (!_responseSent.test_and_set()) | ||
{ | ||
_sendResponse(Ice::OutgoingResponse{marshaledResult.outputStream(), _current}); | ||
} | ||
// else we ignore this call. | ||
} | ||
|
||
void sendResponse(bool ok, const std::pair<const uint8_t*, const uint8_t*>& encaps) noexcept | ||
{ | ||
if (!_responseSent.test_and_set()) | ||
{ | ||
_sendResponse(makeOutgoingResponse(ok, encaps, _current)); | ||
} | ||
// else we ignore this call. | ||
} | ||
|
||
void sendResponse( | ||
std::function<void(Ice::OutputStream*)> marshal, | ||
Ice::FormatType format = Ice::FormatType::DefaultFormat) noexcept | ||
{ | ||
// It is critical to only call the _sendResponse function only once. Calling it multiple times results in an | ||
// incorrect dispatch count. | ||
if (!_responseSent.test_and_set()) | ||
{ | ||
_sendResponse(makeOutgoingResponse(std::move(marshal), _current, format)); | ||
} | ||
// else we ignore this call. | ||
} | ||
|
||
void sendException(std::exception_ptr ex) noexcept | ||
{ | ||
if (!_responseSent.test_and_set()) | ||
{ | ||
_sendResponse(makeOutgoingResponse(ex, _current)); | ||
} | ||
// else we ignore this call. | ||
} | ||
|
||
const Ice::Current& current() const noexcept { return _current; } | ||
|
||
private: | ||
const std::function<void(Ice::OutgoingResponse)> _sendResponse; | ||
const Ice::Current _current; | ||
std::atomic_flag _responseSent = ATOMIC_FLAG_INIT; | ||
}; | ||
} | ||
|
||
#endif |
This file was deleted.
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,71 @@ | ||||||
// | ||||||
// Copyright (c) ZeroC, Inc. All rights reserved. | ||||||
// | ||||||
|
||||||
#ifndef ICE_INCOMING_REQUEST_H | ||||||
#define ICE_INCOMING_REQUEST_H | ||||||
|
||||||
#include "ConnectionF.h" | ||||||
#include "Current.h" | ||||||
#include "ObjectAdapterF.h" | ||||||
|
||||||
namespace Ice | ||||||
{ | ||||||
class InputStream; | ||||||
|
||||||
/** | ||||||
* Represent a request received by a connection. It's the argument to the dispatch function on Object. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We actually use this style (no s) for every other doc-comment, and I'd rather not update them all. |
||||||
* @remarks IncomingRequest is neither copyable nor movable. It can be used only on the dispatch thread. | ||||||
* @see Object::dispatch | ||||||
* \headerfile Ice/Ice.h | ||||||
*/ | ||||||
class ICE_API IncomingRequest final | ||||||
{ | ||||||
public: | ||||||
/** | ||||||
* Construct an IncomingRequest object. | ||||||
* @param requestId The request ID. It's 0 for oneway requests. | ||||||
* @param connection The connection that received the request. It's null for collocated invocations. | ||||||
* @param adapter The object adapter to set in Current. | ||||||
* @param inputStream The input stream buffer over the incoming Ice protocol request message. The stream is | ||||||
* positioned at the beginning of the request header - the next data to read is the identity of the target. | ||||||
*/ | ||||||
IncomingRequest( | ||||||
bernardnormier marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
int32_t requestId, | ||||||
ConnectionPtr connection, | ||||||
ObjectAdapterPtr adapter, | ||||||
InputStream& inputStream); | ||||||
|
||||||
IncomingRequest(const IncomingRequest&) = delete; | ||||||
IncomingRequest(IncomingRequest&&) noexcept = delete; | ||||||
IncomingRequest& operator=(const IncomingRequest&) = delete; | ||||||
IncomingRequest& operator=(IncomingRequest&&) = delete; | ||||||
|
||||||
/** | ||||||
* Return the current object of the request. | ||||||
*/ | ||||||
Current& current() noexcept { return _current; } | ||||||
bernardnormier marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
/** | ||||||
* Return the current object of the request. | ||||||
*/ | ||||||
const Ice::Current& current() const noexcept { return _current; } | ||||||
|
||||||
/** | ||||||
* Return the input stream buffer of the request. | ||||||
*/ | ||||||
InputStream& inputStream() noexcept { return _inputStream; } | ||||||
|
||||||
/** | ||||||
* Return the number of bytes in the request. These are all the bytes starting with the identity of the target. | ||||||
*/ | ||||||
std::int32_t size() const { return _requestSize; } | ||||||
|
||||||
private: | ||||||
InputStream& _inputStream; | ||||||
Current _current; | ||||||
std::int32_t _requestSize; | ||||||
}; | ||||||
} | ||||||
|
||||||
#endif |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3569,45 +3569,6 @@ namespace Ice | |
*/ | ||
ICE_MEMBER(ICE_API) virtual void ice_print(::std::ostream& stream) const override; | ||
}; | ||
|
||
/** | ||
* Indicates the application attempted to call the callbacks for an async dispatch more than once, or called an | ||
* async dispatch callback after throwing an exception from the dispatch thread. \headerfile Ice/Ice.h | ||
*/ | ||
class ICE_CLASS(ICE_API) ResponseSentException : public LocalExceptionHelper<ResponseSentException, LocalException> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed this exception. If you attempt to call twice an AMD callback, the second call will be no-op. I want the AMD callbacks to be logically noexcept, just like sendResponse. |
||
{ | ||
public: | ||
ICE_MEMBER(ICE_API) virtual ~ResponseSentException(); | ||
|
||
ResponseSentException(const ResponseSentException&) = default; | ||
|
||
/** | ||
* The file and line number are required for all local exceptions. | ||
* @param file The file name in which the exception was raised, typically __FILE__. | ||
* @param line The line number at which the exception was raised, typically __LINE__. | ||
*/ | ||
ResponseSentException(const char* file, int line) | ||
: LocalExceptionHelper<ResponseSentException, LocalException>(file, line) | ||
{ | ||
} | ||
|
||
/** | ||
* Obtains a tuple containing all of the exception's data members. | ||
* @return The data members in a tuple. | ||
*/ | ||
std::tuple<> ice_tuple() const { return std::tie(); } | ||
|
||
/** | ||
* Obtains the Slice type ID of this exception. | ||
* @return The fully-scoped type ID. | ||
*/ | ||
ICE_MEMBER(ICE_API) static ::std::string_view ice_staticId(); | ||
/** | ||
* Prints this exception to the given stream. | ||
* @param stream The target stream. | ||
*/ | ||
ICE_MEMBER(ICE_API) virtual void ice_print(::std::ostream& stream) const override; | ||
}; | ||
} | ||
|
||
/// \cond STREAM | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I probably miss something but I don't clearly understand why it's useful. Is it only about ensuring that we don't send twice the response? Could we have a internal flag on
OutgoingResponse
for this purpose instead?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How would this work?
OutgoingResponse is a value that each thread can construct independently. Your response callback and exception callback could each construct an OutgoingResponse and pass it to sendResponse.