Skip to content

Commit

Permalink
Merge pull request fedimint#5814 from tvolk131/speed_up_mprocs
Browse files Browse the repository at this point in the history
chore: parallelize mprocs lightning channel opening
  • Loading branch information
elsirion authored Aug 16, 2024
2 parents 9d6fb7b + 14fb9a2 commit fc58030
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 38 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions devimint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ fedimintd = { path = "../fedimintd" }
fs-lock = "0.1.4"
futures = { workspace = true }
hex = { workspace = true }
itertools = { workspace = true }
ln-gateway = { package = "fedimint-ln-gateway", path = "../gateway/ln-gateway" }
nix = { version = "0.29.0", features = ["signal"] }
rand = { workspace = true }
Expand Down
14 changes: 8 additions & 6 deletions devimint/src/devfed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tracing::debug;

use crate::envs::{FM_GWID_CLN_ENV, FM_GWID_LDK_ENV, FM_GWID_LND_ENV};
use crate::external::{
open_channel, open_channel_between_gateways, Bitcoind, Electrs, Esplora, Lightningd, Lnd,
open_channel, open_channels_between_gateways, Bitcoind, Electrs, Esplora, Lightningd, Lnd,
};
use crate::federation::{Client, Federation};
use crate::gatewayd::Gatewayd;
Expand Down Expand Up @@ -273,12 +273,14 @@ impl DevJitFed {
let gw_cln = gw_cln.get_try().await?.deref();
let gw_lnd = gw_lnd.get_try().await?.deref();

open_channel_between_gateways(&bitcoind, gw_cln, gw_lnd).await?;
let gateways: &[(&Gatewayd, &str)] =
if let Some(gw_ldk) = gw_ldk.get_try().await?.deref() {
&[(gw_cln, "CLN"), (gw_lnd, "LND"), (gw_ldk, "LDK")]
} else {
&[(gw_cln, "CLN"), (gw_lnd, "LND")]
};

if let Some(gw_ldk) = gw_ldk.get_try().await?.deref() {
open_channel_between_gateways(&bitcoind, gw_ldk, gw_cln).await?;
open_channel_between_gateways(&bitcoind, gw_ldk, gw_lnd).await?;
}
open_channels_between_gateways(&bitcoind, gateways).await?;
}

Ok(Arc::new(()))
Expand Down
88 changes: 57 additions & 31 deletions devimint/src/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use fedimint_core::BitcoinHash;
use fedimint_logging::LOG_DEVIMINT;
use fedimint_testing::gateway::LightningNodeType;
use hex::ToHex;
use itertools::Itertools;
use tokio::fs;
use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard};
use tokio::time::Instant;
Expand Down Expand Up @@ -862,67 +863,92 @@ pub async fn open_channel(
Ok(())
}

pub async fn open_channel_between_gateways(
#[allow(clippy::similar_names)]
pub async fn open_channels_between_gateways(
bitcoind: &Bitcoind,
gw_a: &Gatewayd,
gw_b: &Gatewayd,
gateways: &[(&Gatewayd, &str)],
) -> Result<()> {
// TODO: Find out why we need to wait for LDK here.
let funding_addr = loop {
if let Ok(address) = gw_a.get_funding_address().await {
break address;
}
debug!(target: LOG_DEVIMINT, "Syncing gateway lightning nodes to chain tip...");
futures::future::try_join_all(
gateways
.iter()
.map(|(gw, _gw_name)| gw.wait_for_chain_sync(bitcoind)),
)
.await?;

fedimint_core::runtime::sleep(std::time::Duration::from_secs(1)).await;
};
debug!(target: LOG_DEVIMINT, "Performing peg-in on all gateway lightning nodes...");
for (gw, _gw_name) in gateways {
let funding_addr = gw.get_funding_address().await?;
bitcoind.send_to(funding_addr, 100_000_000).await?;
}

bitcoind.send_to(funding_addr, 100_000_000).await?;
bitcoind.mine_blocks(10).await?;

debug!(target: LOG_DEVIMINT, "Await block ln nodes block processing");
tokio::try_join!(
gw_a.wait_for_chain_sync(bitcoind),
gw_b.wait_for_chain_sync(bitcoind)
)?;

debug!(target: LOG_DEVIMINT, "Opening LN channel between the nodes...");
gw_a.open_channel(
gw_b.lightning_pubkey().await?,
gw_b.lightning_node_addr.clone(),
10_000_000,
Some(5_000_000),
debug!(target: LOG_DEVIMINT, "Syncing gateway lightning nodes to chain tip...");
futures::future::try_join_all(
gateways
.iter()
.map(|(gw, _gw_name)| gw.wait_for_chain_sync(bitcoind)),
)
.await?;

// All unique pairs of gateways.
// For a list of gateways [A, B, C], this will produce [(A, B), (B, C), (C, A)].
#[allow(clippy::type_complexity)]
let gateway_pairs: Vec<(&(&Gatewayd, &str), &(&Gatewayd, &str))> =
gateways.iter().circular_tuple_windows::<(_, _)>().collect();

for ((gw_a, gw_a_name), (gw_b, gw_b_name)) in &gateway_pairs {
debug!(target: LOG_DEVIMINT, "Opening channel between {gw_a_name} and {gw_b_name} gateway lightning nodes...");
gw_a.open_channel(
gw_b.lightning_pubkey().await?,
gw_b.lightning_node_addr.clone(),
10_000_000,
Some(5_000_000),
)
.await?;
}

// `open_channel` may not send out the channel funding transaction immediately
// so we need to wait for it to get to the mempool.
// TODO: LDK is the culprit here. Find a way to ensure that
// `GatewayLdkClient::open_channel` is fully done before it returns.
fedimint_core::runtime::sleep(Duration::from_secs(10)).await;
fedimint_core::runtime::sleep(Duration::from_secs(5)).await;

bitcoind.mine_blocks(10).await?;

for ((gw_a, _gw_a_name), (gw_b, _gw_b_name)) in &gateway_pairs {
let gw_a_node_pubkey = gw_a.lightning_pubkey().await?;
let gw_b_node_pubkey = gw_b.lightning_pubkey().await?;

bitcoind.mine_blocks(20).await?;
wait_for_ready_channel_on_gateway_with_counterparty(gw_b, gw_a_node_pubkey).await?;
wait_for_ready_channel_on_gateway_with_counterparty(gw_a, gw_b_node_pubkey).await?;
}

let gw_a_node_pubkey = gw_a.lightning_pubkey().await?;
Ok(())
}

async fn wait_for_ready_channel_on_gateway_with_counterparty(
gw: &Gatewayd,
counterparty_lightning_node_pubkey: bitcoin::secp256k1::PublicKey,
) -> anyhow::Result<()> {
poll("Wait for channel update", || async {
let channels = gw_b
let channels = gw
.list_active_channels()
.await
.context("list channels")
.map_err(ControlFlow::Break)?;

if channels
.iter()
.any(|channel| channel.remote_pubkey == gw_a_node_pubkey)
.any(|channel| channel.remote_pubkey == counterparty_lightning_node_pubkey)
{
return Ok(());
}

Err(ControlFlow::Continue(anyhow!("channel not found")))
})
.await?;

Ok(())
.await
}

#[derive(Clone)]
Expand Down
2 changes: 1 addition & 1 deletion devimint/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ where
let (process_mgr, task_group) = cli::setup(args).await?;
log_binary_versions().await?;
let dev_fed = devfed::DevJitFed::new(&process_mgr, false)?;
let res = cleanup_on_exit(f(dev_fed.clone(), process_mgr.clone()), task_group).await;
// workaround https://github.com/tokio-rs/tokio/issues/6463
// by waiting on all jits to complete, we make it less likely
// that something is not finished yet and will block in `on_block`
let _ = dev_fed.finalize(&process_mgr).await;
let res = cleanup_on_exit(f(dev_fed.clone(), process_mgr.clone()), task_group).await;
dev_fed.fast_terminate().await;
res?;

Expand Down

0 comments on commit fc58030

Please sign in to comment.