Skip to content

Commit

Permalink
chore: fix after cherry pick
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed May 14, 2024
1 parent e42bea2 commit 8488889
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
9 changes: 9 additions & 0 deletions src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,12 @@ pub(crate) mod error;
pub(crate) mod node_context;

pub(crate) use node_context::FlownodeContext;

mod worker;

pub const PER_REQ_MAX_ROW_CNT: usize = 8192;

// TODO: refactor common types for flow to a separate module
/// FlowId is a unique identifier for a flow task
pub type FlowId = u64;
pub type TableName = [String; 3];
4 changes: 1 addition & 3 deletions src/flow/src/adapter/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,13 +461,11 @@ mod test {
use tokio::sync::oneshot;

use super::*;
use crate::adapter::FlowTickManager;
use crate::expr::Id;
use crate::plan::Plan;
use crate::repr::{RelationType, Row};
#[tokio::test]
pub async fn test_simple_get_with_worker_and_handle() {
let flow_tick = FlowTickManager::new();
let (tx, rx) = oneshot::channel();
let worker_thread_handle = std::thread::spawn(move || {
let (handle, mut worker) = create_worker();
Expand Down Expand Up @@ -502,7 +500,7 @@ mod test {
.await
.unwrap();
tx.send((Row::empty(), 0, 0)).unwrap();
handle.run_available(flow_tick.tick()).await;
handle.run_available(0).await;
assert_eq!(sink_rx.recv().await.unwrap().0, Row::empty());
handle.shutdown().await;
worker_thread_handle.join().unwrap();
Expand Down

0 comments on commit 8488889

Please sign in to comment.