-
Notifications
You must be signed in to change notification settings - Fork 592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove "-t" enforcement in C++, simplify ConnectTimeout / CloseTimeout #2059
Changes from 1 commit
e63a472
b965be1
df88faa
cbd4781
8745933
5f67d4d
8e5bb36
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,15 +39,38 @@ using namespace IceInternal; | |
|
||
namespace | ||
{ | ||
class TimeoutCallback final : public IceUtil::TimerTask | ||
class ConnectTimerTask : public IceUtil::TimerTask | ||
{ | ||
public: | ||
TimeoutCallback(Ice::ConnectionI* connection) : _connection(connection) {} | ||
ConnectTimerTask(const Ice::ConnectionIPtr& connection) : _connection(connection) {} | ||
|
||
void runTimerTask() final { _connection->timedOut(); } | ||
void runTimerTask() override | ||
{ | ||
if (auto connection = _connection.lock()) | ||
{ | ||
connection->connectTimedOut(); | ||
} | ||
} | ||
|
||
private: | ||
Ice::ConnectionI* _connection; | ||
const weak_ptr<Ice::ConnectionI> _connection; | ||
}; | ||
|
||
class CloseTimerTask : public IceUtil::TimerTask | ||
{ | ||
public: | ||
CloseTimerTask(const Ice::ConnectionIPtr& connection) : _connection(connection) {} | ||
|
||
void runTimerTask() override | ||
{ | ||
if (auto connection = _connection.lock()) | ||
{ | ||
connection->closeTimedOut(); | ||
} | ||
} | ||
|
||
private: | ||
const weak_ptr<Ice::ConnectionI> _connection; | ||
}; | ||
|
||
class DispatchCall final : public ExecutorWorkItem | ||
|
@@ -1401,8 +1424,6 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) | |
|
||
try | ||
{ | ||
unscheduleTimeout(current.operation); | ||
|
||
SocketOperation writeOp = SocketOperationNone; | ||
SocketOperation readOp = SocketOperationNone; | ||
|
||
|
@@ -1548,7 +1569,6 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) | |
// for data to read or write. | ||
if (newOp) | ||
{ | ||
scheduleTimeout(newOp); | ||
_threadPool->update(shared_from_this(), current.operation, newOp); | ||
return; | ||
} | ||
|
@@ -1614,11 +1634,10 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) | |
} | ||
} | ||
|
||
// If the connection is not closed yet, we can schedule the read or write timeout and update the thread | ||
// pool selector to wait for readiness of read, write or both operations. | ||
// If the connection is not closed yet, we update the thread pool selector to wait for readiness of | ||
// read, write or both operations. | ||
if (_state < StateClosed) | ||
{ | ||
scheduleTimeout(newOp); | ||
_threadPool->update(shared_from_this(), current.operation, newOp); | ||
} | ||
} | ||
|
@@ -1854,12 +1873,6 @@ ConnectionI::dispatch( | |
void | ||
Ice::ConnectionI::finished(ThreadPoolCurrent& current, bool close) | ||
{ | ||
{ | ||
std::lock_guard lock(_mutex); | ||
assert(_state == StateClosed); | ||
unscheduleTimeout(static_cast<SocketOperation>(SocketOperationRead | SocketOperationWrite)); | ||
} | ||
|
||
// | ||
// If there are no callbacks to call, we don't call ioCompleted() since we're not going | ||
// to call code that will potentially block (this avoids promoting a new leader and | ||
|
@@ -2066,24 +2079,6 @@ Ice::ConnectionI::getNativeInfo() | |
return _transceiver->getNativeInfo(); | ||
} | ||
|
||
void | ||
Ice::ConnectionI::timedOut() | ||
{ | ||
std::lock_guard lock(_mutex); | ||
if (_state <= StateNotValidated) | ||
{ | ||
setState(StateClosed, make_exception_ptr(ConnectTimeoutException(__FILE__, __LINE__))); | ||
} | ||
else if (_state < StateClosing) | ||
{ | ||
setState(StateClosed, make_exception_ptr(TimeoutException(__FILE__, __LINE__))); | ||
} | ||
else if (_state < StateClosed) | ||
{ | ||
setState(StateClosed, make_exception_ptr(CloseTimeoutException(__FILE__, __LINE__))); | ||
} | ||
} | ||
|
||
string | ||
Ice::ConnectionI::type() const noexcept | ||
{ | ||
|
@@ -2148,10 +2143,6 @@ Ice::ConnectionI::ConnectionI( | |
_logger(_instance->initializationData().logger), // Cached for better performance. | ||
_traceLevels(_instance->traceLevels()), // Cached for better performance. | ||
_timer(_instance->timer()), // Cached for better performance. | ||
_writeTimeout(new TimeoutCallback(this)), | ||
_writeTimeoutScheduled(false), | ||
_readTimeout(new TimeoutCallback(this)), | ||
_readTimeoutScheduled(false), | ||
_connectTimeout(options.connectTimeout), | ||
_closeTimeout(options.closeTimeout), | ||
_inactivityTimeout(options.inactivityTimeout), | ||
|
@@ -2559,6 +2550,11 @@ Ice::ConnectionI::initiateShutdown() | |
os.write(static_cast<uint8_t>(1)); // compression status: compression supported but not used. | ||
os.write(headerSize); // Message size. | ||
|
||
if (_closeTimeout > chrono::seconds::zero()) | ||
{ | ||
_timer->schedule(make_shared<CloseTimerTask>(shared_from_this()), _closeTimeout); | ||
} | ||
|
||
OutgoingMessage message(&os, false); | ||
if (sendMessage(message) & AsyncStatusSent) | ||
{ | ||
|
@@ -2570,7 +2566,6 @@ Ice::ConnectionI::initiateShutdown() | |
SocketOperation op = _transceiver->closing(true, _exception); | ||
if (op) | ||
{ | ||
scheduleTimeout(op); | ||
_threadPool->_register(shared_from_this(), op); | ||
} | ||
} | ||
|
@@ -2617,6 +2612,28 @@ Ice::ConnectionI::idleCheck( | |
// else, nothing to do | ||
} | ||
|
||
void | ||
Ice::ConnectionI::connectTimedOut() noexcept | ||
{ | ||
std::lock_guard lock(_mutex); | ||
if (_state < StateActive) | ||
{ | ||
setState(StateClosed, make_exception_ptr(ConnectTimeoutException(__FILE__, __LINE__))); | ||
} | ||
// else ignore since we're no longer in the "connect" phase. | ||
} | ||
|
||
void | ||
Ice::ConnectionI::closeTimedOut() noexcept | ||
{ | ||
std::lock_guard lock(_mutex); | ||
if (_state < StateClosed) | ||
{ | ||
setState(StateClosed, make_exception_ptr(CloseTimeoutException(__FILE__, __LINE__))); | ||
} | ||
// else ignore since we're no longer in the "close" phase. | ||
bernardnormier marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
void | ||
Ice::ConnectionI::sendHeartbeat() noexcept | ||
{ | ||
|
@@ -2685,10 +2702,14 @@ Ice::ConnectionI::sendResponse(OutgoingResponse response, uint8_t compress) | |
bool | ||
Ice::ConnectionI::initialize(SocketOperation operation) | ||
{ | ||
if (_connectTimeout > chrono::seconds::zero()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't it possible for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's possible but so rare that we don't need to optimize for the case where it's called multiple times. If it's called multiple times, we schedule multiple connect timer tasks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But isn't it an error to schedule a timeout, which is already scheduled? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To be clear if we return false from
This method would be called again, and the timer schedule would throw
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's not the case because we schedule a different timer task each time. It's created by make_shared: _timer->schedule(make_shared<ConnectTimerTask>(shared_from_this()), _connectTimeout); There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this is the right place for scheduling the connect timer task. It should be moved to the implementation of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved to startAsync as suggested. |
||
{ | ||
_timer->schedule(make_shared<ConnectTimerTask>(shared_from_this()), _connectTimeout); | ||
} | ||
|
||
SocketOperation s = _transceiver->initialize(_readStream, _writeStream); | ||
if (s != SocketOperationNone) | ||
{ | ||
scheduleTimeout(s); | ||
_threadPool->update(shared_from_this(), operation, s); | ||
return false; | ||
} | ||
|
@@ -2735,7 +2756,6 @@ Ice::ConnectionI::validate(SocketOperation operation) | |
SocketOperation op = write(_writeStream); | ||
if (op) | ||
{ | ||
scheduleTimeout(op); | ||
_threadPool->update(shared_from_this(), operation, op); | ||
return false; | ||
} | ||
|
@@ -2764,7 +2784,6 @@ Ice::ConnectionI::validate(SocketOperation operation) | |
SocketOperation op = read(_readStream); | ||
if (op) | ||
{ | ||
scheduleTimeout(op); | ||
_threadPool->update(shared_from_this(), operation, op); | ||
return false; | ||
} | ||
|
@@ -2993,6 +3012,7 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks) | |
AsyncStatus | ||
Ice::ConnectionI::sendMessage(OutgoingMessage& message) | ||
{ | ||
assert(_state >= StateActive); | ||
assert(_state < StateClosed); | ||
|
||
message.stream->i = 0; // Reset the message stream iterator before starting sending the message. | ||
|
@@ -3119,7 +3139,6 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) | |
#endif | ||
|
||
_writeStream.swap(*_sendStreams.back().stream); | ||
scheduleTimeout(op); | ||
_threadPool->_register(shared_from_this(), op); | ||
return AsyncStatusQueued; | ||
} | ||
|
@@ -3351,6 +3370,10 @@ Ice::ConnectionI::parseMessage( | |
SocketOperation op = _transceiver->closing(false, _exception); | ||
if (op) | ||
{ | ||
if (_closeTimeout > chrono::seconds::zero()) | ||
{ | ||
_timer->schedule(make_shared<CloseTimerTask>(shared_from_this()), _closeTimeout); | ||
} | ||
return op; | ||
} | ||
setState(StateClosed); | ||
|
@@ -3572,75 +3595,6 @@ Ice::ConnectionI::invokeAll( | |
} | ||
} | ||
|
||
void | ||
Ice::ConnectionI::scheduleTimeout(SocketOperation status) | ||
{ | ||
int timeout; | ||
if (_state < StateActive) | ||
{ | ||
timeout = static_cast<int>(chrono::milliseconds(_connectTimeout).count()); | ||
} | ||
else if (_state < StateClosingPending) | ||
{ | ||
if (_readHeader) // No timeout for reading the header. | ||
{ | ||
status = static_cast<SocketOperation>(status & ~SocketOperationRead); | ||
} | ||
timeout = _endpoint->timeout(); | ||
} | ||
else | ||
{ | ||
timeout = static_cast<int>(chrono::milliseconds(_closeTimeout).count()); | ||
} | ||
|
||
if (timeout < 0) | ||
{ | ||
return; | ||
} | ||
|
||
try | ||
{ | ||
if (status & IceInternal::SocketOperationRead) | ||
{ | ||
if (_readTimeoutScheduled) | ||
{ | ||
_timer->cancel(_readTimeout); | ||
} | ||
_timer->schedule(_readTimeout, chrono::milliseconds(timeout)); | ||
_readTimeoutScheduled = true; | ||
} | ||
|
||
if (status & (IceInternal::SocketOperationWrite | IceInternal::SocketOperationConnect)) | ||
{ | ||
if (_writeTimeoutScheduled) | ||
{ | ||
_timer->cancel(_writeTimeout); | ||
} | ||
_timer->schedule(_writeTimeout, chrono::milliseconds(timeout)); | ||
_writeTimeoutScheduled = true; | ||
} | ||
} | ||
catch (const IceUtil::Exception&) | ||
{ | ||
assert(false); | ||
} | ||
} | ||
|
||
void | ||
Ice::ConnectionI::unscheduleTimeout(SocketOperation status) | ||
{ | ||
if ((status & IceInternal::SocketOperationRead) && _readTimeoutScheduled) | ||
{ | ||
_timer->cancel(_readTimeout); | ||
_readTimeoutScheduled = false; | ||
} | ||
if ((status & (IceInternal::SocketOperationWrite | IceInternal::SocketOperationConnect)) && _writeTimeoutScheduled) | ||
{ | ||
_timer->cancel(_writeTimeout); | ||
_writeTimeoutScheduled = false; | ||
} | ||
} | ||
|
||
Ice::ConnectionInfoPtr | ||
Ice::ConnectionI::initConnectionInfo() const | ||
{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will need to document the change of behavior, for both connect and close timeouts.
Previously, the close timeout didn't apply to the whole period of the connection closure. It applied to the period between two transport operations. So as long as there was transport activity while in the closing state, the connection wouldn't timeout. Ditto for connection establishment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Keep in mind the properties are new (compared to Ice 3.7) and the semantics match IceRPC's semantics for connect and shutdown timeout.