From 8f3cc4cb0a78f93d039ca2c21ccee6480562d12a Mon Sep 17 00:00:00 2001 From: Jonah Eisen Date: Wed, 29 Nov 2023 15:23:57 -0800 Subject: [PATCH] Support joining updating data Operators that produce updating data (insert/update/delete) previously could not be upstream from the JoinWithExpiration operator. This change modifies the JoinWithExpiration operator to correctly handle updating data. All join types, excluding inner joins on non-updating inputs, now produce updating data. The bulk of the changes are in `join_with_expiration.rs`, where we transforms/coerces all data into updating data so that we reuse the same 4 join processors regardless of input type. --- arroyo-datastream/src/lib.rs | 50 +- .../golden_outputs/updating_full_join.json | 7 + .../golden_outputs/updating_inner_join.json | 4 + .../golden_outputs/updating_left_join.json | 7 + .../golden_outputs/updating_right_join.json | 4 + arroyo-sql-testing/src/smoke_tests.rs | 120 +++ arroyo-sql/src/code_gen.rs | 5 +- arroyo-sql/src/pipeline.rs | 9 +- arroyo-sql/src/plan_graph.rs | 93 +- arroyo-types/src/lib.rs | 7 + .../src/operators/join_with_expiration.rs | 829 ++++++++++++------ arroyo-worker/src/operators/joiners.rs | 278 ++++++ arroyo-worker/src/operators/mod.rs | 1 + 13 files changed, 1075 insertions(+), 339 deletions(-) create mode 100644 arroyo-sql-testing/golden_outputs/updating_full_join.json create mode 100644 arroyo-sql-testing/golden_outputs/updating_inner_join.json create mode 100644 arroyo-sql-testing/golden_outputs/updating_left_join.json create mode 100644 arroyo-sql-testing/golden_outputs/updating_right_join.json create mode 100644 arroyo-worker/src/operators/joiners.rs diff --git a/arroyo-datastream/src/lib.rs b/arroyo-datastream/src/lib.rs index 57fde4516..bd8427d29 100644 --- a/arroyo-datastream/src/lib.rs +++ b/arroyo-datastream/src/lib.rs @@ -1789,27 +1789,43 @@ impl Program { let in_k = parse_type(&inputs[0].weight().key); let in_t1 = parse_type(&inputs[0].weight().value); + + let (t1_updating, in_t1_inner) = match extract_container_type("UpdatingData", &in_t1) { + Some(t) => (true, t), + None => (false, in_t1.clone()), + }; + let in_t2 = parse_type(&inputs[1].weight().value); + + let (t2_updating, in_t2_inner) = match extract_container_type("UpdatingData", &in_t2) { + Some(t) => (true, t), + None => (false, in_t2.clone()), + }; + let left_expiration = duration_to_syn_expr(*left_expiration); let right_expiration = duration_to_syn_expr(*right_expiration); - match join_type { - arroyo_types::JoinType::Inner => quote!{ - Box::new(arroyo_worker::operators::join_with_expiration:: - inner_join::<#in_k, #in_t1, #in_t2>(#left_expiration, #right_expiration)) - }, - arroyo_types::JoinType::Left => quote!{ - Box::new(arroyo_worker::operators::join_with_expiration:: - left_join::<#in_k, #in_t1, #in_t2>(#left_expiration, #right_expiration)) - }, - arroyo_types::JoinType::Right => quote!{ - Box::new(arroyo_worker::operators::join_with_expiration:: - right_join::<#in_k, #in_t1, #in_t2>(#left_expiration, #right_expiration)) - }, - arroyo_types::JoinType::Full => quote!{ - Box::new(arroyo_worker::operators::join_with_expiration:: - full_join::<#in_k, #in_t1, #in_t2>(#left_expiration, #right_expiration)) - }, + + let join_fn_head: String = match join_type { + JoinType::Inner => "inner_join", + JoinType::Left => "left_join", + JoinType::Right => "right_join", + JoinType::Full => "full_join", + }.to_string(); + + let join_fn_tail: String = match (t1_updating, t2_updating) { + (false, false) => "", + (true, false) => "_left_updating", + (false, true) => "_right_updating", + (true, true) => "_both_updating", + }.to_string(); + + let join_fn_name = format_ident!("{}{}", join_fn_head, join_fn_tail); + + quote!{ + Box::new(arroyo_worker::operators::joiners:: + #join_fn_name::<#in_k, #in_t1_inner, #in_t2_inner>(#left_expiration, #right_expiration)) } + }, Operator::UpdatingOperator { name, expression } => { let expr : syn::Expr = parse_str(expression).expect(expression); diff --git a/arroyo-sql-testing/golden_outputs/updating_full_join.json b/arroyo-sql-testing/golden_outputs/updating_full_join.json new file mode 100644 index 000000000..44a0155d3 --- /dev/null +++ b/arroyo-sql-testing/golden_outputs/updating_full_join.json @@ -0,0 +1,7 @@ +{"before":null,"after":{"left_counter":0,"counter_mod_2":null,"right_count":null},"op":"c"} +{"before":null,"after":{"left_counter":1,"counter_mod_2":null,"right_count":null},"op":"c"} +{"before":null,"after":{"left_counter":2,"counter_mod_2":null,"right_count":null},"op":"c"} +{"before":{"left_counter":1,"counter_mod_2":null,"right_count":null},"after":{"left_counter":1,"counter_mod_2":0,"right_count":1},"op":"u"} +{"before":null,"after":{"left_counter":1,"counter_mod_2":1,"right_count":1},"op":"c"} +{"before":{"left_counter":2,"counter_mod_2":null,"right_count":null},"after":{"left_counter":2,"counter_mod_2":0,"right_count":2},"op":"u"} +{"before":{"left_counter":1,"counter_mod_2":0,"right_count":1},"after":null,"op":"d"} diff --git a/arroyo-sql-testing/golden_outputs/updating_inner_join.json b/arroyo-sql-testing/golden_outputs/updating_inner_join.json new file mode 100644 index 000000000..bb9e76da7 --- /dev/null +++ b/arroyo-sql-testing/golden_outputs/updating_inner_join.json @@ -0,0 +1,4 @@ +{"before":null,"after":{"left_counter":1,"counter_mod_2":0,"right_count":1},"op":"c"} +{"before":null,"after":{"left_counter":1,"counter_mod_2":1,"right_count":1},"op":"c"} +{"before":null,"after":{"left_counter":2,"counter_mod_2":0,"right_count":2},"op":"c"} +{"before":{"left_counter":1,"counter_mod_2":0,"right_count":1},"after":null,"op":"d"} diff --git a/arroyo-sql-testing/golden_outputs/updating_left_join.json b/arroyo-sql-testing/golden_outputs/updating_left_join.json new file mode 100644 index 000000000..44a0155d3 --- /dev/null +++ b/arroyo-sql-testing/golden_outputs/updating_left_join.json @@ -0,0 +1,7 @@ +{"before":null,"after":{"left_counter":0,"counter_mod_2":null,"right_count":null},"op":"c"} +{"before":null,"after":{"left_counter":1,"counter_mod_2":null,"right_count":null},"op":"c"} +{"before":null,"after":{"left_counter":2,"counter_mod_2":null,"right_count":null},"op":"c"} +{"before":{"left_counter":1,"counter_mod_2":null,"right_count":null},"after":{"left_counter":1,"counter_mod_2":0,"right_count":1},"op":"u"} +{"before":null,"after":{"left_counter":1,"counter_mod_2":1,"right_count":1},"op":"c"} +{"before":{"left_counter":2,"counter_mod_2":null,"right_count":null},"after":{"left_counter":2,"counter_mod_2":0,"right_count":2},"op":"u"} +{"before":{"left_counter":1,"counter_mod_2":0,"right_count":1},"after":null,"op":"d"} diff --git a/arroyo-sql-testing/golden_outputs/updating_right_join.json b/arroyo-sql-testing/golden_outputs/updating_right_join.json new file mode 100644 index 000000000..bb9e76da7 --- /dev/null +++ b/arroyo-sql-testing/golden_outputs/updating_right_join.json @@ -0,0 +1,4 @@ +{"before":null,"after":{"left_counter":1,"counter_mod_2":0,"right_count":1},"op":"c"} +{"before":null,"after":{"left_counter":1,"counter_mod_2":1,"right_count":1},"op":"c"} +{"before":null,"after":{"left_counter":2,"counter_mod_2":0,"right_count":2},"op":"c"} +{"before":{"left_counter":1,"counter_mod_2":0,"right_count":1},"after":null,"op":"d"} diff --git a/arroyo-sql-testing/src/smoke_tests.rs b/arroyo-sql-testing/src/smoke_tests.rs index 1b5e30c22..3223736be 100644 --- a/arroyo-sql-testing/src/smoke_tests.rs +++ b/arroyo-sql-testing/src/smoke_tests.rs @@ -869,3 +869,123 @@ FROM (SELECT TUMBLE(interval '1 second'), counter, count(*) FROM impulse_source JOIN (SELECT TUMBLE(interval '1 second') as window, counter , count(*) FROM delayed_impulse_source GROUP BY 1,2) b ON a.counter = b.counter;"} + +correctness_run_codegen! {"updating_left_join", 10, +"CREATE TABLE impulse ( + timestamp TIMESTAMP, + counter bigint unsigned not null, + subtask_index bigint unsigned not null +) WITH ( + connector = 'single_file', + path = '$input_dir/impulse.json', + format = 'json', + type = 'source', + event_time_field = 'timestamp' +); + + +CREATE TABLE output ( + left_counter bigint, + counter_mod_2 bigint, + right_count bigint +) WITH ( + connector = 'single_file', + path = '$output_path', + format = 'debezium_json', + type = 'sink' +); + +INSERT INTO output +select counter as left_counter, counter_mod_2, right_count from impulse left join + (select counter % 2 as counter_mod_2, cast(count(*) as bigint UNSIGNED) as right_count from impulse where counter < 3 group by 1) + on counter = right_count where counter < 3;"} + +correctness_run_codegen! {"updating_right_join", 10, +"CREATE TABLE impulse ( + timestamp TIMESTAMP, + counter bigint unsigned not null, + subtask_index bigint unsigned not null +) WITH ( + connector = 'single_file', + path = '$input_dir/impulse.json', + format = 'json', + type = 'source', + event_time_field = 'timestamp' +); + + +CREATE TABLE output ( + left_counter bigint, + counter_mod_2 bigint, + right_count bigint +) WITH ( + connector = 'single_file', + path = '$output_path', + format = 'debezium_json', + type = 'sink' +); + +INSERT INTO output +select counter as left_counter, counter_mod_2, right_count from impulse right join + (select counter % 2 as counter_mod_2, cast(count(*) as bigint UNSIGNED) as right_count from impulse where counter < 3 group by 1) + on counter = right_count where counter < 3;"} + +correctness_run_codegen! {"updating_inner_join", 10, +"CREATE TABLE impulse ( + timestamp TIMESTAMP, + counter bigint unsigned not null, + subtask_index bigint unsigned not null +) WITH ( + connector = 'single_file', + path = '$input_dir/impulse.json', + format = 'json', + type = 'source', + event_time_field = 'timestamp' +); + + +CREATE TABLE output ( + left_counter bigint, + counter_mod_2 bigint, + right_count bigint +) WITH ( + connector = 'single_file', + path = '$output_path', + format = 'debezium_json', + type = 'sink' +); + +INSERT INTO output +select counter as left_counter, counter_mod_2, right_count from impulse inner join + (select counter % 2 as counter_mod_2, cast(count(*) as bigint UNSIGNED) as right_count from impulse where counter < 3 group by 1) + on counter = right_count where counter < 3;"} + +correctness_run_codegen! {"updating_full_join", 10, +"CREATE TABLE impulse ( + timestamp TIMESTAMP, + counter bigint unsigned not null, + subtask_index bigint unsigned not null +) WITH ( + connector = 'single_file', + path = '$input_dir/impulse.json', + format = 'json', + type = 'source', + event_time_field = 'timestamp' +); + + +CREATE TABLE output ( + left_counter bigint, + counter_mod_2 bigint, + right_count bigint +) WITH ( + connector = 'single_file', + path = '$output_path', + format = 'debezium_json', + type = 'sink' +); + +INSERT INTO output +select counter as left_counter, counter_mod_2, right_count from impulse full outer join + (select counter % 2 as counter_mod_2, cast(count(*) as bigint UNSIGNED) as right_count from impulse where counter < 3 group by 1) + on counter = right_count where counter < 3;"} diff --git a/arroyo-sql/src/code_gen.rs b/arroyo-sql/src/code_gen.rs index b3c37dc9c..97a15612d 100644 --- a/arroyo-sql/src/code_gen.rs +++ b/arroyo-sql/src/code_gen.rs @@ -227,9 +227,10 @@ impl JoinPairContext { let merge_expr = code_generator.generate(self); let left_ident = self.left_ident(); let right_ident = self.right_ident(); + parse_quote!({ - let #left_ident = &record.value.0; - let #right_ident = &record.value.1; + let (#left_ident, #right_ident) = &record.value.unwrap_append().clone(); + arroyo_types::Record { timestamp: record.timestamp.clone(), key: None, diff --git a/arroyo-sql/src/pipeline.rs b/arroyo-sql/src/pipeline.rs index e95e3c324..0059f6557 100644 --- a/arroyo-sql/src/pipeline.rs +++ b/arroyo-sql/src/pipeline.rs @@ -222,6 +222,12 @@ pub struct JoinOperator { pub join_type: JoinType, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct InputsUpdating { + pub left: bool, + pub right: bool, +} + #[derive(Debug, Clone, PartialEq, Eq)] pub enum JoinType { /// Inner Join @@ -805,9 +811,6 @@ impl<'a> SqlPipelineBuilder<'a> { fn insert_join(&mut self, join: &datafusion_expr::logical_plan::Join) -> Result { let left_input = self.insert_sql_plan(&join.left)?; let right_input = self.insert_sql_plan(&join.right)?; - if left_input.is_updating() || right_input.is_updating() { - bail!("don't support joins with updating inputs"); - } match join.join_constraint { JoinConstraint::On => {} JoinConstraint::Using => bail!("don't support 'using' in joins"), diff --git a/arroyo-sql/src/plan_graph.rs b/arroyo-sql/src/plan_graph.rs index 0234ffe98..a98a9248f 100644 --- a/arroyo-sql/src/plan_graph.rs +++ b/arroyo-sql/src/plan_graph.rs @@ -15,6 +15,7 @@ use quote::{quote, ToTokens}; use syn::{parse_quote, parse_str, Type}; use crate::expressions::AggregateComputation; +use crate::pipeline::InputsUpdating; use crate::{ code_gen::{ BinAggregatingContext, CodeGenerator, CombiningContext, JoinListsContext, JoinPairContext, @@ -66,7 +67,7 @@ pub enum PlanOperator { join_type: JoinType, }, JoinListMerge(JoinType, StructPair), - JoinPairMerge(JoinType, StructPair), + JoinPairMerge(JoinType, StructPair, InputsUpdating), Flatten, // TODO: figure out naming of various things called 'window' WindowFunction(WindowFunctionOperator), @@ -327,7 +328,7 @@ impl PlanNode { PlanOperator::InstantJoin => "instant_join".to_string(), PlanOperator::JoinWithExpiration { .. } => "join_with_expiration".to_string(), PlanOperator::JoinListMerge(_, _) => "join_list_merge".to_string(), - PlanOperator::JoinPairMerge(_, _) => "join_pair_merge".to_string(), + PlanOperator::JoinPairMerge(_, _, _) => "join_pair_merge".to_string(), PlanOperator::Flatten => "flatten".to_string(), PlanOperator::WindowFunction { .. } => "window_function".to_string(), PlanOperator::StreamOperator(name, _) => name.to_string(), @@ -455,23 +456,20 @@ impl PlanNode { let record_expression = context.compile_list_merge_record_expression(join_type); MethodCompiler::record_expression_operator("join_list_merge", record_expression) } - PlanOperator::JoinPairMerge(join_type, struct_pair) => { + PlanOperator::JoinPairMerge(join_type, struct_pair, inputs_updating) => { let context = JoinPairContext::new(struct_pair.left.clone(), struct_pair.right.clone()); - match join_type { - JoinType::Inner => { - let record_expression = - context.compile_pair_merge_record_expression(join_type); - MethodCompiler::record_expression_operator("join_merge", record_expression) - } - JoinType::Left | JoinType::Right | JoinType::Full => { - let value_expression = - context.compile_updating_pair_merge_value_expression(join_type); - MethodCompiler::value_updating_operator( - "updating_join_merge", - value_expression, - ) - } + + if !matches!(join_type, JoinType::Inner) + || inputs_updating.left + || inputs_updating.right + { + let value_expression = + context.compile_updating_pair_merge_value_expression(join_type); + MethodCompiler::value_updating_operator("updating_join_merge", value_expression) + } else { + let record_expression = context.compile_pair_merge_record_expression(join_type); + MethodCompiler::record_expression_operator("join_merge", record_expression) } } @@ -848,7 +846,7 @@ impl PlanNode { output_types.extend(self.output_type.get_all_types()); // TODO: populate types only created within operators. match &self.operator { - PlanOperator::JoinPairMerge(join_type, StructPair { left, right }) + PlanOperator::JoinPairMerge(join_type, StructPair { left, right }, ..) | PlanOperator::JoinListMerge(join_type, StructPair { left, right }) => { output_types.insert(join_type.join_struct_type(left, right)); } @@ -932,7 +930,9 @@ impl PlanType { let left_type = left_value.get_type(); let right_type = right_value.get_type(); match join_type { - JoinType::Inner => parse_quote!((#left_type,#right_type)), + JoinType::Inner => { + parse_quote!(arroyo_types::UpdatingData<(#left_type,#right_type)>) + } JoinType::Left => { parse_quote!(arroyo_types::UpdatingData<(#left_type,Option<#right_type>)>) } @@ -1135,6 +1135,20 @@ impl PlanType { pub(crate) fn is_updating(&self) -> bool { matches!(self, PlanType::Updating(_)) } + + fn for_defs(key_struct: StructDef, value_struct: StructDef, updating: bool) -> PlanType { + if updating { + PlanType::Updating(Box::new(PlanType::Keyed { + key: key_struct.clone(), + value: value_struct.clone(), + })) + } else { + PlanType::Keyed { + key: key_struct.clone(), + value: value_struct.clone(), + } + } + } } #[derive(Debug)] @@ -1225,7 +1239,7 @@ impl PlanGraph { PlanOperator::InstantJoin => {} PlanOperator::JoinWithExpiration { .. } => {} PlanOperator::JoinListMerge(_, _) => {} - PlanOperator::JoinPairMerge(_, _) => {} + PlanOperator::JoinPairMerge(_, _, _) => {} PlanOperator::Flatten => {} PlanOperator::WindowFunction(w) => { w.order_by @@ -1483,11 +1497,18 @@ impl PlanGraph { // right now left and right either both have or don't have windows. let has_window = left.has_window(); let join_type = join_operator.join_type; + let left_updating = left.is_updating(); + let right_updating = right.is_updating(); let left_index = self.add_sql_operator(*left); let right_index = self.add_sql_operator(*right); let key_struct = join_operator.left_key.output_struct(); + let inputs_updating = InputsUpdating { + left: left_updating, + right: right_updating, + }; + let left_key_operator = PlanOperator::RecordTransform(RecordTransform::KeyProjection(join_operator.left_key)); let right_key_operator = @@ -1495,17 +1516,12 @@ impl PlanGraph { let left_key_index = self.insert_operator( left_key_operator, - PlanType::Keyed { - key: key_struct.clone(), - value: left_type.clone(), - }, + PlanType::for_defs(key_struct.clone(), left_type.clone(), left_updating), ); + let right_key_index = self.insert_operator( right_key_operator, - PlanType::Keyed { - key: key_struct.clone(), - value: right_type.clone(), - }, + PlanType::for_defs(key_struct.clone(), right_type.clone(), right_updating), ); let left_key_edge = PlanEdge { @@ -1536,6 +1552,7 @@ impl PlanGraph { left_type, right_type, join_type, + inputs_updating, ) } } @@ -1596,6 +1613,7 @@ impl PlanGraph { flatten_index } + fn add_join_with_expiration( &mut self, left_index: NodeIndex, @@ -1604,6 +1622,7 @@ impl PlanGraph { left_struct: StructDef, right_struct: StructDef, join_type: JoinType, + inputs_updating: InputsUpdating, ) -> NodeIndex { let join_node = PlanOperator::JoinWithExpiration { left_expiration: Duration::from_secs(24 * 60 * 60), @@ -1636,16 +1655,18 @@ impl PlanGraph { left: left_struct, right: right_struct, }, + inputs_updating.clone(), ); - let merge_output_type = match join_type { - JoinType::Inner => PlanType::Unkeyed(merge_type), - JoinType::Left | JoinType::Right | JoinType::Full => { - PlanType::Updating(Box::new(PlanType::Keyed { - key: key_struct, - value: merge_type, - })) - } + + let updating = inputs_updating.left || inputs_updating.right; + let merge_output_type = match (join_type, updating) { + (JoinType::Inner, false) => PlanType::Unkeyed(merge_type), + _ => PlanType::Updating(Box::new(PlanType::Keyed { + key: key_struct, + value: merge_type, + })), }; + let merge_index = self.insert_operator(merge_operator, merge_output_type); let merge_edge = PlanEdge { diff --git a/arroyo-types/src/lib.rs b/arroyo-types/src/lib.rs index e7110b729..c5743b889 100644 --- a/arroyo-types/src/lib.rs +++ b/arroyo-types/src/lib.rs @@ -338,6 +338,13 @@ impl UpdatingData { UpdatingData::Append(t) => t.clone(), } } + + pub fn unwrap_append(&self) -> &T { + match self { + UpdatingData::Append(t) => t, + _ => panic!("UpdatingData is not an append"), + } + } } #[derive(Clone, Encode, Decode, Debug, Serialize, Deserialize, PartialEq)] diff --git a/arroyo-worker/src/operators/join_with_expiration.rs b/arroyo-worker/src/operators/join_with_expiration.rs index ad0e0c1b7..1c0c2cbe9 100644 --- a/arroyo-worker/src/operators/join_with_expiration.rs +++ b/arroyo-worker/src/operators/join_with_expiration.rs @@ -13,6 +13,10 @@ use crate::engine::Context; #[derive(StreamNode)] pub struct JoinWithExpiration< K: Key, + InT1: Data, + InT2: Data, + P1: IncomingDataProcessor, + P2: IncomingDataProcessor, T1: Data, T2: Data, Output: Data, @@ -21,299 +25,530 @@ pub struct JoinWithExpiration< left_expiration: Duration, right_expiration: Duration, processor: P, - _t: PhantomData<(K, T1, T2, Output)>, + _t: PhantomData<(K, P1, P2, InT1, InT2, T1, T2, Output)>, } pub trait JoinProcessor: Send + 'static { - fn left_join( + fn process_left( &self, - key: K, left_timestamp: SystemTime, - left_value: T1, + left_update: UpdatingData, right: Option<(SystemTime, &T2)>, - first_left: bool, + left_count: usize, ) -> Option<(SystemTime, Output)>; - fn right_join( + fn process_right( &self, - key: K, right_timestamp: SystemTime, - right_value: T2, + right_update: UpdatingData, left: Option<(SystemTime, &T1)>, - first_right: bool, + right_count: usize, ) -> Option<(SystemTime, Output)>; } +pub trait IncomingDataProcessor: Send + 'static { + fn ensure_updating(incoming: In) -> UpdatingData; +} + +pub struct NoOpProcessor { + _t: PhantomData, +} + +impl IncomingDataProcessor, T> for NoOpProcessor { + fn ensure_updating(incoming: UpdatingData) -> UpdatingData { + incoming + } +} + +pub struct Coercer { + _t: PhantomData, +} + +impl IncomingDataProcessor for Coercer { + fn ensure_updating(incoming: T) -> UpdatingData { + UpdatingData::Append(incoming) + } +} + pub struct LeftJoinProcessor { - _t: PhantomData<(K, T1, T2)>, + pub _t: PhantomData<(K, T1, T2)>, +} + +impl LeftJoinProcessor { + pub fn new() -> Self { + Self { _t: PhantomData } + } } impl JoinProcessor)>> for LeftJoinProcessor { - fn left_join( + fn process_left( &self, - _key: K, left_timestamp: SystemTime, - left_value: T1, + left_update: UpdatingData, right: Option<(SystemTime, &T2)>, - _first_left: bool, + _first_left: usize, ) -> Option<(SystemTime, UpdatingData<(T1, Option)>)> { - match right { - Some((right_timestamp, right_value)) => Some(( - left_timestamp.max(right_timestamp), - UpdatingData::Append((left_value, Some(right_value.clone()))), - )), - None => Some((left_timestamp, UpdatingData::Append((left_value, None)))), + match left_update { + UpdatingData::Append(left) => match right { + Some((right_timestamp, right)) => Some(( + left_timestamp.max(right_timestamp).clone(), + UpdatingData::Append((left.clone(), Some(right.clone()))), + )), + None => Some((left_timestamp, UpdatingData::Append((left.clone(), None)))), + }, + UpdatingData::Update { old, new } => match right { + Some((right_timestamp, right)) => Some(( + left_timestamp.max(right_timestamp), + UpdatingData::Update { + old: (old.clone(), Some(right.clone())), + new: (new.clone(), Some(right.clone())), + }, + )), + None => Some(( + left_timestamp, + UpdatingData::Update { + old: (old.clone(), None), + new: (new.clone(), None), + }, + )), + }, + UpdatingData::Retract(left) => match right { + Some((right_timestamp, right)) => Some(( + left_timestamp.max(right_timestamp), + UpdatingData::Retract((left.clone(), Some(right.clone()))), + )), + None => Some((left_timestamp, UpdatingData::Retract((left.clone(), None)))), + }, } } - fn right_join( + fn process_right( &self, - _key: K, right_timestamp: SystemTime, - right_value: T2, + right_update: UpdatingData, left: Option<(SystemTime, &T1)>, - first_right: bool, + right_count: usize, ) -> Option<(SystemTime, UpdatingData<(T1, Option)>)> { - left.map(|(left_timestamp, left_value)| { - if first_right { - ( - left_timestamp.max(right_timestamp), - UpdatingData::Update { - old: (left_value.clone(), None), - new: (left_value.clone(), Some(right_value.clone())), - }, - ) - } else { - ( - left_timestamp.max(right_timestamp), - UpdatingData::Append((left_value.clone(), Some(right_value))), - ) + let Some((left_timestamp, left)) = left else { + return None; + }; + + let timestamp = left_timestamp.max(right_timestamp); + + match right_update { + UpdatingData::Append(right) => { + if right_count == 0 { + Some(( + timestamp, + UpdatingData::Update { + old: (left.clone(), None), + new: (left.clone(), Some(right)), + }, + )) + } else { + Some((timestamp, UpdatingData::Append((left.clone(), Some(right))))) + } } - }) + UpdatingData::Update { old, new } => Some(( + timestamp, + UpdatingData::Update { + old: (left.clone(), Some(old)), + new: (left.clone(), Some(new)), + }, + )), + UpdatingData::Retract(right) => { + if right_count == 1 { + Some(( + timestamp, + UpdatingData::Update { + old: (left.clone(), Some(right.clone())), + new: (left.clone(), None), + }, + )) + } else { + Some(( + timestamp, + UpdatingData::Retract((left.clone(), Some(right.clone()))), + )) + } + } + } } } pub struct RightJoinProcessor { - _t: PhantomData<(K, T1, T2)>, + pub _t: PhantomData<(K, T1, T2)>, +} + +impl RightJoinProcessor { + pub fn new() -> Self { + Self { _t: PhantomData } + } } impl JoinProcessor, T2)>> for RightJoinProcessor { - fn left_join( + fn process_left( &self, - _key: K, left_timestamp: SystemTime, - left_value: T1, + left_update: UpdatingData, right: Option<(SystemTime, &T2)>, - first_left: bool, + left_count: usize, ) -> Option<(SystemTime, UpdatingData<(Option, T2)>)> { - right.map(|(right_timestamp, right_value)| { - if first_left { - ( - left_timestamp.max(right_timestamp), - UpdatingData::Update { - old: (None, right_value.clone()), - new: (Some(left_value.clone()), right_value.clone()), - }, - ) - } else { - ( - left_timestamp.max(right_timestamp), - UpdatingData::Append((Some(left_value), right_value.clone())), - ) + let Some((right_timestamp, right)) = right else { + return None; + }; + + let timestamp = left_timestamp.max(right_timestamp); + + match left_update { + UpdatingData::Append(left) => { + if left_count == 0 { + Some(( + timestamp, + UpdatingData::Update { + old: (None, right.clone()), + new: (Some(left), right.clone()), + }, + )) + } else { + Some(( + timestamp, + UpdatingData::Append((Some(left.clone()), right.clone())), + )) + } } - }) + UpdatingData::Update { old, new } => Some(( + timestamp, + UpdatingData::Update { + old: (Some(old), right.clone()), + new: (Some(new), right.clone()), + }, + )), + UpdatingData::Retract(left) => { + if left_count == 1 { + Some(( + timestamp, + UpdatingData::Update { + old: (Some(left.clone()), right.clone()), + new: (None, right.clone()), + }, + )) + } else { + Some(( + timestamp, + UpdatingData::Retract((Some(left.clone()), right.clone())), + )) + } + } + } } - fn right_join( + fn process_right( &self, - _key: K, right_timestamp: SystemTime, - right_value: T2, + right_update: UpdatingData, left: Option<(SystemTime, &T1)>, - _first_right: bool, + _first_left: usize, ) -> Option<(SystemTime, UpdatingData<(Option, T2)>)> { - match left { - Some((left_timestamp, left_value)) => Some(( - left_timestamp.max(right_timestamp), - UpdatingData::Append((Some(left_value.clone()), right_value)), - )), - None => Some((right_timestamp, UpdatingData::Append((None, right_value)))), + match right_update { + UpdatingData::Append(right) => match left { + Some((left_timestamp, left)) => Some(( + left_timestamp.max(right_timestamp).clone(), + UpdatingData::Append((Some(left.clone()), right.clone())), + )), + None => Some((right_timestamp, UpdatingData::Append((None, right.clone())))), + }, + UpdatingData::Update { old, new } => match left { + Some((left_timestamp, left)) => Some(( + left_timestamp.max(right_timestamp), + UpdatingData::Update { + old: (Some(left.clone()), old.clone()), + new: (Some(left.clone()), new.clone()), + }, + )), + None => Some(( + right_timestamp, + UpdatingData::Update { + old: (None, old.clone()), + new: (None, new.clone()), + }, + )), + }, + UpdatingData::Retract(right) => match left { + Some((left_timestamp, left)) => Some(( + left_timestamp.max(right_timestamp), + UpdatingData::Retract((Some(left.clone()), right.clone())), + )), + None => Some(( + right_timestamp, + UpdatingData::Retract((None, right.clone())), + )), + }, } } } pub struct FullJoinProcessor { - _t: PhantomData<(K, T1, T2)>, + pub(crate) _t: PhantomData<(K, T1, T2)>, +} + +impl FullJoinProcessor { + pub fn new() -> Self { + Self { _t: PhantomData } + } } impl JoinProcessor, Option)>> for FullJoinProcessor { - fn left_join( + fn process_left( &self, - _key: K, left_timestamp: SystemTime, - left_value: T1, + left_update: UpdatingData, right: Option<(SystemTime, &T2)>, - first_left: bool, + left_count: usize, ) -> Option<(SystemTime, UpdatingData<(Option, Option)>)> { - match right { - Some((right_timestamp, right_value)) => { - if first_left { - Some(( - left_timestamp.max(right_timestamp), - UpdatingData::Update { - old: (None, Some(right_value.clone())), - new: (Some(left_value), Some(right_value.clone())), - }, - )) - } else { - Some(( - left_timestamp.max(right_timestamp), - UpdatingData::Append((Some(left_value), Some(right_value.clone()))), - )) + match left_update { + UpdatingData::Append(left) => match right { + Some((right_timestamp, right)) => { + if left_count == 0 { + Some(( + left_timestamp.max(right_timestamp), + UpdatingData::Update { + old: (None, Some(right.clone())), + new: (Some(left.clone()), Some(right.clone())), + }, + )) + } else { + Some(( + left_timestamp.max(right_timestamp), + UpdatingData::Append((Some(left.clone()), Some(right.clone()))), + )) + } } - } - None => Some(( - left_timestamp, - UpdatingData::Append((Some(left_value), None)), - )), + None => Some(( + left_timestamp, + UpdatingData::Append((Some(left.clone()), None)), + )), + }, + UpdatingData::Update { old, new } => match right { + Some((right_timestamp, right)) => Some(( + left_timestamp.max(right_timestamp), + UpdatingData::Update { + old: (Some(old.clone()), Some(right.clone())), + new: (Some(new.clone()), Some(right.clone())), + }, + )), + None => Some(( + left_timestamp, + UpdatingData::Update { + old: (Some(old.clone()), None), + new: (Some(new.clone()), None), + }, + )), + }, + UpdatingData::Retract(left) => match right { + Some((right_timestamp, right)) => { + if left_count == 1 { + Some(( + left_timestamp.max(right_timestamp), + UpdatingData::Update { + old: (Some(left.clone()), Some(right.clone())), + new: (None, Some(right.clone())), + }, + )) + } else { + Some(( + left_timestamp.max(right_timestamp), + UpdatingData::Retract((Some(left.clone()), Some(right.clone()))), + )) + } + } + None => { + if left_count == 1 { + Some(( + left_timestamp, + UpdatingData::Update { + old: (Some(left.clone()), None), + new: (None, None), + }, + )) + } else { + Some(( + left_timestamp, + UpdatingData::Retract((Some(left.clone()), None)), + )) + } + } + }, } } - fn right_join( + fn process_right( &self, - _key: K, right_timestamp: SystemTime, - right_value: T2, + right_update: UpdatingData, left: Option<(SystemTime, &T1)>, - first_right: bool, + right_count: usize, ) -> Option<(SystemTime, UpdatingData<(Option, Option)>)> { - match left { - Some((left_timestamp, left_value)) => { - if first_right { - Some(( - left_timestamp.max(right_timestamp), - UpdatingData::Update { - old: (Some(left_value.clone()), None), - new: (Some(left_value.clone()), Some(right_value)), - }, - )) - } else { - Some(( - left_timestamp.max(right_timestamp), - UpdatingData::Append((Some(left_value.clone()), Some(right_value))), - )) + match right_update { + UpdatingData::Append(right) => match left { + Some((left_timestamp, left)) => { + if right_count == 0 { + Some(( + right_timestamp.max(left_timestamp), + UpdatingData::Update { + old: (Some(left.clone()), None), + new: (Some(left.clone()), Some(right.clone())), + }, + )) + } else { + Some(( + right_timestamp.max(left_timestamp), + UpdatingData::Append((Some(left.clone()), Some(right.clone()))), + )) + } } - } - None => Some(( - right_timestamp, - UpdatingData::Append((None, Some(right_value))), - )), + None => Some(( + right_timestamp, + UpdatingData::Append((None, Some(right.clone()))), + )), + }, + UpdatingData::Update { old, new } => match left { + Some((left_timestamp, left)) => Some(( + left_timestamp.max(right_timestamp), + UpdatingData::Update { + old: (Some(left.clone()), Some(old.clone())), + new: (Some(left.clone()), Some(new.clone())), + }, + )), + None => Some(( + right_timestamp, + UpdatingData::Update { + old: (None, Some(old.clone())), + new: (None, Some(new.clone())), + }, + )), + }, + UpdatingData::Retract(right) => match left { + Some((left_timestamp, left)) => { + if right_count == 1 { + Some(( + left_timestamp.max(right_timestamp), + UpdatingData::Update { + old: (Some(left.clone()), Some(right.clone())), + new: (Some(left.clone()), None), + }, + )) + } else { + Some(( + left_timestamp.max(right_timestamp), + UpdatingData::Retract((Some(left.clone()), Some(right.clone()))), + )) + } + } + None => { + if right_count == 1 { + Some(( + right_timestamp, + UpdatingData::Update { + old: (None, Some(right.clone())), + new: (None, None), + }, + )) + } else { + Some(( + right_timestamp, + UpdatingData::Retract((None, Some(right.clone()))), + )) + } + } + }, } } } pub struct InnerJoinProcessor { - _t: PhantomData<(K, T1, T2)>, + pub(crate) _t: PhantomData<(K, T1, T2)>, } -impl JoinProcessor +impl InnerJoinProcessor { + pub fn new() -> Self { + Self { _t: PhantomData } + } +} + +impl JoinProcessor> for InnerJoinProcessor { - fn left_join( + fn process_left( &self, - _key: K, left_timestamp: SystemTime, - left_value: T1, + left_update: UpdatingData, right: Option<(SystemTime, &T2)>, - _evict_prior: bool, - ) -> Option<(SystemTime, (T1, T2))> { - right.map(|(right_timestamp, right_value)| { - ( + _left_count: usize, + ) -> Option<(SystemTime, UpdatingData<(T1, T2)>)> { + right.map(|(right_timestamp, right)| match left_update { + UpdatingData::Append(left) => ( left_timestamp.max(right_timestamp), - (left_value, right_value.clone()), - ) + UpdatingData::Append((left.clone(), right.clone())), + ), + UpdatingData::Update { old, new } => ( + left_timestamp.max(right_timestamp), + UpdatingData::Update { + old: (old.clone(), right.clone()), + new: (new.clone(), right.clone()), + }, + ), + UpdatingData::Retract(left) => ( + left_timestamp.max(right_timestamp), + UpdatingData::Retract((left.clone(), right.clone())), + ), }) } - fn right_join( + fn process_right( &self, - _key: K, right_timestamp: SystemTime, - right_value: T2, + right_update: UpdatingData, left: Option<(SystemTime, &T1)>, - _evict_prior: bool, - ) -> Option<(SystemTime, (T1, T2))> { - left.map(|(left_timestamp, left_value)| { - ( + _right_count: usize, + ) -> Option<(SystemTime, UpdatingData<(T1, T2)>)> { + left.map(|(left_timestamp, left)| match right_update { + UpdatingData::Append(right) => ( + left_timestamp.max(right_timestamp), + UpdatingData::Append((left.clone(), right.clone())), + ), + UpdatingData::Update { old, new } => ( left_timestamp.max(right_timestamp), - (left_value.clone(), right_value), - ) + UpdatingData::Update { + old: (left.clone(), old.clone()), + new: (left.clone(), new.clone()), + }, + ), + UpdatingData::Retract(right) => ( + left_timestamp.max(right_timestamp), + UpdatingData::Retract((left.clone(), right.clone())), + ), }) } } -// Return left JoinWithExpiration -pub fn left_join( - left_expiration: Duration, - right_expiration: Duration, -) -> JoinWithExpiration)>, LeftJoinProcessor> { - JoinWithExpiration::new( - left_expiration, - right_expiration, - LeftJoinProcessor { _t: PhantomData }, - ) -} - -// Return right JoinWithExpiration -pub fn right_join( - left_expiration: Duration, - right_expiration: Duration, -) -> JoinWithExpiration, T2)>, RightJoinProcessor> { - JoinWithExpiration::new( - left_expiration, - right_expiration, - RightJoinProcessor { _t: PhantomData }, - ) -} - -// Return full JoinWithExpiration -pub fn full_join( - left_expiration: Duration, - right_expiration: Duration, -) -> JoinWithExpiration< - K, - T1, - T2, - UpdatingData<(Option, Option)>, - FullJoinProcessor, -> { - JoinWithExpiration::new( - left_expiration, - right_expiration, - FullJoinProcessor { _t: PhantomData }, - ) -} - -// return inner JoinWithExpiration -pub fn inner_join( - left_expiration: Duration, - right_expiration: Duration, -) -> JoinWithExpiration> { - JoinWithExpiration::new( - left_expiration, - right_expiration, - InnerJoinProcessor { _t: PhantomData }, - ) -} - -#[co_process_fn(in_k1=K, in_t1=T1, in_k2=K, in_t2=T2, out_k=K, out_t=Output)] -impl> - JoinWithExpiration +#[co_process_fn(in_k1=K, in_t1=InT1, in_k2=K, in_t2=InT2, out_k=K, out_t=Output)] +impl< + K: Key, + InT1: Data, + InT2: Data, + P1: IncomingDataProcessor, + P2: IncomingDataProcessor, + T1: Data, + T2: Data, + Output: Data, + P: JoinProcessor, + > JoinWithExpiration { fn name(&self) -> String { "JoinWithExpiration".to_string() @@ -349,118 +584,150 @@ impl, ctx: &mut Context) { + async fn update_state( + &mut self, + ctx: &mut Context, + key: K, + timestamp: SystemTime, + updating_data: UpdatingData, + table: char, + ) { + let mut state = ctx.state.get_key_time_multi_map(table).await; + match updating_data { + UpdatingData::Append(value) => { + state.insert(timestamp, key, value).await; + } + UpdatingData::Update { old, new } => { + let k = key.clone(); + state.delete_value(timestamp, k, old).await; + state.insert(timestamp, key, new).await; + } + UpdatingData::Retract(value) => { + state.delete_value(timestamp, key, value).await; + } + } + } + + async fn process_left(&mut self, left_record: &Record, ctx: &mut Context) { if let Some(watermark) = ctx.last_present_watermark() { - if record.timestamp < watermark { + if left_record.timestamp < watermark { return; } }; - let mut key = record.key.clone().unwrap(); - let value = record.value.clone(); + let mut key = left_record.key.clone().unwrap(); + let left_update = P1::ensure_updating(left_record.value.clone()); let mut left_state: KeyTimeMultiMap = ctx.state.get_key_time_multi_map('l').await; - let first_left = left_state + let left_count = left_state .get_all_values_with_timestamps(&mut key) .await - .is_none(); + .map_or(0, |values| values.count()); + let mut right_state: KeyTimeMultiMap = ctx.state.get_key_time_multi_map('r').await; - let records = { - let mut records = vec![]; - if let Some(right_rows) = right_state.get_all_values_with_timestamps(&mut key).await { - for right in right_rows { - if let Some((timestamp, value)) = self.processor.left_join( - key.clone(), - record.timestamp, - value.clone(), - Some(right), - first_left, - ) { - records.push(Record { - timestamp, - key: Some(key.clone()), - value, - }); - } - } - } else if let Some((timestamp, value)) = self.processor.left_join( - key.clone(), - record.timestamp, - value.clone(), - None, - first_left, - ) { - records.push(Record { - timestamp, - key: Some(key.clone()), - value, + + let mut out_records = vec![]; + if let Some(right_rows) = right_state.get_all_values_with_timestamps(&mut key).await { + out_records.extend( + right_rows + .filter_map(|(timestamp, value)| { + self.processor.process_left( + left_record.timestamp.clone(), + left_update.clone(), + Some((timestamp.clone(), value)), + left_count, + ) + }) + .map(|(timestamp, value)| Record { + timestamp, + key: Some(key.clone()), + value, + }), + ); + } else { + self.processor + .process_left(left_record.timestamp, left_update.clone(), None, left_count) + .map(|(timestamp, value)| { + out_records.push(Record { + timestamp, + key: Some(key.clone()), + value, + }); }); - } - records - }; - for record in records { + } + + for record in out_records { ctx.collect(record).await; } - let mut left_state = ctx.state.get_key_time_multi_map('l').await; - left_state.insert(record.timestamp, key, value).await; + + self.update_state(ctx, key, left_record.timestamp, left_update, 'l') + .await; } - async fn process_right(&mut self, record: &Record, ctx: &mut Context) { + async fn process_right( + &mut self, + right_record: &Record, + ctx: &mut Context, + ) { if let Some(watermark) = ctx.last_present_watermark() { - if record.timestamp < watermark { + if right_record.timestamp < watermark { return; } }; - let mut key = record.key.clone().unwrap(); - let value = record.value.clone(); - let mut right_state = ctx.state.get_key_time_multi_map('r').await; - let first_right = right_state + + let mut key = right_record.key.clone().unwrap(); + let right_update = P2::ensure_updating(right_record.value.clone()); + + let mut right_state: KeyTimeMultiMap = + ctx.state.get_key_time_multi_map('r').await; + let right_count = right_state .get_all_values_with_timestamps(&mut key) .await - .is_none(); - let key_to_insert = key.clone(); - let value_to_insert = value.clone(); - right_state - .insert(record.timestamp, key_to_insert, value_to_insert) - .await; + .map_or(0, |values| values.count()); let mut left_state: KeyTimeMultiMap = ctx.state.get_key_time_multi_map('l').await; - let records = { - let mut records = vec![]; - if let Some(left_rows) = left_state.get_all_values_with_timestamps(&mut key).await { - for left in left_rows { - if let Some((timestamp, value)) = self.processor.right_join( - key.clone(), - record.timestamp, - value.clone(), - Some(left), - first_right, - ) { - records.push(Record { - timestamp, - key: Some(key.clone()), - value, - }); - } - } - } else if let Some((timestamp, value)) = self.processor.right_join( - key.clone(), - record.timestamp, - value.clone(), - None, - first_right, - ) { - records.push(Record { - timestamp, - key: Some(key.clone()), - value, + + let mut out_records = vec![]; + if let Some(left_rows) = left_state.get_all_values_with_timestamps(&mut key).await { + out_records.extend( + left_rows + .filter_map(|(timestamp, value)| { + self.processor.process_right( + right_record.timestamp.clone(), + right_update.clone(), + Some((timestamp.clone(), value)), + right_count, + ) + }) + .map(|(timestamp, value)| Record { + timestamp, + key: Some(key.clone()), + value, + }), + ); + } else { + self.processor + .process_right( + right_record.timestamp, + right_update.clone(), + None, + right_count, + ) + .map(|(timestamp, value)| { + out_records.push(Record { + timestamp, + key: Some(key.clone()), + value, + }); }); - } - records - }; - for record in records { + } + + for record in out_records { ctx.collect(record).await; } + + self.update_state(ctx, key, right_record.timestamp, right_update, 'r') + .await; } async fn handle_watermark(&mut self, watermark: Watermark, ctx: &mut Context) { diff --git a/arroyo-worker/src/operators/joiners.rs b/arroyo-worker/src/operators/joiners.rs new file mode 100644 index 000000000..f56a6a9aa --- /dev/null +++ b/arroyo-worker/src/operators/joiners.rs @@ -0,0 +1,278 @@ +use crate::operators::join_with_expiration::{ + Coercer, FullJoinProcessor, InnerJoinProcessor, JoinWithExpiration, LeftJoinProcessor, + NoOpProcessor, RightJoinProcessor, +}; +use arroyo_types::*; +use std::time::Duration; + +pub fn left_join( + left_expiration: Duration, + right_expiration: Duration, +) -> JoinWithExpiration< + K, + T1, + T2, + Coercer, + Coercer, + T1, + T2, + UpdatingData<(T1, Option)>, + LeftJoinProcessor, +> { + JoinWithExpiration::new(left_expiration, right_expiration, LeftJoinProcessor::new()) +} + +pub fn left_join_left_updating( + left_expiration: Duration, + right_expiration: Duration, +) -> JoinWithExpiration< + K, + UpdatingData, + T2, + NoOpProcessor, + Coercer, + T1, + T2, + UpdatingData<(T1, Option)>, + LeftJoinProcessor, +> { + JoinWithExpiration::new(left_expiration, right_expiration, LeftJoinProcessor::new()) +} + +pub fn left_join_right_updating( + left_expiration: Duration, + right_expiration: Duration, +) -> JoinWithExpiration< + K, + T1, + UpdatingData, + Coercer, + NoOpProcessor, + T1, + T2, + UpdatingData<(T1, Option)>, + LeftJoinProcessor, +> { + JoinWithExpiration::new(left_expiration, right_expiration, LeftJoinProcessor::new()) +} + +pub fn left_join_both_updating( + left_expiration: Duration, + right_expiration: Duration, +) -> JoinWithExpiration< + K, + UpdatingData, + UpdatingData, + NoOpProcessor, + NoOpProcessor, + T1, + T2, + UpdatingData<(T1, Option)>, + LeftJoinProcessor, +> { + JoinWithExpiration::new(left_expiration, right_expiration, LeftJoinProcessor::new()) +} + +pub fn right_join( + left_expiration: Duration, + right_expiration: Duration, +) -> JoinWithExpiration< + K, + T1, + T2, + Coercer, + Coercer, + T1, + T2, + UpdatingData<(Option, T2)>, + RightJoinProcessor, +> { + JoinWithExpiration::new(left_expiration, right_expiration, RightJoinProcessor::new()) +} + +pub fn right_join_left_updating( + left_expiration: Duration, + right_expiration: Duration, +) -> JoinWithExpiration< + K, + UpdatingData, + T2, + NoOpProcessor, + Coercer, + T1, + T2, + UpdatingData<(Option, T2)>, + RightJoinProcessor, +> { + JoinWithExpiration::new(left_expiration, right_expiration, RightJoinProcessor::new()) +} + +pub fn right_join_right_updating( + left_expiration: Duration, + right_expiration: Duration, +) -> JoinWithExpiration< + K, + T1, + UpdatingData, + Coercer, + NoOpProcessor, + T1, + T2, + UpdatingData<(Option, T2)>, + RightJoinProcessor, +> { + JoinWithExpiration::new(left_expiration, right_expiration, RightJoinProcessor::new()) +} + +pub fn right_join_both_updating( + left_expiration: Duration, + right_expiration: Duration, +) -> JoinWithExpiration< + K, + UpdatingData, + UpdatingData, + NoOpProcessor, + NoOpProcessor, + T1, + T2, + UpdatingData<(Option, T2)>, + RightJoinProcessor, +> { + JoinWithExpiration::new(left_expiration, right_expiration, RightJoinProcessor::new()) +} + +pub fn inner_join( + left_expiration: Duration, + right_expiration: Duration, +) -> JoinWithExpiration< + K, + T1, + T2, + Coercer, + Coercer, + T1, + T2, + UpdatingData<(T1, T2)>, + InnerJoinProcessor, +> { + JoinWithExpiration::new(left_expiration, right_expiration, InnerJoinProcessor::new()) +} + +pub fn inner_join_left_updating( + left_expiration: Duration, + right_expiration: Duration, +) -> JoinWithExpiration< + K, + UpdatingData, + T2, + NoOpProcessor, + Coercer, + T1, + T2, + UpdatingData<(T1, T2)>, + InnerJoinProcessor, +> { + JoinWithExpiration::new(left_expiration, right_expiration, InnerJoinProcessor::new()) +} + +pub fn inner_join_right_updating( + left_expiration: Duration, + right_expiration: Duration, +) -> JoinWithExpiration< + K, + T1, + UpdatingData, + Coercer, + NoOpProcessor, + T1, + T2, + UpdatingData<(T1, T2)>, + InnerJoinProcessor, +> { + JoinWithExpiration::new(left_expiration, right_expiration, InnerJoinProcessor::new()) +} + +pub fn inner_join_both_updating( + left_expiration: Duration, + right_expiration: Duration, +) -> JoinWithExpiration< + K, + UpdatingData, + UpdatingData, + NoOpProcessor, + NoOpProcessor, + T1, + T2, + UpdatingData<(T1, T2)>, + InnerJoinProcessor, +> { + JoinWithExpiration::new(left_expiration, right_expiration, InnerJoinProcessor::new()) +} + +pub fn full_join( + left_expiration: Duration, + right_expiration: Duration, +) -> JoinWithExpiration< + K, + T1, + T2, + Coercer, + Coercer, + T1, + T2, + UpdatingData<(Option, Option)>, + FullJoinProcessor, +> { + JoinWithExpiration::new(left_expiration, right_expiration, FullJoinProcessor::new()) +} + +pub fn full_join_left_updating( + left_expiration: Duration, + right_expiration: Duration, +) -> JoinWithExpiration< + K, + UpdatingData, + T2, + NoOpProcessor, + Coercer, + T1, + T2, + UpdatingData<(Option, Option)>, + FullJoinProcessor, +> { + JoinWithExpiration::new(left_expiration, right_expiration, FullJoinProcessor::new()) +} + +pub fn full_join_right_updating( + left_expiration: Duration, + right_expiration: Duration, +) -> JoinWithExpiration< + K, + T1, + UpdatingData, + Coercer, + NoOpProcessor, + T1, + T2, + UpdatingData<(Option, Option)>, + FullJoinProcessor, +> { + JoinWithExpiration::new(left_expiration, right_expiration, FullJoinProcessor::new()) +} + +pub fn full_join_both_updating( + left_expiration: Duration, + right_expiration: Duration, +) -> JoinWithExpiration< + K, + UpdatingData, + UpdatingData, + NoOpProcessor, + NoOpProcessor, + T1, + T2, + UpdatingData<(Option, Option)>, + FullJoinProcessor, +> { + JoinWithExpiration::new(left_expiration, right_expiration, FullJoinProcessor::new()) +} diff --git a/arroyo-worker/src/operators/mod.rs b/arroyo-worker/src/operators/mod.rs index 8389c33a0..bec96d6f3 100644 --- a/arroyo-worker/src/operators/mod.rs +++ b/arroyo-worker/src/operators/mod.rs @@ -22,6 +22,7 @@ use wasmtime::{ pub mod aggregating_window; pub mod functions; pub mod join_with_expiration; +pub mod joiners; pub mod joins; pub mod sinks; pub mod sliding_top_n_aggregating_window;