Skip to content

Commit

Permalink
implement the complete keyspace feature (#439)
Browse files Browse the repository at this point in the history
Signed-off-by: Andy Lok <[email protected]>
  • Loading branch information
andylokandy authored Dec 27, 2023
1 parent bbaf317 commit 1178d79
Show file tree
Hide file tree
Showing 35 changed files with 953 additions and 562 deletions.
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ prometheus = { version = "0.13", default-features = false }
prost = "0.12"
rand = "0.8"
regex = "1"
reqwest = { version = "0.11", features = ["json", "native-tls-vendored"] }
semver = "1.0"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1"
take_mut = "0.2.2"
thiserror = "1"
tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }
tonic = { version = "0.10", features = ["tls"] }
Expand All @@ -51,9 +54,7 @@ env_logger = "0.10"
fail = { version = "0.4", features = ["failpoints"] }
proptest = "1"
proptest-derive = "0.3"
reqwest = { version = "0.11", default-features = false, features = [
"native-tls-vendored",
] }
rstest = "0.18.2"
serde_json = "1"
serial_test = "0.5.0"
simple_logger = "1"
Expand Down
4 changes: 4 additions & 0 deletions config/tikv.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ max-open-files = 10000

[raftdb]
max-open-files = 10000

[storage]
api-version = 2
enable-ttl = true
4 changes: 3 additions & 1 deletion examples/pessimistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ async fn main() {
Config::default().with_security(ca, cert, key)
} else {
Config::default()
};
}
// This example uses the default keyspace, so api-v2 must be enabled on the server.
.with_default_keyspace();

// init
let client = Client::new_with_config(args.pd, config)
Expand Down
8 changes: 6 additions & 2 deletions examples/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ async fn main() -> Result<()> {
Config::default().with_security(ca, cert, key)
} else {
Config::default()
};
}
// This example uses the default keyspace, so api-v2 must be enabled on the server.
.with_default_keyspace();

// When we first create a client we receive a `Connect` structure which must be resolved before
// the client is actually connected and usable.
Expand Down Expand Up @@ -136,6 +138,8 @@ async fn main() -> Result<()> {
);
println!("Scanning batch scan from {batch_scan_keys:?} gives: {vals:?}");

// Cleanly exit.
// Delete all keys in the whole range.
client.delete_range("".to_owned().."".to_owned()).await?;

Ok(())
}
4 changes: 3 additions & 1 deletion examples/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ async fn main() {
Config::default().with_security(ca, cert, key)
} else {
Config::default()
};
}
// This example uses the default keyspace, so api-v2 must be enabled on the server.
.with_default_keyspace();

let txn = Client::new_with_config(args.pd, config)
.await
Expand Down
5 changes: 5 additions & 0 deletions src/common/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,14 @@ pub enum Error {
/// Wraps a `grpcio::Error`.
#[error("gRPC error: {0}")]
Grpc(#[from] tonic::transport::Error),
/// Wraps a `reqwest::Error`.
#[error("http error: {0}")]
Http(#[from] reqwest::Error),
/// Wraps a `grpcio::Error`.
#[error("gRPC api error: {0}")]
GrpcAPI(#[from] tonic::Status),
#[error("Http request failed: unknown respond {0}")]
UnknownHttpRespond(String),
/// Wraps a `grpcio::Error`.
#[error("url error: {0}")]
Url(#[from] tonic::codegen::http::uri::InvalidUri),
Expand Down
19 changes: 19 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub struct Config {
pub cert_path: Option<PathBuf>,
pub key_path: Option<PathBuf>,
pub timeout: Duration,
pub keyspace: Option<String>,
}

const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
Expand All @@ -30,6 +31,7 @@ impl Default for Config {
cert_path: None,
key_path: None,
timeout: DEFAULT_REQUEST_TIMEOUT,
keyspace: None,
}
}
}
Expand Down Expand Up @@ -83,4 +85,21 @@ impl Config {
self.timeout = timeout;
self
}

/// Set to use default keyspace.
///
/// Server should enable `storage.api-version = 2` to use this feature.
#[must_use]
pub fn with_default_keyspace(self) -> Self {
self.with_keyspace("DEFAULT")
}

/// Set the use keyspace for the client.
///
/// Server should enable `storage.api-version = 2` to use this feature.
#[must_use]
pub fn with_keyspace(mut self, keyspace: &str) -> Self {
self.keyspace = Some(keyspace.to_owned());
self
}
}
10 changes: 2 additions & 8 deletions src/kv/bound_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,17 +136,11 @@ impl BoundRange {
pub fn into_keys(self) -> (Key, Option<Key>) {
let start = match self.from {
Bound::Included(v) => v,
Bound::Excluded(mut v) => {
v.push_zero();
v
}
Bound::Excluded(v) => v.next_key(),
Bound::Unbounded => Key::EMPTY,
};
let end = match self.to {
Bound::Included(mut v) => {
v.push_zero();
Some(v)
}
Bound::Included(v) => Some(v.next_key()),
Bound::Excluded(v) => Some(v),
Bound::Unbounded => None,
};
Expand Down
9 changes: 5 additions & 4 deletions src/kv/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub struct Key(
test,
proptest(strategy = "any_with::<Vec<u8>>((size_range(_PROPTEST_KEY_MAX), ()))")
)]
pub(super) Vec<u8>,
pub(crate) Vec<u8>,
);

impl AsRef<Key> for kvrpcpb::Mutation {
Expand All @@ -98,10 +98,11 @@ impl Key {

/// Push a zero to the end of the key.
///
/// Extending a zero makes the new key the smallest key that is greater than than the original one, i.e. the succeeder.
/// Extending a zero makes the new key the smallest key that is greater than than the original one.
#[inline]
pub(super) fn push_zero(&mut self) {
self.0.push(0)
pub(crate) fn next_key(mut self) -> Self {
self.0.push(0);
self
}

/// Convert the key to a lower bound. The key is treated as inclusive.
Expand Down
5 changes: 1 addition & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@

pub mod backoff;
#[doc(hidden)]
pub mod proto; // export `proto` to enable user customized codec
#[doc(hidden)]
pub mod raw;
pub mod request;
#[doc(hidden)]
Expand All @@ -106,6 +104,7 @@ mod compat;
mod config;
mod kv;
mod pd;
mod proto;
mod region;
mod region_cache;
mod stats;
Expand Down Expand Up @@ -146,8 +145,6 @@ pub use crate::raw::Client as RawClient;
#[doc(inline)]
pub use crate::raw::ColumnFamily;
#[doc(inline)]
pub use crate::request::codec;
#[doc(inline)]
pub use crate::request::RetryOptions;
#[doc(inline)]
pub use crate::timestamp::Timestamp;
Expand Down
21 changes: 4 additions & 17 deletions src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use crate::proto::metapb::RegionEpoch;
use crate::proto::metapb::{self};
use crate::region::RegionId;
use crate::region::RegionWithLeader;
use crate::request::codec::ApiV1TxnCodec;
use crate::store::KvConnect;
use crate::store::RegionStore;
use crate::store::Request;
Expand All @@ -31,7 +30,7 @@ use crate::Timestamp;

/// Create a `PdRpcClient` with it's internals replaced with mocks so that the
/// client can be tested without doing any RPC calls.
pub async fn pd_rpc_client() -> PdRpcClient<ApiV1TxnCodec, MockKvConnect, MockCluster> {
pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
let config = Config::default();
PdRpcClient::new(
config.clone(),
Expand All @@ -44,7 +43,6 @@ pub async fn pd_rpc_client() -> PdRpcClient<ApiV1TxnCodec, MockKvConnect, MockCl
))
},
false,
Some(ApiV1TxnCodec::default()),
)
.await
.unwrap()
Expand Down Expand Up @@ -73,18 +71,9 @@ pub struct MockKvConnect;

pub struct MockCluster;

#[derive(new)]
pub struct MockPdClient {
client: MockKvClient,
codec: ApiV1TxnCodec,
}

impl MockPdClient {
pub fn new(client: MockKvClient) -> MockPdClient {
MockPdClient {
client,
codec: ApiV1TxnCodec::default(),
}
}
}

#[async_trait]
Expand Down Expand Up @@ -113,7 +102,6 @@ impl MockPdClient {
pub fn default() -> MockPdClient {
MockPdClient {
client: MockKvClient::default(),
codec: ApiV1TxnCodec::default(),
}
}

Expand Down Expand Up @@ -177,7 +165,6 @@ impl MockPdClient {

#[async_trait]
impl PdClient for MockPdClient {
type Codec = ApiV1TxnCodec;
type KvClient = MockKvClient;

async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
Expand Down Expand Up @@ -228,7 +215,7 @@ impl PdClient for MockPdClient {

async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {}

fn get_codec(&self) -> &Self::Codec {
&self.codec
async fn get_keyspace_id(&self, _keyspace: &str) -> Result<u32> {
unimplemented!()
}
}
Loading

0 comments on commit 1178d79

Please sign in to comment.