diff --git a/Spanner/src/Connection/Grpc.php b/Spanner/src/Connection/Grpc.php index f6be4a6b4c5..9f13c46e54d 100644 --- a/Spanner/src/Connection/Grpc.php +++ b/Spanner/src/Connection/Grpc.php @@ -1103,6 +1103,13 @@ public function beginTransaction(array $args) $args = $this->addLarHeader($args, $this->larEnabled); } + // NOTE: if set for read-only actions, will throw exception + if (isset($transactionOptions['excludeTxnFromChangeStreams'])) { + $options->setExcludeTxnFromChangeStreams( + $transactionOptions['excludeTxnFromChangeStreams'] + ); + } + $requestOptions = $this->pluck('requestOptions', $args, false) ?: []; if ($requestOptions) { $args['requestOptions'] = $this->serializer->decodeMessage( diff --git a/Spanner/src/Database.php b/Spanner/src/Database.php index de55a773418..23e4317e046 100644 --- a/Spanner/src/Database.php +++ b/Spanner/src/Database.php @@ -917,7 +917,7 @@ public function runTransaction(callable $operation, array $options = []) ]; // There isn't anything configurable here. - $options['transactionOptions'] = $this->configureTransactionOptions(); + $options['transactionOptions'] = $this->configureTransactionOptions($options['transactionOptions'] ?? []); $session = $this->selectSession( SessionPoolInterface::CONTEXT_READWRITE, @@ -1556,7 +1556,7 @@ public function delete($table, KeySet $keySet, array $options = []) * use Google\Cloud\Spanner\Session\SessionPoolInterface; * * $result = $database->execute('SELECT * FROM Posts WHERE ID = @postId', [ - * 'parameters' => [ + * 'parameters' => [ * 'postId' => 1337 * ], * 'begin' => true, @@ -1573,7 +1573,7 @@ public function delete($table, KeySet $keySet, array $options = []) * use Google\Cloud\Spanner\Session\SessionPoolInterface; * * $result = $database->execute('SELECT * FROM Posts WHERE ID = @postId', [ - * 'parameters' => [ + * 'parameters' => [ * 'postId' => 1337 * ], * 'begin' => true, @@ -1593,11 +1593,10 @@ public function delete($table, KeySet $keySet, array $options = []) * @param string $sql The query string to execute. * @param array $options [optional] { * Configuration Options. - * See [TransactionOptions](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.TransactionOptions) - * for detailed description of available transaction options. Please - * note that only one of `$strong`, `$minReadTimestamp`, - * `$maxStaleness`, `$readTimestamp` or `$exactStaleness` may be set in - * a request. + * See {@see V1\TransactionOptions\PBReadOnly} for detailed description of + * available transaction options. Please note that only one of + * `$strong`, `$minReadTimestamp`, `$maxStaleness`, `$readTimestamp` or + * `$exactStaleness` may be set in a request. * * @type array $parameters A key/value array of Query Parameters, where * the key is represented in the query string prefixed by a `@` @@ -1899,11 +1898,16 @@ public function executePartitionedUpdate($statement, array $options = []) unset($options['requestOptions']['transactionTag']); $session = $this->selectSession(SessionPoolInterface::CONTEXT_READWRITE); - $transaction = $this->operation->transaction($session, [ + $beginTransactionOptions = [ 'transactionOptions' => [ - 'partitionedDml' => [] + 'partitionedDml' => [], ] - ]); + ]; + if (isset($options['transactionOptions']['excludeTxnFromChangeStreams'])) { + $beginTransactionOptions['transactionOptions']['excludeTxnFromChangeStreams'] = + $options['transactionOptions']['excludeTxnFromChangeStreams']; + } + $transaction = $this->operation->transaction($session, $beginTransactionOptions); $options = $this->addLarHeader($options); diff --git a/Spanner/src/Transaction.php b/Spanner/src/Transaction.php index c88e8972d47..0db1e79cef9 100644 --- a/Spanner/src/Transaction.php +++ b/Spanner/src/Transaction.php @@ -17,6 +17,7 @@ namespace Google\Cloud\Spanner; +use Google\ApiCore\ValidationException; use Google\Cloud\Core\Exception\AbortedException; use Google\Cloud\Spanner\Session\Session; use Google\Cloud\Spanner\Session\SessionPoolInterface; @@ -239,6 +240,12 @@ public function getCommitStats() */ public function executeUpdate($sql, array $options = []) { + if (isset($options['transaction']['begin']['excludeTxnFromChangeStreams'])) { + throw new ValidationException( + 'The excludeTxnFromChangeStreams option cannot be set for individual DML requests.' + . ' This option should be set at the transaction level.' + ); + } $options = $this->buildUpdateOptions($options); return $this->operation ->executeUpdate($this->session, $this, $sql, $options); diff --git a/Spanner/src/TransactionConfigurationTrait.php b/Spanner/src/TransactionConfigurationTrait.php index f3307152d47..642279a39b2 100644 --- a/Spanner/src/TransactionConfigurationTrait.php +++ b/Spanner/src/TransactionConfigurationTrait.php @@ -44,13 +44,12 @@ private function transactionSelector(array &$options, array $previous = []) 'transactionType' => SessionPoolInterface::CONTEXT_READ, ]; - $res = $this->transactionOptions($options, $previous); + [$transactionOptions, $type, $context] = $this->transactionOptions($options, $previous); // TransactionSelector uses a different key name for singleUseTransaction // and transactionId than transactionOptions, so we'll rewrite those here // so transactionOptions works as expected for commitRequest. - $type = $res[1]; if ($type === 'singleUseTransaction') { $type = 'singleUse'; } elseif ($type === 'transactionId') { @@ -58,8 +57,8 @@ private function transactionSelector(array &$options, array $previous = []) } return [ - [$type => $res[0]], - $res[2] + [$type => $transactionOptions], + $context ]; } @@ -130,7 +129,9 @@ private function transactionOptions(array &$options, array $previous = []) } elseif ($context === SessionPoolInterface::CONTEXT_READ) { $transactionOptions = $this->configureSnapshotOptions($options, $previous); } elseif ($context === SessionPoolInterface::CONTEXT_READWRITE) { - $transactionOptions = $this->configureTransactionOptions(); + $transactionOptions = $this->configureTransactionOptions( + $type == 'begin' && is_array($begin) ? $begin : [] + ); } else { throw new \BadMethodCallException(sprintf( 'Invalid transaction context %s', @@ -141,11 +142,17 @@ private function transactionOptions(array &$options, array $previous = []) return [$transactionOptions, $type, $context]; } - private function configureTransactionOptions() + private function configureTransactionOptions(array $options = []) { - return [ + $transactionOptions = [ 'readWrite' => [] ]; + + if (isset($options['excludeTxnFromChangeStreams'])) { + $transactionOptions['excludeTxnFromChangeStreams'] = $options['excludeTxnFromChangeStreams']; + } + + return $transactionOptions; } /** diff --git a/Spanner/tests/Unit/DatabaseTest.php b/Spanner/tests/Unit/DatabaseTest.php index a20e1090ff6..a63c361fc6a 100644 --- a/Spanner/tests/Unit/DatabaseTest.php +++ b/Spanner/tests/Unit/DatabaseTest.php @@ -17,6 +17,7 @@ namespace Google\Cloud\Spanner\Tests\Unit; +use Google\ApiCore\ServerStream; use Google\Cloud\Core\Exception\AbortedException; use Google\Cloud\Core\Exception\NotFoundException; use Google\Cloud\Core\Exception\ServerException; @@ -29,6 +30,7 @@ use Google\Cloud\Spanner\Admin\Database\V1\DatabaseAdminClient; use Google\Cloud\Spanner\Admin\Database\V1\DatabaseDialect; use Google\Cloud\Spanner\Connection\ConnectionInterface; +use Google\Cloud\Spanner\Connection\Grpc; use Google\Cloud\Spanner\Database; use Google\Cloud\Spanner\Duration; use Google\Cloud\Spanner\Instance; @@ -43,7 +45,13 @@ use Google\Cloud\Spanner\Tests\StubCreationTrait; use Google\Cloud\Spanner\Timestamp; use Google\Cloud\Spanner\Transaction; +use Google\Cloud\Spanner\V1\ResultSet; +use Google\Cloud\Spanner\V1\ResultSetStats; +use Google\Cloud\Spanner\V1\DirectedReadOptions\ReplicaSelection\Type as ReplicaType; +use Google\Cloud\Spanner\V1\Session as SessionProto; use Google\Cloud\Spanner\V1\SpannerClient; +use Google\Cloud\Spanner\V1\Transaction as TransactionProto; +use Google\Cloud\Spanner\V1\TransactionOptions; use Google\Rpc\Code; use PHPUnit\Framework\TestCase; use Prophecy\Argument; @@ -103,19 +111,24 @@ public function setUp(): void ]); $this->directedReadOptionsIncludeReplicas = [ 'includeReplicas' => [ + 'autoFailoverDisabled' => false, 'replicaSelections' => [ - 'location' => 'us-central1', - 'type' => 'READ_WRITE', - 'autoFailoverDisabled' => false + [ + 'location' => 'us-central1', + 'type' => ReplicaType::READ_WRITE, + + ] ] ] ]; $this->directedReadOptionsExcludeReplicas = [ 'excludeReplicas' => [ + 'autoFailoverDisabled' => false, 'replicaSelections' => [ - 'location' => 'us-central1', - 'type' => 'READ_WRITE', - 'autoFailoverDisabled' => false + [ + 'location' => 'us-central1', + 'type' => ReplicaType::READ_WRITE, + ] ] ] ]; @@ -2002,6 +2015,130 @@ public function testRunTransactionWithRollback() }, ['tag' => self::TRANSACTION_TAG]); } + public function testRunTransactionWithExcludeTxnFromChangeStreams() + { + $gapic = $this->prophesize(SpannerClient::class); + + $sessName = SpannerClient::sessionName(self::PROJECT, self::INSTANCE, self::DATABASE, self::SESSION); + $session = new SessionProto(['name' => $sessName]); + $resultSet = new ResultSet(['stats' => new ResultSetStats(['row_count_exact' => 0])]); + $gapic->createSession(Argument::cetera())->shouldBeCalled()->willReturn($session); + $gapic->deleteSession(Argument::cetera())->shouldBeCalled(); + + $sql = 'SELECT example FROM sql_query'; + $stream = $this->prophesize(ServerStream::class); + $stream->readAll()->shouldBeCalledOnce()->willReturn([$resultSet]); + $gapic->executeStreamingSql($sessName, $sql, Argument::that(function (array $options) { + $this->assertArrayHasKey('transaction', $options); + $this->assertNotNull($transactionOptions = $options['transaction']->getBegin()); + $this->assertTrue($transactionOptions->getExcludeTxnFromChangeStreams()); + return true; + })) + ->shouldBeCalledOnce() + ->willReturn($stream->reveal()); + + $database = new Database( + new Grpc(['gapicSpannerClient' => $gapic->reveal()]), + $this->instance, + $this->lro->reveal(), + $this->lroCallables, + self::PROJECT, + self::DATABASE + ); + + $database->runTransaction( + function (Transaction $t) use ($sql) { + // Run a fake query + $t->executeUpdate($sql); + + // Simulate calling Transaction::commmit() + $prop = new \ReflectionProperty($t, 'state'); + $prop->setAccessible(true); + $prop->setValue($t, Transaction::STATE_COMMITTED); + }, + ['transactionOptions' => ['excludeTxnFromChangeStreams' => true]] + ); + } + + public function testExecutePartitionedUpdateWithExcludeTxnFromChangeStreams() + { + $gapic = $this->prophesize(SpannerClient::class); + + $sessName = SpannerClient::sessionName(self::PROJECT, self::INSTANCE, self::DATABASE, self::SESSION); + $session = new SessionProto(['name' => $sessName]); + $gapic->createSession(Argument::cetera())->shouldBeCalled()->willReturn($session); + $gapic->deleteSession(Argument::cetera())->shouldBeCalled(); + + $sql = 'SELECT example FROM sql_query'; + $resultSet = new ResultSet(['stats' => new ResultSetStats(['row_count_lower_bound' => 0])]); + $stream = $this->prophesize(ServerStream::class); + $stream->readAll()->shouldBeCalledOnce()->willReturn([$resultSet]); + $gapic->executeStreamingSql($sessName, $sql, Argument::type('array')) + ->shouldBeCalledOnce() + ->willReturn($stream->reveal()); + + $gapic->beginTransaction( + $sessName, + Argument::that(function (TransactionOptions $options) { + $this->assertTrue($options->getExcludeTxnFromChangeStreams()); + return true; + }), + Argument::type('array') + ) + ->shouldBeCalledOnce() + ->willReturn(new TransactionProto(['id' => 'foo'])); + + $database = new Database( + new Grpc(['gapicSpannerClient' => $gapic->reveal()]), + $this->instance, + $this->lro->reveal(), + $this->lroCallables, + self::PROJECT, + self::DATABASE + ); + + $database->executePartitionedUpdate( + $sql, + ['transactionOptions' => ['excludeTxnFromChangeStreams' => true]] + ); + } + + public function testBatchWriteWithExcludeTxnFromChangeStreams() + { + $gapic = $this->prophesize(SpannerClient::class); + + $sessName = SpannerClient::sessionName(self::PROJECT, self::INSTANCE, self::DATABASE, self::SESSION); + $session = new SessionProto(['name' => $sessName]); + $gapic->createSession(Argument::cetera())->shouldBeCalled()->willReturn($session); + $gapic->deleteSession(Argument::cetera())->shouldBeCalled(); + + $mutationGroups = []; + $gapic->batchWrite( + $sessName, + $mutationGroups, + Argument::that(function ($options) { + $this->assertArrayHasKey('excludeTxnFromChangeStreams', $options); + $this->assertTrue($options['excludeTxnFromChangeStreams']); + return true; + }) + ) + ->shouldBeCalledOnce() + ->willReturn(new TransactionProto(['id' => 'foo'])); + + $database = new Database( + new Grpc(['gapicSpannerClient' => $gapic->reveal()]), + $this->instance, + $this->lro->reveal(), + $this->lroCallables, + self::PROJECT, + self::DATABASE + ); + + $database->batchWrite($mutationGroups, [ + 'excludeTxnFromChangeStreams' => true + ]); + } + private function createStreamingAPIArgs() { $row = ['id' => 1]; diff --git a/Spanner/tests/Unit/OperationTest.php b/Spanner/tests/Unit/OperationTest.php index 0be80e57614..bfe2c23dfd4 100644 --- a/Spanner/tests/Unit/OperationTest.php +++ b/Spanner/tests/Unit/OperationTest.php @@ -17,11 +17,13 @@ namespace Google\Cloud\Spanner\Tests\Unit; +use Google\ApiCore\ServerStream; use Google\Cloud\Core\Testing\GrpcTestTrait; use Google\Cloud\Core\Testing\TestHelpers; use Google\Cloud\Spanner\Admin\Database\V1\DatabaseAdminClient; use Google\Cloud\Spanner\Batch\QueryPartition; use Google\Cloud\Spanner\Batch\ReadPartition; +use Google\Cloud\Spanner\Connection\Grpc; use Google\Cloud\Spanner\Database; use Google\Cloud\Spanner\Duration; use Google\Cloud\Spanner\KeyRange; @@ -35,6 +37,11 @@ use Google\Cloud\Spanner\Timestamp; use Google\Cloud\Spanner\Transaction; use Google\Cloud\Spanner\V1\CommitResponse; +use Google\Cloud\Spanner\V1\ResultSet; +use Google\Cloud\Spanner\V1\ResultSetStats; +use Google\Cloud\Spanner\V1\SpannerClient; +use Google\Cloud\Spanner\V1\Transaction as TransactionProto; +use Google\Cloud\Spanner\V1\TransactionOptions; use PHPUnit\Framework\TestCase; use Prophecy\Argument; use Prophecy\PhpUnit\ProphecyTrait; @@ -354,6 +361,66 @@ public function testTransactionNoTag() $this->assertEquals(self::TRANSACTION, $t->id()); } + public function testTransactionWithExcludeTxnFromChangeStreams() + { + $gapic = $this->prophesize(SpannerClient::class); + $gapic->beginTransaction( + self::SESSION, + Argument::that(function (TransactionOptions $options) { + $this->assertTrue($options->getExcludeTxnFromChangeStreams()); + return true; + }), + Argument::type('array') + ) + ->shouldBeCalled() + ->willReturn(new TransactionProto(['id' => 'foo'])); + + $operation = new Operation( + new Grpc(['gapicSpannerClient' => $gapic->reveal()]), + true + ); + + $transaction = $operation->transaction($this->session, [ + 'transactionOptions' => ['excludeTxnFromChangeStreams' => true] + ]); + + $this->assertEquals('foo', $transaction->id()); + } + + public function testExecuteAndExecuteUpdateWithExcludeTxnFromChangeStreams() + { + $sql = 'SELECT example FROM sql_query'; + + $resultSet = new ResultSet(['stats' => new ResultSetStats(['row_count_exact' => 0])]); + $stream = $this->prophesize(ServerStream::class); + $stream->readAll()->shouldBeCalledTimes(2)->willReturn([$resultSet]); + + $gapic = $this->prophesize(SpannerClient::class); + $gapic->executeStreamingSql(self::SESSION, $sql, Argument::that(function (array $options) { + $this->assertArrayHasKey('transaction', $options); + $this->assertNotNull($transactionOptions = $options['transaction']->getBegin()); + $this->assertTrue($transactionOptions->getExcludeTxnFromChangeStreams()); + return true; + })) + ->shouldBeCalledTimes(2) + ->willReturn($stream->reveal()); + + $operation = new Operation( + new Grpc(['gapicSpannerClient' => $gapic->reveal()]), + true + ); + + $operation->execute($this->session, $sql, [ + 'transaction' => ['begin' => ['excludeTxnFromChangeStreams' => true]] + ]); + + $transaction = $this->prophesize(Transaction::class)->reveal(); + + $operation->executeUpdate($this->session, $transaction, $sql, [ + 'transaction' => ['begin' => ['excludeTxnFromChangeStreams' => true]] + ]); + } + public function testSnapshot() { $this->connection->beginTransaction(Argument::allOf( diff --git a/Spanner/tests/Unit/TransactionTest.php b/Spanner/tests/Unit/TransactionTest.php index 71bc8a02475..1563a9c9c9b 100644 --- a/Spanner/tests/Unit/TransactionTest.php +++ b/Spanner/tests/Unit/TransactionTest.php @@ -17,6 +17,7 @@ namespace Google\Cloud\Spanner\Tests\Unit; +use Google\ApiCore\ValidationException; use Google\Cloud\Core\Testing\GrpcTestTrait; use Google\Cloud\Core\Testing\TestHelpers; use Google\Cloud\Core\TimeTrait; @@ -252,6 +253,19 @@ public function testExecuteUpdate() $this->assertEquals(1, $res); } + public function testExecuteUpdateWithExcludeTxnFromChangeStreamsThrowsException() + { + $this->expectException(ValidationException::class); + $this->expectExceptionMessage( + 'The excludeTxnFromChangeStreams option cannot be set for individual DML requests' + ); + + $sql = 'UPDATE foo SET bar = @bar'; + $this->transaction->executeUpdate($sql, [ + 'transaction' => ['begin' => ['excludeTxnFromChangeStreams' => true]] + ]); + } + public function testDmlSeqno() { $sql = 'UPDATE foo SET bar = @bar';