Skip to content

Commit

Permalink
[BUG] Fix swordfish inner joins (#2678)
Browse files Browse the repository at this point in the history
Q2 wasn't working due to `DaftError::ValueError Attempting to make a
Schema with duplicate field names: P_PARTKEY`

This is because the JoinMapper was removed in a previous PR, so this PR
implements a similar logic that prunes common join keys from the right
side of an inner join.

Q2 works now:
<img width="1034" alt="Screenshot 2024-08-16 at 4 27 22 PM"
src="https://github.com/user-attachments/assets/d973177e-92b8-49b7-bcc2-a0c710a37ad6">

---------

Co-authored-by: Colin Ho <[email protected]>
  • Loading branch information
colin-ho and Colin Ho authored Aug 17, 2024
1 parent 39cd0ad commit ae13e22
Showing 1 changed file with 24 additions and 1 deletion.
25 changes: 24 additions & 1 deletion src/daft-local-execution/src/sinks/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl HashJoinState {

pub(crate) struct HashJoinOperator {
right_on: Vec<ExprRef>,
pruned_right_side_columns: Vec<String>,
_join_type: JoinType,
join_state: HashJoinState,
}
Expand Down Expand Up @@ -130,9 +131,27 @@ impl HashJoinOperator {
.zip(key_schema.fields.values())
.map(|(e, f)| e.cast(&f.dtype))
.collect::<Vec<_>>();
let common_join_keys = left_on
.iter()
.zip(right_on.iter())
.filter_map(|(l, r)| {
if l.name() == r.name() {
Some(l.name())
} else {
None
}
})
.collect::<std::collections::HashSet<_>>();
let pruned_right_side_columns = right_schema
.fields
.keys()
.filter(|k| !common_join_keys.contains(k.as_str()))
.cloned()
.collect::<Vec<_>>();
assert_eq!(join_type, JoinType::Inner);
Ok(Self {
right_on,
pruned_right_side_columns,
_join_type: join_type,
join_state: HashJoinState::new(&key_schema, left_on)?,
})
Expand All @@ -152,6 +171,7 @@ impl HashJoinOperator {
probe_table: probe_table.clone(),
tables: tables.clone(),
right_on: self.right_on.clone(),
pruned_right_side_columns: self.pruned_right_side_columns.clone(),
})
} else {
panic!("can't call as_intermediate_op when not in probing state")
Expand All @@ -163,6 +183,7 @@ struct HashJoinProber {
probe_table: Arc<ProbeTable>,
tables: Arc<Vec<Table>>,
right_on: Vec<ExprRef>,
pruned_right_side_columns: Vec<String>,
}

impl IntermediateOperator for HashJoinProber {
Expand Down Expand Up @@ -201,7 +222,9 @@ impl IntermediateOperator for HashJoinProber {
let left_table = left_growable.build()?;
let right_table = right_growable.build()?;

let final_table = left_table.union(&right_table)?;
let pruned_right_table = right_table.get_columns(&self.pruned_right_side_columns)?;

let final_table = left_table.union(&pruned_right_table)?;
Ok(Arc::new(MicroPartition::new_loaded(
final_table.schema.clone(),
Arc::new(vec![final_table]),
Expand Down

0 comments on commit ae13e22

Please sign in to comment.