Skip to content

Commit

Permalink
Add citus_schema_move() function (#7180)
Browse files Browse the repository at this point in the history
Add citus_schema_move(), which can be used to move the tenant tables within a distributed
schema to another node. The function has two variations, which are simple wrappers around
citus_move_shard_placement() and citus_move_shard_placement_with_nodeid() respectively.
They pick a shard that belongs to the given tenant schema and resolve the source node
that contains the shards under the given tenant schema. Hence their signatures are quite
similar to those of the underlying functions:

```sql
-- citus_schema_move(), using target node name and node port
CREATE OR REPLACE FUNCTION pg_catalog.citus_schema_move(
	schema_id regnamespace,
	target_node_name text,
	target_node_port integer,
	shard_transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_schema_move$$;

-- citus_schema_move(), using target node id
CREATE OR REPLACE FUNCTION pg_catalog.citus_schema_move(
	schema_id regnamespace,
	target_node_id integer,
	shard_transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_schema_move_with_nodeid$$;
```
  • Loading branch information
onurctirtir authored Sep 8, 2023
1 parent 8894c76 commit d628a4c
Show file tree
Hide file tree
Showing 14 changed files with 631 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/backend/distributed/commands/alter_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#include "distributed/replication_origin_session_utils.h"
#include "distributed/shared_library_init.h"
#include "distributed/shard_utils.h"
#include "distributed/tenant_schema_metadata.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
#include "executor/spi.h"
Expand Down
1 change: 1 addition & 0 deletions src/backend/distributed/commands/drop_distributed_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/tenant_schema_metadata.h"
#include "distributed/worker_transaction.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
Expand Down
150 changes: 150 additions & 0 deletions src/backend/distributed/commands/schema_based_sharding.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/shard_transfer.h"
#include "distributed/tenant_schema_metadata.h"
#include "distributed/worker_shard_visibility.h"
#include "utils/builtins.h"
Expand All @@ -29,17 +30,31 @@
#include "utils/syscache.h"


/*
 * CitusMoveSchemaParams is the return value of CreateCitusMoveSchemaParams().
 *
 * It bundles the values that citus_schema_move() and
 * citus_schema_move_with_nodeid() forward to the underlying shard move
 * functions.
 */
typedef struct
{
/* id of a shard picked from one of the tables in the tenant schema */
uint64 anchorShardId;
/* id of the node resolved from the colocation group of the tenant schema */
uint32 sourceNodeId;
/* workerName of the source node; not copied, refers to the WorkerNode entry */
char *sourceNodeName;
/* workerPort of the source node */
uint32 sourceNodePort;
} CitusMoveSchemaParams;


static void UnregisterTenantSchemaGlobally(Oid schemaId, char *schemaName);
static List * SchemaGetNonShardTableIdList(Oid schemaId);
static void EnsureSchemaCanBeDistributed(Oid schemaId, List *schemaTableIdList);
static void EnsureTenantSchemaNameAllowed(Oid schemaId);
static void EnsureTableKindSupportedForTenantSchema(Oid relationId);
static void EnsureFKeysForTenantTable(Oid relationId);
static void EnsureSchemaExist(Oid schemaId);
static CitusMoveSchemaParams * CreateCitusMoveSchemaParams(Oid schemaId);
static uint64 TenantSchemaPickAnchorShardId(Oid schemaId);


/* controlled via citus.enable_schema_based_sharding GUC */
bool EnableSchemaBasedSharding = false;


const char *TenantOperationNames[TOTAL_TENANT_OPERATION] = {
"undistribute_table",
"alter_distributed_table",
Expand All @@ -52,6 +67,8 @@ const char *TenantOperationNames[TOTAL_TENANT_OPERATION] = {
PG_FUNCTION_INFO_V1(citus_internal_unregister_tenant_schema_globally);
PG_FUNCTION_INFO_V1(citus_schema_distribute);
PG_FUNCTION_INFO_V1(citus_schema_undistribute);
PG_FUNCTION_INFO_V1(citus_schema_move);
PG_FUNCTION_INFO_V1(citus_schema_move_with_nodeid);

/*
* ShouldUseSchemaBasedSharding returns true if schema given name should be
Expand Down Expand Up @@ -757,6 +774,139 @@ citus_schema_undistribute(PG_FUNCTION_ARGS)
}


/*
 * citus_schema_move is the SQL-callable entry point that moves the given
 * distributed tenant schema to the node identified by a node name and port.
 *
 * The source side of the move (anchor shard, source node name and port) is
 * resolved by CreateCitusMoveSchemaParams(); the actual work is delegated to
 * citus_move_shard_placement().
 *
 * Arguments:
 *   0 - oid of the schema to move
 *   1 - target node name   (forwarded as is)
 *   2 - target node port   (forwarded as is)
 *   3 - shard transfer mode (forwarded as is)
 */
Datum
citus_schema_move(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);
	EnsureCoordinator();

	Oid tenantSchemaId = PG_GETARG_OID(0);
	CitusMoveSchemaParams *moveParams = CreateCitusMoveSchemaParams(tenantSchemaId);

	/* source side is derived from the schema itself */
	Datum anchorShardDatum = UInt64GetDatum(moveParams->anchorShardId);
	Datum sourceNameDatum = CStringGetTextDatum(moveParams->sourceNodeName);
	Datum sourcePortDatum = UInt32GetDatum(moveParams->sourceNodePort);

	/* target side and transfer mode come straight from the caller */
	Datum targetNameDatum = PG_GETARG_DATUM(1);
	Datum targetPortDatum = PG_GETARG_DATUM(2);
	Datum transferModeDatum = PG_GETARG_DATUM(3);

	DirectFunctionCall6(citus_move_shard_placement,
						anchorShardDatum,
						sourceNameDatum,
						sourcePortDatum,
						targetNameDatum,
						targetPortDatum,
						transferModeDatum);

	PG_RETURN_VOID();
}


/*
 * citus_schema_move_with_nodeid is the node-id flavor of citus_schema_move():
 * the target node is given by its node id rather than by hostname and port,
 * so the work is delegated to citus_move_shard_placement_with_nodeid().
 *
 * Arguments:
 *   0 - oid of the schema to move
 *   1 - target node id      (forwarded as is)
 *   2 - shard transfer mode (forwarded as is)
 */
Datum
citus_schema_move_with_nodeid(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);
	EnsureCoordinator();

	Oid tenantSchemaId = PG_GETARG_OID(0);
	CitusMoveSchemaParams *moveParams = CreateCitusMoveSchemaParams(tenantSchemaId);

	/* source side is derived from the schema itself */
	Datum anchorShardDatum = UInt64GetDatum(moveParams->anchorShardId);
	Datum sourceNodeIdDatum = UInt32GetDatum(moveParams->sourceNodeId);

	/* target node id and transfer mode come straight from the caller */
	Datum targetNodeIdDatum = PG_GETARG_DATUM(1);
	Datum transferModeDatum = PG_GETARG_DATUM(2);

	DirectFunctionCall4(citus_move_shard_placement_with_nodeid,
						anchorShardDatum,
						sourceNodeIdDatum,
						targetNodeIdDatum,
						transferModeDatum);

	PG_RETURN_VOID();
}


/*
 * CreateCitusMoveSchemaParams validates the given schema on behalf of
 * citus_schema_move() and citus_schema_move_with_nodeid() and builds the
 * parameters that those functions hand over to the underlying shard
 * transfer functions.
 *
 * Errors out if the schema is not a distributed (tenant) schema or if no
 * anchor shard can be picked for it because it is empty. Existence and
 * ownership checks are delegated to EnsureSchemaExist() and
 * EnsureSchemaOwner().
 *
 * Returns a palloc'd CitusMoveSchemaParams.
 */
static CitusMoveSchemaParams *
CreateCitusMoveSchemaParams(Oid schemaId)
{
	EnsureSchemaExist(schemaId);
	EnsureSchemaOwner(schemaId);

	if (!IsTenantSchema(schemaId))
	{
		ereport(ERROR, (errmsg("schema %s is not a distributed schema",
							   get_namespace_name(schemaId))));
	}

	/* any shard of the schema can act as the anchor of the move */
	uint64 pickedShardId = TenantSchemaPickAnchorShardId(schemaId);
	if (pickedShardId == INVALID_SHARD_ID)
	{
		ereport(ERROR, (errmsg("cannot move distributed schema %s because it is empty",
							   get_namespace_name(schemaId))));
	}

	/* resolve the node that currently hosts the schema's colocation group */
	uint32 tenantColocationId = SchemaIdGetTenantColocationId(schemaId);
	uint32 hostingNodeId = SingleShardTableColocationNodeId(tenantColocationId);

	/*
	 * missingOk is false; the result is dereferenced below without a NULL
	 * check, so this presumably errors out when the node is not found.
	 */
	WorkerNode *hostingNode = FindNodeWithNodeId(hostingNodeId, false);

	CitusMoveSchemaParams *result = palloc0(sizeof(*result));
	result->anchorShardId = pickedShardId;
	result->sourceNodeId = hostingNodeId;
	result->sourceNodeName = hostingNode->workerName;
	result->sourceNodePort = hostingNode->workerPort;

	return result;
}


/*
 * TenantSchemaPickAnchorShardId picks one shard that was created in the given
 * tenant schema and returns its id.
 *
 * The tables of the schema are looked up through its colocation group. The
 * first table that still exists after being locked provides the shard id.
 *
 * Returns INVALID_SHARD_ID if the schema was initially empty or if it's not
 * a tenant schema.
 *
 * Throws an error if all the tables in the schema are concurrently dropped.
 */
static uint64
TenantSchemaPickAnchorShardId(Oid schemaId)
{
	uint32 tenantColocationId = SchemaIdGetTenantColocationId(schemaId);
	List *tenantTableIdList = ColocationGroupTableList(tenantColocationId, 0);
	if (list_length(tenantTableIdList) == 0)
	{
		return INVALID_SHARD_ID;
	}

	Oid tableId = InvalidOid;
	foreach_oid(tableId, tenantTableIdList)
	{
		/* hold the lock so that the table cannot be dropped until we commit */
		LockRelationOid(tableId, AccessShareLock);

		/*
		 * A concurrent drop may have completed right before we acquired the
		 * lock, so verify that the table is still there.
		 */
		Relation tableRelation = RelationIdGetRelation(tableId);
		if (!RelationIsValid(tableRelation))
		{
			/* already gone, try the next table */
			continue;
		}

		RelationClose(tableRelation);
		return GetFirstShardId(tableId);
	}

	/* the list was not empty, yet none of the tables survived */
	ereport(ERROR, (errmsg("tables in schema %s are concurrently dropped",
						   get_namespace_name(schemaId))));
}


/*
* ErrorIfTenantTable errors out with the given operation name,
* if the given relation is a tenant table.
Expand Down
2 changes: 2 additions & 0 deletions src/backend/distributed/sql/citus--12.0-1--12.1-1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@

#include "udfs/citus_internal_update_none_dist_table_metadata/12.1-1.sql"
#include "udfs/citus_internal_delete_placement_metadata/12.1-1.sql"

#include "udfs/citus_schema_move/12.1-1.sql"
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,12 @@ DROP FUNCTION pg_catalog.citus_internal_delete_placement_metadata(
placement_id bigint
);

-- drop the citus_schema_move() overload that takes target node name and port
DROP FUNCTION pg_catalog.citus_schema_move(
schema_id regnamespace, target_node_name text, target_node_port integer,
shard_transfer_mode citus.shard_transfer_mode
);

-- drop the citus_schema_move() overload that takes target node id
DROP FUNCTION pg_catalog.citus_schema_move(
schema_id regnamespace, target_node_id integer,
shard_transfer_mode citus.shard_transfer_mode
);
29 changes: 29 additions & 0 deletions src/backend/distributed/sql/udfs/citus_schema_move/12.1-1.sql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions src/backend/distributed/sql/udfs/citus_schema_move/latest.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- citus_schema_move, using target node name and node port
--
-- Moves the distributed schema given by schema_id to the node identified by
-- target_node_name and target_node_port. Implemented by the C function
-- citus_schema_move. shard_transfer_mode defaults to 'auto'.
CREATE OR REPLACE FUNCTION pg_catalog.citus_schema_move(
schema_id regnamespace,
target_node_name text,
target_node_port integer,
shard_transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_schema_move$$;
COMMENT ON FUNCTION pg_catalog.citus_schema_move(
schema_id regnamespace,
target_node_name text,
target_node_port integer,
shard_transfer_mode citus.shard_transfer_mode)
IS 'move a distributed schema to given node';

-- citus_schema_move, using target node id
--
-- Same as the node name / node port variant, but the target node is
-- identified by target_node_id. Implemented by the C function
-- citus_schema_move_with_nodeid. shard_transfer_mode defaults to 'auto'.
CREATE OR REPLACE FUNCTION pg_catalog.citus_schema_move(
schema_id regnamespace,
target_node_id integer,
shard_transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_schema_move_with_nodeid$$;
COMMENT ON FUNCTION pg_catalog.citus_schema_move(
schema_id regnamespace,
target_node_id integer,
shard_transfer_mode citus.shard_transfer_mode)
IS 'move a distributed schema to given node';
1 change: 0 additions & 1 deletion src/include/distributed/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,6 @@ extern void UpdateAutoConvertedForConnectedRelations(List *relationId, bool
/* schema_based_sharding.c */
extern bool ShouldUseSchemaBasedSharding(char *schemaName);
extern bool ShouldCreateTenantSchemaTable(Oid relationId);
extern bool IsTenantSchema(Oid schemaId);
extern void EnsureTenantTable(Oid relationId, char *operationName);
extern void ErrorIfIllegalPartitioningInTenantSchema(Oid parentRelationId,
Oid partitionRelationId);
Expand Down
3 changes: 3 additions & 0 deletions src/include/distributed/shard_transfer.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
#include "distributed/shard_rebalancer.h"
#include "nodes/pg_list.h"

extern Datum citus_move_shard_placement(PG_FUNCTION_ARGS);
extern Datum citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS);

typedef enum
{
SHARD_TRANSFER_INVALID_FIRST = 0,
Expand Down
Loading

0 comments on commit d628a4c

Please sign in to comment.