Skip to content

Commit

Permalink
refactor and test fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
aykut-bozkurt committed Aug 25, 2023
1 parent ef1af85 commit b9b2174
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 71 deletions.
6 changes: 6 additions & 0 deletions src/backend/distributed/commands/create_distributed_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -1421,6 +1421,12 @@ PropagatePrerequisiteObjectsForDistributedTable(Oid relationId)
ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*tableAddress, RelationRelationId, relationId);
EnsureAllObjectDependenciesExistOnAllNodes(list_make1(tableAddress));

/*
* Track the propagation of the distributed table and its sequences in the current
* transaction.
*/
TrackPropagatedTable(relationId);
}


Expand Down
91 changes: 42 additions & 49 deletions src/backend/distributed/commands/dependencies.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,13 @@ static bool ShouldPropagateObject(const ObjectAddress *address);
static char * DropTableIfExistsCommand(Oid relationId);


/*
* Memory context and hash map for the distributed objects created in the current
* transaction.
*/

/*
* Memory context and data structure for storing the objects propagated in the
* current transaction. We push a new stack for tracking each subtransaction's objects.
* In each stack, we store the objects propagated in the subtransaction.
*/
static MemoryContext ObjectsPropagatedInTxContext = NULL;
static List *TxObjectsPropagated = NIL;
static MemoryContext PropagatedObjectsInTxContext = NULL;
static List *PropagatedObjectsInTx = NIL;


/*
Expand All @@ -62,7 +57,7 @@ static List *TxObjectsPropagated = NIL;
void
InitObjectsPropagatedContext(void)
{
ObjectsPropagatedInTxContext = AllocSetContextCreateInternal(TopMemoryContext,
PropagatedObjectsInTxContext = AllocSetContextCreateInternal(TopMemoryContext,
"Tx Propagated Object Context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
Expand All @@ -71,37 +66,53 @@ InitObjectsPropagatedContext(void)


/*
* PushObjectsPropagatedHash pushes a new hashmap to stack to track objects propagated
* PushPropagatedObjectsHash pushes a new hashmap to stack to track objects propagated
* in the current (sub)/transaction.
*/
void
PushObjectsPropagatedHash(void)
PushPropagatedObjectsHash(void)
{
HASHCTL info;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(ObjectAddress);
info.entrysize = sizeof(ObjectAddress);
info.hash = tag_hash;
info.hcxt = ObjectsPropagatedInTxContext;
info.hcxt = PropagatedObjectsInTxContext;
uint32 hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

HTAB *propagatedObjectsHash = hash_create("citus tx propagated objects hash",
8, &info, hashFlags);

MemoryContext oldContext = MemoryContextSwitchTo(ObjectsPropagatedInTxContext);
TxObjectsPropagated = lappend(TxObjectsPropagated, propagatedObjectsHash);
MemoryContext oldContext = MemoryContextSwitchTo(PropagatedObjectsInTxContext);
PropagatedObjectsInTx = lappend(PropagatedObjectsInTx, propagatedObjectsHash);
MemoryContextSwitchTo(oldContext);
}


/*
* PopObjectsPropagatedHash removes the hashmap for the objects propagated
* PopPropagatedObjectsHash removes the hashmap for the objects propagated
* in the current subtransaction from the stack.
*/
void
PopObjectsPropagatedHash(void)
PopPropagatedObjectsHash(void)
{
PropagatedObjectsInTx = list_delete_last(PropagatedObjectsInTx);
}


/*
* ResetPropagatedObjects resets all objects propagated in the current transaction.
*/
void
ResetPropagatedObjects(void)
{
TxObjectsPropagated = list_delete_last(TxObjectsPropagated);
HTAB *objectPropagatedHash = NULL;
foreach_ptr(objectPropagatedHash, PropagatedObjectsInTx)
{
hash_destroy(objectPropagatedHash);
}
list_free(PropagatedObjectsInTx);
PropagatedObjectsInTx = NIL;
}


Expand All @@ -112,12 +123,12 @@ PopObjectsPropagatedHash(void)
void
TrackPropagatedObject(const ObjectAddress *objectAddress)
{
if (TxObjectsPropagated == NIL)
if (PropagatedObjectsInTx == NIL)
{
return;
}

HTAB *currentObjectsPropagatedHash = (HTAB *) llast(TxObjectsPropagated);
HTAB *currentObjectsPropagatedHash = (HTAB *) llast(PropagatedObjectsInTx);
hash_search(currentObjectsPropagatedHash, objectAddress, HASH_ENTER, NULL);
}

Expand All @@ -129,11 +140,12 @@ TrackPropagatedObject(const ObjectAddress *objectAddress)
void
TrackPropagatedTable(Oid relationId)
{
/* track table */
ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*tableAddress, RelationRelationId, relationId);
TrackPropagatedObject(tableAddress);

/* track its sequences as well */
/* track its sequences */
List *ownedSeqIdList = getOwnedSequences(relationId);
Oid ownedSeqId = InvalidOid;
foreach_oid(ownedSeqId, ownedSeqIdList)
Expand All @@ -146,33 +158,23 @@ TrackPropagatedTable(Oid relationId)


/*
* ResetObjectsPropagated resets all objects propagated in the current transaction.
* HasAnyDepInPropagatedObjects decides if any dependency of given object is
* propagated in the current transaction.
*/
void
ResetObjectsPropagated(void)
bool
HasAnyDepInPropagatedObjects(const ObjectAddress *objectAddress)
{
HTAB *objectPropagatedHash = NULL;
foreach_ptr(objectPropagatedHash, TxObjectsPropagated)
if (list_length(PropagatedObjectsInTx) == 0)
{
hash_destroy(objectPropagatedHash);
return false;
}
list_free(TxObjectsPropagated);
TxObjectsPropagated = NIL;
}


/*
* HasAnyDepInObjectsPropagated decides if any object in given list is propagated in
* the current transaction.
*/
bool
HasAnyDepInObjectsPropagated(List *dependencyList)
{
List *dependencyList = GetAllSupportedDependenciesForObject(objectAddress);
ObjectAddress *dependency = NULL;
foreach_ptr(dependency, dependencyList)
{
HTAB *propagatedObjectsHash = NULL;
foreach_ptr(propagatedObjectsHash, TxObjectsPropagated)
foreach_ptr(propagatedObjectsHash, PropagatedObjectsInTx)
{
bool found = false;
hash_search(propagatedObjectsHash, dependency, HASH_FIND, &found);
Expand Down Expand Up @@ -264,19 +266,16 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)

/*
* We can propagate dependencies via the current user's metadata connection if
* any dependency is created in the current transaction. Our assumption is that
* if we can find a dependency created in the current transaction, then current
* any dependency for the target is created in the current transaction. Our assumption
* is that if we can find a dependency created in the current transaction, then current
* user, most probably, has permissions to create the target object as well. Note
* that, user still may not be able to create the target due to no permissions for
* any of the dependencies. But this is ok since it should be rare. If we opted to
* use outside transaction, then there would be visibility issue on outside
* transaction as we propagated objects via metadata connection and they are invisible
* to outside transaction until we locally commit.
*/
List *allSupportedDepsForTarget = GetAllSupportedDependenciesForObject(target);
bool anyDepPropagatedInCurrentTx = HasAnyDepInObjectsPropagated(
allSupportedDepsForTarget);
if (anyDepPropagatedInCurrentTx)
if (HasAnyDepInPropagatedObjects(target))
{
SendCommandListToWorkersWithMetadata(ddlCommands);
}
Expand Down Expand Up @@ -311,12 +310,6 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
*/
MarkObjectDistributedViaSuperUser(dependency);
}

/* track the propagation of the distributed table in the current transaction */
if (target->classId == RelationRelationId)
{
TrackPropagatedTable(target->objectId);
}
}


Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/commands/utility_hook.c
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
* Push a new hash map for tracking objects propagated in the current
* transaction.
*/
PushObjectsPropagatedHash();
PushPropagatedObjectsHash();
}

PG_TRY();
Expand Down
8 changes: 4 additions & 4 deletions src/backend/distributed/transaction/transaction_management.c
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)

ResetGlobalVariables();
ResetRelationAccessHash();
ResetObjectsPropagated();
ResetPropagatedObjects();

/*
* Make sure that we give the shared connections back to the shared
Expand Down Expand Up @@ -393,7 +393,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)

ResetGlobalVariables();
ResetRelationAccessHash();
ResetObjectsPropagated();
ResetPropagatedObjects();

/* Reset any local replication origin session since transaction has been aborted.*/
ResetReplicationOriginLocalSession();
Expand Down Expand Up @@ -665,7 +665,7 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
* Push a new hash map for tracking objects propagated in the current
* subtransaction.
*/
PushObjectsPropagatedHash();
PushPropagatedObjectsHash();

MemoryContextSwitchTo(previousContext);

Expand Down Expand Up @@ -729,7 +729,7 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
}

/* remove the last hashmap from propagated objects stack */
PopObjectsPropagatedHash();
PopPropagatedObjectsHash();

/* Reset any local replication origin session since subtransaction has been aborted.*/
ResetReplicationOriginLocalSession();
Expand Down
8 changes: 4 additions & 4 deletions src/include/distributed/metadata/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ extern List * FilterObjectAddressListByPredicate(List *objectAddressList,
AddressPredicate predicate);

extern void InitObjectsPropagatedContext(void);
extern void PushObjectsPropagatedHash(void);
extern void PopObjectsPropagatedHash(void);
extern void PushPropagatedObjectsHash(void);
extern void PopPropagatedObjectsHash(void);
extern void ResetPropagatedObjects(void);
extern void TrackPropagatedObject(const ObjectAddress *objectAddress);
extern void TrackPropagatedTable(Oid relationId);
extern void ResetObjectsPropagated(void);
extern bool HasAnyDepInObjectsPropagated(List *dependencyList);
extern bool HasAnyDepInPropagatedObjects(const ObjectAddress *objectAddress);

#endif /* CITUS_DEPENDENCY_H */
27 changes: 20 additions & 7 deletions src/test/regress/expected/function_propagation.out
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,7 @@ SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(clas

-- Adding a column with constraint should propagate the function
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO sequential;
CREATE TABLE table_to_prop_func_8(id int, col_1 int);
SELECT create_distributed_table('table_to_prop_func_8', 'id');
create_distributed_table
Expand All @@ -694,17 +695,29 @@ BEGIN;
return param_1 > 5;
END;
$$;
ERROR: cannot run function command because there was a parallel operation on a distributed table in the transaction
DETAIL: When running command on/for a distributed function, Citus needs to perform all operations over a single connection per node to ensure consistency.
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_10'::regproc::oid;
ERROR: current transaction is aborted, commands ignored until end of transaction block
pg_identify_object_as_address
---------------------------------------------------------------------
(function,"{function_propagation_schema,func_in_transaction_10}",{integer})
(1 row)

ALTER TABLE table_to_prop_func_8 ADD CONSTRAINT col1_check CHECK (function_propagation_schema.func_in_transaction_10(col_1));
ERROR: current transaction is aborted, commands ignored until end of transaction block
-- Function should be marked as distributed after adding the constraint
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_10'::regproc::oid;
ERROR: current transaction is aborted, commands ignored until end of transaction block
ROLLBACK;
pg_identify_object_as_address
---------------------------------------------------------------------
(function,"{function_propagation_schema,func_in_transaction_10}",{integer})
(1 row)

COMMIT;
-- Function should be marked as distributed on the worker after committing changes
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_10'::regproc::oid;$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_10}",{integer})
localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_10}",{integer})
(2 rows)

-- If constraint depends on a non-distributed table it should error out
BEGIN;
CREATE TABLE local_table_for_const(id int);
Expand Down
11 changes: 6 additions & 5 deletions src/test/regress/expected/propagate_extension_commands.out
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ SET search_path TO "extension'test";
-- check restriction for sequential execution
-- enable it and see that create command errors but continues its execution by changing citus.multi_shard_modify_mode TO 'off
BEGIN;
SET citus.multi_shard_modify_mode TO sequential;
CREATE TABLE some_random_table (a int);
SELECT create_distributed_table('some_random_table', 'a');
create_distributed_table
Expand All @@ -353,13 +354,13 @@ BEGIN;
(1 row)

CREATE EXTENSION seg;
ERROR: cannot run extension command because there was a parallel operation on a distributed table in the transaction
DETAIL: When running command on/for a distributed extension, Citus needs to perform all operations over a single connection per node to ensure consistency.
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
CREATE TABLE some_random_table_2 (a int, b seg);
ERROR: current transaction is aborted, commands ignored until end of transaction block
SELECT create_distributed_table('some_random_table_2', 'a');
ERROR: current transaction is aborted, commands ignored until end of transaction block
create_distributed_table
---------------------------------------------------------------------

(1 row)

ROLLBACK;
-- show that the CREATE EXTENSION command propagated even if the transaction
-- block is rollbacked, that's a shortcoming of dependency creation logic
Expand Down
6 changes: 5 additions & 1 deletion src/test/regress/sql/function_propagation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(clas

-- Adding a column with constraint should propagate the function
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO sequential;
CREATE TABLE table_to_prop_func_8(id int, col_1 int);
SELECT create_distributed_table('table_to_prop_func_8', 'id');

Expand All @@ -433,7 +434,10 @@ BEGIN;

-- Function should be marked as distributed after adding the constraint
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_10'::regproc::oid;
ROLLBACK;
COMMIT;

-- Function should be marked as distributed on the worker after committing changes
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_10'::regproc::oid;$$) ORDER BY 1,2;


-- If constraint depends on a non-distributed table it should error out
Expand Down
1 change: 1 addition & 0 deletions src/test/regress/sql/propagate_extension_commands.sql
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ SET search_path TO "extension'test";
-- enable it and see that create command errors but continues its execution by changing citus.multi_shard_modify_mode TO 'off

BEGIN;
SET citus.multi_shard_modify_mode TO sequential;
CREATE TABLE some_random_table (a int);
SELECT create_distributed_table('some_random_table', 'a');
CREATE EXTENSION seg;
Expand Down

0 comments on commit b9b2174

Please sign in to comment.