Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support excludeTxnFromChangeStreams option #7749

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
7 changes: 7 additions & 0 deletions Spanner/src/Connection/Grpc.php
Original file line number Diff line number Diff line change
Expand Up @@ -1103,6 +1103,13 @@ public function beginTransaction(array $args)
$args = $this->addLarHeader($args, $this->larEnabled);
}

// NOTE: if set for read-only actions, will throw exception
if (isset($transactionOptions['excludeTxnFromChangeStreams'])) {
$options->setExcludeTxnFromChangeStreams(
$transactionOptions['excludeTxnFromChangeStreams']
);
}

$requestOptions = $this->pluck('requestOptions', $args, false) ?: [];
if ($requestOptions) {
$args['requestOptions'] = $this->serializer->decodeMessage(
Expand Down
26 changes: 15 additions & 11 deletions Spanner/src/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ public function runTransaction(callable $operation, array $options = [])
];

// There isn't anything configurable here.
$options['transactionOptions'] = $this->configureTransactionOptions();
$options['transactionOptions'] = $this->configureTransactionOptions($options['transactionOptions'] ?? []);

$session = $this->selectSession(
SessionPoolInterface::CONTEXT_READWRITE,
Expand Down Expand Up @@ -1556,7 +1556,7 @@ public function delete($table, KeySet $keySet, array $options = [])
* use Google\Cloud\Spanner\Session\SessionPoolInterface;
*
* $result = $database->execute('SELECT * FROM Posts WHERE ID = @postId', [
* 'parameters' => [
* 'parameters' => [
* 'postId' => 1337
* ],
* 'begin' => true,
Expand All @@ -1573,7 +1573,7 @@ public function delete($table, KeySet $keySet, array $options = [])
* use Google\Cloud\Spanner\Session\SessionPoolInterface;
*
* $result = $database->execute('SELECT * FROM Posts WHERE ID = @postId', [
* 'parameters' => [
* 'parameters' => [
* 'postId' => 1337
* ],
* 'begin' => true,
Expand All @@ -1593,11 +1593,10 @@ public function delete($table, KeySet $keySet, array $options = [])
* @param string $sql The query string to execute.
* @param array $options [optional] {
* Configuration Options.
* See [TransactionOptions](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.TransactionOptions)
* for detailed description of available transaction options. Please
* note that only one of `$strong`, `$minReadTimestamp`,
* `$maxStaleness`, `$readTimestamp` or `$exactStaleness` may be set in
* a request.
* See {@see V1\TransactionOptions\PBReadOnly} for detailed description of
* available transaction options. Please note that only one of
* `$strong`, `$minReadTimestamp`, `$maxStaleness`, `$readTimestamp` or
* `$exactStaleness` may be set in a request.
*
* @type array $parameters A key/value array of Query Parameters, where
* the key is represented in the query string prefixed by a `@`
Expand Down Expand Up @@ -1899,11 +1898,16 @@ public function executePartitionedUpdate($statement, array $options = [])
unset($options['requestOptions']['transactionTag']);
$session = $this->selectSession(SessionPoolInterface::CONTEXT_READWRITE);

$transaction = $this->operation->transaction($session, [
$beginTransactionOptions = [
'transactionOptions' => [
'partitionedDml' => []
'partitionedDml' => [],
]
]);
];
if (isset($options['transactionOptions']['excludeTxnFromChangeStreams'])) {
$beginTransactionOptions['transactionOptions']['excludeTxnFromChangeStreams'] =
$options['transactionOptions']['excludeTxnFromChangeStreams'];
}
$transaction = $this->operation->transaction($session, $beginTransactionOptions);

$options = $this->addLarHeader($options);

Expand Down
7 changes: 7 additions & 0 deletions Spanner/src/Transaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

namespace Google\Cloud\Spanner;

use Google\ApiCore\ValidationException;
use Google\Cloud\Core\Exception\AbortedException;
use Google\Cloud\Spanner\Session\Session;
use Google\Cloud\Spanner\Session\SessionPoolInterface;
Expand Down Expand Up @@ -239,6 +240,12 @@ public function getCommitStats()
*/
public function executeUpdate($sql, array $options = [])
{
if (isset($options['transaction']['begin']['excludeTxnFromChangeStreams'])) {
throw new ValidationException(
'The excludeTxnFromChangeStreams option cannot be set for individual DML requests.'
. ' This option should be set at the transaction level.'
);
}
$options = $this->buildUpdateOptions($options);
return $this->operation
->executeUpdate($this->session, $this, $sql, $options);
Expand Down
21 changes: 14 additions & 7 deletions Spanner/src/TransactionConfigurationTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,21 @@ private function transactionSelector(array &$options, array $previous = [])
'transactionType' => SessionPoolInterface::CONTEXT_READ,
];

$res = $this->transactionOptions($options, $previous);
[$transactionOptions, $type, $context] = $this->transactionOptions($options, $previous);

// TransactionSelector uses a different key name for singleUseTransaction
// and transactionId than transactionOptions, so we'll rewrite those here
// so transactionOptions works as expected for commitRequest.

$type = $res[1];
if ($type === 'singleUseTransaction') {
$type = 'singleUse';
} elseif ($type === 'transactionId') {
$type = 'id';
}

return [
[$type => $res[0]],
$res[2]
[$type => $transactionOptions],
$context
];
}

Expand Down Expand Up @@ -130,7 +129,9 @@ private function transactionOptions(array &$options, array $previous = [])
} elseif ($context === SessionPoolInterface::CONTEXT_READ) {
$transactionOptions = $this->configureSnapshotOptions($options, $previous);
} elseif ($context === SessionPoolInterface::CONTEXT_READWRITE) {
$transactionOptions = $this->configureTransactionOptions();
$transactionOptions = $this->configureTransactionOptions(
$type == 'begin' && is_array($begin) ? $begin : []
);
} else {
throw new \BadMethodCallException(sprintf(
'Invalid transaction context %s',
Expand All @@ -141,11 +142,17 @@ private function transactionOptions(array &$options, array $previous = [])
return [$transactionOptions, $type, $context];
}

private function configureTransactionOptions()
private function configureTransactionOptions(array $options = [])
{
return [
$transactionOptions = [
'readWrite' => []
];

if (isset($options['excludeTxnFromChangeStreams'])) {
$transactionOptions['excludeTxnFromChangeStreams'] = $options['excludeTxnFromChangeStreams'];
}

return $transactionOptions;
}

/**
Expand Down
149 changes: 143 additions & 6 deletions Spanner/tests/Unit/DatabaseTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

namespace Google\Cloud\Spanner\Tests\Unit;

use Google\ApiCore\ServerStream;
use Google\Cloud\Core\Exception\AbortedException;
use Google\Cloud\Core\Exception\NotFoundException;
use Google\Cloud\Core\Exception\ServerException;
Expand All @@ -29,6 +30,7 @@
use Google\Cloud\Spanner\Admin\Database\V1\DatabaseAdminClient;
use Google\Cloud\Spanner\Admin\Database\V1\DatabaseDialect;
use Google\Cloud\Spanner\Connection\ConnectionInterface;
use Google\Cloud\Spanner\Connection\Grpc;
use Google\Cloud\Spanner\Database;
use Google\Cloud\Spanner\Duration;
use Google\Cloud\Spanner\Instance;
Expand All @@ -43,7 +45,13 @@
use Google\Cloud\Spanner\Tests\StubCreationTrait;
use Google\Cloud\Spanner\Timestamp;
use Google\Cloud\Spanner\Transaction;
use Google\Cloud\Spanner\V1\ResultSet;
use Google\Cloud\Spanner\V1\ResultSetStats;
use Google\Cloud\Spanner\V1\DirectedReadOptions\ReplicaSelection\Type as ReplicaType;
use Google\Cloud\Spanner\V1\Session as SessionProto;
use Google\Cloud\Spanner\V1\SpannerClient;
use Google\Cloud\Spanner\V1\Transaction as TransactionProto;
use Google\Cloud\Spanner\V1\TransactionOptions;
use Google\Rpc\Code;
use PHPUnit\Framework\TestCase;
use Prophecy\Argument;
Expand Down Expand Up @@ -103,19 +111,24 @@ public function setUp(): void
]);
$this->directedReadOptionsIncludeReplicas = [
'includeReplicas' => [
'autoFailoverDisabled' => false,
'replicaSelections' => [
'location' => 'us-central1',
'type' => 'READ_WRITE',
'autoFailoverDisabled' => false
[
'location' => 'us-central1',
'type' => ReplicaType::READ_WRITE,

]
]
]
];
$this->directedReadOptionsExcludeReplicas = [
'excludeReplicas' => [
'autoFailoverDisabled' => false,
'replicaSelections' => [
'location' => 'us-central1',
'type' => 'READ_WRITE',
'autoFailoverDisabled' => false
[
'location' => 'us-central1',
'type' => ReplicaType::READ_WRITE,
]
]
]
];
Expand Down Expand Up @@ -2002,6 +2015,130 @@ public function testRunTransactionWithRollback()
}, ['tag' => self::TRANSACTION_TAG]);
}

public function testRunTransactionWithExcludeTxnFromChangeStreams()
{
$gapic = $this->prophesize(SpannerClient::class);

$sessName = SpannerClient::sessionName(self::PROJECT, self::INSTANCE, self::DATABASE, self::SESSION);
$session = new SessionProto(['name' => $sessName]);
$resultSet = new ResultSet(['stats' => new ResultSetStats(['row_count_exact' => 0])]);
$gapic->createSession(Argument::cetera())->shouldBeCalled()->willReturn($session);
$gapic->deleteSession(Argument::cetera())->shouldBeCalled();

$sql = 'SELECT example FROM sql_query';
$stream = $this->prophesize(ServerStream::class);
$stream->readAll()->shouldBeCalledOnce()->willReturn([$resultSet]);
$gapic->executeStreamingSql($sessName, $sql, Argument::that(function (array $options) {
$this->assertArrayHasKey('transaction', $options);
$this->assertNotNull($transactionOptions = $options['transaction']->getBegin());
$this->assertTrue($transactionOptions->getExcludeTxnFromChangeStreams());
return true;
}))
->shouldBeCalledOnce()
->willReturn($stream->reveal());

$database = new Database(
new Grpc(['gapicSpannerClient' => $gapic->reveal()]),
$this->instance,
$this->lro->reveal(),
$this->lroCallables,
self::PROJECT,
self::DATABASE
);

$database->runTransaction(
function (Transaction $t) use ($sql) {
// Run a fake query
$t->executeUpdate($sql);

// Simulate calling Transaction::commmit()
$prop = new \ReflectionProperty($t, 'state');
$prop->setAccessible(true);
$prop->setValue($t, Transaction::STATE_COMMITTED);
},
['transactionOptions' => ['excludeTxnFromChangeStreams' => true]]
);
}

public function testExecutePartitionedUpdateWithExcludeTxnFromChangeStreams()
{
$gapic = $this->prophesize(SpannerClient::class);

$sessName = SpannerClient::sessionName(self::PROJECT, self::INSTANCE, self::DATABASE, self::SESSION);
$session = new SessionProto(['name' => $sessName]);
$gapic->createSession(Argument::cetera())->shouldBeCalled()->willReturn($session);
$gapic->deleteSession(Argument::cetera())->shouldBeCalled();

$sql = 'SELECT example FROM sql_query';
$resultSet = new ResultSet(['stats' => new ResultSetStats(['row_count_lower_bound' => 0])]);
$stream = $this->prophesize(ServerStream::class);
$stream->readAll()->shouldBeCalledOnce()->willReturn([$resultSet]);
$gapic->executeStreamingSql($sessName, $sql, Argument::type('array'))
->shouldBeCalledOnce()
->willReturn($stream->reveal());

$gapic->beginTransaction(
$sessName,
Argument::that(function (TransactionOptions $options) {
$this->assertTrue($options->getExcludeTxnFromChangeStreams());
return true;
}),
Argument::type('array')
)
->shouldBeCalledOnce()
->willReturn(new TransactionProto(['id' => 'foo']));

$database = new Database(
new Grpc(['gapicSpannerClient' => $gapic->reveal()]),
$this->instance,
$this->lro->reveal(),
$this->lroCallables,
self::PROJECT,
self::DATABASE
);

$database->executePartitionedUpdate(
$sql,
['transactionOptions' => ['excludeTxnFromChangeStreams' => true]]
);
}

public function testBatchWriteWithExcludeTxnFromChangeStreams()
{
$gapic = $this->prophesize(SpannerClient::class);

$sessName = SpannerClient::sessionName(self::PROJECT, self::INSTANCE, self::DATABASE, self::SESSION);
$session = new SessionProto(['name' => $sessName]);
$gapic->createSession(Argument::cetera())->shouldBeCalled()->willReturn($session);
$gapic->deleteSession(Argument::cetera())->shouldBeCalled();

$mutationGroups = [];
$gapic->batchWrite(
$sessName,
$mutationGroups,
Argument::that(function ($options) {
$this->assertArrayHasKey('excludeTxnFromChangeStreams', $options);
$this->assertTrue($options['excludeTxnFromChangeStreams']);
return true;
})
)
->shouldBeCalledOnce()
->willReturn(new TransactionProto(['id' => 'foo']));

$database = new Database(
new Grpc(['gapicSpannerClient' => $gapic->reveal()]),
$this->instance,
$this->lro->reveal(),
$this->lroCallables,
self::PROJECT,
self::DATABASE
);

$database->batchWrite($mutationGroups, [
'excludeTxnFromChangeStreams' => true
]);
}

private function createStreamingAPIArgs()
{
$row = ['id' => 1];
Expand Down
Loading
Loading