Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
andylokandy committed Dec 16, 2023
1 parent 99e366e commit 8534111
Show file tree
Hide file tree
Showing 11 changed files with 88 additions and 71 deletions.
4 changes: 4 additions & 0 deletions src/common/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ pub enum Error {
inner: Box<Error>,
success_keys: Vec<Vec<u8>>,
},
#[error("Keyspace is not enabled on the server, please enable it by setting `storage.api-version = 2`")]
ServerKeyspaceNotEnabled,
#[error("Keyspace is not enabled on the client, please enable it by `Config::with_default_keyspace()` or `Config::with_keyspace()`")]
ClientKeyspaceNotEnabled,
}

impl From<crate::proto::errorpb::Error> for Error {
Expand Down
4 changes: 2 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,15 @@ impl Config {

/// Set to use default keyspace.
///
/// Server should enable `api-version = 2` to use this feature.
/// Server should enable `storage.api-version = 2` to use this feature.
#[must_use]
pub fn with_default_keyspace(self) -> Self {
self.with_keyspace("DEFAULT")
}

/// Set the use keyspace for the client.
///
/// Server should enable `api-version = 2` to use this feature.
/// Server should enable `storage.api-version = 2` to use this feature.
#[must_use]
pub fn with_keyspace(mut self, keyspace: &str) -> Self {
self.keyspace = Some(keyspace.to_owned());
Expand Down
5 changes: 5 additions & 0 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ impl<PdC: PdClient> Client<PdC> {
let request = new_raw_get_request(key, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
.retry_multi_region(self.backoff.clone())
.extract_error()
.merge(CollectSingle)
.post_process_default()
.plan();
Expand Down Expand Up @@ -268,6 +269,7 @@ impl<PdC: PdClient> Client<PdC> {
let request = new_raw_batch_get_request(keys, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
.retry_multi_region(self.backoff.clone())
.extract_error()
.merge(Collect)
.plan();
plan.execute().await.map(|r| {
Expand Down Expand Up @@ -581,6 +583,7 @@ impl<PdC: PdClient> Client<PdC> {
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, req)
.retry_multi_region(self.backoff.clone())
.extract_error()
.merge(CollectSingle)
.post_process_default()
.plan();
Expand Down Expand Up @@ -660,6 +663,7 @@ impl<PdC: PdClient> Client<PdC> {
crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
.single_region_with_store(region_store.clone())
.await?
.extract_error()
.plan()
.execute()
.await?;
Expand Down Expand Up @@ -719,6 +723,7 @@ impl<PdC: PdClient> Client<PdC> {
let request = new_raw_batch_scan_request(ranges, each_limit, key_only, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
.retry_multi_region(self.backoff.clone())
.extract_error()
.merge(Collect)
.plan();
plan.execute().await.map(|r| {
Expand Down
8 changes: 2 additions & 6 deletions src/request/api_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ fn keyspace_prefix(keyspace_id: u32, key_mode: KeyMode) -> [u8; KEYSPACE_PREFIX_
fn prepend_bytes<const N: usize>(vec: &mut Vec<u8>, prefix: &[u8; N]) {
unsafe {
vec.reserve_exact(N);
std::ptr::copy(vec.as_ptr(), vec.as_mut_ptr().offset(N as isize), vec.len());
std::ptr::copy(vec.as_ptr(), vec.as_mut_ptr().add(N), vec.len());
std::ptr::copy_nonoverlapping(prefix.as_ptr(), vec.as_mut_ptr(), N);
vec.set_len(vec.len() + N);
}
Expand All @@ -172,11 +172,7 @@ fn prepend_bytes<const N: usize>(vec: &mut Vec<u8>, prefix: &[u8; N]) {
fn pretruncate_bytes<const N: usize>(vec: &mut Vec<u8>) {
assert!(vec.len() >= N);
unsafe {
std::ptr::copy(
vec.as_ptr().offset(N as isize),
vec.as_mut_ptr(),
vec.len() - N,
);
std::ptr::copy(vec.as_ptr().add(N), vec.as_mut_ptr(), vec.len() - N);
vec.set_len(vec.len() - N);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ mod test {

#[tokio::test]
async fn test_region_retry() {
#[derive(Clone)]
#[derive(Debug, Clone)]
struct MockRpcResponse;

impl HasKeyErrors for MockRpcResponse {
Expand Down
62 changes: 49 additions & 13 deletions src/request/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ where
}
}

#[derive(Default)]
#[derive(Debug, Default)]
pub struct CleanupLocksResult {
pub region_error: Option<errorpb::Error>,
pub key_error: Option<Vec<Error>>,
Expand Down Expand Up @@ -784,18 +784,54 @@ where
type Result = P::Result;

async fn execute(&self) -> Result<Self::Result> {
let mut result = self.inner.execute().await?;
if let Some(errors) = result.key_errors() {
Err(Error::ExtractedErrors(errors))
} else if let Some(errors) = result.region_errors() {
Err(Error::ExtractedErrors(
errors
.into_iter()
.map(|e| Error::RegionError(Box::new(e)))
.collect(),
))
} else {
Ok(result)
let result = self.inner.execute().await;
let mut errors = Vec::new();
match result {
Ok(mut resp) => {
if let Some(e) = resp.key_errors() {
errors.extend(e);
}
if let Some(e) = resp.region_errors() {
errors.extend(e.into_iter().map(|e| Error::RegionError(Box::new(e))));
}

if errors.is_empty() {
return Ok(resp);
}
}
Err(Error::MultipleKeyErrors(e)) => errors.extend(e),
Err(e) => errors.push(e),
};
for error in &mut errors {
match error {
Error::KvError { message }
if message
.contains("Api version in request does not match with TiKV storage")
&& message.contains("storage: V1, request: V2") =>
{
*error = Error::ServerKeyspaceNotEnabled
}
Error::KvError { message }
if message
.contains("Api version in request does not match with TiKV storage")
&& message.contains("storage: V1ttl, request: V2") =>
{
*error = Error::ServerKeyspaceNotEnabled
}
Error::KvError { message }
if message
.contains("Api version in request does not match with TiKV storage")
&& message.contains("storage: V2, request: V1") =>
{
*error = Error::ClientKeyspaceNotEnabled
}
_ => (),
}
}
match errors.len() {
0 => unreachable!(),
1 => Err(errors.pop().unwrap()),
_ => Err(Error::ExtractedErrors(errors)),
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/store/errors.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::fmt::Display;

use crate::proto::kvrpcpb;
use crate::Error;

Expand Down Expand Up @@ -162,10 +160,11 @@ impl HasKeyErrors for kvrpcpb::PessimisticRollbackResponse {
}
}

impl<T: HasKeyErrors, E: Display> HasKeyErrors for Result<T, E> {
impl<T: HasKeyErrors> HasKeyErrors for Result<T, Error> {
fn key_errors(&mut self) -> Option<Vec<Error>> {
match self {
Ok(x) => x.key_errors(),
Err(Error::MultipleKeyErrors(e)) => Some(std::mem::take(e)),
Err(e) => Some(vec![Error::StringError(e.to_string())]),
}
}
Expand Down
1 change: 1 addition & 0 deletions src/transaction/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ impl Client {
let req = new_unsafe_destroy_range_request(range);
let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.api_version, req)
.all_stores(DEFAULT_STORE_BACKOFF)
.extract_error()
.merge(crate::request::Collect)
.plan();
plan.execute().await
Expand Down
36 changes: 18 additions & 18 deletions src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ impl<PdC: PdClient> Transaction<PdC> {
let plan = PlanBuilder::new(rpc, api_version, request)
.resolve_lock(retry_options.lock_backoff, api_version)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.merge(CollectSingle)
.post_process_default()
.plan();
Expand Down Expand Up @@ -281,6 +282,7 @@ impl<PdC: PdClient> Transaction<PdC> {
let plan = PlanBuilder::new(rpc, api_version, request)
.resolve_lock(retry_options.lock_backoff, api_version)
.retry_multi_region(retry_options.region_backoff)
.extract_error()
.merge(Collect)
.plan();
plan.execute()
Expand Down Expand Up @@ -333,7 +335,11 @@ impl<PdC: PdClient> Transaction<PdC> {
let keys = keys
.into_iter()
.map(move |k| k.into().encode_version(api_version, KeyMode::Txn));
self.pessimistic_lock(keys, true).await
let pairs = self
.pessimistic_lock(keys, true)
.await?
.truncate_version(api_version);
Ok(pairs)
}
}

Expand Down Expand Up @@ -541,23 +547,14 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # Examples
///
/// ```rust,no_run
/// # use tikv_client::{Key, Config, TransactionClient, proto::kvrpcpb};
/// # use tikv_client::{Key, Config, TransactionClient, transaction::Mutation};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
/// let mut txn = client.begin_optimistic().await.unwrap();
/// let mutations = vec![
/// kvrpcpb::Mutation {
/// op: kvrpcpb::Op::Del.into(),
/// key: b"k0".to_vec(),
/// ..Default::default()
/// },
/// kvrpcpb::Mutation {
/// op: kvrpcpb::Op::Put.into(),
/// key: b"k1".to_vec(),
/// value: b"v1".to_vec(),
/// ..Default::default()
/// },
/// Mutation::Delete("k0".to_owned().into()),
/// Mutation::Put("k1".to_owned().into(), b"v1".to_vec()),
/// ];
/// txn.batch_mutate(mutations).await.unwrap();
/// txn.commit().await.unwrap();
Expand All @@ -574,7 +571,7 @@ impl<PdC: PdClient> Transaction<PdC> {
.map(|mutation| mutation.encode_version(self.api_version, KeyMode::Txn))
.collect();
if self.is_pessimistic() {
self.pessimistic_lock(mutations.iter().map(|m| Key::from(m.key().clone())), false)
self.pessimistic_lock(mutations.iter().map(|m| m.key().clone()), false)
.await?;
for m in mutations {
self.buffer.mutate(m);
Expand Down Expand Up @@ -623,12 +620,11 @@ impl<PdC: PdClient> Transaction<PdC> {
match self.options.kind {
TransactionKind::Optimistic => {
for key in keys {
self.buffer.lock(key.into());
self.buffer.lock(key);
}
}
TransactionKind::Pessimistic(_) => {
self.pessimistic_lock(keys, false)
.await?;
self.pessimistic_lock(keys, false).await?;
}
}
Ok(())
Expand Down Expand Up @@ -771,6 +767,7 @@ impl<PdC: PdClient> Transaction<PdC> {
self.api_version,
)
.retry_multi_region(self.options.retry_options.region_backoff.clone())
.extract_error()
.merge(CollectSingle)
.post_process_default()
.plan();
Expand Down Expand Up @@ -803,6 +800,7 @@ impl<PdC: PdClient> Transaction<PdC> {
let plan = PlanBuilder::new(rpc, api_version, request)
.resolve_lock(retry_options.lock_backoff, api_version)
.retry_multi_region(retry_options.region_backoff)
.extract_error()
.merge(Collect)
.plan();
plan.execute()
Expand Down Expand Up @@ -863,6 +861,7 @@ impl<PdC: PdClient> Transaction<PdC> {
)
.preserve_shard()
.retry_multi_region_preserve_results(self.options.retry_options.region_backoff.clone())
.extract_error()
.merge(CollectWithShard)
.plan();
let pairs = plan.execute().await;
Expand Down Expand Up @@ -920,6 +919,7 @@ impl<PdC: PdClient> Transaction<PdC> {
)
.retry_multi_region(self.options.retry_options.region_backoff.clone())
.extract_error()
.extract_error()
.plan();
plan.execute().await?;

Expand Down Expand Up @@ -989,6 +989,7 @@ impl<PdC: PdClient> Transaction<PdC> {
let plan = PlanBuilder::new(rpc.clone(), api_version, request)
.retry_multi_region(region_backoff.clone())
.merge(CollectSingle)
.extract_error()
.plan();
plan.execute().await?;
}
Expand Down Expand Up @@ -1042,7 +1043,6 @@ impl<PdC: PdClient> Drop for Transaction<PdC> {
if self.get_status() == TransactionStatus::Active {
match self.options.check_level {
CheckLevel::Panic => {
dbg!(&self.timestamp);
panic!("Dropping an active transaction. Consider commit or rollback it.")
}
CheckLevel::Warn => {
Expand Down
2 changes: 1 addition & 1 deletion tests/failpoint_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ async fn must_rollbacked(client: &TransactionClient, keys: HashSet<Vec<u8>>) {

async fn count_locks(client: &TransactionClient) -> Result<usize> {
let ts = client.current_timestamp().await.unwrap();
let locks = client.scan_locks(&ts, vec![].., 1024).await?;
let locks = client.scan_locks(&ts, .., 1024).await?;
// De-duplicated as `scan_locks` will return duplicated locks due to retry on region changes.
let locks_set: HashSet<Vec<u8>> = HashSet::from_iter(locks.into_iter().map(|l| l.key));
Ok(locks_set.len())
Expand Down
Loading

0 comments on commit 8534111

Please sign in to comment.