Skip to content

Commit

Permalink
feat: log more information on error
Browse files Browse the repository at this point in the history
  • Loading branch information
PureWhiteWu committed Sep 14, 2023
1 parent f05a888 commit ee94d93
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 231 deletions.
419 changes: 208 additions & 211 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion volo-thrift/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "volo-thrift"
version = "0.7.2"
version = "0.7.3"
edition.workspace = true
homepage.workspace = true
repository.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion volo-thrift/src/codec/default/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl<E: ZeroCopyEncoder, W: AsyncWrite + Unpin + Send + Sync + 'static> Encoder
Ok(()) => Ok(()),
Err(mut e) => {
let msg = format!(
", rpcinfo: {:?}, encode real size: {}, malloc size: {}",
", cx: {:?}, encode real size: {}, malloc size: {}",
cx.rpc_info(),
real_size,
malloc_size
Expand Down
2 changes: 2 additions & 0 deletions volo-thrift/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ pub struct ServerCxInner {
pub common_stats: CommonStats,
}

#[derive(Debug)]
pub struct ClientContext(pub(crate) RpcCx<ClientCxInner, Config>);

newtype_impl_context!(ClientContext, Config, 0);
Expand Down Expand Up @@ -243,6 +244,7 @@ thread_local! {
pub(crate) static SERVER_CONTEXT_CACHE: std::cell::RefCell<Vec<ServerContext>> = std::cell::RefCell::new(Vec::with_capacity(128));
}

#[derive(Debug)]
pub struct ServerContext(pub(crate) RpcCx<ServerCxInner, Config>);

impl Default for ServerContext {
Expand Down
4 changes: 2 additions & 2 deletions volo-thrift/src/transport/multiplex/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ where
return Err(Error::Transport(pilota::thrift::TransportError::new(
TransportErrorKind::EndOfFile,
format!(
"an unexpected end of file from server, rpcinfo: {:?}",
cx.rpc_info
"an unexpected end of file from server, cx: {:?}",
cx
),
)));
}
Expand Down
8 changes: 4 additions & 4 deletions volo-thrift/src/transport/multiplex/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub async fn serve<Svc, Req, Resp, E, D>(
Some((mi, mut cx, msg)) => {
if let Err(e) = metainfo::METAINFO.scope(RefCell::new(mi), encoder.encode::<Resp, ServerContext>(&mut cx, msg)).await {
// log it
error!("[VOLO] server send response error: {:?}, rpcinfo: {:?}, peer_addr: {:?}", e, cx.rpc_info, peer_addr);
error!("[VOLO] server send response error: {:?}, cx: {:?}, peer_addr: {:?}", e, cx, peer_addr);
stat_tracer.iter().for_each(|f| f(&cx));
return;
}
Expand All @@ -73,7 +73,7 @@ pub async fn serve<Svc, Req, Resp, E, D>(
Some((mut cx, msg)) => {
if let Err(e) = encoder.encode::<DummyMessage, ServerContext>(&mut cx, msg).await {
// log it
error!("[VOLO] server send error error: {:?}, rpcinfo: {:?}, peer_addr: {:?}", e, cx.rpc_info, peer_addr);
error!("[VOLO] server send error error: {:?}, cx: {:?}, peer_addr: {:?}", e, cx, peer_addr);
}
stat_tracer.iter().for_each(|f| f(&cx));
return;
Expand Down Expand Up @@ -111,9 +111,9 @@ pub async fn serve<Svc, Req, Resp, E, D>(
// receives a message
msg = decoder.decode(&mut cx) => {
tracing::debug!(
"[VOLO] received message: {:?}, rpcinfo: {:?}, peer_addr: {:?}",
"[VOLO] received message: {:?}, cx: {:?}, peer_addr: {:?}",
msg.as_ref().map(|msg| msg.as_ref().map(|msg| &msg.meta)),
cx.rpc_info,
cx,
peer_addr
);
match msg {
Expand Down
10 changes: 5 additions & 5 deletions volo-thrift/src/transport/pingpong/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ pub async fn serve<Svc, Req, Resp, E, D, SP>(
out = decoder.decode(&mut cx) => out
};
debug!(
"[VOLO] received message: {:?}, rpcinfo: {:?}, peer_addr: {:?}",
"[VOLO] received message: {:?}, cx: {:?}, peer_addr: {:?}",
msg.as_ref().map(|msg| msg.as_ref().map(|msg| &msg.meta)),
cx.rpc_info,
cx,
peer_addr
);

Expand Down Expand Up @@ -100,7 +100,7 @@ pub async fn serve<Svc, Req, Resp, E, D, SP>(
result
}.instrument(span_provider.on_encode(tracing_cx)).await {
// log it
error!("[VOLO] server send response error: {:?}, rpcinfo: {:?}, peer_addr: {:?}", e, cx.rpc_info, peer_addr);
error!("[VOLO] server send response error: {:?}, cx: {:?}, peer_addr: {:?}", e, cx, peer_addr);
stat_tracer.iter().for_each(|f| f(&cx));
return Err(());
}
Expand All @@ -114,13 +114,13 @@ pub async fn serve<Svc, Req, Resp, E, D, SP>(
return Err(());
}
Err(e) => {
error!("[VOLO] pingpong server decode error: {:?}, peer_addr: {:?}", e, peer_addr);
error!("[VOLO] pingpong server decode error: {:?}, cx: {:?}, peer_addr: {:?}", e, cx, peer_addr);
cx.msg_type = Some(TMessageType::Exception);
if !matches!(e, Error::Transport(_)) {
let msg = ThriftMessage::mk_server_resp(&cx, Err::<DummyMessage, _>(e))
.unwrap();
if let Err(e) = encoder.encode(&mut cx, msg).await {
error!("[VOLO] server send error error: {:?}, rpcinfo: {:?}, peer_addr: {:?}", e, cx.rpc_info, peer_addr);
error!("[VOLO] server send error error: {:?}, cx: {:?}, peer_addr: {:?}", e, cx, peer_addr);
}
}
stat_tracer.iter().for_each(|f| f(&cx));
Expand Down
9 changes: 4 additions & 5 deletions volo-thrift/src/transport/pingpong/thrift_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::sync::atomic::AtomicUsize;

use pin_project::pin_project;
use tokio::io::{AsyncRead, AsyncWrite};
use volo::context::Context;

use crate::{
codec::{Decoder, Encoder, MakeCodec},
Expand Down Expand Up @@ -92,23 +91,23 @@ where
) -> Result<Option<ThriftMessage<T>>, Error> {
let thrift_msg = self.decoder.decode(cx).await.map_err(|e| {
let mut e = e;
e.append_msg(&format!(", rpcinfo: {:?}", cx.rpc_info()));
e.append_msg(&format!(", cx: {:?}", cx));
tracing::error!("[VOLO] transport[{}] decode error: {}", self.id, e);
e
})?;

if let Some(ThriftMessage { meta, .. }) = &thrift_msg {
if meta.seq_id != cx.seq_id {
tracing::error!(
"[VOLO] transport[{}] seq_id not match: {} != {}, rpcinfo: {:?}",
"[VOLO] transport[{}] seq_id not match: {} != {}, cx: {:?}",
self.id,
meta.seq_id,
cx.seq_id,
cx.rpc_info(),
cx,
);
return Err(Error::Application(ApplicationError::new(
ApplicationErrorKind::BAD_SEQUENCE_ID,
format!("seq_id not match, rpcinfo: {:?}", cx.rpc_info()),
format!("seq_id not match, cx: {:?}", cx),
)));
}
};
Expand Down
2 changes: 1 addition & 1 deletion volo/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "volo"
version = "0.5.4"
version = "0.5.5"
edition.workspace = true
homepage.workspace = true
repository.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion volo/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ macro_rules! newtype_impl_context {

const DEFAULT_MAP_CAPACITY: usize = 10;

#[derive(Debug)]
pub struct RpcCx<I, Config> {
pub rpc_info: RpcInfo<Config>,
pub inner: I,
pub extensions: Extensions,
}

#[derive(Default)]
#[derive(Default, Debug)]
pub struct Extensions(TypeMap);

impl std::ops::Deref for Extensions {
Expand Down

0 comments on commit ee94d93

Please sign in to comment.