Skip to content

Commit

Permalink
Reduce code bloat by using lambdas.
Browse files Browse the repository at this point in the history
  • Loading branch information
tom-andersen committed Nov 21, 2023
1 parent 45e33f7 commit 0a40239
Showing 1 changed file with 84 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package com.google.cloud.firestore;

import com.google.api.core.ApiAsyncFunction;
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
Expand All @@ -33,7 +31,6 @@
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -107,9 +104,9 @@ ApiFuture<T> run() {

return ApiFutures.catchingAsync(
ApiFutures.transformAsync(
maybeRollback(), new RollbackCallback(), MoreExecutors.directExecutor()),
maybeRollback(), this::rollbackCallback, MoreExecutors.directExecutor()),
Throwable.class,
new RestartTransactionCallback(),
this::restartTransactionCallback,
MoreExecutors.directExecutor());
}

Expand All @@ -120,20 +117,17 @@ private ApiFuture<Void> maybeRollback() {
}

/** A callback that invokes the BeginTransaction callback. */
private class RollbackCallback implements ApiAsyncFunction<Void, T> {
@Override
public ApiFuture<T> apply(Void input) {
final SettableApiFuture<Void> backoff = SettableApiFuture.create();
// Add a backoff delay. At first, this is 0.
firestoreExecutor.schedule(
() -> backoff.set(null),
nextBackoffAttempt.getRandomizedRetryDelay().toMillis(),
TimeUnit.MILLISECONDS);
private ApiFuture<T> rollbackCallback(Void input) {
final SettableApiFuture<Void> backoff = SettableApiFuture.create();
// Add a backoff delay. At first, this is 0.
firestoreExecutor.schedule(
() -> backoff.set(null),
nextBackoffAttempt.getRandomizedRetryDelay().toMillis(),
TimeUnit.MILLISECONDS);

nextBackoffAttempt = backoffAlgorithm.createNextAttempt(nextBackoffAttempt);
return ApiFutures.transformAsync(
backoff, new BackoffCallback(), MoreExecutors.directExecutor());
}
nextBackoffAttempt = backoffAlgorithm.createNextAttempt(nextBackoffAttempt);
return ApiFutures.transformAsync(
backoff, TransactionRunner.this::backoffCallback, MoreExecutors.directExecutor());
}

/**
Expand Down Expand Up @@ -169,125 +163,103 @@ public void onSuccess(T result) {
}

/** A callback that invokes the BeginTransaction callback. */
private class BackoffCallback implements ApiAsyncFunction<Void, T> {
@Override
public ApiFuture<T> apply(Void input) {
return ApiFutures.transformAsync(
transaction.begin(), new BeginTransactionCallback(), MoreExecutors.directExecutor());
}
private ApiFuture<T> backoffCallback(Void input) {
return ApiFutures.transformAsync(
transaction.begin(), this::beginTransactionCallback, MoreExecutors.directExecutor());
}

/**
* The callback for the BeginTransaction RPC, which invokes the user callback and handles all
* errors thereafter.
*/
private class BeginTransactionCallback implements ApiAsyncFunction<Void, T> {
public ApiFuture<T> apply(Void ignored) {
return ApiFutures.transformAsync(
invokeUserCallback(), new UserFunctionCallback(), MoreExecutors.directExecutor());
}
private ApiFuture<T> beginTransactionCallback(Void input) {
return ApiFutures.transformAsync(
invokeUserCallback(), this::userFunctionCallback, MoreExecutors.directExecutor());
}

/**
* The callback that is invoked after the user function finishes execution. It invokes the Commit
* RPC.
*/
private class UserFunctionCallback implements ApiAsyncFunction<T, T> {
@Override
public ApiFuture<T> apply(T userFunctionResult) {
return ApiFutures.transform(
transaction.commit(),
new CommitTransactionCallback(userFunctionResult),
MoreExecutors.directExecutor());
}
}

/** The callback that is invoked after the Commit RPC returns. It returns the user result. */
private class CommitTransactionCallback implements ApiFunction<List<WriteResult>, T> {
private final T userFunctionResult;

CommitTransactionCallback(T userFunctionResult) {
this.userFunctionResult = userFunctionResult;
}

@Override
public T apply(List<WriteResult> input) {
span.setStatus(io.opencensus.trace.Status.OK);
span.end();
return userFunctionResult;
}
private ApiFuture<T> userFunctionCallback(T userFunctionResult) {
return ApiFutures.transform(
transaction.commit(),
// The callback that is invoked after the Commit RPC returns. It returns the user result.
input -> {
span.setStatus(io.opencensus.trace.Status.OK);
span.end();
return userFunctionResult;
},
MoreExecutors.directExecutor());
}

/** A callback that restarts a transaction after an ApiException. It invokes the Rollback RPC. */
private class RestartTransactionCallback implements ApiAsyncFunction<Throwable, T> {
public ApiFuture<T> apply(Throwable throwable) {
if (!(throwable instanceof ApiException)) {
// This is likely a failure in the user callback.
span.setStatus(USER_CALLBACK_FAILED);
return rollbackAndReject(throwable);
}
private ApiFuture<T> restartTransactionCallback(Throwable throwable) {
if (!(throwable instanceof ApiException)) {
// This is likely a failure in the user callback.
span.setStatus(USER_CALLBACK_FAILED);
return rollbackAndReject(throwable);
}

ApiException apiException = (ApiException) throwable;
if (transaction.hasTransactionId() && isRetryableTransactionError(apiException)) {
if (attemptsRemaining > 0) {
span.addAnnotation("retrying");
return run();
} else {
span.setStatus(TOO_MANY_RETRIES_STATUS);
final FirestoreException firestoreException =
FirestoreException.forApiException(
apiException, "Transaction was cancelled because of too many retries.");
return rollbackAndReject(firestoreException);
}
ApiException apiException = (ApiException) throwable;
if (transaction.hasTransactionId() && isRetryableTransactionError(apiException)) {
if (attemptsRemaining > 0) {
span.addAnnotation("retrying");
return run();
} else {
span.setStatus(TraceUtil.statusFromApiException(apiException));
span.setStatus(TOO_MANY_RETRIES_STATUS);
final FirestoreException firestoreException =
FirestoreException.forApiException(
apiException, "Transaction failed with non-retryable error");
apiException, "Transaction was cancelled because of too many retries.");
return rollbackAndReject(firestoreException);
}
} else {
span.setStatus(TraceUtil.statusFromApiException(apiException));
final FirestoreException firestoreException =
FirestoreException.forApiException(
apiException, "Transaction failed with non-retryable error");
return rollbackAndReject(firestoreException);
}
}

/** Determines whether the provided error is considered retryable. */
private boolean isRetryableTransactionError(ApiException exception) {
switch (exception.getStatusCode().getCode()) {
// This list is based on
// https://github.com/firebase/firebase-js-sdk/blob/c822e78b00dd3420dcc749beb2f09a947aa4a344/packages/firestore/src/core/transaction_runner.ts#L112
case ABORTED:
case CANCELLED:
case UNKNOWN:
case DEADLINE_EXCEEDED:
case INTERNAL:
case UNAVAILABLE:
case UNAUTHENTICATED:
case RESOURCE_EXHAUSTED:
return true;
case INVALID_ARGUMENT:
// The Firestore backend uses "INVALID_ARGUMENT" for transactions IDs that have expired.
// While INVALID_ARGUMENT is generally not retryable, we retry this specific case.
return exception.getMessage().contains("transaction has expired");
default:
return false;
}
/** Determines whether the provided error is considered retryable. */
private static boolean isRetryableTransactionError(ApiException exception) {
switch (exception.getStatusCode().getCode()) {
// This list is based on
// https://github.com/firebase/firebase-js-sdk/blob/c822e78b00dd3420dcc749beb2f09a947aa4a344/packages/firestore/src/core/transaction_runner.ts#L112
case ABORTED:
case CANCELLED:
case UNKNOWN:
case DEADLINE_EXCEEDED:
case INTERNAL:
case UNAVAILABLE:
case UNAUTHENTICATED:
case RESOURCE_EXHAUSTED:
return true;
case INVALID_ARGUMENT:
// The Firestore backend uses "INVALID_ARGUMENT" for transactions IDs that have expired.
// While INVALID_ARGUMENT is generally not retryable, we retry this specific case.
return exception.getMessage().contains("transaction has expired");
default:
return false;
}
}
/** Rolls the transaction back and returns the error. */
private ApiFuture<T> rollbackAndReject(final Throwable throwable) {
final SettableApiFuture<T> failedTransaction = SettableApiFuture.create();

/** Rolls the transaction back and returns the error. */
private ApiFuture<T> rollbackAndReject(final Throwable throwable) {
final SettableApiFuture<T> failedTransaction = SettableApiFuture.create();

if (transaction.hasTransactionId()) {
// We use `addListener()` since we want to return the original exception regardless of
// whether rollback() succeeds.
transaction
.rollback()
.addListener(
() -> failedTransaction.setException(throwable), MoreExecutors.directExecutor());
} else {
failedTransaction.setException(throwable);
}

span.end();
return failedTransaction;
if (transaction.hasTransactionId()) {
// We use `addListener()` since we want to return the original exception regardless of
// whether rollback() succeeds.
transaction
.rollback()
.addListener(
() -> failedTransaction.setException(throwable), MoreExecutors.directExecutor());
} else {
failedTransaction.setException(throwable);
}

span.end();
return failedTransaction;
}
}

0 comments on commit 0a40239

Please sign in to comment.