-
Notifications
You must be signed in to change notification settings - Fork 666
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds citus_pause_node udf #7089
Changes from 59 commits
9d46cd8
304a00e
7ccbb84
b178a75
20ae610
46ab6f1
8928c0f
6da0baa
afa7bf6
515627e
d42f557
4ed78f1
24380f8
28cda81
ed40dfe
63311e5
1c05eeb
339a47a
4c3341e
b69c36a
c41f93e
1a1b633
3220bd9
6f2ddf4
cc403bf
a05d5fc
29c5b0c
9e79cd6
997a5d7
bb62b84
3edefdc
b471bb0
88695c6
dd72cf0
18c55a4
86e0831
be2e653
d10eb05
de83b01
3fbe5e4
1a5cf9d
d9cecba
48a5450
458edd8
eda5539
7ac5f21
dd90ec7
4030ec9
af29e2d
6d171e9
22303ad
850df01
ecc675b
c4d694e
1e4699c
fcc6903
796a740
43abc8b
820ef06
4af29ed
8fb4d2f
463845e
c42413b
365b762
5ded545
2ac620d
d355193
3784889
43d961f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,7 +9,6 @@ | |
#include "funcapi.h" | ||
#include "utils/plancache.h" | ||
|
||
|
||
#include "access/genam.h" | ||
#include "access/heapam.h" | ||
#include "access/htup.h" | ||
|
@@ -102,8 +101,8 @@ | |
static int32 GetNextGroupId(void); | ||
static int GetNextNodeId(void); | ||
static void InsertPlaceholderCoordinatorRecord(void); | ||
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata | ||
*nodeMetadata); | ||
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, | ||
NodeMetadata *nodeMetadata); | ||
static void DeleteNodeRow(char *nodename, int32 nodeport); | ||
static void BlockDistributedQueriesOnMetadataNodes(void); | ||
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); | ||
|
@@ -134,6 +133,13 @@ | |
static void EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid); | ||
static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly); | ||
static void EnsureTransactionalMetadataSyncMode(void); | ||
static void LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE | ||
lockMode); | ||
static BackgroundWorkerHandle * CheckBackgroundWorkerToObtainLocks(int32 lock_cooldown); | ||
static BackgroundWorkerHandle * LockPlacementsWithBackgroundWorkersInPrimaryNode( | ||
WorkerNode *workerNode, bool force, int32 lock_cooldown); | ||
|
||
/* Function definitions go here */ | ||
|
||
/* declarations for dynamic loading */ | ||
PG_FUNCTION_INFO_V1(citus_set_coordinator_host); | ||
|
@@ -152,6 +158,7 @@ | |
PG_FUNCTION_INFO_V1(citus_activate_node); | ||
PG_FUNCTION_INFO_V1(master_activate_node); | ||
PG_FUNCTION_INFO_V1(citus_update_node); | ||
PG_FUNCTION_INFO_V1(citus_pause_node_within_txn); | ||
PG_FUNCTION_INFO_V1(master_update_node); | ||
PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column); | ||
PG_FUNCTION_INFO_V1(citus_nodename_for_nodeid); | ||
|
@@ -160,7 +167,6 @@ | |
PG_FUNCTION_INFO_V1(citus_is_coordinator); | ||
PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced); | ||
|
||
|
||
/* | ||
* DefaultNodeMetadata creates a NodeMetadata struct with the fields set to | ||
* sane defaults, e.g. nodeRack = WORKER_DEFAULT_RACK. | ||
|
@@ -544,7 +550,8 @@ | |
"metadata is not allowed"), | ||
errhint("You can force disabling node, SELECT " | ||
"citus_disable_node('%s', %d, " | ||
"synchronous:=true);", workerNode->workerName, | ||
"synchronous:=true);", | ||
workerNode->workerName, | ||
nodePort), | ||
errdetail("Citus uses the first worker node in the " | ||
"metadata for certain internal operations when " | ||
|
@@ -693,8 +700,7 @@ | |
else | ||
{ | ||
ereport(ERROR, (errmsg( | ||
"only the 'shouldhaveshards' property can be set using this function" | ||
))); | ||
"only the 'shouldhaveshards' property can be set using this function"))); | ||
} | ||
|
||
TransactionModifiedNodeMetadata = true; | ||
|
@@ -1160,6 +1166,100 @@ | |
} | ||
|
||
|
||
/* | ||
* Acquires shard metadata locks on all shards residing in the given worker node | ||
* | ||
* TODO: This function is not compatible with query from any node feature. | ||
* To ensure proper behavior, it is essential to acquire locks on placements across all nodes | ||
* rather than limiting it to just the coordinator (or the specific node from which this function is called) | ||
*/ | ||
void | ||
LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE lockMode) | ||
{ | ||
List *placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId); | ||
LockShardsInPlacementListMetadata(placementList, lockMode); | ||
} | ||
|
||
|
||
/* | ||
* This function is used to start a background worker to kill backends holding conflicting | ||
* locks with this backend. It returns NULL if the background worker could not be started. | ||
*/ | ||
BackgroundWorkerHandle * | ||
CheckBackgroundWorkerToObtainLocks(int32 lock_cooldown) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function needs a comment There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added comment |
||
{ | ||
BackgroundWorkerHandle *handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, | ||
lock_cooldown); | ||
if (!handle) | ||
{ | ||
/* | ||
* We failed to start a background worker, which probably means that we exceeded | ||
* max_worker_processes, and this is unlikely to be resolved by retrying. We do not want | ||
* to repeatedly throw an error because if citus_update_node is called to complete a | ||
* failover then finishing is the only way to bring the cluster back up. Therefore we | ||
* give up on killing other backends and simply wait for the lock. We do set | ||
* lock_timeout to lock_cooldown, because we don't want to wait forever to get a lock. | ||
*/ | ||
SetLockTimeoutLocally(lock_cooldown); | ||
ereport(WARNING, (errmsg( | ||
"could not start background worker to kill backends with conflicting" | ||
" locks to force the update. Degrading to acquiring locks " | ||
"with a lock time out."), | ||
errhint( | ||
"Increasing max_worker_processes might help."))); | ||
} | ||
return handle; | ||
} | ||
|
||
|
||
/* | ||
* This function is used to lock shards in a primary node. | ||
* If force is true, we start a background worker to kill backends holding | ||
* conflicting locks with this backend. | ||
* | ||
* If the node is a primary node we block reads and writes. | ||
* | ||
* This lock has two purposes: | ||
* | ||
* - Ensure buggy code in Citus doesn't cause failures when the | ||
* nodename/nodeport of a node changes mid-query | ||
* | ||
* - Provide fencing during failover, after this function returns all | ||
* connections will use the new node location. | ||
* | ||
* Drawback: | ||
* | ||
* - This function blocks until all previous queries have finished. This | ||
* means that long-running queries will prevent failover. | ||
* | ||
* In case of node failure said long-running queries will fail in the end | ||
* anyway as they will be unable to commit successfully on the failed | ||
* machine. To cause quick failure of these queries use force => true | ||
* during the invocation of citus_update_node to terminate conflicting | ||
* backends proactively. | ||
* | ||
* It might be worth blocking reads to a secondary for the same reasons, | ||
* though we currently only query secondaries on follower clusters | ||
* where these locks will have no effect. | ||
*/ | ||
BackgroundWorkerHandle * | ||
LockPlacementsWithBackgroundWorkersInPrimaryNode(WorkerNode *workerNode, bool force, int32 | ||
lock_cooldown) | ||
{ | ||
BackgroundWorkerHandle *handle = NULL; | ||
|
||
if (NodeIsPrimary(workerNode)) | ||
{ | ||
if (force) | ||
{ | ||
handle = CheckBackgroundWorkerToObtainLocks(lock_cooldown); | ||
} | ||
LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock); | ||
} | ||
return handle; | ||
} | ||
|
||
|
||
/* | ||
* citus_update_node moves the requested node to a different nodename and nodeport. It | ||
* locks to ensure no queries are running concurrently; and is intended for customers who | ||
|
@@ -1188,8 +1288,6 @@ | |
int32 lock_cooldown = PG_GETARG_INT32(4); | ||
|
||
char *newNodeNameString = text_to_cstring(newNodeName); | ||
List *placementList = NIL; | ||
BackgroundWorkerHandle *handle = NULL; | ||
|
||
WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString, | ||
newNodePort); | ||
|
@@ -1226,64 +1324,9 @@ | |
EnsureTransactionalMetadataSyncMode(); | ||
} | ||
|
||
/* | ||
* If the node is a primary node we block reads and writes. | ||
* | ||
* This lock has two purposes: | ||
* | ||
* - Ensure buggy code in Citus doesn't cause failures when the | ||
* nodename/nodeport of a node changes mid-query | ||
* | ||
* - Provide fencing during failover, after this function returns all | ||
* connections will use the new node location. | ||
* | ||
* Drawback: | ||
* | ||
* - This function blocks until all previous queries have finished. This | ||
* means that long-running queries will prevent failover. | ||
* | ||
* In case of node failure said long-running queries will fail in the end | ||
* anyway as they will be unable to commit successfully on the failed | ||
* machine. To cause quick failure of these queries use force => true | ||
* during the invocation of citus_update_node to terminate conflicting | ||
* backends proactively. | ||
* | ||
* It might be worth blocking reads to a secondary for the same reasons, | ||
* though we currently only query secondaries on follower clusters | ||
* where these locks will have no effect. | ||
*/ | ||
if (NodeIsPrimary(workerNode)) | ||
{ | ||
/* | ||
* before acquiring the locks check if we want a background worker to help us to | ||
* aggressively obtain the locks. | ||
*/ | ||
if (force) | ||
{ | ||
handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, lock_cooldown); | ||
if (!handle) | ||
{ | ||
/* | ||
* We failed to start a background worker, which probably means that we exceeded | ||
* max_worker_processes, and this is unlikely to be resolved by retrying. We do not want | ||
* to repeatedly throw an error because if citus_update_node is called to complete a | ||
* failover then finishing is the only way to bring the cluster back up. Therefore we | ||
* give up on killing other backends and simply wait for the lock. We do set | ||
* lock_timeout to lock_cooldown, because we don't want to wait forever to get a lock. | ||
*/ | ||
SetLockTimeoutLocally(lock_cooldown); | ||
ereport(WARNING, (errmsg( | ||
"could not start background worker to kill backends with conflicting" | ||
" locks to force the update. Degrading to acquiring locks " | ||
"with a lock time out."), | ||
errhint( | ||
"Increasing max_worker_processes might help."))); | ||
} | ||
} | ||
|
||
placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId); | ||
LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock); | ||
} | ||
BackgroundWorkerHandle *handle = LockPlacementsWithBackgroundWorkersInPrimaryNode( | ||
workerNode, force, | ||
lock_cooldown); | ||
|
||
/* | ||
* if we have planned statements such as prepared statements, we should clear the cache so that | ||
|
@@ -1330,6 +1373,34 @@ | |
} | ||
|
||
|
||
/*
 * citus_pause_node_within_txn obtains locks for all the shards in a worker
 * placement list, which temporarily blocks writes to that node.
 *
 * Once the transaction is committed, the acquired locks will be automatically
 * released. Therefore, it is essential to invoke this function within a
 * transaction. This function proves beneficial when there is a need to
 * temporarily disable writes to a specific node within a transaction.
 *
 * Errors with ERRCODE_NO_DATA_FOUND when no node with the given id exists.
 */
Datum
citus_pause_node_within_txn(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	/* id of the node to pause, as stored in pg_dist_node */
	int32 nodeId = PG_GETARG_INT32(0);

	/* when true, terminate backends holding conflicting locks instead of waiting */
	bool force = PG_GETARG_BOOL(1);

	/* cooldown used as lock_timeout fallback — presumably milliseconds (SQL default is 10000); TODO confirm */
	int32 lock_cooldown = PG_GETARG_INT32(2);

	WorkerNode *workerNode = FindNodeAnyClusterByNodeId(nodeId);
	if (workerNode == NULL)
	{
		ereport(ERROR, (errcode(ERRCODE_NO_DATA_FOUND),
						errmsg("node %u not found", nodeId)));
	}

	/* only locks placements if the node is a primary; worker handle is intentionally ignored */
	LockPlacementsWithBackgroundWorkersInPrimaryNode(workerNode, force, lock_cooldown);

	PG_RETURN_VOID();
}
|
||
|
||
/* | ||
* master_update_node is a wrapper function for old UDF name. | ||
*/ | ||
|
@@ -1947,7 +2018,8 @@ | |
ereport(ERROR, (errmsg("cannot remove or disable the node " | ||
"%s:%d because because it contains " | ||
"the only shard placement for " | ||
"shard " UINT64_FORMAT, workerNode->workerName, | ||
"shard " UINT64_FORMAT, | ||
workerNode->workerName, | ||
workerNode->workerPort, placement->shardId), | ||
errdetail("One of the table(s) that prevents the operation " | ||
"complete successfully is %s", | ||
|
@@ -2499,7 +2571,8 @@ | |
if (!valueBool && workerNode->groupId == COORDINATOR_GROUP_ID) | ||
{ | ||
ereport(ERROR, (errmsg("cannot change \"%s\" field of the " | ||
"coordinator node", field))); | ||
"coordinator node", | ||
field))); | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
-- citus--12.1-1--12.0-1

-- drop the UDF that was introduced in 12.1-1 so the downgraded catalog matches 12.0-1
DROP FUNCTION pg_catalog.citus_pause_node_within_txn(int,bool,int);
-- we have modified the relevant upgrade script to include any_value changes
-- we don't need to upgrade this downgrade path for any_value changes
-- since if we are doing a Citus downgrade, not PG downgrade, then it would be no-op.
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
-- citus_pause_node_within_txn: C UDF that locks all shard placements on the
-- given node for the duration of the calling transaction, blocking queries on
-- that node until commit/abort. STRICT: returns NULL on any NULL argument
-- without calling the C function.
CREATE FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int,
                                                       force bool DEFAULT false,
                                                       lock_cooldown int DEFAULT 10000)
  RETURNS void
  LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_pause_node_within_txn$$;

COMMENT ON FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int,
                                                           force bool,
                                                           lock_cooldown int)
  IS 'pauses the node with the given id by locking its shard placements, preventing queries from being executed on that node until the transaction ends';

-- superuser-only by default; access must be granted explicitly
REVOKE ALL ON FUNCTION pg_catalog.citus_pause_node_within_txn(int,bool,int) FROM PUBLIC;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More code from citus_update_node should be included here. Basically everything starting at the big comment at line 1246. Because citus_pause_node_within_txn should support the
force
argument as well.Also the comment about why we don't run this on secondaries should be included.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the code you requested