Adds citus_pause_node udf #7089

Merged · 69 commits · Sep 1, 2023

Commits
All commits are by gurkanindibay.

9d46cd8  Adds initial code (Jul 25, 2023)
304a00e  Fixed unit test problems (Jul 28, 2023)
7ccbb84  Adds unit tests (Jul 28, 2023)
b178a75  Fixes static code analysis issues (Jul 28, 2023)
20ae610  Removes unnecessary changes (Jul 28, 2023)
46ab6f1  Removes unnecessary changes (Jul 28, 2023)
8928c0f  Adds multi_extension.out output (Jul 28, 2023)
6da0baa  Adds citus_pause_node into test files (Jul 28, 2023)
afa7bf6  Fixes upgrade_list_citus_objects diff (Jul 28, 2023)
515627e  Adds code to debug (Jul 28, 2023)
d42f557  Adds node id detection with shard_id (Jul 28, 2023)
4ed78f1  Fixes errors in code (Jul 28, 2023)
24380f8  Fixes unit tests (Jul 28, 2023)
28cda81  Fixes coverage issue (Jul 28, 2023)
ed40dfe  Give details for exception message (Jul 28, 2023)
63311e5  Fixes some review notes (Jul 29, 2023)
1c05eeb  Updates udf name (Jul 29, 2023)
339a47a  Fixes review comments (Jul 29, 2023)
4c3341e  Fixes indentation (Jul 29, 2023)
b69c36a  Fixes static code analysis issues (Jul 29, 2023)
c41f93e  Fixes indentation (Jul 29, 2023)
1a1b633  Fixes multi extension tests (Jul 29, 2023)
3220bd9  Fixes test errors after rebase (Jul 29, 2023)
6f2ddf4  Removes empty line (Jul 29, 2023)
cc403bf  Fixes upgrade tests (Jul 29, 2023)
a05d5fc  Fixes upgrade tests (Jul 29, 2023)
29c5b0c  Adds comments to methods (Jul 29, 2023)
9e79cd6  Fixes indent issues (Jul 29, 2023)
997a5d7  Merge branch 'main' into citus_pause_node (Jul 31, 2023)
bb62b84  Parameterizes node id in no node test (Jul 28, 2023)
3edefdc  Merge branch 'main' into citus_pause_node (Aug 1, 2023)
b471bb0  Merge branch 'citus_pause_node' of https://github.com/citusdata/citus… (Jul 29, 2023)
88695c6  Merge branch 'main' into citus_pause_node (Aug 2, 2023)
dd72cf0  Merge branch 'citus_pause_node' of https://github.com/citusdata/citus… (Jul 29, 2023)
18c55a4  Merge branch 'main' into citus_pause_node (Aug 4, 2023)
86e0831  Merge branch 'citus_pause_node' of https://github.com/citusdata/citus… (Jul 29, 2023)
be2e653  Merge branch 'main' into citus_pause_node (Aug 4, 2023)
d10eb05  Merge branch 'main' into citus_pause_node (Aug 6, 2023)
de83b01  Update src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql (Aug 7, 2023)
3fbe5e4  Update src/backend/distributed/sql/citus--12.0-1--12.1-1.sql (Aug 7, 2023)
1a5cf9d  Merge branch 'main' into citus_pause_node (Aug 8, 2023)
d9cecba  Merge branch 'main' into citus_pause_node (Aug 14, 2023)
48a5450  Fixes review comments (Aug 17, 2023)
458edd8  Fixes code changes (Aug 17, 2023)
eda5539  Merge branch 'main' into citus_pause_node (Aug 17, 2023)
7ac5f21  Adds a new empty space (Aug 17, 2023)
dd90ec7  Fixes review issues (Aug 17, 2023)
4030ec9  Fixes formatting (Aug 17, 2023)
af29e2d  Fixes static code analysis issues (Aug 17, 2023)
6d171e9  Fixes issues (Aug 17, 2023)
22303ad  Fixes test problems (Aug 17, 2023)
850df01  Merge branch 'main' into citus_pause_node (Aug 18, 2023)
ecc675b  Merge branch 'main' into citus_pause_node (Aug 24, 2023)
c4d694e  Removes timeouts (Aug 24, 2023)
1e4699c  Add comment for CheckBackgroundWorkerToObtainLocks (Aug 24, 2023)
fcc6903  Merge branch 'main' into citus_pause_node (Aug 24, 2023)
796a740  Fixes indentation (Aug 24, 2023)
43abc8b  Adds additional message for force (Aug 24, 2023)
820ef06  Fixes varyinn inputs (Aug 24, 2023)
4af29ed  Removes unnecessary sql blocks in tests (Aug 29, 2023)
8fb4d2f  Fixes review comments (Aug 29, 2023)
463845e  Fixes review comments (Aug 29, 2023)
c42413b  Removes unnecessary file (Aug 29, 2023)
365b762  Increases statement timeout to pass flaky tests (Aug 29, 2023)
5ded545  Fixes test error (Aug 29, 2023)
2ac620d  Merge branch 'main' into citus_pause_node (Aug 30, 2023)
d355193  Fixes test errors (Aug 29, 2023)
3784889  Fixes timeout problem using lock cooldown (Sep 1, 2023)
43d961f  Merge branch 'main' into citus_pause_node (Sep 1, 2023)
211 changes: 142 additions & 69 deletions src/backend/distributed/metadata/node_metadata.c
@@ -9,7 +9,6 @@
#include "funcapi.h"
#include "utils/plancache.h"


#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup.h"
@@ -102,8 +101,8 @@
static int32 GetNextGroupId(void);
static int GetNextNodeId(void);
static void InsertPlaceholderCoordinatorRecord(void);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
*nodeMetadata);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport,
NodeMetadata *nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport);
static void BlockDistributedQueriesOnMetadataNodes(void);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
@@ -134,6 +133,13 @@
static void EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid);
static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly);
static void EnsureTransactionalMetadataSyncMode(void);
static void LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE
lockMode);
static BackgroundWorkerHandle * CheckBackgroundWorkerToObtainLocks(int32 lock_cooldown);
static BackgroundWorkerHandle * LockPlacementsWithBackgroundWorkersInPrimaryNode(
WorkerNode *workerNode, bool force, int32 lock_cooldown);

/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(citus_set_coordinator_host);
@@ -152,6 +158,7 @@
PG_FUNCTION_INFO_V1(citus_activate_node);
PG_FUNCTION_INFO_V1(master_activate_node);
PG_FUNCTION_INFO_V1(citus_update_node);
PG_FUNCTION_INFO_V1(citus_pause_node_within_txn);
PG_FUNCTION_INFO_V1(master_update_node);
PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column);
PG_FUNCTION_INFO_V1(citus_nodename_for_nodeid);
@@ -160,7 +167,6 @@
PG_FUNCTION_INFO_V1(citus_is_coordinator);
PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced);


/*
* DefaultNodeMetadata creates a NodeMetadata struct with the fields set to
* sane defaults, e.g. nodeRack = WORKER_DEFAULT_RACK.
@@ -544,7 +550,8 @@
"metadata is not allowed"),
errhint("You can force disabling node, SELECT "
"citus_disable_node('%s', %d, "
"synchronous:=true);", workerNode->workerName,
"synchronous:=true);",
workerNode->workerName,
nodePort),
errdetail("Citus uses the first worker node in the "
"metadata for certain internal operations when "
@@ -693,8 +700,7 @@
else
{
ereport(ERROR, (errmsg(
"only the 'shouldhaveshards' property can be set using this function"
)));
"only the 'shouldhaveshards' property can be set using this function")));
}

TransactionModifiedNodeMetadata = true;
@@ -1160,6 +1166,100 @@
}


/*
 * Acquires shard metadata locks on all shards residing on the given worker node.
 *
 * TODO: This function is not compatible with the query-from-any-node feature.
 * To ensure proper behavior, it is essential to acquire locks on placements
 * across all nodes rather than limiting the locks to just the coordinator (or
 * the specific node from which this function is called).
 */
void
LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE lockMode)
{
List *placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId);
[Review comment: JelteF (Contributor), Aug 9, 2023]
More code from citus_update_node should be included here. Basically everything starting at the big comment at line 1246. Because citus_pause_node_within_txn should support the force argument as well.

Also the comment about why we don't run this on secondaries should be included.

[Reply: gurkanindibay (Author)]
Added the code you requested
LockShardsInPlacementListMetadata(placementList, lockMode);
}


/*
 * Starts a background worker to kill backends holding locks that conflict with
 * this backend. Returns NULL if the background worker could not be started.
*/
BackgroundWorkerHandle *
CheckBackgroundWorkerToObtainLocks(int32 lock_cooldown)
[Review comment: Contributor]
Function needs a comment

[Reply: gurkanindibay (Author)]
Added comment
{
BackgroundWorkerHandle *handle = StartLockAcquireHelperBackgroundWorker(MyProcPid,
lock_cooldown);
if (!handle)
{
/*
* We failed to start a background worker, which probably means that we exceeded
* max_worker_processes, and this is unlikely to be resolved by retrying. We do not want
* to repeatedly throw an error because if citus_update_node is called to complete a
* failover then finishing is the only way to bring the cluster back up. Therefore we
* give up on killing other backends and simply wait for the lock. We do set
* lock_timeout to lock_cooldown, because we don't want to wait forever to get a lock.
*/
SetLockTimeoutLocally(lock_cooldown);
ereport(WARNING, (errmsg(

[Codecov annotation: added lines L1203-L1204 were not covered by tests]
"could not start background worker to kill backends with conflicting"
" locks to force the update. Degrading to acquiring locks "
"with a lock time out."),
errhint(
"Increasing max_worker_processes might help.")));
}
return handle;
}


/*
 * This function locks shard placements on a primary node. If force is true,
 * we start a background worker to kill backends holding locks that conflict
 * with this backend.
*
* If the node is a primary node we block reads and writes.
*
* This lock has two purposes:
*
* - Ensure buggy code in Citus doesn't cause failures when the
* nodename/nodeport of a node changes mid-query
*
* - Provide fencing during failover, after this function returns all
* connections will use the new node location.
*
* Drawback:
*
* - This function blocks until all previous queries have finished. This
* means that long-running queries will prevent failover.
*
* In case of node failure said long-running queries will fail in the end
* anyway as they will be unable to commit successfully on the failed
* machine. To cause quick failure of these queries use force => true
* during the invocation of citus_update_node to terminate conflicting
* backends proactively.
*
* It might be worth blocking reads to a secondary for the same reasons,
* though we currently only query secondaries on follower clusters
* where these locks will have no effect.
*/
BackgroundWorkerHandle *
LockPlacementsWithBackgroundWorkersInPrimaryNode(WorkerNode *workerNode, bool force, int32
lock_cooldown)
{
BackgroundWorkerHandle *handle = NULL;

if (NodeIsPrimary(workerNode))
{
if (force)
{
handle = CheckBackgroundWorkerToObtainLocks(lock_cooldown);
}
LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock);
}
return handle;
}


/*
* citus_update_node moves the requested node to a different nodename and nodeport. It
* locks to ensure no queries are running concurrently; and is intended for customers who
@@ -1188,8 +1288,6 @@
int32 lock_cooldown = PG_GETARG_INT32(4);

char *newNodeNameString = text_to_cstring(newNodeName);
List *placementList = NIL;
BackgroundWorkerHandle *handle = NULL;

WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString,
newNodePort);
@@ -1226,64 +1324,9 @@
EnsureTransactionalMetadataSyncMode();
}

/*
* If the node is a primary node we block reads and writes.
*
* This lock has two purposes:
*
* - Ensure buggy code in Citus doesn't cause failures when the
* nodename/nodeport of a node changes mid-query
*
* - Provide fencing during failover, after this function returns all
* connections will use the new node location.
*
* Drawback:
*
* - This function blocks until all previous queries have finished. This
* means that long-running queries will prevent failover.
*
* In case of node failure said long-running queries will fail in the end
* anyway as they will be unable to commit successfully on the failed
* machine. To cause quick failure of these queries use force => true
* during the invocation of citus_update_node to terminate conflicting
* backends proactively.
*
* It might be worth blocking reads to a secondary for the same reasons,
* though we currently only query secondaries on follower clusters
* where these locks will have no effect.
*/
if (NodeIsPrimary(workerNode))
{
/*
* before acquiring the locks check if we want a background worker to help us to
* aggressively obtain the locks.
*/
if (force)
{
handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, lock_cooldown);
if (!handle)
{
/*
* We failed to start a background worker, which probably means that we exceeded
* max_worker_processes, and this is unlikely to be resolved by retrying. We do not want
* to repeatedly throw an error because if citus_update_node is called to complete a
* failover then finishing is the only way to bring the cluster back up. Therefore we
* give up on killing other backends and simply wait for the lock. We do set
* lock_timeout to lock_cooldown, because we don't want to wait forever to get a lock.
*/
SetLockTimeoutLocally(lock_cooldown);
ereport(WARNING, (errmsg(
"could not start background worker to kill backends with conflicting"
" locks to force the update. Degrading to acquiring locks "
"with a lock time out."),
errhint(
"Increasing max_worker_processes might help.")));
}
}

placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId);
LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock);
}
BackgroundWorkerHandle *handle = LockPlacementsWithBackgroundWorkersInPrimaryNode(
workerNode, force,
lock_cooldown);

/*
* if we have planned statements such as prepared statements, we should clear the cache so that
@@ -1330,6 +1373,34 @@
}


/*
 * citus_pause_node_within_txn obtains locks on all shard placements on the
 * given worker node. The locks are released automatically when the enclosing
 * transaction commits or aborts, so this function must be invoked within a
 * transaction. It is useful for temporarily disabling writes to a specific
 * node within a transaction.
 */
Datum
citus_pause_node_within_txn(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);

int32 nodeId = PG_GETARG_INT32(0);
bool force = PG_GETARG_BOOL(1);
int32 lock_cooldown = PG_GETARG_INT32(2);

WorkerNode *workerNode = FindNodeAnyClusterByNodeId(nodeId);
if (workerNode == NULL)
{
ereport(ERROR, (errcode(ERRCODE_NO_DATA_FOUND),

[Codecov annotation: added line L1394 was not covered by tests]

[Comment: gurkanindibay (Author)]
I have tests to cover this line and I can see that tests are being executed from the output. But still seeing below warning. What could be the reason?

Tests file result: https://github.com/citusdata/citus/blob/ed40dfe1a7482679aa4bc2068db19cba3655f337/src/test/regress/expected/isolation_citus_pause_node.out#L80C1-L111C18

errmsg("node %u not found", nodeId)));
}

LockPlacementsWithBackgroundWorkersInPrimaryNode(workerNode, force, lock_cooldown);

PG_RETURN_VOID();
}


/*
* master_update_node is a wrapper function for old UDF name.
*/
@@ -1947,7 +2018,8 @@
ereport(ERROR, (errmsg("cannot remove or disable the node "
"%s:%d because because it contains "
"the only shard placement for "
"shard " UINT64_FORMAT, workerNode->workerName,
"shard " UINT64_FORMAT,
workerNode->workerName,
workerNode->workerPort, placement->shardId),
errdetail("One of the table(s) that prevents the operation "
"complete successfully is %s",
@@ -2499,7 +2571,8 @@
if (!valueBool && workerNode->groupId == COORDINATOR_GROUP_ID)
{
ereport(ERROR, (errmsg("cannot change \"%s\" field of the "
"coordinator node", field)));
"coordinator node",
field)));
}
}

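For context, here is a minimal sketch of how the new UDF is meant to be driven from SQL. It is an illustration only: the node name, port, and id 2 are hypothetical, and in practice the id would be looked up in pg_dist_node.

-- Session 1: pause a worker for the duration of a transaction.
BEGIN;

-- Hypothetical lookup of the target node's id in the metadata.
SELECT nodeid FROM pg_dist_node
WHERE nodename = 'worker-1' AND nodeport = 5432;

-- Takes AccessExclusiveLock on all shard placements on that node; concurrent
-- writes touching those shards block until this transaction ends.
SELECT pg_catalog.citus_pause_node_within_txn(2);

-- ... perform maintenance or failover steps here ...

COMMIT; -- the placement locks are released automatically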
1 change: 1 addition & 0 deletions src/backend/distributed/sql/citus--12.0-1--12.1-1.sql
@@ -2,6 +2,7 @@

-- bump version to 12.1-1

#include "udfs/citus_pause_node_within_txn/12.1-1.sql"
#include "udfs/citus_prepare_pg_upgrade/12.1-1.sql"
#include "udfs/citus_finish_pg_upgrade/12.1-1.sql"

src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql
@@ -1,5 +1,5 @@
-- citus--12.1-1--12.0-1

DROP FUNCTION pg_catalog.citus_pause_node_within_txn(int,bool,int);
-- we have modified the relevant upgrade script to include any_value changes
-- we don't need to upgrade this downgrade path for any_value changes
-- since if we are doing a Citus downgrade, not PG downgrade, then it would be no-op.
@@ -12,3 +12,4 @@ DROP FUNCTION pg_catalog.citus_internal_update_none_dist_table_metadata(
DROP FUNCTION pg_catalog.citus_internal_delete_placement_metadata(
placement_id bigint
);


src/backend/distributed/sql/udfs/citus_pause_node_within_txn/12.1-1.sql (new file)
@@ -0,0 +1,13 @@
CREATE FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int,
                                                       force bool DEFAULT false,
                                                       lock_cooldown int DEFAULT 10000)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_pause_node_within_txn$$;

COMMENT ON FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int,
                                                           force bool,
                                                           lock_cooldown int)
IS 'pauses the node with the given id by taking locks on its shard placements, preventing queries from being executed on that node';

REVOKE ALL ON FUNCTION pg_catalog.citus_pause_node_within_txn(int,bool,int) FROM PUBLIC;
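A hedged sketch of the force path follows; the node id and cooldown value are illustrative and not taken from this PR.

BEGIN;
-- force => true starts a background worker that terminates backends holding
-- conflicting locks; lock_cooldown (milliseconds) is how long to wait before
-- doing so, and also serves as the lock_timeout fallback if no worker can start.
SELECT pg_catalog.citus_pause_node_within_txn(2, force => true, lock_cooldown => 5000);
-- node 2 stays paused for the rest of this transaction
ROLLBACK; -- rolling back releases the locks just as a commit would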