From c74fe83914e913ecec4dcee0713248af95d33348 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH <57220027+harshachinta@users.noreply.github.com> Date: Fri, 3 Jan 2025 16:43:05 +0530 Subject: [PATCH 1/4] chore(spanner): fix unit tests for mux rw (#3577) * chore(Spanner): fix tests for mux rw * chore(Spanner): fix tests * chore(spanner): fix tests --- .../MultiplexedSessionDatabaseClient.java | 13 +- .../cloud/spanner/SessionPoolOptions.java | 31 +++- .../cloud/spanner/DatabaseClientImplTest.java | 50 ++++- .../spanner/InlineBeginTransactionTest.java | 7 + .../cloud/spanner/MockSpannerServiceImpl.java | 27 ++- .../spanner/OpenTelemetryApiTracerTest.java | 2 + ...OpenTelemetryBuiltInMetricsTracerTest.java | 1 + .../RetryOnInvalidatedSessionTest.java | 174 ++++++++++++++++++ .../cloud/spanner/SessionPoolLeakTest.java | 19 ++ .../spanner/TransactionChannelHintTest.java | 2 + .../spanner/TransactionManagerImplTest.java | 30 +++ .../spanner/TransactionRunnerImplTest.java | 15 ++ 12 files changed, 349 insertions(+), 22 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 01f41a2dfdc..89371a21c51 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -253,10 +253,15 @@ public void onSessionReady(SessionImpl session) { // initiate a begin transaction request to verify if read-write transactions are // supported using multiplexed sessions. if (sessionClient - .getSpanner() - .getOptions() - .getSessionPoolOptions() - .getUseMultiplexedSessionForRW()) { + .getSpanner() + .getOptions() + .getSessionPoolOptions() + .getUseMultiplexedSessionForRW() + && !sessionClient + .getSpanner() + .getOptions() + .getSessionPoolOptions() + .getSkipVerifyBeginTransactionForMuxRW()) { verifyBeginTransactionWithRWOnMultiplexedSessionAsync(session.getName()); } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index 36a4e5fe208..03551640b43 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -83,6 +83,7 @@ public class SessionPoolOptions { // TODO: Change to use java.time.Duration. private final Duration multiplexedSessionMaintenanceDuration; + private final boolean skipVerifyingBeginTransactionForMuxRW; private SessionPoolOptions(Builder builder) { // minSessions > maxSessions is only possible if the user has only set a value for maxSessions. @@ -132,6 +133,7 @@ private SessionPoolOptions(Builder builder) { ? useMultiplexedSessionFromEnvVariablePartitionedOps : builder.useMultiplexedSessionPartitionedOps; this.multiplexedSessionMaintenanceDuration = builder.multiplexedSessionMaintenanceDuration; + this.skipVerifyingBeginTransactionForMuxRW = builder.skipVerifyingBeginTransactionForMuxRW; } @Override @@ -169,8 +171,10 @@ public boolean equals(Object o) { && Objects.equals(this.useMultiplexedSession, other.useMultiplexedSession) && Objects.equals(this.useMultiplexedSessionForRW, other.useMultiplexedSessionForRW) && Objects.equals( - this.multiplexedSessionMaintenanceDuration, - other.multiplexedSessionMaintenanceDuration); + this.multiplexedSessionMaintenanceDuration, other.multiplexedSessionMaintenanceDuration) + && Objects.equals( + this.skipVerifyingBeginTransactionForMuxRW, + other.skipVerifyingBeginTransactionForMuxRW); } @Override @@ -199,7 +203,8 @@ public int hashCode() { this.poolMaintainerClock, this.useMultiplexedSession, this.useMultiplexedSessionForRW, - this.multiplexedSessionMaintenanceDuration); + this.multiplexedSessionMaintenanceDuration, + this.skipVerifyingBeginTransactionForMuxRW); } public Builder toBuilder() { @@ -392,6 +397,12 @@ Duration getMultiplexedSessionMaintenanceDuration() { return multiplexedSessionMaintenanceDuration; } + @VisibleForTesting + @InternalApi + boolean getSkipVerifyBeginTransactionForMuxRW() { + return skipVerifyingBeginTransactionForMuxRW; + } + public static Builder newBuilder() { return new Builder(); } @@ -607,6 +618,7 @@ public static class Builder { private Duration multiplexedSessionMaintenanceDuration = Duration.ofDays(7); private Clock poolMaintainerClock = Clock.INSTANCE; + private boolean skipVerifyingBeginTransactionForMuxRW = false; private static Position getReleaseToPositionFromSystemProperty() { // NOTE: This System property is a beta feature. Support for it can be removed in the future. @@ -650,6 +662,7 @@ private Builder(SessionPoolOptions options) { this.useMultiplexedSessionPartitionedOps = options.useMultiplexedSessionForPartitionedOps; this.multiplexedSessionMaintenanceDuration = options.multiplexedSessionMaintenanceDuration; this.poolMaintainerClock = options.poolMaintainerClock; + this.skipVerifyingBeginTransactionForMuxRW = options.skipVerifyingBeginTransactionForMuxRW; } /** @@ -872,6 +885,18 @@ Builder setMultiplexedSessionMaintenanceDuration( return this; } + // The additional BeginTransaction RPC for multiplexed session read-write is causing + // unexpected behavior in mock Spanner tests that rely on mocking the BeginTransaction RPC. + // Invoking this method with `true` skips sending the BeginTransaction RPC when the multiplexed + // session is created for the first time during client initialization. + // This is only used for tests. + @VisibleForTesting + Builder setSkipVerifyingBeginTransactionForMuxRW( + boolean skipVerifyingBeginTransactionForMuxRW) { + this.skipVerifyingBeginTransactionForMuxRW = skipVerifyingBeginTransactionForMuxRW; + return this; + } + /** * Sets whether the client should automatically execute a background query to detect the dialect * that is used by the database or not. Set this option to true if you do not know what the diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 86d0bfc2c94..1ee60509d55 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -270,6 +270,9 @@ public void tearDown() { @Test public void testPoolMaintainer_whenInactiveTransactionAndSessionIsNotFoundOnBackend_removeSessionsFromPool() { + assumeFalse( + "Session pool maintainer test skipped for multiplexed sessions", + isMultiplexedSessionsEnabledForRW()); FakeClock poolMaintainerClock = new FakeClock(); InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = InactiveTransactionRemovalOptions.newBuilder() @@ -347,6 +350,9 @@ public void tearDown() { @Test public void testPoolMaintainer_whenInactiveTransactionAndSessionExistsOnBackend_removeSessionsFromPool() { + assumeFalse( + "Session leaks tests are skipped for multiplexed sessions", + isMultiplexedSessionsEnabledForRW()); FakeClock poolMaintainerClock = new FakeClock(); InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = InactiveTransactionRemovalOptions.newBuilder() @@ -482,6 +488,9 @@ public void testPoolMaintainer_whenLongRunningPartitionedUpdateRequest_takeNoAct */ @Test public void testPoolMaintainer_whenPDMLFollowedByInactiveTransaction_removeSessionsFromPool() { + assumeFalse( + "Session leaks tests are skipped for multiplexed sessions", + isMultiplexedSessionsEnabledForRW()); FakeClock poolMaintainerClock = new FakeClock(); InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = InactiveTransactionRemovalOptions.newBuilder() @@ -3085,6 +3094,7 @@ public void testDatabaseOrInstanceDoesNotExistOnCreate() { .run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT))); // No additional requests should have been sent by the client. // Note that in case of the use of multiplexed sessions, then we have 2 requests: + // Note that in case of the use of regular sessions, then we have 1 request: // 1. BatchCreateSessions for the session pool. // 2. CreateSession for the multiplexed session. assertThat(mockSpanner.getRequests()) @@ -3211,9 +3221,16 @@ public void testDatabaseOrInstanceIsDeletedAndThenRecreated() throws Exception { ResourceNotFoundException.class, () -> dbClient.singleUse().executeQuery(SELECT1)); } - assertThrows( - ResourceNotFoundException.class, - () -> dbClient.readWriteTransaction().run(transaction -> null)); + if (!spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()) { + // We only verify this for read-write transactions if we are not using multiplexed + // sessions. For multiplexed sessions, we don't need any special handling, as deleting the + // database will also invalidate the multiplexed session, and trying to continue to use it + // will continue to return an error. + assertThrows( + ResourceNotFoundException.class, + () -> dbClient.readWriteTransaction().run(transaction -> null)); + } + assertThat(mockSpanner.getRequests()).isEmpty(); // Now get a new database client. Normally multiple calls to Spanner#getDatabaseClient will // return the same instance, but not when the instance has been invalidated by a @@ -3300,13 +3317,18 @@ public void testAllowNestedTransactions() throws InterruptedException { Thread.sleep(1L); } assertThat(client.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions); + int expectedMinSessions = + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW() + ? minSessions + : minSessions - 1; Long res = client .readWriteTransaction() .allowNestedTransaction() .run( transaction -> { - assertThat(client.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions - 1); + assertThat(client.pool.getNumberOfSessionsInPool()) + .isEqualTo(expectedMinSessions); return transaction.executeUpdate(UPDATE_STATEMENT); }); assertThat(res).isEqualTo(UPDATE_COUNT); @@ -3333,6 +3355,9 @@ public void testNestedTransactionsUsingTwoDatabases() throws InterruptedExceptio } assertThat(client1.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions); assertThat(client2.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions); + // When read-write transaction uses multiplexed sessions, then sessions are not checked out from + // the session pool. + int expectedMinSessions = isMultiplexedSessionsEnabledForRW() ? minSessions : minSessions - 1; Long res = client1 .readWriteTransaction() @@ -3341,7 +3366,8 @@ public void testNestedTransactionsUsingTwoDatabases() throws InterruptedExceptio transaction -> { // Client1 should have 1 session checked out. // Client2 should have 0 sessions checked out. - assertThat(client1.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions - 1); + assertThat(client1.pool.getNumberOfSessionsInPool()) + .isEqualTo(expectedMinSessions); assertThat(client2.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions); Long add = client2 @@ -3350,9 +3376,9 @@ public void testNestedTransactionsUsingTwoDatabases() throws InterruptedExceptio transaction1 -> { // Both clients should now have 1 session checked out. assertThat(client1.pool.getNumberOfSessionsInPool()) - .isEqualTo(minSessions - 1); + .isEqualTo(expectedMinSessions); assertThat(client2.pool.getNumberOfSessionsInPool()) - .isEqualTo(minSessions - 1); + .isEqualTo(expectedMinSessions); try (ResultSet rs = transaction1.executeQuery(SELECT1)) { if (rs.next()) { return rs.getLong(0); @@ -5090,6 +5116,9 @@ public void testRetryOnResourceExhausted() { @Test public void testSessionPoolExhaustedError_containsStackTraces() { + assumeFalse( + "Session pool tests are skipped for multiplexed sessions", + isMultiplexedSessionsEnabledForRW()); try (Spanner spanner = SpannerOptions.newBuilder() .setProjectId(TEST_PROJECT) @@ -5450,4 +5479,11 @@ private boolean isMultiplexedSessionsEnabled() { } return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession(); } + + private boolean isMultiplexedSessionsEnabledForRW() { + if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) { + return false; + } + return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW(); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java index c67c0084674..94b6de149c3 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java @@ -191,6 +191,13 @@ public void setUp() { .setChannelProvider(channelProvider) .setCredentials(NoCredentials.getInstance()) .setTrackTransactionStarter() + // The extra BeginTransaction RPC for multiplexed session read-write is causing + // unexpected behavior in tests having a mock on the BeginTransaction RPC. Therefore, + // this is being skipped. + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setSkipVerifyingBeginTransactionForMuxRW(true) + .build()) .build() .getService(); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 676cb05eb07..9f27b28d323 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -62,6 +62,7 @@ import com.google.spanner.v1.PartitionReadRequest; import com.google.spanner.v1.PartitionResponse; import com.google.spanner.v1.ReadRequest; +import com.google.spanner.v1.RequestOptions; import com.google.spanner.v1.ResultSet; import com.google.spanner.v1.ResultSetMetadata; import com.google.spanner.v1.ResultSetStats; @@ -1829,7 +1830,7 @@ private ByteString getTransactionId(Session session, TransactionSelector tx) { transactionId = null; break; case BEGIN: - transactionId = beginTransaction(session, tx.getBegin(), null).getId(); + transactionId = beginTransaction(session, tx.getBegin(), null, null).getId(); break; case ID: Transaction transaction = transactions.get(tx.getId()); @@ -1895,7 +1896,8 @@ public void beginTransaction( beginTransactionExecutionTime.simulateExecutionTime( exceptions, stickyGlobalExceptions, freezeLock); Transaction transaction = - beginTransaction(session, request.getOptions(), request.getMutationKey()); + beginTransaction( + session, request.getOptions(), request.getMutationKey(), request.getRequestOptions()); responseObserver.onNext(transaction); responseObserver.onCompleted(); } catch (StatusRuntimeException t) { @@ -1906,7 +1908,10 @@ public void beginTransaction( } private Transaction beginTransaction( - Session session, TransactionOptions options, com.google.spanner.v1.Mutation mutationKey) { + Session session, + TransactionOptions options, + com.google.spanner.v1.Mutation mutationKey, + RequestOptions requestOptions) { ByteString transactionId = generateTransactionName(session.getName()); Transaction.Builder builder = Transaction.newBuilder().setId(transactionId); if (options != null && options.getModeCase() == ModeCase.READ_ONLY) { @@ -1920,12 +1925,17 @@ private Transaction beginTransaction( } Transaction transaction = builder.build(); transactions.put(transaction.getId(), transaction); - transactionsStarted.add(transaction.getId()); + // TODO: remove once UNIMPLEMENTED error is not thrown for read-write mux + // Do not consider the transaction if this request was from background thread + if (requestOptions == null + || !requestOptions.getTransactionTag().equals("multiplexed-rw-background-begin-txn")) { + transactionsStarted.add(transaction.getId()); + if (abortNextTransaction.getAndSet(false)) { + markAbortedTransaction(transaction.getId()); + } + } isPartitionedDmlTransaction.put( transaction.getId(), options.getModeCase() == ModeCase.PARTITIONED_DML); - if (abortNextTransaction.getAndSet(false)) { - markAbortedTransaction(transaction.getId()); - } return transaction; } @@ -2025,7 +2035,8 @@ public void commit(CommitRequest request, StreamObserver respons TransactionOptions.newBuilder() .setReadWrite(ReadWrite.getDefaultInstance()) .build(), - null); + null, + request.getRequestOptions()); } else if (request.getTransactionId() != null) { transaction = transactions.get(request.getTransactionId()); Optional aborted = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryApiTracerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryApiTracerTest.java index e4d25f1d9b3..65bb5f5f0d7 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryApiTracerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryApiTracerTest.java @@ -135,6 +135,7 @@ public void createSpannerInstance() { SessionPoolOptions.newBuilder() .setWaitForMinSessionsDuration(Duration.ofSeconds(5L)) .setFailOnSessionLeak() + .setSkipVerifyingBeginTransactionForMuxRW(true) .build()) .setEnableApiTracing(true) .build() @@ -428,6 +429,7 @@ public boolean isEnableApiTracing() { SessionPoolOptions.newBuilder() .setWaitForMinSessionsDuration(Duration.ofSeconds(5L)) .setFailOnSessionLeak() + .setSkipVerifyingBeginTransactionForMuxRW(true) .build()) .build() .getService(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java index 7a14681d525..98ee700a943 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java @@ -144,6 +144,7 @@ public void createSpannerInstance() { SessionPoolOptions.newBuilder() .setWaitForMinSessionsDuration(Duration.ofSeconds(5L)) .setFailOnSessionLeak() + .setSkipVerifyingBeginTransactionForMuxRW(true) .build()) // Setting this to false so that Spanner Options does not register Metrics Tracer // factory again. diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java index f070f154216..5496ad531bf 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java @@ -538,6 +538,9 @@ public void readOnlyTransactionReadRowUsingIndexNonRecoverable() throws Interrup @Test public void readWriteTransactionReadOnlySessionInPool() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); SessionPoolOptions.Builder builder = SessionPoolOptions.newBuilder(); if (failOnInvalidatedSession) { builder.setFailIfSessionNotFound(); @@ -568,6 +571,9 @@ public void readWriteTransactionReadOnlySessionInPool() throws InterruptedExcept @Test public void readWriteTransactionSelect() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -582,6 +588,9 @@ public void readWriteTransactionSelect() throws InterruptedException { @Test public void readWriteTransactionRead() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -597,6 +606,9 @@ public void readWriteTransactionRead() throws InterruptedException { @Test public void readWriteTransactionReadWithOptimisticLock() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(Options.optimisticLock()); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -612,6 +624,9 @@ public void readWriteTransactionReadWithOptimisticLock() throws InterruptedExcep @Test public void readWriteTransactionReadUsingIndex() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -628,6 +643,9 @@ public void readWriteTransactionReadUsingIndex() throws InterruptedException { @Test public void readWriteTransactionReadRow() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -638,6 +656,9 @@ public void readWriteTransactionReadRow() throws InterruptedException { @Test public void readWriteTransactionReadRowUsingIndex() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -649,6 +670,9 @@ public void readWriteTransactionReadRowUsingIndex() throws InterruptedException @Test public void readWriteTransactionUpdate() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); assertThrowsSessionNotFoundIfShouldFail( () -> runner.run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT))); @@ -656,6 +680,9 @@ public void readWriteTransactionUpdate() throws InterruptedException { @Test public void readWriteTransactionBatchUpdate() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -666,6 +693,9 @@ public void readWriteTransactionBatchUpdate() throws InterruptedException { @Test public void readWriteTransactionBuffer() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -678,6 +708,9 @@ public void readWriteTransactionBuffer() throws InterruptedException { @Test public void readWriteTransactionSelectInvalidatedDuringTransaction() { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); final AtomicInteger attempt = new AtomicInteger(); assertThrowsSessionNotFoundIfShouldFail( @@ -702,6 +735,9 @@ public void readWriteTransactionSelectInvalidatedDuringTransaction() { @Test public void readWriteTransactionReadInvalidatedDuringTransaction() { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); final AtomicInteger attempt = new AtomicInteger(); assertThrowsSessionNotFoundIfShouldFail( @@ -728,6 +764,9 @@ public void readWriteTransactionReadInvalidatedDuringTransaction() { @Test public void readWriteTransactionReadUsingIndexInvalidatedDuringTransaction() { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); final AtomicInteger attempt = new AtomicInteger(); assertThrowsSessionNotFoundIfShouldFail( @@ -756,6 +795,9 @@ public void readWriteTransactionReadUsingIndexInvalidatedDuringTransaction() { @Test public void readWriteTransactionReadRowInvalidatedDuringTransaction() { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); final AtomicInteger attempt = new AtomicInteger(); assertThrowsSessionNotFoundIfShouldFail( @@ -778,6 +820,9 @@ public void readWriteTransactionReadRowInvalidatedDuringTransaction() { @Test public void readWriteTransactionReadRowUsingIndexInvalidatedDuringTransaction() { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); TransactionRunner runner = client.readWriteTransaction(); final AtomicInteger attempt = new AtomicInteger(); assertThrowsSessionNotFoundIfShouldFail( @@ -803,6 +848,9 @@ public void readWriteTransactionReadRowUsingIndexInvalidatedDuringTransaction() @SuppressWarnings("resource") @Test public void transactionManagerReadOnlySessionInPool() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(); while (true) { @@ -825,6 +873,9 @@ public void transactionManagerReadOnlySessionInPool() throws InterruptedExceptio @SuppressWarnings("resource") @Test public void transactionManagerSelect() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(); while (true) { @@ -847,6 +898,9 @@ public void transactionManagerSelect() throws InterruptedException { @SuppressWarnings("resource") @Test public void transactionManagerRead() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(); while (true) { @@ -870,6 +924,9 @@ public void transactionManagerRead() throws InterruptedException { @SuppressWarnings("resource") @Test public void transactionManagerReadUsingIndex() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(); while (true) { @@ -893,6 +950,9 @@ public void transactionManagerReadUsingIndex() throws InterruptedException { @Test public void transactionManagerReadRow() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(); while (true) { @@ -914,6 +974,9 @@ public void transactionManagerReadRow() throws InterruptedException { @Test public void transactionManagerReadRowUsingIndex() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(); while (true) { @@ -937,6 +1000,9 @@ public void transactionManagerReadRowUsingIndex() throws InterruptedException { @Test public void transactionManagerUpdate() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); try (TransactionManager manager = client.transactionManager(Options.commitStats())) { TransactionContext transaction = manager.begin(); while (true) { @@ -958,6 +1024,9 @@ public void transactionManagerUpdate() throws InterruptedException { @Test public void transactionManagerAborted_thenSessionNotFoundOnBeginTransaction() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); int attempt = 0; try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(); @@ -990,6 +1059,9 @@ public void transactionManagerAborted_thenSessionNotFoundOnBeginTransaction() @Test public void transactionManagerBatchUpdate() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(); while (true) { @@ -1013,6 +1085,9 @@ public void transactionManagerBatchUpdate() throws InterruptedException { @SuppressWarnings("resource") @Test public void transactionManagerBuffer() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(); while (true) { @@ -1037,6 +1112,9 @@ public void transactionManagerBuffer() throws InterruptedException { @SuppressWarnings("resource") @Test public void transactionManagerSelectInvalidatedDuringTransaction() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); SessionPoolOptions.Builder builder = SessionPoolOptions.newBuilder(); if (failOnInvalidatedSession) { builder.setFailIfSessionNotFound(); @@ -1083,6 +1161,9 @@ public void transactionManagerSelectInvalidatedDuringTransaction() throws Interr @SuppressWarnings("resource") @Test public void transactionManagerReadInvalidatedDuringTransaction() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); SessionPoolOptions.Builder builder = SessionPoolOptions.newBuilder(); if (failOnInvalidatedSession) { builder.setFailIfSessionNotFound(); @@ -1131,6 +1212,9 @@ public void transactionManagerReadInvalidatedDuringTransaction() throws Interrup @Test public void transactionManagerReadUsingIndexInvalidatedDuringTransaction() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); SessionPoolOptions.Builder builder = SessionPoolOptions.newBuilder(); if (failOnInvalidatedSession) { builder.setFailIfSessionNotFound(); @@ -1180,6 +1264,9 @@ public void transactionManagerReadUsingIndexInvalidatedDuringTransaction() @SuppressWarnings("resource") @Test public void transactionManagerReadRowInvalidatedDuringTransaction() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); SessionPoolOptions.Builder builder = SessionPoolOptions.newBuilder(); if (failOnInvalidatedSession) { builder.setFailIfSessionNotFound(); @@ -1226,6 +1313,9 @@ public void transactionManagerReadRowInvalidatedDuringTransaction() throws Inter @Test public void transactionManagerReadRowUsingIndexInvalidatedDuringTransaction() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); SessionPoolOptions.Builder builder = SessionPoolOptions.newBuilder(); if (failOnInvalidatedSession) { builder.setFailIfSessionNotFound(); @@ -1283,6 +1373,9 @@ public void partitionedDml() throws InterruptedException { @Test public void write() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); assertThrowsSessionNotFoundIfShouldFail( () -> client.write(Collections.singletonList(Mutation.delete("FOO", KeySet.all())))); } @@ -1300,17 +1393,26 @@ public void writeAtLeastOnce() throws InterruptedException { @Test public void asyncRunnerSelect() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncRunner_withReadFunction(input -> input.executeQueryAsync(SELECT1AND2)); } @Test public void asyncRunnerRead() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncRunner_withReadFunction( input -> input.readAsync("FOO", KeySet.all(), Collections.singletonList("BAR"))); } @Test public void asyncRunnerReadUsingIndex() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncRunner_withReadFunction( input -> input.readUsingIndexAsync( @@ -1319,6 +1421,9 @@ public void asyncRunnerReadUsingIndex() throws InterruptedException { private void asyncRunner_withReadFunction( final Function readFunction) throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); final ExecutorService queryExecutor = Executors.newSingleThreadExecutor(); try { AsyncRunner runner = client.runAsync(); @@ -1356,6 +1461,9 @@ private void asyncRunner_withReadFunction( @Test public void asyncRunnerReadRow() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); AsyncRunner runner = client.runAsync(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -1367,6 +1475,9 @@ public void asyncRunnerReadRow() throws InterruptedException { @Test public void asyncRunnerReadRowUsingIndex() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); AsyncRunner runner = client.runAsync(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -1380,6 +1491,9 @@ public void asyncRunnerReadRowUsingIndex() throws InterruptedException { @Test public void asyncRunnerUpdate() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); AsyncRunner runner = client.runAsync(); assertThrowsSessionNotFoundIfShouldFail( () -> get(runner.runAsync(txn -> txn.executeUpdateAsync(UPDATE_STATEMENT), executor))); @@ -1387,6 +1501,9 @@ public void asyncRunnerUpdate() throws InterruptedException { @Test public void asyncRunnerBatchUpdate() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); AsyncRunner runner = client.runAsync(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -1398,6 +1515,9 @@ public void asyncRunnerBatchUpdate() throws InterruptedException { @Test public void asyncRunnerBuffer() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); AsyncRunner runner = client.runAsync(); assertThrowsSessionNotFoundIfShouldFail( () -> @@ -1412,17 +1532,26 @@ public void asyncRunnerBuffer() throws InterruptedException { @Test public void asyncTransactionManagerAsyncSelect() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_readAsync(input -> input.executeQueryAsync(SELECT1AND2)); } @Test public void asyncTransactionManagerAsyncRead() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_readAsync( input -> input.readAsync("FOO", KeySet.all(), Collections.singletonList("BAR"))); } @Test public void asyncTransactionManagerAsyncReadUsingIndex() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_readAsync( input -> input.readUsingIndexAsync( @@ -1431,6 +1560,9 @@ public void asyncTransactionManagerAsyncReadUsingIndex() throws InterruptedExcep private void asyncTransactionManager_readAsync( final Function fn) throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); final ExecutorService queryExecutor = Executors.newSingleThreadExecutor(); try (AsyncTransactionManager manager = client.transactionManagerAsync()) { TransactionContextFuture context = manager.beginAsync(); @@ -1475,17 +1607,26 @@ private void asyncTransactionManager_readAsync( @Test public void asyncTransactionManagerSelect() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_readSync(input -> input.executeQuery(SELECT1AND2)); } @Test public void asyncTransactionManagerRead() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_readSync( input -> input.read("FOO", KeySet.all(), Collections.singletonList("BAR"))); } @Test public void asyncTransactionManagerReadUsingIndex() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_readSync( input -> input.readUsingIndex("FOO", "idx", KeySet.all(), Collections.singletonList("BAR"))); @@ -1493,6 +1634,9 @@ public void asyncTransactionManagerReadUsingIndex() throws InterruptedException private void asyncTransactionManager_readSync(final Function fn) throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); final ExecutorService queryExecutor = Executors.newSingleThreadExecutor(); try (AsyncTransactionManager manager = client.transactionManagerAsync()) { TransactionContextFuture context = manager.beginAsync(); @@ -1524,6 +1668,9 @@ private void asyncTransactionManager_readSync(final Function ApiFutures.immediateFuture( @@ -1532,6 +1679,9 @@ public void asyncTransactionManagerReadRow() throws InterruptedException { @Test public void asyncTransactionManagerReadRowUsingIndex() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_readRowFunction( input -> ApiFutures.immediateFuture( @@ -1541,12 +1691,18 @@ public void asyncTransactionManagerReadRowUsingIndex() throws InterruptedExcepti @Test public void asyncTransactionManagerReadRowAsync() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_readRowFunction( input -> input.readRowAsync("FOO", Key.of("foo"), Collections.singletonList("BAR"))); } @Test public void asyncTransactionManagerReadRowUsingIndexAsync() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_readRowFunction( input -> input.readRowUsingIndexAsync( @@ -1555,6 +1711,9 @@ public void asyncTransactionManagerReadRowUsingIndexAsync() throws InterruptedEx private void asyncTransactionManager_readRowFunction( final Function> fn) throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); final ExecutorService queryExecutor = Executors.newSingleThreadExecutor(); try (AsyncTransactionManager manager = client.transactionManagerAsync()) { TransactionContextFuture context = manager.beginAsync(); @@ -1576,18 +1735,27 @@ private void asyncTransactionManager_readRowFunction( @Test public void asyncTransactionManagerUpdateAsync() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_updateFunction( input -> input.executeUpdateAsync(UPDATE_STATEMENT), UPDATE_COUNT); } @Test public void asyncTransactionManagerUpdate() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_updateFunction( input -> ApiFutures.immediateFuture(input.executeUpdate(UPDATE_STATEMENT)), UPDATE_COUNT); } @Test public void asyncTransactionManagerBatchUpdateAsync() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_updateFunction( input -> input.batchUpdateAsync(Arrays.asList(UPDATE_STATEMENT, UPDATE_STATEMENT)), new long[] {UPDATE_COUNT, UPDATE_COUNT}); @@ -1595,6 +1763,9 @@ public void asyncTransactionManagerBatchUpdateAsync() throws InterruptedExceptio @Test public void asyncTransactionManagerBatchUpdate() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); asyncTransactionManager_updateFunction( input -> ApiFutures.immediateFuture( @@ -1604,6 +1775,9 @@ public void asyncTransactionManagerBatchUpdate() throws InterruptedException { private void asyncTransactionManager_updateFunction( final Function> fn, T expected) throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); try (AsyncTransactionManager manager = client.transactionManagerAsync()) { TransactionContextFuture transaction = manager.beginAsync(); while (true) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java index 4672f03aeff..8ccea443dc1 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java @@ -171,6 +171,9 @@ public void testIgnoreLeakedSession() { @Test public void testReadWriteTransactionExceptionOnCreateSession() { + assumeFalse( + "Session Leaks do not occur with Multiplexed Sessions", + isMultiplexedSessionsEnabledForRW()); readWriteTransactionTest( () -> mockSpanner.setBatchCreateSessionsExecutionTime( @@ -180,6 +183,9 @@ public void testReadWriteTransactionExceptionOnCreateSession() { @Test public void testReadWriteTransactionExceptionOnBegin() { + assumeFalse( + "Session Leaks do not occur with Multiplexed Sessions", + isMultiplexedSessionsEnabledForRW()); readWriteTransactionTest( () -> mockSpanner.setBeginTransactionExecutionTime( @@ -200,6 +206,9 @@ private void readWriteTransactionTest( @Test public void testTransactionManagerExceptionOnCreateSession() { + assumeFalse( + "Session Leaks do not occur with Multiplexed Sessions", + isMultiplexedSessionsEnabledForRW()); transactionManagerTest( () -> mockSpanner.setBatchCreateSessionsExecutionTime( @@ -209,6 +218,9 @@ public void testTransactionManagerExceptionOnCreateSession() { @Test public void testTransactionManagerExceptionOnBegin() { + assumeFalse( + "Session Leaks do not occur with Multiplexed Sessions", + isMultiplexedSessionsEnabledForRW()); assertThat(pool.getNumberOfSessionsInPool(), is(equalTo(0))); mockSpanner.setBeginTransactionExecutionTime( SimulatedExecutionTime.ofException(FAILED_PRECONDITION)); @@ -229,4 +241,11 @@ private void transactionManagerTest(Runnable setup, int expectedNumberOfSessions } assertEquals(expectedNumberOfSessionsAfterExecution, pool.getNumberOfSessionsInPool()); } + + private boolean isMultiplexedSessionsEnabledForRW() { + if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) { + return false; + } + return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW(); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionChannelHintTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionChannelHintTest.java index bd346c4f18b..14658210f44 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionChannelHintTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionChannelHintTest.java @@ -188,6 +188,8 @@ private SpannerOptions createSpannerOptions() { .setCompressorName("gzip") .setHost("http://" + endpoint) .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder().setSkipVerifyingBeginTransactionForMuxRW(true).build()) .build(); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java index 10b13125152..aee3d5ed5b4 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java @@ -234,6 +234,21 @@ public void usesPreparedTransaction() { com.google.protobuf.Timestamp.newBuilder() .setSeconds(System.currentTimeMillis() * 1000)) .build())); + when(rpc.createSession( + Mockito.anyString(), + Mockito.anyString(), + Mockito.anyMap(), + Mockito.eq(null), + Mockito.eq(true))) + .thenAnswer( + invocation -> + Session.newBuilder() + .setName(invocation.getArguments()[0] + "/sessions/1") + .setMultiplexed(true) + .setCreateTime( + com.google.protobuf.Timestamp.newBuilder() + .setSeconds(System.currentTimeMillis() * 1000)) + .build()); when(rpc.beginTransactionAsync( Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true))) .thenAnswer( @@ -300,6 +315,21 @@ public void inlineBegin() { com.google.protobuf.Timestamp.newBuilder() .setSeconds(System.currentTimeMillis() * 1000)) .build())); + when(rpc.createSession( + Mockito.anyString(), + Mockito.anyString(), + Mockito.anyMap(), + Mockito.eq(null), + Mockito.eq(true))) + .thenAnswer( + invocation -> + Session.newBuilder() + .setName(invocation.getArguments()[0] + "/sessions/1") + .setMultiplexed(true) + .setCreateTime( + com.google.protobuf.Timestamp.newBuilder() + .setSeconds(System.currentTimeMillis() * 1000)) + .build()); when(rpc.beginTransactionAsync( Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true))) .thenAnswer( diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index 1fd6817ea96..d8bd6ed448d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -186,6 +186,21 @@ public void usesPreparedTransaction() { .setCreateTime( Timestamp.newBuilder().setSeconds(System.currentTimeMillis() * 1000)) .build())); + when(rpc.createSession( + Mockito.anyString(), + Mockito.anyString(), + Mockito.anyMap(), + Mockito.eq(null), + Mockito.eq(true))) + .thenAnswer( + invocation -> + Session.newBuilder() + .setName(invocation.getArguments()[0] + "/sessions/1") + .setMultiplexed(true) + .setCreateTime( + com.google.protobuf.Timestamp.newBuilder() + .setSeconds(System.currentTimeMillis() * 1000)) + .build()); when(rpc.beginTransactionAsync( Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true))) .thenAnswer( From d9813a05240b966f444168d3b8c30da9d27a8cc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Fri, 3 Jan 2025 13:51:24 +0100 Subject: [PATCH 2/4] fix: AsyncTransactionManager did not always close the session (#3580) In a case where the following happens, the session that was used by an AsyncTransactionManager would not be returned to the pool: 1. The transaction is abandoned without a Commit or Rollback call. 2. The AsyncTransactionManager then executes a RollbackAsync internally. 3. If the internal RollbackAsync failed, then the session would not be closed. --- .../SessionPoolAsyncTransactionManager.java | 1 + .../spanner/AsyncTransactionManagerTest.java | 32 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java index 90f5317e88d..3d6a015531f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java @@ -107,6 +107,7 @@ public void onSuccess(AsyncTransactionManagerImpl result) { new ApiFutureCallback() { @Override public void onFailure(Throwable t) { + session.close(); res.setException(t); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java index 2449b8fba7c..371d3688c94 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java @@ -27,6 +27,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; @@ -1084,6 +1085,37 @@ public void onSuccess(Long aLong) { } } + @Test + public void testAbandonedAsyncTransactionManager_rollbackFails() throws Exception { + mockSpanner.setRollbackExecutionTime( + SimulatedExecutionTime.ofException(Status.PERMISSION_DENIED.asRuntimeException())); + + boolean gotException = false; + try (AsyncTransactionManager manager = client().transactionManagerAsync()) { + TransactionContextFuture transactionContextFuture = manager.beginAsync(); + while (true) { + try { + AsyncTransactionStep updateCount = + transactionContextFuture.then( + (transactionContext, ignored) -> + transactionContext.executeUpdateAsync(UPDATE_STATEMENT), + executor); + assertEquals(1L, updateCount.get().longValue()); + // Break without committing or rolling back the transaction. + break; + } catch (AbortedException e) { + transactionContextFuture = manager.resetForRetryAsync(); + } + } + } catch (SpannerException spannerException) { + // The error from the automatically executed Rollback is surfaced when the + // AsyncTransactionManager is closed. + assertEquals(ErrorCode.PERMISSION_DENIED, spannerException.getErrorCode()); + gotException = true; + } + assertTrue(gotException); + } + private boolean isMultiplexedSessionsEnabled() { if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) { return false; From 5a1be3dedeafa6858502eadc7918820b9cd90f68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Mon, 6 Jan 2025 05:02:07 +0100 Subject: [PATCH 3/4] feat: add transaction runner for connections (#3559) Adds a `runTransaction` method to `Connection` to allow applications to execute read/write transactions that are automatically retried in the same way as in the standard client library. This feature will be extended to the JDBC driver, so transaction retries can be defined using a runner there as well. --- .../clirr-ignored-differences.xml | 8 +- .../cloud/spanner/TransactionManagerImpl.java | 2 +- .../cloud/spanner/connection/Connection.java | 15 ++ .../spanner/connection/ConnectionImpl.java | 49 +++- .../connection/ReadWriteTransaction.java | 5 + .../connection/TransactionRunnerImpl.java | 62 +++++ .../cloud/spanner/connection/UnitOfWork.java | 4 + .../RunTransactionMockServerTest.java | 226 ++++++++++++++++++ 8 files changed, 362 insertions(+), 9 deletions(-) create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/TransactionRunnerImpl.java create mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/RunTransactionMockServerTest.java diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index 5b84cb4ebc3..c6796085d83 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -814,6 +814,12 @@ com/google/cloud/spanner/connection/TransactionRetryListener void retryDmlAsPartitionedDmlFailed(java.util.UUID, com.google.cloud.spanner.Statement, java.lang.Throwable) - + + + + 7012 + com/google/cloud/spanner/connection/Connection + java.lang.Object runTransaction(com.google.cloud.spanner.connection.Connection$TransactionCallable) + diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java index cafb27ba6b7..b1d37f3e4cd 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java @@ -99,7 +99,7 @@ public void rollback() { public TransactionContext resetForRetry() { if (txn == null || !txn.isAborted() && txnState != TransactionState.ABORTED) { throw new IllegalStateException( - "resetForRetry can only be called if the previous attempt" + " aborted"); + "resetForRetry can only be called if the previous attempt aborted"); } try (IScope s = tracer.withSpan(span)) { boolean useInlinedBegin = txn.transactionId != null; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/Connection.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/Connection.java index 547d2466e3e..eb69ae132cc 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/Connection.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/Connection.java @@ -835,6 +835,21 @@ default boolean isKeepTransactionAlive() { */ ApiFuture rollbackAsync(); + /** Functional interface for the {@link #runTransaction(TransactionCallable)} method. */ + interface TransactionCallable { + /** This method is invoked with a fresh transaction on the connection. */ + T run(Connection transaction); + } + + /** + * Runs the given callable in a transaction. The transaction type is determined by the current + * state of the connection. That is; if the connection is in read/write mode, the transaction type + * will be a read/write transaction. If the connection is in read-only mode, it will be a + * read-only transaction. The transaction will automatically be retried if it is aborted by + * Spanner. + */ + T runTransaction(TransactionCallable callable); + /** Returns the current savepoint support for this connection. */ SavepointSupport getSavepointSupport(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java index 2d7c917d230..5ea249ee0ac 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java @@ -194,6 +194,11 @@ private LeakedConnectionException() { */ private final ConnectionOptions options; + enum Caller { + APPLICATION, + TRANSACTION_RUNNER, + } + /** The supported batch modes. */ enum BatchMode { NONE, @@ -267,6 +272,9 @@ static UnitOfWorkType of(TransactionMode transactionMode) { */ private boolean transactionBeginMarked = false; + /** This field is set to true when a transaction runner is active for this connection. */ + private boolean transactionRunnerActive = false; + private BatchMode batchMode; private UnitOfWorkType unitOfWorkType; private final Stack transactionStack = new Stack<>(); @@ -1164,16 +1172,19 @@ public void onFailure() { @Override public void commit() { - get(commitAsync(CallType.SYNC)); + get(commitAsync(CallType.SYNC, Caller.APPLICATION)); } @Override public ApiFuture commitAsync() { - return commitAsync(CallType.ASYNC); + return commitAsync(CallType.ASYNC, Caller.APPLICATION); } - private ApiFuture commitAsync(CallType callType) { + ApiFuture commitAsync(CallType callType, Caller caller) { ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG); + ConnectionPreconditions.checkState( + !transactionRunnerActive || caller == Caller.TRANSACTION_RUNNER, + "Cannot call commit when a transaction runner is active"); maybeAutoCommitOrFlushCurrentUnitOfWork(COMMIT_STATEMENT.getType(), COMMIT_STATEMENT); return endCurrentTransactionAsync(callType, commit, COMMIT_STATEMENT); } @@ -1201,16 +1212,19 @@ public void onFailure() { @Override public void rollback() { - get(rollbackAsync(CallType.SYNC)); + get(rollbackAsync(CallType.SYNC, Caller.APPLICATION)); } @Override public ApiFuture rollbackAsync() { - return rollbackAsync(CallType.ASYNC); + return rollbackAsync(CallType.ASYNC, Caller.APPLICATION); } - private ApiFuture rollbackAsync(CallType callType) { + ApiFuture rollbackAsync(CallType callType, Caller caller) { ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG); + ConnectionPreconditions.checkState( + !transactionRunnerActive || caller == Caller.TRANSACTION_RUNNER, + "Cannot call rollback when a transaction runner is active"); maybeAutoCommitOrFlushCurrentUnitOfWork(ROLLBACK_STATEMENT.getType(), ROLLBACK_STATEMENT); return endCurrentTransactionAsync(callType, rollback, ROLLBACK_STATEMENT); } @@ -1243,6 +1257,27 @@ private ApiFuture endCurrentTransactionAsync( return res; } + @Override + public T runTransaction(TransactionCallable callable) { + ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG); + ConnectionPreconditions.checkState(!isBatchActive(), "Cannot run transaction while in a batch"); + ConnectionPreconditions.checkState( + !isTransactionStarted(), "Cannot run transaction when a transaction is already active"); + ConnectionPreconditions.checkState( + !transactionRunnerActive, "A transaction runner is already active for this connection"); + this.transactionRunnerActive = true; + try { + return new TransactionRunnerImpl(this).run(callable); + } finally { + this.transactionRunnerActive = false; + } + } + + void resetForRetry(UnitOfWork retryUnitOfWork) { + retryUnitOfWork.resetForRetry(); + this.currentUnitOfWork = retryUnitOfWork; + } + @Override public SavepointSupport getSavepointSupport() { return getConnectionPropertyValue(SAVEPOINT_SUPPORT); @@ -2000,7 +2035,7 @@ private UnitOfWork maybeStartAutoDmlBatch(UnitOfWork transaction) { return transaction; } - private UnitOfWork getCurrentUnitOfWorkOrStartNewUnitOfWork() { + UnitOfWork getCurrentUnitOfWorkOrStartNewUnitOfWork() { return getCurrentUnitOfWorkOrStartNewUnitOfWork( StatementType.UNKNOWN, /* parsedStatement = */ null, /* internalMetadataQuery = */ false); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java index 4ae0ae00608..1f6ab6bf0c6 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java @@ -1261,6 +1261,11 @@ private ApiFuture rollbackAsync(CallType callType, boolean updateStatusAnd } } + @Override + public void resetForRetry() { + txContextFuture = ApiFutures.immediateFuture(txManager.resetForRetry()); + } + @Override String getUnitOfWorkName() { return "read/write transaction"; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/TransactionRunnerImpl.java new file mode 100644 index 00000000000..6c959d3e5f9 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/TransactionRunnerImpl.java @@ -0,0 +1,62 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.connection; + +import static com.google.cloud.spanner.SpannerApiFutures.get; + +import com.google.cloud.spanner.AbortedException; +import com.google.cloud.spanner.SpannerExceptionFactory; +import com.google.cloud.spanner.connection.Connection.TransactionCallable; +import com.google.cloud.spanner.connection.ConnectionImpl.Caller; +import com.google.cloud.spanner.connection.UnitOfWork.CallType; + +class TransactionRunnerImpl { + private final ConnectionImpl connection; + + TransactionRunnerImpl(ConnectionImpl connection) { + this.connection = connection; + } + + T run(TransactionCallable callable) { + connection.beginTransaction(); + // Disable internal retries during this transaction. + connection.setRetryAbortsInternally(/* retryAbortsInternally = */ false, /* local = */ true); + UnitOfWork transaction = connection.getCurrentUnitOfWorkOrStartNewUnitOfWork(); + while (true) { + try { + T result = callable.run(connection); + get(connection.commitAsync(CallType.SYNC, Caller.TRANSACTION_RUNNER)); + return result; + } catch (AbortedException abortedException) { + try { + //noinspection BusyWait + Thread.sleep(abortedException.getRetryDelayInMillis()); + connection.resetForRetry(transaction); + } catch (InterruptedException interruptedException) { + connection.rollbackAsync(CallType.SYNC, Caller.TRANSACTION_RUNNER); + throw SpannerExceptionFactory.propagateInterrupt(interruptedException); + } catch (Throwable t) { + connection.rollbackAsync(CallType.SYNC, Caller.TRANSACTION_RUNNER); + throw t; + } + } catch (Throwable t) { + connection.rollbackAsync(CallType.SYNC, Caller.TRANSACTION_RUNNER); + throw t; + } + } + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/UnitOfWork.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/UnitOfWork.java index ffa93d486e1..80981922225 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/UnitOfWork.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/UnitOfWork.java @@ -125,6 +125,10 @@ interface EndTransactionCallback { ApiFuture rollbackAsync( @Nonnull CallType callType, @Nonnull EndTransactionCallback callback); + default void resetForRetry() { + throw new UnsupportedOperationException(); + } + /** @see Connection#savepoint(String) */ void savepoint(@Nonnull String name, @Nonnull Dialect dialect); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/RunTransactionMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/RunTransactionMockServerTest.java new file mode 100644 index 00000000000..91662ef8668 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/RunTransactionMockServerTest.java @@ -0,0 +1,226 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.connection; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.spanner.ErrorCode; +import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; +import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.SpannerException; +import com.google.spanner.v1.CommitRequest; +import com.google.spanner.v1.ExecuteSqlRequest; +import com.google.spanner.v1.RollbackRequest; +import io.grpc.Status; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class RunTransactionMockServerTest extends AbstractMockServerTest { + + @Test + public void testRunTransaction() { + try (Connection connection = createConnection()) { + connection.runTransaction( + transaction -> { + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + return null; + }); + } + assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + } + + @Test + public void testRunTransactionInAutoCommit() { + try (Connection connection = createConnection()) { + connection.setAutocommit(true); + + connection.runTransaction( + transaction -> { + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + return null; + }); + } + assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + } + + @Test + public void testRunTransactionInReadOnly() { + try (Connection connection = createConnection()) { + connection.setReadOnly(true); + connection.setAutocommit(false); + + assertEquals( + RANDOM_RESULT_SET_ROW_COUNT, + connection + .runTransaction( + transaction -> { + int rows = 0; + try (ResultSet resultSet = transaction.executeQuery(SELECT_RANDOM_STATEMENT)) { + while (resultSet.next()) { + rows++; + } + } + return rows; + }) + .intValue()); + } + assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(0, mockSpanner.countRequestsOfType(CommitRequest.class)); + assertEquals(0, mockSpanner.countRequestsOfType(RollbackRequest.class)); + } + + @Test + public void testRunTransaction_rollbacksAfterException() { + try (Connection connection = createConnection()) { + SpannerException exception = + assertThrows( + SpannerException.class, + () -> + connection.runTransaction( + transaction -> { + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + mockSpanner.setExecuteSqlExecutionTime( + SimulatedExecutionTime.ofException( + Status.INVALID_ARGUMENT + .withDescription("invalid statement") + .asRuntimeException())); + // This statement will fail. + transaction.executeUpdate(INSERT_STATEMENT); + return null; + })); + assertEquals(ErrorCode.INVALID_ARGUMENT, exception.getErrorCode()); + assertTrue(exception.getMessage(), exception.getMessage().endsWith("invalid statement")); + } + assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(0, mockSpanner.countRequestsOfType(CommitRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(RollbackRequest.class)); + } + + @Test + public void testRunTransactionCommitAborted() { + final AtomicInteger attempts = new AtomicInteger(); + try (Connection connection = createConnection()) { + connection.runTransaction( + transaction -> { + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + if (attempts.incrementAndGet() == 1) { + mockSpanner.abortNextStatement(); + } + return null; + }); + } + assertEquals(2, attempts.get()); + assertEquals(4, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(2, mockSpanner.countRequestsOfType(CommitRequest.class)); + } + + @Test + public void testRunTransactionDmlAborted() { + final AtomicInteger attempts = new AtomicInteger(); + try (Connection connection = createConnection()) { + assertTrue(connection.isRetryAbortsInternally()); + connection.runTransaction( + transaction -> { + assertFalse(transaction.isRetryAbortsInternally()); + if (attempts.incrementAndGet() == 1) { + mockSpanner.abortNextStatement(); + } + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + return null; + }); + assertTrue(connection.isRetryAbortsInternally()); + } + assertEquals(2, attempts.get()); + assertEquals(3, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + } + + @Test + public void testRunTransactionQueryAborted() { + final AtomicInteger attempts = new AtomicInteger(); + try (Connection connection = createConnection()) { + int rowCount = + connection.runTransaction( + transaction -> { + if (attempts.incrementAndGet() == 1) { + mockSpanner.abortNextStatement(); + } + int rows = 0; + try (ResultSet resultSet = transaction.executeQuery(SELECT_RANDOM_STATEMENT)) { + while (resultSet.next()) { + rows++; + } + } + return rows; + }); + assertEquals(RANDOM_RESULT_SET_ROW_COUNT, rowCount); + } + assertEquals(2, attempts.get()); + assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + } + + @Test + public void testCommitInRunTransaction() { + try (Connection connection = createConnection()) { + connection.runTransaction( + transaction -> { + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + SpannerException exception = assertThrows(SpannerException.class, transaction::commit); + assertEquals(ErrorCode.FAILED_PRECONDITION, exception.getErrorCode()); + assertEquals( + "FAILED_PRECONDITION: Cannot call commit when a transaction runner is active", + exception.getMessage()); + return null; + }); + } + assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + } + + @Test + public void testRollbackInRunTransaction() { + try (Connection connection = createConnection()) { + connection.runTransaction( + transaction -> { + assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT)); + SpannerException exception = + assertThrows(SpannerException.class, transaction::rollback); + assertEquals(ErrorCode.FAILED_PRECONDITION, exception.getErrorCode()); + assertEquals( + "FAILED_PRECONDITION: Cannot call rollback when a transaction runner is active", + exception.getMessage()); + return null; + }); + } + assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + assertEquals(0, mockSpanner.countRequestsOfType(RollbackRequest.class)); + } +} From 6225efae42c0c5aaeba147f77a7029d6a20707b8 Mon Sep 17 00:00:00 2001 From: "release-please[bot]" <55107282+release-please[bot]@users.noreply.github.com> Date: Mon, 6 Jan 2025 12:49:52 +0530 Subject: [PATCH 4/4] chore(main): release 6.84.0 (#3556) * chore(main): release 6.84.0 * chore: generate libraries at Mon Jan 6 04:03:16 UTC 2025 --------- Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> Co-authored-by: cloud-java-bot --- CHANGELOG.md | 23 +++++++++++++++++++ README.md | 6 ++--- benchmarks/pom.xml | 2 +- google-cloud-spanner-bom/pom.xml | 18 +++++++-------- google-cloud-spanner-executor/pom.xml | 4 ++-- google-cloud-spanner/pom.xml | 4 ++-- .../pom.xml | 4 ++-- .../pom.xml | 4 ++-- grpc-google-cloud-spanner-executor-v1/pom.xml | 4 ++-- grpc-google-cloud-spanner-v1/pom.xml | 4 ++-- pom.xml | 20 ++++++++-------- .../pom.xml | 4 ++-- .../pom.xml | 4 ++-- .../pom.xml | 4 ++-- proto-google-cloud-spanner-v1/pom.xml | 4 ++-- samples/snapshot/pom.xml | 2 +- versions.txt | 20 ++++++++-------- 17 files changed, 77 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aa465a19065..456e9bafe4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,28 @@ # Changelog +## [6.84.0](https://github.com/googleapis/java-spanner/compare/v6.83.0...v6.84.0) (2025-01-06) + + +### Features + +* Add support for ARRAY<STRUCT> to CloudCilentExecutor ([#3544](https://github.com/googleapis/java-spanner/issues/3544)) ([6cbaf7e](https://github.com/googleapis/java-spanner/commit/6cbaf7ec6502d04fc0a0c09720e2054bd10bead9)) +* Add transaction runner for connections ([#3559](https://github.com/googleapis/java-spanner/issues/3559)) ([5a1be3d](https://github.com/googleapis/java-spanner/commit/5a1be3dedeafa6858502eadc7918820b9cd90f68)) +* Exposing InstanceType in Instance configuration (to define PROVISIONED or FREE spanner instance) ([8d295c4](https://github.com/googleapis/java-spanner/commit/8d295c4a4030b4e97b1d653cc3baf412864f3042)) +* Improve tracing by adding attributes ([#3576](https://github.com/googleapis/java-spanner/issues/3576)) ([eee333b](https://github.com/googleapis/java-spanner/commit/eee333b51fa69123e011dfbd2a0896fd31ac10dc)) +* **spanner:** Add jdbc support for external hosts ([#3536](https://github.com/googleapis/java-spanner/issues/3536)) ([801346a](https://github.com/googleapis/java-spanner/commit/801346a1b2efe7d0144f7442e1568eb5b02ddcbc)) + + +### Bug Fixes + +* AsyncTransactionManager did not always close the session ([#3580](https://github.com/googleapis/java-spanner/issues/3580)) ([d9813a0](https://github.com/googleapis/java-spanner/commit/d9813a05240b966f444168d3b8c30da9d27a8cc4)) +* Retry specific internal errors ([#3565](https://github.com/googleapis/java-spanner/issues/3565)) ([b9ce1a6](https://github.com/googleapis/java-spanner/commit/b9ce1a6fcbd11373a5cc82807af15c1cca0dd48e)) +* Update max_in_use_session at 10 mins interval ([#3570](https://github.com/googleapis/java-spanner/issues/3570)) ([cc1753d](https://github.com/googleapis/java-spanner/commit/cc1753da72b3e508f8fea8a6d19e1ed3f34e3602)) + + +### Dependencies + +* Update opentelemetry.version to v1.45.0 ([#3531](https://github.com/googleapis/java-spanner/issues/3531)) ([78c82ed](https://github.com/googleapis/java-spanner/commit/78c82edb4fcc4a5a9a372225ca429038c3b34955)) + ## [6.83.0](https://github.com/googleapis/java-spanner/compare/v6.82.0...v6.83.0) (2024-12-13) diff --git a/README.md b/README.md index 2d74ad3c5d5..a64487c221e 100644 --- a/README.md +++ b/README.md @@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-spanner' If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-spanner:6.83.0' +implementation 'com.google.cloud:google-cloud-spanner:6.84.0' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.83.0" +libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.84.0" ``` ## Authentication @@ -725,7 +725,7 @@ Java is a registered trademark of Oracle and/or its affiliates. [kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-spanner/java11.html [stability-image]: https://img.shields.io/badge/stability-stable-green [maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-spanner.svg -[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.83.0 +[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.84.0 [authentication]: https://github.com/googleapis/google-cloud-java#authentication [auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes [predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml index 31ad076c283..27f32c0e9c8 100644 --- a/benchmarks/pom.xml +++ b/benchmarks/pom.xml @@ -24,7 +24,7 @@ com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/google-cloud-spanner-bom/pom.xml b/google-cloud-spanner-bom/pom.xml index d8f035ea2fb..0416dc94151 100644 --- a/google-cloud-spanner-bom/pom.xml +++ b/google-cloud-spanner-bom/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.google.cloud google-cloud-spanner-bom - 6.83.1-SNAPSHOT + 6.84.0 pom com.google.cloud @@ -53,43 +53,43 @@ com.google.cloud google-cloud-spanner - 6.83.1-SNAPSHOT + 6.84.0 com.google.cloud google-cloud-spanner test-jar - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc grpc-google-cloud-spanner-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc grpc-google-cloud-spanner-admin-instance-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc grpc-google-cloud-spanner-admin-database-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc proto-google-cloud-spanner-admin-instance-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc proto-google-cloud-spanner-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc proto-google-cloud-spanner-admin-database-v1 - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/google-cloud-spanner-executor/pom.xml b/google-cloud-spanner-executor/pom.xml index dd43ae439e6..07cb7db2abc 100644 --- a/google-cloud-spanner-executor/pom.xml +++ b/google-cloud-spanner-executor/pom.xml @@ -5,14 +5,14 @@ 4.0.0 com.google.cloud google-cloud-spanner-executor - 6.83.1-SNAPSHOT + 6.84.0 jar Google Cloud Spanner Executor com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml index c5128cc19f7..2e93e6e5227 100644 --- a/google-cloud-spanner/pom.xml +++ b/google-cloud-spanner/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.google.cloud google-cloud-spanner - 6.83.1-SNAPSHOT + 6.84.0 jar Google Cloud Spanner https://github.com/googleapis/java-spanner @@ -11,7 +11,7 @@ com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 google-cloud-spanner diff --git a/grpc-google-cloud-spanner-admin-database-v1/pom.xml b/grpc-google-cloud-spanner-admin-database-v1/pom.xml index 2ec9ebce7b9..79a7ca08bee 100644 --- a/grpc-google-cloud-spanner-admin-database-v1/pom.xml +++ b/grpc-google-cloud-spanner-admin-database-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-spanner-admin-database-v1 - 6.83.1-SNAPSHOT + 6.84.0 grpc-google-cloud-spanner-admin-database-v1 GRPC library for grpc-google-cloud-spanner-admin-database-v1 com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/grpc-google-cloud-spanner-admin-instance-v1/pom.xml b/grpc-google-cloud-spanner-admin-instance-v1/pom.xml index 78a64b26cb1..435cf44ff07 100644 --- a/grpc-google-cloud-spanner-admin-instance-v1/pom.xml +++ b/grpc-google-cloud-spanner-admin-instance-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-spanner-admin-instance-v1 - 6.83.1-SNAPSHOT + 6.84.0 grpc-google-cloud-spanner-admin-instance-v1 GRPC library for grpc-google-cloud-spanner-admin-instance-v1 com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/grpc-google-cloud-spanner-executor-v1/pom.xml b/grpc-google-cloud-spanner-executor-v1/pom.xml index de84666cd41..c0bc1b9b837 100644 --- a/grpc-google-cloud-spanner-executor-v1/pom.xml +++ b/grpc-google-cloud-spanner-executor-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-spanner-executor-v1 - 6.83.1-SNAPSHOT + 6.84.0 grpc-google-cloud-spanner-executor-v1 GRPC library for google-cloud-spanner com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/grpc-google-cloud-spanner-v1/pom.xml b/grpc-google-cloud-spanner-v1/pom.xml index 294952b7ecb..722150cf6c7 100644 --- a/grpc-google-cloud-spanner-v1/pom.xml +++ b/grpc-google-cloud-spanner-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-spanner-v1 - 6.83.1-SNAPSHOT + 6.84.0 grpc-google-cloud-spanner-v1 GRPC library for grpc-google-cloud-spanner-v1 com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/pom.xml b/pom.xml index 46157f0f1ed..9945c18137e 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.google.cloud google-cloud-spanner-parent pom - 6.83.1-SNAPSHOT + 6.84.0 Google Cloud Spanner Parent https://github.com/googleapis/java-spanner @@ -61,47 +61,47 @@ com.google.api.grpc proto-google-cloud-spanner-admin-instance-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc proto-google-cloud-spanner-executor-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc grpc-google-cloud-spanner-executor-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc proto-google-cloud-spanner-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc proto-google-cloud-spanner-admin-database-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc grpc-google-cloud-spanner-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc grpc-google-cloud-spanner-admin-instance-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.api.grpc grpc-google-cloud-spanner-admin-database-v1 - 6.83.1-SNAPSHOT + 6.84.0 com.google.cloud google-cloud-spanner - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/proto-google-cloud-spanner-admin-database-v1/pom.xml b/proto-google-cloud-spanner-admin-database-v1/pom.xml index af791549cdb..b23496331a3 100644 --- a/proto-google-cloud-spanner-admin-database-v1/pom.xml +++ b/proto-google-cloud-spanner-admin-database-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-spanner-admin-database-v1 - 6.83.1-SNAPSHOT + 6.84.0 proto-google-cloud-spanner-admin-database-v1 PROTO library for proto-google-cloud-spanner-admin-database-v1 com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/proto-google-cloud-spanner-admin-instance-v1/pom.xml b/proto-google-cloud-spanner-admin-instance-v1/pom.xml index 96dfaf95ad2..76ccb0cf4b1 100644 --- a/proto-google-cloud-spanner-admin-instance-v1/pom.xml +++ b/proto-google-cloud-spanner-admin-instance-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-spanner-admin-instance-v1 - 6.83.1-SNAPSHOT + 6.84.0 proto-google-cloud-spanner-admin-instance-v1 PROTO library for proto-google-cloud-spanner-admin-instance-v1 com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/proto-google-cloud-spanner-executor-v1/pom.xml b/proto-google-cloud-spanner-executor-v1/pom.xml index c37eb525cc6..301f105837f 100644 --- a/proto-google-cloud-spanner-executor-v1/pom.xml +++ b/proto-google-cloud-spanner-executor-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-spanner-executor-v1 - 6.83.1-SNAPSHOT + 6.84.0 proto-google-cloud-spanner-executor-v1 Proto library for google-cloud-spanner com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/proto-google-cloud-spanner-v1/pom.xml b/proto-google-cloud-spanner-v1/pom.xml index 16495a9b5c4..f42ee2b3e90 100644 --- a/proto-google-cloud-spanner-v1/pom.xml +++ b/proto-google-cloud-spanner-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-spanner-v1 - 6.83.1-SNAPSHOT + 6.84.0 proto-google-cloud-spanner-v1 PROTO library for proto-google-cloud-spanner-v1 com.google.cloud google-cloud-spanner-parent - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml index 772e830d5ef..f44821f2838 100644 --- a/samples/snapshot/pom.xml +++ b/samples/snapshot/pom.xml @@ -32,7 +32,7 @@ com.google.cloud google-cloud-spanner - 6.83.1-SNAPSHOT + 6.84.0 diff --git a/versions.txt b/versions.txt index 371f87c50c2..52488f51531 100644 --- a/versions.txt +++ b/versions.txt @@ -1,13 +1,13 @@ # Format: # module:released-version:current-version -proto-google-cloud-spanner-admin-instance-v1:6.83.0:6.83.1-SNAPSHOT -proto-google-cloud-spanner-v1:6.83.0:6.83.1-SNAPSHOT -proto-google-cloud-spanner-admin-database-v1:6.83.0:6.83.1-SNAPSHOT -grpc-google-cloud-spanner-v1:6.83.0:6.83.1-SNAPSHOT -grpc-google-cloud-spanner-admin-instance-v1:6.83.0:6.83.1-SNAPSHOT -grpc-google-cloud-spanner-admin-database-v1:6.83.0:6.83.1-SNAPSHOT -google-cloud-spanner:6.83.0:6.83.1-SNAPSHOT -google-cloud-spanner-executor:6.83.0:6.83.1-SNAPSHOT -proto-google-cloud-spanner-executor-v1:6.83.0:6.83.1-SNAPSHOT -grpc-google-cloud-spanner-executor-v1:6.83.0:6.83.1-SNAPSHOT +proto-google-cloud-spanner-admin-instance-v1:6.84.0:6.84.0 +proto-google-cloud-spanner-v1:6.84.0:6.84.0 +proto-google-cloud-spanner-admin-database-v1:6.84.0:6.84.0 +grpc-google-cloud-spanner-v1:6.84.0:6.84.0 +grpc-google-cloud-spanner-admin-instance-v1:6.84.0:6.84.0 +grpc-google-cloud-spanner-admin-database-v1:6.84.0:6.84.0 +google-cloud-spanner:6.84.0:6.84.0 +google-cloud-spanner-executor:6.84.0:6.84.0 +proto-google-cloud-spanner-executor-v1:6.84.0:6.84.0 +grpc-google-cloud-spanner-executor-v1:6.84.0:6.84.0