Skip to content

Commit

Permalink
Enum unrecoverable Sender errors
Browse files Browse the repository at this point in the history
Mark unrecoverable sender errors in storage so that they no longer spawn
isolates.

Spawning isolates and failing them immediately uses more resources than
necessary. This is therefore a performance improvement for the stability
of the app.

In the medium term we should switch on errors from payjoin-flutter, but
those errors are not yet available to switch on. This will reduce
the amount of error definition and handling we have to manually implement,
but since this is a potential immediate performance issue I'm addressing
it straight away.
  • Loading branch information
DanGould committed Jan 2, 2025
1 parent 3b67f26 commit 1cf3a0b
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 24 deletions.
71 changes: 47 additions & 24 deletions lib/_pkg/payjoin/manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import 'package:payjoin_flutter/bitcoin_ffi.dart';
import 'package:payjoin_flutter/common.dart';
import 'package:payjoin_flutter/receive.dart';
import 'package:payjoin_flutter/send.dart';
import 'package:payjoin_flutter/src/exceptions.dart';
import 'package:payjoin_flutter/src/generated/frb_generated.dart';
import 'package:payjoin_flutter/uri.dart' as pj_uri;
import 'package:payjoin_flutter/uri.dart';
Expand All @@ -26,6 +27,23 @@ const List<String> _ohttpRelayUrls = [

const payjoinDirectoryUrl = 'https://payjo.in';

sealed class SendError {
final String message;

const SendError._(this.message);

factory SendError.recoverable(String message) = RecoverableError;
factory SendError.unrecoverable(String message) = UnrecoverableError;
}

class RecoverableError extends SendError {
const RecoverableError(super.message) : super._();
}

class UnrecoverableError extends SendError {
const UnrecoverableError(super.message) : super._();
}

class PayjoinManager {
PayjoinManager(this._walletTx, this._payjoinStorage);
final WalletTx _walletTx;
Expand Down Expand Up @@ -123,12 +141,15 @@ class PayjoinManager {
await _payjoinStorage.markSenderSessionComplete(pjUri);
completer.complete(null);
}
} else if (message is Err) {
} else if (message is SendError) {
PayjoinEventBus().emit(
PayjoinSendFailureEvent(pjUri: pjUri, error: message),
PayjoinSendFailureEvent(pjUri: pjUri, error: message.message),
);
if (message is UnrecoverableError) {
await _payjoinStorage.markSenderSessionUnrecoverable(pjUri);
}
await _cleanupSession(pjUri);
completer.complete(message);
completer.complete(Err(message.message));
}
});

Expand Down Expand Up @@ -305,7 +326,8 @@ class PayjoinManager {
.toList();
final filteredSenders = senderSessions.where((session) {
return session.walletId == wallet.id &&
session.status != PayjoinSessionStatus.success;
session.status != PayjoinSessionStatus.success &&
session.status != PayjoinSessionStatus.unrecoverable;
}).toList();

final spawnedReceivers = filteredReceivers.map((session) {
Expand Down Expand Up @@ -368,6 +390,7 @@ class PayjoinManager {

enum PayjoinSessionStatus {
pending,
unrecoverable,
success,
}

Expand Down Expand Up @@ -473,21 +496,19 @@ Future<void> _isolateSender(List<dynamic> args) async {
// Reconstruct the Sender from the JSON
final sender = Sender.fromJson(senderJson);

// Run the sender logic inside the isolate
try {
final proposalPsbt = await _runSender(sender, sendPort: sendPort);
if (proposalPsbt == null) throw Exception('proposalPsbt is null');
sendPort.send({
'type': 'psbt_to_sign',
'psbt': proposalPsbt,
});
} catch (e) {
sendPort.send(Err(e.toString()));
sendPort.send(e);
}
}

/// Top-level function that attempts to run payjoin sender (V2 protocol first, fallback to V1).
Future<String?> _runSender(Sender sender, {required SendPort sendPort}) async {
Future<String> _runSender(Sender sender, {required SendPort sendPort}) async {
final dio = Dio();

try {
Expand All @@ -506,24 +527,26 @@ Future<String?> _runSender(Sender sender, {required SendPort sendPort}) async {
sendPort.send({'type': 'request_posted'});

while (true) {
try {
final (getRequest, getReqCtx) = await getCtx.extractReq(
ohttpRelay: await _randomOhttpRelayUrl(),
);
final getRes = await _postRequest(dio, getRequest);
final proposalPsbt = await getCtx.processResponse(
response: getRes.data as List<int>,
ohttpCtx: getReqCtx,
);
if (proposalPsbt != null) return proposalPsbt;
} catch (e) {
print('Error occurred while processing payjoin: $e');
// Loop until a valid response is found
}
final (getRequest, getReqCtx) = await getCtx.extractReq(
ohttpRelay: await _randomOhttpRelayUrl(),
);
final getRes = await _postRequest(dio, getRequest);
final proposalPsbt = await getCtx.processResponse(
response: getRes.data as List<int>,
ohttpCtx: getReqCtx,
);
if (proposalPsbt != null) return proposalPsbt;
}
} catch (e) {
// If V2 fails, attempt V1
return await _runSenderV1(sender, dio, sendPort);
if (e is PayjoinException &&
// TODO condition on error type instead of message content
e.message?.contains('parse receiver public key') == true) {
return await _runSenderV1(sender, dio, sendPort);
} else if (e is DioException) {
throw Exception(SendError.recoverable);
} else {
throw Exception(SendError.unrecoverable);
}
}
}

Expand Down
23 changes: 23 additions & 0 deletions lib/_pkg/payjoin/storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,29 @@ class PayjoinStorage {
}
}

Future<Err?> markSenderSessionUnrecoverable(String pjUri) async {
try {
final (session, err) = await readSenderSession(pjUri);
if (err != null) return err;

final updatedSession = SendSession(
session!.isTestnet,
session.sender,
session.walletId,
session.pjUri,
PayjoinSessionStatus.unrecoverable,
);

await _hiveStorage.saveValue(
key: senderPrefix + pjUri,
value: jsonEncode(updatedSession.toJson()),
);
return null;
} catch (e) {
return Err(e.toString());
}
}

Future<(List<SendSession>, Err?)> readAllSenders() async {
try {
final (allData, err) = await _hiveStorage.getAll();
Expand Down

0 comments on commit 1cf3a0b

Please sign in to comment.