Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements Transactions to Firestore #66

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import 'dart:async';
import 'dart:math';

class ExponentialBackoffSetting {
const ExponentialBackoffSetting({
this.initialDelayMs,
this.backoffFactor,
this.maxDelayMs,
this.jitterFactor,
});

final int? initialDelayMs;
final double? backoffFactor;
final int? maxDelayMs;
final double? jitterFactor;
}

/// A helper for running delayed tasks following an exponential backoff curve
/// between attempts.
///
/// Each delay is made up of a "base" delay which follows the exponential
/// backoff curve, and a "jitter" (+/- 50% by default) that is calculated and
/// added to the base delay. This prevents clients from accidentally
/// synchronizing their delays causing spikes of load to the backend.
///
/// @private
/// @internal
class ExponentialBackoff {
ExponentialBackoff({
ExponentialBackoffSetting options = const ExponentialBackoffSetting(),
}) : initialDelayMs = options.initialDelayMs ?? defaultBackOffInitialDelayMs,
backoffFactor = options.backoffFactor ?? defaultBackOffFactor,
maxDelayMs = options.maxDelayMs ?? defaultBackOffMaxDelayMs,
jitterFactor = options.jitterFactor ?? defaultJitterFactor;

static const defaultBackOffInitialDelayMs = 100;
static const defaultBackOffFactor = 2.0;
static const defaultBackOffMaxDelayMs = 10000;
static const defaultJitterFactor = 0.5;

static const maxRetryAttempts = 25;

final int initialDelayMs;
final double backoffFactor;
final int maxDelayMs;
final double jitterFactor;

int _retryCount = 0;
int _currentBaseMs = 0;
bool _awaitingBackoffCompletion = false;

/// Returns a future that resolves after currentDelayMs, and increases the
/// delay for any subsequent attempts.
///
/// @return A [Future] that resolves when the current delay elapsed.
Future<void> backoffAndWait() async {
if (_awaitingBackoffCompletion) {
throw Exception('A backoff operation is already in progress.');
}

if (_retryCount > maxRetryAttempts) {
throw Exception('Exceeded maximum number of retries allowed.');
}

final delayWithJitterMs = _currentBaseMs + _jitterDelayMs();

_currentBaseMs = (_currentBaseMs * backoffFactor).toInt();
_currentBaseMs = _currentBaseMs.clamp(initialDelayMs, maxDelayMs);
_retryCount += 1;

await Future<void>.delayed(Duration(milliseconds: delayWithJitterMs));
_awaitingBackoffCompletion = false;
}

/// Resets the backoff delay and retry count.
///
/// The very next [backoffAndWait] will have no delay. If it is called again
/// (i.e. due to an error), [initialDelayMs] (plus jitter) will be used, and
/// subsequent ones will increase according to the [backoffFactor].
void reset() {
_retryCount = 0;
_currentBaseMs = 0;
}

/// Resets the backoff delay to the maximum delay (e.g. for use after a
/// RESOURCE_EXHAUSTED error).
void resetToMax() {
_currentBaseMs = maxDelayMs;
}

int _jitterDelayMs() {
return ((Random().nextDouble() - 0.5) * jitterFactor * _currentBaseMs)
.toInt();
}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,43 @@
part of 'firestore.dart';

class _BatchGetResponse<T> {
_BatchGetResponse(this.result, this.transaction);

List<DocumentSnapshot<T>> result;
String? transaction;
}

class _DocumentReader<T> {
_DocumentReader({
required this.firestore,
required this.documents,
required this.fieldMask,
required this.transactionId,
}) : _outstandingDocuments = documents.map((e) => e._formattedName).toSet();
this.transactionId,
this.readTime,
this.transactionOptions,
}) : _outstandingDocuments = documents.map((e) => e._formattedName).toSet(),
assert(
[transactionId, readTime, transactionOptions].nonNulls.length <= 1,
'Only transactionId or readTime or transactionOptions must be provided. transactionId = $transactionId, readTime = $readTime, transactionOptions = $transactionOptions',
);

String? _retrievedTransactionId;
final Firestore firestore;
final List<DocumentReference<T>> documents;
final List<FieldPath>? fieldMask;
final String? transactionId;
final Timestamp? readTime;
final firestore1.TransactionOptions? transactionOptions;
final Set<String> _outstandingDocuments;
final _retreivedDocuments = <String, DocumentSnapshot<DocumentData>>{};

/// Invokes the BatchGetDocuments RPC and returns the results.
Future<List<DocumentSnapshot<T>>> get(String requestTag) async {
await _fetchDocuments(requestTag);
Future<List<DocumentSnapshot<T>>> get() async {
return _get().then((value) => value.result);
}

Future<_BatchGetResponse<T>> _get() async {
await _fetchDocuments();

// BatchGetDocuments doesn't preserve document order. We use the request
// order to sort the resulting documents.
Expand All @@ -39,51 +59,75 @@ class _DocumentReader<T> {
throw StateError('Did not receive document for "${docRef.path}".');
}
}

return orderedDocuments;
return _BatchGetResponse<T>(orderedDocuments, _retrievedTransactionId);
}

Future<void> _fetchDocuments(String requestTag) async {
Future<void> _fetchDocuments() async {
if (_outstandingDocuments.isEmpty) return;

final documents = await firestore._client.v1((client) async {
return client.projects.databases.documents.batchGet(
firestore1.BatchGetDocumentsRequest(
documents: _outstandingDocuments.toList(),
mask: fieldMask.let((fieldMask) {
return firestore1.DocumentMask(
fieldPaths: fieldMask.map((e) => e._formattedName).toList(),
);
}),
transaction: transactionId,
),
firestore._formattedDatabaseName,
);
});

for (final response in documents) {
DocumentSnapshot<DocumentData> documentSnapshot;

final found = response.found;
if (found != null) {
documentSnapshot = DocumentSnapshot._fromDocument(
found,
response.readTime,
firestore,
final request = firestore1.BatchGetDocumentsRequest(
documents: _outstandingDocuments.toList(),
mask: fieldMask.let((fieldMask) {
return firestore1.DocumentMask(
fieldPaths: fieldMask.map((e) => e._formattedName).toList(),
);
} else {
final missing = response.missing!;
documentSnapshot = DocumentSnapshot._missing(
missing,
response.readTime,
firestore,
}),
transaction: transactionId,
newTransaction: transactionOptions,
readTime: readTime?._toProto().timestampValue,
);

var resultCount = 0;
try {
final documents = await firestore._client.v1((client) async {
return client.projects.databases.documents.batchGet(
request,
firestore._formattedDatabaseName,
);
}
}).catchError(_handleException);

for (final response in documents) {
DocumentSnapshot<DocumentData>? documentSnapshot;

if (response.transaction?.isNotEmpty ?? false) {
this._retrievedTransactionId = response.transaction;
}

final path = documentSnapshot.ref._formattedName;
_outstandingDocuments.remove(path);
_retreivedDocuments[path] = documentSnapshot;
final found = response.found;
if (found != null) {
documentSnapshot = DocumentSnapshot._fromDocument(
found,
response.readTime,
firestore,
);
} else if (response.missing != null) {
final missing = response.missing!;
documentSnapshot = DocumentSnapshot._missing(
missing,
response.readTime,
firestore,
);
}

if (documentSnapshot != null) {
final path = documentSnapshot.ref._formattedName;
_outstandingDocuments.remove(path);
_retreivedDocuments[path] = documentSnapshot;
resultCount++;
}
}
} on FirebaseFirestoreAdminException catch (firestoreError) {
final shoulRetry = request.transaction != null &&
request.newTransaction != null &&
// Only retry if we made progress.
resultCount > 0 &&
// Don't retry permanent errors.
StatusCode.batchGetRetryCodes.contains(firestoreError.errorCode.statusCode);
if (shoulRetry) {
return _fetchDocuments();
} else {
rethrow;
}
}
// TODO handle retry
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import 'dart:async';
import 'dart:convert';
import 'dart:math' as math;

import 'package:collection/collection.dart';
Expand All @@ -10,6 +12,8 @@ import 'package:intl/intl.dart';

import '../app.dart';
import '../object_utils.dart';
import 'backoff.dart';
import 'status_code.dart';
import 'util.dart';

part 'convert.dart';
Expand All @@ -23,11 +27,13 @@ part 'reference.dart';
part 'serializer.dart';
part 'timestamp.dart';
part 'transaction.dart';

part 'types.dart';
part 'write_batch.dart';
part 'document_change.dart';
part 'filter.dart';
part 'firestore_exception.dart';
part 'firestore_api_request_internal.dart';
part 'collection_group.dart';

class Firestore {
Expand All @@ -52,7 +58,6 @@ class Firestore {
// TODO bulkWriter
// TODO bundle
// TODO getAll
// TODO runTransaction
// TODO recursiveDelete

/// Fetches the root collections that are associated with this Firestore
Expand Down Expand Up @@ -182,7 +187,6 @@ class Firestore {
}

final fieldMask = _parseFieldMask(readOptions);
final tag = requestTag();

final reader = _DocumentReader(
firestore: this,
Expand All @@ -191,7 +195,39 @@ class Firestore {
fieldMask: fieldMask,
);

return reader.get(tag);
return reader.get();
}

/// Executes the given updateFunction and commits the changes applied within
/// the transaction.
/// You can use the transaction object passed to 'updateFunction' to read and
/// modify Firestore documents under lock. You have to perform all reads
/// before before you perform any write.
/// Transactions can be performed as read-only or read-write transactions. By
/// default, transactions are executed in read-write mode.
/// A read-write transaction obtains a pessimistic lock on all documents that
/// are read during the transaction. These locks block other transactions,
/// batched writes, and other non-transactional writes from changing that
/// document. Any writes in a read-write transactions are committed once
/// 'updateFunction' resolves, which also releases all locks.
/// If a read-write transaction fails with contention, the transaction is
/// retried up to five times. The updateFunction is invoked once for each
/// attempt.
/// Read-only transactions do not lock documents. They can be used to read
/// documents at a consistent snapshot in time, which may be up to 60 seconds
/// in the past. Read-only transactions are not retried.
/// Transactions time out after 60 seconds if no documents are read.
/// Transactions that are not committed within than 270 seconds are also
/// aborted. Any remaining locks are released when a transaction times out.
Future<T> runTransaction<T>(
TransactionHandler<T> updateFuntion, {
TransactionOptions? transactionOptions,
}) {
if (transactionOptions != null) {}

final transaction = Transaction(this, transactionOptions);

return transaction._runTransaction(updateFuntion);
}
}

Expand Down Expand Up @@ -271,3 +307,35 @@ class _FirestoreHttpClient {
);
}
}

sealed class TransactionOptions {
bool get readOnly;

int get maxAttempts;
}

class ReadOnlyTransactionOptions extends TransactionOptions {
ReadOnlyTransactionOptions({Timestamp? readTime}) : _readTime = readTime;
@override
bool readOnly = true;

@override
int get maxAttempts => 1;

Timestamp? get readTime => _readTime;

final Timestamp? _readTime;
}

class ReadWriteTransactionOptions extends TransactionOptions {
ReadWriteTransactionOptions({int maxAttempts = 5})
: _maxAttempts = maxAttempts;

final int _maxAttempts;

@override
bool readOnly = false;

@override
int get maxAttempts => _maxAttempts;
}
Loading