Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor dispatch pipeline in C++ #1923

Merged
merged 11 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@
"unordered_map": "cpp",
"variant": "cpp",
"algorithm": "cpp",
"xtree": "cpp"
"xtree": "cpp",
"execution": "cpp"
},
"C_Cpp.default.cppStandard": "c++20",
"editor.formatOnSave": true
}
87 changes: 87 additions & 0 deletions cpp/include/Ice/AsyncResponseHandler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
//
// Copyright (c) ZeroC, Inc. All rights reserved.
//

#ifndef ICE_ASYNC_RESPONSE_HANDLER_H
#define ICE_ASYNC_RESPONSE_HANDLER_H

#include "Current.h"
#include "OutgoingResponse.h"
#include "LocalException.h"

#include <atomic>

namespace IceInternal
{
// This class helps with the implementation of the AMD response and exception callbacks. It allows the dispatch
// thread and these two callbacks to share the same sendResponse and Current objects, and ensures sendResponse is
// called exactly once.
class AsyncResponseHandler final
{
public:
// This class typically holds a _copy_ of the incoming request current object.
AsyncResponseHandler(std::function<void(Ice::OutgoingResponse)> sendResponse, Ice::Current current)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I probably miss something but I don't clearly understand why it's useful. Is it only about ensuring that we don't send twice the response? Could we have a internal flag on OutgoingResponse for this purpose instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have a internal flag on OutgoingResponse for this purpose instead?

How would this work?

OutgoingResponse is a value that each thread can construct independently. Your response callback and exception callback could each construct an OutgoingResponse and pass it to sendResponse.

: _sendResponse(std::move(sendResponse)),
_current(std::move(current))
{
}

void sendEmptyResponse() noexcept
{
if (!_responseSent.test_and_set())
{
_sendResponse(makeEmptyOutgoingResponse(_current));
}
// else we ignore this call.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should we debug assert these to make sure this never happens since it's otherwise silent.

}

void sendResponse(Ice::MarshaledResult marshaledResult) noexcept
{
if (!_responseSent.test_and_set())
{
_sendResponse(Ice::OutgoingResponse{marshaledResult.outputStream(), _current});
}
// else we ignore this call.
}

void sendResponse(bool ok, const std::pair<const uint8_t*, const uint8_t*>& encaps) noexcept
{
if (!_responseSent.test_and_set())
{
_sendResponse(makeOutgoingResponse(ok, encaps, _current));
}
// else we ignore this call.
}

void sendResponse(
std::function<void(Ice::OutputStream*)> marshal,
Ice::FormatType format = Ice::FormatType::DefaultFormat) noexcept
{
// It is critical to only call the _sendResponse function only once. Calling it multiple times results in an
// incorrect dispatch count.
if (!_responseSent.test_and_set())
{
_sendResponse(makeOutgoingResponse(std::move(marshal), _current, format));
}
// else we ignore this call.
}

void sendException(std::exception_ptr ex) noexcept
{
if (!_responseSent.test_and_set())
{
_sendResponse(makeOutgoingResponse(ex, _current));
}
// else we ignore this call.
}

const Ice::Current& current() const noexcept { return _current; }

private:
const std::function<void(Ice::OutgoingResponse)> _sendResponse;
const Ice::Current _current;
std::atomic_flag _responseSent = ATOMIC_FLAG_INIT;
};
}

#endif
121 changes: 0 additions & 121 deletions cpp/include/Ice/Incoming.h

This file was deleted.

71 changes: 71 additions & 0 deletions cpp/include/Ice/IncomingRequest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//
// Copyright (c) ZeroC, Inc. All rights reserved.
//

#ifndef ICE_INCOMING_REQUEST_H
#define ICE_INCOMING_REQUEST_H

#include "ConnectionF.h"
#include "Current.h"
#include "ObjectAdapterF.h"

namespace Ice
{
class InputStream;

/**
* Represent a request received by a connection. It's the argument to the dispatch function on Object.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Represent a request received by a connection. It's the argument to the dispatch function on Object.
* Represents a request received by a connection. It's the argument to the dispatch function on Object.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually use this style (no s) for every other doc-comment, and I'd rather not update them all.

* @remarks IncomingRequest is neither copyable nor movable. It can be used only on the dispatch thread.
* @see Object::dispatch
* \headerfile Ice/Ice.h
*/
class ICE_API IncomingRequest final
{
public:
/**
* Construct an IncomingRequest object.
* @param requestId The request ID. It's 0 for oneway requests.
* @param connection The connection that received the request. It's null for collocated invocations.
* @param adapter The object adapter to set in Current.
* @param inputStream The input stream buffer over the incoming Ice protocol request message. The stream is
* positioned at the beginning of the request header - the next data to read is the identity of the target.
*/
IncomingRequest(
bernardnormier marked this conversation as resolved.
Show resolved Hide resolved
int32_t requestId,
ConnectionPtr connection,
ObjectAdapterPtr adapter,
InputStream& inputStream);

IncomingRequest(const IncomingRequest&) = delete;
IncomingRequest(IncomingRequest&&) noexcept = delete;
IncomingRequest& operator=(const IncomingRequest&) = delete;
IncomingRequest& operator=(IncomingRequest&&) = delete;

/**
* Return the current object of the request.
*/
Current& current() noexcept { return _current; }
bernardnormier marked this conversation as resolved.
Show resolved Hide resolved

/**
* Return the current object of the request.
*/
const Ice::Current& current() const noexcept { return _current; }

/**
* Return the input stream buffer of the request.
*/
InputStream& inputStream() noexcept { return _inputStream; }

/**
* Return the number of bytes in the request. These are all the bytes starting with the identity of the target.
*/
std::int32_t size() const { return _requestSize; }

private:
InputStream& _inputStream;
Current _current;
std::int32_t _requestSize;
};
}

#endif
39 changes: 0 additions & 39 deletions cpp/include/Ice/LocalException.h
Original file line number Diff line number Diff line change
Expand Up @@ -3569,45 +3569,6 @@ namespace Ice
*/
ICE_MEMBER(ICE_API) virtual void ice_print(::std::ostream& stream) const override;
};

/**
* Indicates the application attempted to call the callbacks for an async dispatch more than once, or called an
* async dispatch callback after throwing an exception from the dispatch thread. \headerfile Ice/Ice.h
*/
class ICE_CLASS(ICE_API) ResponseSentException : public LocalExceptionHelper<ResponseSentException, LocalException>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this exception. If you attempt to call twice an AMD callback, the second call will be no-op. I want the AMD callbacks to be logically noexcept, just like sendResponse.

{
public:
ICE_MEMBER(ICE_API) virtual ~ResponseSentException();

ResponseSentException(const ResponseSentException&) = default;

/**
* The file and line number are required for all local exceptions.
* @param file The file name in which the exception was raised, typically __FILE__.
* @param line The line number at which the exception was raised, typically __LINE__.
*/
ResponseSentException(const char* file, int line)
: LocalExceptionHelper<ResponseSentException, LocalException>(file, line)
{
}

/**
* Obtains a tuple containing all of the exception's data members.
* @return The data members in a tuple.
*/
std::tuple<> ice_tuple() const { return std::tie(); }

/**
* Obtains the Slice type ID of this exception.
* @return The fully-scoped type ID.
*/
ICE_MEMBER(ICE_API) static ::std::string_view ice_staticId();
/**
* Prints this exception to the given stream.
* @param stream The target stream.
*/
ICE_MEMBER(ICE_API) virtual void ice_print(::std::ostream& stream) const override;
};
}

/// \cond STREAM
Expand Down
13 changes: 3 additions & 10 deletions cpp/include/Ice/MarshaledResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ namespace Ice
MarshaledResult& operator=(const MarshaledResult&) = delete;
MarshaledResult& operator=(MarshaledResult&&);

protected:
/// \cond INTERNAL

OutputStream& outputStream() noexcept { return _ostr; }

protected:
/**
* The constructor requires the Current object that was passed to the servant.
*/
Expand All @@ -43,15 +45,6 @@ namespace Ice
/** The output stream used to marshal the results. */
OutputStream _ostr;

private:
friend class IceInternal::Incoming;

/**
* Swaps the output stream of this object with the supplied output stream.
* @param other The output stream to swap with.
*/
void swap(OutputStream& other);

/// \endcond
};
}
Expand Down
Loading