From 0bfd64fbdee9bca5d55651e73b9a9bce945d5427 Mon Sep 17 00:00:00 2001 From: Toshihiro Suzuki Date: Tue, 30 Jan 2024 11:58:47 +0900 Subject: [PATCH 1/2] Should perform lazy recovery in implicit pre-read --- .../consensuscommit/ConsensusCommit.java | 14 ++--- .../consensuscommit/CrudHandler.java | 4 +- .../TwoPhaseConsensusCommit.java | 14 ++--- .../UncommittedRecordException.java | 36 ++++++------- .../consensuscommit/ConsensusCommitTest.java | 39 ++++++++++++++ .../consensuscommit/CrudHandlerTest.java | 51 +++++++++++++++---- .../TwoPhaseConsensusCommitTest.java | 39 ++++++++++++++ 7 files changed, 156 insertions(+), 41 deletions(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java index 2c89dc1b6a..0ae76d0eb0 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java @@ -10,7 +10,6 @@ import com.scalar.db.api.Put; import com.scalar.db.api.Result; import com.scalar.db.api.Scan; -import com.scalar.db.api.Selection; import com.scalar.db.common.AbstractDistributedTransaction; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.exception.transaction.CommitConflictException; @@ -71,7 +70,7 @@ public Optional get(Get get) throws CrudException { try { return crud.get(get); } catch (UncommittedRecordException e) { - lazyRecovery(get, e.getResults()); + lazyRecovery(e); throw e; } } @@ -82,7 +81,7 @@ public List scan(Scan scan) throws CrudException { try { return crud.scan(scan); } catch (UncommittedRecordException e) { - lazyRecovery(scan, e.getResults()); + lazyRecovery(e); throw e; } } @@ -135,6 +134,9 @@ public void commit() throws CommitException, UnknownTransactionStatusException { try { crud.readIfImplicitPreReadEnabled(); } catch (CrudConflictException e) { + if (e instanceof UncommittedRecordException) { + lazyRecovery((UncommittedRecordException) e); + } throw new CommitConflictException("Conflict occurred while implicit pre-read", e, getId()); } catch (CrudException e) { throw new CommitException("Failed to execute implicit pre-read", e, getId()); @@ -168,10 +170,10 @@ void setBeforeRecoveryHook(Runnable beforeRecoveryHook) { this.beforeRecoveryHook = beforeRecoveryHook; } - private void lazyRecovery(Selection selection, List results) { - logger.debug("Recover uncommitted records: {}", results); + private void lazyRecovery(UncommittedRecordException e) { + logger.debug("Recover uncommitted records: {}", e.getResults()); beforeRecoveryHook.run(); - results.forEach(r -> recovery.recover(selection, r)); + e.getResults().forEach(r -> recovery.recover(e.getSelection(), r)); } private void checkMutation(Mutation mutation) throws CrudException { diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java index 0326313cd2..5fb651868c 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java @@ -89,7 +89,7 @@ private void read(Snapshot.Key key, Get get) throws CrudException { return; } throw new UncommittedRecordException( - result.get(), "This record needs recovery", snapshot.getId()); + get, result.get(), "This record needs recovery", snapshot.getId()); } private Optional createGetResult(Snapshot.Key key, List projections) @@ -136,7 +136,7 @@ private List scanInternal(Scan scan) throws CrudException { TransactionResult result = new TransactionResult(r); if (!result.isCommitted()) { throw new UncommittedRecordException( - result, "The record needs recovery", snapshot.getId()); + scan, result, "The record needs recovery", snapshot.getId()); } Snapshot.Key key = new Snapshot.Key(scan, r); diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java index 0727faabdb..e46e14d1ee 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java @@ -9,7 +9,6 @@ import com.scalar.db.api.Put; import com.scalar.db.api.Result; import com.scalar.db.api.Scan; -import com.scalar.db.api.Selection; import com.scalar.db.api.TransactionState; import com.scalar.db.common.AbstractTwoPhaseCommitTransaction; import com.scalar.db.exception.storage.ExecutionException; @@ -65,7 +64,7 @@ public Optional get(Get get) throws CrudException { try { return crud.get(get); } catch (UncommittedRecordException e) { - lazyRecovery(get, e.getResults()); + lazyRecovery(e); throw e; } } @@ -76,7 +75,7 @@ public List scan(Scan scan) throws CrudException { try { return crud.scan(scan); } catch (UncommittedRecordException e) { - lazyRecovery(scan, e.getResults()); + lazyRecovery(e); throw e; } } @@ -137,6 +136,9 @@ public void prepare() throws PreparationException { try { crud.readIfImplicitPreReadEnabled(); } catch (CrudConflictException e) { + if (e instanceof UncommittedRecordException) { + lazyRecovery((UncommittedRecordException) e); + } throw new PreparationConflictException( "Conflict occurred while implicit pre-read", e, getId()); } catch (CrudException e) { @@ -216,10 +218,10 @@ void setBeforeRecoveryHook(Runnable beforeRecoveryHook) { this.beforeRecoveryHook = beforeRecoveryHook; } - private void lazyRecovery(Selection selection, List results) { - logger.debug("Recover uncommitted records: {}", results); + private void lazyRecovery(UncommittedRecordException e) { + logger.debug("Recover uncommitted records: {}", e.getResults()); beforeRecoveryHook.run(); - results.forEach(r -> recovery.recover(selection, r)); + e.getResults().forEach(r -> recovery.recover(e.getSelection(), r)); } private void checkMutation(Mutation mutation) throws CrudException { diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/UncommittedRecordException.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/UncommittedRecordException.java index b0db280a53..4f492caa89 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/UncommittedRecordException.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/UncommittedRecordException.java @@ -1,38 +1,38 @@ package com.scalar.db.transaction.consensuscommit; import com.google.common.collect.ImmutableList; +import com.scalar.db.api.Selection; import com.scalar.db.exception.transaction.CrudConflictException; -import java.util.ArrayList; -import java.util.Collections; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.List; public class UncommittedRecordException extends CrudConflictException { - private final List results; - public UncommittedRecordException( - TransactionResult result, String message, String transactionId) { - this(result, message, null, transactionId); - } + private final Selection selection; + private final List results; + @SuppressFBWarnings("EI_EXPOSE_REP2") public UncommittedRecordException( - TransactionResult result, String message, Throwable cause, String transactionId) { - super(message, cause, transactionId); - results = Collections.singletonList(result); + Selection selection, TransactionResult result, String message, String transactionId) { + super(message, transactionId); + this.selection = selection; + results = ImmutableList.of(result); } + @SuppressFBWarnings("EI_EXPOSE_REP2") public UncommittedRecordException( - List results, String message, String transactionId) { - this(results, message, null, transactionId); + Selection selection, List results, String message, String transactionId) { + super(message, transactionId); + this.selection = selection; + this.results = ImmutableList.copyOf(results); } - public UncommittedRecordException( - List results, String message, Throwable cause, String transactionId) { - super(message, cause, transactionId); - this.results = new ArrayList<>(); - this.results.addAll(results); + @SuppressFBWarnings("EI_EXPOSE_REP") + public Selection getSelection() { + return selection; } public List getResults() { - return ImmutableList.copyOf(results); + return results; } } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java index 6e68a93929..1dad4abbdc 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java @@ -114,6 +114,7 @@ public void get_GetForUncommittedRecordGiven_ShouldRecoverRecord() throws CrudEx TransactionResult result = mock(TransactionResult.class); UncommittedRecordException toThrow = mock(UncommittedRecordException.class); when(crud.get(get)).thenThrow(toThrow); + when(toThrow.getSelection()).thenReturn(get); when(toThrow.getResults()).thenReturn(Collections.singletonList(result)); // Act Assert @@ -139,6 +140,22 @@ public void scan_ScanGiven_ShouldCallCrudHandlerScan() throws CrudException { verify(crud).scan(scan); } + @Test + public void scan_ScanForUncommittedRecordGiven_ShouldRecoverRecord() throws CrudException { + // Arrange + Scan scan = prepareScan(); + TransactionResult result = mock(TransactionResult.class); + UncommittedRecordException toThrow = mock(UncommittedRecordException.class); + when(crud.scan(scan)).thenThrow(toThrow); + when(toThrow.getSelection()).thenReturn(scan); + when(toThrow.getResults()).thenReturn(Collections.singletonList(result)); + + // Act Assert + assertThatThrownBy(() -> consensus.scan(scan)).isInstanceOf(UncommittedRecordException.class); + + verify(recovery).recover(scan, result); + } + @Test public void put_PutGiven_ShouldCallCrudHandlerPut() throws ExecutionException, CrudException { // Arrange @@ -244,6 +261,28 @@ public void commit_ProcessedCrudGiven_ShouldCommitWithSnapshot() assertThatThrownBy(() -> consensus.commit()).isInstanceOf(CommitConflictException.class); } + @Test + public void + commit_ProcessedCrudGiven_UncommittedRecordExceptionThrownWhileImplicitPreRead_ShouldPerformLazyRecoveryAndThrowCommitConflictException() + throws CrudException { + // Arrange + when(crud.getSnapshot()).thenReturn(snapshot); + + Get get = mock(Get.class); + TransactionResult result = mock(TransactionResult.class); + + UncommittedRecordException uncommittedRecordException = mock(UncommittedRecordException.class); + when(uncommittedRecordException.getSelection()).thenReturn(get); + when(uncommittedRecordException.getResults()).thenReturn(Collections.singletonList(result)); + + doThrow(uncommittedRecordException).when(crud).readIfImplicitPreReadEnabled(); + + // Act Assert + assertThatThrownBy(() -> consensus.commit()).isInstanceOf(CommitConflictException.class); + + verify(recovery).recover(get, result); + } + @Test public void commit_ProcessedCrudGiven_CrudExceptionThrownWhileImplicitPreRead_ShouldThrowCommitException() diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java index 00f37b5d6f..1f266a025f 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java @@ -188,12 +188,24 @@ public void get_KeyExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudExcept throws CrudException, ExecutionException { // Arrange Get get = prepareGet(); - Optional expected = Optional.of(prepareResult(TransactionState.PREPARED)); + result = prepareResult(TransactionState.PREPARED); + Optional expected = Optional.of(result); when(snapshot.get(new Snapshot.Key(get))).thenReturn(Optional.empty()); when(storage.get(get)).thenReturn(expected); // Act Assert - assertThatThrownBy(() -> handler.get(get)).isInstanceOf(UncommittedRecordException.class); + assertThatThrownBy(() -> handler.get(get)) + .isInstanceOf(UncommittedRecordException.class) + .satisfies( + e -> { + UncommittedRecordException exception = (UncommittedRecordException) e; + assertThat(exception.getSelection()).isEqualTo(get); + assertThat(exception.getResults().size()).isEqualTo(1); + assertThat(exception.getResults().get(0)).isEqualTo(result); + }); + + verify(snapshot, never()) + .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); } @Test @@ -260,10 +272,17 @@ public void scan_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn() when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); when(storage.scan(scan)).thenReturn(scanner); - // Act - assertThatThrownBy(() -> handler.scan(scan)).isInstanceOf(UncommittedRecordException.class); + // Act Assert + assertThatThrownBy(() -> handler.scan(scan)) + .isInstanceOf(UncommittedRecordException.class) + .satisfies( + e -> { + UncommittedRecordException exception = (UncommittedRecordException) e; + assertThat(exception.getSelection()).isEqualTo(scan); + assertThat(exception.getResults().size()).isEqualTo(1); + assertThat(exception.getResults().get(0)).isEqualTo(result); + }); - // Assert verify(snapshot, never()) .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); } @@ -475,10 +494,17 @@ public void scan_CalledAfterDeleteUnderRealSnapshot_ShouldReturnResultsWithoutDe when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); when(storage.scan(any(ScanAll.class))).thenReturn(scanner); - // Act - assertThatThrownBy(() -> handler.scan(scan)).isInstanceOf(UncommittedRecordException.class); + // Act Assert + assertThatThrownBy(() -> handler.scan(scan)) + .isInstanceOf(UncommittedRecordException.class) + .satisfies( + e -> { + UncommittedRecordException exception = (UncommittedRecordException) e; + assertThat(exception.getSelection()).isEqualTo(scan); + assertThat(exception.getResults().size()).isEqualTo(1); + assertThat(exception.getResults().get(0)).isEqualTo(result); + }); - // Assert verify(snapshot, never()) .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); verify(snapshot, never()).verify(any()); @@ -839,7 +865,14 @@ public void readUnread_ContainsKeyInReadSet_ShouldCallAppropriateMethods() // Act Assert assertThatThrownBy(() -> handler.readUnread(key, getForKey)) - .isInstanceOf(UncommittedRecordException.class); + .isInstanceOf(UncommittedRecordException.class) + .satisfies( + e -> { + UncommittedRecordException exception = (UncommittedRecordException) e; + assertThat(exception.getSelection()).isEqualTo(getForKey); + assertThat(exception.getResults().size()).isEqualTo(1); + assertThat(exception.getResults().get(0)).isEqualTo(result); + }); } @Test diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitTest.java index 2728938d3a..827519c413 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitTest.java @@ -114,6 +114,7 @@ public void get_GetForUncommittedRecordGiven_ShouldRecoverRecord() throws CrudEx UncommittedRecordException toThrow = mock(UncommittedRecordException.class); when(crud.get(get)).thenThrow(toThrow); when(crud.getSnapshot()).thenReturn(snapshot); + when(toThrow.getSelection()).thenReturn(get); when(toThrow.getResults()).thenReturn(Collections.singletonList(result)); // Act @@ -140,6 +141,22 @@ public void scan_ScanGiven_ShouldCallCrudHandlerScan() throws CrudException { verify(crud).scan(scan); } + @Test + public void scan_ScanForUncommittedRecordGiven_ShouldRecoverRecord() throws CrudException { + // Arrange + Scan scan = prepareScan(); + TransactionResult result = mock(TransactionResult.class); + UncommittedRecordException toThrow = mock(UncommittedRecordException.class); + when(crud.scan(scan)).thenThrow(toThrow); + when(toThrow.getSelection()).thenReturn(scan); + when(toThrow.getResults()).thenReturn(Collections.singletonList(result)); + + // Act Assert + assertThatThrownBy(() -> transaction.scan(scan)).isInstanceOf(UncommittedRecordException.class); + + verify(recovery).recover(scan, result); + } + @Test public void put_PutGiven_ShouldCallCrudHandlerPut() throws ExecutionException, CrudException { // Arrange @@ -243,6 +260,28 @@ public void prepare_ProcessedCrudGiven_ShouldPrepareWithSnapshot() assertThatThrownBy(transaction::prepare).isInstanceOf(PreparationConflictException.class); } + @Test + public void + prepare_ProcessedCrudGiven_UncommittedRecordExceptionThrownWhileImplicitPreRead_ShouldPerformLazyRecoveryAndThrowPreparationConflictException() + throws CrudException { + // Arrange + when(crud.getSnapshot()).thenReturn(snapshot); + + Get get = mock(Get.class); + TransactionResult result = mock(TransactionResult.class); + + UncommittedRecordException uncommittedRecordException = mock(UncommittedRecordException.class); + when(uncommittedRecordException.getSelection()).thenReturn(get); + when(uncommittedRecordException.getResults()).thenReturn(Collections.singletonList(result)); + + doThrow(uncommittedRecordException).when(crud).readIfImplicitPreReadEnabled(); + + // Act Assert + assertThatThrownBy(transaction::prepare).isInstanceOf(PreparationConflictException.class); + + verify(recovery).recover(get, result); + } + @Test public void prepare_ProcessedCrudGiven_CrudExceptionThrownWhileImplicitPreRead_ShouldThrowPreparationException() From 5c4d6a7120eb109a81a6a3c7ca8fb897d481e05b Mon Sep 17 00:00:00 2001 From: Toshihiro Suzuki Date: Wed, 31 Jan 2024 13:29:48 +0900 Subject: [PATCH 2/2] Remove unnecessary constructor of UncommittedRecordException --- .../consensuscommit/UncommittedRecordException.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/UncommittedRecordException.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/UncommittedRecordException.java index 4f492caa89..1d9d400428 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/UncommittedRecordException.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/UncommittedRecordException.java @@ -19,14 +19,6 @@ public UncommittedRecordException( results = ImmutableList.of(result); } - @SuppressFBWarnings("EI_EXPOSE_REP2") - public UncommittedRecordException( - Selection selection, List results, String message, String transactionId) { - super(message, transactionId); - this.selection = selection; - this.results = ImmutableList.copyOf(results); - } - @SuppressFBWarnings("EI_EXPOSE_REP") public Selection getSelection() { return selection;