Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mark unrecoverable errors so they don't spawn #412

Merged
merged 3 commits into from
Jan 4, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 71 additions & 53 deletions lib/_pkg/payjoin/manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,23 @@ const List<String> _ohttpRelayUrls = [

const payjoinDirectoryUrl = 'https://payjo.in';

sealed class SessionError {
final String message;

const SessionError._(this.message);

factory SessionError.recoverable(String message) = RecoverableError;
factory SessionError.unrecoverable(String message) = UnrecoverableError;
}

class RecoverableError extends SessionError {
const RecoverableError(super.message) : super._();
}

class UnrecoverableError extends SessionError {
const UnrecoverableError(super.message) : super._();
}

class PayjoinManager {
PayjoinManager(this._walletTx, this._payjoinStorage);
final WalletTx _walletTx;
Expand Down Expand Up @@ -123,12 +140,15 @@ class PayjoinManager {
await _payjoinStorage.markSenderSessionComplete(pjUri);
completer.complete(null);
}
} else if (message is Err) {
} else if (message is SessionError) {
PayjoinEventBus().emit(
PayjoinSendFailureEvent(pjUri: pjUri, error: message),
PayjoinSendFailureEvent(pjUri: pjUri, error: message.message),
);
if (message is UnrecoverableError) {
await _payjoinStorage.markSenderSessionUnrecoverable(pjUri);
}
await _cleanupSession(pjUri);
completer.complete(message);
completer.complete(Err(message.message));
}
});

Expand Down Expand Up @@ -281,6 +301,9 @@ class PayjoinManager {

return completer.future;
} catch (e) {
if (e is UnrecoverableError) {
Copy link
Contributor

@kumulynja kumulynja Jan 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DanGould I think this never gets here and should be moved to the receive port listener where the UnrecoverableError is send to from the isolate. So I guess instead of the message is Err on line 283, check for and handle the UnrecoverableError there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made this change as well as one where if errors in the switch statement crop up we mark the receiver payjoin session as unrecoverable which I detail in the commit message. Let me know what you think.

await _payjoinStorage.markReceiverSessionUnrecoverable(receiver.id());
}
return Err(
e.toString(),
title: 'Error occurred while receiving Payjoin',
Expand All @@ -301,11 +324,13 @@ class PayjoinManager {
final filteredReceivers = receiverSessions
.where((session) =>
session.walletId == wallet.id &&
session.status != PayjoinSessionStatus.success)
session.status != PayjoinSessionStatus.success &&
session.status != PayjoinSessionStatus.unrecoverable)
.toList();
final filteredSenders = senderSessions.where((session) {
return session.walletId == wallet.id &&
session.status != PayjoinSessionStatus.success;
session.status != PayjoinSessionStatus.success &&
session.status != PayjoinSessionStatus.unrecoverable;
}).toList();

final spawnedReceivers = filteredReceivers.map((session) {
Expand Down Expand Up @@ -368,6 +393,7 @@ class PayjoinManager {

enum PayjoinSessionStatus {
pending,
unrecoverable,
success,
}

Expand Down Expand Up @@ -473,21 +499,19 @@ Future<void> _isolateSender(List<dynamic> args) async {
// Reconstruct the Sender from the JSON
final sender = Sender.fromJson(senderJson);

// Run the sender logic inside the isolate
try {
final proposalPsbt = await _runSender(sender, sendPort: sendPort);
if (proposalPsbt == null) throw Exception('proposalPsbt is null');
sendPort.send({
'type': 'psbt_to_sign',
'psbt': proposalPsbt,
});
} catch (e) {
sendPort.send(Err(e.toString()));
sendPort.send(e);
}
}

/// Top-level function that attempts to run payjoin sender (V2 protocol first, fallback to V1).
Future<String?> _runSender(Sender sender, {required SendPort sendPort}) async {
Future<String> _runSender(Sender sender, {required SendPort sendPort}) async {
final dio = Dio();

try {
Expand All @@ -506,24 +530,26 @@ Future<String?> _runSender(Sender sender, {required SendPort sendPort}) async {
sendPort.send({'type': 'request_posted'});

while (true) {
try {
final (getRequest, getReqCtx) = await getCtx.extractReq(
ohttpRelay: await _randomOhttpRelayUrl(),
);
final getRes = await _postRequest(dio, getRequest);
final proposalPsbt = await getCtx.processResponse(
response: getRes.data as List<int>,
ohttpCtx: getReqCtx,
);
if (proposalPsbt != null) return proposalPsbt;
} catch (e) {
print('Error occurred while processing payjoin: $e');
// Loop until a valid response is found
}
final (getRequest, getReqCtx) = await getCtx.extractReq(
ohttpRelay: await _randomOhttpRelayUrl(),
);
final getRes = await _postRequest(dio, getRequest);
final proposalPsbt = await getCtx.processResponse(
response: getRes.data as List<int>,
ohttpCtx: getReqCtx,
);
if (proposalPsbt != null) return proposalPsbt;
}
} catch (e) {
// If V2 fails, attempt V1
return await _runSenderV1(sender, dio, sendPort);
if (e is PayjoinException &&
// TODO condition on error type instead of message content
e.message?.contains('parse receiver public key') == true) {
return await _runSenderV1(sender, dio, sendPort);
} else if (e is DioException) {
throw Exception(SessionError.recoverable(e.toString()));
} else {
throw Exception(SessionError.unrecoverable(e.toString()));
}
}
}

Expand Down Expand Up @@ -651,7 +677,7 @@ Future<void> _isolateReceiver(List<dynamic> args) async {
return payjoinProposal;
} catch (e) {
print('Error occurred while finalizing proposal: $e');
throw Exception('Error occurred while finalizing proposal');
rethrow;
}
}

Expand All @@ -668,10 +694,10 @@ Future<void> _isolateReceiver(List<dynamic> args) async {
'type': 'proposal_sent',
});
} catch (e) {
try {
isolateTomainSendPort.send(Err(e.toString()));
} catch (e) {
print('$e');
if (e is DioException) {
isolateTomainSendPort.send(SessionError.recoverable(e.toString()));
} else {
isolateTomainSendPort.send(SessionError.unrecoverable(e.toString()));
}
}
}
Expand All @@ -680,34 +706,26 @@ Future<UncheckedProposal> _receiveUncheckedProposal(
Dio dio,
Receiver receiver,
) async {
try {
while (true) {
final (req, context) = await receiver.extractReq();
final ohttpResponse = await _postRequest(dio, req);
final proposal = await receiver.processRes(
body: ohttpResponse.data as List<int>,
ctx: context,
);
if (proposal != null) {
return proposal;
}
while (true) {
final (req, context) = await receiver.extractReq();
final ohttpResponse = await _postRequest(dio, req);
final proposal = await receiver.processRes(
body: ohttpResponse.data as List<int>,
ctx: context,
);
if (proposal != null) {
return proposal;
}
} catch (e) {
throw Exception('Error occurred while processing payjoin receiver: $e');
}
}

Future<void> _respondProposal(Dio dio, PayjoinProposal proposal) async {
try {
final (postReq, ohttpCtx) = await proposal.extractV2Req();
final postRes = await _postRequest(dio, postReq);
await proposal.processRes(
res: postRes.data as List<int>,
ohttpContext: ohttpCtx,
);
} catch (e) {
throw Exception('Error occurred while processing payjoin: $e');
}
final (postReq, ohttpCtx) = await proposal.extractV2Req();
final postRes = await _postRequest(dio, postReq);
await proposal.processRes(
res: postRes.data as List<int>,
ohttpContext: ohttpCtx,
);
}

/// Posts a request via dio and returns the response.
Expand Down
45 changes: 45 additions & 0 deletions lib/_pkg/payjoin/storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,28 @@ class PayjoinStorage {
}
}

Future<Err?> markReceiverSessionUnrecoverable(String id) async {
try {
final (session, err) = await readReceiverSession(id);
if (err != null) return err;

final updatedSession = RecvSession(
session!.isTestnet,
session.receiver,
session.walletId,
PayjoinSessionStatus.unrecoverable,
);

await _hiveStorage.saveValue(
key: receiverPrefix + id,
value: jsonEncode(updatedSession.toJson()),
);
return null;
} catch (e) {
return Err(e.toString());
}
}

Future<(List<RecvSession>, Err?)> readAllReceivers() async {
//deleteAllSessions();
try {
Expand Down Expand Up @@ -186,6 +208,29 @@ class PayjoinStorage {
}
}

Future<Err?> markSenderSessionUnrecoverable(String pjUri) async {
try {
final (session, err) = await readSenderSession(pjUri);
if (err != null) return err;

final updatedSession = SendSession(
session!.isTestnet,
session.sender,
session.walletId,
session.pjUri,
PayjoinSessionStatus.unrecoverable,
);

await _hiveStorage.saveValue(
key: senderPrefix + pjUri,
value: jsonEncode(updatedSession.toJson()),
);
return null;
} catch (e) {
return Err(e.toString());
}
}

Future<(List<SendSession>, Err?)> readAllSenders() async {
try {
final (allData, err) = await _hiveStorage.getAll();
Expand Down