diff --git a/src/backend/columnar/columnar_customscan.c b/src/backend/columnar/columnar_customscan.c index 698734bc94e..c78485c6683 100644 --- a/src/backend/columnar/columnar_customscan.c +++ b/src/backend/columnar/columnar_customscan.c @@ -1924,11 +1924,6 @@ ColumnarScan_EndCustomScan(CustomScanState *node) */ TableScanDesc scanDesc = node->ss.ss_currentScanDesc; - /* - * Free the exprcontext - */ - ExecFreeExprContext(&node->ss.ps); - /* * clean out the tuple table */ diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index 6d31c27791d..0e6c423c2d6 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -1424,8 +1424,13 @@ ConditionalLockRelationWithTimeout(Relation rel, LOCKMODE lockMode, int timeout, static bool -columnar_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno, +columnar_scan_analyze_next_block(TableScanDesc scan, +#if PG_VERSION_NUM >= PG_VERSION_17 + ReadStream *stream) +#else + BlockNumber blockno, BufferAccessStrategy bstrategy) +#endif { /* * Our access method is not pages based, i.e. tuples are not confined diff --git a/src/backend/distributed/cdc/cdc_decoder.c b/src/backend/distributed/cdc/cdc_decoder.c index cf9f4963b72..1e71a82a1c1 100644 --- a/src/backend/distributed/cdc/cdc_decoder.c +++ b/src/backend/distributed/cdc/cdc_decoder.c @@ -22,6 +22,8 @@ #include "utils/rel.h" #include "utils/typcache.h" +#include "pg_version_constants.h" + PG_MODULE_MAGIC; extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); @@ -435,6 +437,74 @@ TranslateChangesIfSchemaChanged(Relation sourceRelation, Relation targetRelation return; } +#if PG_VERSION_NUM >= PG_VERSION_17 + + /* Check the ReorderBufferChange's action type and handle them accordingly.*/ + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + { + /* For insert action, only new tuple should always be translated*/ + HeapTuple sourceRelationNewTuple = change->data.tp.newtuple; + HeapTuple targetRelationNewTuple = GetTupleForTargetSchemaForCdc( + sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc); + change->data.tp.newtuple = targetRelationNewTuple; + break; + } + + /* + * For update changes both old and new tuples need to be translated for target relation + * if the REPLICA IDENTITY is set to FULL. Otherwise, only the new tuple needs to be + * translated for target relation. + */ + case REORDER_BUFFER_CHANGE_UPDATE: + { + /* For update action, new tuple should always be translated*/ + /* Get the new tuple from the ReorderBufferChange, and translate it to target relation. */ + HeapTuple sourceRelationNewTuple = change->data.tp.newtuple; + HeapTuple targetRelationNewTuple = GetTupleForTargetSchemaForCdc( + sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc); + change->data.tp.newtuple = targetRelationNewTuple; + + /* + * Format oldtuple according to the target relation. If the column values of replica + * identiy change, then the old tuple is non-null and needs to be formatted according + * to the target relation schema. + */ + if (change->data.tp.oldtuple != NULL) + { + HeapTuple sourceRelationOldTuple = change->data.tp.oldtuple; + HeapTuple targetRelationOldTuple = GetTupleForTargetSchemaForCdc( + sourceRelationOldTuple, + sourceRelationDesc, + targetRelationDesc); + + change->data.tp.oldtuple = targetRelationOldTuple; + } + break; + } + + case REORDER_BUFFER_CHANGE_DELETE: + { + /* For delete action, only old tuple should be translated*/ + HeapTuple sourceRelationOldTuple = change->data.tp.oldtuple; + HeapTuple targetRelationOldTuple = GetTupleForTargetSchemaForCdc( + sourceRelationOldTuple, + sourceRelationDesc, + targetRelationDesc); + + change->data.tp.oldtuple = targetRelationOldTuple; + break; + } + + default: + { + /* Do nothing for other action types. */ + break; + } + } +#else + /* Check the ReorderBufferChange's action type and handle them accordingly.*/ switch (change->action) { @@ -499,4 +569,5 @@ TranslateChangesIfSchemaChanged(Relation sourceRelation, Relation targetRelation break; } } +#endif } diff --git a/src/backend/distributed/commands/collation.c b/src/backend/distributed/commands/collation.c index 5ce3d1436cc..1a8c211f942 100644 --- a/src/backend/distributed/commands/collation.c +++ b/src/backend/distributed/commands/collation.c @@ -77,7 +77,7 @@ CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollati * ICU-related field. Only the libc-related fields or the ICU-related field * is set, never both. */ - char *colliculocale; + char *colllocale; bool isnull; Datum datum = SysCacheGetAttr(COLLOID, heapTuple, Anum_pg_collation_collcollate, @@ -101,17 +101,17 @@ CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollati collctype = NULL; } - datum = SysCacheGetAttr(COLLOID, heapTuple, Anum_pg_collation_colliculocale, &isnull); + datum = SysCacheGetAttr(COLLOID, heapTuple, Anum_pg_collation_colllocale, &isnull); if (!isnull) { - colliculocale = TextDatumGetCString(datum); + colllocale = TextDatumGetCString(datum); } else { - colliculocale = NULL; + colllocale = NULL; } - Assert((collcollate && collctype) || colliculocale); + Assert((collcollate && collctype) || colllocale); #else /* @@ -147,12 +147,12 @@ CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollati *quotedCollationName, providerString); #if PG_VERSION_NUM >= PG_VERSION_15 - if (colliculocale) + if (colllocale) { appendStringInfo(&collationNameDef, ", locale = %s", - quote_literal_cstr(colliculocale)); - pfree(colliculocale); + quote_literal_cstr(colllocale)); + pfree(colllocale); } else { diff --git a/src/backend/distributed/commands/role.c b/src/backend/distributed/commands/role.c index 05959ca3e51..dd45c98d833 100644 --- a/src/backend/distributed/commands/role.c +++ b/src/backend/distributed/commands/role.c @@ -71,7 +71,9 @@ static char * GetRoleNameFromDbRoleSetting(HeapTuple tuple, TupleDesc DbRoleSettingDescription); static char * GetDatabaseNameFromDbRoleSetting(HeapTuple tuple, TupleDesc DbRoleSettingDescription); +#if PG_VERSION_NUM < PG_VERSION_17 static Node * makeStringConst(char *str, int location); +#endif static Node * makeIntConst(int val, int location); static Node * makeFloatConst(char *str, int location); static const char * WrapQueryInAlterRoleIfExistsCall(const char *query, RoleSpec *role); @@ -949,6 +951,8 @@ PreprocessCreateRoleStmt(Node *node, const char *queryString, } +#if PG_VERSION_NUM < PG_VERSION_17 + /* * makeStringConst creates a Const Node that stores a given string * @@ -972,6 +976,9 @@ makeStringConst(char *str, int location) } +#endif + + /* * makeIntConst creates a Const Node that stores a given integer * diff --git a/src/backend/distributed/commands/statistics.c b/src/backend/distributed/commands/statistics.c index 45d79afe489..b43f6335ef4 100644 --- a/src/backend/distributed/commands/statistics.c +++ b/src/backend/distributed/commands/statistics.c @@ -651,14 +651,15 @@ GetAlterIndexStatisticsCommands(Oid indexOid) } Form_pg_attribute targetAttr = (Form_pg_attribute) GETSTRUCT(attTuple); - if (targetAttr->attstattarget != DEFAULT_STATISTICS_TARGET) + int32 targetAttstattarget = getAttstattarget_compat(attTuple); + if (targetAttstattarget != DEFAULT_STATISTICS_TARGET) { char *indexNameWithSchema = generate_qualified_relation_name(indexOid); char *command = GenerateAlterIndexColumnSetStatsCommand(indexNameWithSchema, targetAttr->attnum, - targetAttr->attstattarget); + targetAttstattarget); alterIndexStatisticsCommandList = lappend(alterIndexStatisticsCommandList, @@ -773,9 +774,10 @@ CreateAlterCommandIfTargetNotDefault(Oid statsOid) } Form_pg_statistic_ext statisticsForm = (Form_pg_statistic_ext) GETSTRUCT(tup); + int16 currentStxstattarget = getStxstattarget_compat(tup); ReleaseSysCache(tup); - if (statisticsForm->stxstattarget == -1) + if (currentStxstattarget == -1) { return NULL; } @@ -785,7 +787,8 @@ CreateAlterCommandIfTargetNotDefault(Oid statsOid) char *schemaName = get_namespace_name(statisticsForm->stxnamespace); char *statName = NameStr(statisticsForm->stxname); - alterStatsStmt->stxstattarget = statisticsForm->stxstattarget; + alterStatsStmt->stxstattarget = getAlterStatsStxstattarget_compat( + currentStxstattarget); alterStatsStmt->defnames = list_make2(makeString(schemaName), makeString(statName)); return DeparseAlterStatisticsStmt((Node *) alterStatsStmt); diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index a8d8bad8a77..4787d8f2fd3 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -866,7 +866,8 @@ WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount) *waitCount = 0; } - WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext, eventSetSize); + WaitEventSet *waitEventSet = CreateWaitEventSet(WaitEventSetTracker_compat, + eventSetSize); EnsureReleaseResource((MemoryContextCallbackFunction) (&FreeWaitEventSet), waitEventSet); diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 7a9e0601dcc..c9860c06185 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -1130,7 +1130,7 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount, /* allocate pending connections + 2 for the signal latch and postmaster death */ /* (CreateWaitEventSet makes room for pgwin32_signal_event automatically) */ - WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext, + WaitEventSet *waitEventSet = CreateWaitEventSet(WaitEventSetTracker_compat, pendingConnectionCount + 2); for (int connectionIndex = 0; connectionIndex < pendingConnectionCount; diff --git a/src/backend/distributed/deparser/citus_ruleutils.c b/src/backend/distributed/deparser/citus_ruleutils.c index 3b387799b05..530f6e72087 100644 --- a/src/backend/distributed/deparser/citus_ruleutils.c +++ b/src/backend/distributed/deparser/citus_ruleutils.c @@ -395,7 +395,8 @@ pg_get_tableschemadef_string(Oid tableRelationId, IncludeSequenceDefaults if (attributeForm->attidentity && includeIdentityDefaults) { bool missing_ok = false; - Oid seqOid = getIdentitySequence(RelationGetRelid(relation), + Oid seqOid = getIdentitySequence(identitySequenceRelation_compat( + relation), attributeForm->attnum, missing_ok); if (includeIdentityDefaults == INCLUDE_IDENTITY) @@ -738,7 +739,18 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId) * If the user changed the column's statistics target, create * alter statement and add statement to a list for later processing. */ - if (attributeForm->attstattarget >= 0) + HeapTuple atttuple = SearchSysCache2(ATTNUM, + ObjectIdGetDatum(tableRelationId), + Int16GetDatum(attributeForm->attnum)); + if (!HeapTupleIsValid(atttuple)) + { + elog(ERROR, "cache lookup failed for attribute %d of relation %u", + attributeForm->attnum, tableRelationId); + } + + int32 targetAttstattarget = getAttstattarget_compat(atttuple); + ReleaseSysCache(atttuple); + if (targetAttstattarget >= 0) { StringInfoData statement = { NULL, 0, 0, 0 }; initStringInfo(&statement); @@ -746,7 +758,7 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId) appendStringInfo(&statement, "ALTER COLUMN %s ", quote_identifier(attributeName)); appendStringInfo(&statement, "SET STATISTICS %d", - attributeForm->attstattarget); + targetAttstattarget); columnOptionList = lappend(columnOptionList, statement.data); } diff --git a/src/backend/distributed/deparser/deparse_statistics_stmts.c b/src/backend/distributed/deparser/deparse_statistics_stmts.c index 4d72119397e..79be835b93b 100644 --- a/src/backend/distributed/deparser/deparse_statistics_stmts.c +++ b/src/backend/distributed/deparser/deparse_statistics_stmts.c @@ -177,8 +177,9 @@ AppendAlterStatisticsSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt) static void AppendAlterStatisticsStmt(StringInfo buf, AlterStatsStmt *stmt) { - appendStringInfo(buf, "ALTER STATISTICS %s SET STATISTICS %d", NameListToQuotedString( - stmt->defnames), stmt->stxstattarget); + appendStringInfo(buf, "ALTER STATISTICS %s SET STATISTICS %d", + NameListToQuotedString(stmt->defnames), + getIntStxstattarget_compat(stmt->stxstattarget)); } diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index f7e7c23373c..9606cd724ea 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -4740,7 +4740,7 @@ BuildWaitEventSet(List *sessionList) int eventSetSize = GetEventSetSize(sessionList); WaitEventSet *waitEventSet = - CreateWaitEventSet(CurrentMemoryContext, eventSetSize); + CreateWaitEventSet(WaitEventSetTracker_compat, eventSetSize); WorkerSession *session = NULL; foreach_declared_ptr(session, sessionList) diff --git a/src/backend/distributed/executor/query_stats.c b/src/backend/distributed/executor/query_stats.c index f37a99bbf72..ce6179b9617 100644 --- a/src/backend/distributed/executor/query_stats.c +++ b/src/backend/distributed/executor/query_stats.c @@ -759,9 +759,6 @@ citus_query_stats(PG_FUNCTION_ARGS) LWLockRelease(queryStats->lock); - /* clean up and return the tuplestore */ - tuplestore_donestoring(tupstore); - return (Datum) 0; } diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 66eaf71da7b..8048002e005 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -1475,7 +1475,7 @@ FetchAndValidateInsertVarIfExists(Oid targetRelationId, Query *query) foreach_declared_ptr(action, query->mergeActionList) { /* Skip MATCHED clause as INSERTS are not allowed in it */ - if (action->matched) + if (matched_compat(action)) { continue; } diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 86a9f2a64d0..3f0120dae3d 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -375,6 +375,21 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) BufferUsage bufusage_start, bufusage; +#if PG_VERSION_NUM >= PG_VERSION_17 + MemoryContextCounters mem_counters; + MemoryContext planner_ctx = NULL; + MemoryContext saved_ctx = NULL; + + if (es->memory) + { + /* copy paste from postgres code */ + planner_ctx = AllocSetContextCreate(CurrentMemoryContext, + "explain analyze planner context", + ALLOCSET_DEFAULT_SIZES); + saved_ctx = MemoryContextSwitchTo(planner_ctx); + } +#endif + if (es->buffers) { bufusage_start = pgBufferUsage; @@ -432,8 +447,20 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) ExplainOpenGroup("PlannedStmt", "PlannedStmt", false, es); +#if PG_VERSION_NUM >= PG_VERSION_17 + if (es->memory) + { + MemoryContextSwitchTo(saved_ctx); + MemoryContextMemConsumed(planner_ctx, &mem_counters); + } + + ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration, + (es->buffers ? &bufusage : NULL), + (es->memory ? &mem_counters : NULL)); +#else ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration, (es->buffers ? &bufusage : NULL)); +#endif ExplainCloseGroup("PlannedStmt", "PlannedStmt", false, es); ExplainCloseGroup("Subplan", NULL, true, es); @@ -1253,6 +1280,21 @@ CitusExplainOneQuery(Query *query, int cursorOptions, IntoClause *into, BufferUsage bufusage_start, bufusage; +#if PG_VERSION_NUM >= PG_VERSION_17 + MemoryContextCounters mem_counters; + MemoryContext planner_ctx = NULL; + MemoryContext saved_ctx = NULL; + + if (es->memory) + { + /* copy paste from postgres code */ + planner_ctx = AllocSetContextCreate(CurrentMemoryContext, + "explain analyze planner context", + ALLOCSET_DEFAULT_SIZES); + saved_ctx = MemoryContextSwitchTo(planner_ctx); + } +#endif + if (es->buffers) { bufusage_start = pgBufferUsage; @@ -1286,9 +1328,23 @@ CitusExplainOneQuery(Query *query, int cursorOptions, IntoClause *into, BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start); } +#if PG_VERSION_NUM >= PG_VERSION_17 + if (es->memory) + { + MemoryContextSwitchTo(saved_ctx); + MemoryContextMemConsumed(planner_ctx, &mem_counters); + } + + /* run it (if needed) and produce output */ + ExplainOnePlan(plan, into, es, queryString, params, queryEnv, + &planduration, (es->buffers ? &bufusage : NULL), + (es->memory ? &mem_counters : NULL)); +#else + /* run it (if needed) and produce output */ ExplainOnePlan(plan, into, es, queryString, params, queryEnv, &planduration, (es->buffers ? &bufusage : NULL)); +#endif } @@ -1701,6 +1757,21 @@ ExplainOneQuery(Query *query, int cursorOptions, BufferUsage bufusage_start, bufusage; +#if PG_VERSION_NUM >= PG_VERSION_17 + MemoryContextCounters mem_counters; + MemoryContext planner_ctx = NULL; + MemoryContext saved_ctx = NULL; + + if (es->memory) + { + /* copy paste from postgres code */ + planner_ctx = AllocSetContextCreate(CurrentMemoryContext, + "explain analyze planner context", + ALLOCSET_DEFAULT_SIZES); + saved_ctx = MemoryContextSwitchTo(planner_ctx); + } +#endif + if (es->buffers) bufusage_start = pgBufferUsage; INSTR_TIME_SET_CURRENT(planstart); @@ -1718,9 +1789,21 @@ ExplainOneQuery(Query *query, int cursorOptions, BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start); } +#if PG_VERSION_NUM >= PG_VERSION_17 + if (es->memory) + { + MemoryContextSwitchTo(saved_ctx); + MemoryContextMemConsumed(planner_ctx, &mem_counters); + } + /* run it (if needed) and produce output */ + ExplainOnePlan(plan, into, es, queryString, params, queryEnv, + &planduration, (es->buffers ? &bufusage : NULL), + (es->memory ? &mem_counters : NULL)); +#else /* run it (if needed) and produce output */ ExplainOnePlan(plan, into, es, queryString, params, queryEnv, &planduration, (es->buffers ? &bufusage : NULL)); +#endif } } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 2fb5b26e3ca..dee3464cf14 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -547,7 +547,7 @@ BuildJobQuery(MultiNode *multiNode, List *dependentJobList) List *sortClauseList = NIL; Node *limitCount = NULL; Node *limitOffset = NULL; - LimitOption limitOption = LIMIT_OPTION_DEFAULT; + LimitOption limitOption = LIMIT_OPTION_COUNT; Node *havingQual = NULL; bool hasDistinctOn = false; List *distinctClause = NIL; diff --git a/src/backend/distributed/shardsplit/shardsplit_decoder.c b/src/backend/distributed/shardsplit/shardsplit_decoder.c index 841fa89cd47..bcd25ce2e2a 100644 --- a/src/backend/distributed/shardsplit/shardsplit_decoder.c +++ b/src/backend/distributed/shardsplit/shardsplit_decoder.c @@ -14,6 +14,8 @@ #include "utils/lsyscache.h" #include "utils/typcache.h" +#include "pg_version_constants.h" + #include "distributed/listutils.h" #include "distributed/metadata/distobject.h" #include "distributed/shardinterval_utils.h" @@ -134,6 +136,43 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } Oid targetRelationOid = InvalidOid; + +#if PG_VERSION_NUM >= PG_VERSION_17 + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + { + HeapTuple newTuple = change->data.tp.newtuple; + targetRelationOid = FindTargetRelationOid(relation, newTuple, + replicationSlotName); + break; + } + + /* updating non-partition column value */ + case REORDER_BUFFER_CHANGE_UPDATE: + { + HeapTuple newTuple = change->data.tp.newtuple; + targetRelationOid = FindTargetRelationOid(relation, newTuple, + replicationSlotName); + break; + } + + case REORDER_BUFFER_CHANGE_DELETE: + { + HeapTuple oldTuple = change->data.tp.oldtuple; + targetRelationOid = FindTargetRelationOid(relation, oldTuple, + replicationSlotName); + + break; + } + + /* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */ + default: + ereport(ERROR, errmsg( + "Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE", + change->action)); + } +#else switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: @@ -168,6 +207,7 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, "Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE", change->action)); } +#endif /* Current replication slot is not responsible for handling the change */ if (targetRelationOid == InvalidOid) @@ -185,6 +225,62 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleDesc targetRelationDesc = RelationGetDescr(targetRelation); if (sourceRelationDesc->natts > targetRelationDesc->natts) { +#if PG_VERSION_NUM >= PG_VERSION_17 + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + { + HeapTuple sourceRelationNewTuple = change->data.tp.newtuple; + HeapTuple targetRelationNewTuple = GetTupleForTargetSchema( + sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc); + + change->data.tp.newtuple = targetRelationNewTuple; + break; + } + + case REORDER_BUFFER_CHANGE_UPDATE: + { + HeapTuple sourceRelationNewTuple = change->data.tp.newtuple; + HeapTuple targetRelationNewTuple = GetTupleForTargetSchema( + sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc); + + change->data.tp.newtuple = targetRelationNewTuple; + + /* + * Format oldtuple according to the target relation. If the column values of replica + * identiy change, then the old tuple is non-null and needs to be formatted according + * to the target relation schema. + */ + if (change->data.tp.oldtuple != NULL) + { + HeapTuple sourceRelationOldTuple = change->data.tp.oldtuple; + HeapTuple targetRelationOldTuple = GetTupleForTargetSchema( + sourceRelationOldTuple, + sourceRelationDesc, + targetRelationDesc); + + change->data.tp.oldtuple = targetRelationOldTuple; + } + break; + } + + case REORDER_BUFFER_CHANGE_DELETE: + { + HeapTuple sourceRelationOldTuple = change->data.tp.oldtuple; + HeapTuple targetRelationOldTuple = GetTupleForTargetSchema( + sourceRelationOldTuple, sourceRelationDesc, targetRelationDesc); + + change->data.tp.oldtuple = targetRelationOldTuple; + break; + } + + /* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */ + default: + ereport(ERROR, errmsg( + "Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE", + change->action)); + } +#else switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: @@ -239,6 +335,7 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, "Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE", change->action)); } +#endif } pgOutputPluginChangeCB(ctx, txn, targetRelation, change); diff --git a/src/backend/distributed/test/fake_am.c b/src/backend/distributed/test/fake_am.c index cff124961ac..92805194242 100644 --- a/src/backend/distributed/test/fake_am.c +++ b/src/backend/distributed/test/fake_am.c @@ -372,8 +372,13 @@ fake_vacuum(Relation onerel, VacuumParams *params, static bool -fake_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno, +fake_scan_analyze_next_block(TableScanDesc scan, +#if PG_VERSION_NUM >= PG_VERSION_17 + ReadStream *stream) +#else + BlockNumber blockno, BufferAccessStrategy bstrategy) +#endif { /* we don't support analyze, so return false */ return false; diff --git a/src/backend/distributed/test/fake_fdw.c b/src/backend/distributed/test/fake_fdw.c index 585e61d4108..90b205b1ea1 100644 --- a/src/backend/distributed/test/fake_fdw.c +++ b/src/backend/distributed/test/fake_fdw.c @@ -29,7 +29,7 @@ #include "optimizer/restrictinfo.h" #include "utils/palloc.h" -#include "pg_version_constants.h" +#include "pg_version_compat.h" /* local function forward declarations */ static void FakeGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, @@ -91,9 +91,11 @@ FakeGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid) Cost startup_cost = 0; Cost total_cost = startup_cost + baserel->rows; - add_path(baserel, (Path *) create_foreignscan_path(root, baserel, NULL, baserel->rows, - startup_cost, total_cost, NIL, - NULL, NULL, NIL)); + add_path(baserel, (Path *) create_foreignscan_path_compat(root, baserel, NULL, + baserel->rows, + startup_cost, total_cost, + NIL, + NULL, NULL, NIL, NIL)); } diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index a0584bde82f..f23d58bd027 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -33,7 +33,7 @@ #include "storage/spin.h" #include "utils/timestamp.h" -#include "pg_version_constants.h" +#include "pg_version_compat.h" #include "distributed/backend_data.h" #include "distributed/connection_management.h" @@ -700,7 +700,7 @@ InitializeBackendData(const char *applicationName) uint64 gpid = ExtractGlobalPID(applicationName); - MyBackendData = &backendManagementShmemData->backends[MyProc->pgprocno]; + MyBackendData = &backendManagementShmemData->backends[getProcNo_compat(MyProc)]; Assert(MyBackendData); @@ -1174,11 +1174,11 @@ CurrentDistributedTransactionNumber(void) void GetBackendDataForProc(PGPROC *proc, BackendData *result) { - int pgprocno = proc->pgprocno; + int pgprocno = getProcNo_compat(proc); if (proc->lockGroupLeader != NULL) { - pgprocno = proc->lockGroupLeader->pgprocno; + pgprocno = getProcNo_compat(proc->lockGroupLeader); } BackendData *backendData = &backendManagementShmemData->backends[pgprocno]; @@ -1198,7 +1198,8 @@ GetBackendDataForProc(PGPROC *proc, BackendData *result) void CancelTransactionDueToDeadlock(PGPROC *proc) { - BackendData *backendData = &backendManagementShmemData->backends[proc->pgprocno]; + BackendData *backendData = &backendManagementShmemData->backends[getProcNo_compat( + proc)]; /* backend might not have used citus yet and thus not initialized backend data */ if (!backendData) @@ -1330,7 +1331,7 @@ ActiveDistributedTransactionNumbers(void) LocalTransactionId GetMyProcLocalTransactionId(void) { - return MyProc->lxid; + return getLxid_compat(MyProc); } diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c index a224c29e11e..dadcfe0a4bc 100644 --- a/src/backend/distributed/transaction/lock_graph.c +++ b/src/backend/distributed/transaction/lock_graph.c @@ -23,6 +23,8 @@ #include "utils/hsearch.h" #include "utils/timestamp.h" +#include "pg_version_compat.h" + #include "distributed/backend_data.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" @@ -993,7 +995,7 @@ AllocWaitEdge(WaitGraph *waitGraph) static void AddProcToVisit(PROCStack *remaining, PGPROC *proc) { - if (remaining->procAdded[proc->pgprocno]) + if (remaining->procAdded[getProcNo_compat(proc)]) { return; } @@ -1001,7 +1003,7 @@ AddProcToVisit(PROCStack *remaining, PGPROC *proc) Assert(remaining->procCount < TotalProcCount()); remaining->procs[remaining->procCount++] = proc; - remaining->procAdded[proc->pgprocno] = true; + remaining->procAdded[getProcNo_compat(proc)] = true; } diff --git a/src/backend/distributed/utils/citus_nodefuncs.c b/src/backend/distributed/utils/citus_nodefuncs.c index 0b03926f862..076e8ce6a3f 100644 --- a/src/backend/distributed/utils/citus_nodefuncs.c +++ b/src/backend/distributed/utils/citus_nodefuncs.c @@ -53,9 +53,6 @@ static const char *CitusNodeTagNamesD[] = { const char **CitusNodeTagNames = CitusNodeTagNamesD; -/* support for CitusNewNode() macro */ -CitusNode *newCitusNodeMacroHolder; - /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(citus_extradata_container); diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index d2b60aa5018..0370001eec2 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -170,7 +170,8 @@ worker_adjust_identity_column_seq_ranges(PG_FUNCTION_ARGS) if (attributeForm->attidentity) { - Oid sequenceOid = getIdentitySequence(tableRelationId, + Oid sequenceOid = getIdentitySequence(identitySequenceRelation_compat( + tableRelation), attributeForm->attnum, missingSequenceOk); diff --git a/src/include/distributed/citus_nodes.h b/src/include/distributed/citus_nodes.h index 888133a8978..16df367aa75 100644 --- a/src/include/distributed/citus_nodes.h +++ b/src/include/distributed/citus_nodes.h @@ -92,38 +92,21 @@ CitusNodeTagI(Node *node) return ((CitusNode*)(node))->citus_tag; } -/* - * Postgres's nodes/nodes.h has more information on why we do this. - */ -#ifdef __GNUC__ /* Citus variant of newNode(), don't use directly. */ -#define CitusNewNode(size, tag) \ -({ CitusNode *_result; \ - AssertMacro((size) >= sizeof(CitusNode)); /* need the tag, at least */ \ - _result = (CitusNode *) palloc0fast(size); \ - _result->extensible.type = T_ExtensibleNode; \ - _result->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START]; \ - _result->citus_tag =(int) (tag); \ - _result; \ -}) - -#else - -extern CitusNode *newCitusNodeMacroHolder; - -#define CitusNewNode(size, tag) \ -( \ - AssertMacro((size) >= sizeof(CitusNode)), /* need the tag, at least */ \ - newCitusNodeMacroHolder = (CitusNode *) palloc0fast(size), \ - newCitusNodeMacroHolder->extensible.type = T_ExtensibleNode, \ - newCitusNodeMacroHolder->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START], \ - newCitusNodeMacroHolder->citus_tag =(int) (tag), \ - newCitusNodeMacroHolder \ -) - -#endif +static inline CitusNode * +CitusNewNode(size_t size, CitusNodeTag tag) +{ + CitusNode *result; + Assert(size >= sizeof(CitusNode)); /* need the ExtensibleNode and the tag, at least */ + result = (CitusNode *) palloc0(size); + result->extensible.type = T_ExtensibleNode; + result->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START]; + result->citus_tag = (int) (tag); + + return result; +} /* * IsA equivalent that compares node tags, including Citus-specific nodes. diff --git a/src/include/pg_version_compat.h b/src/include/pg_version_compat.h index 4e874e2ee63..80c4e9d3d7f 100644 --- a/src/include/pg_version_compat.h +++ b/src/include/pg_version_compat.h @@ -13,6 +13,139 @@ #include "pg_version_constants.h" +#if PG_VERSION_NUM >= PG_VERSION_17 + +#include "catalog/pg_am.h" +#include "catalog/pg_auth_members.h" +#include "catalog/pg_authid.h" +#include "catalog/pg_class.h" +#include "catalog/pg_collation.h" +#include "catalog/pg_constraint.h" +#include "catalog/pg_database.h" +#include "catalog/pg_extension.h" +#include "catalog/pg_foreign_server.h" +#include "catalog/pg_namespace.h" +#include "catalog/pg_parameter_acl.h" +#include "catalog/pg_proc.h" +#include "catalog/pg_publication.h" +#include "catalog/pg_tablespace.h" +#include "catalog/pg_transform.h" +#include "catalog/pg_ts_config.h" +#include "catalog/pg_ts_dict.h" +#include "catalog/pg_ts_template.h" +#include "catalog/pg_type.h" + +typedef int ObjectClass; +#define getObjectClass(a) a->classId +#define LAST_OCLASS TransformRelationId +#define OCLASS_ROLE AuthIdRelationId +#define OCLASS_DATABASE DatabaseRelationId +#define OCLASS_TBLSPACE TableSpaceRelationId +#define OCLASS_PARAMETER_ACL ParameterAclRelationId +#define OCLASS_ROLE_MEMBERSHIP AuthMemRelationId +#define OCLASS_CLASS RelationRelationId +#define OCLASS_COLLATION CollationRelationId +#define OCLASS_CONSTRAINT ConstraintRelationId +#define OCLASS_PROC ProcedureRelationId +#define OCLASS_PUBLICATION PublicationRelationId +#define OCLASS_SCHEMA NamespaceRelationId +#define OCLASS_TSCONFIG TSConfigRelationId +#define OCLASS_TSDICT TSDictionaryRelationId +#define OCLASS_TYPE TypeRelationId +#define OCLASS_EXTENSION ExtensionRelationId +#define OCLASS_FOREIGN_SERVER ForeignServerRelationId +#define OCLASS_AM AccessMethodRelationId +#define OCLASS_TSTEMPLATE TSTemplateRelationId + +#include "commands/tablecmds.h" + +static inline void +RangeVarCallbackOwnsTable(const RangeVar *relation, + Oid relId, Oid oldRelId, void *arg) +{ + return RangeVarCallbackMaintainsTable(relation, relId, oldRelId, arg); +} + + +#include "catalog/pg_attribute.h" +#include "utils/syscache.h" + +static inline int +getAttstattarget_compat(HeapTuple attTuple) +{ + bool isnull; + Datum dat = SysCacheGetAttr(ATTNUM, attTuple, + Anum_pg_attribute_attstattarget, &isnull); + return (isnull ? -1 : DatumGetInt16(dat)); +} + + +#include "catalog/pg_statistic_ext.h" + +static inline int +getStxstattarget_compat(HeapTuple tup) +{ + bool isnull; + Datum dat = SysCacheGetAttr(STATEXTOID, tup, + Anum_pg_statistic_ext_stxstattarget, &isnull); + return (isnull ? -1 : DatumGetInt16(dat)); +} + + +#define getAlterStatsStxstattarget_compat(a) ((Node *) makeInteger(a)) +#define getIntStxstattarget_compat(a) (intVal(a)) + +#define WaitEventSetTracker_compat CurrentResourceOwner + +#define identitySequenceRelation_compat(a) (a) + +#define matched_compat(a) (a->matchKind == MERGE_WHEN_MATCHED) + +#define create_foreignscan_path_compat(a, b, c, d, e, f, g, h, i, j, \ + k) create_foreignscan_path(a, b, c, d, e, f, g, h, \ + i, j, k) + +#define getProcNo_compat(a) (a->vxid.procNumber) +#define getLxid_compat(a) (a->vxid.lxid) + +#else + +#define Anum_pg_collation_colllocale Anum_pg_collation_colliculocale + +#include "access/htup_details.h" +static inline int +getAttstattarget_compat(HeapTuple attTuple) +{ + return ((Form_pg_attribute) GETSTRUCT(attTuple))->attstattarget; +} + + +#include "catalog/pg_statistic_ext.h" +static inline int +getStxstattarget_compat(HeapTuple tup) +{ + return ((Form_pg_statistic_ext) GETSTRUCT(tup))->stxstattarget; +} + + +#define getAlterStatsStxstattarget_compat(a) (a) +#define getIntStxstattarget_compat(a) (a) + +#define WaitEventSetTracker_compat CurrentMemoryContext + +#define identitySequenceRelation_compat(a) (RelationGetRelid(a)) + +#define matched_compat(a) (a->matched) + +#define create_foreignscan_path_compat(a, b, c, d, e, f, g, h, i, j, \ + k) create_foreignscan_path(a, b, c, d, e, f, g, h, \ + i, k) + +#define getProcNo_compat(a) (a->pgprocno) +#define getLxid_compat(a) (a->lxid) + +#endif + #if PG_VERSION_NUM >= PG_VERSION_16 #include "utils/guc_tables.h"