Skip to content

Commit

Permalink
Refactor dispatch pipeline in C++ (#1923)
Browse files Browse the repository at this point in the history
  • Loading branch information
bernardnormier authored Mar 13, 2024
1 parent 94ed832 commit ab839c8
Show file tree
Hide file tree
Showing 48 changed files with 1,461 additions and 1,407 deletions.
4 changes: 2 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@
"unordered_map": "cpp",
"variant": "cpp",
"algorithm": "cpp",
"xtree": "cpp"
"xtree": "cpp",
"execution": "cpp"
},
"C_Cpp.default.cppStandard": "c++20",
"editor.formatOnSave": true
}
87 changes: 87 additions & 0 deletions cpp/include/Ice/AsyncResponseHandler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
//
// Copyright (c) ZeroC, Inc. All rights reserved.
//

#ifndef ICE_ASYNC_RESPONSE_HANDLER_H
#define ICE_ASYNC_RESPONSE_HANDLER_H

#include "Current.h"
#include "OutgoingResponse.h"
#include "LocalException.h"

#include <atomic>

namespace IceInternal
{
// This class helps with the implementation of the AMD response and exception callbacks. It allows the dispatch
// thread and these two callbacks to share the same sendResponse and Current objects, and ensures sendResponse is
// called exactly once.
class AsyncResponseHandler final
{
public:
// This class typically holds a _copy_ of the incoming request current object.
AsyncResponseHandler(std::function<void(Ice::OutgoingResponse)> sendResponse, Ice::Current current)
: _sendResponse(std::move(sendResponse)),
_current(std::move(current))
{
}

void sendEmptyResponse() noexcept
{
if (!_responseSent.test_and_set())
{
_sendResponse(makeEmptyOutgoingResponse(_current));
}
// else we ignore this call.
}

void sendResponse(Ice::MarshaledResult marshaledResult) noexcept
{
if (!_responseSent.test_and_set())
{
_sendResponse(Ice::OutgoingResponse{marshaledResult.outputStream(), _current});
}
// else we ignore this call.
}

void sendResponse(bool ok, const std::pair<const uint8_t*, const uint8_t*>& encaps) noexcept
{
if (!_responseSent.test_and_set())
{
_sendResponse(makeOutgoingResponse(ok, encaps, _current));
}
// else we ignore this call.
}

void sendResponse(
std::function<void(Ice::OutputStream*)> marshal,
Ice::FormatType format = Ice::FormatType::DefaultFormat) noexcept
{
// It is critical to only call the _sendResponse function only once. Calling it multiple times results in an
// incorrect dispatch count.
if (!_responseSent.test_and_set())
{
_sendResponse(makeOutgoingResponse(std::move(marshal), _current, format));
}
// else we ignore this call.
}

void sendException(std::exception_ptr ex) noexcept
{
if (!_responseSent.test_and_set())
{
_sendResponse(makeOutgoingResponse(ex, _current));
}
// else we ignore this call.
}

const Ice::Current& current() const noexcept { return _current; }

private:
const std::function<void(Ice::OutgoingResponse)> _sendResponse;
const Ice::Current _current;
std::atomic_flag _responseSent = ATOMIC_FLAG_INIT;
};
}

#endif
121 changes: 0 additions & 121 deletions cpp/include/Ice/Incoming.h

This file was deleted.

77 changes: 77 additions & 0 deletions cpp/include/Ice/IncomingRequest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//
// Copyright (c) ZeroC, Inc. All rights reserved.
//

#ifndef ICE_INCOMING_REQUEST_H
#define ICE_INCOMING_REQUEST_H

#include "ConnectionF.h"
#include "Current.h"
#include "ObjectAdapterF.h"

namespace Ice
{
class InputStream;

/**
* Represent a request received by a connection. It's the argument to the dispatch function on Object.
* @remarks IncomingRequest is neither copyable nor movable. It can be used only on the dispatch thread.
* @see Object::dispatch
* \headerfile Ice/Ice.h
*/
class ICE_API IncomingRequest final
{
public:
/**
* Construct an IncomingRequest object.
* @param requestId The request ID. It's 0 for oneway requests.
* @param connection The connection that received the request. It's null for collocated invocations.
* @param adapter The object adapter to set in Current.
* @param inputStream The input stream buffer over the incoming Ice protocol request message. The stream is
* positioned at the beginning of the request header - the next data to read is the identity of the target.
* @remarks This constructor reads the request header from inputStream. When it completes, the input stream is
* positioned at the beginning of encapsulation carried by the request.
*/
IncomingRequest(
int32_t requestId,
ConnectionPtr connection,
ObjectAdapterPtr adapter,
InputStream& inputStream);

IncomingRequest(const IncomingRequest&) = delete;
IncomingRequest(IncomingRequest&&) noexcept = delete;
IncomingRequest& operator=(const IncomingRequest&) = delete;
IncomingRequest& operator=(IncomingRequest&&) = delete;

/**
* Get the current object of the request.
* @return A reference to the current object of the request.
*/
Current& current() noexcept { return _current; }

/**
* Get the current object of the request.
* @return A const reference to the current object of the request.
*/
const Ice::Current& current() const noexcept { return _current; }

/**
* Get the input stream buffer of the request.
* @return A reference to the input stream buffer.
*/
InputStream& inputStream() noexcept { return _inputStream; }

/**
* Get the number of bytes in the request.
* @return The number of bytes in the request. These are all the bytes starting with the identity of the target.
*/
std::int32_t size() const { return _requestSize; }

private:
InputStream& _inputStream;
Current _current;
std::int32_t _requestSize;
};
}

#endif
39 changes: 0 additions & 39 deletions cpp/include/Ice/LocalException.h
Original file line number Diff line number Diff line change
Expand Up @@ -3569,45 +3569,6 @@ namespace Ice
*/
ICE_MEMBER(ICE_API) virtual void ice_print(::std::ostream& stream) const override;
};

/**
* Indicates the application attempted to call the callbacks for an async dispatch more than once, or called an
* async dispatch callback after throwing an exception from the dispatch thread. \headerfile Ice/Ice.h
*/
class ICE_CLASS(ICE_API) ResponseSentException : public LocalExceptionHelper<ResponseSentException, LocalException>
{
public:
ICE_MEMBER(ICE_API) virtual ~ResponseSentException();

ResponseSentException(const ResponseSentException&) = default;

/**
* The file and line number are required for all local exceptions.
* @param file The file name in which the exception was raised, typically __FILE__.
* @param line The line number at which the exception was raised, typically __LINE__.
*/
ResponseSentException(const char* file, int line)
: LocalExceptionHelper<ResponseSentException, LocalException>(file, line)
{
}

/**
* Obtains a tuple containing all of the exception's data members.
* @return The data members in a tuple.
*/
std::tuple<> ice_tuple() const { return std::tie(); }

/**
* Obtains the Slice type ID of this exception.
* @return The fully-scoped type ID.
*/
ICE_MEMBER(ICE_API) static ::std::string_view ice_staticId();
/**
* Prints this exception to the given stream.
* @param stream The target stream.
*/
ICE_MEMBER(ICE_API) virtual void ice_print(::std::ostream& stream) const override;
};
}

/// \cond STREAM
Expand Down
13 changes: 3 additions & 10 deletions cpp/include/Ice/MarshaledResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ namespace Ice
MarshaledResult& operator=(const MarshaledResult&) = delete;
MarshaledResult& operator=(MarshaledResult&&);

protected:
/// \cond INTERNAL

OutputStream& outputStream() noexcept { return _ostr; }

protected:
/**
* The constructor requires the Current object that was passed to the servant.
*/
Expand All @@ -43,15 +45,6 @@ namespace Ice
/** The output stream used to marshal the results. */
OutputStream _ostr;

private:
friend class IceInternal::Incoming;

/**
* Swaps the output stream of this object with the supplied output stream.
* @param other The output stream to swap with.
*/
void swap(OutputStream& other);

/// \endcond
};
}
Expand Down
Loading

0 comments on commit ab839c8

Please sign in to comment.