Skip to content

Commit

Permalink
drain: give 5s (configurable) time for existing connections to close (#1157)
Browse files Browse the repository at this point in the history
  • Loading branch information
howardjohn authored Jun 18, 2024
1 parent e2bcfbe commit 46b971f
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 24 deletions.
52 changes: 40 additions & 12 deletions src/proxy/inbound_passthrough.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ use std::time::Instant;

use drain::Watch;
use tokio::net::TcpStream;
use tokio::sync::watch;
use tokio::time::timeout;

use tracing::{error, info, trace, Instrument};
use tracing::{debug, error, info, trace, warn, Instrument};

use crate::config::ProxyMode;

Expand Down Expand Up @@ -64,21 +66,36 @@ impl InboundPassthrough {
}

pub(super) async fn run(self) {
let (sub_drain_signal, sub_drain) = drain::channel();
let deadline = self.pi.cfg.self_termination_deadline;
let (trigger_force_shutdown, force_shutdown) = watch::channel(());
let accept = async move {
loop {
// Asynchronously wait for an inbound socket.
let socket = self.listener.accept().await;
let start = Instant::now();
let mut force_shutdown = force_shutdown.clone();
let drain = sub_drain.clone();
let pi = self.pi.clone();
match socket {
Ok((stream, remote)) => {
let serve_client = async move {
Self::proxy_inbound_plaintext(
pi, // pi cloned above; OK to move
socket::to_canonical(remote),
stream,
self.enable_orig_src,
)
.await
debug!(dur=?start.elapsed(), "inbound passthrough connection started");
// Since this task is spawned, make sure we are guaranteed to terminate
tokio::select! {
_ = force_shutdown.changed() => {
debug!("inbound passthrough connection forcefully terminated signaled");
}
_ = Self::proxy_inbound_plaintext(
pi, // pi cloned above; OK to move
socket::to_canonical(remote),
stream,
self.enable_orig_src,
) => {}
}
// Mark that we are done with the connection, so the drain can complete
drop(drain);
debug!(dur=?start.elapsed(), "inbound passthrough connection completed");
}
.in_current_span();

Expand All @@ -95,13 +112,24 @@ impl InboundPassthrough {
}
}
.in_current_span();

// Stop accepting once we drain.
// Note: we are *not* waiting for all connections to be closed. In the future, we may consider
// this, but will need some timeout period, as we have no back-pressure mechanism on connections.
// We will then allow connections up to `deadline` to terminate on their own.
// After that, they will be forcefully terminated.
tokio::select! {
res = accept => { res }
_ = self.drain.signaled() => {
info!("inbound passthrough drained");
res = self.drain.signaled() => {
debug!("inbound passthrough drained, waiting {:?} for any outbound connections to close", deadline);
if let Err(e) = timeout(deadline, sub_drain_signal.drain()).await {
// Not all connections completed within time, we will force shut them down
warn!("drain duration expired with pending connections, forcefully shutting down: {e:?}");
}
// Trigger force shutdown. In theory, this is only needed in the timeout case. However,
// it doesn't hurt to always trigger it.
let _ = trigger_force_shutdown.send(());

info!("outbound drain complete");
drop(res);
}
}
}
Expand Down
39 changes: 27 additions & 12 deletions src/proxy/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ use drain::Watch;
use hyper::header::FORWARDED;

use tokio::net::TcpStream;
use tokio::sync::watch;
use tokio::time::timeout;

use tracing::{debug, error, info, info_span, trace_span, Instrument};
use tracing::{debug, error, info, info_span, trace_span, warn, Instrument};

use crate::config::ProxyMode;
use crate::identity::Identity;
Expand Down Expand Up @@ -86,12 +88,15 @@ impl Outbound {
self.pi.socket_factory.clone(),
self.pi.cert_manager.clone(),
);
let deadline = self.pi.cfg.self_termination_deadline;
let (trigger_force_shutdown, force_shutdown) = watch::channel(());
let accept = async move {
loop {
// Asynchronously wait for an inbound socket.
let socket = self.listener.accept().await;
let start_outbound_instant = Instant::now();
let outbound_drain = sub_drain.clone();
let drain = sub_drain.clone();
let mut force_shutdown = force_shutdown.clone();
match socket {
Ok((stream, _remote)) => {
let mut oc = OutboundConnection {
Expand All @@ -104,15 +109,17 @@ impl Outbound {
stream.set_nodelay(true).unwrap();
let span = info_span!("outbound", id=%oc.id);
let serve_outbound_connection = (async move {
debug!(dur=?start_outbound_instant.elapsed(), "outbound spawn START");
debug!(dur=?start_outbound_instant.elapsed(), "outbound connection started");
// Since this task is spawned, make sure we are guaranteed to terminate
tokio::select! {
_ = outbound_drain.signaled() => {
debug!("outbound drain signaled");
_ = force_shutdown.changed() => {
debug!("outbound connection forcefully terminated signaled");
}
_ = oc.proxy(stream) => {}
}
debug!(dur=?start_outbound_instant.elapsed(), "outbound spawn DONE");
// Mark that we are done with the connection, so the drain can complete
drop(drain);
debug!(dur=?start_outbound_instant.elapsed(), "outbound connection completed");
})
.instrument(span);

Expand All @@ -131,14 +138,22 @@ impl Outbound {
.in_current_span();

// Stop accepting once we drain.
// Note: we are *not* waiting for all connections to be closed. In the future, we may consider
// this, but will need some timeout period, as we have no back-pressure mechanism on connections.
// We will then allow connections up to `deadline` to terminate on their own.
// After that, they will be forcefully terminated.
tokio::select! {
res = accept => { res }
_ = self.drain.signaled() => {
debug!("outbound drained, dropping any outbound connections");
sub_drain_signal.drain().await;
info!("outbound drained");
res = self.drain.signaled() => {
debug!("outbound drained, waiting {:?} for any outbound connections to close", deadline);
if let Err(e) = timeout(deadline, sub_drain_signal.drain()).await {
// Not all connections completed within time, we will force shut them down
warn!("drain duration expired with pending connections, forcefully shutting down: {e:?}");
}
// Trigger force shutdown. In theory, this is only needed in the timeout case. However,
// it doesn't hurt to always trigger it.
let _ = trigger_force_shutdown.send(());

info!("outbound drain complete");
drop(res);
}
}
}
Expand Down

0 comments on commit 46b971f

Please sign in to comment.