Skip to content

Commit

Permalink
Adding a reverse scan API for raw client (#441)
Browse files Browse the repository at this point in the history
* Adding a reverse scan API for raw client

Signed-off-by: Rahul Rane <[email protected]>

* Addressed comments

Signed-off-by: Rahul Rane <[email protected]>

* Addressed comments

Signed-off-by: Rahul Rane <[email protected]>

---------

Signed-off-by: Rahul Rane <[email protected]>
  • Loading branch information
rahulrane50 authored Feb 21, 2024
1 parent 6f9c133 commit c6110dd
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 7 deletions.
80 changes: 76 additions & 4 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,40 @@ impl<PdC: PdClient> Client<PdC> {
/// ```
pub async fn scan(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<KvPair>> {
debug!("invoking raw scan request");
self.scan_inner(range.into(), limit, false).await
self.scan_inner(range.into(), limit, false, false).await
}

/// Create a new 'scan' request but scans in "reverse" direction.
///
/// Once resolved this request will result in a `Vec` of key-value pairs that lies in the specified range.
///
/// If the number of eligible key-value pairs are greater than `limit`,
/// only the first `limit` pairs are returned, ordered by the key.
///
///
/// Reverse Scan queries continuous kv pairs in range [startKey, endKey),
/// from startKey(lowerBound) to endKey(upperBound) in reverse order, up to limit pairs.
/// The returned keys are in reversed lexicographical order.
/// If you want to include the endKey or exclude the startKey, push a '\0' to the key.
/// It doesn't support Scanning from "", because locating the last Region is not yet implemented.
/// # Examples
/// ```rust,no_run
/// # use tikv_client::{KvPair, Config, RawClient, IntoOwnedRange};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let inclusive_range = "TiKV"..="TiDB";
/// let req = client.scan_reverse(inclusive_range.into_owned(), 2);
/// let result: Vec<KvPair> = req.await.unwrap();
/// # });
/// ```
pub async fn scan_reverse(
&self,
range: impl Into<BoundRange>,
limit: u32,
) -> Result<Vec<KvPair>> {
debug!("invoking raw reverse scan request");
self.scan_inner(range.into(), limit, false, true).await
}

/// Create a new 'scan' request that only returns the keys.
Expand All @@ -525,7 +558,40 @@ impl<PdC: PdClient> Client<PdC> {
pub async fn scan_keys(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<Key>> {
debug!("invoking raw scan_keys request");
Ok(self
.scan_inner(range, limit, true)
.scan_inner(range, limit, true, false)
.await?
.into_iter()
.map(KvPair::into_key)
.collect())
}

/// Create a new 'scan' request that only returns the keys in reverse order.
///
/// Once resolved this request will result in a `Vec` of keys that lies in the specified range.
///
/// If the number of eligible keys are greater than `limit`,
/// only the first `limit` pairs are returned, ordered by the key.
///
///
/// # Examples
/// ```rust,no_run
/// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let inclusive_range = "TiKV"..="TiDB";
/// let req = client.scan_keys(inclusive_range.into_owned(), 2);
/// let result: Vec<Key> = req.await.unwrap();
/// # });
/// ```
pub async fn scan_keys_reverse(
&self,
range: impl Into<BoundRange>,
limit: u32,
) -> Result<Vec<Key>> {
debug!("invoking raw scan_keys request");
Ok(self
.scan_inner(range, limit, true, true)
.await?
.into_iter()
.map(KvPair::into_key)
Expand Down Expand Up @@ -682,6 +748,7 @@ impl<PdC: PdClient> Client<PdC> {
range: impl Into<BoundRange>,
limit: u32,
key_only: bool,
reverse: bool,
) -> Result<Vec<KvPair>> {
if limit > MAX_RAW_KV_SCAN_LIMIT {
return Err(Error::MaxScanLimitExceeded {
Expand All @@ -703,8 +770,13 @@ impl<PdC: PdClient> Client<PdC> {
let mut cur_limit = limit;

while cur_limit > 0 {
let request =
new_raw_scan_request(cur_range.clone(), cur_limit, key_only, self.cf.clone());
let request = new_raw_scan_request(
cur_range.clone(),
cur_limit,
key_only,
reverse,
self.cf.clone(),
);
let resp = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
.single_region_with_store(region_store.clone())
.await?
Expand Down
2 changes: 2 additions & 0 deletions src/raw/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub fn new_raw_scan_request(
range: BoundRange,
limit: u32,
key_only: bool,
reverse: bool,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawScanRequest {
let (start_key, end_key) = range.into_keys();
Expand All @@ -92,6 +93,7 @@ pub fn new_raw_scan_request(
end_key.unwrap_or_default().into(),
limit,
key_only,
reverse,
cf,
)
}
Expand Down
13 changes: 10 additions & 3 deletions src/raw/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,20 @@ pub fn new_raw_scan_request(
end_key: Vec<u8>,
limit: u32,
key_only: bool,
reverse: bool,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawScanRequest {
let mut req = kvrpcpb::RawScanRequest::default();
req.start_key = start_key;
req.end_key = end_key;
if !reverse {
req.start_key = start_key;
req.end_key = end_key;
} else {
req.start_key = end_key;
req.end_key = start_key;
}
req.limit = limit;
req.key_only = key_only;
req.reverse = reverse;
req.maybe_set_cf(cf);

req
Expand All @@ -294,7 +301,7 @@ impl KvRequest for kvrpcpb::RawScanRequest {
type Response = kvrpcpb::RawScanResponse;
}

range_request!(kvrpcpb::RawScanRequest); // TODO: support reverse raw scan.
range_request!(kvrpcpb::RawScanRequest);
shardable_range!(kvrpcpb::RawScanRequest);

impl Merge<kvrpcpb::RawScanResponse> for Collect {
Expand Down
155 changes: 155 additions & 0 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,62 @@ async fn raw_req() -> Result<()> {
assert_eq!(res[5].1, "v4".as_bytes());
assert_eq!(res[6].1, "v5".as_bytes());

// reverse scan

// By default end key is exclusive, so k5 is not included and start key in included
let res = client
.scan_reverse("k2".to_owned().."k5".to_owned(), 5)
.await?;
assert_eq!(res.len(), 3);
assert_eq!(res[0].1, "v4".as_bytes());
assert_eq!(res[1].1, "v3".as_bytes());
assert_eq!(res[2].1, "v2".as_bytes());

// by default end key in exclusive and start key is inclusive but now exclude start key
let res = client
.scan_reverse("k2\0".to_owned().."k5".to_owned(), 5)
.await?;
assert_eq!(res.len(), 2);
assert_eq!(res[0].1, "v4".as_bytes());
assert_eq!(res[1].1, "v3".as_bytes());

// reverse scan
// by default end key is exclusive and start key is inclusive but now include end key
let res = client
.scan_reverse("k2".to_owned()..="k5".to_owned(), 5)
.await?;
assert_eq!(res.len(), 4);
assert_eq!(res[0].1, "v5".as_bytes());
assert_eq!(res[1].1, "v4".as_bytes());
assert_eq!(res[2].1, "v3".as_bytes());
assert_eq!(res[3].1, "v2".as_bytes());

// by default end key is exclusive and start key is inclusive but now include end key and exclude start key
let res = client
.scan_reverse("k2\0".to_owned()..="k5".to_owned(), 5)
.await?;
assert_eq!(res.len(), 3);
assert_eq!(res[0].1, "v5".as_bytes());
assert_eq!(res[1].1, "v4".as_bytes());
assert_eq!(res[2].1, "v3".as_bytes());

// limit results to first 2
let res = client
.scan_reverse("k2".to_owned().."k5".to_owned(), 2)
.await?;
assert_eq!(res.len(), 2);
assert_eq!(res[0].1, "v4".as_bytes());
assert_eq!(res[1].1, "v3".as_bytes());

// if endKey is not provided then it scan everything including end key
let range = BoundRange::range_from(Key::from("k2".to_owned()));
let res = client.scan_reverse(range, 20).await?;
assert_eq!(res.len(), 4);
assert_eq!(res[0].1, "v5".as_bytes());
assert_eq!(res[1].1, "v4".as_bytes());
assert_eq!(res[2].1, "v3".as_bytes());
assert_eq!(res[3].1, "v2".as_bytes());

Ok(())
}

Expand Down Expand Up @@ -704,6 +760,105 @@ async fn raw_write_million() -> Result<()> {
r = client.scan(.., limit).await?;
assert_eq!(r.len(), limit as usize);

// test scan_reverse
// test scan, key range from [0,0,0,0] to [255.0.0.0]
let mut limit = 2000;
let mut r = client.scan_reverse(.., limit).await?;
assert_eq!(r.len(), 256);
for (i, val) in r.iter().rev().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8);
}
r = client.scan_reverse(vec![100, 0, 0, 0].., limit).await?;
assert_eq!(r.len(), 156);
for (i, val) in r.iter().rev().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 100);
}
r = client
.scan_reverse(vec![5, 0, 0, 0]..vec![200, 0, 0, 0], limit)
.await?;
assert_eq!(r.len(), 195);
for (i, val) in r.iter().rev().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 5);
}
r = client
.scan_reverse(vec![5, 0, 0, 0]..=vec![200, 0, 0, 0], limit)
.await?;
assert_eq!(r.len(), 196);
for (i, val) in r.iter().rev().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 5);
}
r = client
.scan_reverse(vec![5, 0, 0, 0]..=vec![255, 10, 0, 0], limit)
.await?;
assert_eq!(r.len(), 251);
for (i, val) in r.iter().rev().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 5);
}
r = client
.scan_reverse(vec![255, 1, 0, 0]..=vec![255, 10, 0, 0], limit)
.await?;
assert_eq!(r.len(), 0);
r = client.scan_reverse(..vec![0, 0, 0, 0], limit).await?;
assert_eq!(r.len(), 0);

limit = 3;
let mut r = client.scan_reverse(.., limit).await?;
let mut expected_start: u8 = 255 - limit as u8 + 1; // including endKey
assert_eq!(r.len(), limit as usize);
for (i, val) in r.iter().rev().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + expected_start);
}
r = client.scan_reverse(vec![100, 0, 0, 0].., limit).await?;
expected_start = 255 - limit as u8 + 1; // including endKey
assert_eq!(r.len(), limit as usize);
for (i, val) in r.iter().rev().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + expected_start);
}
r = client
.scan_reverse(vec![5, 0, 0, 0]..vec![200, 0, 0, 0], limit)
.await?;
expected_start = 200 - limit as u8;
assert_eq!(r.len(), limit as usize);
for (i, val) in r.iter().rev().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + expected_start);
}
r = client
.scan_reverse(vec![5, 0, 0, 0]..=vec![200, 0, 0, 0], limit)
.await?;
expected_start = 200 - limit as u8 + 1; // including endKey
assert_eq!(r.len(), limit as usize);
for (i, val) in r.iter().rev().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + expected_start);
}
r = client
.scan_reverse(vec![5, 0, 0, 0]..=vec![255, 10, 0, 0], limit)
.await?;
expected_start = 255 - limit as u8 + 1; // including endKey
assert_eq!(r.len(), limit as usize);
for (i, val) in r.iter().rev().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + expected_start);
}
r = client
.scan_reverse(vec![255, 1, 0, 0]..=vec![255, 10, 0, 0], limit)
.await?;
assert_eq!(r.len(), 0);
r = client.scan_reverse(..vec![0, 0, 0, 0], limit).await?;
assert_eq!(r.len(), 0);

limit = 0;
r = client.scan_reverse(.., limit).await?;
assert_eq!(r.len(), limit as usize);

// test batch_scan
for batch_num in 1..4 {
let _ = client
Expand Down

0 comments on commit c6110dd

Please sign in to comment.