Skip to content

Commit

Permalink
Remove "-t" enforcement in C++, simplify ConnectTimeout / CloseTimeout (
Browse files Browse the repository at this point in the history
  • Loading branch information
bernardnormier authored Apr 24, 2024
1 parent f89166e commit 60fbf3a
Show file tree
Hide file tree
Showing 13 changed files with 82 additions and 648 deletions.
178 changes: 66 additions & 112 deletions cpp/src/Ice/ConnectionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,38 @@ using namespace IceInternal;

namespace
{
class TimeoutCallback final : public IceUtil::TimerTask
class ConnectTimerTask : public IceUtil::TimerTask
{
public:
TimeoutCallback(Ice::ConnectionI* connection) : _connection(connection) {}
ConnectTimerTask(const Ice::ConnectionIPtr& connection) : _connection(connection) {}

void runTimerTask() final { _connection->timedOut(); }
void runTimerTask() override
{
if (auto connection = _connection.lock())
{
connection->connectTimedOut();
}
}

private:
Ice::ConnectionI* _connection;
const weak_ptr<Ice::ConnectionI> _connection;
};

class CloseTimerTask : public IceUtil::TimerTask
{
public:
CloseTimerTask(const Ice::ConnectionIPtr& connection) : _connection(connection) {}

void runTimerTask() override
{
if (auto connection = _connection.lock())
{
connection->closeTimedOut();
}
}

private:
const weak_ptr<Ice::ConnectionI> _connection;
};

class ExecuteUpCall final : public ExecutorWorkItem
Expand Down Expand Up @@ -391,6 +414,11 @@ Ice::ConnectionI::startAsync(

if (!initialize() || !validate())
{
if (_connectTimeout > chrono::seconds::zero())
{
_timer->schedule(make_shared<ConnectTimerTask>(shared_from_this()), _connectTimeout);
}

if (connectionStartCompleted && connectionStartFailed)
{
_connectionStartCompleted = std::move(connectionStartCompleted);
Expand Down Expand Up @@ -1326,8 +1354,6 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)

try
{
unscheduleTimeout(current.operation);

SocketOperation writeOp = SocketOperationNone;
SocketOperation readOp = SocketOperationNone;

Expand Down Expand Up @@ -1473,7 +1499,6 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
// for data to read or write.
if (newOp)
{
scheduleTimeout(newOp);
_threadPool->update(shared_from_this(), current.operation, newOp);
return;
}
Expand Down Expand Up @@ -1539,11 +1564,10 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
}

// If the connection is not closed yet, we can schedule the read or write timeout and update the thread
// pool selector to wait for readiness of read, write or both operations.
// If the connection is not closed yet, we update the thread pool selector to wait for readiness of
// read, write or both operations.
if (_state < StateClosed)
{
scheduleTimeout(newOp);
_threadPool->update(shared_from_this(), current.operation, newOp);
}
}
Expand Down Expand Up @@ -1769,12 +1793,6 @@ ConnectionI::upcall(
void
Ice::ConnectionI::finished(ThreadPoolCurrent& current, bool close)
{
{
std::lock_guard lock(_mutex);
assert(_state == StateClosed);
unscheduleTimeout(static_cast<SocketOperation>(SocketOperationRead | SocketOperationWrite));
}

// If there are no callbacks to call, we don't call ioCompleted() since we're not going to call code that will
// potentially block (this avoids promoting a new leader and unecessary thread creation, especially if this is
// called on shutdown).
Expand Down Expand Up @@ -1979,24 +1997,6 @@ Ice::ConnectionI::getNativeInfo()
return _transceiver->getNativeInfo();
}

void
Ice::ConnectionI::timedOut()
{
std::lock_guard lock(_mutex);
if (_state <= StateNotValidated)
{
setState(StateClosed, make_exception_ptr(ConnectTimeoutException(__FILE__, __LINE__)));
}
else if (_state < StateClosing)
{
setState(StateClosed, make_exception_ptr(TimeoutException(__FILE__, __LINE__)));
}
else if (_state < StateClosed)
{
setState(StateClosed, make_exception_ptr(CloseTimeoutException(__FILE__, __LINE__)));
}
}

string
Ice::ConnectionI::type() const noexcept
{
Expand Down Expand Up @@ -2061,10 +2061,6 @@ Ice::ConnectionI::ConnectionI(
_logger(_instance->initializationData().logger), // Cached for better performance.
_traceLevels(_instance->traceLevels()), // Cached for better performance.
_timer(_instance->timer()), // Cached for better performance.
_writeTimeout(new TimeoutCallback(this)),
_writeTimeoutScheduled(false),
_readTimeout(new TimeoutCallback(this)),
_readTimeoutScheduled(false),
_connectTimeout(options.connectTimeout),
_closeTimeout(options.closeTimeout),
_inactivityTimeout(options.inactivityTimeout),
Expand Down Expand Up @@ -2472,6 +2468,11 @@ Ice::ConnectionI::initiateShutdown()
os.write(static_cast<uint8_t>(1)); // compression status: compression supported but not used.
os.write(headerSize); // Message size.

if (_closeTimeout > chrono::seconds::zero())
{
_timer->schedule(make_shared<CloseTimerTask>(shared_from_this()), _closeTimeout);
}

OutgoingMessage message(&os, false);
if (sendMessage(message) & AsyncStatusSent)
{
Expand All @@ -2483,7 +2484,6 @@ Ice::ConnectionI::initiateShutdown()
SocketOperation op = _transceiver->closing(true, _exception);
if (op)
{
scheduleTimeout(op);
_threadPool->_register(shared_from_this(), op);
}
}
Expand Down Expand Up @@ -2530,6 +2530,28 @@ Ice::ConnectionI::idleCheck(
// else, nothing to do
}

void
Ice::ConnectionI::connectTimedOut() noexcept
{
std::lock_guard lock(_mutex);
if (_state < StateActive)
{
setState(StateClosed, make_exception_ptr(ConnectTimeoutException(__FILE__, __LINE__)));
}
// else ignore since we're already connected.
}

void
Ice::ConnectionI::closeTimedOut() noexcept
{
std::lock_guard lock(_mutex);
if (_state < StateClosed)
{
setState(StateClosed, make_exception_ptr(CloseTimeoutException(__FILE__, __LINE__)));
}
// else ignore since we're already closed.
}

void
Ice::ConnectionI::sendHeartbeat() noexcept
{
Expand Down Expand Up @@ -2627,7 +2649,6 @@ Ice::ConnectionI::initialize(SocketOperation operation)
SocketOperation s = _transceiver->initialize(_readStream, _writeStream);
if (s != SocketOperationNone)
{
scheduleTimeout(s);
_threadPool->update(shared_from_this(), operation, s);
return false;
}
Expand Down Expand Up @@ -2674,7 +2695,6 @@ Ice::ConnectionI::validate(SocketOperation operation)
SocketOperation op = write(_writeStream);
if (op)
{
scheduleTimeout(op);
_threadPool->update(shared_from_this(), operation, op);
return false;
}
Expand Down Expand Up @@ -2703,7 +2723,6 @@ Ice::ConnectionI::validate(SocketOperation operation)
SocketOperation op = read(_readStream);
if (op)
{
scheduleTimeout(op);
_threadPool->update(shared_from_this(), operation, op);
return false;
}
Expand Down Expand Up @@ -2932,6 +2951,7 @@ Ice::ConnectionI::sendNextMessages(vector<OutgoingMessage>& callbacks)
AsyncStatus
Ice::ConnectionI::sendMessage(OutgoingMessage& message)
{
assert(_state >= StateActive);
assert(_state < StateClosed);

message.stream->i = 0; // Reset the message stream iterator before starting sending the message.
Expand Down Expand Up @@ -3058,7 +3078,6 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
#endif

_writeStream.swap(*_sendStreams.back().stream);
scheduleTimeout(op);
_threadPool->_register(shared_from_this(), op);
return AsyncStatusQueued;
}
Expand Down Expand Up @@ -3290,6 +3309,10 @@ Ice::ConnectionI::parseMessage(
SocketOperation op = _transceiver->closing(false, _exception);
if (op)
{
if (_closeTimeout > chrono::seconds::zero())
{
_timer->schedule(make_shared<CloseTimerTask>(shared_from_this()), _closeTimeout);
}
return op;
}
setState(StateClosed);
Expand Down Expand Up @@ -3509,75 +3532,6 @@ Ice::ConnectionI::dispatchAll(
}
}

void
Ice::ConnectionI::scheduleTimeout(SocketOperation status)
{
int timeout;
if (_state < StateActive)
{
timeout = static_cast<int>(chrono::milliseconds(_connectTimeout).count());
}
else if (_state < StateClosingPending)
{
if (_readHeader) // No timeout for reading the header.
{
status = static_cast<SocketOperation>(status & ~SocketOperationRead);
}
timeout = _endpoint->timeout();
}
else
{
timeout = static_cast<int>(chrono::milliseconds(_closeTimeout).count());
}

if (timeout < 0)
{
return;
}

try
{
if (status & IceInternal::SocketOperationRead)
{
if (_readTimeoutScheduled)
{
_timer->cancel(_readTimeout);
}
_timer->schedule(_readTimeout, chrono::milliseconds(timeout));
_readTimeoutScheduled = true;
}

if (status & (IceInternal::SocketOperationWrite | IceInternal::SocketOperationConnect))
{
if (_writeTimeoutScheduled)
{
_timer->cancel(_writeTimeout);
}
_timer->schedule(_writeTimeout, chrono::milliseconds(timeout));
_writeTimeoutScheduled = true;
}
}
catch (const IceUtil::Exception&)
{
assert(false);
}
}

void
Ice::ConnectionI::unscheduleTimeout(SocketOperation status)
{
if ((status & IceInternal::SocketOperationRead) && _readTimeoutScheduled)
{
_timer->cancel(_readTimeout);
_readTimeoutScheduled = false;
}
if ((status & (IceInternal::SocketOperationWrite | IceInternal::SocketOperationConnect)) && _writeTimeoutScheduled)
{
_timer->cancel(_writeTimeout);
_writeTimeoutScheduled = false;
}
}

Ice::ConnectionInfoPtr
Ice::ConnectionI::initConnectionInfo() const
{
Expand Down
15 changes: 6 additions & 9 deletions cpp/src/Ice/ConnectionI.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,6 @@ namespace Ice
std::string toString() const noexcept final; // From Connection and EventHandler.
IceInternal::NativeInfoPtr getNativeInfo() final;

void timedOut();

std::string type() const noexcept final; // From Connection.
std::int32_t timeout() const noexcept final; // From Connection.
ConnectionInfoPtr getInfo() const final; // From Connection
Expand Down Expand Up @@ -230,6 +228,12 @@ namespace Ice
void
idleCheck(const IceUtil::TimerTaskPtr& idleCheckTimerTask, const std::chrono::seconds& idleTimeout) noexcept;

/// Aborts the connection if its state is < StateActive.
void connectTimedOut() noexcept;

/// Aborts the connection if its state is < StateClosed.
void closeTimedOut() noexcept;

// TODO: there are too many functions with similar names. This is the function called by the HeartbeatTimerTask.
void sendHeartbeat() noexcept;

Expand Down Expand Up @@ -302,9 +306,6 @@ namespace Ice

void dispatchAll(Ice::InputStream&, std::int32_t, std::int32_t, std::uint8_t, const ObjectAdapterIPtr&);

void scheduleTimeout(IceInternal::SocketOperation status);
void unscheduleTimeout(IceInternal::SocketOperation status);

Ice::ConnectionInfoPtr initConnectionInfo() const;
Ice::Instrumentation::ConnectionState toConnectionState(State) const;

Expand Down Expand Up @@ -332,10 +333,6 @@ namespace Ice
const IceInternal::ThreadPoolPtr _threadPool;

const IceUtil::TimerPtr _timer;
const IceUtil::TimerTaskPtr _writeTimeout;
bool _writeTimeoutScheduled;
const IceUtil::TimerTaskPtr _readTimeout;
bool _readTimeoutScheduled;

const std::chrono::seconds _connectTimeout;
const std::chrono::seconds _closeTimeout;
Expand Down
9 changes: 1 addition & 8 deletions cpp/src/Ice/OutgoingAsync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -606,14 +606,7 @@ ProxyOutgoingAsyncBase::responseImpl(bool ok, bool invoke)
void
ProxyOutgoingAsyncBase::runTimerTask()
{
if (_proxy._getReference()->getInvocationTimeout() == -2)
{
cancel(make_exception_ptr(ConnectionTimeoutException(__FILE__, __LINE__)));
}
else
{
cancel(make_exception_ptr(InvocationTimeoutException(__FILE__, __LINE__)));
}
cancel(make_exception_ptr(InvocationTimeoutException(__FILE__, __LINE__)));
}

OutgoingAsync::OutgoingAsync(ObjectPrx proxy, bool synchronous)
Expand Down
Loading

0 comments on commit 60fbf3a

Please sign in to comment.