From c7c46328126df6a6ea11b692be4468b9fcbed7e6 Mon Sep 17 00:00:00 2001 From: DanGould Date: Thu, 2 Jan 2025 13:29:07 -0500 Subject: [PATCH 1/3] Enum unrecoverable Sender errors Mark unrecoverable sender errors in storage so that they no longer spawn isolates. Spawning isolates and failing them immediately uses more resources than necessary. This is therefore a performance improvement for the stability of the app. In the medium term we should switch on errors from payjoin-flutter, but those errors are not yet available to switch on. This will reduce the amount of error definition and handling we have to manually implement, but since this is a potential immediate performance issue I'm addressing it straight away. --- lib/_pkg/payjoin/manager.dart | 70 +++++++++++++++++++++++------------ lib/_pkg/payjoin/storage.dart | 23 ++++++++++++ 2 files changed, 69 insertions(+), 24 deletions(-) diff --git a/lib/_pkg/payjoin/manager.dart b/lib/_pkg/payjoin/manager.dart index c6bee70d..af673b72 100644 --- a/lib/_pkg/payjoin/manager.dart +++ b/lib/_pkg/payjoin/manager.dart @@ -26,6 +26,23 @@ const List _ohttpRelayUrls = [ const payjoinDirectoryUrl = 'https://payjo.in'; +sealed class SendError { + final String message; + + const SendError.SessionError(this.message); + + factory SendError.recoverable(String message) = RecoverableError; + factory SendError.unrecoverable(String message) = UnrecoverableError; +} + +class RecoverableError extends SendError { + const RecoverableError(super.message) : super.SessionError(); +} + +class UnrecoverableError extends SendError { + const UnrecoverableError(super.message) : super.SessionError(); +} + class PayjoinManager { PayjoinManager(this._walletTx, this._payjoinStorage); final WalletTx _walletTx; @@ -123,12 +140,15 @@ class PayjoinManager { await _payjoinStorage.markSenderSessionComplete(pjUri); completer.complete(null); } - } else if (message is Err) { + } else if (message is SendError) { PayjoinEventBus().emit( - PayjoinSendFailureEvent(pjUri: pjUri, error: message), + PayjoinSendFailureEvent(pjUri: pjUri, error: message.message), ); + if (message is UnrecoverableError) { + await _payjoinStorage.markSenderSessionUnrecoverable(pjUri); + } await _cleanupSession(pjUri); - completer.complete(message); + completer.complete(Err(message.message)); } }); @@ -305,7 +325,8 @@ class PayjoinManager { .toList(); final filteredSenders = senderSessions.where((session) { return session.walletId == wallet.id && - session.status != PayjoinSessionStatus.success; + session.status != PayjoinSessionStatus.success && + session.status != PayjoinSessionStatus.unrecoverable; }).toList(); final spawnedReceivers = filteredReceivers.map((session) { @@ -368,6 +389,7 @@ class PayjoinManager { enum PayjoinSessionStatus { pending, + unrecoverable, success, } @@ -473,21 +495,19 @@ Future _isolateSender(List args) async { // Reconstruct the Sender from the JSON final sender = Sender.fromJson(senderJson); - // Run the sender logic inside the isolate try { final proposalPsbt = await _runSender(sender, sendPort: sendPort); - if (proposalPsbt == null) throw Exception('proposalPsbt is null'); sendPort.send({ 'type': 'psbt_to_sign', 'psbt': proposalPsbt, }); } catch (e) { - sendPort.send(Err(e.toString())); + sendPort.send(e); } } /// Top-level function that attempts to run payjoin sender (V2 protocol first, fallback to V1). -Future _runSender(Sender sender, {required SendPort sendPort}) async { +Future _runSender(Sender sender, {required SendPort sendPort}) async { final dio = Dio(); try { @@ -506,24 +526,26 @@ Future _runSender(Sender sender, {required SendPort sendPort}) async { sendPort.send({'type': 'request_posted'}); while (true) { - try { - final (getRequest, getReqCtx) = await getCtx.extractReq( - ohttpRelay: await _randomOhttpRelayUrl(), - ); - final getRes = await _postRequest(dio, getRequest); - final proposalPsbt = await getCtx.processResponse( - response: getRes.data as List, - ohttpCtx: getReqCtx, - ); - if (proposalPsbt != null) return proposalPsbt; - } catch (e) { - print('Error occurred while processing payjoin: $e'); - // Loop until a valid response is found - } + final (getRequest, getReqCtx) = await getCtx.extractReq( + ohttpRelay: await _randomOhttpRelayUrl(), + ); + final getRes = await _postRequest(dio, getRequest); + final proposalPsbt = await getCtx.processResponse( + response: getRes.data as List, + ohttpCtx: getReqCtx, + ); + if (proposalPsbt != null) return proposalPsbt; } } catch (e) { - // If V2 fails, attempt V1 - return await _runSenderV1(sender, dio, sendPort); + if (e is PayjoinException && + // TODO condition on error type instead of message content + e.message?.contains('parse receiver public key') == true) { + return await _runSenderV1(sender, dio, sendPort); + } else if (e is DioException) { + throw Exception(SessionError.recoverable(e.toString())); + } else { + throw Exception(SessionError.unrecoverable(e.toString())); + } } } diff --git a/lib/_pkg/payjoin/storage.dart b/lib/_pkg/payjoin/storage.dart index e7a7d299..354ef15e 100644 --- a/lib/_pkg/payjoin/storage.dart +++ b/lib/_pkg/payjoin/storage.dart @@ -186,6 +186,29 @@ class PayjoinStorage { } } + Future markSenderSessionUnrecoverable(String pjUri) async { + try { + final (session, err) = await readSenderSession(pjUri); + if (err != null) return err; + + final updatedSession = SendSession( + session!.isTestnet, + session.sender, + session.walletId, + session.pjUri, + PayjoinSessionStatus.unrecoverable, + ); + + await _hiveStorage.saveValue( + key: senderPrefix + pjUri, + value: jsonEncode(updatedSession.toJson()), + ); + return null; + } catch (e) { + return Err(e.toString()); + } + } + Future<(List, Err?)> readAllSenders() async { try { final (allData, err) = await _hiveStorage.getAll(); From fd2fc86b8fb2d690516a713e641a09becc3d09f8 Mon Sep 17 00:00:00 2001 From: DanGould Date: Thu, 2 Jan 2025 15:36:48 -0500 Subject: [PATCH 2/3] Mark some pj Receiver errors as unrecoverable This way they don't respawn if they're never going to succeed. --- lib/_pkg/payjoin/manager.dart | 72 +++++++++++++++++------------------ lib/_pkg/payjoin/storage.dart | 22 +++++++++++ 2 files changed, 56 insertions(+), 38 deletions(-) diff --git a/lib/_pkg/payjoin/manager.dart b/lib/_pkg/payjoin/manager.dart index af673b72..e58783fa 100644 --- a/lib/_pkg/payjoin/manager.dart +++ b/lib/_pkg/payjoin/manager.dart @@ -26,21 +26,21 @@ const List _ohttpRelayUrls = [ const payjoinDirectoryUrl = 'https://payjo.in'; -sealed class SendError { +sealed class SessionError { final String message; - const SendError.SessionError(this.message); + const SessionError._(this.message); - factory SendError.recoverable(String message) = RecoverableError; - factory SendError.unrecoverable(String message) = UnrecoverableError; + factory SessionError.recoverable(String message) = RecoverableError; + factory SessionError.unrecoverable(String message) = UnrecoverableError; } -class RecoverableError extends SendError { - const RecoverableError(super.message) : super.SessionError(); +class RecoverableError extends SessionError { + const RecoverableError(super.message) : super._(); } -class UnrecoverableError extends SendError { - const UnrecoverableError(super.message) : super.SessionError(); +class UnrecoverableError extends SessionError { + const UnrecoverableError(super.message) : super._(); } class PayjoinManager { @@ -140,7 +140,7 @@ class PayjoinManager { await _payjoinStorage.markSenderSessionComplete(pjUri); completer.complete(null); } - } else if (message is SendError) { + } else if (message is SessionError) { PayjoinEventBus().emit( PayjoinSendFailureEvent(pjUri: pjUri, error: message.message), ); @@ -301,6 +301,9 @@ class PayjoinManager { return completer.future; } catch (e) { + if (e is UnrecoverableError) { + await _payjoinStorage.markReceiverSessionUnrecoverable(receiver.id()); + } return Err( e.toString(), title: 'Error occurred while receiving Payjoin', @@ -321,7 +324,8 @@ class PayjoinManager { final filteredReceivers = receiverSessions .where((session) => session.walletId == wallet.id && - session.status != PayjoinSessionStatus.success) + session.status != PayjoinSessionStatus.success && + session.status != PayjoinSessionStatus.unrecoverable) .toList(); final filteredSenders = senderSessions.where((session) { return session.walletId == wallet.id && @@ -673,7 +677,7 @@ Future _isolateReceiver(List args) async { return payjoinProposal; } catch (e) { print('Error occurred while finalizing proposal: $e'); - throw Exception('Error occurred while finalizing proposal'); + rethrow; } } @@ -690,10 +694,10 @@ Future _isolateReceiver(List args) async { 'type': 'proposal_sent', }); } catch (e) { - try { - isolateTomainSendPort.send(Err(e.toString())); - } catch (e) { - print('$e'); + if (e is DioException) { + isolateTomainSendPort.send(SessionError.recoverable(e.toString())); + } else { + isolateTomainSendPort.send(SessionError.unrecoverable(e.toString())); } } } @@ -702,34 +706,26 @@ Future _receiveUncheckedProposal( Dio dio, Receiver receiver, ) async { - try { - while (true) { - final (req, context) = await receiver.extractReq(); - final ohttpResponse = await _postRequest(dio, req); - final proposal = await receiver.processRes( - body: ohttpResponse.data as List, - ctx: context, - ); - if (proposal != null) { - return proposal; - } + while (true) { + final (req, context) = await receiver.extractReq(); + final ohttpResponse = await _postRequest(dio, req); + final proposal = await receiver.processRes( + body: ohttpResponse.data as List, + ctx: context, + ); + if (proposal != null) { + return proposal; } - } catch (e) { - throw Exception('Error occurred while processing payjoin receiver: $e'); } } Future _respondProposal(Dio dio, PayjoinProposal proposal) async { - try { - final (postReq, ohttpCtx) = await proposal.extractV2Req(); - final postRes = await _postRequest(dio, postReq); - await proposal.processRes( - res: postRes.data as List, - ohttpContext: ohttpCtx, - ); - } catch (e) { - throw Exception('Error occurred while processing payjoin: $e'); - } + final (postReq, ohttpCtx) = await proposal.extractV2Req(); + final postRes = await _postRequest(dio, postReq); + await proposal.processRes( + res: postRes.data as List, + ohttpContext: ohttpCtx, + ); } /// Posts a request via dio and returns the response. diff --git a/lib/_pkg/payjoin/storage.dart b/lib/_pkg/payjoin/storage.dart index 354ef15e..9e074a18 100644 --- a/lib/_pkg/payjoin/storage.dart +++ b/lib/_pkg/payjoin/storage.dart @@ -96,6 +96,28 @@ class PayjoinStorage { } } + Future markReceiverSessionUnrecoverable(String id) async { + try { + final (session, err) = await readReceiverSession(id); + if (err != null) return err; + + final updatedSession = RecvSession( + session!.isTestnet, + session.receiver, + session.walletId, + PayjoinSessionStatus.unrecoverable, + ); + + await _hiveStorage.saveValue( + key: receiverPrefix + id, + value: jsonEncode(updatedSession.toJson()), + ); + return null; + } catch (e) { + return Err(e.toString()); + } + } + Future<(List, Err?)> readAllReceivers() async { //deleteAllSessions(); try { From 2248dc46285c3de063455152f53bb05cf047c2a1 Mon Sep 17 00:00:00 2001 From: DanGould Date: Fri, 3 Jan 2025 15:29:19 -0500 Subject: [PATCH 3/3] Handle receiver unrecoverable errors Receivers need to handle both switch errors or isolate errors. This change takes the error handling out of the spawn try-catch, which would never receive them and moves it into places where those errors would propagate. Note that some of the switch errors like checkIsOwned might crop up because of a wallet being improperly loaded, but for the time being we don't have the granularity to distinguish between them. --- lib/_pkg/payjoin/manager.dart | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/lib/_pkg/payjoin/manager.dart b/lib/_pkg/payjoin/manager.dart index e58783fa..6e755dc9 100644 --- a/lib/_pkg/payjoin/manager.dart +++ b/lib/_pkg/payjoin/manager.dart @@ -274,15 +274,21 @@ class PayjoinManager { // TODO PROPAGATE ERROR TO UI TOAST / TRANSACTION HISTORY debugPrint(e.toString()); await _cleanupSession(receiver.id()); + await _payjoinStorage + .markReceiverSessionUnrecoverable(receiver.id()); completer.complete( Err( e.toString(), ), ); } - } else if (message is Err) { + } else if (message is SessionError) { await _cleanupSession(receiver.id()); - completer.complete(message); + if (message is UnrecoverableError) { + await _payjoinStorage + .markReceiverSessionUnrecoverable(receiver.id()); + } + completer.complete(Err(message.toString())); } }); @@ -301,9 +307,6 @@ class PayjoinManager { return completer.future; } catch (e) { - if (e is UnrecoverableError) { - await _payjoinStorage.markReceiverSessionUnrecoverable(receiver.id()); - } return Err( e.toString(), title: 'Error occurred while receiving Payjoin',