Skip to content

Commit

Permalink
indent
Browse files Browse the repository at this point in the history
  • Loading branch information
m7hm7t committed Sep 10, 2024
1 parent 9b76b62 commit 842f5e4
Showing 1 changed file with 47 additions and 47 deletions.
94 changes: 47 additions & 47 deletions src/backend/distributed/operations/worker_shard_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,61 +100,61 @@ CanUseLocalCopy(uint32_t destinationNodeId)
static void
ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
{
int connectionFlags = OUTSIDE_TRANSACTION;
char *currentUser = CurrentUserName();
WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId,
false /* missingOk */);
copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags,
workerNode->workerName,
workerNode->workerPort,
currentUser,
NULL /* database (current) */);
ClaimConnectionExclusively(copyDest->connection);

/* Begin the remote transaction */
RemoteTransactionBegin(copyDest->connection);
int connectionFlags = OUTSIDE_TRANSACTION;
char *currentUser = CurrentUserName();
WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId,
false /* missingOk */);
copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags,
workerNode->workerName,
workerNode->workerPort,
currentUser,
NULL /* database (current) */);
ClaimConnectionExclusively(copyDest->connection);

/* Begin the remote transaction */
RemoteTransactionBegin(copyDest->connection);

SetupReplicationOriginRemoteSession(copyDest->connection);

/* Handle TRUNCATE or any setup commands */
StringInfo truncateStatement = ConstructShardTruncateStatement(
copyDest->destinationShardFullyQualifiedName);
/* Handle TRUNCATE or any setup commands */
StringInfo truncateStatement = ConstructShardTruncateStatement(
copyDest->destinationShardFullyQualifiedName);

if (!SendRemoteCommand(copyDest->connection, truncateStatement->data))
{
ReportConnectionError(copyDest->connection, ERROR);
if (!SendRemoteCommand(copyDest->connection, truncateStatement->data))
{
ReportConnectionError(copyDest->connection, ERROR);
RemoteTransactionAbort(copyDest->connection);
ResetRemoteTransaction(copyDest->connection);
}

PGresult *truncateResult = GetRemoteCommandResult(copyDest->connection, true);
if (!IsResponseOK(truncateResult))
{
ReportResultError(copyDest->connection, truncateResult, ERROR);
PQclear(truncateResult);
ForgetResults(copyDest->connection);
RemoteTransactionAbort(copyDest->connection);
ResetRemoteTransaction(copyDest->connection);
}
PQclear(truncateResult);
ForgetResults(copyDest->connection);

/* Construct and send the COPY statement with FREEZE */
StringInfo copyStatement = ConstructShardCopyStatement(
copyDest->destinationShardFullyQualifiedName,
copyDest->copyOutState->binary,
copyDest->tupleDescriptor);

if (!SendRemoteCommand(copyDest->connection, copyStatement->data))
{
ReportConnectionError(copyDest->connection, ERROR);
ResetRemoteTransaction(copyDest->connection);

Check warning on line 127 in src/backend/distributed/operations/worker_shard_copy.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/operations/worker_shard_copy.c#L125-L127

Added lines #L125 - L127 were not covered by tests
}

PGresult *truncateResult = GetRemoteCommandResult(copyDest->connection, true);
if (!IsResponseOK(truncateResult))
{
ReportResultError(copyDest->connection, truncateResult, ERROR);
PQclear(truncateResult);
ForgetResults(copyDest->connection);
RemoteTransactionAbort(copyDest->connection);
ResetRemoteTransaction(copyDest->connection);
}
ResetRemoteTransaction(copyDest->connection);

Check warning on line 137 in src/backend/distributed/operations/worker_shard_copy.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/operations/worker_shard_copy.c#L133-L137

Added lines #L133 - L137 were not covered by tests
}
PQclear(truncateResult);
ForgetResults(copyDest->connection);

/* Construct and send the COPY statement with FREEZE */
StringInfo copyStatement = ConstructShardCopyStatement(
copyDest->destinationShardFullyQualifiedName,
copyDest->copyOutState->binary,
copyDest->tupleDescriptor);

if (!SendRemoteCommand(copyDest->connection, copyStatement->data))
{
ReportConnectionError(copyDest->connection, ERROR);
RemoteTransactionAbort(copyDest->connection);
ResetRemoteTransaction(copyDest->connection);

Check warning on line 152 in src/backend/distributed/operations/worker_shard_copy.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/operations/worker_shard_copy.c#L151-L152

Added lines #L151 - L152 were not covered by tests
}

PGresult *copyResult = GetRemoteCommandResult(copyDest->connection,
true /* raiseInterrupts */);
if (PQresultStatus(copyResult ) != PGRES_COPY_IN)
true /* raiseInterrupts */);
if (PQresultStatus(copyResult) != PGRES_COPY_IN)
{
ReportResultError(copyDest->connection, copyResult, ERROR);
}
Expand Down

0 comments on commit 842f5e4

Please sign in to comment.