diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 60a5ab92bed..a73f2e9d2cb 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -9,7 +9,6 @@ #include "funcapi.h" #include "utils/plancache.h" - #include "access/genam.h" #include "access/heapam.h" #include "access/htup.h" @@ -102,8 +101,8 @@ static HeapTuple GetNodeByNodeId(int32 nodeId); static int32 GetNextGroupId(void); static int GetNextNodeId(void); static void InsertPlaceholderCoordinatorRecord(void); -static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata - *nodeMetadata); +static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, + NodeMetadata *nodeMetadata); static void DeleteNodeRow(char *nodename, int32 nodeport); static void BlockDistributedQueriesOnMetadataNodes(void); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); @@ -134,6 +133,13 @@ static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context, static void EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid); static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly); static void EnsureTransactionalMetadataSyncMode(void); +static void LockShardsInWorkerPlacementList(WorkerNode *workerNode, + LOCKMODE lockMode); +static BackgroundWorkerHandle * CheckBackgroundWorkerToObtainLocks(int32 lock_cooldown); +static BackgroundWorkerHandle * LockPlacementsWithBackgroundWorkersInPrimaryNode( + WorkerNode *workerNode, bool force, int32 lock_cooldown); + /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(citus_set_coordinator_host); @@ -152,6 +158,7 @@ PG_FUNCTION_INFO_V1(master_disable_node); PG_FUNCTION_INFO_V1(citus_activate_node); PG_FUNCTION_INFO_V1(master_activate_node); PG_FUNCTION_INFO_V1(citus_update_node); +PG_FUNCTION_INFO_V1(citus_pause_node_within_txn); PG_FUNCTION_INFO_V1(master_update_node); PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column); PG_FUNCTION_INFO_V1(citus_nodename_for_nodeid); @@ -160,7 +167,6 @@ PG_FUNCTION_INFO_V1(citus_coordinator_nodeid); PG_FUNCTION_INFO_V1(citus_is_coordinator); PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced); - /* * DefaultNodeMetadata creates a NodeMetadata struct with the fields set to * sane defaults, e.g. nodeRack = WORKER_DEFAULT_RACK. @@ -544,7 +550,8 @@ citus_disable_node(PG_FUNCTION_ARGS) "metadata is not allowed"), errhint("You can force disabling node, SELECT " "citus_disable_node('%s', %d, " - "synchronous:=true);", workerNode->workerName, + "synchronous:=true);", + workerNode->workerName, nodePort), errdetail("Citus uses the first worker node in the " "metadata for certain internal operations when " @@ -693,8 +700,7 @@ citus_set_node_property(PG_FUNCTION_ARGS) else { ereport(ERROR, (errmsg( - "only the 'shouldhaveshards' property can be set using this function" - ))); + "only the 'shouldhaveshards' property can be set using this function"))); } TransactionModifiedNodeMetadata = true; @@ -1160,6 +1166,100 @@ ActivateNodeList(MetadataSyncContext *context) } +/* + * LockShardsInWorkerPlacementList acquires shard metadata locks on all shards + * residing on the given worker node. + * + * TODO: This function is not compatible with the query-from-any-node feature.
+ * To ensure proper behavior, it is essential to acquire locks on placements across all nodes + * rather than limiting it to just the coordinator (or the specific node from which this function is called). + */ +void +LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE lockMode) +{ + List *placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId); + LockShardsInPlacementListMetadata(placementList, lockMode); +} + + +/* + * CheckBackgroundWorkerToObtainLocks starts a background worker that kills backends + * holding locks that conflict with this backend. It returns NULL if the background + * worker could not be started. + */ +BackgroundWorkerHandle * +CheckBackgroundWorkerToObtainLocks(int32 lock_cooldown) +{ + BackgroundWorkerHandle *handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, + lock_cooldown); + if (!handle) + { + /* + * We failed to start a background worker, which probably means that we exceeded + * max_worker_processes, and this is unlikely to be resolved by retrying. We do not want + * to repeatedly throw an error because if citus_update_node is called to complete a + * failover then finishing is the only way to bring the cluster back up. Therefore we + * give up on killing other backends and simply wait for the lock. We do set + * lock_timeout to lock_cooldown, because we don't want to wait forever to get a lock. + */ + SetLockTimeoutLocally(lock_cooldown); + ereport(WARNING, (errmsg( + "could not start background worker to kill backends with conflicting" + " locks to force the update. Degrading to acquiring locks " + "with a lock timeout."), + errhint( + "Increasing max_worker_processes might help."))); + } + return handle; +} + + +/* + * LockPlacementsWithBackgroundWorkersInPrimaryNode blocks reads and writes on all + * shard placements of the given node, if it is a primary. If force is true, we first + * start a background worker to kill backends holding locks that conflict with this + * backend. + * + * This lock has two purposes: + * + * - Ensure buggy code in Citus doesn't cause failures when the + * nodename/nodeport of a node changes mid-query + * + * - Provide fencing during failover: after this function returns, all + * connections will use the new node location. + * + * Drawback: + * + * - This function blocks until all previous queries have finished. This + * means that long-running queries will prevent failover. + * + * In case of node failure, said long-running queries will fail in the end + * anyway as they will be unable to commit successfully on the failed + * machine. To cause quick failure of these queries use force => true + * during the invocation of citus_update_node to terminate conflicting + * backends proactively. + * + * It might be worth blocking reads to a secondary for the same reasons, + * though we currently only query secondaries on follower clusters + * where these locks will have no effect. + */ +BackgroundWorkerHandle * +LockPlacementsWithBackgroundWorkersInPrimaryNode(WorkerNode *workerNode, bool force, + int32 lock_cooldown) +{ + BackgroundWorkerHandle *handle = NULL; + + if (NodeIsPrimary(workerNode)) + { + if (force) + { + handle = CheckBackgroundWorkerToObtainLocks(lock_cooldown); + } + LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock); + } + return handle; +} + + /* * citus_update_node moves the requested node to a different nodename and nodeport.
It * locks to ensure no queries are running concurrently; and is intended for customers who @@ -1188,8 +1288,6 @@ citus_update_node(PG_FUNCTION_ARGS) int32 lock_cooldown = PG_GETARG_INT32(4); char *newNodeNameString = text_to_cstring(newNodeName); - List *placementList = NIL; - BackgroundWorkerHandle *handle = NULL; WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString, newNodePort); @@ -1226,64 +1324,9 @@ citus_update_node(PG_FUNCTION_ARGS) EnsureTransactionalMetadataSyncMode(); } - /* - * If the node is a primary node we block reads and writes. - * - * This lock has two purposes: - * - * - Ensure buggy code in Citus doesn't cause failures when the - * nodename/nodeport of a node changes mid-query - * - * - Provide fencing during failover, after this function returns all - * connections will use the new node location. - * - * Drawback: - * - * - This function blocks until all previous queries have finished. This - * means that long-running queries will prevent failover. - * - * In case of node failure said long-running queries will fail in the end - * anyway as they will be unable to commit successfully on the failed - * machine. To cause quick failure of these queries use force => true - * during the invocation of citus_update_node to terminate conflicting - * backends proactively. - * - * It might be worth blocking reads to a secondary for the same reasons, - * though we currently only query secondaries on follower clusters - * where these locks will have no effect. - */ - if (NodeIsPrimary(workerNode)) - { - /* - * before acquiring the locks check if we want a background worker to help us to - * aggressively obtain the locks. - */ - if (force) - { - handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, lock_cooldown); - if (!handle) - { - /* - * We failed to start a background worker, which probably means that we exceeded - * max_worker_processes, and this is unlikely to be resolved by retrying. We do not want - * to repeatedly throw an error because if citus_update_node is called to complete a - * failover then finishing is the only way to bring the cluster back up. Therefore we - * give up on killing other backends and simply wait for the lock. We do set - * lock_timeout to lock_cooldown, because we don't want to wait forever to get a lock. - */ - SetLockTimeoutLocally(lock_cooldown); - ereport(WARNING, (errmsg( - "could not start background worker to kill backends with conflicting" - " locks to force the update. Degrading to acquiring locks " - "with a lock time out."), - errhint( - "Increasing max_worker_processes might help."))); - } - } - - placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId); - LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock); - } + BackgroundWorkerHandle *handle = + LockPlacementsWithBackgroundWorkersInPrimaryNode(workerNode, force, lock_cooldown); /* * if we have planned statements such as prepared statements, we should clear the cache so that @@ -1330,6 +1373,34 @@ citus_update_node(PG_FUNCTION_ARGS) } + +/* + * citus_pause_node_within_txn obtains locks on all the shard placements of the + * given node, blocking writes to those placements. The locks are released + * automatically when the transaction commits or aborts, so this function must be + * invoked within a transaction. It is useful when writes to a specific node need + * to be disabled temporarily.
+ */ +Datum +citus_pause_node_within_txn(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + int32 nodeId = PG_GETARG_INT32(0); + bool force = PG_GETARG_BOOL(1); + int32 lock_cooldown = PG_GETARG_INT32(2); + + WorkerNode *workerNode = FindNodeAnyClusterByNodeId(nodeId); + if (workerNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_NO_DATA_FOUND), + errmsg("node %u not found", nodeId))); + } + + LockPlacementsWithBackgroundWorkersInPrimaryNode(workerNode, force, lock_cooldown); + + PG_RETURN_VOID(); +} + + /* * master_update_node is a wrapper function for old UDF name. */ @@ -1947,7 +2018,8 @@ ErrorIfNodeContainsNonRemovablePlacements(WorkerNode *workerNode) ereport(ERROR, (errmsg("cannot remove or disable the node " "%s:%d because it contains " "the only shard placement for " - "shard " UINT64_FORMAT, workerNode->workerName, + "shard " UINT64_FORMAT, + workerNode->workerName, workerNode->workerPort, placement->shardId), errdetail("One of the table(s) that prevents the operation " "from completing successfully is %s", @@ -2499,7 +2571,8 @@ ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, char *fi if (!valueBool && workerNode->groupId == COORDINATOR_GROUP_ID) { ereport(ERROR, (errmsg("cannot change \"%s\" field of the " - "coordinator node", + "field))); + "coordinator node", + field))); } } diff --git a/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql b/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql index b97d5083df7..b904b6a838a 100644 --- a/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql +++ b/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql @@ -2,6 +2,7 @@ -- bump version to 12.1-1 +#include "udfs/citus_pause_node_within_txn/12.1-1.sql" #include "udfs/citus_prepare_pg_upgrade/12.1-1.sql" #include "udfs/citus_finish_pg_upgrade/12.1-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql b/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql index dc242a1be6d..82ff28a6b2e 100644 --- a/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql @@ -1,5 +1,5 @@ -- citus--12.1-1--12.0-1 - +DROP FUNCTION pg_catalog.citus_pause_node_within_txn(int,bool,int); -- we have modified the relevant upgrade script to include any_value changes -- we don't need to upgrade this downgrade path for any_value changes -- since if we are doing a Citus downgrade, not PG downgrade, then it would be no-op.
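For review context, a usage sketch of the UDF added above (not part of the diff; the node id, cooldown value, and the maintenance step are hypothetical). The locks taken by citus_pause_node_within_txn are held until the surrounding transaction ends, so the pause window is exactly the lifetime of that transaction:

BEGIN;
-- Block writes to all shard placements on node 2 until COMMIT/ABORT.
SELECT pg_catalog.citus_pause_node_within_txn(2);
-- ... perform work that requires the node to be quiesced ...
COMMIT;  -- locks released, node effectively unpaused

-- With force => true, backends holding conflicting locks are terminated
-- after lock_cooldown milliseconds instead of being waited on:
BEGIN;
SELECT pg_catalog.citus_pause_node_within_txn(2, true, 10000);
COMMIT;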
@@ -12,3 +12,4 @@ DROP FUNCTION pg_catalog.citus_internal_update_none_dist_table_metadata( DROP FUNCTION pg_catalog.citus_internal_delete_placement_metadata( placement_id bigint ); + diff --git a/src/backend/distributed/sql/udfs/citus_pause_node_within_txn/12.1-1.sql b/src/backend/distributed/sql/udfs/citus_pause_node_within_txn/12.1-1.sql new file mode 100644 index 00000000000..9f81d6840c1 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_pause_node_within_txn/12.1-1.sql @@ -0,0 +1,13 @@ +CREATE FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int, + force bool DEFAULT false, + lock_cooldown int DEFAULT 10000) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$citus_pause_node_within_txn$$; + +COMMENT ON FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int, + force bool, + lock_cooldown int) + IS 'pauses the node with the given id by taking locks on its shard placements, blocking writes to that node until the end of the current transaction'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_pause_node_within_txn(int,bool,int) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_pause_node_within_txn/latest.sql b/src/backend/distributed/sql/udfs/citus_pause_node_within_txn/latest.sql new file mode 100644 index 00000000000..9f81d6840c1 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_pause_node_within_txn/latest.sql @@ -0,0 +1,13 @@ +CREATE FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int, + force bool DEFAULT false, + lock_cooldown int DEFAULT 10000) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$citus_pause_node_within_txn$$; + +COMMENT ON FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int, + force bool, + lock_cooldown int) + IS 'pauses the node with the given id by taking locks on its shard placements, blocking writes to that node until the end of the current transaction'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_pause_node_within_txn(int,bool,int) FROM PUBLIC; diff --git a/src/test/regress/expected/isolation_citus_pause_node.out b/src/test/regress/expected/isolation_citus_pause_node.out new file mode 100644 index 00000000000..dd796f76818 --- /dev/null +++ b/src/test/regress/expected/isolation_citus_pause_node.out @@ -0,0 +1,317 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-begin s2-begin s1-pause-node s2-insert-distributed s1-end s2-end +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +s1: NOTICE: +step s1-pause-node: + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; + END; + $$ + LANGUAGE plpgsql; + +step s2-insert-distributed: + -- Execute the INSERT statement + insert into employee values(11,'e11',3); + +step s1-end: + COMMIT; + +step s2-insert-distributed: <...
completed> +step s2-end: + COMMIT; + + +starting permutation: s1-begin s2-begin s1-pause-node s2-delete-distributed s1-end s2-end +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +s1: NOTICE: +step s1-pause-node: + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; + END; + $$ + LANGUAGE plpgsql; + +step s2-delete-distributed: + -- Execute the DELETE statement + delete from employee where id = 9; + +step s1-end: + COMMIT; + +step s2-delete-distributed: <... completed> +step s2-end: + COMMIT; + + +starting permutation: s1-begin s1-pause-node s2-begin s2-select-distributed s1-end s2-end +step s1-begin: + BEGIN; + +s1: NOTICE: +step s1-pause-node: + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; + END; + $$ + LANGUAGE plpgsql; + +step s2-begin: + BEGIN; + +step s2-select-distributed: + select * from employee where id = 10; + +id|name|company_id +--------------------------------------------------------------------- +10|e10 | 3 +(1 row) + +step s1-end: + COMMIT; + +step s2-end: + COMMIT; + + +starting permutation: s1-begin s2-begin s1-pause-node s2-insert-reference s1-end s2-end +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +s1: NOTICE: +step s1-pause-node: + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. 
+ raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; + END; + $$ + LANGUAGE plpgsql; + +step s2-insert-reference: + -- Execute the INSERT statement + insert into city values(3,'city3'); + +step s1-end: + COMMIT; + +step s2-insert-reference: <... completed> +step s2-end: + COMMIT; + + +starting permutation: s1-begin s1-pause-node s1-pause-node s1-end +step s1-begin: + BEGIN; + +s1: NOTICE: +step s1-pause-node: + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; + END; + $$ + LANGUAGE plpgsql; + +s1: NOTICE: +step s1-pause-node: + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; + END; + $$ + LANGUAGE plpgsql; + +step s1-end: + COMMIT; + + +starting permutation: s1-begin s1-node-not-found s1-end +step s1-begin: + BEGIN; + +s1: NOTICE: Node not found. 
+step s1-node-not-found: + DO $$ + DECLARE + v_node_id int:= -1; + v_node_exists boolean := true; + v_exception_message text; + v_expected_exception_message text := ''; + BEGIN + select nextval('pg_dist_node_nodeid_seq')::int into v_node_id; + select citus_pause_node_within_txn(v_node_id) ; + EXCEPTION + WHEN SQLSTATE 'P0002' THEN + GET STACKED DIAGNOSTICS v_exception_message = MESSAGE_TEXT; + v_expected_exception_message := 'node ' || v_node_id || ' not found'; + if v_exception_message = v_expected_exception_message then + RAISE NOTICE 'Node not found.'; + end if; + END; + $$ + LANGUAGE plpgsql; + +step s1-end: + COMMIT; + + +starting permutation: s1-begin s2-begin s2-insert-distributed s1-pause-node-force s1-end s2-end +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s2-insert-distributed: + -- Execute the INSERT statement + insert into employee values(11,'e11',3); + +step s1-pause-node-force: + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + v_force boolean := true; + v_lock_cooldown int := 100; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node with force true + perform pg_catalog.citus_pause_node_within_txn(v_node_id,v_force,v_lock_cooldown) ; + END; + $$ + LANGUAGE plpgsql; + +s1: NOTICE: +step s1-pause-node-force: <... completed> +step s1-end: + COMMIT; + +step s2-end: + COMMIT; + +FATAL: terminating connection due to administrator command +SSL connection has been closed unexpectedly + diff --git a/src/test/regress/expected/isolation_citus_pause_node_1.out b/src/test/regress/expected/isolation_citus_pause_node_1.out new file mode 100644 index 00000000000..7b84ecd7226 --- /dev/null +++ b/src/test/regress/expected/isolation_citus_pause_node_1.out @@ -0,0 +1,318 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-begin s2-begin s1-pause-node s2-insert-distributed s1-end s2-end +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +s1: NOTICE: +step s1-pause-node: + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. 
+ raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; + END; + $$ + LANGUAGE plpgsql; + +step s2-insert-distributed: + -- Execute the INSERT statement + insert into employee values(11,'e11',3); + +step s1-end: + COMMIT; + +step s2-insert-distributed: <... completed> +step s2-end: + COMMIT; + + +starting permutation: s1-begin s2-begin s1-pause-node s2-delete-distributed s1-end s2-end +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +s1: NOTICE: +step s1-pause-node: + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; + END; + $$ + LANGUAGE plpgsql; + +step s2-delete-distributed: + -- Execute the DELETE statement + delete from employee where id = 9; + +step s1-end: + COMMIT; + +step s2-delete-distributed: <... completed> +step s2-end: + COMMIT; + + +starting permutation: s1-begin s1-pause-node s2-begin s2-select-distributed s1-end s2-end +step s1-begin: + BEGIN; + +s1: NOTICE: +step s1-pause-node: + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. 
+ raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; + END; + $$ + LANGUAGE plpgsql; + +step s2-begin: + BEGIN; + +step s2-select-distributed: + select * from employee where id = 10; + +id|name|company_id +--------------------------------------------------------------------- +10|e10 | 3 +(1 row) + +step s1-end: + COMMIT; + +step s2-end: + COMMIT; + + +starting permutation: s1-begin s2-begin s1-pause-node s2-insert-reference s1-end s2-end +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +s1: NOTICE: +step s1-pause-node: + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; + END; + $$ + LANGUAGE plpgsql; + +step s2-insert-reference: + -- Execute the INSERT statement + insert into city values(3,'city3'); + +step s1-end: + COMMIT; + +step s2-insert-reference: <... completed> +step s2-end: + COMMIT; + + +starting permutation: s1-begin s1-pause-node s1-pause-node s1-end +step s1-begin: + BEGIN; + +s1: NOTICE: +step s1-pause-node: + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; + END; + $$ + LANGUAGE plpgsql; + +s1: NOTICE: +step s1-pause-node: + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. 
+ raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; + END; + $$ + LANGUAGE plpgsql; + +step s1-end: + COMMIT; + + +starting permutation: s1-begin s1-node-not-found s1-end +step s1-begin: + BEGIN; + +s1: NOTICE: Node not found. +step s1-node-not-found: + DO $$ + DECLARE + v_node_id int:= -1; + v_node_exists boolean := true; + v_exception_message text; + v_expected_exception_message text := ''; + BEGIN + select nextval('pg_dist_node_nodeid_seq')::int into v_node_id; + select citus_pause_node_within_txn(v_node_id) ; + EXCEPTION + WHEN SQLSTATE 'P0002' THEN + GET STACKED DIAGNOSTICS v_exception_message = MESSAGE_TEXT; + v_expected_exception_message := 'node ' || v_node_id || ' not found'; + if v_exception_message = v_expected_exception_message then + RAISE NOTICE 'Node not found.'; + end if; + END; + $$ + LANGUAGE plpgsql; + +step s1-end: + COMMIT; + + +starting permutation: s1-begin s2-begin s2-insert-distributed s1-pause-node-force s1-end s2-end +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s2-insert-distributed: + -- Execute the INSERT statement + insert into employee values(11,'e11',3); + +step s1-pause-node-force: + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + v_force boolean := true; + v_lock_cooldown int := 100; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node with force true + perform pg_catalog.citus_pause_node_within_txn(v_node_id,v_force,v_lock_cooldown) ; + END; + $$ + LANGUAGE plpgsql; + +s1: NOTICE: +step s1-pause-node-force: <... 
completed> +step s1-end: + COMMIT; + +step s2-end: + COMMIT; + +FATAL: terminating connection due to administrator command +FATAL: terminating connection due to administrator command +SSL connection has been closed unexpectedly + diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index b1d9d3a6f12..86ca9573a34 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1403,7 +1403,8 @@ SELECT * FROM multi_extension.print_extension_changes(); --------------------------------------------------------------------- | function citus_internal_delete_placement_metadata(bigint) void | function citus_internal_update_none_dist_table_metadata(oid,"char",bigint,boolean) void -(2 rows) + | function citus_pause_node_within_txn(integer,boolean,integer) void +(3 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index d379f22a4cb..1a4b5d04d9a 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -105,6 +105,7 @@ ORDER BY 1; function citus_nodeid_for_gpid(bigint) function citus_nodename_for_nodeid(integer) function citus_nodeport_for_nodeid(integer) + function citus_pause_node_within_txn(integer,boolean,integer) function citus_pid_for_gpid(bigint) function citus_prepare_pg_upgrade() function citus_query_stats() @@ -340,5 +341,5 @@ ORDER BY 1; view citus_stat_tenants_local view pg_dist_shard_placement view time_partitions -(330 rows) +(331 rows) diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 1484c712fea..d8cc77c73f3 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -77,6 +77,7 @@ test: isolation_global_pid test: isolation_citus_locks test: isolation_reference_table test: isolation_schema_based_sharding +test: isolation_citus_pause_node test: isolation_citus_schema_distribute_undistribute # Rebalancer diff --git a/src/test/regress/spec/isolation_citus_pause_node.spec b/src/test/regress/spec/isolation_citus_pause_node.spec new file mode 100644 index 00000000000..8449e2b3cde --- /dev/null +++ b/src/test/regress/spec/isolation_citus_pause_node.spec @@ -0,0 +1,185 @@ +setup +{ + SET citus.shard_replication_factor to 1; + + create table city (id int , name text ); + SELECT create_reference_table('city'); + + CREATE TABLE company(id int primary key, name text, city_id int); + select create_distributed_table('company', 'id'); + + create table employee(id int , name text, company_id int ); + alter table employee add constraint employee_pkey primary key (id,company_id); + + select create_distributed_table('employee', 'company_id'); + + insert into city values(1,'city1'); + insert into city values(2,'city2'); + + + insert into company values(1,'c1', 1); + insert into company values(2,'c2',2); + insert into company values(3,'c3',1); + + insert into employee values(1,'e1',1); + insert into employee values(2,'e2',1); + insert into employee values(3,'e3',1); + + insert into employee values(4,'e4',2); + insert into employee values(5,'e5',2); + insert into employee values(6,'e6',2); + + insert into employee values(7,'e7',3); + insert into employee values(8,'e8',3); + insert into employee values(9,'e9',3); + insert into employee values(10,'e10',3); + + +} + +teardown +{ 
+ DROP TABLE employee,company,city; +} + +session "s1" + +step "s1-begin" +{ + BEGIN; +} + +step "s1-node-not-found" +{ + DO $$ + DECLARE + v_node_id int:= -1; + v_node_exists boolean := true; + v_exception_message text; + v_expected_exception_message text := ''; + BEGIN + select nextval('pg_dist_node_nodeid_seq')::int into v_node_id; + select citus_pause_node_within_txn(v_node_id) ; + EXCEPTION + WHEN SQLSTATE 'P0002' THEN + GET STACKED DIAGNOSTICS v_exception_message = MESSAGE_TEXT; + v_expected_exception_message := 'node ' || v_node_id || ' not found'; + if v_exception_message = v_expected_exception_message then + RAISE NOTICE 'Node not found.'; + end if; + END; + $$ + LANGUAGE plpgsql; +} + +step "s1-pause-node" +{ + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + + + -- Pause the node + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; + END; + $$ + LANGUAGE plpgsql; +} + +step "s1-pause-node-force" +{ + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + v_force boolean := true; + v_lock_cooldown int := 100; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. 
+ + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + + + -- Pause the node with force true + perform pg_catalog.citus_pause_node_within_txn(v_node_id,v_force,v_lock_cooldown) ; + END; + $$ + LANGUAGE plpgsql; +} + +step "s1-end" +{ + COMMIT; +} + +session "s2" + + +step "s2-begin" +{ + BEGIN; +} + +step "s2-insert-distributed" +{ + -- Execute the INSERT statement + insert into employee values(11,'e11',3); + +} + +step "s2-insert-reference"{ + -- Execute the INSERT statement + insert into city values(3,'city3'); +} + +step "s2-select-distributed"{ + + select * from employee where id = 10; +} + + +step "s2-delete-distributed"{ + -- Execute the DELETE statement + delete from employee where id = 9; +} + +step "s2-end" +{ + COMMIT; +} + +permutation "s1-begin" "s2-begin" "s1-pause-node" "s2-insert-distributed" "s1-end" "s2-end" +permutation "s1-begin" "s2-begin" "s1-pause-node" "s2-delete-distributed" "s1-end" "s2-end" +permutation "s1-begin" "s1-pause-node" "s2-begin" "s2-select-distributed" "s1-end" "s2-end" +permutation "s1-begin" "s2-begin" "s1-pause-node" "s2-insert-reference" "s1-end" "s2-end" +permutation "s1-begin" "s1-pause-node" "s1-pause-node" "s1-end" +permutation "s1-begin" "s1-node-not-found" "s1-end" +permutation "s1-begin" "s2-begin" "s2-insert-distributed" "s1-pause-node-force"(*) "s1-end" "s2-end"
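Taken together with citus_update_node, which now reuses LockPlacementsWithBackgroundWorkersInPrimaryNode, the new UDF supports a fencing-style failover flow along these lines (a sketch only, not part of the patch; the node id, host name, port, and cooldown are hypothetical):

BEGIN;
-- Fence writes to the failing node; with force => true, conflicting
-- backends are terminated after lock_cooldown (1000 ms here).
SELECT pg_catalog.citus_pause_node_within_txn(2, true, 1000);
-- Promote the standby out of band, then repoint Citus metadata at it.
SELECT pg_catalog.citus_update_node(2, 'standby-host', 5432);
COMMIT;  -- locks released; new connections use the new node location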