From 8488889e764f35ffe43f0f86d07b4de5b86340d9 Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 14 May 2024 14:38:39 +0800 Subject: [PATCH] chore: fix after cherry pick --- src/flow/src/adapter.rs | 9 +++++++++ src/flow/src/adapter/worker.rs | 4 +--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 9eb68a02c5f4..aef545438918 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -19,3 +19,12 @@ pub(crate) mod error; pub(crate) mod node_context; pub(crate) use node_context::FlownodeContext; + +mod worker; + +pub const PER_REQ_MAX_ROW_CNT: usize = 8192; + +// TODO: refactor common types for flow to a separate module +/// FlowId is a unique identifier for a flow task +pub type FlowId = u64; +pub type TableName = [String; 3]; diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index a9d779054e4a..ca5f06a98dba 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -461,13 +461,11 @@ mod test { use tokio::sync::oneshot; use super::*; - use crate::adapter::FlowTickManager; use crate::expr::Id; use crate::plan::Plan; use crate::repr::{RelationType, Row}; #[tokio::test] pub async fn test_simple_get_with_worker_and_handle() { - let flow_tick = FlowTickManager::new(); let (tx, rx) = oneshot::channel(); let worker_thread_handle = std::thread::spawn(move || { let (handle, mut worker) = create_worker(); @@ -502,7 +500,7 @@ mod test { .await .unwrap(); tx.send((Row::empty(), 0, 0)).unwrap(); - handle.run_available(flow_tick.tick()).await; + handle.run_available(0).await; assert_eq!(sink_rx.recv().await.unwrap().0, Row::empty()); handle.shutdown().await; worker_thread_handle.join().unwrap();