Skip to content

Commit

Permalink
Push down orderby scankeys to Hypercore TAM
Browse files Browse the repository at this point in the history
Quals on orderby columns can be pushed down to Hypercore TAM and be
transformed to the corresponding min/max scankeys on the compressed
relation. Previously, only quals on non-compressed segmentby columns
were pushed down as scankeys.

Pushing down orderby scan keys seems to give a good performance boost
for columnar scans when no index exists.

The scankey push down can be disabled with a new GUC:

`timescaledb.enable_hypercore_scankey_pushdown=false`
  • Loading branch information
erimatnor committed Feb 4, 2025
1 parent db5e246 commit 7700da7
Show file tree
Hide file tree
Showing 8 changed files with 762 additions and 148 deletions.
15 changes: 15 additions & 0 deletions src/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ TSDLLEXPORT bool ts_guc_enable_merge_on_cagg_refresh = false;
TSDLLEXPORT char *ts_guc_hypercore_indexam_whitelist;
TSDLLEXPORT HypercoreCopyToBehavior ts_guc_hypercore_copy_to_behavior =
HYPERCORE_COPY_NO_COMPRESSED_DATA;
TSDLLEXPORT bool ts_guc_enable_hypercore_scankey_pushdown = true;

/* default value of ts_guc_max_open_chunks_per_insert and
* ts_guc_max_cached_chunks_per_hypertable will be set as their respective boot-value when the
Expand Down Expand Up @@ -1069,6 +1070,20 @@ _guc_init(void)
NULL,
NULL);

DefineCustomBoolVariable(/* name= */ MAKE_EXTOPTION("enable_hypercore_scankey_pushdown"),
/* short_desc= */
"Push down qualifiers as scankeys when using Hypercore TAM",
/* long_desc= */
"Enabling this setting might lead to faster scans when "
"query qualifiers match Hypercore segmentby and orderby columns.",
/* valueAddr= */ &ts_guc_enable_hypercore_scankey_pushdown,
/* bootValue= */ true,
/* context= */ PGC_USERSET,
/* flags= */ 0,
/* check_hook= */ NULL,
/* assign_hook= */ NULL,
/* show_hook= */ NULL);

DefineCustomIntVariable(/* name= */ MAKE_EXTOPTION("debug_bgw_scheduler_exit_status"),
/* short_desc= */ "exit status to use when shutting down the scheduler",
/* long_desc= */ "this is for debugging purposes",
Expand Down
1 change: 1 addition & 0 deletions src/guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ typedef enum HypercoreCopyToBehavior
} HypercoreCopyToBehavior;

extern TSDLLEXPORT HypercoreCopyToBehavior ts_guc_hypercore_copy_to_behavior;
extern TSDLLEXPORT bool ts_guc_enable_hypercore_scankey_pushdown;

void _guc_init(void);

Expand Down
139 changes: 130 additions & 9 deletions tsl/src/hypercore/hypercore_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
#include <access/heapam.h>
#include <access/hio.h>
#include <access/htup_details.h>
#include <access/relscan.h>
#include <access/rewriteheap.h>
#include <access/sdir.h>
#include <access/skey.h>
#include <access/stratnum.h>
#include <access/tableam.h>
#include <access/transam.h>
#include <access/xact.h>
Expand Down Expand Up @@ -307,6 +309,19 @@ lazy_build_hypercore_info_cache(Relation rel, bool create_chunk_constraints,
colsettings->cattnum = get_attnum(hsinfo->compressed_relid, attname);
else
colsettings->cattnum = InvalidAttrNumber;

if (colsettings->is_orderby)
{
const char *min_attname = column_segment_min_name(orderby_pos);
const char *max_attname = column_segment_max_name(orderby_pos);
colsettings->cattnum_min = get_attnum(hsinfo->compressed_relid, min_attname);
colsettings->cattnum_max = get_attnum(hsinfo->compressed_relid, max_attname);
}
else
{
colsettings->cattnum_min = InvalidAttrNumber;
colsettings->cattnum_max = InvalidAttrNumber;
}
}

Ensure(hsinfo->relation_id > 0, "invalid chunk ID");
Expand Down Expand Up @@ -476,15 +491,17 @@ initscan(HypercoreScanDesc scan, ScanKey keys, int nkeys)
*
* It is only possible to use scankeys in the following two cases:
*
* 1. The scankey is for a segment_by column
* 2. The scankey is for a column that has min/max metadata (i.e., order_by column).
* 1. The scankey is for a segmentby column
* 2. The scankey is for a column that has min/max metadata (e.g., orderby column).
*
* TODO: Implement support for (2) above, which involves transforming a
* scankey to the corresponding min/max scankeys.
* For case (2), it is necessary to translate the scankey on the
* non-compressed relation to two min and max scankeys on the compressed
* relation.
*/
if (NULL != keys && nkeys > 0)
{
const HypercoreInfo *hsinfo = RelationGetHypercoreInfo(scan->rs_base.rs_rd);
CompressionSettings *settings = NULL;

for (int i = 0; i < nkeys; i++)
{
Expand All @@ -503,6 +520,106 @@ initscan(HypercoreScanDesc scan, ScanKey keys, int nkeys)
nvalidkeys++;
break;
}

/* Transform the comparison into scankeys on the min/max metadata columns */
else if (key->sk_attno == column->attnum && hypercore_column_has_minmax(column))
{
if (settings == NULL)
settings = ts_compression_settings_get(hsinfo->compressed_relid);

TypeCacheEntry *tce =
lookup_type_cache(column->typid, TYPECACHE_BTREE_OPFAMILY);

switch (key->sk_strategy)
{
case BTLessStrategyNumber:
case BTLessEqualStrategyNumber:
{
/*
* The operators '<' and '<=' translate to the
* same operators on the min metadata column
*/
Oid opno = get_opfamily_member(tce->btree_opf,
tce->type_id,
key->sk_subtype,
key->sk_strategy);
ScanKeyEntryInitialize(&scan->rs_base.rs_key[nvalidkeys++],
0,
column->cattnum_min,
key->sk_strategy,
key->sk_subtype,
key->sk_collation,
get_opcode(opno),
key->sk_argument);
break;
}
case BTEqualStrategyNumber:
{
/*
* Equality ('=') translates to:
*
* min_col <= x AND max_col >= x
*/
StrategyNumber strategy_le = BTLessEqualStrategyNumber;
Oid opno_le = get_opfamily_member(tce->btree_opf,
tce->type_id,
key->sk_subtype,
strategy_le);

ScanKeyEntryInitialize(&scan->rs_base.rs_key[nvalidkeys++],
0,
column->cattnum_min,
strategy_le,
key->sk_subtype,
key->sk_collation,
get_opcode(opno_le),
key->sk_argument);

StrategyNumber strategy_ge = BTGreaterEqualStrategyNumber;
Oid opno_ge = get_opfamily_member(tce->btree_opf,
tce->type_id,
key->sk_subtype,
strategy_ge);

ScanKeyEntryInitialize(&scan->rs_base.rs_key[nvalidkeys++],
0,
column->cattnum_max,
strategy_ge,
key->sk_subtype,
key->sk_collation,
get_opcode(opno_ge),
key->sk_argument);
break;
}
case BTGreaterEqualStrategyNumber:
case BTGreaterStrategyNumber:
{
/*
* The operators '>' and '>=' translate to the
* same operators on the max metadata column
*/
Oid opno = get_opfamily_member(tce->btree_opf,
tce->type_id,
key->sk_subtype,
key->sk_strategy);
ScanKeyEntryInitialize(&scan->rs_base.rs_key[nvalidkeys++],
0,
column->cattnum_max,
key->sk_strategy,
key->sk_subtype,
key->sk_collation,
get_opcode(opno),
key->sk_argument);
break;
}
default:
pg_unreachable();

Check warning on line 616 in tsl/src/hypercore/hypercore_handler.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/hypercore/hypercore_handler.c#L615-L616

Added lines #L615 - L616 were not covered by tests
Assert(false);
break;
}

break;
}
}
}
}
Expand Down Expand Up @@ -566,16 +683,19 @@ hypercore_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey key

RelationIncrementReferenceCount(relation);

TS_DEBUG_LOG("starting %s scan of relation %s parallel_scan=%p",
TS_DEBUG_LOG("starting %s scan of relation %s parallel_scan=%p nkeys=%d",
get_scan_type(flags),
RelationGetRelationName(relation),
parallel_scan);
parallel_scan,
nkeys);

scan = palloc0(sizeof(HypercoreScanDescData));
scan->rs_base.rs_rd = relation;
scan->rs_base.rs_snapshot = snapshot;
scan->rs_base.rs_nkeys = nkeys;
scan->rs_base.rs_key = nkeys > 0 ? palloc0(sizeof(ScanKeyData) * nkeys) : NULL;
/* Allocate double the scan keys to account for some being transformed to
* two min/max keys */
scan->rs_base.rs_key = nkeys > 0 ? palloc0(sizeof(ScanKeyData) * nkeys * 2) : NULL;
scan->rs_base.rs_flags = flags;
scan->rs_base.rs_parallel = parallel_scan;
scan->returned_noncompressed_count = 0;
Expand All @@ -590,8 +710,8 @@ hypercore_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey key
return &scan->rs_base;
}

HypercoreInfo *hsinfo = RelationGetHypercoreInfo(relation);
scan->compressed_rel = table_open(hsinfo->compressed_relid, AccessShareLock);
HypercoreInfo *hcinfo = RelationGetHypercoreInfo(relation);
scan->compressed_rel = table_open(hcinfo->compressed_relid, AccessShareLock);

if (should_skip_compressed_data(&scan->rs_base))
{
Expand Down Expand Up @@ -668,6 +788,7 @@ hypercore_endscan(TableScanDesc sscan)
HypercoreScanDesc scan = (HypercoreScanDesc) sscan;

RelationDecrementReferenceCount(sscan->rs_rd);

if (scan->cscan_desc)
table_endscan(scan->cscan_desc);
if (scan->compressed_rel)
Expand Down
18 changes: 17 additions & 1 deletion tsl/src/hypercore/hypercore_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#pragma once

#include <postgres.h>
#include <access/attnum.h>
#include <access/tableam.h>
#include <access/xact.h>
#include <fmgr.h>
Expand Down Expand Up @@ -36,9 +37,17 @@ extern int hypercore_decompress_update_segment(Relation relation, const ItemPoin

typedef struct ColumnCompressionSettings
{
/* Attribute name in the non-compressed relation */
NameData attname;
/* Attribute number in non-compressed relation */
AttrNumber attnum;
AttrNumber cattnum; /* Attribute number in the compressed relation */
/* Attribute number in the compressed relation */
AttrNumber cattnum;
/* For orderby columns, these are the attribute numbers of the min/max
* metadata columns. */
AttrNumber cattnum_min;
AttrNumber cattnum_max;
/* Attribute type */
Oid typid;
bool is_orderby;
bool is_segmentby;
Expand Down Expand Up @@ -67,3 +76,10 @@ typedef struct HypercoreInfo
#define REL_IS_HYPERCORE(rel) ((rel)->rd_tableam == hypercore_routine())

extern HypercoreInfo *RelationGetHypercoreInfo(Relation rel);

/*
 * Check whether a column has usable min/max metadata columns.
 *
 * Returns true only when both the min and the max metadata attribute
 * numbers stored in the column settings are valid attribute numbers.
 */
static inline bool
hypercore_column_has_minmax(const ColumnCompressionSettings *column)
{
	const bool has_min = AttributeNumberIsValid(column->cattnum_min);
	const bool has_max = AttributeNumberIsValid(column->cattnum_max);

	return has_min && has_max;
}
Loading

0 comments on commit 7700da7

Please sign in to comment.