Skip to content

Commit

Permalink
rename structures and methods
Browse files Browse the repository at this point in the history
  • Loading branch information
aykut-bozkurt committed Jul 10, 2023
1 parent ca4c0fb commit 4b7ecf7
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 75 deletions.
105 changes: 53 additions & 52 deletions src/backend/distributed/commands/dependencies.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,91 +47,91 @@ static char * DropTableIfExistsCommand(Oid relationId);
*/

/*
* Memory context and data structure for storing distributed objects created in the
* current transaction. We push stack for each subtransaction. We store hashmap of
* distributed objects created in the subtransaction in each stack.
* Memory context and data structure for storing the objects propagated in the
* current transaction. We push a new stack for tracking each subtransaction's objects.
* In each stack, we store the objects propagated in the subtransaction.
*/
static MemoryContext TxDistObjectsContext = NULL;
static List *TxDistObjects = NIL;
static MemoryContext ObjectsPropagatedInTxContext = NULL;
static List *TxObjectsPropagated = NIL;


/*
* InitDistObjectContext allocates memory context to track distributed objects
* created in the current transaction.
* InitObjectsPropagatedContext allocates memory context to track propagated objects
* in the current transaction.
*/
void
InitDistObjectContext(void)
InitObjectsPropagatedContext(void)
{
TxDistObjectsContext = AllocSetContextCreateInternal(TopMemoryContext,
"Tx Dist Object Context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
ObjectsPropagatedInTxContext = AllocSetContextCreateInternal(TopMemoryContext,
"Tx Propagated Object Context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
}


/*
* PushDistObjectHash pushes a new hashmap to stack to track distributed objects
* created in the current subtransaction.
* PushObjectsPropagatedHash pushes a new hashmap to stack to track objects propagated
* in the current (sub)/transaction.
*/
void
PushDistObjectHash(void)
PushObjectsPropagatedHash(void)
{
HASHCTL info;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(ObjectAddress);
info.entrysize = sizeof(ObjectAddress);
info.hash = tag_hash;
info.hcxt = TxDistObjectsContext;
info.hcxt = ObjectsPropagatedInTxContext;
uint32 hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

HTAB *distObjectsHash = hash_create("citus tx dist objects hash",
8, &info, hashFlags);
HTAB *propagatedObjectsHash = hash_create("citus tx propagated objects hash",
8, &info, hashFlags);

MemoryContext oldContext = MemoryContextSwitchTo(TxDistObjectsContext);
TxDistObjects = lappend(TxDistObjects, distObjectsHash);
MemoryContext oldContext = MemoryContextSwitchTo(ObjectsPropagatedInTxContext);
TxObjectsPropagated = lappend(TxObjectsPropagated, propagatedObjectsHash);
MemoryContextSwitchTo(oldContext);
}


/*
* PopDistObjectHash removes the hashmap for the distributed objects created
* PopObjectsPropagatedHash removes the hashmap for the objects propagated
* in the current subtransaction from the stack.
*/
void
PopDistObjectHash(void)
PopObjectsPropagatedHash(void)
{
TxDistObjects = list_delete_last(TxDistObjects);
TxObjectsPropagated = list_delete_last(TxObjectsPropagated);
}


/*
* AddToCurrentDistObjects adds given object into the distributed objects created in
* the current subtransaction.
* TrackPropagatedObject adds given object into the objects propagated in the current
* subtransaction.
*/
void
AddToCurrentDistObjects(const ObjectAddress *objectAddress)
TrackPropagatedObject(const ObjectAddress *objectAddress)
{
if (TxDistObjects == NIL)
if (TxObjectsPropagated == NIL)
{
return;
}

HTAB *currentTxdistHash = (HTAB *) llast(TxDistObjects);
hash_search(currentTxdistHash, objectAddress, HASH_ENTER, NULL);
HTAB *currentObjectsPropagatedHash = (HTAB *) llast(TxObjectsPropagated);
hash_search(currentObjectsPropagatedHash, objectAddress, HASH_ENTER, NULL);
}


/*
* AddTableToCurrentDistObjects adds given table and its sequences to the distributed
* objects created in the current subtransaction.
* TrackPropagatedTable adds given table and its sequences to the objects propagated
* in the current subtransaction.
*/
void
AddTableToCurrentDistObjects(Oid relationId)
TrackPropagatedTable(Oid relationId)
{
ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*tableAddress, RelationRelationId, relationId);
AddToCurrentDistObjects(tableAddress);
TrackPropagatedObject(tableAddress);

/* track its sequences as well */
List *ownedSeqIdList = getOwnedSequences(relationId);
Expand All @@ -140,42 +140,42 @@ AddTableToCurrentDistObjects(Oid relationId)
{
ObjectAddress *seqAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*seqAddress, RelationRelationId, ownedSeqId);
AddToCurrentDistObjects(seqAddress);
TrackPropagatedObject(seqAddress);
}
}


/*
* ResetDistObjects resets all distributed objects created in the current transaction.
* ResetObjectsPropagated resets all objects propagated in the current transaction.
*/
void
ResetDistObjects(void)
ResetObjectsPropagated(void)
{
HTAB *currentDistObjectHash = NULL;
foreach_ptr(currentDistObjectHash, TxDistObjects)
HTAB *objectPropagatedHash = NULL;
foreach_ptr(objectPropagatedHash, TxObjectsPropagated)
{
hash_destroy(currentDistObjectHash);
hash_destroy(objectPropagatedHash);
}
list_free(TxDistObjects);
TxDistObjects = NIL;
list_free(TxObjectsPropagated);
TxObjectsPropagated = NIL;
}


/*
* HasAnyDepInTxDistObjects decides if any object in given list is created in the current
* transaction.
* HasAnyDepInObjectsPropagated decides if any object in given list is propagated in
* the current transaction.
*/
bool
HasAnyDepInTxDistObjects(List *dependencyList)
HasAnyDepInObjectsPropagated(List *dependencyList)
{
ObjectAddress *dependency = NULL;
foreach_ptr(dependency, dependencyList)
{
HTAB *distObjectsHash = NULL;
foreach_ptr(distObjectsHash, TxDistObjects)
HTAB *propagatedObjectsHash = NULL;
foreach_ptr(propagatedObjectsHash, TxObjectsPropagated)
{
bool found = false;
hash_search(distObjectsHash, dependency, HASH_FIND, &found);
hash_search(propagatedObjectsHash, dependency, HASH_FIND, &found);
if (found)
{
return true;
Expand Down Expand Up @@ -274,8 +274,9 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
* to outside transaction until we locally commit.
*/
List *allSupportedDepsForTarget = GetAllSupportedDependenciesForObject(target);
bool anyDepCreatedInCurrentTx = HasAnyDepInTxDistObjects(allSupportedDepsForTarget);
if (anyDepCreatedInCurrentTx)
bool anyDepPropagatedInCurrentTx = HasAnyDepInObjectsPropagated(
allSupportedDepsForTarget);
if (anyDepPropagatedInCurrentTx)
{
SendCommandListToWorkersWithMetadata(ddlCommands);
}
Expand Down Expand Up @@ -311,10 +312,10 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
MarkObjectDistributedViaSuperUser(dependency);
}

/* track the creation of the distributed table in the current transaction */
/* track the propagation of the distributed table in the current transaction */
if (target->classId == RelationRelationId)
{
AddTableToCurrentDistObjects(target->objectId);
TrackPropagatedTable(target->objectId);
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/backend/distributed/commands/utility_hook.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,10 @@ multi_ProcessUtility(PlannedStmt *pstmt,
if (context == PROCESS_UTILITY_TOPLEVEL && UtilityHookLevel == 1)
{
/*
* Push a new stack item to track distributed objects created in current
* Push a new hash map for tracking objects propagated in the current
* transaction.
*/
PushDistObjectHash();
PushObjectsPropagatedHash();
}

PG_TRY();
Expand Down Expand Up @@ -933,7 +933,7 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
foreach_ptr(address, addresses)
{
MarkObjectDistributed(address);
AddToCurrentDistObjects(address);
TrackPropagatedObject(address);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/shared_library_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ _PG_init(void)
InitializeConnectionManagement();
InitPlacementConnectionManagement();
InitRelationAccessHash();
InitDistObjectContext();
InitObjectsPropagatedContext();
InitializeCitusQueryStats();
InitializeSharedConnectionStats();
InitializeLocallyReservedSharedConnections();
Expand Down
12 changes: 6 additions & 6 deletions src/backend/distributed/transaction/transaction_management.c
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)

ResetGlobalVariables();
ResetRelationAccessHash();
ResetDistObjects();
ResetObjectsPropagated();

/*
* Make sure that we give the shared connections back to the shared
Expand Down Expand Up @@ -393,7 +393,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)

ResetGlobalVariables();
ResetRelationAccessHash();
ResetDistObjects();
ResetObjectsPropagated();

/* Reset any local replication origin session since transaction has been aborted.*/
ResetReplicationOriginLocalSession();
Expand Down Expand Up @@ -662,10 +662,10 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
}

/*
* Push a new hash map for distributed objects created in the current
* Push a new hash map for tracking objects propagated in the current
* subtransaction.
*/
PushDistObjectHash();
PushObjectsPropagatedHash();

MemoryContextSwitchTo(previousContext);

Expand Down Expand Up @@ -728,8 +728,8 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
SetCreateCitusTransactionLevel(0);
}

/* remove the hash map for distributed objects created in the current subtransaction */
PopDistObjectHash();
/* remove the last hashmap from propagated objects stack */
PopObjectsPropagatedHash();

/* Reset any local replication origin session since subtransaction has been aborted.*/
ResetReplicationOriginLocalSession();
Expand Down
14 changes: 7 additions & 7 deletions src/include/distributed/metadata/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ extern Oid GetDependingView(Form_pg_depend pg_depend);
extern List * FilterObjectAddressListByPredicate(List *objectAddressList,
AddressPredicate predicate);

extern void InitDistObjectContext(void);
extern void PushDistObjectHash(void);
extern void PopDistObjectHash(void);
extern void AddToCurrentDistObjects(const ObjectAddress *objectAddress);
extern void AddTableToCurrentDistObjects(Oid relationId);
extern void ResetDistObjects(void);
extern bool HasAnyDepInTxDistObjects(List *dependencyList);
extern void InitObjectsPropagatedContext(void);
extern void PushObjectsPropagatedHash(void);
extern void PopObjectsPropagatedHash(void);
extern void TrackPropagatedObject(const ObjectAddress *objectAddress);
extern void TrackPropagatedTable(Oid relationId);
extern void ResetObjectsPropagated(void);
extern bool HasAnyDepInObjectsPropagated(List *dependencyList);

#endif /* CITUS_DEPENDENCY_H */
6 changes: 3 additions & 3 deletions src/test/regress/expected/multi_schema_support.out
Original file line number Diff line number Diff line change
Expand Up @@ -1384,7 +1384,7 @@ BEGIN;

ALTER SCHEMA bar RENAME TO foo;
ROLLBACK;
-- verify that Citus uses current user's metadata connection to propagate table deps since sc1, which is one of the deps of table s1, is created in the same transaction.
-- verify that Citus uses current user's metadata connection to propagate table deps since sc1, which is one of the deps of table s1, is propagated in the same transaction.
BEGIN;
CREATE SCHEMA sc1;
CREATE SEQUENCE sc1.seq;
Expand All @@ -1400,7 +1400,7 @@ DROP SCHEMA sc1 CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to sequence sc1.seq
drop cascades to table sc1.s1
-- verify that Citus uses current user's metadata connection to propagate table deps since seq1, which is one of the deps of table s1, is created in the same transaction.
-- verify that Citus uses superuser outside connection to propagate table deps since none of the table's deps is propagated in the same transaction.
CREATE SCHEMA sc1;
BEGIN;
CREATE SEQUENCE sc1.seq1;
Expand All @@ -1416,7 +1416,7 @@ DROP SCHEMA sc1 CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to sequence sc1.seq1
drop cascades to table sc1.s1
-- verify that Citus uses superuser outside connection to propagate table deps since noneof the table's deps is created in the same transaction.
-- verify that Citus uses superuser outside connection to propagate table deps since none of the table's deps is propagated in the same transaction.
SET citus.enable_metadata_sync TO off;
CREATE SCHEMA sc1;
SET citus.enable_metadata_sync TO on;
Expand Down
6 changes: 3 additions & 3 deletions src/test/regress/sql/multi_schema_support.sql
Original file line number Diff line number Diff line change
Expand Up @@ -983,7 +983,7 @@ BEGIN;
ALTER SCHEMA bar RENAME TO foo;
ROLLBACK;

-- verify that Citus uses current user's metadata connection to propagate table deps since sc1, which is one of the deps of table s1, is created in the same transaction.
-- verify that Citus uses current user's metadata connection to propagate table deps since sc1, which is one of the deps of table s1, is propagated in the same transaction.
BEGIN;
CREATE SCHEMA sc1;
CREATE SEQUENCE sc1.seq;
Expand All @@ -992,7 +992,7 @@ BEGIN;
COMMIT;
DROP SCHEMA sc1 CASCADE;

-- verify that Citus uses current user's metadata connection to propagate table deps since seq1, which is one of the deps of table s1, is created in the same transaction.
-- verify that Citus uses superuser outside connection to propagate table deps since none of the table's deps is propagated in the same transaction.
CREATE SCHEMA sc1;
BEGIN;
CREATE SEQUENCE sc1.seq1;
Expand All @@ -1001,7 +1001,7 @@ BEGIN;
COMMIT;
DROP SCHEMA sc1 CASCADE;

-- verify that Citus uses superuser outside connection to propagate table deps since noneof the table's deps is created in the same transaction.
-- verify that Citus uses superuser outside connection to propagate table deps since none of the table's deps is propagated in the same transaction.
SET citus.enable_metadata_sync TO off;
CREATE SCHEMA sc1;
SET citus.enable_metadata_sync TO on;
Expand Down

0 comments on commit 4b7ecf7

Please sign in to comment.