Replaced threadpool work item class with lambda function (#2083)
bentoi authored May 1, 2024
1 parent f26ee55 commit 409e40d
Showing 7 changed files with 175 additions and 356 deletions.
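The change follows one pattern across the files below: each single-purpose ExecutorWorkItem subclass is deleted, and the body of its run() method moves into a lambda handed to the thread pool's execute/executeFromThisThread calls, which in this commit take the function plus an associated connection (nullptr where there is none). A minimal sketch of the before/after shape; the Pool, WorkItem, and Handler names are illustrative stand-ins, not the actual Ice ThreadPool API:

#include <functional>
#include <memory>

// Illustrative stand-ins only, not the actual Ice ThreadPool interface.
class WorkItem
{
public:
    virtual ~WorkItem() = default;
    virtual void run() = 0; // old style: one subclass per call site
};

class Pool
{
public:
    // Before: callers allocate a dedicated work-item object.
    void execute(const std::shared_ptr<WorkItem>& item) { item->run(); }

    // After: callers pass a lambda plus an optional associated connection.
    void execute(std::function<void()> workItem, std::shared_ptr<void> /*connection*/) { workItem(); }
};

class Handler : public std::enable_shared_from_this<Handler>
{
public:
    void dispatchAll(int requestId) { (void)requestId; /* ... */ }

    void invoke(Pool& pool, int requestId)
    {
        // The lambda captures what the old work-item subclass stored in member fields,
        // including a shared_ptr that keeps this handler alive until the work runs.
        pool.execute([self = shared_from_this(), requestId] { self->dispatchAll(requestId); }, nullptr);
    }
};

Capturing the arguments in the lambda removes the constructor and member-field boilerplate that each Execute* class needed just to ferry them to run().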
84 changes: 31 additions & 53 deletions cpp/src/Ice/CollocatedRequestHandler.cpp
@@ -18,40 +18,6 @@ using namespace IceInternal;

namespace
{
class ExecuteDispatchAll final : public ExecutorWorkItem
{
public:
ExecuteDispatchAll(
const OutgoingAsyncBasePtr& outAsync,
InputStream& stream,
const CollocatedRequestHandlerPtr& handler,
int32_t requestId,
int32_t dispatchCount)
: _outAsync(outAsync),
_stream(stream.instance(), currentProtocolEncoding),
_handler(handler),
_requestId(requestId),
_dispatchCount(dispatchCount)
{
_stream.swap(stream);
}

void run() final
{
if (_handler->sentAsync(_outAsync.get()))
{
_handler->dispatchAll(_stream, _requestId, _dispatchCount);
}
}

private:
OutgoingAsyncBasePtr _outAsync;
InputStream _stream;
CollocatedRequestHandlerPtr _handler;
int32_t _requestId;
int32_t _dispatchCount;
};

void fillInValue(OutputStream* os, int pos, int32_t value)
{
const byte* p = reinterpret_cast<const byte*>(&value);
@@ -184,34 +150,46 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int ba

int dispatchCount = batchRequestCount == 0 ? 1 : batchRequestCount;

//
// Make sure to hold a reference on this handler while the call is being
// dispatched. Otherwise, the handler could be deleted during the dispatch
// if a retry occurs.
//
auto self = shared_from_this();

if (!synchronous || !_response || _reference->getInvocationTimeout() > 0)
{
auto stream = make_shared<InputStream>();
is.swap(*stream);

// Don't invoke from the user thread if async or invocation timeout is set
_adapter->getThreadPool()->execute(make_shared<ExecuteDispatchAll>(
outAsync->shared_from_this(),
is,
shared_from_this(),
requestId,
dispatchCount));
_adapter->getThreadPool()->execute(
[self, outAsync = outAsync->shared_from_this(), stream, requestId, dispatchCount]()
{
if (self->sentAsync(outAsync.get()))
{
self->dispatchAll(*stream, requestId, dispatchCount);
}
},
nullptr);
}
else if (_hasExecutor)
{
_adapter->getThreadPool()->executeFromThisThread(make_shared<ExecuteDispatchAll>(
outAsync->shared_from_this(),
is,
shared_from_this(),
requestId,
dispatchCount));
auto stream = make_shared<InputStream>();
is.swap(*stream);

_adapter->getThreadPool()->executeFromThisThread(
[self, outAsync = outAsync->shared_from_this(), stream, requestId, dispatchCount]()
{
if (self->sentAsync(outAsync.get()))
{
self->dispatchAll(*stream, requestId, dispatchCount);
}
},
nullptr);
}
else // Optimization: directly call dispatchAll if there's no custom executor.
{
//
// Make sure to hold a reference on this handler while the call is being
// dispatched. Otherwise, the handler could be deleted during the dispatch
// if a retry occurs.
//

CollocatedRequestHandlerPtr self(shared_from_this());
if (sentAsync(outAsync))
{
dispatchAll(is, requestId, dispatchCount);
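The comment above about holding a reference on the handler while the call is being dispatched is what the self = shared_from_this() capture implements: the queued lambda owns a shared_ptr to the handler, so a retry that releases the last external reference cannot delete the handler while dispatchAll is still running. A small self-contained illustration of that lifetime guarantee, using hypothetical names rather than the Ice classes:

#include <functional>
#include <iostream>
#include <memory>

struct RequestHandler : std::enable_shared_from_this<RequestHandler>
{
    ~RequestHandler() { std::cout << "handler destroyed\n"; }
    void dispatchAll() { std::cout << "dispatching\n"; }

    std::function<void()> makeWorkItem()
    {
        // Capturing shared_from_this() extends the handler's lifetime to that of
        // the queued lambda, even if every other shared_ptr is released first.
        return [self = shared_from_this()] { self->dispatchAll(); };
    }
};

int main()
{
    std::function<void()> work;
    {
        auto handler = std::make_shared<RequestHandler>();
        work = handler->makeWorkItem();
    }           // last external reference released; handler stays alive via the capture
    work();     // prints "dispatching"
    return 0;   // handler is destroyed only when the work item itself is released
}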
2 changes: 1 addition & 1 deletion cpp/src/Ice/Communicator.cpp
@@ -225,7 +225,7 @@ Ice::Communicator::getServerDispatchQueue() const
void
Ice::Communicator::postToClientThreadPool(function<void()> call)
{
_instance->clientThreadPool()->execute(call);
_instance->clientThreadPool()->execute(call, nullptr);
}

::std::function<void()>
15 changes: 1 addition & 14 deletions cpp/src/Ice/ConnectionFactory.cpp
@@ -96,19 +96,6 @@ namespace
IncomingConnectionFactoryPtr _factory;
InstancePtr _instance;
};

#if TARGET_OS_IPHONE != 0
class ExecuteFinish final : public ExecutorWorkItem
{
public:
ExecuteFinish(const IncomingConnectionFactoryPtr& factory) : _factory(factory) {}

void run() final { _factory->finish(); }

private:
const IncomingConnectionFactoryPtr _factory;
};
#endif
}

bool
@@ -1824,7 +1811,7 @@ IceInternal::IncomingConnectionFactory::setState(State state)
else
{
#if TARGET_OS_IPHONE != 0
_adapter->getThreadPool()->execute(make_shared<ExecuteFinish>(shared_from_this()));
_adapter->getThreadPool()->execute([self = shared_from_this()]() { self->finish(); }, nullptr);
#endif
state = StateFinished;
}
123 changes: 48 additions & 75 deletions cpp/src/Ice/ConnectionI.cpp
@@ -72,52 +72,6 @@ namespace
const weak_ptr<Ice::ConnectionI> _connection;
};

class ExecuteUpcall final : public ExecutorWorkItem
{
public:
ExecuteUpcall(
const ConnectionIPtr& connection,
function<void(ConnectionIPtr)> connectionStartCompleted,
const vector<ConnectionI::OutgoingMessage>& sentCBs,
const function<bool(InputStream&)> messageUpcall,
InputStream& messageStream)
: ExecutorWorkItem(connection),
_connection(connection),
_connectionStartCompleted(std::move(connectionStartCompleted)),
_sentCBs(sentCBs),
_messageUpcall(std::move(messageUpcall)),
_messageStream(messageStream.instance(), currentProtocolEncoding)
{
_messageStream.swap(messageStream);
}

void run() final { _connection->upcall(_connectionStartCompleted, _sentCBs, _messageUpcall, _messageStream); }

private:
const ConnectionIPtr _connection;
const function<void(Ice::ConnectionIPtr)> _connectionStartCompleted;
const vector<ConnectionI::OutgoingMessage> _sentCBs;
const function<bool(InputStream&)> _messageUpcall;
InputStream _messageStream;
};

class ExecuteFinish final : public ExecutorWorkItem
{
public:
ExecuteFinish(const Ice::ConnectionIPtr& connection, bool close)
: ExecutorWorkItem(connection),
_connection(connection),
_close(close)
{
}

void run() final { _connection->finish(_close); }

private:
const ConnectionIPtr _connection;
const bool _close;
};

//
// Class for handling Ice::Connection::begin_flushBatchRequests
//
@@ -838,7 +792,7 @@ Ice::ConnectionI::setCloseCallback(CloseCallback callback)
if (callback)
{
auto self = shared_from_this();
_threadPool->execute([self, callback = std::move(callback)]() { self->closeCallback(callback); });
_threadPool->execute([self, callback = std::move(callback)]() { self->closeCallback(callback); }, self);
}
}
else
@@ -1197,6 +1151,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
function<void(ConnectionIPtr)> connectionStartCompleted;
vector<OutgoingMessage> sentCBs;
function<bool(InputStream&)> messageUpcall;
InputStream messageStream(_instance.get(), currentProtocolEncoding);

ThreadPoolMessage<ConnectionI> msg(current, *this);
{
@@ -1403,7 +1358,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
// At this point, the protocol message is fully read and can therefore be decoded by parseMessage.
// parseMessage returns the operation to wait for readiness next.
newOp =
static_cast<SocketOperation>(newOp | parseMessage(upcallCount, messageUpcall, current.stream));
static_cast<SocketOperation>(newOp | parseMessage(upcallCount, messageUpcall, messageStream));
}

if (readyOp & SocketOperationWrite)
@@ -1476,25 +1431,42 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)

// executeFromThisThread dispatches to the correct DispatchQueue
#ifdef ICE_SWIFT
_threadPool->executeFromThisThread(make_shared<ExecuteUpcall>(
shared_from_this(),
std::move(connectionStartCompleted),
sentCBs,
std::move(messageUpcall),
current.stream));
auto stream = make_shared<InputStream>();
stream->swap(messageStream);

auto self = shared_from_this();
_threadPool->executeFromThisThread(
[self,
connectionStartCompleted = std::move(connectionStartCompleted),
sentCBs = std::move(sentCBs),
messageUpcall = std::move(messageUpcall),
stream]()
{ self->upcall(std::move(connectionStartCompleted), std::move(sentCBs), std::move(messageUpcall), *stream); },
self);
#else
if (!_hasExecutor) // Optimization, call dispatch() directly if there's no executor.
{
upcall(connectionStartCompleted, sentCBs, std::move(messageUpcall), current.stream);
upcall(std::move(connectionStartCompleted), std::move(sentCBs), std::move(messageUpcall), messageStream);
}
else
{
_threadPool->executeFromThisThread(make_shared<ExecuteUpcall>(
shared_from_this(),
std::move(connectionStartCompleted),
sentCBs,
std::move(messageUpcall),
current.stream));
auto stream = make_shared<InputStream>();
stream->swap(messageStream);

auto self = shared_from_this();
_threadPool->executeFromThisThread(
[self,
connectionStartCompleted = std::move(connectionStartCompleted),
sentCBs = std::move(sentCBs),
messageUpcall = std::move(messageUpcall),
stream]() {
self->upcall(
std::move(connectionStartCompleted),
std::move(sentCBs),
std::move(messageUpcall),
*stream);
},
self);
}
#endif
}
@@ -1605,17 +1577,19 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current, bool close)

current.ioCompleted();

// executeFromThisThread dispatches to the correct DispatchQueue
// executeFromThisThread dispatches to the correct DispatchQueue
#ifdef ICE_SWIFT
_threadPool->executeFromThisThread(make_shared<ExecuteFinish>(shared_from_this(), close));
auto self = shared_from_this();
_threadPool->executeFromThisThread([self, close]() { self->finish(close); }, self);
#else
if (!_hasExecutor) // Optimization, call finish() directly if there's no executor.
{
finish(close);
}
else
{
_threadPool->executeFromThisThread(make_shared<ExecuteFinish>(shared_from_this(), close));
auto self = shared_from_this();
_threadPool->executeFromThisThread([self, close]() { self->finish(close); }, self);
}
#endif
}
@@ -3108,9 +3082,9 @@ Ice::ConnectionI::parseMessage(int32_t& upcallCount, function<bool(InputStream&)

stream.read(requestId);

upcall = [this, requestId, adapter, compress](InputStream& messageStream)
upcall = [self = shared_from_this(), requestId, adapter, compress](InputStream& messageStream)
{
dispatchAll(messageStream, requestCount, requestId, compress, adapter);
self->dispatchAll(messageStream, requestCount, requestId, compress, adapter);
return false; // the upcall will be completed once the dispatch is done.
};
++upcallCount;
@@ -3143,9 +3117,9 @@ Ice::ConnectionI::parseMessage(int32_t& upcallCount, function<bool(InputStream&)
throw UnmarshalOutOfBoundsException(__FILE__, __LINE__);
}

upcall = [this, requestCount, adapter, compress](InputStream& messageStream)
upcall = [self = shared_from_this(), requestCount, adapter, compress](InputStream& messageStream)
{
dispatchAll(messageStream, requestCount, requestId, compress, adapter);
self->dispatchAll(messageStream, requestCount, requestId, compress, adapter);
return false; // the upcall will be completed once the servant dispatch is done.
};
upcallCount += requestCount;
@@ -3226,22 +3200,21 @@ Ice::ConnectionI::parseMessage(int32_t& upcallCount, function<bool(InputStream&)
traceRecv(stream, _logger, _traceLevels);
if (_heartbeatCallback)
{
auto heartbeatCallback = _heartbeatCallback;
upcall = [this, heartbeatCallback](InputStream&)
upcall = [self = shared_from_this(), heartbeatCallback = _heartbeatCallback](InputStream&)
{
try
{
heartbeatCallback(shared_from_this());
heartbeatCallback(self);
}
catch (const std::exception& ex)
{
Error out(_instance->initializationData().logger);
out << "connection callback exception:\n" << ex << '\n' << _desc;
Error out(self->_instance->initializationData().logger);
out << "connection callback exception:\n" << ex << '\n' << self->_desc;
}
catch (...)
{
Error out(_instance->initializationData().logger);
out << "connection callback exception:\nunknown c++ exception" << '\n' << _desc;
Error out(self->_instance->initializationData().logger);
out << "connection callback exception:\nunknown c++ exception" << '\n' << self->_desc;
}
return true; // upcall is done
};
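One recurring detail in the hunks above is how the message data reaches the lambda: instead of the InputStream member that the old work-item classes swapped the data into, each call site now allocates a shared_ptr<InputStream>, swaps the contents into it, and captures the pointer. A plausible reason, stated here as an assumption, is that std::function requires a copyable callable, so the lambda cannot capture a non-copyable stream by value. A rough sketch of the hand-off, with a hypothetical Buffer type standing in for the real Ice::InputStream:

#include <functional>
#include <memory>
#include <vector>

// Hypothetical stand-in for a non-copyable stream type such as Ice::InputStream.
class Buffer
{
public:
    Buffer() = default;
    Buffer(const Buffer&) = delete;            // not copyable...
    Buffer& operator=(const Buffer&) = delete;
    void swap(Buffer& other) { data_.swap(other.data_); } // ...but its contents can be swapped

    std::vector<char> data_;
};

std::function<void()> makeUpcall(Buffer& incoming)
{
    // std::function needs a copyable callable, so the lambda cannot hold a
    // move-only Buffer by value. The contents are swapped into a heap-allocated
    // Buffer owned by a shared_ptr, which the lambda can capture and share.
    auto stream = std::make_shared<Buffer>();
    incoming.swap(*stream);

    return [stream] { /* parse *stream and dispatch the request */ };
}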