Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Should perform lazy recovery in implicit pre-read #1476

Merged
merged 2 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.scalar.db.api.Put;
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.Selection;
import com.scalar.db.common.AbstractDistributedTransaction;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.exception.transaction.CommitConflictException;
Expand Down Expand Up @@ -71,7 +70,7 @@ public Optional<Result> get(Get get) throws CrudException {
try {
return crud.get(get);
} catch (UncommittedRecordException e) {
lazyRecovery(get, e.getResults());
lazyRecovery(e);
throw e;
}
}
Expand All @@ -82,7 +81,7 @@ public List<Result> scan(Scan scan) throws CrudException {
try {
return crud.scan(scan);
} catch (UncommittedRecordException e) {
lazyRecovery(scan, e.getResults());
lazyRecovery(e);
throw e;
}
}
Expand Down Expand Up @@ -135,6 +134,9 @@ public void commit() throws CommitException, UnknownTransactionStatusException {
try {
crud.readIfImplicitPreReadEnabled();
} catch (CrudConflictException e) {
if (e instanceof UncommittedRecordException) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] Or, explicitly catching UncommittedRecordException is also an option?

    // Execute implicit pre-read
    try {
      crud.readIfImplicitPreReadEnabled();
    } catch (UncommittedRecordException e) {
      lazyRecovery(e);
      throw new CommitConflictException("Read uncommitted record while implicit pre-read", e, getId());
    } catch (CrudConflictException e) {
      throw new CommitConflictException("Conflict occurred while implicit pre-read", e, getId());
    } catch (CrudException e) {
      throw new CommitException("Failed to execute implicit pre-read", e, getId());
    }

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's an option. I didn't do that because I didn't think we needed to change the error message base on the cause, as we pass the cause exception to CommitConflictException.

throw new CommitConflictException("Conflict occurred while implicit pre-read", e /* the cause exception */, getId());

What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good

lazyRecovery((UncommittedRecordException) e);
}
throw new CommitConflictException("Conflict occurred while implicit pre-read", e, getId());
} catch (CrudException e) {
throw new CommitException("Failed to execute implicit pre-read", e, getId());
Expand Down Expand Up @@ -168,10 +170,10 @@ void setBeforeRecoveryHook(Runnable beforeRecoveryHook) {
this.beforeRecoveryHook = beforeRecoveryHook;
}

private void lazyRecovery(Selection selection, List<TransactionResult> results) {
logger.debug("Recover uncommitted records: {}", results);
private void lazyRecovery(UncommittedRecordException e) {
logger.debug("Recover uncommitted records: {}", e.getResults());
beforeRecoveryHook.run();
results.forEach(r -> recovery.recover(selection, r));
e.getResults().forEach(r -> recovery.recover(e.getSelection(), r));
}

private void checkMutation(Mutation mutation) throws CrudException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private void read(Snapshot.Key key, Get get) throws CrudException {
return;
}
throw new UncommittedRecordException(
result.get(), "This record needs recovery", snapshot.getId());
get, result.get(), "This record needs recovery", snapshot.getId());
}

private Optional<Result> createGetResult(Snapshot.Key key, List<String> projections)
Expand Down Expand Up @@ -136,7 +136,7 @@ private List<Result> scanInternal(Scan scan) throws CrudException {
TransactionResult result = new TransactionResult(r);
if (!result.isCommitted()) {
throw new UncommittedRecordException(
result, "The record needs recovery", snapshot.getId());
scan, result, "The record needs recovery", snapshot.getId());
}

Snapshot.Key key = new Snapshot.Key(scan, r);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.scalar.db.api.Put;
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.Selection;
import com.scalar.db.api.TransactionState;
import com.scalar.db.common.AbstractTwoPhaseCommitTransaction;
import com.scalar.db.exception.storage.ExecutionException;
Expand Down Expand Up @@ -65,7 +64,7 @@ public Optional<Result> get(Get get) throws CrudException {
try {
return crud.get(get);
} catch (UncommittedRecordException e) {
lazyRecovery(get, e.getResults());
lazyRecovery(e);
throw e;
}
}
Expand All @@ -76,7 +75,7 @@ public List<Result> scan(Scan scan) throws CrudException {
try {
return crud.scan(scan);
} catch (UncommittedRecordException e) {
lazyRecovery(scan, e.getResults());
lazyRecovery(e);
throw e;
}
}
Expand Down Expand Up @@ -137,6 +136,9 @@ public void prepare() throws PreparationException {
try {
crud.readIfImplicitPreReadEnabled();
} catch (CrudConflictException e) {
if (e instanceof UncommittedRecordException) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] Same as above

lazyRecovery((UncommittedRecordException) e);
}
throw new PreparationConflictException(
"Conflict occurred while implicit pre-read", e, getId());
} catch (CrudException e) {
Expand Down Expand Up @@ -216,10 +218,10 @@ void setBeforeRecoveryHook(Runnable beforeRecoveryHook) {
this.beforeRecoveryHook = beforeRecoveryHook;
}

private void lazyRecovery(Selection selection, List<TransactionResult> results) {
logger.debug("Recover uncommitted records: {}", results);
private void lazyRecovery(UncommittedRecordException e) {
logger.debug("Recover uncommitted records: {}", e.getResults());
beforeRecoveryHook.run();
results.forEach(r -> recovery.recover(selection, r));
e.getResults().forEach(r -> recovery.recover(e.getSelection(), r));
}

private void checkMutation(Mutation mutation) throws CrudException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,30 @@
package com.scalar.db.transaction.consensuscommit;

import com.google.common.collect.ImmutableList;
import com.scalar.db.api.Selection;
import com.scalar.db.exception.transaction.CrudConflictException;
import java.util.ArrayList;
import java.util.Collections;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.List;

public class UncommittedRecordException extends CrudConflictException {
private final List<TransactionResult> results;

public UncommittedRecordException(
TransactionResult result, String message, String transactionId) {
this(result, message, null, transactionId);
}

public UncommittedRecordException(
TransactionResult result, String message, Throwable cause, String transactionId) {
super(message, cause, transactionId);
results = Collections.singletonList(result);
}
private final Selection selection;
private final List<TransactionResult> results;

@SuppressFBWarnings("EI_EXPOSE_REP2")
public UncommittedRecordException(
List<TransactionResult> results, String message, String transactionId) {
this(results, message, null, transactionId);
Selection selection, TransactionResult result, String message, String transactionId) {
super(message, transactionId);
this.selection = selection;
results = ImmutableList.of(result);
}

public UncommittedRecordException(
List<TransactionResult> results, String message, Throwable cause, String transactionId) {
super(message, cause, transactionId);
this.results = new ArrayList<>();
this.results.addAll(results);
@SuppressFBWarnings("EI_EXPOSE_REP")
public Selection getSelection() {
return selection;
}

public List<TransactionResult> getResults() {
return ImmutableList.copyOf(results);
return results;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public void get_GetForUncommittedRecordGiven_ShouldRecoverRecord() throws CrudEx
TransactionResult result = mock(TransactionResult.class);
UncommittedRecordException toThrow = mock(UncommittedRecordException.class);
when(crud.get(get)).thenThrow(toThrow);
when(toThrow.getSelection()).thenReturn(get);
when(toThrow.getResults()).thenReturn(Collections.singletonList(result));

// Act Assert
Expand All @@ -139,6 +140,22 @@ public void scan_ScanGiven_ShouldCallCrudHandlerScan() throws CrudException {
verify(crud).scan(scan);
}

@Test
public void scan_ScanForUncommittedRecordGiven_ShouldRecoverRecord() throws CrudException {
// Arrange
Scan scan = prepareScan();
TransactionResult result = mock(TransactionResult.class);
UncommittedRecordException toThrow = mock(UncommittedRecordException.class);
when(crud.scan(scan)).thenThrow(toThrow);
when(toThrow.getSelection()).thenReturn(scan);
when(toThrow.getResults()).thenReturn(Collections.singletonList(result));

// Act Assert
assertThatThrownBy(() -> consensus.scan(scan)).isInstanceOf(UncommittedRecordException.class);

verify(recovery).recover(scan, result);
}

@Test
public void put_PutGiven_ShouldCallCrudHandlerPut() throws ExecutionException, CrudException {
// Arrange
Expand Down Expand Up @@ -244,6 +261,28 @@ public void commit_ProcessedCrudGiven_ShouldCommitWithSnapshot()
assertThatThrownBy(() -> consensus.commit()).isInstanceOf(CommitConflictException.class);
}

@Test
public void
commit_ProcessedCrudGiven_UncommittedRecordExceptionThrownWhileImplicitPreRead_ShouldPerformLazyRecoveryAndThrowCommitConflictException()
throws CrudException {
// Arrange
when(crud.getSnapshot()).thenReturn(snapshot);

Get get = mock(Get.class);
TransactionResult result = mock(TransactionResult.class);

UncommittedRecordException uncommittedRecordException = mock(UncommittedRecordException.class);
when(uncommittedRecordException.getSelection()).thenReturn(get);
when(uncommittedRecordException.getResults()).thenReturn(Collections.singletonList(result));

doThrow(uncommittedRecordException).when(crud).readIfImplicitPreReadEnabled();

// Act Assert
assertThatThrownBy(() -> consensus.commit()).isInstanceOf(CommitConflictException.class);

verify(recovery).recover(get, result);
}

@Test
public void
commit_ProcessedCrudGiven_CrudExceptionThrownWhileImplicitPreRead_ShouldThrowCommitException()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,24 @@ public void get_KeyExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudExcept
throws CrudException, ExecutionException {
// Arrange
Get get = prepareGet();
Optional<Result> expected = Optional.of(prepareResult(TransactionState.PREPARED));
result = prepareResult(TransactionState.PREPARED);
Optional<Result> expected = Optional.of(result);
when(snapshot.get(new Snapshot.Key(get))).thenReturn(Optional.empty());
when(storage.get(get)).thenReturn(expected);

// Act Assert
assertThatThrownBy(() -> handler.get(get)).isInstanceOf(UncommittedRecordException.class);
assertThatThrownBy(() -> handler.get(get))
.isInstanceOf(UncommittedRecordException.class)
.satisfies(
e -> {
UncommittedRecordException exception = (UncommittedRecordException) e;
assertThat(exception.getSelection()).isEqualTo(get);
assertThat(exception.getResults().size()).isEqualTo(1);
assertThat(exception.getResults().get(0)).isEqualTo(result);
});

verify(snapshot, never())
.put(any(Snapshot.Key.class), ArgumentMatchers.<Optional<TransactionResult>>any());
}

@Test
Expand Down Expand Up @@ -260,10 +272,17 @@ public void scan_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn()
when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator());
when(storage.scan(scan)).thenReturn(scanner);

// Act
assertThatThrownBy(() -> handler.scan(scan)).isInstanceOf(UncommittedRecordException.class);
// Act Assert
assertThatThrownBy(() -> handler.scan(scan))
.isInstanceOf(UncommittedRecordException.class)
.satisfies(
e -> {
UncommittedRecordException exception = (UncommittedRecordException) e;
assertThat(exception.getSelection()).isEqualTo(scan);
assertThat(exception.getResults().size()).isEqualTo(1);
assertThat(exception.getResults().get(0)).isEqualTo(result);
});

// Assert
verify(snapshot, never())
.put(any(Snapshot.Key.class), ArgumentMatchers.<Optional<TransactionResult>>any());
}
Expand Down Expand Up @@ -475,10 +494,17 @@ public void scan_CalledAfterDeleteUnderRealSnapshot_ShouldReturnResultsWithoutDe
when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator());
when(storage.scan(any(ScanAll.class))).thenReturn(scanner);

// Act
assertThatThrownBy(() -> handler.scan(scan)).isInstanceOf(UncommittedRecordException.class);
// Act Assert
assertThatThrownBy(() -> handler.scan(scan))
.isInstanceOf(UncommittedRecordException.class)
.satisfies(
e -> {
UncommittedRecordException exception = (UncommittedRecordException) e;
assertThat(exception.getSelection()).isEqualTo(scan);
assertThat(exception.getResults().size()).isEqualTo(1);
assertThat(exception.getResults().get(0)).isEqualTo(result);
});

// Assert
verify(snapshot, never())
.put(any(Snapshot.Key.class), ArgumentMatchers.<Optional<TransactionResult>>any());
verify(snapshot, never()).verify(any());
Expand Down Expand Up @@ -839,7 +865,14 @@ public void readUnread_ContainsKeyInReadSet_ShouldCallAppropriateMethods()

// Act Assert
assertThatThrownBy(() -> handler.readUnread(key, getForKey))
.isInstanceOf(UncommittedRecordException.class);
.isInstanceOf(UncommittedRecordException.class)
.satisfies(
e -> {
UncommittedRecordException exception = (UncommittedRecordException) e;
assertThat(exception.getSelection()).isEqualTo(getForKey);
assertThat(exception.getResults().size()).isEqualTo(1);
assertThat(exception.getResults().get(0)).isEqualTo(result);
});
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public void get_GetForUncommittedRecordGiven_ShouldRecoverRecord() throws CrudEx
UncommittedRecordException toThrow = mock(UncommittedRecordException.class);
when(crud.get(get)).thenThrow(toThrow);
when(crud.getSnapshot()).thenReturn(snapshot);
when(toThrow.getSelection()).thenReturn(get);
when(toThrow.getResults()).thenReturn(Collections.singletonList(result));

// Act
Expand All @@ -140,6 +141,22 @@ public void scan_ScanGiven_ShouldCallCrudHandlerScan() throws CrudException {
verify(crud).scan(scan);
}

@Test
public void scan_ScanForUncommittedRecordGiven_ShouldRecoverRecord() throws CrudException {
// Arrange
Scan scan = prepareScan();
TransactionResult result = mock(TransactionResult.class);
UncommittedRecordException toThrow = mock(UncommittedRecordException.class);
when(crud.scan(scan)).thenThrow(toThrow);
when(toThrow.getSelection()).thenReturn(scan);
when(toThrow.getResults()).thenReturn(Collections.singletonList(result));

// Act Assert
assertThatThrownBy(() -> transaction.scan(scan)).isInstanceOf(UncommittedRecordException.class);

verify(recovery).recover(scan, result);
}

@Test
public void put_PutGiven_ShouldCallCrudHandlerPut() throws ExecutionException, CrudException {
// Arrange
Expand Down Expand Up @@ -243,6 +260,28 @@ public void prepare_ProcessedCrudGiven_ShouldPrepareWithSnapshot()
assertThatThrownBy(transaction::prepare).isInstanceOf(PreparationConflictException.class);
}

@Test
public void
prepare_ProcessedCrudGiven_UncommittedRecordExceptionThrownWhileImplicitPreRead_ShouldPerformLazyRecoveryAndThrowPreparationConflictException()
throws CrudException {
// Arrange
when(crud.getSnapshot()).thenReturn(snapshot);

Get get = mock(Get.class);
TransactionResult result = mock(TransactionResult.class);

UncommittedRecordException uncommittedRecordException = mock(UncommittedRecordException.class);
when(uncommittedRecordException.getSelection()).thenReturn(get);
when(uncommittedRecordException.getResults()).thenReturn(Collections.singletonList(result));

doThrow(uncommittedRecordException).when(crud).readIfImplicitPreReadEnabled();

// Act Assert
assertThatThrownBy(transaction::prepare).isInstanceOf(PreparationConflictException.class);

verify(recovery).recover(get, result);
}

@Test
public void
prepare_ProcessedCrudGiven_CrudExceptionThrownWhileImplicitPreRead_ShouldThrowPreparationException()
Expand Down