Skip to content

Commit

Permalink
address PR feedback
Browse files Browse the repository at this point in the history
Signed-off-by: limbooverlambda <[email protected]>
  • Loading branch information
limbooverlambda committed Jul 8, 2024
1 parent b5640d1 commit 4740269
Showing 1 changed file with 2 additions and 68 deletions.
70 changes: 2 additions & 68 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ impl<PdC: PdClient> Client<PdC> {
let scan_args = ScanInnerArgs {
start_key: current_key.clone(),
range: range.clone(),
limit,
limit: current_limit,
key_only,
reverse,
backoff: backoff.clone(),
Expand All @@ -789,12 +789,7 @@ impl<PdC: PdClient> Client<PdC> {
current_limit -= kvs.len() as u32;
result.append(&mut kvs);
}
if end_key
.as_ref()
.map(|ek| ek <= next_key.as_ref())
.unwrap_or(false)
|| next_key.is_empty()
{
if end_key.clone().is_some_and(|ek| ek <= next_key) {
break;
} else {
current_key = next_key;
Expand Down Expand Up @@ -928,13 +923,11 @@ struct ScanInnerArgs {
#[cfg(test)]
mod tests {
use std::any::Any;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use super::*;
use crate::mock::MockKvClient;
use crate::mock::MockPdClient;
use crate::proto::errorpb::EpochNotMatch;
use crate::proto::kvrpcpb;
use crate::Result;

Expand Down Expand Up @@ -1016,63 +1009,4 @@ mod tests {
);
Ok(())
}

#[tokio::test]
async fn test_raw_scan_retryer() -> Result<()> {
let epoch_not_match_error = EpochNotMatch {
current_regions: vec![],
};
let region_error = errorpb::Error {
epoch_not_match: Some(epoch_not_match_error),
..Default::default()
};
let flag = Arc::new(AtomicBool::new(true));
let tikv_flag = flag.clone();
let error_handler_flag = flag.clone();
let mock_tikv_client = MockKvClient::with_dispatch_hook(move |req: &dyn Any| {
if req.downcast_ref::<RawScanRequest>().is_some() {
let v = tikv_flag.clone().load(Ordering::Relaxed);
let resp = if v {
RawScanResponse {
region_error: Some(region_error.clone()),
..Default::default()
}
} else {
RawScanResponse {
..Default::default()
}
};
Ok(Box::new(resp) as Box<dyn Any>)
} else {
unreachable!()
}
});
let mock_pd_client = MockPdClient::new(mock_tikv_client);
let client = Client {
rpc: Arc::new(mock_pd_client),
cf: Some(ColumnFamily::Default),
backoff: DEFAULT_REGION_BACKOFF,
atomic: false,
keyspace: Keyspace::Enable { keyspace_id: 0 },
};

let scan_args = ScanInnerArgs {
start_key: "k1".to_string().into(),
range: BoundRange::from("k1".to_owned().."k2".to_owned()),
limit: 10,
key_only: false,
reverse: false,
backoff: Backoff::no_backoff(),
};
let error_handler = |_, _, _| {
error_handler_flag.clone().store(false, Ordering::Relaxed);
Box::pin(async move {
let res: Result<bool> = Ok(true);
res
})
} as _;
client.retryable_scan(scan_args, error_handler).await?;
assert!(!error_handler_flag.load(Ordering::Relaxed));
Ok(())
}
}

0 comments on commit 4740269

Please sign in to comment.