diff --git a/Cargo.toml b/Cargo.toml
index 0aab2b03..d18622a8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -32,7 +32,8 @@ either = "1.6"
 fail = "0.4"
 futures = { version = "0.3" }
 lazy_static = "1"
-log = "0.4"
+log = { version = "0.4", features = ["kv_unstable"] }
+minitrace = "0.6.2"
 pin-project = "1"
 prometheus = { version = "0.13", default-features = false }
 prost = "0.12"
diff --git a/src/kv/mod.rs b/src/kv/mod.rs
index 489110e6..9e9cc6b1 100644
--- a/src/kv/mod.rs
+++ b/src/kv/mod.rs
@@ -14,7 +14,7 @@ pub use key::Key;
 pub use kvpair::KvPair;
 pub use value::Value;

-struct HexRepr<'a>(pub &'a [u8]);
+pub struct HexRepr<'a>(pub &'a [u8]);

 impl<'a> fmt::Display for HexRepr<'a> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
diff --git a/src/pd/cluster.rs b/src/pd/cluster.rs
index 3df4d255..2176d7ff 100644
--- a/src/pd/cluster.rs
+++ b/src/pd/cluster.rs
@@ -103,6 +103,7 @@ impl Connection {
         Connection { security_mgr }
     }

+    #[minitrace::trace]
     pub async fn connect_cluster(
         &self,
         endpoints: &[String],
@@ -122,6 +123,7 @@ impl Connection {
     }

     // Re-establish connection with PD leader in asynchronous fashion.
+    #[minitrace::trace]
     pub async fn reconnect(&self, cluster: &mut Cluster, timeout: Duration) -> Result<()> {
         warn!("updating pd client");
         let start = Instant::now();
diff --git a/src/pd/retry.rs b/src/pd/retry.rs
index 3c17a49e..ffe2a7df 100644
--- a/src/pd/retry.rs
+++ b/src/pd/retry.rs
@@ -8,6 +8,8 @@ use std::time::Duration;
 use std::time::Instant;

 use async_trait::async_trait;
+use log::debug;
+use minitrace::prelude::*;
 use tokio::sync::RwLock;
 use tokio::time::sleep;

@@ -74,7 +76,9 @@ macro_rules! retry_core {
     ($self: ident, $tag: literal, $call: expr) => {{
         let stats = pd_stats($tag);
         let mut last_err = Ok(());
-        for _ in 0..LEADER_CHANGE_RETRY {
+        for retry in 0..LEADER_CHANGE_RETRY {
+            let _span = LocalSpan::enter_with_local_parent("RetryClient::retry");
+
             let res = $call;

             match stats.done(res) {
@@ -82,6 +86,7 @@
                 Err(e) => last_err = Err(e),
             }

+            debug!("retry {} on last_err: {:?}", retry, last_err);
             let mut reconnect_count = MAX_REQUEST_COUNT;
             while let Err(e) = $self.reconnect(RECONNECT_INTERVAL_SEC).await {
                 reconnect_count -= 1;
@@ -142,6 +147,7 @@ impl RetryClient<Cluster> {
 impl RetryClientTrait for RetryClient<Cluster> {
     // These get_* functions will try multiple times to make a request, reconnecting as necessary.
     // It does not know about encoding. Caller should take care of it.
+    #[minitrace::trace]
     async fn get_region(self: Arc<Self>, key: Vec<u8>) -> Result<RegionWithLeader> {
         retry_mut!(self, "get_region", |cluster| {
             let key = key.clone();
@@ -156,6 +162,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
         })
     }

+    #[minitrace::trace]
     async fn get_region_by_id(self: Arc<Self>, region_id: RegionId) -> Result<RegionWithLeader> {
         retry_mut!(self, "get_region_by_id", |cluster| async {
             cluster
@@ -167,6 +174,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
         })
     }

+    #[minitrace::trace]
     async fn get_store(self: Arc<Self>, id: StoreId) -> Result<metapb::Store> {
         retry_mut!(self, "get_store", |cluster| async {
             cluster
@@ -176,6 +184,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
         })
     }

+    #[minitrace::trace]
     async fn get_all_stores(self: Arc<Self>) -> Result<Vec<metapb::Store>> {
         retry_mut!(self, "get_all_stores", |cluster| async {
             cluster
@@ -185,10 +194,12 @@ impl RetryClientTrait for RetryClient<Cluster> {
         })
     }

+    #[minitrace::trace]
     async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
         retry!(self, "get_timestamp", |cluster| cluster.get_timestamp())
     }

+    #[minitrace::trace]
     async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool> {
         retry_mut!(self, "update_gc_safepoint", |cluster| async {
             cluster
@@ -277,7 +288,7 @@ mod test {
     }

     async fn retry_ok(client: Arc<MockClient>) -> Result<()> {
-        retry!(client, "test", |_c| ready(Ok::<_, Error>(())))
+        retry_mut!(client, "test", |_c| ready(Ok::<_, Error>(())))
     }

     executor::block_on(async {
@@ -342,7 +353,7 @@ mod test {
         client: Arc<MockClient>,
         max_retries: Arc<AtomicUsize>,
     ) -> Result<()> {
-        retry!(client, "test", |c| {
+        retry_mut!(client, "test", |c| {
             c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);

             let max_retries = max_retries.fetch_sub(1, Ordering::SeqCst) - 1;
diff --git a/src/pd/timestamp.rs b/src/pd/timestamp.rs
index a1cc7fbd..bddb613a 100644
--- a/src/pd/timestamp.rs
+++ b/src/pd/timestamp.rs
@@ -22,6 +22,7 @@ use futures::task::Context;
 use futures::task::Poll;
 use log::debug;
 use log::info;
+use minitrace::prelude::*;
 use pin_project::pin_project;
 use tokio::sync::mpsc;
 use tokio::sync::oneshot;
@@ -63,6 +64,7 @@ impl TimestampOracle {
         Ok(TimestampOracle { request_tx })
     }

+    #[minitrace::trace]
     pub(crate) async fn get_timestamp(self) -> Result<Timestamp> {
         debug!("getting current timestamp");
         let (request, response) = oneshot::channel();
@@ -74,6 +76,7 @@ impl TimestampOracle {
     }
 }

+#[minitrace::trace]
 async fn run_tso(
     cluster_id: u64,
     mut pd_client: PdClient,
@@ -98,6 +101,9 @@ async fn run_tso(
     let mut responses = pd_client.tso(request_stream).await?.into_inner();

     while let Some(Ok(resp)) = responses.next().await {
+        let _span = LocalSpan::enter_with_local_parent("handle_response");
+        debug!("got response: {:?}", resp);
+
         {
             let mut pending_requests = pending_requests.lock().await;
             allocate_timestamps(&resp, &mut pending_requests)?;
@@ -128,6 +134,7 @@ struct TsoRequestStream {

 impl Stream for TsoRequestStream {
     type Item = TsoRequest;

+    #[minitrace::trace]
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         let mut this = self.project();

@@ -152,6 +159,12 @@ impl Stream for TsoRequestStream {
             }
         }

+        debug!(
+            "got requests: len {}, pending_requests {}",
+            requests.len(),
+            pending_requests.len()
+        );
+
         if !requests.is_empty() {
             let req = TsoRequest {
                 header: Some(RequestHeader {
@@ -168,6 +181,12 @@ impl Stream for TsoRequestStream {
             };
             pending_requests.push_back(request_group);

+            debug!(
+                "sending request to PD: {:?}, pending_requests {}",
+                req,
+                pending_requests.len()
+            );
+
             Poll::Ready(Some(req))
         } else {
             // Set the waker to the context, then the stream can be waked up after the pending queue
diff --git a/src/raw/requests.rs b/src/raw/requests.rs
index 23bfce73..2e4bfc8e 100644
--- a/src/raw/requests.rs
+++ b/src/raw/requests.rs
@@ -1,6 +1,7 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

 use std::any::Any;
+use std::fmt::Formatter;
 use std::ops::Range;
 use std::sync::Arc;
 use std::time::Duration;
@@ -404,6 +405,14 @@ impl Request for RawCoprocessorRequest {
     }
 }

+impl std::fmt::Debug for RawCoprocessorRequest {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RawCoprocessorRequest")
+            .field("inner", &self.inner)
+            .finish()
+    }
+}
+
 impl KvRequest for RawCoprocessorRequest {
     type Response = kvrpcpb::RawCoprocessorResponse;
 }
diff --git a/src/request/mod.rs b/src/request/mod.rs
index aecaf26d..b975dc4b 100644
--- a/src/request/mod.rs
+++ b/src/request/mod.rs
@@ -129,7 +129,7 @@ mod test {

     impl HasLocks for MockRpcResponse {}

-    #[derive(Clone)]
+    #[derive(Debug, Clone)]
     struct MockKvRequest {
         test_invoking_count: Arc<AtomicUsize>,
     }
diff --git a/src/request/plan.rs b/src/request/plan.rs
index ab72e8aa..1cbd6276 100644
--- a/src/request/plan.rs
+++ b/src/request/plan.rs
@@ -9,6 +9,8 @@ use futures::future::try_join_all;
 use futures::prelude::*;
 use log::debug;
 use log::info;
+use minitrace::future::FutureExt;
+use minitrace::prelude::*;
 use tokio::sync::Semaphore;
 use tokio::time::sleep;

@@ -57,6 +59,7 @@ pub struct Dispatch<Req: KvRequest> {
 impl<Req: KvRequest> Plan for Dispatch<Req> {
     type Result = Req::Response;

+    #[minitrace::trace]
     async fn execute(&self) -> Result<Self::Result> {
         let stats = tikv_stats(self.request.label());
         let result = self
@@ -104,6 +107,7 @@ where
 {
     // A plan may involve multiple shards
     #[async_recursion]
+    #[minitrace::trace]
     async fn single_plan_handler(
         pd_client: Arc<PdC>,
         current_plan: P,
@@ -117,14 +121,17 @@ where
             let (shard, region_store) = shard?;
             let mut clone = current_plan.clone();
             clone.apply_shard(shard, &region_store)?;
-            let handle = tokio::spawn(Self::single_shard_handler(
-                pd_client.clone(),
-                clone,
-                region_store,
-                backoff.clone(),
-                permits.clone(),
-                preserve_region_results,
-            ));
+            let handle = tokio::spawn(
+                Self::single_shard_handler(
+                    pd_client.clone(),
+                    clone,
+                    region_store,
+                    backoff.clone(),
+                    permits.clone(),
+                    preserve_region_results,
+                )
+                .in_span(Span::enter_with_local_parent("single_shard_handler")),
+            );
             handles.push(handle);
         }

@@ -149,6 +156,7 @@ where
     }

     #[async_recursion]
+    #[minitrace::trace]
     async fn single_shard_handler(
         pd_client: Arc<PdC>,
         plan: P,
@@ -210,11 +218,17 @@ where
     // 1. Ok(true): error has been resolved, retry immediately
     // 2. Ok(false): backoff, and then retry
     // 3. Err(Error): can't be resolved, return the error to upper level
+    #[minitrace::trace]
     async fn handle_region_error(
         pd_client: Arc<PdC>,
         e: errorpb::Error,
         region_store: RegionStore,
     ) -> Result<bool> {
+        debug!(
+            "handle_region_error, error:{:?}, region_store:{:?}",
+            e, region_store
+        );
+
         let ver_id = region_store.region_with_leader.ver_id();
         if let Some(not_leader) = e.not_leader {
             if let Some(leader) = not_leader.leader {
@@ -266,6 +280,7 @@ where
     // 1. Ok(true): error has been resolved, retry immediately
     // 2. Ok(false): backoff, and then retry
     // 3. Err(Error): can't be resolved, return the error to upper level
+    #[minitrace::trace]
     async fn on_region_epoch_not_match(
         pd_client: Arc<PdC>,
         region_store: RegionStore,
@@ -302,6 +317,7 @@ where
         Ok(false)
     }

+    #[minitrace::trace]
     async fn handle_grpc_error(
         pd_client: Arc<PdC>,
         plan: P,
@@ -349,6 +365,7 @@ where
 {
     type Result = Vec<Result<P::Result>>;

+    #[minitrace::trace]
     async fn execute(&self) -> Result<Self::Result> {
         // Limit the maximum concurrency of multi-region request. If there are
         // too many concurrent requests, TiKV is more likely to return a "TiKV
@@ -469,6 +486,7 @@ impl<In: Clone + Send + Sync + 'static, P: Plan<Result = Vec<Result<In>>>, M: Me
 {
     type Result = M::Out;

+    #[minitrace::trace]
     async fn execute(&self) -> Result<Self::Result> {
         self.merge.merge(self.inner.execute().await?)
     }
@@ -565,27 +583,43 @@ where
 {
     type Result = P::Result;

+    #[minitrace::trace]
     async fn execute(&self) -> Result<Self::Result> {
         let mut result = self.inner.execute().await?;
         let mut clone = self.clone();
+        let mut retry_cnt = 0;
         loop {
+            retry_cnt += 1;
+            let _span = LocalSpan::enter_with_local_parent("ResolveLock::execute::retry")
+                .with_property(|| ("retry_count", retry_cnt.to_string()));
+
             let locks = result.take_locks();
             if locks.is_empty() {
+                debug!("ResolveLock::execute ok");
                 return Ok(result);
             }

             if self.backoff.is_none() {
+                debug!("ResolveLock::execute lock error");
                 return Err(Error::ResolveLockError(locks));
             }

             let pd_client = self.pd_client.clone();
             let live_locks = resolve_locks(locks, pd_client.clone()).await?;
             if live_locks.is_empty() {
+                debug!("ResolveLock::execute lock error retry (resolved)");
                 result = self.inner.execute().await?;
             } else {
                 match clone.backoff.next_delay_duration() {
-                    None => return Err(Error::ResolveLockError(live_locks)),
+                    None => {
+                        debug!("ResolveLock::execute lock error");
+                        return Err(Error::ResolveLockError(live_locks));
+                    }
                     Some(delay_duration) => {
+                        debug!(
+                            "ResolveLock::execute lock error retry (delay {:?})",
+                            delay_duration
+                        );
                         sleep(delay_duration).await;
                         result = clone.inner.execute().await?;
                     }
@@ -595,7 +629,7 @@ where
     }
 }

-#[derive(Default)]
+#[derive(Default, Debug)]
 pub struct CleanupLocksResult {
     pub region_error: Option<errorpb::Error>,
     pub key_error: Option<Vec<Error>>,
@@ -667,6 +701,7 @@ where
 {
     type Result = CleanupLocksResult;

+    #[minitrace::trace]
     async fn execute(&self) -> Result<Self::Result> {
         let mut result = CleanupLocksResult::default();
         let mut inner = self.inner.clone();
diff --git a/src/request/plan_builder.rs b/src/request/plan_builder.rs
index 8e2329e7..e66ce03f 100644
--- a/src/request/plan_builder.rs
+++ b/src/request/plan_builder.rs
@@ -159,6 +159,7 @@ where

     /// Preserve all results, even some of them are Err.
     /// To pass all responses to merge, and handle partial successful results correctly.
+    #[minitrace::trace]
     pub fn retry_multi_region_preserve_results(
         self,
         backoff: Backoff,
diff --git a/src/store/mod.rs b/src/store/mod.rs
index a244a1bc..fdfb232b 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -6,6 +6,7 @@ mod request;

 use std::cmp::max;
 use std::cmp::min;
+use std::fmt;
 use std::sync::Arc;

 use async_trait::async_trait;
@@ -33,6 +34,33 @@ pub struct RegionStore {
     pub client: Arc<dyn KvClient + Send + Sync>,
 }

+impl fmt::Debug for RegionStore {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("RegionStore")
+            .field("region_id", &self.region_with_leader.region.id)
+            .field(
+                "region_version",
+                &self
+                    .region_with_leader
+                    .region
+                    .region_epoch
+                    .as_ref()
+                    .map(|e| e.version)
+                    .unwrap_or_default(),
+            )
+            .field(
+                "leader_store_id",
+                &self
+                    .region_with_leader
+                    .leader
+                    .as_ref()
+                    .map(|l| l.store_id)
+                    .unwrap_or_default(),
+            )
+            .finish()
+    }
+}
+
 #[derive(new, Clone)]
 pub struct Store {
     pub client: Arc<dyn KvClient + Send + Sync>,
diff --git a/src/store/request.rs b/src/store/request.rs
index e11fc8f1..f3b5e450 100644
--- a/src/store/request.rs
+++ b/src/store/request.rs
@@ -1,6 +1,7 @@
 // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

 use std::any::Any;
+use std::fmt::Debug;
 use std::time::Duration;

 use async_trait::async_trait;
@@ -13,7 +14,7 @@ use crate::Error;
 use crate::Result;

 #[async_trait]
-pub trait Request: Any + Sync + Send + 'static {
+pub trait Request: Any + Sync + Send + Debug + 'static {
     async fn dispatch(
         &self,
         client: &TikvClient,
diff --git a/src/transaction/client.rs b/src/transaction/client.rs
index 4bcb16d9..55275a3c 100644
--- a/src/transaction/client.rs
+++ b/src/transaction/client.rs
@@ -2,6 +2,7 @@

 use std::sync::Arc;

+use log::as_debug;
 use log::debug;
 use log::info;

@@ -248,14 +249,17 @@ impl Client {
     ///
     /// This is a simplified version of [GC in TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview).
     /// We skip the second step "delete ranges" which is an optimization for TiDB.
-    pub async fn gc(&self, safepoint: Timestamp) -> Result<bool> {
-        debug!("invoking transactional gc request");
+    #[minitrace::trace]
+    pub async fn gc(&self, range: impl Into<BoundRange>, safepoint: Timestamp) -> Result<bool> {
+        let range = range.into();
+        debug!(range = as_debug!(range); "invoking transactional gc request");

         let options = ResolveLocksOptions {
             batch_size: SCAN_LOCK_BATCH_SIZE,
             ..Default::default()
         };
-        self.cleanup_locks(.., &safepoint, options).await?;
+
+        self.cleanup_locks(range, &safepoint, options).await?;

         // update safepoint to PD
         let res: bool = self
@@ -269,17 +273,19 @@ impl Client {
         Ok(res)
     }

+    #[minitrace::trace]
     pub async fn cleanup_locks(
         &self,
         range: impl Into<BoundRange>,
         safepoint: &Timestamp,
         options: ResolveLocksOptions,
     ) -> Result<CleanupLocksResult> {
-        debug!("invoking cleanup async commit locks");
+        let range = range.into();
+        debug!(range = as_debug!(range); "invoking cleanup async commit locks");
         // scan all locks with ts <= safepoint
         let ctx = ResolveLocksContext::default();
         let backoff = Backoff::equal_jitter_backoff(100, 10000, 50);
-        let req = new_scan_lock_request(range.into(), safepoint, options.batch_size);
+        let req = new_scan_lock_request(range, safepoint, options.batch_size);
         let encoded_req = EncodedRequest::new(req, self.pd.get_codec());
         let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req)
             .cleanup_locks(ctx.clone(), options, backoff)
diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs
index afb1d6c4..ab91ed6b 100644
--- a/src/transaction/lock.rs
+++ b/src/transaction/lock.rs
@@ -5,13 +5,17 @@ use std::collections::HashSet;
 use std::sync::Arc;

 use fail::fail_point;
+use log::as_display;
 use log::debug;
 use log::error;
+use log::info;
+use minitrace::prelude::*;
 use tokio::sync::RwLock;

 use crate::backoff::Backoff;
 use crate::backoff::DEFAULT_REGION_BACKOFF;
 use crate::backoff::OPTIMISTIC_BACKOFF;
+use crate::kv::HexRepr;
 use crate::pd::PdClient;
 use crate::proto::kvrpcpb;
 use crate::proto::kvrpcpb::TxnInfo;
@@ -41,6 +45,7 @@ const RESOLVE_LOCK_RETRY_LIMIT: usize = 10;
 /// the key. We first use `CleanupRequest` to let the status of the primary lock converge and get
 /// its status (committed or rolled back). Then, we use the status of its primary lock to determine
 /// the status of the other keys in the same transaction.
+#[minitrace::trace]
 pub async fn resolve_locks(
     locks: Vec<kvrpcpb::LockInfo>,
     pd_client: Arc<impl PdClient>,
@@ -54,11 +59,23 @@ pub async fn resolve_locks(
         ts.physical - Timestamp::from_version(lock.lock_version).physical
             >= lock.lock_ttl as i64
     });
+    debug!(
+        "resolving locks: expired_locks {:?}, live_locks {:?}",
+        expired_locks, live_locks
+    );

     // records the commit version of each primary lock (representing the status of the transaction)
     let mut commit_versions: HashMap<u64, u64> = HashMap::new();
     let mut clean_regions: HashMap<u64, HashSet<RegionVerId>> = HashMap::new();
     for lock in expired_locks {
+        let _span =
+            LocalSpan::enter_with_local_parent("cleanup_expired_lock").with_properties(|| {
+                [
+                    ("lock_version", lock.lock_version.to_string()),
+                    ("primary_lock", HexRepr(&lock.primary_lock).to_string()),
+                ]
+            });
+
         let region_ver_id = pd_client
             .region_for_key(&lock.primary_lock.clone().into())
             .await?
@@ -75,6 +92,7 @@ pub async fn resolve_locks(
         let commit_version = match commit_versions.get(&lock.lock_version) {
             Some(&commit_version) => commit_version,
             None => {
+                debug!("cleanup lock");
                 let request = requests::new_cleanup_request(lock.primary_lock, lock.lock_version);
                 let encoded_req = EncodedRequest::new(request, pd_client.get_codec());
                 let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
@@ -84,6 +102,7 @@ pub async fn resolve_locks(
                     .post_process_default()
                     .plan();
                 let commit_version = plan.execute().await?;
+                debug!("cleanup lock done: commit_version {}", commit_version);
                 commit_versions.insert(lock.lock_version, commit_version);
                 commit_version
             }
@@ -104,6 +123,7 @@ pub async fn resolve_locks(
     Ok(live_locks)
 }

+#[minitrace::trace]
 async fn resolve_lock_with_retry(
     #[allow(clippy::ptr_arg)] key: &Vec<u8>,
     start_version: u64,
@@ -134,6 +154,7 @@ async fn resolve_lock_with_retry(
         // ResolveLockResponse can have at most 1 error
         match errors.pop() {
             e @ Some(Error::RegionError(_)) => {
+                pd_client.invalidate_region_cache(ver_id).await;
                 error = e;
                 continue;
             }
@@ -209,6 +230,7 @@ impl LockResolver {
     /// _Cleanup_ the given locks. Returns whether all the given locks are resolved.
     ///
     /// Note: Will rollback RUNNING transactions. ONLY use in GC.
+    #[minitrace::trace]
     pub async fn cleanup_locks(
         &mut self,
         store: RegionStore,
@@ -244,6 +266,12 @@ impl LockResolver {
                     l.lock_type == kvrpcpb::Op::PessimisticLock as i32,
                 )
                 .await?;
+            debug!(
+                "cleanup_locks: txn_id:{}, primary:{}, status:{:?}",
+                txn_id,
+                HexRepr(&l.primary_lock),
+                status
+            );

             // If the transaction uses async commit, check_txn_status will reject rolling back the primary lock.
             // Then we need to check the secondary locks to determine the final status of the transaction.
@@ -290,7 +318,7 @@ impl LockResolver {
                 match &status.kind {
                     TransactionStatusKind::Locked(_, lock_info) => {
                         error!(
-                            "cleanup_locks fail to clean locks, this result is not expected. txn_id:{}",
txn_id:{}", txn_id ); return Err(Error::ResolveLockError(vec![lock_info.clone()])); @@ -301,7 +329,7 @@ impl LockResolver { } debug!( - "batch resolve locks, region:{:?}, txn:{:?}", + "cleanup_locks: batch resolve locks, region:{:?}, txn:{:?}", store.region_with_leader.ver_id(), txn_infos ); @@ -317,6 +345,10 @@ impl LockResolver { let cleaned_region = self .batch_resolve_locks(pd_client.clone(), store.clone(), txn_info_vec) .await?; + debug!( + "cleanup_locks: batch resolve locks, cleaned_region:{:?}", + cleaned_region + ); for txn_id in txn_ids { self.ctx .save_cleaned_region(txn_id, cleaned_region.clone()) @@ -327,6 +359,7 @@ impl LockResolver { } #[allow(clippy::too_many_arguments)] + #[minitrace::trace] pub async fn check_txn_status( &mut self, pd_client: Arc, @@ -338,6 +371,8 @@ impl LockResolver { force_sync_commit: bool, resolving_pessimistic_lock: bool, ) -> Result> { + info!("primary" = as_display!(HexRepr(&primary)); "check_txn_status"); + if let Some(txn_status) = self.ctx.get_resolved(txn_id).await { return Ok(txn_status); } @@ -370,6 +405,7 @@ impl LockResolver { let current = pd_client.clone().get_timestamp().await?; res.check_ttl(current); + debug!("check_txn_status: status:{:?}", res); let res = Arc::new(res); if res.is_cacheable() { self.ctx.save_resolved(txn_id, res.clone()).await; @@ -377,6 +413,7 @@ impl LockResolver { Ok(res) } + #[minitrace::trace] async fn check_all_secondaries( &mut self, pd_client: Arc, @@ -393,6 +430,7 @@ impl LockResolver { plan.execute().await } + #[minitrace::trace] async fn batch_resolve_locks( &mut self, pd_client: Arc, diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 6d3c0999..798a68ea 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -8,6 +8,7 @@ use either::Either; use futures::stream::BoxStream; use futures::stream::{self}; use futures::StreamExt; +use log::debug; use super::transaction::TXN_COMMIT_BATCH_SIZE; use crate::collect_first; @@ -175,7 +176,14 @@ shardable_range!(kvrpcpb::ScanRequest); impl Merge for Collect { type Out = Vec; + #[minitrace::trace] fn merge(&self, input: Vec>) -> Result { + let length: usize = input + .iter() + .map(|r| r.as_ref().map(|r| r.pairs.len()).unwrap_or_default()) + .sum(); + debug!("Collect::merge: result length {}", length); + input .into_iter() .flat_map_ok(|resp| resp.pairs.into_iter().map(Into::into)) @@ -816,6 +824,7 @@ impl Merge for Collect { } } +#[derive(Debug)] pub struct SecondaryLocksStatus { pub commit_ts: Option, pub min_commit_ts: u64, diff --git a/src/transaction/snapshot.rs b/src/transaction/snapshot.rs index 0d1e4803..85573d99 100644 --- a/src/transaction/snapshot.rs +++ b/src/transaction/snapshot.rs @@ -50,6 +50,7 @@ impl> Snapshot { } /// Scan a range, return at most `limit` key-value pairs that lying in the range. + #[minitrace::trace] pub async fn scan( &mut self, range: impl Into, diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index e984f153..16e26383 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -1,5 +1,6 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. 

+use std::fmt;
 use std::iter;
 use std::marker::PhantomData;
 use std::sync::atomic;
@@ -10,9 +11,11 @@ use std::time::Instant;
 use derive_new::new;
 use fail::fail_point;
 use futures::prelude::*;
+use log::as_debug;
 use log::debug;
+use log::info;
 use log::warn;
-use tokio::time::Duration;
+use std::time::Duration;

 use crate::backoff::Backoff;
 use crate::backoff::DEFAULT_REGION_BACKOFF;
@@ -88,6 +91,17 @@ pub struct Transaction<PdC: PdClient = PdRpcClient> {
     phantom: PhantomData<PdC>,
 }

+impl<PdC: PdClient> fmt::Debug for Transaction<PdC> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Transaction")
+            .field("timestamp", &self.timestamp)
+            .field("options", &self.options)
+            .field("is_heartbeat_started", &self.is_heartbeat_started)
+            .field("start_instant", &self.start_instant)
+            .finish()
+    }
+}
+
 impl<PdC: PdClient> Transaction<PdC> {
     pub(crate) fn new(
         timestamp: Timestamp,
@@ -354,6 +368,7 @@ impl<PdC: PdClient> Transaction<PdC> {
     /// txn.commit().await.unwrap();
     /// # });
     /// ```
+    #[minitrace::trace]
     pub async fn scan(
         &mut self,
         range: impl Into<BoundRange>,
@@ -390,6 +405,7 @@ impl<PdC: PdClient> Transaction<PdC> {
     /// txn.commit().await.unwrap();
     /// # });
     /// ```
+    #[minitrace::trace]
     pub async fn scan_keys(
         &mut self,
         range: impl Into<BoundRange>,
@@ -405,6 +421,7 @@ impl<PdC: PdClient> Transaction<PdC> {
     /// Create a 'scan_reverse' request.
     ///
     /// Similar to [`scan`](Transaction::scan), but scans in the reverse direction.
+    #[minitrace::trace]
     pub async fn scan_reverse(
         &mut self,
         range: impl Into<BoundRange>,
@@ -417,6 +434,7 @@ impl<PdC: PdClient> Transaction<PdC> {
     /// Create a 'scan_keys_reverse' request.
     ///
     /// Similar to [`scan`](Transaction::scan_keys), but scans in the reverse direction.
+    #[minitrace::trace]
     pub async fn scan_keys_reverse(
         &mut self,
         range: impl Into<BoundRange>,
@@ -755,6 +773,7 @@ impl<PdC: PdClient> Transaction<PdC> {
         plan.execute().await
     }

+    #[minitrace::trace]
     async fn scan_inner(
         &mut self,
         range: impl Into<BoundRange>,
@@ -767,9 +786,12 @@ impl<PdC: PdClient> Transaction<PdC> {
         let rpc = self.rpc.clone();
         let retry_options = self.options.retry_options.clone();

+        let range = range.into();
+        info!(range = as_debug!(&range); "scanning range");
+
         self.buffer
             .scan_and_fetch(
-                range.into(),
+                range,
                 limit,
                 !key_only,
                 reverse,
@@ -1006,6 +1028,7 @@ impl<PdC: PdClient> Transaction<PdC> {
 }

 impl<PdC: PdClient> Drop for Transaction<PdC> {
+    #[minitrace::trace]
     fn drop(&mut self) {
         debug!("dropping transaction");
         if std::thread::panicking() {
diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs
index 71d48283..a56569ec 100644
--- a/tests/integration_tests.rs
+++ b/tests/integration_tests.rs
@@ -554,7 +554,7 @@ async fn raw_req() -> Result<()> {
 async fn txn_update_safepoint() -> Result<()> {
     init().await?;
     let client = TransactionClient::new(pd_addrs()).await?;
-    let res = client.gc(client.current_timestamp().await?).await?;
+    let res = client.gc(.., client.current_timestamp().await?).await?;
     assert!(res);
     Ok(())
 }
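
Note on consuming the spans: the `#[minitrace::trace]` attributes added above only record spans; nothing is collected unless the embedding application installs a reporter and opens a root span. A minimal sketch of what a consumer might look like, following the minitrace 0.6 API (the reporter choice and span name here are illustrative, not part of this patch):

```rust
use minitrace::collector::{Config, ConsoleReporter};
use minitrace::prelude::*;

#[tokio::main]
async fn main() {
    // Install a reporter once at startup; spans recorded by the
    // `#[minitrace::trace]` attributes are discarded without one.
    minitrace::set_reporter(ConsoleReporter, Config::default());

    {
        // A root span must be active for the client's child spans to attach to.
        let root = Span::root("client-request", SpanContext::random());
        let _guard = root.set_local_parent();

        // ... call traced client methods here, e.g. txn.scan(..) ...
    }

    // Flush buffered spans before the process exits.
    minitrace::flush();
}
```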
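The retry loops (`retry_core!`, `ResolveLock::execute`) use `LocalSpan::enter_with_local_parent` so each iteration becomes its own child span, with `with_property` recording the attempt count lazily. The same pattern in isolation (names are illustrative):

```rust
use minitrace::prelude::*;

fn traced_retry_loop() {
    for attempt in 0..3 {
        // One span per iteration; the property closure only runs
        // if the span is actually being collected.
        let _span = LocalSpan::enter_with_local_parent("retry")
            .with_property(|| ("attempt", attempt.to_string()));
        // ... perform the fallible call here ...
    }
}
```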
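The `.in_span(...)` wrapper on the spawned future in `single_plan_handler` is what keeps the trace connected across `tokio::spawn`: the local parent is task-local and does not follow a newly spawned task, so the span must be captured and attached explicitly. A standalone sketch of the pattern (function names are hypothetical):

```rust
use minitrace::future::FutureExt as _;
use minitrace::prelude::*;

async fn parent_op() {
    // Capture the current local parent into a `Span`, then attach it to
    // the future so the child work stays in the same trace even though
    // it runs on another task.
    let handle = tokio::spawn(
        child_op().in_span(Span::enter_with_local_parent("child_op")),
    );
    let _ = handle.await;
}

async fn child_op() {
    // Attaches to the span installed by `in_span` above.
    let _span = LocalSpan::enter_with_local_parent("child_op::step");
}
```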
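The `kv_unstable` feature enabled in Cargo.toml is what allows `log` records to carry structured key-values, which the new `debug!(range = as_debug!(range); "...")` calls rely on. A minimal sketch of the macro syntax (any `log` backend that understands key-values can render the fields; this snippet only shows the producer side and assumes the feature is enabled):

```rust
use log::{as_debug, as_display, debug, info};

fn log_request(range: &std::ops::Range<u32>, key_repr: &str) {
    // `key = value` pairs before the `;` become structured fields on
    // the record instead of being interpolated into the message text.
    debug!(range = as_debug!(range); "scanning range");
    info!(key = as_display!(key_repr); "touching key");
}
```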
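`HexRepr` is made `pub` so that lock.rs can render binary keys readably in span properties and log fields. For reference, it is just a `Display` adapter over a byte slice; a standalone equivalent of the existing wrapper (the `fmt` body is a sketch matching the upstream behavior, not new code in this patch):

```rust
use std::fmt;

pub struct HexRepr<'a>(pub &'a [u8]);

impl<'a> fmt::Display for HexRepr<'a> {
    // Upper-case hex, two digits per byte, no separators.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        for byte in self.0 {
            write!(f, "{:02X}", byte)?;
        }
        Ok(())
    }
}

fn main() {
    // For example, b"\x00\xABk" prints as "00AB6B".
    println!("{}", HexRepr(b"\x00\xABk"));
}
```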
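Since `gc` now takes a range, existing callers must pass `..` to keep the old whole-keyspace behavior, as the updated integration test shows. A hedged usage sketch (the PD endpoint is a placeholder and a running cluster is assumed; `Timestamp` is cloned because `gc` consumes it):

```rust
use tikv_client::TransactionClient;

async fn gc_example() -> tikv_client::Result<()> {
    let client = TransactionClient::new(vec!["127.0.0.1:2379"]).await?;
    let safepoint = client.current_timestamp().await?;

    // Whole keyspace, matching the pre-change behavior of `gc(safepoint)`.
    client.gc(.., safepoint.clone()).await?;

    // Or restrict lock cleanup to a sub-range of the keyspace.
    client.gc(b"a".to_vec()..b"z".to_vec(), safepoint).await?;
    Ok(())
}
```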