Skip to content

Commit

Permalink
Do not undistribute Citus local table when converting it to a reference …
Browse files Browse the repository at this point in the history
…table / single-shard table
  • Loading branch information
onurctirtir committed Aug 23, 2023
1 parent 53501c7 commit 59d1480
Show file tree
Hide file tree
Showing 30 changed files with 947 additions and 110 deletions.
346 changes: 294 additions & 52 deletions src/backend/distributed/commands/create_distributed_table.c

Large diffs are not rendered by default.

116 changes: 116 additions & 0 deletions src/backend/distributed/metadata/metadata_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ static char * RemoteSchemaIdExpressionById(Oid schemaId);
static char * RemoteSchemaIdExpressionByName(char *schemaName);
static char * RemoteTypeIdExpression(Oid typeId);
static char * RemoteCollationIdExpression(Oid colocationId);
static char * RemoteTableIdExpression(Oid relationId);


PG_FUNCTION_INFO_V1(start_metadata_sync_to_all_nodes);
Expand All @@ -167,6 +168,7 @@ PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_partition_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata_legacy);
PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata);
Expand All @@ -176,6 +178,7 @@ PG_FUNCTION_INFO_V1(citus_internal_add_colocation_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_colocation_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_tenant_schema);
PG_FUNCTION_INFO_V1(citus_internal_delete_tenant_schema);
PG_FUNCTION_INFO_V1(citus_internal_update_none_dist_table_metadata);


static bool got_SIGTERM = false;
Expand Down Expand Up @@ -3449,6 +3452,27 @@ citus_internal_add_placement_metadata(PG_FUNCTION_ARGS)
}


/*
 * citus_internal_delete_placement_metadata is an internal UDF to
 * delete a row from pg_dist_placement.
 */
Datum
citus_internal_delete_placement_metadata(PG_FUNCTION_ARGS)
{
int64 placementId = PG_GETARG_INT64(0);

if (!ShouldSkipMetadataChecks())
{
/* this UDF is not allowed for executing as a separate command */
EnsureCoordinatorInitiatedOperation();
}

DeleteShardPlacementRow(placementId);

PG_RETURN_VOID();
}


/*
* citus_internal_add_placement_metadata_legacy is the old function that will be dropped.
*/
Expand Down Expand Up @@ -3836,6 +3860,33 @@ citus_internal_delete_tenant_schema(PG_FUNCTION_ARGS)
}


/*
 * citus_internal_update_none_dist_table_metadata is an internal UDF that
 * updates the pg_dist_partition row of the given none-distributed table.
 */
Datum
citus_internal_update_none_dist_table_metadata(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	if (!ShouldSkipMetadataChecks())
	{
		/* only coordinator-initiated operations may invoke this UDF */
		EnsureCoordinatorInitiatedOperation();
	}

	Oid relationOid = PG_GETARG_OID(0);
	char newReplicationModel = PG_GETARG_CHAR(1);
	uint32 newColocationId = PG_GETARG_INT32(2);
	bool newAutoConverted = PG_GETARG_BOOL(3);

	UpdateNoneDistTableMetadata(relationOid, newReplicationModel,
								newColocationId, newAutoConverted);

	PG_RETURN_VOID();
}


/*
* SyncNewColocationGroup synchronizes a new pg_dist_colocation entry to a worker.
*/
Expand Down Expand Up @@ -4017,6 +4068,55 @@ TenantSchemaDeleteCommand(char *schemaName)
}


/*
 * UpdateNoneDistTableMetadataCommand builds the SQL command that invokes
 * citus_internal_update_none_dist_table_metadata() with the given values.
 */
char *
UpdateNoneDistTableMetadataCommand(Oid relationId, char replicationModel,
								   uint32 colocationId, bool autoConverted)
{
	const char *autoConvertedText = autoConverted ? "true" : "false";
	char *tableIdExpr = RemoteTableIdExpression(relationId);

	StringInfo buf = makeStringInfo();
	appendStringInfo(buf,
					 "SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(%s, '%c', %u, %s)",
					 tableIdExpr, replicationModel, colocationId,
					 autoConvertedText);

	return buf->data;
}


/*
 * AddPlacementMetadataCommand returns a command to call
 * citus_internal_add_placement_metadata().
 *
 * The UDF's signature is (shard_id bigint, placement_id bigint,
 * shard_length bigint, group_id int), so the arguments must be
 * emitted in exactly that order.
 */
char *
AddPlacementMetadataCommand(uint64 shardId, uint64 placementId,
							uint64 shardLength, int32 groupId)
{
	StringInfo command = makeStringInfo();

	/*
	 * Note the argument order: placementId comes second, not last;
	 * otherwise the values land in the wrong UDF parameters.
	 */
	appendStringInfo(command,
					 "SELECT citus_internal_add_placement_metadata(%ld, %ld, %ld, %d)",
					 shardId, placementId, shardLength, groupId);
	return command->data;
}


/*
 * DeletePlacementMetadataCommand builds the SQL command that invokes
 * citus_internal_delete_placement_metadata() for the given placement.
 */
char *
DeletePlacementMetadataCommand(uint64 placementId)
{
	StringInfo buf = makeStringInfo();
	appendStringInfo(buf,
					 "SELECT pg_catalog.citus_internal_delete_placement_metadata(%ld)",
					 placementId);

	return buf->data;
}


/*
* RemoteSchemaIdExpressionById returns an expression in text form that
* can be used to obtain the OID of the schema with given schema id on a
Expand Down Expand Up @@ -4051,6 +4151,22 @@ RemoteSchemaIdExpressionByName(char *schemaName)
}


/*
 * RemoteTableIdExpression returns an expression in text form that, when
 * included in a query string, resolves to the OID of the given table on
 * a different node.
 */
static char *
RemoteTableIdExpression(Oid relationId)
{
	char *qualifiedName = generate_qualified_relation_name(relationId);

	StringInfo expr = makeStringInfo();
	appendStringInfo(expr, "%s::regclass", quote_literal_cstr(qualifiedName));

	return expr->data;
}


/*
* SetMetadataSyncNodesFromNodeList sets list of nodes that needs to be metadata
* synced among given node list into metadataSyncContext.
Expand Down
131 changes: 131 additions & 0 deletions src/backend/distributed/metadata/metadata_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -1398,6 +1398,17 @@ IsActiveShardPlacement(ShardPlacement *shardPlacement)
}


/*
 * IsRemoteShardPlacement returns true if the shard placement does not
 * live on the local node's group.
 */
bool
IsRemoteShardPlacement(ShardPlacement *shardPlacement)
{
	int32 localGroupId = GetLocalGroupId();

	return shardPlacement->groupId != localGroupId;
}


/*
* IsPlacementOnWorkerNode checks if the shard placement is for to the given
* workenode.
Expand Down Expand Up @@ -1783,6 +1794,24 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
}


/*
 * InsertShardPlacementRowGlobally inserts the shard placement with the
 * given parameters into pg_dist_placement on the local node and on all
 * metadata worker nodes, then returns the freshly loaded placement.
 */
ShardPlacement *
InsertShardPlacementRowGlobally(uint64 shardId, uint64 placementId,
								uint64 shardLength, int32 groupId)
{
	/* insert locally first, then mirror the same row on metadata workers */
	InsertShardPlacementRow(shardId, placementId, shardLength, groupId);

	char *remoteCommand =
		AddPlacementMetadataCommand(shardId, placementId, shardLength, groupId);
	SendCommandToWorkersWithMetadata(remoteCommand);

	return LoadShardPlacement(shardId, placementId);
}


/*
* InsertShardPlacementRow opens the shard placement system catalog, and inserts
* a new row with the given values into that system catalog. If placementId is
Expand Down Expand Up @@ -1999,6 +2028,21 @@ DeleteShardRow(uint64 shardId)
}


/*
 * DeleteShardPlacementRowGlobally removes the given placement from
 * pg_dist_placement on the local node and on all metadata worker nodes.
 */
void
DeleteShardPlacementRowGlobally(uint64 placementId)
{
	/* delete locally first, then mirror the deletion on metadata workers */
	DeleteShardPlacementRow(placementId);

	char *remoteCommand = DeletePlacementMetadataCommand(placementId);
	SendCommandToWorkersWithMetadata(remoteCommand);
}


/*
* DeleteShardPlacementRow opens the shard placement system catalog, finds the placement
* with the given placementId, and deletes it.
Expand Down Expand Up @@ -2243,6 +2287,93 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut
}


/*
 * UpdateNoneDistTableMetadataGlobally updates pg_dist_partition for the
 * given none-distributed table on the local node and, when the table's
 * metadata is synced, on all metadata worker nodes as well.
 */
void
UpdateNoneDistTableMetadataGlobally(Oid relationId, char replicationModel,
									uint32 colocationId, bool autoConverted)
{
	UpdateNoneDistTableMetadata(relationId, replicationModel,
								colocationId, autoConverted);

	if (!ShouldSyncTableMetadata(relationId))
	{
		/* nothing to propagate for tables whose metadata is not synced */
		return;
	}

	char *remoteCommand =
		UpdateNoneDistTableMetadataCommand(relationId, replicationModel,
										   colocationId, autoConverted);
	SendCommandToWorkersWithMetadata(remoteCommand);
}


/*
 * UpdateNoneDistTableMetadata locally updates pg_dist_partition for given
 * none-distributed table: it overwrites the colocationid, repmodel and
 * autoconverted columns of the table's row with the given values.
 *
 * Errors out if the table has a distribution column or if it has no
 * pg_dist_partition entry at all.
 */
void
UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 colocationId,
bool autoConverted)
{
/* guard: this helper is only meant for tables without a distribution key */
if (HasDistributionKey(relationId))
{
ereport(ERROR, (errmsg("cannot update metadata for a distributed "
"table that has a distribution column")));
}

ScanKeyData scanKey[1];
int scanKeyCount = 1;
bool indexOK = true;
Datum values[Natts_pg_dist_partition];
bool isnull[Natts_pg_dist_partition];
bool replace[Natts_pg_dist_partition];

Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
/* look up the table's row via the logicalrelid index */
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId));

SysScanDesc scanDescriptor = systable_beginscan(pgDistPartition,
DistPartitionLogicalRelidIndexId(),
indexOK,
NULL, scanKeyCount, scanKey);

HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find valid entry for Citus table with oid: %u",
relationId)));
}

/* only the three columns flagged below are replaced; the rest are kept */
memset(replace, 0, sizeof(replace));

values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
isnull[Anum_pg_dist_partition_colocationid - 1] = false;
replace[Anum_pg_dist_partition_colocationid - 1] = true;

values[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel);
isnull[Anum_pg_dist_partition_repmodel - 1] = false;
replace[Anum_pg_dist_partition_repmodel - 1] = true;

values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted);
isnull[Anum_pg_dist_partition_autoconverted - 1] = false;
replace[Anum_pg_dist_partition_autoconverted - 1] = true;

heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);

CatalogTupleUpdate(pgDistPartition, &heapTuple->t_self, heapTuple);

/* drop cached metadata for the table and make the update visible */
CitusInvalidateRelcacheByRelid(relationId);
CommandCounterIncrement();

systable_endscan(scanDescriptor);
/* NoLock: keep RowExclusiveLock until the end of the transaction */
table_close(pgDistPartition, NoLock);
}


/*
* Check that the current user has `mode` permissions on relationId, error out
* if not. Superusers always have such permissions.
Expand Down
36 changes: 26 additions & 10 deletions src/backend/distributed/operations/create_shards.c
Original file line number Diff line number Diff line change
Expand Up @@ -397,13 +397,8 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio
List *workerNodeList = DistributedTablePlacementNodeList(RowShareLock);
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);

int32 workerNodeCount = list_length(workerNodeList);
if (workerNodeCount == 0)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("couldn't find any worker nodes"),
errhint("Add more worker nodes")));
}
int roundRobinNodeIdx =
EmptySingleShardTableColocationDecideNodeId(colocationId);

char shardStorageType = ShardStorageType(relationId);
text *minHashTokenText = NULL;
Expand All @@ -412,9 +407,6 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio
InsertShardRow(relationId, shardId, shardStorageType,
minHashTokenText, maxHashTokenText);

/* determine the node index based on colocation id */
int roundRobinNodeIdx = colocationId % workerNodeCount;

int replicationFactor = 1;
List *insertedShardPlacements = InsertShardPlacementRows(
relationId,
Expand All @@ -433,6 +425,30 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio
}


/*
 * EmptySingleShardTableColocationDecideNodeId returns the index of the node
 * that the first shard created in the given "single-shard table colocation
 * group" should be placed on.
 *
 * The index is the colocation id modulo the length of the list returned
 * by DistributedTablePlacementNodeList().
 */
int
EmptySingleShardTableColocationDecideNodeId(uint32 colocationId)
{
	List *placementNodeList = DistributedTablePlacementNodeList(RowShareLock);
	int32 placementNodeCount = list_length(placementNodeList);

	if (placementNodeCount == 0)
	{
		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						errmsg("couldn't find any worker nodes"),
						errhint("Add more worker nodes")));
	}

	return colocationId % placementNodeCount;
}


/*
* CheckHashPartitionedTable looks up the partition information for the given
* tableId and checks if the table is hash partitioned. If not, the function
Expand Down
Loading

0 comments on commit 59d1480

Please sign in to comment.