udp_proxy: improve retries by checking connect attempts immediately after failure (#38243)

Commit Message:
Additional Description: 
- Improve retries by checking the connect-attempt budget immediately after an
upstream connect failure, rather than only at the start of the next attempt
(see the first sketch after this list).
- Prevent adding the same session more than once to the cluster's sessions set.
This was not a real bug, since absl::flat_hash_set only stores unique elements,
so the second insertion had no effect (see the second sketch after this list).
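To make the first point concrete, here is a minimal sketch of the new control flow, using hypothetical names rather than the real Envoy classes: the attempt budget is checked inside the failure handler itself, so an exhausted session is removed right away instead of arming a retry timer that is doomed to fail on the next attempt.

```cpp
#include <cstdint>
#include <functional>

// Sketch only (hypothetical names): the budget check moves into the
// failure path, mirroring this commit's move of the check from
// createConnectionPool() into onUpstreamEvent().
struct RetryingSession {
  uint32_t connect_attempts_{0};
  uint32_t max_connect_attempts_{2};
  std::function<void()> remove_session_;   // tear the session down
  std::function<void()> arm_retry_timer_;  // schedule the next attempt

  void onConnectFailure() {
    if (connect_attempts_ >= max_connect_attempts_) {
      remove_session_();  // fail immediately; no pointless timer cycle
      return;
    }
    arm_retry_timer_();  // another attempt is still allowed
  }
};
```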
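And for the second point, a standalone demonstration (not Envoy code) of why the duplicate insert was harmless: absl::flat_hash_set::emplace() returns an (iterator, inserted) pair, and inserting an element that already exists leaves the set unchanged.

```cpp
#include <cassert>

#include "absl/container/flat_hash_set.h"

int main() {
  absl::flat_hash_set<int> sessions;
  assert(sessions.emplace(42).second);   // first insert succeeds
  assert(!sessions.emplace(42).second);  // duplicate is a no-op
  assert(sessions.size() == 1);          // still exactly one element
  return 0;
}
```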

Risk Level: low
Testing:
Docs Changes:
Release Notes:
Platform Specific Features:

---------

Signed-off-by: Issa Abu Kalbein <[email protected]>
Co-authored-by: Issa Abu Kalbein <[email protected]>
IssaAbuKalbein and Issa Abu Kalbein authored Feb 11, 2025
1 parent 9df39d8 commit d68c527
Showing 3 changed files with 98 additions and 16 deletions.
59 changes: 43 additions & 16 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc
@@ -482,9 +482,16 @@ bool UdpProxyFilter::ActiveSession::onNewSession() {
     }
   }
 
-  if (setClusterInfo() && createUpstream()) {
-    cluster_->sessions_.emplace(this);
-    return true;
+  if (setClusterInfo()) {
+    if (!shouldCreateUpstream()) {
+      return true;
+    }
+
+    if (createUpstream()) {
+      ASSERT(!cluster_->sessions_.contains(this));
+      cluster_->sessions_.emplace(this);
+      return true;
+    }
   }
 
   return false;
@@ -574,23 +581,35 @@ bool UdpProxyFilter::ActiveSession::onContinueFilterChain(ActiveReadFilter* filter) {
     }
   }
 
-  if (setClusterInfo() && createUpstream()) {
-    cluster_->sessions_.emplace(this);
-    return true;
+  if (setClusterInfo()) {
+    if (!shouldCreateUpstream()) {
+      return true;
+    }
+
+    if (createUpstream()) {
+      ASSERT(!cluster_->sessions_.contains(this));
+      cluster_->sessions_.emplace(this);
+      return true;
+    }
   }
 
   filter_.removeSession(this);
   return false;
 }
 
-bool UdpProxyFilter::UdpActiveSession::createUpstream() {
-  ASSERT(cluster_);
-
+bool UdpProxyFilter::UdpActiveSession::shouldCreateUpstream() {
   if (udp_socket_) {
     // A session filter may call on continueFilterChain(), after already creating the socket,
     // so we first check that the socket was not created already.
-    return true;
+    return false;
   }
 
+  return true;
+}
+
+bool UdpProxyFilter::UdpActiveSession::createUpstream() {
+  ASSERT(cluster_);
+
   if (!host_) {
     host_ = Upstream::LoadBalancer::onlyAllowSynchronousHostSelection(
         cluster_->chooseHost(addresses_.peer_, &udp_session_info_));
@@ -973,13 +992,17 @@ UdpProxyFilter::TunnelingActiveSession::TunnelingActiveSession(
     UdpProxyFilter& filter, Network::UdpRecvData::LocalPeerAddresses&& addresses)
     : ActiveSession(filter, std::move(addresses), nullptr) {}
 
-bool UdpProxyFilter::TunnelingActiveSession::createUpstream() {
+bool UdpProxyFilter::TunnelingActiveSession::shouldCreateUpstream() {
   if (conn_pool_factory_) {
     // A session filter may call on continueFilterChain(), after already creating the upstream,
     // so we first check that the factory was not created already.
-    return true;
+    return false;
   }
 
+  return true;
+}
+
+bool UdpProxyFilter::TunnelingActiveSession::createUpstream() {
   conn_pool_factory_ = std::make_unique<TunnelingConnectionPoolFactory>();
   load_balancer_context_ = std::make_unique<UdpLoadBalancerContext>(
       filter_.config_->hashPolicy(), addresses_.peer_, &udp_session_info_);
@@ -1012,11 +1035,7 @@ bool UdpProxyFilter::TunnelingActiveSession::createConnectionPool() {
     return false;
   }
 
-  if (connect_attempts_ >= filter_.config_->tunnelingConfig()->maxConnectAttempts()) {
-    udp_session_info_.setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamRetryLimitExceeded);
-    cluster_->cluster_info_->trafficStats()->upstream_cx_connect_attempts_exceeded_.inc();
-    return false;
-  } else if (connect_attempts_ >= 1) {
+  if (connect_attempts_ >= 1) {
     cluster_->cluster_info_->trafficStats()->upstream_rq_retry_.inc();
   }
 
@@ -1121,6 +1140,14 @@ void UdpProxyFilter::TunnelingActiveSession::onUpstreamEvent(Network::ConnectionEvent event) {
       return;
     }
 
+    if (connect_attempts_ >= filter_.config_->tunnelingConfig()->maxConnectAttempts()) {
+      udp_session_info_.setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamRetryLimitExceeded);
+      cluster_->cluster_info_->trafficStats()->upstream_cx_connect_attempts_exceeded_.inc();
+      cluster_->cluster_stats_.sess_tunnel_failure_.inc();
+      filter_.removeSession(this);
+      return;
+    }
+
     resetRetryTimer();
   }
 }
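A note on the refactor above: splitting shouldCreateUpstream() out of createUpstream() is what makes the new ASSERT safe, because the caller can now tell "upstream already exists, nothing to do" apart from "creation was actually attempted". A compressed sketch of that pattern, with hypothetical types standing in for the real Envoy interfaces:

```cpp
// Sketch (hypothetical types): the idempotency check is separated from
// creation, so insertion into the tracking set happens at most once.
#include <cassert>

#include "absl/container/flat_hash_set.h"

struct Session {
  bool upstream_created_{false};
  bool shouldCreateUpstream() const { return !upstream_created_; }
  bool createUpstream() { return upstream_created_ = true; }
};

bool onContinueFilterChain(Session& s, absl::flat_hash_set<Session*>& sessions) {
  if (!s.shouldCreateUpstream()) {
    return true;  // a session filter already created the upstream earlier
  }
  if (s.createUpstream()) {
    assert(!sessions.contains(&s));  // guaranteed first-and-only insert
    sessions.emplace(&s);
    return true;
  }
  return false;
}
```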
3 changes: 3 additions & 0 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h
@@ -590,6 +590,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter,
     void writeDownstream(Network::UdpRecvData& data);
     void resetIdleTimer();
 
+    virtual bool shouldCreateUpstream() PURE;
     virtual bool createUpstream() PURE;
     virtual void writeUpstream(Network::UdpRecvData& data) PURE;
     virtual void onIdleTimer() PURE;
@@ -683,6 +684,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter,
     ~UdpActiveSession() override = default;
 
     // ActiveSession
+    bool shouldCreateUpstream() override;
     bool createUpstream() override;
     void writeUpstream(Network::UdpRecvData& data) override;
     void onIdleTimer() override;
@@ -741,6 +743,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter,
     ~TunnelingActiveSession() override = default;
 
     // ActiveSession
+    bool shouldCreateUpstream() override;
     bool createUpstream() override;
     void writeUpstream(Network::UdpRecvData& data) override;
     void onIdleTimer() override;
52 changes: 52 additions & 0 deletions test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc
@@ -2057,6 +2057,58 @@ stat_prefix: foo
   session->onStreamFailure(ConnectionPool::PoolFailureReason::Timeout, "", upstream_host_);
 }
 
+TEST_F(UdpProxyFilterTest, TunnelingSessionFailedWithRetry) {
+  Event::MockTimer* idle_timer = new Event::MockTimer(&callbacks_.udp_listener_.dispatcher_);
+  EXPECT_CALL(*idle_timer, enableTimer(_, _)).Times(0);
+
+  setup(readConfig(R"EOF(
+stat_prefix: foo
+matcher:
+  on_no_match:
+    action:
+      name: route
+      typed_config:
+        '@type': type.googleapis.com/envoy.extensions.filters.udp.udp_proxy.v3.Route
+        cluster: fake_cluster
+tunneling_config:
+  proxy_host: host.com
+  target_host: host.com
+  default_target_port: 30
+  retry_options:
+    max_connect_attempts: 2
+)EOF"),
+        true);
+
+  // Allow for two connect attempts.
+  factory_context_.server_factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_
+      ->resetResourceManager(2, 0, 0, 0, 0);
+  auto session = filter_->createTunnelingSession();
+
+  EXPECT_CALL(
+      factory_context_.server_factory_context_.cluster_manager_.thread_local_cluster_.conn_pool_,
+      newStream(_, _, _))
+      .Times(2)
+      .WillRepeatedly(Return(nullptr));
+
+  session->onNewSession();
+
+  // First failure will enable the retry timer.
+  Event::MockTimer* retry_timer = new Event::MockTimer(&callbacks_.udp_listener_.dispatcher_);
+  EXPECT_CALL(*retry_timer, enableTimer(_, _));
+  session->onStreamFailure(ConnectionPool::PoolFailureReason::RemoteConnectionFailure, "",
+                           upstream_host_);
+
+  // Second failure will remove the session directly and will not enable the retry timer.
+  retry_timer->invokeCallback();
+  EXPECT_CALL(*retry_timer, disableTimer());
+  session->onStreamFailure(ConnectionPool::PoolFailureReason::RemoteConnectionFailure, "",
+                           upstream_host_);
+
+  EXPECT_EQ(1U, factory_context_.server_factory_context_.cluster_manager_.thread_local_cluster_
+                    .cluster_.info_->stats_store_.counter("upstream_cx_connect_attempts_exceeded")
+                    .value());
+}
+
 using MockUdpTunnelingConfig = SessionFilters::MockUdpTunnelingConfig;
 using MockUpstreamTunnelCallbacks = SessionFilters::MockUpstreamTunnelCallbacks;
 using MockTunnelCreationCallbacks = SessionFilters::MockTunnelCreationCallbacks;
