Skip to content

Commit

Permalink
fix(network): handle skipped envelopes right way (#101)
Browse files Browse the repository at this point in the history
  • Loading branch information
laplab authored Jul 26, 2023
1 parent 845826b commit 419d0c5
Show file tree
Hide file tree
Showing 11 changed files with 258 additions and 48 deletions.
6 changes: 6 additions & 0 deletions elfo-core/src/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,12 @@ cfg_network!({
nodes: Option<Nodes>,
}

impl<'a> RegisterRemoteGroupGuard<'a> {
pub fn handle_addr(&self) -> Addr {
self.handle_addr
}
}

impl Drop for RegisterRemoteGroupGuard<'_> {
fn drop(&mut self) {
// Disable direct messaging.
Expand Down
139 changes: 107 additions & 32 deletions elfo-network/src/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,25 @@ pub(crate) struct DecodeStats {
pub(crate) total_messages_decoding_skipped: u64,
}

#[derive(Debug)]
pub(crate) struct EnvelopeDetails {
pub(crate) kind: u8,
pub(crate) sender: Addr,
pub(crate) recipient: Addr,
pub(crate) request_id: Option<RequestId>,
pub(crate) trace_id: TraceId,
}

pub(crate) enum DecodeState {
/// Buffer needs to contain at least `total_length_estimate` bytes in total
/// in order for the decoder to make progress.
NeedMoreData { total_length_estimate: usize },
/// There was a non-fatal error while decoding a message residing in
/// `bytes_consumed` bytes, so it was skipped.
Skipped { bytes_consumed: usize },
Skipped {
bytes_consumed: usize,
details: Option<EnvelopeDetails>,
},
/// Decoder decoded a value, which occupied `bytes_consumed` bytes in the
/// buffer.
Done {
Expand Down Expand Up @@ -67,35 +79,73 @@ pub(crate) fn decode(input: &[u8], stats: &mut DecodeStats) -> eyre::Result<Deco

stats.total_messages_decoding_skipped += 1;

let error = decode_result.unwrap_err();
// TODO: cooldown/metrics.
error!(
message = "cannot decode message, skipping",
error = format!("{:#}", error.error),
protocol = %error.protocol.as_deref().unwrap_or("<unknown>"),
name = %error.name.as_deref().unwrap_or("<unknown>"),
);
let DecodeError { message, details } = decode_result.unwrap_err();
if let Some(details) = &details {
error!(
message = "cannot decode message, skipping",
error = format!("{:#}", message.error),
protocol = message.protocol.as_deref().unwrap_or("<unknown>"),
name = message.name.as_deref().unwrap_or("<unknown>"),
kind = ?details.kind,
sender = %details.sender,
recipient = %details.recipient,
request_id = ?details.request_id,
trace_id = %details.trace_id,
);
} else {
error!(
message = "cannot decode message, skipping",
error = format!("{:#}", message.error),
protocol = message.protocol.as_deref().unwrap_or("<unknown>"),
name = message.name.as_deref().unwrap_or("<unknown>"),
);
}

Ok(DecodeState::Skipped {
bytes_consumed: size,
details,
})
}

#[derive(Debug)]
struct DecodeError {
message: MessageDecodeError,
details: Option<EnvelopeDetails>,
}

impl<T> From<T> for DecodeError
where
T: Into<eyre::Report>,
{
fn from(value: T) -> Self {
DecodeError {
message: MessageDecodeError {
protocol: None,
name: None,
error: value.into(),
},
details: None,
}
}
}

#[derive(Debug)]
struct MessageDecodeError {
protocol: Option<String>,
name: Option<String>,
error: eyre::Report,
}

impl<T> From<T> for DecodeError
impl<T> From<T> for MessageDecodeError
where
T: Into<eyre::Report>,
{
fn from(error: T) -> Self {
Self {
fn from(value: T) -> Self {
MessageDecodeError {
protocol: None,
name: None,
error: error.into(),
error: value.into(),
}
}
}
Expand All @@ -104,11 +154,11 @@ fn get_request_id(frame: &mut Cursor<&[u8]>) -> eyre::Result<RequestId> {
Ok(RequestId::from_ffi(frame.read_u64::<LittleEndian>()?))
}

fn get_message(frame: &mut Cursor<&[u8]>) -> Result<AnyMessage, DecodeError> {
fn get_message(frame: &mut Cursor<&[u8]>) -> Result<AnyMessage, MessageDecodeError> {
let protocol = get_str(frame).wrap_err("invalid message protocol")?;
let name = get_str(frame)
.wrap_err("invalid message name")
.map_err(|error| DecodeError {
.map_err(|error| MessageDecodeError {
protocol: Some(protocol.to_string()),
name: None,
error,
Expand All @@ -118,15 +168,16 @@ fn get_message(frame: &mut Cursor<&[u8]>) -> Result<AnyMessage, DecodeError> {
let position = frame.position() as usize;
let remaining_slice = &frame.get_ref()[position..];

let result =
AnyMessage::read_msgpack(remaining_slice, protocol, name).map_err(|error| DecodeError {
let result = AnyMessage::read_msgpack(remaining_slice, protocol, name).map_err(|error| {
MessageDecodeError {
protocol: Some(protocol.to_string()),
name: Some(name.to_string()),
error: error.into(),
})?;
}
})?;
frame.set_position(frame.get_ref().len() as u64);

result.ok_or_else(|| DecodeError {
result.ok_or_else(|| MessageDecodeError {
protocol: Some(protocol.to_string()),
name: Some(name.to_string()),
error: eyre!("unknown message"),
Expand Down Expand Up @@ -155,24 +206,48 @@ fn do_decode(frame: &mut Cursor<&[u8]>) -> Result<NetworkEnvelope, DecodeError>
let recipient = Addr::from_bits(frame.read_u64::<LittleEndian>()?).into_local();
let trace_id = TraceId::try_from(frame.read_u64::<LittleEndian>()?)?;

let map_decode_error = |result: Result<AnyMessage, MessageDecodeError>,
request_id: Option<RequestId>|
-> Result<AnyMessage, DecodeError> {
result.map_err(|message| DecodeError {
message,
details: Some(EnvelopeDetails {
kind,
sender,
recipient,
request_id,
trace_id,
}),
})
};

use NetworkEnvelopePayload::*;
let payload = match kind {
KIND_REGULAR => Regular {
message: get_message(frame)?,
},
KIND_REQUEST_ANY => RequestAny {
request_id: get_request_id(frame)?,
message: get_message(frame)?,
},
KIND_REQUEST_ALL => RequestAll {
request_id: get_request_id(frame)?,
message: get_message(frame)?,
},
KIND_RESPONSE_OK => Response {
request_id: get_request_id(frame)?,
message: Ok(get_message(frame)?),
is_last: flags & FLAG_IS_LAST_RESPONSE != 0,
message: map_decode_error(get_message(frame), None)?,
},
KIND_REQUEST_ANY => {
let request_id = get_request_id(frame)?;
RequestAny {
request_id,
message: map_decode_error(get_message(frame), Some(request_id))?,
}
}
KIND_REQUEST_ALL => {
let request_id = get_request_id(frame)?;
RequestAll {
request_id,
message: map_decode_error(get_message(frame), Some(request_id))?,
}
}
KIND_RESPONSE_OK => {
let request_id = get_request_id(frame)?;
Response {
request_id: get_request_id(frame)?,
message: Ok(map_decode_error(get_message(frame), Some(request_id))?),
is_last: flags & FLAG_IS_LAST_RESPONSE != 0,
}
}
KIND_RESPONSE_FAILED => Response {
request_id: get_request_id(frame)?,
message: Err(RequestError::Failed),
Expand Down
2 changes: 1 addition & 1 deletion elfo-network/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ mod tests {
}

let state = decode(&bytes[message_size..], &mut Default::default()).unwrap();
if let DecodeState::Skipped { bytes_consumed } = state {
if let DecodeState::Skipped { bytes_consumed, .. } = state {
assert_eq!(bytes_consumed, message_size);
} else {
panic!("expected the second message to be skipped");
Expand Down
72 changes: 67 additions & 5 deletions elfo-network/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ use self::{
requests::OutgoingRequests,
};
use crate::{
codec::format::{NetworkEnvelope, NetworkEnvelopePayload},
codec::{
decode::EnvelopeDetails,
format::{NetworkEnvelope, NetworkEnvelopePayload, KIND_REQUEST_ALL, KIND_REQUEST_ANY},
},
frame::write::FrameState,
protocol::{internode, HandleConnection},
rtt::Rtt,
socket::{ReadHalf, WriteHalf},
socket::{ReadError, ReadHalf, WriteHalf},
NetworkContext,
};

Expand Down Expand Up @@ -94,7 +97,7 @@ impl Connection {
tx: local_tx.clone(),
tx_flows: tx_flows.clone(),
};
let _guard = self.topology.register_remote(
let remote_group_guard = self.topology.register_remote(
self.local.0,
(self.remote.0, self.remote.1),
&self.remote.2,
Expand All @@ -118,6 +121,7 @@ impl Connection {
.map(|g| g.addr)
.find(|a| a.group_no() == self.local.0)
.expect("invalid local group"),
handle_addr: remote_group_guard.handle_addr(),
time_origin,
// TODO: the number of samples should be calculated based on telemetry scrape
// interval, but it's not povideded for now by the elfo core.
Expand Down Expand Up @@ -324,6 +328,7 @@ fn make_network_envelope(item: KanalItem) -> (NetworkEnvelope, Option<ResponseTo
struct SocketReader {
ctx: Context,
group_addr: Addr,
handle_addr: Addr,
time_origin: Instant,
rtt: Rtt,
rx: ReadHalf,
Expand All @@ -335,8 +340,21 @@ struct SocketReader {

impl SocketReader {
async fn exec(mut self) -> ConnectionClosed {
// TODO: error handling.
while let Some(network_envelope) = self.rx.recv().await.unwrap() {
loop {
let network_envelope = match self.rx.recv().await {
Ok(Some(envelope)) => envelope,
Ok(None) => break,
Err(ReadError::EnvelopeSkipped(details)) => {
scope::set_trace_id(details.trace_id);
self.handle_skipped_message(details);
continue;
}
Err(ReadError::Fatal(e)) => {
// TODO: error handling.
panic!("fatal error while reading from socket: {:#}", e);
}
};

scope::set_trace_id(network_envelope.trace_id);

let (sender, recipient) = (network_envelope.sender, network_envelope.recipient);
Expand All @@ -361,6 +379,48 @@ impl SocketReader {
ConnectionClosed
}

/// Ensures that messages that were skipped due to errors during decoding
/// are properly accounted for in flow control. Also notifies the remote
/// actor if the message was a request in order to avoid indefinite
/// waiting from the remote actor's side.
fn handle_skipped_message(&self, details: EnvelopeDetails) {
let update_flow = {
let mut rx_flows = self.rx_flows.lock();
if details.recipient == Addr::NULL {
rx_flows.acquire_routed(true);
rx_flows.release_routed()
} else {
let mut rx_flow = rx_flows.get_or_create_flow(details.recipient);
rx_flow.acquire_direct(true);
rx_flow.release_direct()
}
};

if let Some(envelope) = update_flow.map(make_system_envelope) {
let item = KanalItem::simple(Addr::NULL, envelope);
self.tx.try_send(item).unwrap();
}

if details.kind == KIND_REQUEST_ALL || details.kind == KIND_REQUEST_ANY {
let sender = self
.ctx
.book()
.get(self.handle_addr)
.expect("bug: remote group is missing in the address book");
let token = ResponseToken::new(
details.sender,
details.request_id.expect("bug: request_id is missing"),
details.trace_id,
self.ctx.book().clone(),
);
// This can be the first time we have received a message from this sender,
// so we need to introduce the flow which will be used in `sender.respond()`
// below.
self.tx_flows.add_flow_if_needed(details.sender);
sender.respond(token, Err(RequestError::Failed));
}
}

fn make_envelope(&self, network_envelope: NetworkEnvelope) -> Option<Envelope> {
let (message, message_kind) = match network_envelope.payload {
NetworkEnvelopePayload::Regular { message } => (
Expand Down Expand Up @@ -434,6 +494,8 @@ impl SocketReader {
)
});

// Since this is a response to a request which originated from this node,
// all the neccessary flows have been already added.
object.respond(token, envelope);
return None;
}
Expand Down
28 changes: 28 additions & 0 deletions elfo-network/src/connection/flow_control.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,31 @@
//! Implements data flow control for connections to remote actor groups.
//!
//! The basic idea is that we want to adjust the amount of traffic local actor
//! sends to a remote actor according to it's processing rate. This is important
//! for two reasons:
//! 1. Avoid overwhelming slow remote actor with lots of messages from a fast
//! local actor
//! 2. Avoid network congestion
//!
//! This is implemented through maintaining two "windows": one for the sender
//! and another one for the receiver. Window is an estimate of how many messages
//! the receiver can handle during some period of time, implemented as a
//! semaphore. Before sending a message, sender acquires budget from the window.
//! When the receiver acknowledges that a message was processed, budget is
//! returned to the window.
//!
//! Finally, there are two ways the sender can send a message: bounded and
//! unbounded. Bounded send waits until there is enough budget in the sender's
//! window before sending the message. Unbounded send always gets the budget
//! immediately, even if there is zero available. This can lead to window size
//! being a negative number.
//!
//! See `TxFlowControl` and `RxFlowControl` for implementations of sender's and
//! receiver's windows respectively.
//!
//! If none of this makes sense, see https://en.wikipedia.org/wiki/Flow_control_(data) and
//! https://en.wikipedia.org/wiki/Sliding_window_protocol for a more elaborate explanation.

use std::sync::atomic::{AtomicI32, Ordering};

// We don't want to send window updates for tiny changes, but instead
Expand Down
Loading

0 comments on commit 419d0c5

Please sign in to comment.