Skip to content

Commit

Permalink
Fix message proxy client not responding to flush during connect
Browse files Browse the repository at this point in the history
  • Loading branch information
jprochazk committed Jan 29, 2025
1 parent bf08d1d commit d0f1dfd
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 54 deletions.
45 changes: 23 additions & 22 deletions crates/store/re_grpc_client/src/message_proxy/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,39 +34,38 @@ pub fn stream(
}

/// Represents a URL to a gRPC server.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MessageProxyUrl(String);

impl MessageProxyUrl {
/// Parses as a regular URL, the protocol must be `temp://`, `http://`, or `https://`.
pub fn parse(url: &str) -> Result<Self, InvalidMessageProxyUrl> {
if let Some(url) = url.strip_prefix("http") {
if url.starts_with("http") {
let _ = Url::parse(url).map_err(|err| InvalidMessageProxyUrl {
url: url.to_owned(),
msg: err.to_string(),
})?;

return Ok(Self(url.to_owned()));
Ok(Self(url.to_owned()))
}
// TODO(#8761): URL prefix
else if let Some(url) = url.strip_prefix("temp") {
let url = format!("http{url}");

let Some(url) = url.strip_prefix("temp") else {
let scheme = url.split_once("://").map(|(a, _)| a).ok_or("unknown");
return Err(InvalidMessageProxyUrl {
url: url.to_owned(),
msg: format!(
"Invalid scheme {scheme:?}, expected {:?}",
// TODO(#8761): URL prefix
"temp"
),
});
};
let url = format!("http{url}");
let _ = Url::parse(&url).map_err(|err| InvalidMessageProxyUrl {
url: url.clone(),
msg: err.to_string(),
})?;

let _ = Url::parse(&url).map_err(|err| InvalidMessageProxyUrl {
url: url.clone(),
msg: err.to_string(),
})?;
Ok(Self(url))
} else {
let scheme = url.split_once("://").map(|(a, _)| a).ok_or("unknown");

Ok(Self(url))
Err(InvalidMessageProxyUrl {
url: url.to_owned(),
msg: format!("Invalid scheme {scheme:?}, expected {:?}", "temp"),
})
}
}

pub fn to_http(&self) -> String {
Expand All @@ -88,7 +87,7 @@ impl std::str::FromStr for MessageProxyUrl {
}
}

#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
#[error("invalid message proxy url {url:?}: {msg}")]
pub struct InvalidMessageProxyUrl {
pub url: String,
Expand Down Expand Up @@ -172,12 +171,14 @@ mod tests {
fn $name() {
assert_eq!(
MessageProxyUrl::parse($url).map(|v| v.to_http()),
Ok($expected_http)
Ok($expected_http.to_owned())
);
}
};
}

test_parse_url!(basic, "temp://127.0.0.1:1852", expected: "http://127.0.0.1:1852");
test_parse_url!(basic_temp, "temp://127.0.0.1:1852", expected: "http://127.0.0.1:1852");
// TODO(#8761): URL prefix
test_parse_url!(basic_http, "http://127.0.0.1:1852", expected: "http://127.0.0.1:1852");
test_parse_url!(invalid, "definitely not valid", error);
}
70 changes: 61 additions & 9 deletions crates/store/re_grpc_client/src/message_proxy/write.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;

use re_log_encoding::Compression;
use re_log_types::LogMsg;
Expand Down Expand Up @@ -96,10 +97,18 @@ impl Drop for Client {
re_log::debug!("Shutting down message proxy client");
// Wait for flush
self.flush();
// Quit immediately after that - no messages are left in the channel
self.shutdown_tx.try_send(()).ok();

// Quit immediately - no more messages left in the queue
if let Err(err) = self.shutdown_tx.try_send(()) {
re_log::error!("failed to gracefully shut down message proxy client: {err}");
return;
};

// Wait for the shutdown
self.thread.take().map(|t| t.join().ok());
if let Some(thread) = self.thread.take() {
thread.join().ok();
};

re_log::debug!("Message proxy client has shut down");
}
}
Expand All @@ -117,16 +126,59 @@ async fn message_proxy_client(
return;
}
};
let channel = match endpoint.connect().await {
Ok(channel) => channel,
Err(err) => {
re_log::error!("Failed to connect to message proxy server: {err}");
return;

// Temporarily buffer messages while we're connecting:
let mut buffered_messages = vec![];
let channel = loop {
match endpoint.connect().await {
Ok(channel) => break channel,
Err(err) => {
re_log::debug!("failed to connect to message proxy server: {err}");
tokio::select! {
cmd = cmd_rx.recv() => {
match cmd {
Some(Cmd::LogMsg(msg)) => {
buffered_messages.push(msg);
}
Some(Cmd::Flush(tx)) => {
re_log::debug!("Flush requested");
if tx.send(()).is_err() {
re_log::debug!("Failed to respond to flush: channel is closed");
return;
};
}
None => {
re_log::debug!("Channel closed");
return;
}
}
}
_ = shutdown_rx.recv() => {
re_log::debug!("shutting down client without flush");
return;
}
_ = tokio::time::sleep(Duration::from_millis(200)) => {
continue;
}
}
}
}
};
let mut client = MessageProxyClient::new(channel);

let stream = async_stream::stream! {
for msg in buffered_messages {
let msg = match re_log_encoding::protobuf_conversions::log_msg_to_proto(msg, compression) {
Ok(msg) => msg,
Err(err) => {
re_log::error!("Failed to encode message: {err}");
break;
}
};

yield msg;
}

loop {
tokio::select! {
cmd = cmd_rx.recv() => {
Expand Down Expand Up @@ -161,7 +213,7 @@ async fn message_proxy_client(
}

_ = shutdown_rx.recv() => {
re_log::debug!("Shutting down without flush");
re_log::debug!("Shutting down client without flush");
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using namespace rerun::demo;

int main() {
// Create a new `RecordingStream` which sends data over TCP to the viewer process.
// Create a new `RecordingStream` which sends data over gRPC to the viewer process.
const auto rec = rerun::RecordingStream("rerun_example_quick_start_spawn");
rec.spawn().exit_on_failure();

Expand Down
39 changes: 17 additions & 22 deletions rerun_cpp/tests/recording_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,13 @@ SCENARIO("RecordingStream can be used for logging archetypes and components", TE
THEN("an archetype can be logged") {
stream.log(
"log_archetype-splat",
rerun::Points2D({rerun::Vec2D{1.0, 2.0}, rerun::Vec2D{4.0, 5.0}}
).with_colors(rerun::Color(0xFF0000FF))
rerun::Points2D({rerun::Vec2D{1.0, 2.0}, rerun::Vec2D{4.0, 5.0}})
.with_colors(rerun::Color(0xFF0000FF))
);
stream.log_static(
"log_archetype-splat",
rerun::Points2D({rerun::Vec2D{1.0, 2.0}, rerun::Vec2D{4.0, 5.0}}
).with_colors(rerun::Color(0xFF0000FF))
rerun::Points2D({rerun::Vec2D{1.0, 2.0}, rerun::Vec2D{4.0, 5.0}})
.with_colors(rerun::Color(0xFF0000FF))
);
}

Expand Down Expand Up @@ -367,27 +367,26 @@ SCENARIO("RecordingStream can log to file", TEST_TAG) {
}
}

void test_logging_to_connection(const char* address, const rerun::RecordingStream& stream) {
void test_logging_to_connection(const char* url, const rerun::RecordingStream& stream) {
// We changed to taking std::string_view instead of const char* and constructing such from nullptr crashes
// at least on some C++ implementations.
// If we'd want to support this in earnest we'd have to create out own string_view type.
//
// AND_GIVEN("a nullptr for the socket address") {
// AND_GIVEN("a nullptr for the socket url") {
// THEN("then the connect call returns a null argument error") {
// CHECK(stream.connect(nullptr, 0.0f).code == rerun::ErrorCode::UnexpectedNullArgument);
// }
// }
AND_GIVEN("an invalid address for the socket address") {
AND_GIVEN("an invalid url") {
THEN("then the save call fails") {
CHECK(
stream.connect_grpc("definitely not valid!").code ==
rerun::ErrorCode::InvalidServerUrl
stream.connect("definitely not valid!").code == rerun::ErrorCode::InvalidServerUrl
);
}
}
AND_GIVEN("a valid socket address " << address) {
AND_GIVEN("a valid url " << url) {
THEN("save call with zero timeout returns no error") {
REQUIRE(stream.connect_grpc(address).is_ok());
CHECK(stream.connect(url).code == rerun::ErrorCode::Ok);

WHEN("logging a component and then flushing") {
check_logged_error([&] {
Expand Down Expand Up @@ -427,18 +426,18 @@ void test_logging_to_connection(const char* address, const rerun::RecordingStrea
}

SCENARIO("RecordingStream can connect", TEST_TAG) {
const char* address = "127.0.0.1:9876";
const char* url = "http://127.0.0.1:1852";
GIVEN("a new RecordingStream") {
rerun::RecordingStream stream("test-local");
test_logging_to_connection(address, stream);
test_logging_to_connection(url, stream);
}
WHEN("setting a global RecordingStream and then discarding it") {
{
rerun::RecordingStream stream("test-global");
stream.set_global();
}
GIVEN("the current recording stream") {
test_logging_to_connection(address, rerun::RecordingStream::current());
test_logging_to_connection(url, rerun::RecordingStream::current());
}
}
}
Expand Down Expand Up @@ -491,17 +490,13 @@ SCENARIO("Recording stream handles serialization failure during logging graceful

THEN("calling log with an array logs the serialization error") {
check_logged_error(
[&] {
stream.log(path, std::array{component, component});
},
[&] { stream.log(path, std::array{component, component}); },
expected_error.code
);
}
THEN("calling log with a vector logs the serialization error") {
check_logged_error(
[&] {
stream.log(path, std::vector{component, component});
},
[&] { stream.log(path, std::vector{component, component}); },
expected_error.code
);
}
Expand Down Expand Up @@ -643,8 +638,8 @@ SCENARIO("Deprecated log_static still works", TEST_TAG) {
THEN("an archetype can be logged") {
stream.log_static(
"log_archetype-splat",
rerun::Points2D({rerun::Vec2D{1.0, 2.0}, rerun::Vec2D{4.0, 5.0}}
).with_colors(rerun::Color(0xFF0000FF))
rerun::Points2D({rerun::Vec2D{1.0, 2.0}, rerun::Vec2D{4.0, 5.0}})
.with_colors(rerun::Color(0xFF0000FF))
);
}
}
Expand Down

0 comments on commit d0f1dfd

Please sign in to comment.