Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

various unrelated small fixes and tweaks #1752

Merged
merged 6 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/agent/src/proxy_connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl<L: runtime::LogHandler> ProxyConnectors<L> {

let mut proxy_client =
proto_grpc::runtime::connector_proxy_client::ConnectorProxyClient::with_interceptor(
gazette::dial_channel(reactor_address).await?,
gazette::dial_channel(reactor_address)?,
metadata.clone(),
);
let mut proxy_responses = proxy_client
Expand Down Expand Up @@ -238,7 +238,7 @@ impl<L: runtime::LogHandler> ProxyConnectors<L> {
};

Ok((
gazette::dial_channel(&address).await?,
gazette::dial_channel(&address)?,
metadata,
(cancel_tx, log_loop),
))
Expand Down
1 change: 1 addition & 0 deletions crates/gazette/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ futures = { workspace = true }
futures-core = { workspace = true }
hyper-util = { workspace = true }
jsonwebtoken = { workspace = true }
rand = { workspace = true }
reqwest = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/gazette/src/journal/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl Client {
router: &crate::Router,
req: &broker::ListRequest,
) -> crate::Result<tonic::Streaming<broker::ListResponse>> {
let mut client = self.into_sub(router.route(None, false, &self.default).await?);
let mut client = self.into_sub(router.route(None, false, &self.default)?);
Ok(client.list(req.clone()).await?.into_inner())
}
}
Expand Down
8 changes: 5 additions & 3 deletions crates/gazette/src/journal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl Client {

/// Invoke the Gazette journal Apply API.
pub async fn apply(&self, req: broker::ApplyRequest) -> crate::Result<broker::ApplyResponse> {
let mut client = self.into_sub(self.router.route(None, false, &self.default).await?);
let mut client = self.into_sub(self.router.route(None, false, &self.default)?);

let resp = client
.apply(req)
Expand All @@ -66,7 +66,7 @@ impl Client {
&self,
req: broker::FragmentsRequest,
) -> crate::Result<broker::FragmentsResponse> {
let mut client = self.into_sub(self.router.route(None, false, &self.default).await?);
let mut client = self.into_sub(self.router.route(None, false, &self.default)?);

let resp = client
.list_fragments(req)
Expand All @@ -77,11 +77,13 @@ impl Client {
check_ok(resp.status(), resp)
}

fn into_sub(&self, channel: Channel) -> SubClient {
fn into_sub(&self, (channel, _local): (Channel, bool)) -> SubClient {
proto_grpc::broker::journal_client::JournalClient::with_interceptor(
channel,
self.metadata.clone(),
)
// TODO(johnny): Use `_local` to selectively enable LZ4 compression
// when traversing a non-local zone.
}
}

Expand Down
10 changes: 8 additions & 2 deletions crates/gazette/src/journal/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ impl Client {
// Surface error to the caller, which can either drop us
// or poll us again to retry.
() = co.yield_(Err(err)).await;
// Restart route discovery.
req.header = None;
}
}
}
Expand All @@ -49,7 +51,7 @@ impl Client {
write_head: &mut i64,
) -> crate::Result<()> {
let route = req.header.as_ref().and_then(|hdr| hdr.route.as_ref());
let mut client = self.into_sub(self.router.route(route, false, &self.default).await?);
let mut client = self.into_sub(self.router.route(route, false, &self.default)?);

// Fetch metadata first before we start the actual read.
req.metadata_only = true;
Expand Down Expand Up @@ -89,7 +91,9 @@ impl Client {
}
match (resp.status(), &resp.fragment, resp.content.is_empty()) {
// Metadata response telling us of a new fragment being read.
(broker::Status::Ok, Some(_fragment), true) => {
(broker::Status::Ok, Some(fragment), true) => {
tracing::trace!(fragment=?ops::DebugJson(fragment), "read fragment metadata");

// Offset jumps happen if content is removed from the middle of a journal,
// or when reading from the journal head (offset -1).
if req.offset != resp.offset {
Expand Down Expand Up @@ -138,6 +142,8 @@ async fn read_fragment_url(
.and_then(reqwest::Response::error_for_status)
.map_err(Error::FetchFragment)?;

tracing::trace!(fragment=?ops::DebugJson(&fragment), "started direct fragment read");

let raw_reader = response
// Map into a Stream<Item = Result<Bytes, _>>.
.bytes_stream()
Expand Down
27 changes: 15 additions & 12 deletions crates/gazette/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,18 @@ impl Error {

pub type Result<T> = std::result::Result<T, Error>;

/// Dial a gRPC endpoint with opinionated defaults and
/// Lazily dial a gRPC endpoint with opinionated defaults and
/// support for TLS and Unix Domain Sockets.
pub async fn dial_channel(endpoint: &str) -> Result<tonic::transport::Channel> {
pub fn dial_channel(endpoint: &str) -> Result<tonic::transport::Channel> {
use std::time::Duration;

let ep = tonic::transport::Endpoint::from_shared(endpoint.to_string())
.map_err(|_err| Error::InvalidEndpoint(endpoint.to_string()))?
// Note this connect_timeout accounts only for TCP connection time and
// does not apply to time required for TLS or HTTP/2 transport start,
// which can block indefinitely if the server is bound but not listening.
// Callers MUST implement per-RPC timeouts if that's important.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the right way to implement per-RPC timeouts for server-streaming RPCs, where waiting for long periods of time with no messages is a perfectly legitimate usage pattern? That is, what signal do you time out on?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed a small update here, which also relates to an issue we saw with a reactor connection to flow-connector-init which was ESTABLISHED even though the network namespace had been completely torn down.

Now the advice would be you don't need to worry about it, because HTTP/2 keep-alive verifies the server is up and actively serving the HTTP/2 transport. If you choose to apply a timeout on individual stream receives, you can still do so for each stream receive future.

// This timeout is only a best-effort sanity check.
.connect_timeout(Duration::from_secs(5))
.keep_alive_timeout(Duration::from_secs(120))
.keep_alive_while_idle(true)
Expand All @@ -93,17 +98,15 @@ pub async fn dial_channel(endpoint: &str) -> Result<tonic::transport::Channel> {
.assume_http2(true),
)?;

let channel = match ep.uri().scheme_str() {
Some("unix") => {
ep.connect_with_connector(tower::util::service_fn(|uri: tonic::transport::Uri| {
connect_unix(uri)
}))
.await?
}
Some("https" | "http") => ep.connect().await?,
let channel =
match ep.uri().scheme_str() {
Some("unix") => ep.connect_with_connector_lazy(tower::util::service_fn(
|uri: tonic::transport::Uri| connect_unix(uri),
)),
Some("https" | "http") => ep.connect_lazy(),

_ => return Err(Error::InvalidEndpoint(endpoint.to_string())),
};
_ => return Err(Error::InvalidEndpoint(endpoint.to_string())),
};

Ok(channel)
}
Expand Down
109 changes: 27 additions & 82 deletions crates/gazette/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use tonic::transport::Channel;

// DialState represents a Channel which may be:
// - Ready (if Some)
// - Currently being dialed (if locked)
// - Neither (None and not locked).
// Ready channels also track their number of uses since the last sweep.
type DialState = Arc<futures::lock::Mutex<Option<(Channel, usize)>>>;

/// Router facilitates dispatching requests to designated members of
/// a dynamic serving topology, by maintaining ready Channels to
/// member endpoints which may be dynamically discovered over time.
Expand All @@ -20,7 +13,7 @@ pub struct Router {
inner: Arc<Inner>,
}
struct Inner {
states: std::sync::Mutex<HashMap<MemberId, DialState>>,
states: std::sync::Mutex<HashMap<MemberId, (Channel, bool)>>,
zone: String,
}

Expand All @@ -39,60 +32,39 @@ impl Router {
}

/// Map an optional broker::Route and indication of whether the "primary"
/// member is required into a ready Channel for use in the dispatch of an RPC.
/// member is required into a ready Channel for use in the dispatch of an RPC,
/// and a boolean which is set if and only if the Channel is in our local zone.
///
/// route() will prefer to send requests to a ready member Channel if possible,
/// or will dial new Channels if required by the `route` and `primary` requirement.
pub async fn route(
/// route() dial new Channels as required by the `route` and `primary` requirement.
/// Use sweep() to periodically clean up Channels which are no longer in use.
pub fn route(
&self,
route: Option<&broker::Route>,
primary: bool,
default: &MemberId,
) -> Result<Channel, Error> {
let (index, state) = self.pick(route, primary, &default);
) -> Result<(Channel, bool), Error> {
let index = pick(route, primary, &self.inner.zone);

// Acquire MemberId-specific, async-aware lock.
let mut state = state.lock().await;
let id = match index {
Some(index) => &route.unwrap().members[index],
None => default,
};
let mut states = self.inner.states.lock().unwrap();

// Fast path: client is dialed and ready.
if let Some((ref client, uses)) = &mut *state {
*uses += 1;
return Ok(client.clone());
// Is the channel already started?
if let Some((channel, mark)) = states.get_mut(id) {
*mark = true;
return Ok((channel.clone(), id.zone == self.inner.zone));
}

// Slow path: start dialing the endpoint.
// Start dialing the endpoint.
let channel = super::dial_channel(match index {
Some(index) => &route.unwrap().endpoints[index],
None => &default.suffix,
})
.await?;
})?;
states.insert(id.clone(), (channel.clone(), true));

*state = Some((channel.clone(), 1));

Ok(channel)
}

fn pick(
&self,
route: Option<&broker::Route>,
primary: bool,
default: &MemberId,
) -> (Option<usize>, DialState) {
// Acquire non-async lock which *cannot* be held across an await point.
let mut states = self.inner.states.lock().unwrap();
let index = pick(route, primary, &self.inner.zone, &states);

let id = match index {
Some(index) => &route.unwrap().members[index],
None => default,
};

let state = match states.get(id) {
Some(value) => value.clone(),
None => states.entry(id.clone()).or_default().clone(),
};

(index, state)
Ok((channel, id.zone == self.inner.zone))
}

// Identify Channels which have not been used since the preceding sweep, and close them.
Expand All @@ -101,32 +73,19 @@ impl Router {
pub fn sweep(&self) {
let mut states = self.inner.states.lock().unwrap();

states.retain(|id, state| {
// Retain entries which are currently connecting.
let Some(mut state) = state.try_lock() else {
return true;
};
// Drop entries which are not connected.
let Some((_client, uses)) = &mut *state else {
return false;
};
states.retain(|id, (_channel, mark)| {
// Drop entries which have not been used since the last sweep.
if *uses == 0 {
if !*mark {
tracing::debug!(?id, "dropping idle member connection");
return false;
}
*uses = 0; // Mark for next sweep.
*mark = false; // Mark for next sweep.
true
});
}
}

fn pick(
route: Option<&broker::Route>,
primary: bool,
zone: &str,
states: &HashMap<MemberId, DialState>,
) -> Option<usize> {
fn pick(route: Option<&broker::Route>, primary: bool, zone: &str) -> Option<usize> {
let default_route = broker::Route::default();
let route = route.unwrap_or(&default_route);

Expand All @@ -136,28 +95,14 @@ fn pick(
.zip(route.endpoints.iter())
.enumerate()
.max_by_key(|(index, (id, _endpoint))| {
let connected = if let Some(state) = states.get(id) {
if let Some(state) = state.try_lock() {
if let Some(_conn) = state.as_ref() {
true // Transport is ready.
} else {
false // Transport is not ready and no task is starting it.
}
} else {
true // Another task has started this transport.
}
} else {
false // Transport has not been started.
};

// Member selection criteria:
(
// If we want the primary, then prefer the primary.
primary && *index as i32 == route.primary,
// Prefer members in our same zone.
zone == id.zone,
// Prefer members which are already connected.
connected,
// Randomize over members to balance load.
rand::random::<u8>(),
)
})
.map(|(index, _)| index)
Expand Down
8 changes: 4 additions & 4 deletions crates/gazette/src/shard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Client {
&self,
req: consumer::ListRequest,
) -> Result<consumer::ListResponse, crate::Error> {
let mut client = self.into_sub(self.router.route(None, false, &self.default).await?);
let mut client = self.into_sub(self.router.route(None, false, &self.default)?);

let resp = client
.list(req)
Expand All @@ -60,7 +60,7 @@ impl Client {
&self,
req: consumer::ApplyRequest,
) -> Result<consumer::ApplyResponse, crate::Error> {
let mut client = self.into_sub(self.router.route(None, false, &self.default).await?);
let mut client = self.into_sub(self.router.route(None, false, &self.default)?);

let resp = client
.apply(req)
Expand All @@ -76,7 +76,7 @@ impl Client {
&self,
req: consumer::UnassignRequest,
) -> Result<consumer::UnassignResponse, crate::Error> {
let mut client = self.into_sub(self.router.route(None, false, &self.default).await?);
let mut client = self.into_sub(self.router.route(None, false, &self.default)?);

let resp = client
.unassign(req)
Expand All @@ -87,7 +87,7 @@ impl Client {
check_ok(resp.status(), resp)
}

fn into_sub(&self, channel: Channel) -> SubClient {
fn into_sub(&self, (channel, _local): (Channel, bool)) -> SubClient {
proto_grpc::consumer::shard_client::ShardClient::with_interceptor(
channel,
self.metadata.clone(),
Expand Down
Loading