Skip to content

Commit

Permalink
VTX-7527: take into account errors during idle and processing time calc
Browse files Browse the repository at this point in the history
  • Loading branch information
fsdvh committed Oct 31, 2024
1 parent af68f31 commit beb9799
Showing 1 changed file with 34 additions and 20 deletions.
54 changes: 34 additions & 20 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,16 +440,6 @@ pub(crate) fn create_group_accumulator(
}
}

/// Unwraps a `Result` from inside a function that returns
/// `Poll<Option<Result<_, _>>>`.
///
/// Evaluates to the wrapped value when the expression is `Ok`. When it is
/// `Err`, the enclosing function returns `Poll::Ready(Some(Err(..)))`
/// immediately, carrying the error unchanged. `Poll` must be in scope at
/// the call site.
macro_rules! extract_ok {
    ($result:expr) => {{
        match $result {
            Err(err) => return Poll::Ready(Some(Err(err))),
            Ok(value) => value,
        }
    }};
}

impl Stream for GroupedHashAggregateStream {
type Item = Result<RecordBatch>;

Expand All @@ -468,10 +458,16 @@ impl Stream for GroupedHashAggregateStream {
Some(Ok(batch)) => {
let timer = elapsed_compute.timer();
// Make sure we have enough capacity for `batch`, otherwise spill
extract_ok!(self.spill_previous_if_necessary(&batch));
if let Err(e) = self.spill_previous_if_necessary(&batch) {
self.aggregate_stream_metrics.record(now);
return Poll::Ready(Some(Err(e)));
}

// Do the grouping
extract_ok!(self.group_aggregate_batch(batch));
if let Err(e) = self.group_aggregate_batch(batch) {
self.aggregate_stream_metrics.record(now);
return Poll::Ready(Some(Err(e)));
}

// If we can begin emitting rows, do so,
// otherwise keep consuming input
Expand All @@ -481,30 +477,48 @@ impl Stream for GroupedHashAggregateStream {
// emit all groups and switch to producing output
if self.hit_soft_group_limit() {
timer.done();
extract_ok!(self.set_input_done_and_produce_output());
if let Err(e) = self.set_input_done_and_produce_output() {
self.aggregate_stream_metrics.record(now);
return Poll::Ready(Some(Err(e)));
}
// make sure the exec_state just set is not overwritten below
break 'reading_input;
}

if let Some(to_emit) = self.group_ordering.emit_to() {
let batch = extract_ok!(self.emit(to_emit, false));
self.exec_state = ExecutionState::ProducingOutput(batch);
timer.done();
// make sure the exec_state just set is not overwritten below
break 'reading_input;
match self.emit(to_emit, false) {
Ok(b) => {
self.exec_state =
ExecutionState::ProducingOutput(b);
timer.done();
// make sure the exec_state just set is not overwritten below
break 'reading_input;
}
Err(e) => {
self.aggregate_stream_metrics.record(now);
return Poll::Ready(Some(Err(e)));
}
}
}

extract_ok!(self.emit_early_if_necessary());
if let Err(e) = self.emit_early_if_necessary() {
self.aggregate_stream_metrics.record(now);
return Poll::Ready(Some(Err(e)));
}

timer.done();
}
Some(Err(e)) => {
// inner had error, return to caller
self.aggregate_stream_metrics.record(now);
return Poll::Ready(Some(Err(e)));
}
None => {
// inner is done, emit all rows and switch to producing output
extract_ok!(self.set_input_done_and_produce_output());
if let Err(e) = self.set_input_done_and_produce_output() {
self.aggregate_stream_metrics.record(now);
return Poll::Ready(Some(Err(e)));
}
}
}
}
Expand Down

0 comments on commit beb9799

Please sign in to comment.