Skip to content

Commit

Permalink
Use CompressedColumnValues instead of wrapper functions
Browse files Browse the repository at this point in the history
  • Loading branch information
erimatnor committed Jan 30, 2025
1 parent 601b4d5 commit c2754c0
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 51 deletions.
29 changes: 21 additions & 8 deletions tsl/src/nodes/vector_agg/grouping_policy_batch.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,23 @@ compute_single_aggregate(GroupingPolicyBatch *policy, TupleTableSlot *vector_slo
if (agg_def->input_offset >= 0)
{
const AttrNumber attnum = AttrOffsetGetAttrNumber(agg_def->input_offset);
const CompressedColumnValues *values =
vector_slot_get_compressed_column_values(vector_slot, attnum);

arg_arrow = vector_slot_get_arrow_array(vector_slot, attnum);
Assert(values->decompression_type != DT_Invalid);
Assert(values->decompression_type != DT_Iterator);

if (arg_arrow == NULL)
arg_datum = vector_slot_get_datum(vector_slot, attnum, &arg_isnull);
if (values->arrow != NULL)
{
arg_arrow = values->arrow;
arg_validity_bitmap = values->buffers[0];
}
else
arg_validity_bitmap = arg_arrow->buffers[0];
{
Assert(values->decompression_type == DT_Scalar);
arg_datum = *values->output_value;
arg_isnull = *values->output_isnull;
}
}

/*
Expand Down Expand Up @@ -218,19 +228,22 @@ gp_batch_add_batch(GroupingPolicy *gp, TupleTableSlot *vector_slot)
for (int i = 0; i < ngrp; i++)
{
GroupingColumn *col = &policy->grouping_columns[i];
const AttrNumber attnum = AttrOffsetGetAttrNumber(col->input_offset);
Assert(col->input_offset >= 0);
Assert(col->output_offset >= 0);

const CompressedColumnValues *values =
vector_slot_get_compressed_column_values(vector_slot, attnum);
Assert(values->decompression_type == DT_Scalar);

/*
* By sheer luck, we can avoid generically copying the Datum here,
* because if we have any output grouping columns in this policy, it
* means we're grouping by segmentby, and these values will be valid
* until the next call to the vector agg node.
*/
policy->output_grouping_values[i] =
vector_slot_get_datum(vector_slot,
AttrOffsetGetAttrNumber(col->input_offset),
&policy->output_grouping_isnull[i]);
policy->output_grouping_values[i] = *values->output_value;
policy->output_grouping_isnull[i] = *values->output_isnull;
}

policy->have_results = true;
Expand Down
23 changes: 17 additions & 6 deletions tsl/src/nodes/vector_agg/grouping_policy_hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,23 @@ compute_single_aggregate(GroupingPolicyHash *policy, TupleTableSlot *vector_slot
if (agg_def->input_offset >= 0)
{
const AttrNumber attnum = AttrOffsetGetAttrNumber(agg_def->input_offset);
arg_arrow = vector_slot_get_arrow_array(vector_slot, attnum);
const CompressedColumnValues *values =
vector_slot_get_compressed_column_values(vector_slot, attnum);

if (arg_arrow == NULL)
arg_datum = vector_slot_get_datum(vector_slot, attnum, &arg_isnull);
Assert(values->decompression_type != DT_Invalid);
Assert(values->decompression_type != DT_Iterator);

if (values->arrow != NULL)
{
arg_arrow = values->arrow;
arg_validity_bitmap = values->buffers[0];
}
else
arg_validity_bitmap = arg_arrow->buffers[0];
{
Assert(values->decompression_type == DT_Scalar);
arg_datum = *values->output_value;
arg_isnull = *values->output_isnull;
}
}

/*
Expand Down Expand Up @@ -311,8 +322,8 @@ gp_hash_add_batch(GroupingPolicy *gp, TupleTableSlot *vector_slot)
const GroupingColumn *def = &policy->grouping_columns[i];

policy->current_batch_grouping_column_values[i] =
vector_slot_get_compressed_column_values(vector_slot,
AttrOffsetGetAttrNumber(def->input_offset));
*vector_slot_get_compressed_column_values(vector_slot,
AttrOffsetGetAttrNumber(def->input_offset));
}
/*
* Call the per-batch initialization function of the hashing strategy.
Expand Down
39 changes: 2 additions & 37 deletions tsl/src/nodes/vector_agg/vector_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,53 +25,18 @@ vector_slot_get_qual_result(const TupleTableSlot *slot, uint16 *num_rows)
return batch_state->vector_qual_result;
}

/*
* Get the arrow array for the given attribute.
*
* Returns the arrow array or NULL if the attribute does not correspond to a
* compressed column (i.e., it is a segmentby column).
*/
static inline const ArrowArray *
vector_slot_get_arrow_array(TupleTableSlot *slot, const AttrNumber attnum)
{
DecompressBatchState *batch_state = (DecompressBatchState *) slot;
CompressedColumnValues *values =
&batch_state->compressed_columns[AttrNumberGetAttrOffset(attnum)];
Assert(values->decompression_type != DT_Invalid);
Assert(values->decompression_type != DT_Iterator);
return values->arrow;
}

/*
* Get the scalar value datum corresponding to the attribute number.
*
* It is assumed that the attnum corresponds to a segmentby column.
*/
static inline Datum
vector_slot_get_datum(TupleTableSlot *slot, const AttrNumber attnum, bool *isnull)
{
DecompressBatchState *batch_state = (DecompressBatchState *) slot;
CompressedColumnValues *values =
&batch_state->compressed_columns[AttrNumberGetAttrOffset(attnum)];
Assert(values->decompression_type == DT_Scalar);

*isnull = *values->output_isnull;
return *values->output_value;
}

/*
* Return the arrow array or the datum (in case of single scalar value) for a
* given attribute.
*
* This is essentially doing the same thing as the separate functions above,
* but with a common return type.
*/
static inline CompressedColumnValues
static inline const CompressedColumnValues *
vector_slot_get_compressed_column_values(TupleTableSlot *slot, const AttrNumber attnum)
{
const uint16 offset = AttrNumberGetAttrOffset(attnum);
const DecompressBatchState *batch_state = (const DecompressBatchState *) slot;
const CompressedColumnValues *values = &batch_state->compressed_columns[offset];
Assert(values->decompression_type == DT_Scalar || values->decompression_type > 0);
return *values;
return values;
}

0 comments on commit c2754c0

Please sign in to comment.