Prepare VectorAgg exec path to handle arrow slots
The VectorAgg exec loop reads tuples directly from a compressed
relation, thus bypassing the DecompressChunk child node. This won't
work with arrow slots, which are read via a table access method.

To make the VectorAgg exec code follow the standard pattern of
reading slots from child nodes, the code specific to decompressing
batches is moved out of the main VectorAgg exec loop and into a
"get_next_slot" function that is called from the loop. The loop now
deals only with the final compressed batch slot instead of the raw
compressed slot.

Also move the code that initializes vectorized filters into its own
"init_vector_quals" function, since it is specific to compressed
batches.

With these two function interfaces, it becomes possible to provide
alternative implementations that handle arrow slots.
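
For illustration, here is a minimal sketch of the shape the exec loop
takes after this change (the names match this patch; the real loop body
in exec.c also handles FILTER clauses and partial emission):

    while (!grouping->gp_should_emit(grouping))
    {
        /* Either compressed_batch_get_next_slot() or, later, an
         * arrow slot implementation. */
        TupleTableSlot *slot = vector_agg_state->get_next_slot(vector_agg_state);

        /* The compressed batch slot never clears TTS_EMPTY, so the
         * loop checks an explicit flag instead of TupIsNull(). */
        if (vector_agg_state->input_ended)
            break;

        grouping->gp_add_batch(grouping, slot);
    }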
erimatnor committed Jan 30, 2025
1 parent 64e5ffc commit d528b62
Showing 2 changed files with 165 additions and 79 deletions.
224 changes: 146 additions & 78 deletions tsl/src/nodes/vector_agg/exec.c
@@ -16,32 +16,32 @@
#include "nodes/vector_agg/exec.h"

#include "compression/arrow_c_data_interface.h"
#include "guc.h"
#include "nodes/decompress_chunk/compressed_batch.h"
#include "nodes/decompress_chunk/exec.h"
#include "nodes/decompress_chunk/vector_quals.h"
#include "nodes/vector_agg.h"
#include "nodes/vector_agg/plan.h"

static int
get_input_offset(DecompressChunkState *decompress_state, Var *var)
get_input_offset(const CustomScanState *state, const Var *var)
{
DecompressContext *dcontext = &decompress_state->decompress_context;
const DecompressChunkState *decompress_state = (DecompressChunkState *) state;
const DecompressContext *dcontext = &decompress_state->decompress_context;

/*
* All variable references in the vectorized aggregation node were
* translated to uncompressed chunk variables when it was created.
*/
CustomScan *cscan = castNode(CustomScan, decompress_state->csstate.ss.ps.plan);
const CustomScan *cscan = castNode(CustomScan, decompress_state->csstate.ss.ps.plan);
Ensure((Index) var->varno == (Index) cscan->scan.scanrelid,
"got vector varno %d expected %d",
var->varno,
cscan->scan.scanrelid);

CompressionColumnDescription *value_column_description = NULL;
const CompressionColumnDescription *value_column_description = NULL;
for (int i = 0; i < dcontext->num_data_columns; i++)
{
CompressionColumnDescription *current_column = &dcontext->compressed_chunk_columns[i];
const CompressionColumnDescription *current_column = &dcontext->compressed_chunk_columns[i];
if (current_column->uncompressed_chunk_attno == var->varattno)
{
value_column_description = current_column;
@@ -57,6 +57,15 @@ get_input_offset(DecompressChunkState *decompress_state, Var *var)
return index;
}

static int
get_value_bytes(const CustomScanState *state, int input_offset)
{
const DecompressChunkState *decompress_state = (DecompressChunkState *) state;
const DecompressContext *dcontext = &decompress_state->decompress_context;
const CompressionColumnDescription *desc = &dcontext->compressed_chunk_columns[input_offset];
return desc->value_bytes;
}

static void
vector_agg_begin(CustomScanState *node, EState *estate, int eflags)
{
@@ -66,9 +75,7 @@ vector_agg_begin(CustomScanState *node, EState *estate, int eflags)

VectorAggState *vector_agg_state = (VectorAggState *) node;
vector_agg_state->input_ended = false;

DecompressChunkState *decompress_state =
(DecompressChunkState *) linitial(vector_agg_state->custom.custom_ps);
CustomScanState *childstate = (CustomScanState *) linitial(vector_agg_state->custom.custom_ps);

/*
* Set up the helper structures used to evaluate stable expressions in
@@ -157,7 +164,7 @@ vector_agg_begin(CustomScanState *node, EState *estate, int eflags)
Assert(aggref->aggsplit == AGGSPLIT_INITIAL_SERIAL);

Var *var = castNode(Var, castNode(TargetEntry, linitial(aggref->args))->expr);
def->input_offset = get_input_offset(decompress_state, var);
def->input_offset = get_input_offset(childstate, var);
}
else
{
@@ -179,11 +186,8 @@ vector_agg_begin(CustomScanState *node, EState *estate, int eflags)
col->output_offset = i;

Var *var = castNode(Var, tlentry->expr);
col->input_offset = get_input_offset(decompress_state, var);
DecompressContext *dcontext = &decompress_state->decompress_context;
CompressionColumnDescription *desc =
&dcontext->compressed_chunk_columns[col->input_offset];
col->value_bytes = desc->value_bytes;
col->input_offset = get_input_offset(childstate, var);
col->value_bytes = get_value_bytes(childstate, col->input_offset);
}
}

@@ -237,6 +241,104 @@ vector_agg_rescan(CustomScanState *node)
state->grouping->gp_reset(state->grouping);
}

/*
* Get the next slot to aggregate for a compressed batch.
*
* Implements "get next slot" on top of DecompressChunk. Note that compressed
* tuples are read directly from the DecompressChunk child node, which means
* that the processing normally done in DecompressChunk is actually done here
* (batch processing and filtering).
*
* Returns an TupleTableSlot that implements a compressed batch.
*/
static TupleTableSlot *
compressed_batch_get_next_slot(VectorAggState *vector_agg_state)
{
DecompressChunkState *decompress_state =
(DecompressChunkState *) linitial(vector_agg_state->custom.custom_ps);
DecompressContext *dcontext = &decompress_state->decompress_context;
BatchQueue *batch_queue = decompress_state->batch_queue;
DecompressBatchState *batch_state = batch_array_get_at(&batch_queue->batch_array, 0);

do
{
/*
* We discard the previous compressed batch here and not earlier,
* because the grouping column values returned by the batch grouping
* policy are owned by the compressed batch memory context. This is done
* to avoid generic value copying in the grouping policy, which
* simplifies its code.
*/
compressed_batch_discard_tuples(batch_state);

TupleTableSlot *compressed_slot =
ExecProcNode(linitial(decompress_state->csstate.custom_ps));

if (TupIsNull(compressed_slot))
{
vector_agg_state->input_ended = true;
return NULL;
}

compressed_batch_set_compressed_tuple(dcontext, batch_state, compressed_slot);

/* If the entire batch is filtered out, then immediately read the next
* one */
} while (batch_state->next_batch_row >= batch_state->total_batch_rows);

/*
* Count rows filtered out by vectorized filters for EXPLAIN. Normally
* this is done in the tuple-by-tuple interface of DecompressChunk, so that
* it doesn't say it filtered out more rows than were returned (e.g.
* with LIMIT). Here we always work in full batches. The batches that
* were fully filtered out, and their rows, were already counted in
* compressed_batch_set_compressed_tuple().
*/
const int not_filtered_rows =
arrow_num_valid(batch_state->vector_qual_result, batch_state->total_batch_rows);
InstrCountFiltered1(dcontext->ps, batch_state->total_batch_rows - not_filtered_rows);
if (dcontext->ps->instrument)
{
/*
* These values are normally updated by InstrStopNode(), and are
* required so that the calculations in InstrEndLoop() run properly.
*/
dcontext->ps->instrument->running = true;
dcontext->ps->instrument->tuplecount += not_filtered_rows;
}

return &batch_state->decompressed_scan_slot_data.base;
}

/*
* Initialize vector quals for a compressed batch.
*
* Used to implement vectorized evaluation of aggregate function FILTER clauses.
*/
static VectorQualState *
compressed_batch_init_vector_quals(VectorAggState *agg_state, VectorAggDef *agg_def,
TupleTableSlot *slot)
{
DecompressChunkState *decompress_state =
(DecompressChunkState *) linitial(agg_state->custom.custom_ps);
DecompressContext *dcontext = &decompress_state->decompress_context;
DecompressBatchState *batch_state = (DecompressBatchState *) slot;

agg_state->vqual_state = (CompressedBatchVectorQualState) {
.vqstate = {
.vectorized_quals_constified = agg_def->filter_clauses,
.num_results = batch_state->total_batch_rows,
.per_vector_mcxt = batch_state->per_batch_context,
.slot = decompress_state->csstate.ss.ss_ScanTupleSlot,
.get_arrow_array = compressed_batch_get_arrow_array,
},
.batch_state = batch_state,
.dcontext = dcontext,
};

return &agg_state->vqual_state.vqstate;
}

static TupleTableSlot *
vector_agg_exec(CustomScanState *node)
{
@@ -275,67 +377,28 @@ vector_agg_exec(CustomScanState *node)
*/
grouping->gp_reset(grouping);

DecompressChunkState *decompress_state =
(DecompressChunkState *) linitial(vector_agg_state->custom.custom_ps);

DecompressContext *dcontext = &decompress_state->decompress_context;

BatchQueue *batch_queue = decompress_state->batch_queue;
DecompressBatchState *batch_state = batch_array_get_at(&batch_queue->batch_array, 0);

/*
* Now we loop through the input compressed tuples, until they end or until
* the grouping policy asks us to emit partials.
*/
while (!grouping->gp_should_emit(grouping))
{
/*
* We discard the previous compressed batch here and not earlier,
* because the grouping column values returned by the batch grouping
* policy are owned by the compressed batch memory context. This is done
* to avoid generic value copying in the grouping policy to simplify its
* code.
* Get the next slot to aggregate. It will be either a compressed
* batch or an arrow tuple table slot. Both hold arrow arrays of data
* that can be vectorized.
*/
compressed_batch_discard_tuples(batch_state);

TupleTableSlot *compressed_slot =
ExecProcNode(linitial(decompress_state->csstate.custom_ps));

if (TupIsNull(compressed_slot))
{
/* The input has ended. */
vector_agg_state->input_ended = true;
break;
}

compressed_batch_set_compressed_tuple(dcontext, batch_state, compressed_slot);

if (batch_state->next_batch_row >= batch_state->total_batch_rows)
{
/* This batch was fully filtered out. */
continue;
}
TupleTableSlot *slot = vector_agg_state->get_next_slot(vector_agg_state);

/*
* Count rows filtered out by vectorized filters for EXPLAIN. Normally
* this is done in tuple-by-tuple interface of DecompressChunk, so that
* it doesn't say it filtered out more rows that were returned (e.g.
* with LIMIT). Here we always work in full batches. The batches that
* were fully filtered out, and their rows, were already counted in
* compressed_batch_set_compressed_tuple().
* Exit if there is no more data. Note that it is not possible to do
* the standard TupIsNull() check here because the compressed batch's
* implementation of TupleTableSlot never clears the empty flag bit
* (TTS_EMPTY), so it will always look empty. Therefore, look at the
* "input_ended" flag instead.
*/
const int not_filtered_rows =
arrow_num_valid(batch_state->vector_qual_result, batch_state->total_batch_rows);
InstrCountFiltered1(dcontext->ps, batch_state->total_batch_rows - not_filtered_rows);
if (dcontext->ps->instrument)
{
/*
* These values are normally updated by InstrStopNode(), and are
* required so that the calculations in InstrEndLoop() run properly.
*/
dcontext->ps->instrument->running = true;
dcontext->ps->instrument->tuplecount += not_filtered_rows;
}
if (vector_agg_state->input_ended)
break;

/*
* Compute the vectorized filters for the aggregate function FILTER
@@ -349,26 +412,17 @@
{
continue;
}
CompressedBatchVectorQualState cbvqstate = {
.vqstate = {
.vectorized_quals_constified = agg_def->filter_clauses,
.num_results = batch_state->total_batch_rows,
.per_vector_mcxt = batch_state->per_batch_context,
.slot = compressed_slot,
.get_arrow_array = compressed_batch_get_arrow_array,
},
.batch_state = batch_state,
.dcontext = dcontext,
};
VectorQualState *vqstate = &cbvqstate.vqstate;

VectorQualState *vqstate =
vector_agg_state->init_vector_quals(vector_agg_state, agg_def, slot);
vector_qual_compute(vqstate);
agg_def->filter_result = vqstate->vector_qual_result;
}

/*
* Finally, pass the compressed batch to the grouping policy.
*/
grouping->gp_add_batch(grouping, &batch_state->decompressed_scan_slot_data.base);
grouping->gp_add_batch(grouping, slot);
}

/*
@@ -425,6 +479,20 @@ Node *
vector_agg_state_create(CustomScan *cscan)
{
VectorAggState *state = (VectorAggState *) newNode(sizeof(VectorAggState), T_CustomScanState);

state->custom.methods = &exec_methods;

/*
* Initialize VectorAggState to process vector slots from different
* subnodes. Currently, only compressed batches are supported, but arrow
* slots will be supported as well.
*
* The vector qual init function is needed to implement vectorized
* aggregate function FILTER clauses; compressed batches and arrow
* tuple table slots each provide their own implementation.
*/
state->get_next_slot = compressed_batch_get_next_slot;
state->init_vector_quals = compressed_batch_init_vector_quals;

return (Node *) state;
}
20 changes: 19 additions & 1 deletion tsl/src/nodes/vector_agg/exec.h
@@ -8,6 +8,7 @@

#include <postgres.h>

#include "nodes/decompress_chunk/compressed_batch.h"
#include <nodes/execnodes.h>

#include "function/functions.h"
@@ -29,7 +30,7 @@ typedef struct GroupingColumn
int value_bytes;
} GroupingColumn;

typedef struct
typedef struct VectorAggState
{
CustomScanState custom;

Expand All @@ -47,6 +48,23 @@ typedef struct
bool input_ended;

GroupingPolicy *grouping;

/*
* State to compute vector quals for FILTER clauses.
*/
CompressedBatchVectorQualState vqual_state;

/*
* Initialization function for vectorized quals depending on slot type.
*/
VectorQualState *(*init_vector_quals)(struct VectorAggState *agg_state, VectorAggDef *agg_def,
TupleTableSlot *slot);

/*
* Function for getting the next slot from the child node depending on
* child node type.
*/
TupleTableSlot *(*get_next_slot)(struct VectorAggState *vector_agg_state);
} VectorAggState;

extern Node *vector_agg_state_create(CustomScan *cscan);
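
For reference, a "get next slot" implementation for arrow slots could
follow the standard pattern of reading slots from a child node. The
sketch below is a hypothetical illustration only: it is not part of
this commit, and the function name is invented.

    /* Hypothetical sketch: read arrow slots from the child node. */
    static TupleTableSlot *
    arrow_get_next_slot(VectorAggState *vector_agg_state)
    {
        CustomScanState *childstate =
            (CustomScanState *) linitial(vector_agg_state->custom.custom_ps);
        TupleTableSlot *slot = ExecProcNode(&childstate->ss.ps);

        if (TupIsNull(slot))
        {
            vector_agg_state->input_ended = true;
            return NULL;
        }

        return slot;
    }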
