From 7b1a0c05347d0a0f874978b4462f580ddc814b93 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Sat, 28 Oct 2023 16:54:57 +0200 Subject: [PATCH] feat(cubestore): Queue - expose queueId for LIST/ACTIVE/PENDING/TO_CANCEL (#7351) --- .../cubestore-sql-tests/src/tests.rs | 106 +++++++++++------- .../cubestore/src/cachestore/queue_item.rs | 24 +++- .../cubestore/cubestore/src/sql/cachestore.rs | 20 ++-- 3 files changed, 99 insertions(+), 51 deletions(-) diff --git a/rust/cubestore/cubestore-sql-tests/src/tests.rs b/rust/cubestore/cubestore-sql-tests/src/tests.rs index 745d30223efac..7a4226ab7a890 100644 --- a/rust/cubestore/cubestore-sql-tests/src/tests.rs +++ b/rust/cubestore/cubestore-sql-tests/src/tests.rs @@ -8251,7 +8251,7 @@ async fn queue_latest_result_v1(service: Box) { async fn queue_full_workflow_v1(service: Box) { let add_response = service - .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#) + .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:queue_key_1" "payload1";"#) .await .unwrap(); assert_queue_add_columns(&add_response); @@ -8265,7 +8265,7 @@ async fn queue_full_workflow_v1(service: Box) { ); let add_response = service - .exec_query(r#"QUEUE ADD PRIORITY 10 "STANDALONE#queue:2" "payload2";"#) + .exec_query(r#"QUEUE ADD PRIORITY 10 "STANDALONE#queue:queue_key_2" "payload2";"#) .await .unwrap(); assert_queue_add_columns(&add_response); @@ -8279,7 +8279,7 @@ async fn queue_full_workflow_v1(service: Box) { ); let add_response = service - .exec_query(r#"QUEUE ADD PRIORITY 100 "STANDALONE#queue:3" "payload3";"#) + .exec_query(r#"QUEUE ADD PRIORITY 100 "STANDALONE#queue:queue_key_3" "payload3";"#) .await .unwrap(); assert_queue_add_columns(&add_response); @@ -8293,7 +8293,7 @@ async fn queue_full_workflow_v1(service: Box) { ); let add_response = service - .exec_query(r#"QUEUE ADD PRIORITY 50 "STANDALONE#queue:4" "payload4";"#) + .exec_query(r#"QUEUE ADD PRIORITY 50 "STANDALONE#queue:queue_key_4" "payload4";"#) .await .unwrap(); assert_queue_add_columns(&add_response); @@ -8307,7 +8307,7 @@ async fn queue_full_workflow_v1(service: Box) { ); let add_response = service - .exec_query(r#"QUEUE ADD PRIORITY -1 "STANDALONE#queue:5" "payload5";"#) + .exec_query(r#"QUEUE ADD PRIORITY -1 "STANDALONE#queue:queue_key_5" "payload5";"#) .await .unwrap(); assert_queue_add_columns(&add_response); @@ -8323,7 +8323,7 @@ async fn queue_full_workflow_v1(service: Box) { // deduplication check { let add_response = service - .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#) + .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:queue_key_1" "payload1";"#) .await .unwrap(); assert_queue_add_columns(&add_response); @@ -8346,34 +8346,40 @@ async fn queue_full_workflow_v1(service: Box) { pending_response.get_columns(), &vec![ Column::new("id".to_string(), ColumnType::String, 0), - Column::new("status".to_string(), ColumnType::String, 1), - Column::new("extra".to_string(), ColumnType::String, 2), + Column::new("queue_id".to_string(), ColumnType::String, 1), + Column::new("status".to_string(), ColumnType::String, 2), + Column::new("extra".to_string(), ColumnType::String, 3), ] ); assert_eq!( pending_response.get_rows(), &vec![ Row::new(vec![ + TableValue::String("queue_key_3".to_string()), TableValue::String("3".to_string()), TableValue::String("pending".to_string()), TableValue::Null ]), Row::new(vec![ + TableValue::String("queue_key_4".to_string()), TableValue::String("4".to_string()), TableValue::String("pending".to_string()), TableValue::Null ]), Row::new(vec![ + TableValue::String("queue_key_2".to_string()), TableValue::String("2".to_string()), TableValue::String("pending".to_string()), TableValue::Null ]), Row::new(vec![ + TableValue::String("queue_key_1".to_string()), TableValue::String("1".to_string()), TableValue::String("pending".to_string()), TableValue::Null ]), Row::new(vec![ + TableValue::String("queue_key_5".to_string()), TableValue::String("5".to_string()), TableValue::String("pending".to_string()), TableValue::Null @@ -8392,7 +8398,7 @@ async fn queue_full_workflow_v1(service: Box) { { let retrieve_response = service - .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 1 "STANDALONE#queue:3""#) + .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 1 "STANDALONE#queue:queue_key_3""#) .await .unwrap(); assert_queue_retrieve_columns(&retrieve_response); @@ -8402,7 +8408,8 @@ async fn queue_full_workflow_v1(service: Box) { TableValue::String("payload3".to_string()), TableValue::Null, TableValue::Int(4), - TableValue::String("3".to_string()), + // list of active keys + TableValue::String("queue_key_3".to_string()), TableValue::String("3".to_string()), ]),] ); @@ -8411,7 +8418,7 @@ async fn queue_full_workflow_v1(service: Box) { { // concurrency limit let retrieve_response = service - .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 1 "STANDALONE#queue:4""#) + .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 1 "STANDALONE#queue:queue_key_4""#) .await .unwrap(); assert_queue_retrieve_columns(&retrieve_response); @@ -8426,6 +8433,7 @@ async fn queue_full_workflow_v1(service: Box) { assert_eq!( active_response.get_rows(), &vec![Row::new(vec![ + TableValue::String("queue_key_3".to_string()), TableValue::String("3".to_string()), TableValue::String("active".to_string()), TableValue::Null @@ -8439,7 +8447,7 @@ async fn queue_full_workflow_v1(service: Box) { let service_to_move = service.clone(); let blocking = async move { service_to_move - .exec_query(r#"QUEUE RESULT_BLOCKING 5000 "STANDALONE#queue:3""#) + .exec_query(r#"QUEUE RESULT_BLOCKING 5000 "STANDALONE#queue:queue_key_3""#) .await .unwrap() }; @@ -8449,7 +8457,7 @@ async fn queue_full_workflow_v1(service: Box) { tokio::time::sleep(Duration::from_millis(1000)).await; let ack_result = service_to_move - .exec_query(r#"QUEUE ACK "STANDALONE#queue:3" "result:3""#) + .exec_query(r#"QUEUE ACK "STANDALONE#queue:queue_key_3" "result:3""#) .await .unwrap(); assert_eq!( @@ -8480,7 +8488,7 @@ async fn queue_full_workflow_v1(service: Box) { // get { let get_response = service - .exec_query(r#"QUEUE GET "STANDALONE#queue:2""#) + .exec_query(r#"QUEUE GET "STANDALONE#queue:queue_key_2""#) .await .unwrap(); assert_eq!( @@ -8495,7 +8503,7 @@ async fn queue_full_workflow_v1(service: Box) { // cancel job { let cancel_response = service - .exec_query(r#"QUEUE CANCEL "STANDALONE#queue:2""#) + .exec_query(r#"QUEUE CANCEL "STANDALONE#queue:queue_key_2""#) .await .unwrap(); assert_eq!( @@ -8508,7 +8516,7 @@ async fn queue_full_workflow_v1(service: Box) { // assertion that job was removed let get_response = service - .exec_query(r#"QUEUE GET "STANDALONE#queue:2""#) + .exec_query(r#"QUEUE GET "STANDALONE#queue:queue_key_2""#) .await .unwrap(); assert_eq!(get_response.get_rows().len(), 0); @@ -8517,7 +8525,7 @@ async fn queue_full_workflow_v1(service: Box) { async fn queue_full_workflow_v2(service: Box) { let add_response = service - .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#) + .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:queue_key_1" "payload1";"#) .await .unwrap(); assert_queue_add_columns(&add_response); @@ -8531,7 +8539,7 @@ async fn queue_full_workflow_v2(service: Box) { ); let add_response = service - .exec_query(r#"QUEUE ADD PRIORITY 10 "STANDALONE#queue:2" "payload2";"#) + .exec_query(r#"QUEUE ADD PRIORITY 10 "STANDALONE#queue:queue_key_2" "payload2";"#) .await .unwrap(); assert_queue_add_columns(&add_response); @@ -8545,7 +8553,7 @@ async fn queue_full_workflow_v2(service: Box) { ); let add_response = service - .exec_query(r#"QUEUE ADD PRIORITY 100 "STANDALONE#queue:3" "payload3";"#) + .exec_query(r#"QUEUE ADD PRIORITY 100 "STANDALONE#queue:queue_key_3" "payload3";"#) .await .unwrap(); assert_queue_add_columns(&add_response); @@ -8559,7 +8567,7 @@ async fn queue_full_workflow_v2(service: Box) { ); let add_response = service - .exec_query(r#"QUEUE ADD PRIORITY 50 "STANDALONE#queue:4" "payload4";"#) + .exec_query(r#"QUEUE ADD PRIORITY 50 "STANDALONE#queue:queue_key_4" "payload4";"#) .await .unwrap(); assert_queue_add_columns(&add_response); @@ -8573,7 +8581,7 @@ async fn queue_full_workflow_v2(service: Box) { ); let add_response = service - .exec_query(r#"QUEUE ADD PRIORITY -1 "STANDALONE#queue:5" "payload5";"#) + .exec_query(r#"QUEUE ADD PRIORITY -1 "STANDALONE#queue:queue_key_5" "payload5";"#) .await .unwrap(); assert_queue_add_columns(&add_response); @@ -8589,7 +8597,7 @@ async fn queue_full_workflow_v2(service: Box) { // deduplication check { let add_response = service - .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#) + .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:queue_key_1" "payload1";"#) .await .unwrap(); assert_queue_add_columns(&add_response); @@ -8612,34 +8620,40 @@ async fn queue_full_workflow_v2(service: Box) { pending_response.get_columns(), &vec![ Column::new("id".to_string(), ColumnType::String, 0), - Column::new("status".to_string(), ColumnType::String, 1), - Column::new("extra".to_string(), ColumnType::String, 2), + Column::new("queue_id".to_string(), ColumnType::String, 1), + Column::new("status".to_string(), ColumnType::String, 2), + Column::new("extra".to_string(), ColumnType::String, 3), ] ); assert_eq!( pending_response.get_rows(), &vec![ Row::new(vec![ + TableValue::String("queue_key_3".to_string()), TableValue::String("3".to_string()), TableValue::String("pending".to_string()), TableValue::Null ]), Row::new(vec![ + TableValue::String("queue_key_4".to_string()), TableValue::String("4".to_string()), TableValue::String("pending".to_string()), TableValue::Null ]), Row::new(vec![ + TableValue::String("queue_key_2".to_string()), TableValue::String("2".to_string()), TableValue::String("pending".to_string()), TableValue::Null ]), Row::new(vec![ + TableValue::String("queue_key_1".to_string()), TableValue::String("1".to_string()), TableValue::String("pending".to_string()), TableValue::Null ]), Row::new(vec![ + TableValue::String("queue_key_5".to_string()), TableValue::String("5".to_string()), TableValue::String("pending".to_string()), TableValue::Null @@ -8658,7 +8672,7 @@ async fn queue_full_workflow_v2(service: Box) { { let retrieve_response = service - .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 1 "STANDALONE#queue:3""#) + .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 1 "STANDALONE#queue:queue_key_3""#) .await .unwrap(); assert_queue_retrieve_columns(&retrieve_response); @@ -8668,7 +8682,8 @@ async fn queue_full_workflow_v2(service: Box) { TableValue::String("payload3".to_string()), TableValue::Null, TableValue::Int(4), - TableValue::String("3".to_string()), + // array of active keys + TableValue::String("queue_key_3".to_string()), TableValue::String("3".to_string()), ]),] ); @@ -8677,7 +8692,7 @@ async fn queue_full_workflow_v2(service: Box) { { // concurrency limit let retrieve_response = service - .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 1 "STANDALONE#queue:4""#) + .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 1 "STANDALONE#queue:queue_key_4""#) .await .unwrap(); assert_queue_retrieve_columns(&retrieve_response); @@ -8692,6 +8707,7 @@ async fn queue_full_workflow_v2(service: Box) { assert_eq!( active_response.get_rows(), &vec![Row::new(vec![ + TableValue::String("queue_key_3".to_string()), TableValue::String("3".to_string()), TableValue::String("active".to_string()), TableValue::Null @@ -9033,12 +9049,12 @@ async fn queue_orphaned_timeout(service: Box) { .unwrap(); service - .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#) + .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:queue_key_1" "payload1";"#) .await .unwrap(); service - .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:2" "payload2";"#) + .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:queue_key_2" "payload2";"#) .await .unwrap(); @@ -9052,12 +9068,12 @@ async fn queue_orphaned_timeout(service: Box) { // RETRIEVE updates heartbeat { service - .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 2 "STANDALONE#queue:1""#) + .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 2 "STANDALONE#queue:queue_key_1""#) .await .unwrap(); service - .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 2 "STANDALONE#queue:2""#) + .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 2 "STANDALONE#queue:queue_key_2""#) .await .unwrap(); } @@ -9065,7 +9081,7 @@ async fn queue_orphaned_timeout(service: Box) { tokio::time::sleep(Duration::from_millis(1000)).await; service - .exec_query(r#"QUEUE HEARTBEAT "STANDALONE#queue:2";"#) + .exec_query(r#"QUEUE HEARTBEAT "STANDALONE#queue:queue_key_2";"#) .await .unwrap(); @@ -9075,11 +9091,17 @@ async fn queue_orphaned_timeout(service: Box) { .unwrap(); assert_eq!( res.get_columns(), - &vec![Column::new("id".to_string(), ColumnType::String, 0),] + &vec![ + Column::new("id".to_string(), ColumnType::String, 0), + Column::new("queue_id".to_string(), ColumnType::String, 1), + ] ); assert_eq!( res.get_rows(), - &vec![Row::new(vec![TableValue::String("1".to_string()),]),] + &vec![Row::new(vec![ + TableValue::String("queue_key_1".to_string()), + TableValue::String("1".to_string()), + ]),] ); // awaiting for expiring heart beat for queue:2 @@ -9327,12 +9349,14 @@ async fn queue_multiple_result_blocking(service: Box) { async fn queue_custom_orphaned(service: Box) { service - .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#) + .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:queue_key_1" "payload1";"#) .await .unwrap(); service - .exec_query(r#"QUEUE ADD PRIORITY 1 ORPHANED 60 "STANDALONE#queue:2" "payload1";"#) + .exec_query( + r#"QUEUE ADD PRIORITY 1 ORPHANED 60 "STANDALONE#queue:queue_key_2" "payload1";"#, + ) .await .unwrap(); @@ -9344,12 +9368,18 @@ async fn queue_custom_orphaned(service: Box) { .unwrap(); assert_eq!( res.get_columns(), - &vec![Column::new("id".to_string(), ColumnType::String, 0),] + &vec![ + Column::new("id".to_string(), ColumnType::String, 0), + Column::new("queue_id".to_string(), ColumnType::String, 1), + ] ); assert_eq!( res.get_rows(), - &vec![Row::new(vec![TableValue::String("1".to_string()),]),] + &vec![Row::new(vec![ + TableValue::String("queue_key_1".to_string()), + TableValue::String("1".to_string()), + ]),] ); } diff --git a/rust/cubestore/cubestore/src/cachestore/queue_item.rs b/rust/cubestore/cubestore/src/cachestore/queue_item.rs index 87cc77e998cc2..74679ff2731bb 100644 --- a/rust/cubestore/cubestore/src/cachestore/queue_item.rs +++ b/rust/cubestore/cubestore/src/cachestore/queue_item.rs @@ -180,11 +180,25 @@ impl QueueItem { Row::new(res) } - pub fn into_queue_list_row(self, with_payload: bool) -> Row { + pub fn queue_to_cancel_row(item: IdRow) -> Row { + let row_id = item.get_id(); + let row = item.into_row(); + + Row::new(vec![ + TableValue::String(row.key), + TableValue::String(row_id.to_string()), + ]) + } + + pub fn queue_list_row(item: IdRow, with_payload: bool) -> Row { + let row_id = item.get_id(); + let row = item.into_row(); + let mut res = vec![ - TableValue::String(self.key), - TableValue::String(self.status.to_string()), - if let Some(extra) = self.extra { + TableValue::String(row.key), + TableValue::String(row_id.to_string()), + TableValue::String(row.status.to_string()), + if let Some(extra) = row.extra { TableValue::String(extra) } else { TableValue::Null @@ -192,7 +206,7 @@ impl QueueItem { ]; if with_payload { - res.push(TableValue::String(self.value)); + res.push(TableValue::String(row.value)); } Row::new(res) diff --git a/rust/cubestore/cubestore/src/sql/cachestore.rs b/rust/cubestore/cubestore/src/sql/cachestore.rs index 6c6a0728897a9..5284c14ffae11 100644 --- a/rust/cubestore/cubestore/src/sql/cachestore.rs +++ b/rust/cubestore/cubestore/src/sql/cachestore.rs @@ -375,15 +375,17 @@ impl CacheStoreSqlService { .queue_to_cancel(prefix.value, orphaned_timeout, heartbeat_timeout) .await?; - let columns = vec![Column::new("id".to_string(), ColumnType::String, 0)]; + let columns = vec![ + // id is a path, we cannot change it, because it's breaking change + Column::new("id".to_string(), ColumnType::String, 0), + Column::new("queue_id".to_string(), ColumnType::String, 1), + ]; ( Arc::new(DataFrame::new( columns, rows.into_iter() - .map(|item| { - Row::new(vec![TableValue::String(item.get_row().get_key().clone())]) - }) + .map(|item| QueueItem::queue_to_cancel_row(item)) .collect(), )), true, @@ -401,20 +403,22 @@ impl CacheStoreSqlService { .await?; let mut columns = vec![ + // id is a path, we cannot change it, because it's breaking change Column::new("id".to_string(), ColumnType::String, 0), - Column::new("status".to_string(), ColumnType::String, 1), - Column::new("extra".to_string(), ColumnType::String, 2), + Column::new("queue_id".to_string(), ColumnType::String, 1), + Column::new("status".to_string(), ColumnType::String, 2), + Column::new("extra".to_string(), ColumnType::String, 3), ]; if with_payload { - columns.push(Column::new("payload".to_string(), ColumnType::String, 3)); + columns.push(Column::new("payload".to_string(), ColumnType::String, 4)); } ( Arc::new(DataFrame::new( columns, rows.into_iter() - .map(|item| item.into_row().into_queue_list_row(with_payload)) + .map(|item| QueueItem::queue_list_row(item, with_payload)) .collect(), )), true,