Skip to content

Commit

Permalink
Spawn runSender logic in an Isolate ✅
Browse files Browse the repository at this point in the history
This still ignores UI updates from the spawned isolate and doesn't even
have a proper splash screen for a confirmed send.

It has also yet to be smoke tested
  • Loading branch information
DanGould committed Dec 20, 2024
1 parent 708ff2c commit e6ae3f7
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 91 deletions.
212 changes: 148 additions & 64 deletions lib/_pkg/payjoin/manager.dart
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import 'dart:async';
import 'dart:isolate';
import 'dart:math';

import 'package:bb_mobile/_model/transaction.dart';
import 'package:bb_mobile/_model/wallet.dart';
import 'package:bb_mobile/_pkg/error.dart';
import 'package:bb_mobile/_pkg/wallet/transaction.dart';
import 'package:dio/dio.dart';
import 'package:payjoin_flutter/common.dart';
import 'package:payjoin_flutter/send.dart';
import 'package:payjoin_flutter/src/generated/frb_generated.dart';
import 'package:payjoin_flutter/uri.dart' as pj_uri;

const List<String> _ohttpRelayUrls = [
Expand All @@ -12,6 +18,11 @@ const List<String> _ohttpRelayUrls = [
];

class PayjoinManager {
PayjoinManager(this._walletTx);
final WalletTx _walletTx;
final Map<String, Isolate> _activePollers = {};
final Map<String, ReceivePort> _activePorts = {};

Future<Sender> initSender(
String pjUriString,
int networkFeesSatPerVb,
Expand Down Expand Up @@ -42,81 +53,154 @@ class PayjoinManager {
}
}

/// Sends a payjoin using the v2 protocol given an initialized Sender.
/// V2 protocol first attempts a v2 request, but if one cannot be extracted
/// from the given bitcoin URI, it will attempt to send a v1 request.
Future<String?> runSender(Sender sender) async {
Request postReq;
V2PostContext postReqCtx;
final dio = Dio();

Future<Err?> spawnSender({
required bool isTestnet,
required Sender sender,
required Wallet wallet,
}) async {
try {
final result =
await sender.extractV2(ohttpProxyUrl: await _randomOhttpRelayUrl());
postReq = result.$1;
postReqCtx = result.$2;
} catch (e) {
// extract v2 failed, attempt to send v1
return await _runSenderV1(sender, dio);
}
final completer = Completer<Err?>();
final receivePort = ReceivePort();

try {
final postRes = await _postRequest(dio, postReq);
final getCtx = await postReqCtx.processResponse(
response: postRes.data as List<int>,
);
while (true) {
try {
final (getRequest, getReqCtx) = await getCtx.extractReq(
ohttpRelay: await _randomOhttpRelayUrl(),
);
final getRes = await _postRequest(dio, getRequest);
return await getCtx.processResponse(
response: getRes.data as List<int>,
ohttpCtx: getReqCtx,
);
} catch (e) {
// loop
// TODO Create unique ID for this payjoin session
const sessionId = 'TODO_SENDER_ENDPOINT';

receivePort.listen((message) async {
if (message is Map<String, dynamic>) {
if (message['type'] == 'psbt_to_sign') {
final proposalPsbt = message['psbt'] as String;
final (wtxid, err) = await _walletTx.signAndBroadcastPsbt(
psbt: proposalPsbt,
wallet: wallet,
);
if (err != null) {
completer.complete(err);
return;
}
await _cleanupSession(sessionId);
} else if (message is Err) {
// TODO propagate this error to the UI
await _cleanupSession(sessionId);
}
}
}
});

final args = [
receivePort.sendPort,
sender.toJson(),
];

final isolate = await Isolate.spawn(
_isolateSender,
args,
);
_activePollers[sessionId] = isolate;
_activePorts[sessionId] = receivePort;
return completer.future;
} catch (e) {
throw Exception('Error polling payjoin sender: $e');
return Err(e.toString());
}
}

/// Returns a random OHTTP proxy URL from the list of available URLs.
/// Random proxying makes it more difficult for a single ohttp relay or
/// payjoin directory to conduct attacks based on timing metadata.
Future<pj_uri.Url> _randomOhttpRelayUrl() async {
return await pj_uri.Url.fromStr(
_ohttpRelayUrls[Random.secure().nextInt(_ohttpRelayUrls.length)],
);
Future<void> _cleanupSession(String sessionId) async {
_activePollers[sessionId]?.kill();
_activePollers.remove(sessionId);
_activePorts[sessionId]?.close();
_activePorts.remove(sessionId);
}
}

// Attempt to send a payjoin using the v1 protocol as fallback.
Future<String> _runSenderV1(Sender sender, Dio dio) async {
try {
final (req, v1Ctx) = await sender.extractV1();
final response = await _postRequest(dio, req);
final proposalPsbt =
await v1Ctx.processResponse(response: response.data as List<int>);
return proposalPsbt;
} catch (e) {
throw Exception('Send V1 payjoin error: $e');
}
// Top-level function to generate random OHTTP relay URL
Future<pj_uri.Url> _randomOhttpRelayUrl() async {
return await pj_uri.Url.fromStr(
_ohttpRelayUrls[Random.secure().nextInt(_ohttpRelayUrls.length)],
);
}

/// Top-level function that runs inside the isolate.
/// It should not reference instance-specific variables or methods.
Future<void> _isolateSender(List<dynamic> args) async {
// Initialize any core dependencies here if required
await core.init();

final sendPort = args[0] as SendPort;
final senderJson = args[1] as String;

// Reconstruct the Sender from the JSON
final sender = Sender.fromJson(senderJson);

// Run the sender logic inside the isolate
try {
final proposalPsbt = await _runSender(sender);
sendPort.send({
'type': 'psbt_to_sign',
'psbt': proposalPsbt,
});
} catch (e) {
sendPort.send(Err(e.toString()));
}
}

/// Top-level function that attempts to run payjoin sender (V2 protocol first, fallback to V1).
Future<String?> _runSender(Sender sender) async {
final dio = Dio();

try {
final result = await sender.extractV2(
ohttpProxyUrl: await _randomOhttpRelayUrl(),
);
final postReq = result.$1;
final postReqCtx = result.$2;

/// Take a Request from the payjoin sender and post it over OHTTP.
Future<Response<dynamic>> _postRequest(Dio dio, Request req) async {
return await dio.post(
req.url.asString(),
options: Options(
headers: {
'Content-Type': req.contentType,
},
responseType: ResponseType.bytes,
),
data: req.body,
// Attempt V2
final postRes = await _postRequest(dio, postReq);
final getCtx = await postReqCtx.processResponse(
response: postRes.data as List<int>,
);

while (true) {
try {
final (getRequest, getReqCtx) = await getCtx.extractReq(
ohttpRelay: await _randomOhttpRelayUrl(),
);
final getRes = await _postRequest(dio, getRequest);
return await getCtx.processResponse(
response: getRes.data as List<int>,
ohttpCtx: getReqCtx,
);
} catch (e) {
// Loop until a valid response is found
}
}
} catch (e) {
// If V2 fails, attempt V1
return await _runSenderV1(sender, dio);
}
}

/// Attempt to send payjoin using the V1 protocol.
Future<String> _runSenderV1(Sender sender, Dio dio) async {
try {
final (req, v1Ctx) = await sender.extractV1();
final response = await _postRequest(dio, req);
final proposalPsbt =
await v1Ctx.processResponse(response: response.data as List<int>);
return proposalPsbt;
} catch (e) {
throw Exception('Send V1 payjoin error: $e');
}
}

/// Posts a request via dio and returns the response.
Future<Response<dynamic>> _postRequest(Dio dio, Request req) async {
return await dio.post(
req.url.asString(),
options: Options(
headers: {
'Content-Type': req.contentType,
},
responseType: ResponseType.bytes,
),
data: req.body,
);
}
2 changes: 1 addition & 1 deletion lib/locator.dart
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ Future _setupBlocs() async {
);

locator.registerSingleton<PayjoinManager>(
PayjoinManager(),
PayjoinManager(locator<WalletTx>()),
);

locator.registerSingleton<NetworkFeesCubit>(
Expand Down
29 changes: 3 additions & 26 deletions lib/send/bloc/send_cubit.dart
Original file line number Diff line number Diff line change
Expand Up @@ -954,34 +954,11 @@ class SendCubit extends Cubit<SendState> {
// TODO copy originalPsbt.extractTx() to state.tx
// emit(state.copyWith(tx: originalPsbtTxWithId));
emit(state.copyWith(sending: true, sent: false));
final proposalPsbt = await _payjoinManager.runSender(
state.payjoinSender!,
);
final (wtxid, errSignBroadcast) = await _walletTx.signAndBroadcastPsbt(
await _payjoinManager.spawnSender(
isTestnet: _networkCubit.state.testnet,
sender: state.payjoinSender!,
wallet: wallet,
psbt: proposalPsbt!,
);
if (errSignBroadcast != null) {
emit(state.copyWith(
errSending: errSignBroadcast.toString(), sending: false));
return;
}

final txWithId = state.tx?.copyWith(txid: wtxid?.$2 ?? '');
emit(state.copyWith(tx: txWithId));

final (updatedWallet, _) = wtxid!;
state.selectedWalletBloc!.add(
UpdateWallet(
updatedWallet,
updateTypes: [
UpdateWalletTypes.addresses,
UpdateWalletTypes.transactions,
UpdateWalletTypes.swaps,
],
),
);

Future.delayed(150.ms);
state.selectedWalletBloc!.add(SyncWallet());

Expand Down

0 comments on commit e6ae3f7

Please sign in to comment.