Skip to content

Commit

Permalink
Install middleware in ObjectAdapter (C++) (#2283)
Browse files Browse the repository at this point in the history
  • Loading branch information
bernardnormier authored Jun 11, 2024
1 parent 3145575 commit 674707b
Show file tree
Hide file tree
Showing 16 changed files with 491 additions and 45 deletions.
19 changes: 14 additions & 5 deletions cpp/include/Ice/ObjectAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,17 @@ namespace Ice
*/
virtual void destroy() noexcept = 0;

/**
* Add a middleware to the dispatch pipeline of this object adapter.
* @param middlewareFactory The middleware factory that creates the new middleware when this object adapter
* creates its dispatch pipeline. A middleware factory is a function that takes an ObjectPtr (the next element
* in the dispatch pipeline) and returns a new ObjectPtr (the middleware you want to install in the pipeline).
* @return This object adapter.
* @remark All middleware must be installed before the first dispatch.
* @remark The middleware are executed in the order they are installed.
*/
virtual ObjectAdapterPtr use(std::function<ObjectPtr(ObjectPtr)> middlewareFactory) = 0;

/**
* Add a servant to this object adapter's Active Servant Map. Note that one servant can implement several Ice
* objects by registering the servant with multiple identities. Adding a servant with an identity that is in the
Expand Down Expand Up @@ -351,12 +362,10 @@ namespace Ice
virtual ObjectPtr findDefaultServant(const std::string& category) const = 0;

/**
* Get the dispatcher associated with this object adapter. This object dispatches incoming requests to the
* servants managed by this object adapter, and takes into account the servant locators.
* @return The dispatcher. This shared_ptr is never null.
* @remarks You can add this dispatcher as a servant (including default servant) in another object adapter.
* Get the dispatch pipeline of this object adapter.
* @return The dispatch pipeline. This shared_ptr is never null.
*/
virtual ObjectPtr dispatcher() const noexcept = 0;
virtual const ObjectPtr& dispatchPipeline() const noexcept = 0;

/**
* Create a proxy for the object with the given identity. If this object adapter is configured with an adapter
Expand Down
17 changes: 17 additions & 0 deletions cpp/msbuild/ice.test.sln
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,13 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "server", "..\test\Ice\inact
{C7223CC8-0AAA-470B-ACB3-12B9DE75525C} = {C7223CC8-0AAA-470B-ACB3-12B9DE75525C}
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "middleware", "middleware", "{12BAF98A-A6A5-413D-9937-53BEC5256653}"
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "client", "..\test\Ice\middleware\msbuild\client.vcxproj", "{A9DDEB21-4446-4B53-AEFF-FF0E8C262E95}"
ProjectSection(ProjectDependencies) = postProject
{C7223CC8-0AAA-470B-ACB3-12B9DE75525C} = {C7223CC8-0AAA-470B-ACB3-12B9DE75525C}
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Win32 = Debug|Win32
Expand Down Expand Up @@ -2648,6 +2655,14 @@ Global
{B71C77B9-6346-4762-9A7B-53CA6E8BB126}.Release|Win32.Build.0 = Release|Win32
{B71C77B9-6346-4762-9A7B-53CA6E8BB126}.Release|x64.ActiveCfg = Release|x64
{B71C77B9-6346-4762-9A7B-53CA6E8BB126}.Release|x64.Build.0 = Release|x64
{A9DDEB21-4446-4B53-AEFF-FF0E8C262E95}.Debug|Win32.ActiveCfg = Debug|Win32
{A9DDEB21-4446-4B53-AEFF-FF0E8C262E95}.Debug|Win32.Build.0 = Debug|Win32
{A9DDEB21-4446-4B53-AEFF-FF0E8C262E95}.Debug|x64.ActiveCfg = Debug|x64
{A9DDEB21-4446-4B53-AEFF-FF0E8C262E95}.Debug|x64.Build.0 = Debug|x64
{A9DDEB21-4446-4B53-AEFF-FF0E8C262E95}.Release|Win32.ActiveCfg = Release|Win32
{A9DDEB21-4446-4B53-AEFF-FF0E8C262E95}.Release|Win32.Build.0 = Release|Win32
{A9DDEB21-4446-4B53-AEFF-FF0E8C262E95}.Release|x64.ActiveCfg = Release|x64
{A9DDEB21-4446-4B53-AEFF-FF0E8C262E95}.Release|x64.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -2927,6 +2942,8 @@ Global
{1B5F95AB-2CFF-4105-9091-D7461170C00E} = {2CAF9731-CB18-498C-A3EF-24F3D8A334AC}
{81828734-78F8-4C4B-8D69-BF548A53DA98} = {1B5F95AB-2CFF-4105-9091-D7461170C00E}
{B71C77B9-6346-4762-9A7B-53CA6E8BB126} = {1B5F95AB-2CFF-4105-9091-D7461170C00E}
{12BAF98A-A6A5-413D-9937-53BEC5256653} = {2CAF9731-CB18-498C-A3EF-24F3D8A334AC}
{A9DDEB21-4446-4B53-AEFF-FF0E8C262E95} = {12BAF98A-A6A5-413D-9937-53BEC5256653}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {E6FDB68A-64BA-4577-ABCD-40A01257F8AB}
Expand Down
73 changes: 45 additions & 28 deletions cpp/src/Ice/ObjectAdapterI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,30 +54,6 @@ namespace
}

inline EndpointIPtr toEndpointI(const EndpointPtr& endp) { return dynamic_pointer_cast<EndpointI>(endp); }

ObjectPtr createDispatchPipeline(Ice::ObjectPtr dispatcher, const InstancePtr& instance)
{
const auto& observer = instance->initializationData().observer;
if (observer)
{
dispatcher = make_shared<ObserverMiddleware>(std::move(dispatcher), observer);
}

const auto& logger = instance->initializationData().logger;
if (logger)
{
int warningLevel = instance->initializationData().properties->getIcePropertyAsInt("Ice.Warn.Dispatch");
if (warningLevel > 0)
{
dispatcher = make_shared<LoggerMiddleware>(
std::move(dispatcher),
logger,
warningLevel,
instance->toStringMode());
}
}
return dispatcher;
}
}

string
Expand Down Expand Up @@ -376,6 +352,19 @@ Ice::ObjectAdapterI::destroy() noexcept
}
}

ObjectAdapterPtr
Ice::ObjectAdapterI::use(function<ObjectPtr(ObjectPtr)> middlewareFactory)
{
// This code is not thread-safe and is not supposed to be.
if (_dispatchPipeline)
{
throw InitializationException{__FILE__, __LINE__, "all middleware must be installed before the first dispatch"};
}

_middlewareFactoryStack.push(std::move(middlewareFactory));
return shared_from_this();
}

ObjectPrx
Ice::ObjectAdapterI::add(const ObjectPtr& object, const Identity& ident)
{
Expand Down Expand Up @@ -538,10 +527,20 @@ Ice::ObjectAdapterI::findServantLocator(const string& prefix) const
return _servantManager->findServantLocator(prefix);
}

ObjectPtr
Ice::ObjectAdapterI::dispatcher() const noexcept
const ObjectPtr&
Ice::ObjectAdapterI::dispatchPipeline() const noexcept
{
return _servantManager;
lock_guard lock(_mutex);
if (!_dispatchPipeline)
{
_dispatchPipeline = _servantManager;
while (!_middlewareFactoryStack.empty())
{
_dispatchPipeline = _middlewareFactoryStack.top()(std::move(_dispatchPipeline));
_middlewareFactoryStack.pop();
}
}
return _dispatchPipeline;
}

ObjectPrx
Expand Down Expand Up @@ -878,7 +877,6 @@ Ice::ObjectAdapterI::ObjectAdapterI(
_communicator(communicator),
_objectAdapterFactory(objectAdapterFactory),
_servantManager(make_shared<ServantManager>(instance, name)),
_dispatchPipeline(createDispatchPipeline(_servantManager, instance)),
_name(name),
_directCount(0),
_noConfig(noConfig),
Expand All @@ -901,6 +899,25 @@ Ice::ObjectAdapterI::ObjectAdapterI(
void
Ice::ObjectAdapterI::initialize(optional<RouterPrx> router)
{
// shared_from_this() is available now and is called by `use`.

const LoggerPtr logger = _instance->initializationData().logger;
if (logger)
{
int warningLevel = _instance->initializationData().properties->getIcePropertyAsInt("Ice.Warn.Dispatch");
if (warningLevel > 0)
{
use([logger, warningLevel, toStringMode = _instance->toStringMode()](ObjectPtr next)
{ return make_shared<LoggerMiddleware>(std::move(next), logger, warningLevel, toStringMode); });
}
}

const Instrumentation::CommunicatorObserverPtr observer = _instance->initializationData().observer;
if (observer)
{
use([observer](ObjectPtr next) { return make_shared<ObserverMiddleware>(std::move(next), observer); });
}

if (_noConfig)
{
_reference = _instance->referenceFactory()->create("dummy -t", "");
Expand Down
17 changes: 8 additions & 9 deletions cpp/src/Ice/ObjectAdapterI.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <list>
#include <mutex>
#include <optional>
#include <stack>

namespace IceInternal
{
Expand All @@ -53,6 +54,8 @@ namespace Ice
bool isDeactivated() const noexcept final;
void destroy() noexcept final;

ObjectAdapterPtr use(std::function<ObjectPtr(ObjectPtr)> middlewareFactory) final;

ObjectPrx add(const ObjectPtr&, const Identity&) final;
ObjectPrx addFacet(const ObjectPtr&, const Identity&, const std::string&) final;
ObjectPrx addWithUUID(const ObjectPtr&) final;
Expand All @@ -70,7 +73,8 @@ namespace Ice
void addServantLocator(const ServantLocatorPtr&, const std::string&) final;
ServantLocatorPtr removeServantLocator(const std::string&) final;
ServantLocatorPtr findServantLocator(const std::string&) const final;
ObjectPtr dispatcher() const noexcept final;

const ObjectPtr& dispatchPipeline() const noexcept final;

ObjectPrx createProxy(const Identity&) const final;
ObjectPrx createDirectProxy(const Identity&) const final;
Expand Down Expand Up @@ -102,10 +106,6 @@ namespace Ice
void setAdapterOnConnection(const ConnectionIPtr&);
size_t messageSizeMax() const { return _messageSizeMax; }

// The dispatch pipeline is the dispatcher plus the logger and observer middleware. They are installed in the
// dispatch pipeline only when the communicator configuration enables them.
const ObjectPtr& dispatchPipeline() const noexcept { return _dispatchPipeline; }

ObjectAdapterI(
const IceInternal::InstancePtr&,
const CommunicatorPtr&,
Expand Down Expand Up @@ -151,10 +151,9 @@ namespace Ice
IceInternal::ThreadPoolPtr _threadPool;
const IceInternal::ServantManagerPtr _servantManager;

// There is no need to clear _dispatchPipeline during destroy because _dispatchPipeline does not hold onto this
// object adapter directly. It can hold onto a communicator that holds onto this object adapter, but the
// communicator will release this refcount when it is destroyed or when the object adapter is destroyed.
const ObjectPtr _dispatchPipeline; // must be declared after _servantManager
mutable ObjectPtr _dispatchPipeline;
mutable std::stack<std::function<ObjectPtr(ObjectPtr)>> _middlewareFactoryStack;

const std::string _name;
const std::string _id;
const std::string _replicaGroupId;
Expand Down
2 changes: 1 addition & 1 deletion cpp/test/Ice/metrics/Collocated.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Collocated::run(int argc, char** argv)

communicator->getProperties()->setProperty("ForwardingAdapter.Endpoints", getTestEndpoint(1));
Ice::ObjectAdapterPtr forwardingAdapter = communicator->createObjectAdapter("ForwardingAdapter");
forwardingAdapter->addDefaultServant(adapter->dispatcher(), "");
forwardingAdapter->addDefaultServant(adapter->dispatchPipeline(), "");

communicator->getProperties()->setProperty("ControllerAdapter.Endpoints", getTestEndpoint(2));
Ice::ObjectAdapterPtr controllerAdapter = communicator->createObjectAdapter("ControllerAdapter");
Expand Down
2 changes: 1 addition & 1 deletion cpp/test/Ice/metrics/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Server::run(int argc, char** argv)

communicator->getProperties()->setProperty("ForwardingAdapter.Endpoints", getTestEndpoint(1));
Ice::ObjectAdapterPtr forwardingAdapter = communicator->createObjectAdapter("ForwardingAdapter");
forwardingAdapter->addDefaultServant(adapter->dispatcher(), "");
forwardingAdapter->addDefaultServant(adapter->dispatchPipeline(), "");
forwardingAdapter->activate();

communicator->getProperties()->setProperty("ControllerAdapter.Endpoints", getTestEndpoint(2));
Expand Down
2 changes: 1 addition & 1 deletion cpp/test/Ice/metrics/ServerAMD.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ ServerAMD::run(int argc, char** argv)

communicator->getProperties()->setProperty("ForwardingAdapter.Endpoints", getTestEndpoint(1));
Ice::ObjectAdapterPtr forwardingAdapter = communicator->createObjectAdapter("ForwardingAdapter");
forwardingAdapter->addDefaultServant(adapter->dispatcher(), "");
forwardingAdapter->addDefaultServant(adapter->dispatchPipeline(), "");
forwardingAdapter->activate();

communicator->getProperties()->setProperty("ControllerAdapter.Endpoints", getTestEndpoint(2));
Expand Down
75 changes: 75 additions & 0 deletions cpp/test/Ice/middleware/AllTests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) ZeroC, Inc.

#include "TestHelper.h"
#include "TestI.h"

using namespace std;
using namespace Test;

class Middleware final : public Ice::Object
{
public:
Middleware(Ice::ObjectPtr next, string name, list<string>& inLog, list<string>& outLog)
: _next(std::move(next)),
_name(std::move(name)),
_inLog(inLog),
_outLog(outLog)
{
}

void dispatch(Ice::IncomingRequest& request, std::function<void(Ice::OutgoingResponse)> sendResponse) final
{
_inLog.push_back(_name);

_next->dispatch(
request,
[this, sendResponse = std::move(sendResponse)](Ice::OutgoingResponse response)
{
_outLog.push_back(_name);
sendResponse(std::move(response));
});
}

private:
Ice::ObjectPtr _next;
string _name;
list<string>& _inLog;
list<string>& _outLog;
};

void
testMiddlewareExecutionOrder(const Ice::CommunicatorPtr& communicator)
{
cout << "testing middleware execution order... " << flush;

// Arrange
list<string> inLog;
list<string> outLog;

Ice::ObjectAdapterPtr oa = communicator->createObjectAdapter("");
oa->add(make_shared<MyObjectI>(), Ice::Identity{"test", ""});

oa->use([&](Ice::ObjectPtr next) { return make_shared<Middleware>(std::move(next), "A", inLog, outLog); })
->use([&](Ice::ObjectPtr next) { return make_shared<Middleware>(std::move(next), "B", inLog, outLog); })
->use([&](Ice::ObjectPtr next) { return make_shared<Middleware>(std::move(next), "C", inLog, outLog); });

MyObjectPrx p(communicator, "test");

// Act
p->ice_ping();

// Assert
bool inOk = inLog == list<string>{"A", "B", "C"};
test(inOk);
bool outOk = outLog == list<string>{"C", "B", "A"};
test(outOk);

cout << "ok" << endl;
}

void
allTests(Test::TestHelper* helper)
{
Ice::CommunicatorPtr communicator = helper->communicator();
testMiddlewareExecutionOrder(communicator);
}
23 changes: 23 additions & 0 deletions cpp/test/Ice/middleware/Client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) ZeroC, Inc.

#include "Test.h"
#include "TestHelper.h"

using namespace std;
using namespace Test;

class Client : public Test::TestHelper
{
public:
void run(int, char**);
};

void
Client::run(int argc, char** argv)
{
Ice::CommunicatorHolder communicator = initialize(argc, argv);
void allTests(Test::TestHelper*);
allTests(this);
}

DEFINE_TEST(Client)
7 changes: 7 additions & 0 deletions cpp/test/Ice/middleware/Makefile.mk
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#
# Copyright (c) ZeroC, Inc. All rights reserved.
#

$(test)_client_sources = Client.cpp AllTests.cpp Test.ice TestI.cpp

tests += $(test)
11 changes: 11 additions & 0 deletions cpp/test/Ice/middleware/Test.ice
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) ZeroC, Inc.

#pragma once

module Test
{
interface MyObject
{
string getName();
}
}
11 changes: 11 additions & 0 deletions cpp/test/Ice/middleware/TestI.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) ZeroC, Inc.

#include "TestI.h"

using namespace std;

string
MyObjectI::getName(const Ice::Current&)
{
return "Foo";
}
Loading

0 comments on commit 674707b

Please sign in to comment.