Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add keepalive absence detection in the client #9722

Merged
merged 12 commits into from
Feb 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci-rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
TESTBED_SERVER: http://localhost:6777
services:
parsec-testbed-server:
image: ghcr.io/scille/parsec-cloud/parsec-testbed-server:3.2.5-a.0.dev.20125.11f965e
image: ghcr.io/scille/parsec-cloud/parsec-testbed-server:3.2.5-a.0.dev.20135.bb30d4e
ports:
- 6777:6777
steps:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-web.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
# https://github.com/Scille/parsec-cloud/pkgs/container/parsec-cloud%2Fparsec-testbed-server
services:
parsec-testbed-server:
image: ghcr.io/scille/parsec-cloud/parsec-testbed-server:3.2.5-a.0.dev.20125.11f965e
image: ghcr.io/scille/parsec-cloud/parsec-testbed-server:3.2.5-a.0.dev.20135.bb30d4e
ports:
- 6777:6777
steps:
Expand Down
72 changes: 64 additions & 8 deletions libparsec/crates/client/src/monitors/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@
/// So we choose (for the moment at least!) the pragmatic approach of considering
/// that SSE errors are the only important ones, so that only the connection monitor
/// has to deal with events.
use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, sync::Arc, time::Duration};

use libparsec_client_connection::{
AuthenticatedCmds, ConnectionError, RateLimiter, SSEResponseOrMissedEvents,
AuthenticatedCmds, ConnectionError, RateLimiter, SSEEvent, SSEResponseOrMissedEvents, SSEStream,
};
use libparsec_platform_async::{
channel, pretend_future_is_send_on_web, select2_biased, stream::StreamExt,
channel, pretend_future_is_send_on_web, select2_biased, sleep, stream::StreamExt,
};
use libparsec_platform_storage::certificates::PerTopicLastTimestamps;
use libparsec_protocol::authenticated_cmds::latest::events_listen::{APIEvent, Rep, Req};
Expand All @@ -66,9 +66,10 @@ pub(crate) async fn start_connection_monitor(
fn dispatch_api_event(event: APIEvent, event_bus: &EventBus) {
match event {
APIEvent::Pinged { .. } => (),
APIEvent::ServerConfig {
APIEvent::OrganizationConfig {
active_users_limit,
user_profile_outsider_allowed,
sse_keepalive_seconds: _,
} => {
let event = EventServerConfigNotified {
active_users_limit,
Expand Down Expand Up @@ -267,8 +268,63 @@ enum ConnectionState {
Online,
}

/// Remembers the keepalive period last announced through an
/// `OrganizationConfig` event and uses it to bound the wait for the
/// next SSE event.
#[derive(Debug, Default)]
struct KeepaliveTracking {
    /// Keepalive period taken from the last `OrganizationConfig` event seen,
    /// `None` while no period is known (in which case no timeout is applied).
    sse_keepalive: Option<Duration>,
}

impl KeepaliveTracking {
    // The server may be slightly late when sending its next event, so the
    // keepalive period is stretched by 20% before being used as a timeout;
    // this avoids reporting a timeout that is only due to that lateness.
    const TIMEOUT_FACTOR: f32 = 1.2;

    /// Fetch the next item from `stream`, giving up after the stretched
    /// keepalive period when one is known.
    ///
    /// Returns whatever `stream.next()` yields, except on timeout where
    /// `Some(Err(ConnectionError::NoResponse(None)))` is returned instead.
    /// When the item is an `OrganizationConfig` event, the tracked keepalive
    /// period is replaced by the one it carries before the item is returned.
    async fn next_event(
        &mut self,
        stream: &mut SSEStream<Req>,
    ) -> Option<Result<SSEEvent<Req>, ConnectionError>> {
        let outcome = if let Some(period) = self.sse_keepalive {
            // A keepalive period is known: race the stream against a timer
            let timeout = period.mul_f32(Self::TIMEOUT_FACTOR);
            select2_biased!(
                res = stream.next() => res,
                // The timer fired first: report `ConnectionError::NoResponse`
                // so the caller reacts as it would to an unreachable server.
                _ = sleep(timeout) => {
                    log::info!("Keepalive timeout after {:?}", timeout);
                    Some(Err(ConnectionError::NoResponse(None)))
                }
            )
        } else {
            // No keepalive period known: wait for the stream with no time limit
            stream.next().await
        };

        // An `OrganizationConfig` event carries the keepalive period to expect
        // from now on (possibly none), so refresh our state from it.
        if let Some(Ok(SSEEvent {
            message:
                SSEResponseOrMissedEvents::Response(Rep::Ok(APIEvent::OrganizationConfig {
                    sse_keepalive_seconds,
                    ..
                })),
            ..
        })) = outcome
        {
            self.sse_keepalive =
                sse_keepalive_seconds.map(|secs| Duration::from_secs(secs.get()));
            if let Some(period) = self.sse_keepalive {
                log::info!(
                    "Set expected keepalive duration: {} seconds",
                    period.as_secs()
                );
            } else {
                log::info!("Unset expected keepalive duration");
            }
        }

        outcome
    }
}

async fn task_future_factory(cmds: Arc<AuthenticatedCmds>, event_bus: EventBus) {
let mut state = ConnectionState::Offline;
let mut keepalive_tracking = KeepaliveTracking::default();
let mut last_event_id = None;
// Backoff is used to wait longer and longer after each failed connection
// to the server.
Expand All @@ -288,7 +344,7 @@ async fn task_future_factory(cmds: Arc<AuthenticatedCmds>, event_bus: EventBus)
// As last monitor to start, we send this event to wake up all the other monitors
event_bus.send(&EventMissedServerEvents);

loop {
'connection_loop: loop {
// Note we listen on `retry_now_rx`
let should_reset_backoff = select2_biased!(
_ = backoff.wait() => false,
Expand All @@ -308,14 +364,14 @@ async fn task_future_factory(cmds: Arc<AuthenticatedCmds>, event_bus: EventBus)
Ok(stream) => stream,
Err(err) => match handle_sse_error(&mut state, &event_bus, err) {
HandleSseErrorOutcome::WaitForOnline
| HandleSseErrorOutcome::WaitForTosAccepted => continue,
| HandleSseErrorOutcome::WaitForTosAccepted => continue 'connection_loop,
HandleSseErrorOutcome::StopMonitor => return,
},
};

backoff.reset();

while let Some(res) = stream.next().await {
while let Some(res) = keepalive_tracking.next_event(&mut stream).await {
match res {
Ok(event) => {
if let Some(retry) = event.retry {
Expand Down Expand Up @@ -352,7 +408,7 @@ async fn task_future_factory(cmds: Arc<AuthenticatedCmds>, event_bus: EventBus)
}
Err(err) => match handle_sse_error(&mut state, &event_bus, err) {
HandleSseErrorOutcome::WaitForOnline
| HandleSseErrorOutcome::WaitForTosAccepted => continue,
| HandleSseErrorOutcome::WaitForTosAccepted => continue 'connection_loop,
HandleSseErrorOutcome::StopMonitor => return,
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::{collections::HashMap, future::Future, pin::pin, sync::Arc};

use libparsec_platform_async::{
channel, event, pretend_future_is_send_on_web, select3_biased, spawn,
channel, event, pretend_future_is_send_on_web, select2_biased, select3_biased, spawn,
};
use libparsec_types::prelude::*;

Expand Down Expand Up @@ -58,6 +58,7 @@ fn task_future_factory(

let request_stop = event::Event::new();
let stop_requested = request_stop.listen();
let syncer_stop_requested = request_stop.listen();

let stop_cb = Box::new(move || {
request_stop.notify(usize::MAX);
Expand Down Expand Up @@ -89,11 +90,16 @@ fn task_future_factory(
let tx = tx.clone();
let device = device.clone();
async move {
let mut syncer_stop_requested = pin!(syncer_stop_requested);
macro_rules! handle_workspace_sync_error {
($err:expr, $entry_id:expr) => {
match $err {
WorkspaceSyncError::Offline(_) => {
event_bus.wait_server_reconnect().await;
// Make sure we do not block the stopping of the monitor here
select2_biased!(
_ = event_bus.wait_server_reconnect() => {},
_ = &mut syncer_stop_requested => {},
)
}
// We have lost read access to the workspace, the certificates
// ops should soon be notified and work accordingly (typically
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ where
T::Response: Debug + PartialEq,
{
let message = match event.event.as_ref() {
"keepalive" => SSEResponseOrMissedEvents::Empty,
"missed_events" => SSEResponseOrMissedEvents::MissedEvents,
"message" if event.data.is_empty() => SSEResponseOrMissedEvents::Empty,
"message" => {
Expand All @@ -106,7 +107,10 @@ where
}

// Unknown event should still be returned given it can modify `retry` param
_ => SSEResponseOrMissedEvents::Empty,
_ => {
log::warn!("Unknown event type: {}", event.event);
SSEResponseOrMissedEvents::Empty
}
};

std::task::Poll::Ready(Ok(SSEEvent {
Expand Down Expand Up @@ -164,7 +168,15 @@ impl RateLimiter {
/// Sleep for the delay given by `get_duration_to_wait`, counting this call
/// as one more attempt.
///
/// The attempt counter is incremented after the delay has been computed and
/// before any sleeping occurs. When the computed delay is zero the function
/// returns right away, without logging anything.
pub async fn wait(&mut self) {
    let duration_to_wait = self.get_duration_to_wait();
    self.attempt += 1;
    // No delay: skip both the sleep and the log lines
    if duration_to_wait.is_zero() {
        return;
    }
    // NOTE(review): `as_secs` drops any sub-second part of the delay - confirm
    // that `get_duration_to_wait` only ever yields whole seconds.
    log::info!(
        "Retrying SSE connection in {} seconds",
        duration_to_wait.as_secs()
    );
    libparsec_platform_async::sleep(duration_to_wait).await;
    log::info!("Retrying SSE connection");
}

fn get_duration_to_wait(&self) -> Duration {
Expand Down
27 changes: 16 additions & 11 deletions libparsec/crates/client_connection/tests/unit/authenticated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ async fn sse_ok_mocked(env: &TestbedEnv) {
headers,
"\
:keepalive\n\n\
data:hKZzdGF0dXOib2ulZXZlbnStU0VSVkVSX0NPTkZJR7JhY3RpdmVfdXNlcnNfbGltaXTAvXVzZXJfcHJvZmlsZV9vdXRzaWRlcl9hbGxvd2Vkww==\nid:832ea0c75e0d4ca8aedf123a89b3fcc7\n\n\
data:haZzdGF0dXOib2ulZXZlbnSzT1JHQU5JWkFUSU9OX0NPTkZJR7JhY3RpdmVfdXNlcnNfbGltaXTAvXVzZXJfcHJvZmlsZV9vdXRzaWRlcl9hbGxvd2Vkw7Vzc2Vfa2VlcGFsaXZlX3NlY29uZHMe\nid:832ea0c75e0d4ca8aedf123a89b3fcc7\n\n\
event:missed_events\ndata:\n\n\
data:g6ZzdGF0dXOib2ulZXZlbnSmUElOR0VEpHBpbmemZ29vZCAx\nid:4fe5b6ddf29f4c159e6002da2132d80f\n\n\
:keepalive\n\n\
Expand All @@ -227,9 +227,10 @@ async fn sse_ok_mocked(env: &TestbedEnv) {
p_assert_eq!(
sse.next().await.unwrap().unwrap().message,
SSEResponseOrMissedEvents::Response(authenticated_cmds::events_listen::Rep::Ok(
authenticated_cmds::events_listen::APIEvent::ServerConfig {
authenticated_cmds::events_listen::APIEvent::OrganizationConfig {
active_users_limit: ActiveUsersLimit::NoLimit,
user_profile_outsider_allowed: true
user_profile_outsider_allowed: true,
sse_keepalive_seconds: Some(30.try_into().unwrap()),
}
))
);
Expand Down Expand Up @@ -292,9 +293,10 @@ async fn sse_ok_with_server(env: &TestbedEnv) {
p_assert_eq!(
sse.next().await.unwrap().unwrap().message,
SSEResponseOrMissedEvents::Response(authenticated_cmds::events_listen::Rep::Ok(
authenticated_cmds::events_listen::APIEvent::ServerConfig {
authenticated_cmds::events_listen::APIEvent::OrganizationConfig {
active_users_limit: ActiveUsersLimit::NoLimit,
user_profile_outsider_allowed: true
user_profile_outsider_allowed: true,
sse_keepalive_seconds: Some(30.try_into().unwrap()),
}
))
);
Expand Down Expand Up @@ -650,9 +652,10 @@ async fn sse_last_event_id_with_server(env: &TestbedEnv) {
p_assert_eq!(
first_event.message,
SSEResponseOrMissedEvents::Response(authenticated_cmds::events_listen::Rep::Ok(
authenticated_cmds::events_listen::APIEvent::ServerConfig {
authenticated_cmds::events_listen::APIEvent::OrganizationConfig {
active_users_limit: ActiveUsersLimit::NoLimit,
user_profile_outsider_allowed: true
user_profile_outsider_allowed: true,
sse_keepalive_seconds: Some(30.try_into().unwrap()),
}
))
);
Expand Down Expand Up @@ -703,9 +706,10 @@ async fn sse_last_event_id_with_server(env: &TestbedEnv) {
p_assert_eq!(
sse_alice.next().await.unwrap().unwrap().message,
SSEResponseOrMissedEvents::Response(authenticated_cmds::events_listen::Rep::Ok(
authenticated_cmds::events_listen::APIEvent::ServerConfig {
authenticated_cmds::events_listen::APIEvent::OrganizationConfig {
active_users_limit: ActiveUsersLimit::NoLimit,
user_profile_outsider_allowed: true
user_profile_outsider_allowed: true,
sse_keepalive_seconds: Some(30.try_into().unwrap()),
}
))
);
Expand Down Expand Up @@ -744,9 +748,10 @@ async fn sse_last_event_id_with_server(env: &TestbedEnv) {
p_assert_eq!(
sse_alice.next().await.unwrap().unwrap().message,
SSEResponseOrMissedEvents::Response(authenticated_cmds::events_listen::Rep::Ok(
authenticated_cmds::events_listen::APIEvent::ServerConfig {
authenticated_cmds::events_listen::APIEvent::OrganizationConfig {
active_users_limit: ActiveUsersLimit::NoLimit,
user_profile_outsider_allowed: true
user_profile_outsider_allowed: true,
sse_keepalive_seconds: Some(30.try_into().unwrap()),
}
))
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
},
{
// This event is always fired first upon SSE connection
"name": "ServerConfig",
"discriminant_value": "SERVER_CONFIG",
"name": "OrganizationConfig",
"discriminant_value": "ORGANIZATION_CONFIG",
"fields": [
{
"name": "user_profile_outsider_allowed",
Expand All @@ -46,6 +46,10 @@
{
"name": "active_users_limit",
"type": "ActiveUsersLimit"
},
{
"name": "sse_keepalive_seconds",
"type": "RequiredOption<NonZeroInteger>"
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,39 +55,46 @@ pub fn rep_ok() {
),
),
(
// Generated from Rust implementation (Parsec v3.0.0+dev)
// Generated from Parsec 3.2.5-a.0+dev
// Content:
// event: "SERVER_CONFIG"
// status: 'ok'
// event: 'ORGANIZATION_CONFIG'
// active_users_limit: 8
// user_profile_outsider_allowed: true
// status: "ok"
// sse_keepalive_seconds: 30
// user_profile_outsider_allowed: True
&hex!(
"84a6737461747573a26f6ba56576656e74ad5345525645525f434f4e464947b2616374"
"6976655f75736572735f6c696d697408bd757365725f70726f66696c655f6f75747369"
"6465725f616c6c6f776564c3"
"85a6737461747573a26f6ba56576656e74b34f5247414e495a4154494f4e5f434f4e46"
"4947b26163746976655f75736572735f6c696d697408b57373655f6b656570616c6976"
"655f7365636f6e64731ebd757365725f70726f66696c655f6f757473696465725f616c"
"6c6f776564c3"
)[..],
authenticated_cmds::events_listen::Rep::Ok(
authenticated_cmds::events_listen::APIEvent::ServerConfig {
authenticated_cmds::events_listen::APIEvent::OrganizationConfig {
active_users_limit: ActiveUsersLimit::LimitedTo(8),
user_profile_outsider_allowed: true,
sse_keepalive_seconds: Some(30.try_into().unwrap()),
},
),
),
(
// Generated from Rust implementation (Parsec v3.0.0+dev)
// Generated from Parsec 3.2.5-a.0+dev
// Content:
// event: "SERVER_CONFIG"
// user_profile_outsider_allowed: false
// status: "ok"
// status: 'ok'
// event: 'ORGANIZATION_CONFIG'
// active_users_limit: None
// sse_keepalive_seconds: 30
// user_profile_outsider_allowed: False
&hex!(
"84a6737461747573a26f6ba56576656e74ad5345525645525f434f4e464947b2616374"
"6976655f75736572735f6c696d6974c0bd757365725f70726f66696c655f6f75747369"
"6465725f616c6c6f776564c2"
"85a6737461747573a26f6ba56576656e74b34f5247414e495a4154494f4e5f434f4e46"
"4947b26163746976655f75736572735f6c696d6974c0b57373655f6b656570616c6976"
"655f7365636f6e64731ebd757365725f70726f66696c655f6f757473696465725f616c"
"6c6f776564c2"
)[..],
authenticated_cmds::events_listen::Rep::Ok(
authenticated_cmds::events_listen::APIEvent::ServerConfig {
authenticated_cmds::events_listen::APIEvent::OrganizationConfig {
active_users_limit: ActiveUsersLimit::NoLimit,
user_profile_outsider_allowed: false,
sse_keepalive_seconds: Some(30.try_into().unwrap()),
},
),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1076,14 +1076,18 @@ fn internal_quote_field_as_fn_new_conversion(field_name: &Ident, ty: &FieldType)
FieldType::NonRequiredOption(nested) => {
let nested_name = format_ident!("x");
let nested_conversion = internal_quote_field_as_fn_new_conversion(&nested_name, nested);
quote! { #field_name.map(|x| #nested_conversion) }
quote! { match #field_name {
None => None,
Some(x) => Some(#nested_conversion),
}}
}
FieldType::RequiredOption(nested) => {
let nested_name = format_ident!("x");
let nested_conversion = internal_quote_field_as_fn_new_conversion(&nested_name, nested);
quote! {
#field_name.map(|x| #nested_conversion)
}
quote! { match #field_name {
None => None,
Some(x) => Some(#nested_conversion),
}}
}
FieldType::Tuple(items) => {
let (xs, xs_conversions): (Vec<_>, Vec<_>) = items
Expand Down
Loading
Loading