Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Reduce code bloat by using lambdas. #1484

Merged
merged 3 commits into from
Dec 5, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package com.google.cloud.firestore;

import com.google.api.core.ApiAsyncFunction;
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
Expand All @@ -33,7 +31,6 @@
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -107,9 +104,9 @@ ApiFuture<T> run() {

return ApiFutures.catchingAsync(
ApiFutures.transformAsync(
maybeRollback(), new RollbackCallback(), MoreExecutors.directExecutor()),
maybeRollback(), this::rollbackCallback, MoreExecutors.directExecutor()),
Throwable.class,
new RestartTransactionCallback(),
this::restartTransactionCallback,
MoreExecutors.directExecutor());
}

Expand All @@ -120,20 +117,17 @@ private ApiFuture<Void> maybeRollback() {
}

/** A callback that invokes the BeginTransaction callback. */
private class RollbackCallback implements ApiAsyncFunction<Void, T> {
@Override
public ApiFuture<T> apply(Void input) {
final SettableApiFuture<Void> backoff = SettableApiFuture.create();
// Add a backoff delay. At first, this is 0.
firestoreExecutor.schedule(
() -> backoff.set(null),
nextBackoffAttempt.getRandomizedRetryDelay().toMillis(),
TimeUnit.MILLISECONDS);
private ApiFuture<T> rollbackCallback(Void input) {
final SettableApiFuture<Void> backoff = SettableApiFuture.create();
// Add a backoff delay. At first, this is 0.
firestoreExecutor.schedule(
() -> backoff.set(null),
nextBackoffAttempt.getRandomizedRetryDelay().toMillis(),
TimeUnit.MILLISECONDS);

nextBackoffAttempt = backoffAlgorithm.createNextAttempt(nextBackoffAttempt);
return ApiFutures.transformAsync(
backoff, new BackoffCallback(), MoreExecutors.directExecutor());
}
nextBackoffAttempt = backoffAlgorithm.createNextAttempt(nextBackoffAttempt);
return ApiFutures.transformAsync(
backoff, TransactionRunner.this::backoffCallback, MoreExecutors.directExecutor());
}

/**
Expand Down Expand Up @@ -169,125 +163,103 @@ public void onSuccess(T result) {
}

/** A callback that invokes the BeginTransaction callback. */
private class BackoffCallback implements ApiAsyncFunction<Void, T> {
@Override
public ApiFuture<T> apply(Void input) {
return ApiFutures.transformAsync(
transaction.begin(), new BeginTransactionCallback(), MoreExecutors.directExecutor());
}
private ApiFuture<T> backoffCallback(Void input) {
return ApiFutures.transformAsync(
transaction.begin(), this::beginTransactionCallback, MoreExecutors.directExecutor());
}

/**
* The callback for the BeginTransaction RPC, which invokes the user callback and handles all
* errors thereafter.
*/
private class BeginTransactionCallback implements ApiAsyncFunction<Void, T> {
public ApiFuture<T> apply(Void ignored) {
return ApiFutures.transformAsync(
invokeUserCallback(), new UserFunctionCallback(), MoreExecutors.directExecutor());
}
private ApiFuture<T> beginTransactionCallback(Void input) {
return ApiFutures.transformAsync(
invokeUserCallback(), this::userFunctionCallback, MoreExecutors.directExecutor());
}

/**
* The callback that is invoked after the user function finishes execution. It invokes the Commit
* RPC.
*/
private class UserFunctionCallback implements ApiAsyncFunction<T, T> {
@Override
public ApiFuture<T> apply(T userFunctionResult) {
return ApiFutures.transform(
transaction.commit(),
new CommitTransactionCallback(userFunctionResult),
MoreExecutors.directExecutor());
}
}

/** The callback that is invoked after the Commit RPC returns. It returns the user result. */
private class CommitTransactionCallback implements ApiFunction<List<WriteResult>, T> {
private final T userFunctionResult;

CommitTransactionCallback(T userFunctionResult) {
this.userFunctionResult = userFunctionResult;
}

@Override
public T apply(List<WriteResult> input) {
span.setStatus(io.opencensus.trace.Status.OK);
span.end();
return userFunctionResult;
}
private ApiFuture<T> userFunctionCallback(T userFunctionResult) {
return ApiFutures.transform(
transaction.commit(),
// The callback that is invoked after the Commit RPC returns. It returns the user result.
input -> {
span.setStatus(io.opencensus.trace.Status.OK);
span.end();
return userFunctionResult;
},
MoreExecutors.directExecutor());
}

/** A callback that restarts a transaction after an ApiException. It invokes the Rollback RPC. */
private class RestartTransactionCallback implements ApiAsyncFunction<Throwable, T> {
public ApiFuture<T> apply(Throwable throwable) {
if (!(throwable instanceof ApiException)) {
// This is likely a failure in the user callback.
span.setStatus(USER_CALLBACK_FAILED);
return rollbackAndReject(throwable);
}
private ApiFuture<T> restartTransactionCallback(Throwable throwable) {
if (!(throwable instanceof ApiException)) {
// This is likely a failure in the user callback.
span.setStatus(USER_CALLBACK_FAILED);
return rollbackAndReject(throwable);
}

ApiException apiException = (ApiException) throwable;
if (transaction.hasTransactionId() && isRetryableTransactionError(apiException)) {
if (attemptsRemaining > 0) {
span.addAnnotation("retrying");
return run();
} else {
span.setStatus(TOO_MANY_RETRIES_STATUS);
final FirestoreException firestoreException =
FirestoreException.forApiException(
apiException, "Transaction was cancelled because of too many retries.");
return rollbackAndReject(firestoreException);
}
ApiException apiException = (ApiException) throwable;
if (transaction.hasTransactionId() && isRetryableTransactionError(apiException)) {
if (attemptsRemaining > 0) {
span.addAnnotation("retrying");
return run();
} else {
span.setStatus(TraceUtil.statusFromApiException(apiException));
span.setStatus(TOO_MANY_RETRIES_STATUS);
final FirestoreException firestoreException =
FirestoreException.forApiException(
apiException, "Transaction failed with non-retryable error");
apiException, "Transaction was cancelled because of too many retries.");
return rollbackAndReject(firestoreException);
}
} else {
span.setStatus(TraceUtil.statusFromApiException(apiException));
final FirestoreException firestoreException =
FirestoreException.forApiException(
apiException, "Transaction failed with non-retryable error");
return rollbackAndReject(firestoreException);
}
}

/** Determines whether the provided error is considered retryable. */
private boolean isRetryableTransactionError(ApiException exception) {
switch (exception.getStatusCode().getCode()) {
// This list is based on
// https://github.com/firebase/firebase-js-sdk/blob/c822e78b00dd3420dcc749beb2f09a947aa4a344/packages/firestore/src/core/transaction_runner.ts#L112
case ABORTED:
case CANCELLED:
case UNKNOWN:
case DEADLINE_EXCEEDED:
case INTERNAL:
case UNAVAILABLE:
case UNAUTHENTICATED:
case RESOURCE_EXHAUSTED:
return true;
case INVALID_ARGUMENT:
// The Firestore backend uses "INVALID_ARGUMENT" for transactions IDs that have expired.
// While INVALID_ARGUMENT is generally not retryable, we retry this specific case.
return exception.getMessage().contains("transaction has expired");
default:
return false;
}
/** Determines whether the provided error is considered retryable. */
private static boolean isRetryableTransactionError(ApiException exception) {
switch (exception.getStatusCode().getCode()) {
// This list is based on
// https://github.com/firebase/firebase-js-sdk/blob/c822e78b00dd3420dcc749beb2f09a947aa4a344/packages/firestore/src/core/transaction_runner.ts#L112
case ABORTED:
case CANCELLED:
case UNKNOWN:
case DEADLINE_EXCEEDED:
case INTERNAL:
case UNAVAILABLE:
case UNAUTHENTICATED:
case RESOURCE_EXHAUSTED:
return true;
case INVALID_ARGUMENT:
// The Firestore backend uses "INVALID_ARGUMENT" for transactions IDs that have expired.
// While INVALID_ARGUMENT is generally not retryable, we retry this specific case.
return exception.getMessage().contains("transaction has expired");
default:
return false;
}
}
/** Rolls the transaction back and returns the error. */
private ApiFuture<T> rollbackAndReject(final Throwable throwable) {
final SettableApiFuture<T> failedTransaction = SettableApiFuture.create();

/** Rolls the transaction back and returns the error. */
private ApiFuture<T> rollbackAndReject(final Throwable throwable) {
final SettableApiFuture<T> failedTransaction = SettableApiFuture.create();

if (transaction.hasTransactionId()) {
// We use `addListener()` since we want to return the original exception regardless of
// whether rollback() succeeds.
transaction
.rollback()
.addListener(
() -> failedTransaction.setException(throwable), MoreExecutors.directExecutor());
} else {
failedTransaction.setException(throwable);
}

span.end();
return failedTransaction;
if (transaction.hasTransactionId()) {
// We use `addListener()` since we want to return the original exception regardless of
// whether rollback() succeeds.
transaction
.rollback()
.addListener(
() -> failedTransaction.setException(throwable), MoreExecutors.directExecutor());
} else {
failedTransaction.setException(throwable);
}

span.end();
return failedTransaction;
}
}