Skip to content

Commit

Permalink
Adding a reverse scan API for raw client
Browse files Browse the repository at this point in the history
Signed-off-by: Rahul Rane <[email protected]>
  • Loading branch information
rahulrane50 committed Feb 9, 2024
1 parent 6f9c133 commit dac55a7
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 4 deletions.
47 changes: 43 additions & 4 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,40 @@ impl<PdC: PdClient> Client<PdC> {
/// ```
pub async fn scan(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<KvPair>> {
debug!("invoking raw scan request");
self.scan_inner(range.into(), limit, false).await
self.scan_inner(range.into(), limit, false, false).await
}

// Create a new 'scan' request but scans in "reverse" direction.
///
/// Once resolved this request will result in a `Vec` of key-value pairs that lies in the specified range.
///
/// If the number of eligible key-value pairs are greater than `limit`,
/// only the first `limit` pairs are returned, ordered by the key.
///
///
/// Reverse Scan queries continuous kv pairs in range (endKey, startKey],
/// from startKey(upperBound) to endKey(lowerBound), up to limit pairs.
/// The returned keys are in reversed lexicographical order.
/// If you want to include the endKey or exclude the startKey, push a '\0' to the key.
/// It doesn't support Scanning from "", because locating the last Region is not yet implemented.
/// # Examples
/// ```rust,no_run
/// # use tikv_client::{KvPair, Config, RawClient, IntoOwnedRange};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let inclusive_range = "TiDB"..="TiKV";
/// let req = client.scan_reverse(inclusive_range.into_owned(), 2);
/// let result: Vec<KvPair> = req.await.unwrap();
/// # });
/// ```
pub async fn scan_reverse(
&self,
range: impl Into<BoundRange>,
limit: u32,
) -> Result<Vec<KvPair>> {
debug!("invoking raw reverse scan request");
self.scan_inner(range.into(), limit, false, true).await
}

/// Create a new 'scan' request that only returns the keys.
Expand All @@ -525,7 +558,7 @@ impl<PdC: PdClient> Client<PdC> {
pub async fn scan_keys(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<Key>> {
debug!("invoking raw scan_keys request");
Ok(self
.scan_inner(range, limit, true)
.scan_inner(range, limit, true, false)
.await?
.into_iter()
.map(KvPair::into_key)
Expand Down Expand Up @@ -682,6 +715,7 @@ impl<PdC: PdClient> Client<PdC> {
range: impl Into<BoundRange>,
limit: u32,
key_only: bool,
reverse: bool,
) -> Result<Vec<KvPair>> {
if limit > MAX_RAW_KV_SCAN_LIMIT {
return Err(Error::MaxScanLimitExceeded {
Expand All @@ -703,8 +737,13 @@ impl<PdC: PdClient> Client<PdC> {
let mut cur_limit = limit;

while cur_limit > 0 {
let request =
new_raw_scan_request(cur_range.clone(), cur_limit, key_only, self.cf.clone());
let request = new_raw_scan_request(
cur_range.clone(),
cur_limit,
key_only,
reverse,
self.cf.clone(),
);
let resp = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
.single_region_with_store(region_store.clone())
.await?
Expand Down
2 changes: 2 additions & 0 deletions src/raw/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub fn new_raw_scan_request(
range: BoundRange,
limit: u32,
key_only: bool,
reverse: bool,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawScanRequest {
let (start_key, end_key) = range.into_keys();
Expand All @@ -92,6 +93,7 @@ pub fn new_raw_scan_request(
end_key.unwrap_or_default().into(),
limit,
key_only,
reverse,
cf,
)
}
Expand Down
2 changes: 2 additions & 0 deletions src/raw/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,15 @@ pub fn new_raw_scan_request(
end_key: Vec<u8>,
limit: u32,
key_only: bool,
reverse: bool,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawScanRequest {
let mut req = kvrpcpb::RawScanRequest::default();
req.start_key = start_key;
req.end_key = end_key;
req.limit = limit;
req.key_only = key_only;
req.reverse = reverse;
req.maybe_set_cf(cf);

req
Expand Down
52 changes: 52 additions & 0 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,58 @@ async fn raw_req() -> Result<()> {
assert_eq!(res[5].1, "v4".as_bytes());
assert_eq!(res[6].1, "v5".as_bytes());

// reverse scan
// By default end key is exclusive, so k5 is not included and start key in included

let res = client
.scan_reverse("k5".to_owned().."k2".to_owned(), 5)
.await?;
assert_eq!(res.len(), 3);
assert_eq!(res[0].1, "v4".as_bytes());
assert_eq!(res[1].1, "v3".as_bytes());
assert_eq!(res[2].1, "v2".as_bytes());

// by default end key in exclusive and start key is inclusive but now exclude start key
let res = client
.scan_reverse("k5".to_owned()..="k2".to_owned(), 5)
.await?;
assert_eq!(res.len(), 2);
assert_eq!(res[0].1, "v4".as_bytes());
assert_eq!(res[1].1, "v3".as_bytes());

// reverse scan
// by default end key is exclusive and start key is inclusive but now include end key
let res = client
.scan_reverse("k5\0".to_owned().."k2".to_owned(), 5)
.await?;
assert_eq!(res.len(), 4);
assert_eq!(res[0].1, "v5".as_bytes());
assert_eq!(res[1].1, "v4".as_bytes());
assert_eq!(res[2].1, "v3".as_bytes());
assert_eq!(res[3].1, "v2".as_bytes());

// by default end key is exclusive and start key is inclusive but now include end key and exclude start key
let res = client
.scan_reverse("k5\0".to_owned()..="k2".to_owned(), 5)
.await?;
assert_eq!(res.len(), 3);
assert_eq!(res[0].1, "v5".as_bytes());
assert_eq!(res[1].1, "v4".as_bytes());
assert_eq!(res[2].1, "v3".as_bytes());

// limit results to first 2
let res = client
.scan_reverse("k5".to_owned().."k2".to_owned(), 2)
.await?;
assert_eq!(res.len(), 2);
assert_eq!(res[0].1, "v4".as_bytes());
assert_eq!(res[1].1, "v3".as_bytes());

//let range = "k5"..; // Upperbound (k5, +inf). This is NOT SUPPORTED by TiKV.
let range = BoundRange::range_from(Key::from("k5".to_owned()));
let res = client.scan_reverse(range, 20).await?;
assert_eq!(res.len(), 0);

Ok(())
}

Expand Down

0 comments on commit dac55a7

Please sign in to comment.