Skip to content

Commit

Permalink
More dispatch pipeline refactoring (#1929)
Browse files Browse the repository at this point in the history
  • Loading branch information
bernardnormier authored Mar 13, 2024
1 parent ab839c8 commit ce9350d
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 57 deletions.
1 change: 1 addition & 0 deletions cpp/include/Ice/ObjectAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ namespace Ice
* Get the dispatcher associated with this object adapter. This object dispatches incoming requests to the
* servants managed by this object adapter, and takes into account the servant locators.
* @return The dispatcher. This shared_ptr is never null.
* @remarks You can add this dispatcher as a servant (including default servant) in another object adapter.
*/
virtual ObjectPtr dispatcher() const noexcept = 0;

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/Ice/CollocatedRequestHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ CollocatedRequestHandler::invokeAll(OutputStream* os, int32_t requestId, int32_t

try
{
_adapter->dispatcher()->dispatch(
_adapter->dispatchPipeline()->dispatch(
request,
[self = shared_from_this()](OutgoingResponse response)
{ self->sendResponse(std::move(response)); });
Expand Down
18 changes: 9 additions & 9 deletions cpp/src/Ice/ConnectionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ namespace
uint8_t compress,
int32_t requestId,
int32_t invokeNum,
const ObjectAdapterPtr& adapter,
const ObjectAdapterIPtr& adapter,
const OutgoingAsyncBasePtr& outAsync,
const HeartbeatCallback& heartbeatCallback,
InputStream& stream)
Expand Down Expand Up @@ -97,7 +97,7 @@ namespace
const uint8_t _compress;
const int32_t _requestId;
const int32_t _invokeNum;
const ObjectAdapterPtr _adapter;
const ObjectAdapterIPtr _adapter;
const OutgoingAsyncBasePtr _outAsync;
const HeartbeatCallback _heartbeatCallback;
InputStream _stream;
Expand Down Expand Up @@ -580,7 +580,7 @@ Ice::ConnectionI::waitUntilFinished()
//
// Clear the OA. See bug 1673 for the details of why this is necessary.
//
_adapter = 0;
_adapter = nullptr;
}

void
Expand Down Expand Up @@ -1263,7 +1263,7 @@ Ice::ConnectionI::createProxy(const Identity& ident) const
}

void
Ice::ConnectionI::setAdapterFromAdapter(const ObjectAdapterPtr& adapter)
Ice::ConnectionI::setAdapterFromAdapter(const ObjectAdapterIPtr& adapter)
{
std::lock_guard lock(_mutex);
if (_state <= StateNotValidated || _state >= StateClosing)
Expand Down Expand Up @@ -1382,7 +1382,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
uint8_t compress = 0;
int32_t requestId = 0;
int32_t invokeNum = 0;
ObjectAdapterPtr adapter;
ObjectAdapterIPtr adapter;
OutgoingAsyncBasePtr outAsync;
HeartbeatCallback heartbeatCallback;
int dispatchCount = 0;
Expand Down Expand Up @@ -1701,7 +1701,7 @@ ConnectionI::dispatch(
uint8_t compress,
int32_t requestId,
int32_t invokeNum,
const ObjectAdapterPtr& adapter,
const ObjectAdapterIPtr& adapter,
const OutgoingAsyncBasePtr& outAsync,
const HeartbeatCallback& heartbeatCallback,
InputStream& stream)
Expand Down Expand Up @@ -3189,7 +3189,7 @@ Ice::ConnectionI::parseMessage(
int32_t& invokeNum,
int32_t& requestId,
uint8_t& compress,
ObjectAdapterPtr& adapter,
ObjectAdapterIPtr& adapter,
OutgoingAsyncBasePtr& outAsync,
HeartbeatCallback& heartbeatCallback,
int& dispatchCount)
Expand Down Expand Up @@ -3420,7 +3420,7 @@ Ice::ConnectionI::invokeAll(
int32_t invokeNum,
int32_t requestId,
uint8_t compress,
const ObjectAdapterPtr& adapter)
const ObjectAdapterIPtr& adapter)
{
//
// Note: In contrast to other private or protected methods, this
Expand All @@ -3441,7 +3441,7 @@ Ice::ConnectionI::invokeAll(
{
try
{
adapter->dispatcher()->dispatch(
adapter->dispatchPipeline()->dispatch(
request,
[self = shared_from_this(), compress](OutgoingResponse response)
{ self->sendResponse(std::move(response), compress); });
Expand Down
11 changes: 6 additions & 5 deletions cpp/src/Ice/ConnectionI.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ namespace Ice
{
class LocalException;
class ObjectAdapterI;
using ObjectAdapterIPtr = std::shared_ptr<ObjectAdapterI>;

class ConnectionI : public Connection, public IceInternal::EventHandler, public IceInternal::CancellationHandler
{
Expand Down Expand Up @@ -177,7 +178,7 @@ namespace Ice
virtual EndpointPtr getEndpoint() const noexcept; // From Connection.
virtual ObjectPrx createProxy(const Identity& ident) const; // From Connection.

void setAdapterFromAdapter(const ObjectAdapterPtr&); // From ObjectAdapterI.
void setAdapterFromAdapter(const ObjectAdapterIPtr&); // From ObjectAdapterI.

//
// Operations from EventHandler
Expand Down Expand Up @@ -208,7 +209,7 @@ namespace Ice
std::uint8_t,
std::int32_t,
std::int32_t,
const ObjectAdapterPtr&,
const ObjectAdapterIPtr&,
const IceInternal::OutgoingAsyncBasePtr&,
const HeartbeatCallback&,
Ice::InputStream&);
Expand Down Expand Up @@ -278,12 +279,12 @@ namespace Ice
std::int32_t&,
std::int32_t&,
std::uint8_t&,
ObjectAdapterPtr&,
ObjectAdapterIPtr&,
IceInternal::OutgoingAsyncBasePtr&,
HeartbeatCallback&,
int&);

void invokeAll(Ice::InputStream&, std::int32_t, std::int32_t, std::uint8_t, const ObjectAdapterPtr&);
void invokeAll(Ice::InputStream&, std::int32_t, std::int32_t, std::uint8_t, const ObjectAdapterIPtr&);

void scheduleTimeout(IceInternal::SocketOperation status);
void unscheduleTimeout(IceInternal::SocketOperation status);
Expand All @@ -307,7 +308,7 @@ namespace Ice

mutable Ice::ConnectionInfoPtr _info;

ObjectAdapterPtr _adapter;
ObjectAdapterIPtr _adapter;

const bool _hasExecutor;
const LoggerPtr _logger;
Expand Down
60 changes: 27 additions & 33 deletions cpp/src/Ice/ObjectAdapterI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,31 @@ namespace
}

inline EndpointIPtr toEndpointI(const EndpointPtr& endp) { return dynamic_pointer_cast<EndpointI>(endp); }

ObjectPtr createDispatchPipeline(Ice::ObjectPtr dispatcher, const InstancePtr& instance)
{
const auto& observer = instance->initializationData().observer;
if (observer)
{
dispatcher = make_shared<ObserverMiddleware>(std::move(dispatcher), observer);
}

const auto& logger = instance->initializationData().logger;
if (logger)
{
int warningLevel =
instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1);
if (warningLevel > 0)
{
dispatcher = make_shared<LoggerMiddleware>(
std::move(dispatcher),
logger,
warningLevel,
instance->toStringMode());
}
}
return dispatcher;
}
}

string
Expand Down Expand Up @@ -518,12 +543,7 @@ Ice::ObjectAdapterI::findServantLocator(const string& prefix) const
ObjectPtr
Ice::ObjectAdapterI::dispatcher() const noexcept
{
// _dispatcher is immutable, so no need to lock _mutex. There is no need to clear _dispatcher during destroy
// because _dispatcher does not hold onto this object adapter directly. It can hold onto a communicator that holds
// onto this object adapter, but the communicator will release this refcount when it is destroyed or when the
// object adapter is destroyed.

return _dispatcher;
return _servantManager;
}

ObjectPrx
Expand Down Expand Up @@ -835,15 +855,6 @@ Ice::ObjectAdapterI::getThreadPool() const
}
}

ServantManagerPtr
Ice::ObjectAdapterI::getServantManager() const
{
//
// No mutex lock necessary, _servantManager is immutable.
//
return _servantManager;
}

IceInternal::ACMConfig
Ice::ObjectAdapterI::getACM() const
{
Expand Down Expand Up @@ -877,29 +888,12 @@ Ice::ObjectAdapterI::ObjectAdapterI(
_communicator(communicator),
_objectAdapterFactory(objectAdapterFactory),
_servantManager(make_shared<ServantManager>(instance, name)),
_dispatchPipeline(createDispatchPipeline(_servantManager, instance)),
_name(name),
_directCount(0),
_noConfig(noConfig),
_messageSizeMax(0)
{
_dispatcher = _servantManager;

const auto& observer = _instance->initializationData().observer;
if (observer)
{
_dispatcher = make_shared<ObserverMiddleware>(_dispatcher, observer);
}

const auto& logger = _instance->initializationData().logger;
if (logger)
{
int warningLevel =
_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1);
if (warningLevel > 0)
{
_dispatcher = make_shared<LoggerMiddleware>(_dispatcher, logger, warningLevel, _instance->toStringMode());
}
}
}

void
Expand Down
11 changes: 9 additions & 2 deletions cpp/src/Ice/ObjectAdapterI.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,14 @@ namespace Ice
void decDirectCount();

IceInternal::ThreadPoolPtr getThreadPool() const;
IceInternal::ServantManagerPtr getServantManager() const;
IceInternal::ACMConfig getACM() const;
void setAdapterOnConnection(const Ice::ConnectionIPtr&);
size_t messageSizeMax() const { return _messageSizeMax; }

// The dispatch pipeline is the dispatcher plus the logger and observer middleware. They are installed in the
// dispatch pipeline only when the communicator configuration enables them.
const Ice::ObjectPtr& dispatchPipeline() const noexcept { return _dispatchPipeline; }

ObjectAdapterI(
const IceInternal::InstancePtr&,
const CommunicatorPtr&,
Expand Down Expand Up @@ -138,7 +141,11 @@ namespace Ice
IceInternal::ThreadPoolPtr _threadPool;
IceInternal::ACMConfig _acm;
const IceInternal::ServantManagerPtr _servantManager;
ObjectPtr _dispatcher;

// There is no need to clear _dispatchPipeline during destroy because _dispatchPipeline does not hold onto this
// object adapter directly. It can hold onto a communicator that holds onto this object adapter, but the
// communicator will release this refcount when it is destroyed or when the object adapter is destroyed.
const ObjectPtr _dispatchPipeline; // must be declared after _servantManager
const std::string _name;
const std::string _id;
const std::string _replicaGroupId;
Expand Down
27 changes: 23 additions & 4 deletions cpp/test/Ice/metrics/AllTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ using namespace Test;

namespace
{
string getPort(const Ice::PropertiesAdminPrx& p)
string getPort(const Ice::PropertiesAdminPrx& p, int testPort = 0)
{
ostringstream os;
os << TestHelper::getTestPort(p->ice_getCommunicator()->getProperties(), 0);
os << TestHelper::getTestPort(p->ice_getCommunicator()->getProperties(), testPort);
return os.str();
}

Expand Down Expand Up @@ -62,7 +62,12 @@ namespace
map += "Map." + m + '.';
}
props["IceMX.Metrics.View." + map + "Reject.parent"] = "Ice\\.Admin|Controller";
props["IceMX.Metrics.View." + map + "Accept.endpointPort"] = getPort(pa);

// Regular expression to match server test endpoint 0 and test endpoint 1
ostringstream os;
os << getPort(pa, 0) << "|" << getPort(pa, 1);

props["IceMX.Metrics.View." + map + "Accept.endpointPort"] = os.str();
return props;
}

Expand Down Expand Up @@ -371,6 +376,12 @@ allTests(Test::TestHelper* helper, const CommunicatorObserverIPtr& obsv)
os << protocol << " -h " << host << " -p " << port;
endpoint = os.str();
}
string forwardingEndpoint;
{
ostringstream os;
os << protocol << " -h " << host << " -p " << helper->getTestPort(1);
forwardingEndpoint = os.str();
}

MetricsPrx metrics(communicator, "metrics:" + endpoint);
bool collocated = !metrics->ice_getConnection();
Expand Down Expand Up @@ -574,7 +585,7 @@ allTests(Test::TestHelper* helper, const CommunicatorObserverIPtr& obsv)
map = toMap(clientMetrics->getMetricsView("View", timestamp)["Connection"]);
test(map["active"]->current == 1);

ControllerPrx controller(communicator, "controller:" + helper->getTestEndpoint(1));
ControllerPrx controller(communicator, "controller:" + helper->getTestEndpoint(2));
controller->hold();

map = toMap(clientMetrics->getMetricsView("View", timestamp)["Connection"]);
Expand Down Expand Up @@ -948,6 +959,14 @@ allTests(Test::TestHelper* helper, const CommunicatorObserverIPtr& obsv)

cout << "ok" << endl;

cout << "testing dispatch metrics with forwarding object adapter... " << flush;
MetricsPrx indirectMetrics(communicator, "metrics:" + forwardingEndpoint);
InvokeOp secondOp(indirectMetrics);

testAttribute(serverMetrics, serverProps, update.get(), "Dispatch", "parent", "ForwardingAdapter", secondOp);
testAttribute(serverMetrics, serverProps, update.get(), "Dispatch", "id", "metrics [op]", secondOp);
cout << "ok" << endl;

cout << "testing invocation metrics... " << flush;

props["IceMX.Metrics.View.Map.Invocation.GroupBy"] = "operation";
Expand Down
6 changes: 5 additions & 1 deletion cpp/test/Ice/metrics/Collocated.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ Collocated::run(int argc, char** argv)
adapter->add(make_shared<MetricsI>(), Ice::stringToIdentity("metrics"));
// adapter->activate(); // Don't activate OA to ensure collocation is used.

communicator->getProperties()->setProperty("ControllerAdapter.Endpoints", getTestEndpoint(1));
communicator->getProperties()->setProperty("ForwardingAdapter.Endpoints", getTestEndpoint(1));
Ice::ObjectAdapterPtr forwardingAdapter = communicator->createObjectAdapter("ForwardingAdapter");
forwardingAdapter->addDefaultServant(adapter->dispatcher(), "");

communicator->getProperties()->setProperty("ControllerAdapter.Endpoints", getTestEndpoint(2));
Ice::ObjectAdapterPtr controllerAdapter = communicator->createObjectAdapter("ControllerAdapter");
controllerAdapter->add(make_shared<ControllerI>(adapter), Ice::stringToIdentity("controller"));
// controllerAdapter->activate(); // Don't activate OA to ensure collocation is used.
Expand Down
7 changes: 6 additions & 1 deletion cpp/test/Ice/metrics/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ Server::run(int argc, char** argv)
adapter->add(make_shared<MetricsI>(), Ice::stringToIdentity("metrics"));
adapter->activate();

communicator->getProperties()->setProperty("ControllerAdapter.Endpoints", getTestEndpoint(1));
communicator->getProperties()->setProperty("ForwardingAdapter.Endpoints", getTestEndpoint(1));
Ice::ObjectAdapterPtr forwardingAdapter = communicator->createObjectAdapter("ForwardingAdapter");
forwardingAdapter->addDefaultServant(adapter->dispatcher(), "");
forwardingAdapter->activate();

communicator->getProperties()->setProperty("ControllerAdapter.Endpoints", getTestEndpoint(2));
Ice::ObjectAdapterPtr controllerAdapter = communicator->createObjectAdapter("ControllerAdapter");
controllerAdapter->add(make_shared<ControllerI>(adapter), Ice::stringToIdentity("controller"));
controllerAdapter->activate();
Expand Down
7 changes: 6 additions & 1 deletion cpp/test/Ice/metrics/ServerAMD.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ ServerAMD::run(int argc, char** argv)
adapter->add(make_shared<MetricsI>(), Ice::stringToIdentity("metrics"));
adapter->activate();

communicator->getProperties()->setProperty("ControllerAdapter.Endpoints", getTestEndpoint(1));
communicator->getProperties()->setProperty("ForwardingAdapter.Endpoints", getTestEndpoint(1));
Ice::ObjectAdapterPtr forwardingAdapter = communicator->createObjectAdapter("ForwardingAdapter");
forwardingAdapter->addDefaultServant(adapter->dispatcher(), "");
forwardingAdapter->activate();

communicator->getProperties()->setProperty("ControllerAdapter.Endpoints", getTestEndpoint(2));
Ice::ObjectAdapterPtr controllerAdapter = communicator->createObjectAdapter("ControllerAdapter");
controllerAdapter->add(make_shared<ControllerI>(adapter), Ice::stringToIdentity("controller"));
controllerAdapter->activate();
Expand Down

0 comments on commit ce9350d

Please sign in to comment.