diff --git a/src/raw/client.rs b/src/raw/client.rs index 109008ab..71d40b2a 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -500,7 +500,40 @@ impl Client { /// ``` pub async fn scan(&self, range: impl Into, limit: u32) -> Result> { debug!("invoking raw scan request"); - self.scan_inner(range.into(), limit, false).await + self.scan_inner(range.into(), limit, false, false).await + } + + /// Create a new 'scan' request but scans in "reverse" direction. + /// + /// Once resolved this request will result in a `Vec` of key-value pairs that lies in the specified range. + /// + /// If the number of eligible key-value pairs are greater than `limit`, + /// only the first `limit` pairs are returned, ordered by the key. + /// + /// + /// Reverse Scan queries continuous kv pairs in range [startKey, endKey), + /// from startKey(lowerBound) to endKey(upperBound) in reverse order, up to limit pairs. + /// The returned keys are in reversed lexicographical order. + /// If you want to include the endKey or exclude the startKey, push a '\0' to the key. + /// It doesn't support Scanning from "", because locating the last Region is not yet implemented. + /// # Examples + /// ```rust,no_run + /// # use tikv_client::{KvPair, Config, RawClient, IntoOwnedRange}; + /// # use futures::prelude::*; + /// # futures::executor::block_on(async { + /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap(); + /// let inclusive_range = "TiKV"..="TiDB"; + /// let req = client.scan_reverse(inclusive_range.into_owned(), 2); + /// let result: Vec = req.await.unwrap(); + /// # }); + /// ``` + pub async fn scan_reverse( + &self, + range: impl Into, + limit: u32, + ) -> Result> { + debug!("invoking raw reverse scan request"); + self.scan_inner(range.into(), limit, false, true).await } /// Create a new 'scan' request that only returns the keys. @@ -525,7 +558,40 @@ impl Client { pub async fn scan_keys(&self, range: impl Into, limit: u32) -> Result> { debug!("invoking raw scan_keys request"); Ok(self - .scan_inner(range, limit, true) + .scan_inner(range, limit, true, false) + .await? + .into_iter() + .map(KvPair::into_key) + .collect()) + } + + /// Create a new 'scan' request that only returns the keys in reverse order. + /// + /// Once resolved this request will result in a `Vec` of keys that lies in the specified range. + /// + /// If the number of eligible keys are greater than `limit`, + /// only the first `limit` pairs are returned, ordered by the key. + /// + /// + /// # Examples + /// ```rust,no_run + /// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange}; + /// # use futures::prelude::*; + /// # futures::executor::block_on(async { + /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap(); + /// let inclusive_range = "TiKV"..="TiDB"; + /// let req = client.scan_keys(inclusive_range.into_owned(), 2); + /// let result: Vec = req.await.unwrap(); + /// # }); + /// ``` + pub async fn scan_keys_reverse( + &self, + range: impl Into, + limit: u32, + ) -> Result> { + debug!("invoking raw scan_keys request"); + Ok(self + .scan_inner(range, limit, true, true) .await? .into_iter() .map(KvPair::into_key) @@ -682,6 +748,7 @@ impl Client { range: impl Into, limit: u32, key_only: bool, + reverse: bool, ) -> Result> { if limit > MAX_RAW_KV_SCAN_LIMIT { return Err(Error::MaxScanLimitExceeded { @@ -703,8 +770,13 @@ impl Client { let mut cur_limit = limit; while cur_limit > 0 { - let request = - new_raw_scan_request(cur_range.clone(), cur_limit, key_only, self.cf.clone()); + let request = new_raw_scan_request( + cur_range.clone(), + cur_limit, + key_only, + reverse, + self.cf.clone(), + ); let resp = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .single_region_with_store(region_store.clone()) .await? diff --git a/src/raw/lowering.rs b/src/raw/lowering.rs index 4fd35477..3065401f 100644 --- a/src/raw/lowering.rs +++ b/src/raw/lowering.rs @@ -84,6 +84,7 @@ pub fn new_raw_scan_request( range: BoundRange, limit: u32, key_only: bool, + reverse: bool, cf: Option, ) -> kvrpcpb::RawScanRequest { let (start_key, end_key) = range.into_keys(); @@ -92,6 +93,7 @@ pub fn new_raw_scan_request( end_key.unwrap_or_default().into(), limit, key_only, + reverse, cf, ) } diff --git a/src/raw/requests.rs b/src/raw/requests.rs index 65165927..8c49da9e 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -278,13 +278,20 @@ pub fn new_raw_scan_request( end_key: Vec, limit: u32, key_only: bool, + reverse: bool, cf: Option, ) -> kvrpcpb::RawScanRequest { let mut req = kvrpcpb::RawScanRequest::default(); - req.start_key = start_key; - req.end_key = end_key; + if !reverse { + req.start_key = start_key; + req.end_key = end_key; + } else { + req.start_key = end_key; + req.end_key = start_key; + } req.limit = limit; req.key_only = key_only; + req.reverse = reverse; req.maybe_set_cf(cf); req @@ -294,7 +301,7 @@ impl KvRequest for kvrpcpb::RawScanRequest { type Response = kvrpcpb::RawScanResponse; } -range_request!(kvrpcpb::RawScanRequest); // TODO: support reverse raw scan. +range_request!(kvrpcpb::RawScanRequest); shardable_range!(kvrpcpb::RawScanRequest); impl Merge for Collect { diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 7244ed6f..514c4aa8 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -561,6 +561,62 @@ async fn raw_req() -> Result<()> { assert_eq!(res[5].1, "v4".as_bytes()); assert_eq!(res[6].1, "v5".as_bytes()); + // reverse scan + + // By default end key is exclusive, so k5 is not included and start key in included + let res = client + .scan_reverse("k2".to_owned().."k5".to_owned(), 5) + .await?; + assert_eq!(res.len(), 3); + assert_eq!(res[0].1, "v4".as_bytes()); + assert_eq!(res[1].1, "v3".as_bytes()); + assert_eq!(res[2].1, "v2".as_bytes()); + + // by default end key in exclusive and start key is inclusive but now exclude start key + let res = client + .scan_reverse("k2\0".to_owned().."k5".to_owned(), 5) + .await?; + assert_eq!(res.len(), 2); + assert_eq!(res[0].1, "v4".as_bytes()); + assert_eq!(res[1].1, "v3".as_bytes()); + + // reverse scan + // by default end key is exclusive and start key is inclusive but now include end key + let res = client + .scan_reverse("k2".to_owned()..="k5".to_owned(), 5) + .await?; + assert_eq!(res.len(), 4); + assert_eq!(res[0].1, "v5".as_bytes()); + assert_eq!(res[1].1, "v4".as_bytes()); + assert_eq!(res[2].1, "v3".as_bytes()); + assert_eq!(res[3].1, "v2".as_bytes()); + + // by default end key is exclusive and start key is inclusive but now include end key and exclude start key + let res = client + .scan_reverse("k2\0".to_owned()..="k5".to_owned(), 5) + .await?; + assert_eq!(res.len(), 3); + assert_eq!(res[0].1, "v5".as_bytes()); + assert_eq!(res[1].1, "v4".as_bytes()); + assert_eq!(res[2].1, "v3".as_bytes()); + + // limit results to first 2 + let res = client + .scan_reverse("k2".to_owned().."k5".to_owned(), 2) + .await?; + assert_eq!(res.len(), 2); + assert_eq!(res[0].1, "v4".as_bytes()); + assert_eq!(res[1].1, "v3".as_bytes()); + + // if endKey is not provided then it scan everything including end key + let range = BoundRange::range_from(Key::from("k2".to_owned())); + let res = client.scan_reverse(range, 20).await?; + assert_eq!(res.len(), 4); + assert_eq!(res[0].1, "v5".as_bytes()); + assert_eq!(res[1].1, "v4".as_bytes()); + assert_eq!(res[2].1, "v3".as_bytes()); + assert_eq!(res[3].1, "v2".as_bytes()); + Ok(()) } @@ -704,6 +760,105 @@ async fn raw_write_million() -> Result<()> { r = client.scan(.., limit).await?; assert_eq!(r.len(), limit as usize); + // test scan_reverse + // test scan, key range from [0,0,0,0] to [255.0.0.0] + let mut limit = 2000; + let mut r = client.scan_reverse(.., limit).await?; + assert_eq!(r.len(), 256); + for (i, val) in r.iter().rev().enumerate() { + let k: Vec = val.0.clone().into(); + assert_eq!(k[0], i as u8); + } + r = client.scan_reverse(vec![100, 0, 0, 0].., limit).await?; + assert_eq!(r.len(), 156); + for (i, val) in r.iter().rev().enumerate() { + let k: Vec = val.0.clone().into(); + assert_eq!(k[0], i as u8 + 100); + } + r = client + .scan_reverse(vec![5, 0, 0, 0]..vec![200, 0, 0, 0], limit) + .await?; + assert_eq!(r.len(), 195); + for (i, val) in r.iter().rev().enumerate() { + let k: Vec = val.0.clone().into(); + assert_eq!(k[0], i as u8 + 5); + } + r = client + .scan_reverse(vec![5, 0, 0, 0]..=vec![200, 0, 0, 0], limit) + .await?; + assert_eq!(r.len(), 196); + for (i, val) in r.iter().rev().enumerate() { + let k: Vec = val.0.clone().into(); + assert_eq!(k[0], i as u8 + 5); + } + r = client + .scan_reverse(vec![5, 0, 0, 0]..=vec![255, 10, 0, 0], limit) + .await?; + assert_eq!(r.len(), 251); + for (i, val) in r.iter().rev().enumerate() { + let k: Vec = val.0.clone().into(); + assert_eq!(k[0], i as u8 + 5); + } + r = client + .scan_reverse(vec![255, 1, 0, 0]..=vec![255, 10, 0, 0], limit) + .await?; + assert_eq!(r.len(), 0); + r = client.scan_reverse(..vec![0, 0, 0, 0], limit).await?; + assert_eq!(r.len(), 0); + + limit = 3; + let mut r = client.scan_reverse(.., limit).await?; + let mut expected_start: u8 = 255 - limit as u8 + 1; // including endKey + assert_eq!(r.len(), limit as usize); + for (i, val) in r.iter().rev().enumerate() { + let k: Vec = val.0.clone().into(); + assert_eq!(k[0], i as u8 + expected_start); + } + r = client.scan_reverse(vec![100, 0, 0, 0].., limit).await?; + expected_start = 255 - limit as u8 + 1; // including endKey + assert_eq!(r.len(), limit as usize); + for (i, val) in r.iter().rev().enumerate() { + let k: Vec = val.0.clone().into(); + assert_eq!(k[0], i as u8 + expected_start); + } + r = client + .scan_reverse(vec![5, 0, 0, 0]..vec![200, 0, 0, 0], limit) + .await?; + expected_start = 200 - limit as u8; + assert_eq!(r.len(), limit as usize); + for (i, val) in r.iter().rev().enumerate() { + let k: Vec = val.0.clone().into(); + assert_eq!(k[0], i as u8 + expected_start); + } + r = client + .scan_reverse(vec![5, 0, 0, 0]..=vec![200, 0, 0, 0], limit) + .await?; + expected_start = 200 - limit as u8 + 1; // including endKey + assert_eq!(r.len(), limit as usize); + for (i, val) in r.iter().rev().enumerate() { + let k: Vec = val.0.clone().into(); + assert_eq!(k[0], i as u8 + expected_start); + } + r = client + .scan_reverse(vec![5, 0, 0, 0]..=vec![255, 10, 0, 0], limit) + .await?; + expected_start = 255 - limit as u8 + 1; // including endKey + assert_eq!(r.len(), limit as usize); + for (i, val) in r.iter().rev().enumerate() { + let k: Vec = val.0.clone().into(); + assert_eq!(k[0], i as u8 + expected_start); + } + r = client + .scan_reverse(vec![255, 1, 0, 0]..=vec![255, 10, 0, 0], limit) + .await?; + assert_eq!(r.len(), 0); + r = client.scan_reverse(..vec![0, 0, 0, 0], limit).await?; + assert_eq!(r.len(), 0); + + limit = 0; + r = client.scan_reverse(.., limit).await?; + assert_eq!(r.len(), limit as usize); + // test batch_scan for batch_num in 1..4 { let _ = client