-
Notifications
You must be signed in to change notification settings - Fork 592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor dispatch pipeline in C++ #1923
Refactor dispatch pipeline in C++ #1923
Conversation
…uest, sendResponse) on Object.
} | ||
|
||
OutputStream* | ||
Incoming::startWriteParams() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the "write response" code was moved or rewritten in OutgoingResponse.cpp.
assert(_responseHandler); | ||
if (_isTwoWay) | ||
{ | ||
_observer.reply(static_cast<int32_t>(_os.b.size() - headerSize - 4)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the observer code is now in the ObserverMiddleware.
if (_isTwoWay) | ||
{ | ||
_observer.reply(static_cast<int32_t>(_os.b.size() - headerSize - 4)); | ||
_responseHandler->sendResponse(_current.requestId, &_os, _compress); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The sendResponse/sendNoResponse logic is now in the new sendResponse(OutgoingResponse) function implemented by ConnectionI and CollocatedRequestHandler.
} | ||
|
||
void | ||
Incoming::warning(const Exception& ex) const |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code moved to the LoggerMiddleware.
} | ||
|
||
bool | ||
Incoming::servantLocatorFinished() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The servant locator handling moved to ServantManager::dispatch.
} | ||
|
||
void | ||
Incoming::handleException(std::exception_ptr exc) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This largely moved to OutgoingResponse.cpp - except for the logging code (=> LoggerMiddleware) and the observer code (=> ObserverMiddleware).
} | ||
|
||
void | ||
Incoming::invoke(const ServantManagerPtr& servantManager, InputStream* stream) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This dispatch logic moved to the IncomingRequest constructor and ServantManager::dispatch.
@@ -268,6 +258,11 @@ namespace Ice | |||
void initiateShutdown(); | |||
void sendHeartbeatNow(); | |||
|
|||
void sendResponse(OutgoingResponse, std::uint8_t compress); | |||
void sendResponse(std::int32_t, Ice::OutputStream*, std::uint8_t); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These 3 functions used to be the ResponseHandler. They are called by the first sendResponse; we could refactor further and merge sendResponse + sendNoResponse into the main sendResponse.
@@ -320,7 +308,6 @@ namespace Ice | |||
mutable Ice::ConnectionInfoPtr _info; | |||
|
|||
ObjectAdapterPtr _adapter; | |||
IceInternal::ServantManagerPtr _servantManager; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not clear to me why we stored and passed around the servant manager. _servantManager is an immutable field of ObjectAdapter, so we could just ask ObjectAdapter whenever we needed it. That's what this PR does for dispatcher.
* Indicates the application attempted to call the callbacks for an async dispatch more than once, or called an | ||
* async dispatch callback after throwing an exception from the dispatch thread. \headerfile Ice/Ice.h | ||
*/ | ||
class ICE_CLASS(ICE_API) ResponseSentException : public LocalExceptionHelper<ResponseSentException, LocalException> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed this exception. If you attempt to call twice an AMD callback, the second call will be no-op. I want the AMD callbacks to be logically noexcept, just like sendResponse.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, just a few comments.
{ | ||
invokeException(requestId, current_exception(), invokeNum); // Fatal invocation exception | ||
invokeException(requestId, current_exception()); // Fatal invocation exception | ||
} | ||
|
||
_adapter->decDirectCount(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We call decDirectCount in invokeException
implementation and again here, which doesn't seem correct. We can end up with a negative count.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't change this code.
I believe it's correct, even if the style is not the best.
This _adapter->decDirectCount() "releases" the count that we "acquired" before the call to invokeAll. It's unrelated to the count acquired in the while loop.
@@ -137,7 +137,8 @@ namespace Ice | |||
IceInternal::ObjectAdapterFactoryPtr _objectAdapterFactory; | |||
IceInternal::ThreadPoolPtr _threadPool; | |||
IceInternal::ACMConfig _acm; | |||
IceInternal::ServantManagerPtr _servantManager; | |||
const IceInternal::ServantManagerPtr _servantManager; | |||
ObjectPtr _dispatcher; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't it be const?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I update it in the constructor, so making it const is not convenient. I also plan to move the "composition root" somewhere else in a follow-up PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could use const_cast, that's what we do in other constructors.
cpp/src/Ice/ObserverMiddleware.cpp
Outdated
void | ||
ObserverMiddleware::dispatch(Ice::IncomingRequest& request, function<void(OutgoingResponse)> sendResponse) | ||
{ | ||
auto observerPtr = _communicatorObserver->getDispatchObserver(request.current(), request.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would use ObserverPtr observer = ...
here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. It's actually a DispatchObserverPtr.
observerPtr->detach(); | ||
|
||
// If we get a synchronous exception, sendResponse was not called. | ||
sendResponse(std::move(response)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this preventing the logger middleware from seeing the exceptions? Where are these exceptions handled when we don't have an observer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, absolutely not. Each middleware must handle both path:
- the exception path (when an exception is thrown synchronously)
- the sendResponse path, when the response carries an exception
And OutgoingResponse carries all the information any middleware may need to log/observe (...) any response, including responses with an exception payload.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't follow the logic, here we call sendResponse
for exception that were throw synchronously. In LoggerMiddlware we don't and rethrow them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The preferred style when you catch an exception synchronously is to observe it and rethrow, because that's the least amount of work.
In the observer middleware, we use another style because we need to build the OutgoingResponse when catching an exception. We build this OutgoingResponse to report the size of this response. So since we've built the response, we "return" it with sendResponse
and don't rethrow.
replyStatus = ReplyStatus::UnknownLocalException; | ||
ostringstream str; | ||
str << ex; | ||
if (IceUtilInternal::printStackTraces) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wonder if we should remove this setting, and always provide stack traces.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should definitely not do that. Stack traces can provide sensitive information from the server to the client.
{ | ||
OutputStream ostr(current.adapter->getCommunicator(), Ice::currentProtocolEncoding); | ||
|
||
if (current.requestId != 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to call this method when current.requestId == 0
? I suppose we still need to construct the OutgoingResponse
even if we aren't sending it back to the client, to ensure it's available for use in the dispatch pipeline middleware. Is that correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's correct. The response carries the reply status and error message (and more) and middleware (and dispatchers such as ServantManager) need it even for oneway request.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
{ | ||
_sendResponse(makeEmptyOutgoingResponse(_current)); | ||
} | ||
// else we ignore this call. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should we debug assert these to make sure this never happens since it's otherwise silent.
class InputStream; | ||
|
||
/** | ||
* Represent a request received by a connection. It's the argument to the dispatch function on Object. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Represent a request received by a connection. It's the argument to the dispatch function on Object. | |
* Represents a request received by a connection. It's the argument to the dispatch function on Object. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We actually use this style (no s) for every other doc-comment, and I'd rather not update them all.
// _dispatcher is immutable, so no need to lock _mutex. There is no need to clear _dispatcher during destroy | ||
// because _dispatcher does not hold onto this object adapter directly. It can hold onto a communicator that holds | ||
// onto this object adapter, but the communicator will release this refcount when it is destroyed or when the | ||
// object adapter is destroyed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think much of this comment belongs next to the declaration in the header file.
_replyStatus = other._replyStatus; | ||
_exceptionId = std::move(other._exceptionId); | ||
_errorMessage = std::move(other._errorMessage); | ||
_outputStream.swap(other._outputStream); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we implemented the move assignment operator on OuputStream could we then use the = default
implementations for some these classes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, and it would be less error-prone. It's easy to forget to update all these functions when you add a new field to OutgoingResponse.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm approving because the implementation looks corect. But while design is closer to IceRPC and cleaner, to me it doesn't fall into the category of a simple refactoring.
It introduces public APIs that are not totally safe to use (the application can now mess around with the input/output stream of the incoming request and outgoing response that contains the reply header). Some APIs are public but it's unclear whether or not there's really a use-cases for them.
It also makes the server-side and client-side inconsistent and now that we refactored the dispatch pipeline, I don't see why we wouldn't also refactor the client-side invocation pipeline later (not in 3.8).
Should we add a demo to show how to write a middleware? Do we need to add back a test like the the dispatch interceptor test?
* Dispatch an incoming request and return the corresponding outgoing response. | ||
* @param request The incoming request. | ||
* @param sendResponse A callback that the implementation calls to return the response. sendResponse does not | ||
* throw any exception and any sendResponse wrapper must not throw any exception. sendResponse can be called by |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It could be good to document what happens if the application doesn't respect the contract (raising exceptions and calling twice sendResponse).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With connections, it messes up with the dispatch count of the connection. Presumably, the result is the connection shutdown wouldn't wait for all outstanding dispatches (since the dispatch count is too low). I could not think of a simple way to describe this situation. It's also different for collocated invocations. When I accidentally called it twice, I was actually getting hangs at shutdown in some tests.
* response. | ||
* @param current A reference to the current object of the request. | ||
*/ | ||
OutgoingResponse( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would make this constructor private since it's really not clear how the application could use it. Or we have to document that the given output stream should also contain the reply header.
Another option would be to get rid of the makeOutgoingReponse
factory methods and like the IncomingRequest
constructor is reading the reply header, the OutgoingResponse
constructor would write the reply header.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this constructor should be public. The makeOutgoingResponse are helpers that the user could implement. The core functionality is this constructor.
@@ -137,7 +137,8 @@ namespace Ice | |||
IceInternal::ObjectAdapterFactoryPtr _objectAdapterFactory; | |||
IceInternal::ThreadPoolPtr _threadPool; | |||
IceInternal::ACMConfig _acm; | |||
IceInternal::ServantManagerPtr _servantManager; | |||
const IceInternal::ServantManagerPtr _servantManager; | |||
ObjectPtr _dispatcher; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could use const_cast, that's what we do in other constructors.
{ | ||
public: | ||
// This class typically holds a _copy_ of the incoming request current object. | ||
AsyncResponseHandler(std::function<void(Ice::OutgoingResponse)> sendResponse, Ice::Current current) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I probably miss something but I don't clearly understand why it's useful. Is it only about ensuring that we don't send twice the response? Could we have a internal flag on OutgoingResponse
for this purpose instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we have a internal flag on OutgoingResponse for this purpose instead?
How would this work?
OutgoingResponse is a value that each thread can construct independently. Your response callback and exception callback could each construct an OutgoingResponse and pass it to sendResponse.
This PR refactor the dispatch pipeline in C++.
It replaces Incoming and ResponseHandler with an IceRPC-like dispatch pipeline. Ice's dispatcher "interface" is simply Object, with a public dispatch function.
It also introduces two internal middleware: ObserverMiddleware and LoggerMiddleware. ServantManager also becomes a dispatcher.
You'll notice that this PR doesn't change any test: the external behavior remains identical.