Skip to content

Commit

Permalink
Support joining updating data
Browse files Browse the repository at this point in the history
Operators that produce updating data (insert/update/delete) previously
could not be upstream from the JoinWithExpiration operator. This change
modifies the JoinWithExpiration operator to correctly handle updating
data.

All join types, excluding inner joins on non-updating inputs, now
produce updating data.

The bulk of the changes are in `join_with_expiration.rs`, where we
transforms/coerces all data into updating data so that we reuse the same
4 join processors regardless of input type.
  • Loading branch information
jbeisen committed Nov 30, 2023
1 parent 4d3ff50 commit 8f3cc4c
Show file tree
Hide file tree
Showing 13 changed files with 1,075 additions and 339 deletions.
50 changes: 33 additions & 17 deletions arroyo-datastream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1789,27 +1789,43 @@ impl Program {

let in_k = parse_type(&inputs[0].weight().key);
let in_t1 = parse_type(&inputs[0].weight().value);

let (t1_updating, in_t1_inner) = match extract_container_type("UpdatingData", &in_t1) {
Some(t) => (true, t),
None => (false, in_t1.clone()),
};

let in_t2 = parse_type(&inputs[1].weight().value);

let (t2_updating, in_t2_inner) = match extract_container_type("UpdatingData", &in_t2) {
Some(t) => (true, t),
None => (false, in_t2.clone()),
};

let left_expiration = duration_to_syn_expr(*left_expiration);
let right_expiration = duration_to_syn_expr(*right_expiration);
match join_type {
arroyo_types::JoinType::Inner => quote!{
Box::new(arroyo_worker::operators::join_with_expiration::
inner_join::<#in_k, #in_t1, #in_t2>(#left_expiration, #right_expiration))
},
arroyo_types::JoinType::Left => quote!{
Box::new(arroyo_worker::operators::join_with_expiration::
left_join::<#in_k, #in_t1, #in_t2>(#left_expiration, #right_expiration))
},
arroyo_types::JoinType::Right => quote!{
Box::new(arroyo_worker::operators::join_with_expiration::
right_join::<#in_k, #in_t1, #in_t2>(#left_expiration, #right_expiration))
},
arroyo_types::JoinType::Full => quote!{
Box::new(arroyo_worker::operators::join_with_expiration::
full_join::<#in_k, #in_t1, #in_t2>(#left_expiration, #right_expiration))
},

let join_fn_head: String = match join_type {
JoinType::Inner => "inner_join",
JoinType::Left => "left_join",
JoinType::Right => "right_join",
JoinType::Full => "full_join",
}.to_string();

let join_fn_tail: String = match (t1_updating, t2_updating) {
(false, false) => "",
(true, false) => "_left_updating",
(false, true) => "_right_updating",
(true, true) => "_both_updating",
}.to_string();

let join_fn_name = format_ident!("{}{}", join_fn_head, join_fn_tail);

quote!{
Box::new(arroyo_worker::operators::joiners::
#join_fn_name::<#in_k, #in_t1_inner, #in_t2_inner>(#left_expiration, #right_expiration))
}

},
Operator::UpdatingOperator { name, expression } => {
let expr : syn::Expr = parse_str(expression).expect(expression);
Expand Down
7 changes: 7 additions & 0 deletions arroyo-sql-testing/golden_outputs/updating_full_join.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"before":null,"after":{"left_counter":0,"counter_mod_2":null,"right_count":null},"op":"c"}
{"before":null,"after":{"left_counter":1,"counter_mod_2":null,"right_count":null},"op":"c"}
{"before":null,"after":{"left_counter":2,"counter_mod_2":null,"right_count":null},"op":"c"}
{"before":{"left_counter":1,"counter_mod_2":null,"right_count":null},"after":{"left_counter":1,"counter_mod_2":0,"right_count":1},"op":"u"}
{"before":null,"after":{"left_counter":1,"counter_mod_2":1,"right_count":1},"op":"c"}
{"before":{"left_counter":2,"counter_mod_2":null,"right_count":null},"after":{"left_counter":2,"counter_mod_2":0,"right_count":2},"op":"u"}
{"before":{"left_counter":1,"counter_mod_2":0,"right_count":1},"after":null,"op":"d"}
4 changes: 4 additions & 0 deletions arroyo-sql-testing/golden_outputs/updating_inner_join.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"before":null,"after":{"left_counter":1,"counter_mod_2":0,"right_count":1},"op":"c"}
{"before":null,"after":{"left_counter":1,"counter_mod_2":1,"right_count":1},"op":"c"}
{"before":null,"after":{"left_counter":2,"counter_mod_2":0,"right_count":2},"op":"c"}
{"before":{"left_counter":1,"counter_mod_2":0,"right_count":1},"after":null,"op":"d"}
7 changes: 7 additions & 0 deletions arroyo-sql-testing/golden_outputs/updating_left_join.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"before":null,"after":{"left_counter":0,"counter_mod_2":null,"right_count":null},"op":"c"}
{"before":null,"after":{"left_counter":1,"counter_mod_2":null,"right_count":null},"op":"c"}
{"before":null,"after":{"left_counter":2,"counter_mod_2":null,"right_count":null},"op":"c"}
{"before":{"left_counter":1,"counter_mod_2":null,"right_count":null},"after":{"left_counter":1,"counter_mod_2":0,"right_count":1},"op":"u"}
{"before":null,"after":{"left_counter":1,"counter_mod_2":1,"right_count":1},"op":"c"}
{"before":{"left_counter":2,"counter_mod_2":null,"right_count":null},"after":{"left_counter":2,"counter_mod_2":0,"right_count":2},"op":"u"}
{"before":{"left_counter":1,"counter_mod_2":0,"right_count":1},"after":null,"op":"d"}
4 changes: 4 additions & 0 deletions arroyo-sql-testing/golden_outputs/updating_right_join.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"before":null,"after":{"left_counter":1,"counter_mod_2":0,"right_count":1},"op":"c"}
{"before":null,"after":{"left_counter":1,"counter_mod_2":1,"right_count":1},"op":"c"}
{"before":null,"after":{"left_counter":2,"counter_mod_2":0,"right_count":2},"op":"c"}
{"before":{"left_counter":1,"counter_mod_2":0,"right_count":1},"after":null,"op":"d"}
120 changes: 120 additions & 0 deletions arroyo-sql-testing/src/smoke_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -869,3 +869,123 @@ FROM (SELECT TUMBLE(interval '1 second'), counter, count(*) FROM impulse_source
JOIN (SELECT TUMBLE(interval '1 second') as window, counter , count(*) FROM delayed_impulse_source GROUP BY 1,2) b
ON a.counter = b.counter;"}

correctness_run_codegen! {"updating_left_join", 10,
"CREATE TABLE impulse (
timestamp TIMESTAMP,
counter bigint unsigned not null,
subtask_index bigint unsigned not null
) WITH (
connector = 'single_file',
path = '$input_dir/impulse.json',
format = 'json',
type = 'source',
event_time_field = 'timestamp'
);
CREATE TABLE output (
left_counter bigint,
counter_mod_2 bigint,
right_count bigint
) WITH (
connector = 'single_file',
path = '$output_path',
format = 'debezium_json',
type = 'sink'
);
INSERT INTO output
select counter as left_counter, counter_mod_2, right_count from impulse left join
(select counter % 2 as counter_mod_2, cast(count(*) as bigint UNSIGNED) as right_count from impulse where counter < 3 group by 1)
on counter = right_count where counter < 3;"}

correctness_run_codegen! {"updating_right_join", 10,
"CREATE TABLE impulse (
timestamp TIMESTAMP,
counter bigint unsigned not null,
subtask_index bigint unsigned not null
) WITH (
connector = 'single_file',
path = '$input_dir/impulse.json',
format = 'json',
type = 'source',
event_time_field = 'timestamp'
);
CREATE TABLE output (
left_counter bigint,
counter_mod_2 bigint,
right_count bigint
) WITH (
connector = 'single_file',
path = '$output_path',
format = 'debezium_json',
type = 'sink'
);
INSERT INTO output
select counter as left_counter, counter_mod_2, right_count from impulse right join
(select counter % 2 as counter_mod_2, cast(count(*) as bigint UNSIGNED) as right_count from impulse where counter < 3 group by 1)
on counter = right_count where counter < 3;"}

correctness_run_codegen! {"updating_inner_join", 10,
"CREATE TABLE impulse (
timestamp TIMESTAMP,
counter bigint unsigned not null,
subtask_index bigint unsigned not null
) WITH (
connector = 'single_file',
path = '$input_dir/impulse.json',
format = 'json',
type = 'source',
event_time_field = 'timestamp'
);
CREATE TABLE output (
left_counter bigint,
counter_mod_2 bigint,
right_count bigint
) WITH (
connector = 'single_file',
path = '$output_path',
format = 'debezium_json',
type = 'sink'
);
INSERT INTO output
select counter as left_counter, counter_mod_2, right_count from impulse inner join
(select counter % 2 as counter_mod_2, cast(count(*) as bigint UNSIGNED) as right_count from impulse where counter < 3 group by 1)
on counter = right_count where counter < 3;"}

correctness_run_codegen! {"updating_full_join", 10,
"CREATE TABLE impulse (
timestamp TIMESTAMP,
counter bigint unsigned not null,
subtask_index bigint unsigned not null
) WITH (
connector = 'single_file',
path = '$input_dir/impulse.json',
format = 'json',
type = 'source',
event_time_field = 'timestamp'
);
CREATE TABLE output (
left_counter bigint,
counter_mod_2 bigint,
right_count bigint
) WITH (
connector = 'single_file',
path = '$output_path',
format = 'debezium_json',
type = 'sink'
);
INSERT INTO output
select counter as left_counter, counter_mod_2, right_count from impulse full outer join
(select counter % 2 as counter_mod_2, cast(count(*) as bigint UNSIGNED) as right_count from impulse where counter < 3 group by 1)
on counter = right_count where counter < 3;"}
5 changes: 3 additions & 2 deletions arroyo-sql/src/code_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,10 @@ impl JoinPairContext {
let merge_expr = code_generator.generate(self);
let left_ident = self.left_ident();
let right_ident = self.right_ident();

parse_quote!({
let #left_ident = &record.value.0;
let #right_ident = &record.value.1;
let (#left_ident, #right_ident) = &record.value.unwrap_append().clone();

arroyo_types::Record {
timestamp: record.timestamp.clone(),
key: None,
Expand Down
9 changes: 6 additions & 3 deletions arroyo-sql/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,12 @@ pub struct JoinOperator {
pub join_type: JoinType,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InputsUpdating {
pub left: bool,
pub right: bool,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JoinType {
/// Inner Join
Expand Down Expand Up @@ -805,9 +811,6 @@ impl<'a> SqlPipelineBuilder<'a> {
fn insert_join(&mut self, join: &datafusion_expr::logical_plan::Join) -> Result<SqlOperator> {
let left_input = self.insert_sql_plan(&join.left)?;
let right_input = self.insert_sql_plan(&join.right)?;
if left_input.is_updating() || right_input.is_updating() {
bail!("don't support joins with updating inputs");
}
match join.join_constraint {
JoinConstraint::On => {}
JoinConstraint::Using => bail!("don't support 'using' in joins"),
Expand Down
Loading

0 comments on commit 8f3cc4c

Please sign in to comment.