Skip to content

Commit

Permalink
Add unit tests on router
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jan 30, 2025
1 parent 2a9da60 commit bf9480e
Showing 1 changed file with 89 additions and 1 deletion.
90 changes: 89 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2094,7 +2094,7 @@ mod tests {
}

#[tokio::test]
async fn test_router_returns_rate_limited_failure() {
async fn test_router_returns_all_shards_rate_limited_failure() {
let self_node_id = "test-router".into();
let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new());
let ingester_pool = IngesterPool::default();
Expand All @@ -2109,6 +2109,10 @@ mod tests {
let mut state_guard = router.state.lock().await;
let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0);

// We setup one shard that is rate limited. After 1 retry, it will be
// marked as rate limited, so all shards will be known to be rate
// limited.

state_guard.routing_table.replace_shards(
index_uid.clone(),
"test-source",
Expand Down Expand Up @@ -2157,6 +2161,90 @@ mod tests {
let ingester_0 = IngesterServiceClient::from_mock(mock_ingester_0);
ingester_pool.insert("test-ingester-0".into(), ingester_0.clone());

let ingest_request = IngestRequestV2 {
subrequests: vec![IngestSubrequest {
subrequest_id: 0,
index_id: "test-index-0".to_string(),
source_id: "test-source".to_string(),
doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])),
}],
commit_type: CommitTypeV2::Auto as i32,
};
let ingest_response = router.ingest(ingest_request).await.unwrap();
assert_eq!(ingest_response.successes.len(), 0);
assert_eq!(ingest_response.failures.len(), 1);
assert_eq!(
ingest_response.failures[0].reason(),
IngestFailureReason::AllShardsRateLimited
);
}

#[tokio::test]
async fn test_router_returns_attempted_shards_rate_limited_failure() {
let self_node_id = "test-router".into();
let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new());
let ingester_pool = IngesterPool::default();
let replication_factor = 1;
let router = IngestRouter::new(
self_node_id,
control_plane,
ingester_pool.clone(),
replication_factor,
EventBroker::default(),
);
let mut state_guard = router.state.lock().await;
let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0);

// We setup many rate limited shards. After the maximum number
// of retries the router will give up.

state_guard.routing_table.replace_shards(
index_uid.clone(),
"test-source",
(1..=100)
.map(|shard_id| Shard {
index_uid: Some(index_uid.clone()),
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(shard_id)),
shard_state: ShardState::Open as i32,
leader_id: "test-ingester-0".to_string(),
..Default::default()
})
.collect(),
);

drop(state_guard);

let mut mock_ingester_0 = MockIngesterService::new();
mock_ingester_0.expect_persist().returning(move |request| {
assert_eq!(request.leader_id, "test-ingester-0");
assert_eq!(request.commit_type(), CommitTypeV2::Auto);
assert_eq!(request.subrequests.len(), 1);
let subrequest = &request.subrequests[0];
assert_eq!(subrequest.subrequest_id, 0);
let index_uid = subrequest.index_uid().clone();
assert_eq!(subrequest.source_id, "test-source");
assert_eq!(
subrequest.doc_batch,
Some(DocBatchV2::for_test(["test-doc-foo"]))
);

let response = PersistResponse {
leader_id: request.leader_id,
successes: Vec::new(),
failures: vec![PersistFailure {
subrequest_id: 0,
index_uid: Some(index_uid),
source_id: "test-source".to_string(),
shard_id: Some(subrequest.shard_id().clone()),
reason: PersistFailureReason::ShardRateLimited as i32,
}],
};
Ok(response)
});
let ingester_0 = IngesterServiceClient::from_mock(mock_ingester_0);
ingester_pool.insert("test-ingester-0".into(), ingester_0.clone());

let ingest_request = IngestRequestV2 {
subrequests: vec![IngestSubrequest {
subrequest_id: 0,
Expand Down

0 comments on commit bf9480e

Please sign in to comment.