Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove "-t" enforcement in C++, simplify ConnectTimeout / CloseTimeout #2059

Merged
merged 7 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 66 additions & 112 deletions cpp/src/Ice/ConnectionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,38 @@ using namespace IceInternal;

namespace
{
class TimeoutCallback final : public IceUtil::TimerTask
class ConnectTimerTask : public IceUtil::TimerTask
{
public:
TimeoutCallback(Ice::ConnectionI* connection) : _connection(connection) {}
ConnectTimerTask(const Ice::ConnectionIPtr& connection) : _connection(connection) {}

void runTimerTask() final { _connection->timedOut(); }
void runTimerTask() override
{
if (auto connection = _connection.lock())
{
connection->connectTimedOut();
}
}

private:
Ice::ConnectionI* _connection;
const weak_ptr<Ice::ConnectionI> _connection;
};

class CloseTimerTask : public IceUtil::TimerTask
{
public:
CloseTimerTask(const Ice::ConnectionIPtr& connection) : _connection(connection) {}

void runTimerTask() override
{
if (auto connection = _connection.lock())
{
connection->closeTimedOut();
}
}

private:
const weak_ptr<Ice::ConnectionI> _connection;
};

class ExecuteUpCall final : public ExecutorWorkItem
Expand Down Expand Up @@ -391,6 +414,11 @@ Ice::ConnectionI::startAsync(

if (!initialize() || !validate())
{
if (_connectTimeout > chrono::seconds::zero())
{
_timer->schedule(make_shared<ConnectTimerTask>(shared_from_this()), _connectTimeout);
}

if (connectionStartCompleted && connectionStartFailed)
{
_connectionStartCompleted = std::move(connectionStartCompleted);
Expand Down Expand Up @@ -1326,8 +1354,6 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)

try
{
unscheduleTimeout(current.operation);

SocketOperation writeOp = SocketOperationNone;
SocketOperation readOp = SocketOperationNone;

Expand Down Expand Up @@ -1473,7 +1499,6 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
// for data to read or write.
if (newOp)
{
scheduleTimeout(newOp);
_threadPool->update(shared_from_this(), current.operation, newOp);
return;
}
Expand Down Expand Up @@ -1539,11 +1564,10 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
}

// If the connection is not closed yet, we can schedule the read or write timeout and update the thread
// pool selector to wait for readiness of read, write or both operations.
// If the connection is not closed yet, we update the thread pool selector to wait for readiness of
// read, write or both operations.
if (_state < StateClosed)
{
scheduleTimeout(newOp);
_threadPool->update(shared_from_this(), current.operation, newOp);
}
}
Expand Down Expand Up @@ -1769,12 +1793,6 @@ ConnectionI::upcall(
void
Ice::ConnectionI::finished(ThreadPoolCurrent& current, bool close)
{
{
std::lock_guard lock(_mutex);
assert(_state == StateClosed);
unscheduleTimeout(static_cast<SocketOperation>(SocketOperationRead | SocketOperationWrite));
}

// If there are no callbacks to call, we don't call ioCompleted() since we're not going to call code that will
// potentially block (this avoids promoting a new leader and unecessary thread creation, especially if this is
// called on shutdown).
Expand Down Expand Up @@ -1979,24 +1997,6 @@ Ice::ConnectionI::getNativeInfo()
return _transceiver->getNativeInfo();
}

void
Ice::ConnectionI::timedOut()
{
std::lock_guard lock(_mutex);
if (_state <= StateNotValidated)
{
setState(StateClosed, make_exception_ptr(ConnectTimeoutException(__FILE__, __LINE__)));
}
else if (_state < StateClosing)
{
setState(StateClosed, make_exception_ptr(TimeoutException(__FILE__, __LINE__)));
}
else if (_state < StateClosed)
{
setState(StateClosed, make_exception_ptr(CloseTimeoutException(__FILE__, __LINE__)));
}
}

string
Ice::ConnectionI::type() const noexcept
{
Expand Down Expand Up @@ -2061,10 +2061,6 @@ Ice::ConnectionI::ConnectionI(
_logger(_instance->initializationData().logger), // Cached for better performance.
_traceLevels(_instance->traceLevels()), // Cached for better performance.
_timer(_instance->timer()), // Cached for better performance.
_writeTimeout(new TimeoutCallback(this)),
_writeTimeoutScheduled(false),
_readTimeout(new TimeoutCallback(this)),
_readTimeoutScheduled(false),
_connectTimeout(options.connectTimeout),
_closeTimeout(options.closeTimeout),
_inactivityTimeout(options.inactivityTimeout),
Expand Down Expand Up @@ -2472,6 +2468,11 @@ Ice::ConnectionI::initiateShutdown()
os.write(static_cast<uint8_t>(1)); // compression status: compression supported but not used.
os.write(headerSize); // Message size.

if (_closeTimeout > chrono::seconds::zero())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need to document the change of behavior, for both connect and close timeouts.

Previously, the close timeout didn't apply to the whole period of the connection closure. It applied to the period between two transport operations. So as long as there was transport activity while in the closing state, the connection wouldn't timeout. Ditto for connection establishment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Keep in mind the properties are new (compared to Ice 3.7) and the semantics match IceRPC's semantics for connect and shutdown timeout.

{
_timer->schedule(make_shared<CloseTimerTask>(shared_from_this()), _closeTimeout);
}

OutgoingMessage message(&os, false);
if (sendMessage(message) & AsyncStatusSent)
{
Expand All @@ -2483,7 +2484,6 @@ Ice::ConnectionI::initiateShutdown()
SocketOperation op = _transceiver->closing(true, _exception);
if (op)
{
scheduleTimeout(op);
_threadPool->_register(shared_from_this(), op);
}
}
Expand Down Expand Up @@ -2530,6 +2530,28 @@ Ice::ConnectionI::idleCheck(
// else, nothing to do
}

void
Ice::ConnectionI::connectTimedOut() noexcept
{
std::lock_guard lock(_mutex);
if (_state < StateActive)
{
setState(StateClosed, make_exception_ptr(ConnectTimeoutException(__FILE__, __LINE__)));
}
// else ignore since we're already connected.
}

void
Ice::ConnectionI::closeTimedOut() noexcept
{
std::lock_guard lock(_mutex);
if (_state < StateClosed)
{
setState(StateClosed, make_exception_ptr(CloseTimeoutException(__FILE__, __LINE__)));
}
// else ignore since we're already closed.
}

void
Ice::ConnectionI::sendHeartbeat() noexcept
{
Expand Down Expand Up @@ -2627,7 +2649,6 @@ Ice::ConnectionI::initialize(SocketOperation operation)
SocketOperation s = _transceiver->initialize(_readStream, _writeStream);
if (s != SocketOperationNone)
{
scheduleTimeout(s);
_threadPool->update(shared_from_this(), operation, s);
return false;
}
Expand Down Expand Up @@ -2674,7 +2695,6 @@ Ice::ConnectionI::validate(SocketOperation operation)
SocketOperation op = write(_writeStream);
if (op)
{
scheduleTimeout(op);
_threadPool->update(shared_from_this(), operation, op);
return false;
}
Expand Down Expand Up @@ -2703,7 +2723,6 @@ Ice::ConnectionI::validate(SocketOperation operation)
SocketOperation op = read(_readStream);
if (op)
{
scheduleTimeout(op);
_threadPool->update(shared_from_this(), operation, op);
return false;
}
Expand Down Expand Up @@ -2932,6 +2951,7 @@ Ice::ConnectionI::sendNextMessages(vector<OutgoingMessage>& callbacks)
AsyncStatus
Ice::ConnectionI::sendMessage(OutgoingMessage& message)
{
assert(_state >= StateActive);
assert(_state < StateClosed);

message.stream->i = 0; // Reset the message stream iterator before starting sending the message.
Expand Down Expand Up @@ -3058,7 +3078,6 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
#endif

_writeStream.swap(*_sendStreams.back().stream);
scheduleTimeout(op);
_threadPool->_register(shared_from_this(), op);
return AsyncStatusQueued;
}
Expand Down Expand Up @@ -3290,6 +3309,10 @@ Ice::ConnectionI::parseMessage(
SocketOperation op = _transceiver->closing(false, _exception);
if (op)
{
if (_closeTimeout > chrono::seconds::zero())
{
_timer->schedule(make_shared<CloseTimerTask>(shared_from_this()), _closeTimeout);
}
return op;
}
setState(StateClosed);
Expand Down Expand Up @@ -3509,75 +3532,6 @@ Ice::ConnectionI::dispatchAll(
}
}

void
Ice::ConnectionI::scheduleTimeout(SocketOperation status)
{
int timeout;
if (_state < StateActive)
{
timeout = static_cast<int>(chrono::milliseconds(_connectTimeout).count());
}
else if (_state < StateClosingPending)
{
if (_readHeader) // No timeout for reading the header.
{
status = static_cast<SocketOperation>(status & ~SocketOperationRead);
}
timeout = _endpoint->timeout();
}
else
{
timeout = static_cast<int>(chrono::milliseconds(_closeTimeout).count());
}

if (timeout < 0)
{
return;
}

try
{
if (status & IceInternal::SocketOperationRead)
{
if (_readTimeoutScheduled)
{
_timer->cancel(_readTimeout);
}
_timer->schedule(_readTimeout, chrono::milliseconds(timeout));
_readTimeoutScheduled = true;
}

if (status & (IceInternal::SocketOperationWrite | IceInternal::SocketOperationConnect))
{
if (_writeTimeoutScheduled)
{
_timer->cancel(_writeTimeout);
}
_timer->schedule(_writeTimeout, chrono::milliseconds(timeout));
_writeTimeoutScheduled = true;
}
}
catch (const IceUtil::Exception&)
{
assert(false);
}
}

void
Ice::ConnectionI::unscheduleTimeout(SocketOperation status)
{
if ((status & IceInternal::SocketOperationRead) && _readTimeoutScheduled)
{
_timer->cancel(_readTimeout);
_readTimeoutScheduled = false;
}
if ((status & (IceInternal::SocketOperationWrite | IceInternal::SocketOperationConnect)) && _writeTimeoutScheduled)
{
_timer->cancel(_writeTimeout);
_writeTimeoutScheduled = false;
}
}

Ice::ConnectionInfoPtr
Ice::ConnectionI::initConnectionInfo() const
{
Expand Down
15 changes: 6 additions & 9 deletions cpp/src/Ice/ConnectionI.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,6 @@ namespace Ice
std::string toString() const noexcept final; // From Connection and EventHandler.
IceInternal::NativeInfoPtr getNativeInfo() final;

void timedOut();

std::string type() const noexcept final; // From Connection.
std::int32_t timeout() const noexcept final; // From Connection.
ConnectionInfoPtr getInfo() const final; // From Connection
Expand Down Expand Up @@ -230,6 +228,12 @@ namespace Ice
void
idleCheck(const IceUtil::TimerTaskPtr& idleCheckTimerTask, const std::chrono::seconds& idleTimeout) noexcept;

/// Aborts the connection if its state is < StateActive.
void connectTimedOut() noexcept;

/// Aborts the connection if its state is < StateClosed.
void closeTimedOut() noexcept;

// TODO: there are too many functions with similar names. This is the function called by the HeartbeatTimerTask.
void sendHeartbeat() noexcept;

Expand Down Expand Up @@ -302,9 +306,6 @@ namespace Ice

void dispatchAll(Ice::InputStream&, std::int32_t, std::int32_t, std::uint8_t, const ObjectAdapterIPtr&);

void scheduleTimeout(IceInternal::SocketOperation status);
void unscheduleTimeout(IceInternal::SocketOperation status);

Ice::ConnectionInfoPtr initConnectionInfo() const;
Ice::Instrumentation::ConnectionState toConnectionState(State) const;

Expand Down Expand Up @@ -332,10 +333,6 @@ namespace Ice
const IceInternal::ThreadPoolPtr _threadPool;

const IceUtil::TimerPtr _timer;
const IceUtil::TimerTaskPtr _writeTimeout;
bool _writeTimeoutScheduled;
const IceUtil::TimerTaskPtr _readTimeout;
bool _readTimeoutScheduled;

const std::chrono::seconds _connectTimeout;
const std::chrono::seconds _closeTimeout;
Expand Down
9 changes: 1 addition & 8 deletions cpp/src/Ice/OutgoingAsync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -606,14 +606,7 @@ ProxyOutgoingAsyncBase::responseImpl(bool ok, bool invoke)
void
ProxyOutgoingAsyncBase::runTimerTask()
{
if (_proxy._getReference()->getInvocationTimeout() == -2)
{
cancel(make_exception_ptr(ConnectionTimeoutException(__FILE__, __LINE__)));
}
else
{
cancel(make_exception_ptr(InvocationTimeoutException(__FILE__, __LINE__)));
}
cancel(make_exception_ptr(InvocationTimeoutException(__FILE__, __LINE__)));
}

OutgoingAsync::OutgoingAsync(ObjectPrx proxy, bool synchronous)
Expand Down
Loading