Skip to content

Commit

Permalink
feat: Send channel tlc info directly to counterparty and update netwo…
Browse files Browse the repository at this point in the history
…rk graph with owned channel information (#446)
  • Loading branch information
contrun authored Jan 14, 2025
1 parent bde0dbc commit 7f2ca14
Show file tree
Hide file tree
Showing 14 changed files with 1,734 additions and 465 deletions.
473 changes: 323 additions & 150 deletions src/fiber/channel.rs

Large diffs are not rendered by default.

366 changes: 328 additions & 38 deletions src/fiber/gen/fiber.rs

Large diffs are not rendered by default.

214 changes: 185 additions & 29 deletions src/fiber/graph.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::channel::ChannelActorStateStore;
use super::channel::{ChannelActorState, ChannelActorStateStore, ChannelTlcInfo};
use super::config::AnnouncedNodeName;
use super::gossip::GossipMessageStore;
use super::history::{Direction, InternalResult, PaymentHistory, TimedResult};
Expand Down Expand Up @@ -144,6 +144,59 @@ impl ChannelInfo {
.map(|n| n.timestamp)
.max(self.update_of_node1.as_ref().map(|n| n.timestamp))
}

#[cfg(test)]
pub fn get_channel_update_of(&self, node: Pubkey) -> Option<&ChannelUpdateInfo> {
if self.node1() == node {
self.update_of_node1.as_ref()
} else if self.node2() == node {
self.update_of_node2.as_ref()
} else {
None
}
}
}

impl TryFrom<&ChannelActorState> for ChannelInfo {
type Error = String;

fn try_from(state: &ChannelActorState) -> Result<Self, Self::Error> {
if !state.is_ready() {
return Err("Channel is not ready".to_string());
}

let timestamp = state.must_get_funding_transaction_timestamp();
let channel_outpoint = state.must_get_funding_transaction_outpoint();
let capacity = state.get_liquid_capacity();
let udt_type_script = state.funding_udt_type_script.clone();

let (node1, node2, update_of_node1, update_of_node2) = if state.local_is_node1() {
(
state.local_pubkey,
state.remote_pubkey,
Some(state.get_local_channel_update_info()),
state.get_remote_channel_update_info(),
)
} else {
(
state.remote_pubkey,
state.local_pubkey,
state.get_remote_channel_update_info(),
Some(state.get_local_channel_update_info()),
)
};
Ok(Self {
channel_outpoint,
timestamp,
features: 0,
node1,
node2,
capacity,
udt_type_script,
update_of_node1,
update_of_node2,
})
}
}

impl From<(u64, ChannelAnnouncement)> for ChannelInfo {
Expand All @@ -168,13 +221,37 @@ pub struct ChannelUpdateInfo {
pub timestamp: u64,
/// Whether the channel can be currently used for payments (in this one direction).
pub enabled: bool,
/// The exact amount of balance that we can receive from the other party via the channel.
/// Note that this is not our balance, but the balance of the other party.
/// This node is forwarding the balance for the other party, so we need to use the receivable balance
/// instead of our balance.
pub inbound_liquidity: Option<u128>,
/// The difference in htlc expiry values that you must have when routing through this channel (in milliseconds).
pub tlc_expiry_delta: u64,
/// The minimum value, which must be relayed to the next hop via the channel
pub tlc_minimum_value: u128,
pub fee_rate: u64,
}

impl From<&ChannelTlcInfo> for ChannelUpdateInfo {
fn from(info: &ChannelTlcInfo) -> Self {
Self {
timestamp: info.timestamp,
enabled: info.enabled,
inbound_liquidity: None,
tlc_expiry_delta: info.tlc_expiry_delta,
tlc_minimum_value: info.tlc_minimum_value,
fee_rate: info.tlc_fee_proportional_millionths as u64,
}
}
}

impl From<ChannelTlcInfo> for ChannelUpdateInfo {
fn from(info: ChannelTlcInfo) -> Self {
Self::from(&info)
}
}

impl From<ChannelUpdate> for ChannelUpdateInfo {
fn from(update: ChannelUpdate) -> Self {
Self::from(&update)
Expand All @@ -186,15 +263,37 @@ impl From<&ChannelUpdate> for ChannelUpdateInfo {
Self {
timestamp: update.timestamp,
enabled: !update.is_disabled(),
inbound_liquidity: None,
tlc_expiry_delta: update.tlc_expiry_delta,
tlc_minimum_value: update.tlc_minimum_value,
fee_rate: update.tlc_fee_proportional_millionths as u64,
}
}
}

/// Update for our own channel has been made. We can use those events to update our graph.
/// The events only contain the information that is relevant for our own channels.
/// Other channel update events should be processed by gossip messages.
#[derive(Debug)]
pub enum OwnedChannelUpdateEvent {
/// The channel is back online and can be used for routing payments.
/// This normally means the peer is now reachable.
Up(ChannelInfo),
/// The channel is down and should not be used for routing payments.
/// This normally means the peer is not reachable.
Down(OutPoint),
/// One direction of the channel is updated (e.g. new balance, new fee rate).
Updated(OutPoint, Pubkey, ChannelUpdateInfo),
}

#[derive(Clone, Debug)]
pub struct NetworkGraph<S> {
// Whether to always process gossip messages for our own channels.
// See comments in should_process_gossip_message_for_channel for why we need this.
// TLDR: Most of the tests do not need this. Only tests in src/fiber/tests/graph.rs need this.
// We will only set this to true for tests in src/fiber/tests/graph.rs.
#[cfg(test)]
pub always_process_gossip_message: bool,
// The pubkey of the node that is running this instance of the network graph.
source: Pubkey,
// All the channels in the network.
Expand Down Expand Up @@ -252,6 +351,8 @@ where
{
pub fn new(store: S, source: Pubkey, announce_private_addr: bool) -> Self {
let mut network_graph = Self {
#[cfg(test)]
always_process_gossip_message: false,
source,
channels: HashMap::new(),
nodes: HashMap::new(),
Expand Down Expand Up @@ -313,6 +414,31 @@ where
return true;
}

// Process the events that are relevant for our own channels, and update the graph accordingly.
pub(crate) fn process_owned_channel_update_event(&mut self, event: OwnedChannelUpdateEvent) {
match event {
OwnedChannelUpdateEvent::Up(channel_info) => {
// Normally the channel_info passed here is the latest channel info,
// so we can just overwrite the old channel info.
self.channels
.insert(channel_info.channel_outpoint.clone(), channel_info);
}
OwnedChannelUpdateEvent::Down(channel_outpoint) => {
self.channels.remove(&channel_outpoint);
}
OwnedChannelUpdateEvent::Updated(channel_outpoint, node, channel_update) => {
if let Some(channel) = self.channels.get_mut(&channel_outpoint) {
if node == channel.node2() {
channel.update_of_node2 = Some(channel_update);
}
if node == channel.node1() {
channel.update_of_node1 = Some(channel_update);
}
}
}
}
}

// Load all the broadcast messages starting from latest_cursor from the store.
// Process them and set nodes and channels accordingly.
pub(crate) fn load_from_store(&mut self) {
Expand Down Expand Up @@ -363,11 +489,33 @@ where
self.channels.get_mut(channel_outpoint)
}

// We don't need to process our own channel announcement with gossip messages.
// They are processed by passing OwnedChannelUpdateEvents to the graph.
// These are real-time events with more detailed information (e.g. balance).
// We don't want to overwrite their detailed information here.
// But tests in src/fiber/tests/graph.rs need to process gossip messages
// to update the network graph. Many of the tests are messages from the graph.source.
// If we ignore these messages, the graph won't be updated. And many tests will fail.
fn should_process_gossip_message_for_nodes(&self, node1: &Pubkey, node2: &Pubkey) -> bool {
#[cfg(test)]
if self.always_process_gossip_message {
return true;
}
!(&self.source == node1 || &self.source == node2)
}

fn process_channel_announcement(
&mut self,
timestamp: u64,
channel_announcement: ChannelAnnouncement,
) -> Option<Cursor> {
if !self.should_process_gossip_message_for_nodes(
&channel_announcement.node1_id,
&channel_announcement.node2_id,
) {
return None;
}

match self.channels.get(&channel_announcement.channel_outpoint) {
Some(_channel) => {
trace!(
Expand Down Expand Up @@ -408,11 +556,16 @@ where
}

fn process_channel_update(&mut self, channel_update: ChannelUpdate) -> Option<Cursor> {
let channel_outpoint = &channel_update.channel_outpoint;
// The channel update message may have smaller timestamp than channel announcement.
// So it is possible that the channel announcement is not loaded into the graph yet,
// when we receive the channel update message.
let channel = self.load_channel_info_mut(channel_outpoint)?;
match self.get_channel(&channel_update.channel_outpoint) {
Some(channel)
if !self
.should_process_gossip_message_for_nodes(&channel.node1, &channel.node2) =>
{
return None;
}
_ => {}
}
let channel = self.load_channel_info_mut(&channel_update.channel_outpoint)?;
let update_info = if channel_update.is_update_of_node_1() {
&mut channel.update_of_node1
} else {
Expand Down Expand Up @@ -594,15 +747,28 @@ where

// Iterating over HashMap's values is not guaranteed to be in order,
// which may introduce randomness in the path finding.
// the weight algorithm in find_path does not considering capacity,
// so the channel with larger capacity maybe have the same weight with the channel with smaller capacity
// so we sort by capacity reverse order to make sure we try channel with larger capacity firstly
channels.sort_by(|(_, _, a, _), (_, _, b, _)| {
b.capacity().cmp(&a.capacity()).then(
b.channel_last_update_time()
.cmp(&a.channel_last_update_time()),
)
});
// We will first sort the channels by inbound_liquidity, then capacity, and at last update time.
// This is because the weight algorithm in find_path does not considering inbound_liquidity and capacity,
// so the channel with larger inbound_liquidity/capacity maybe have the same weight with the channel
// with smaller inbound_liquidity/capacity, even though the former have better chance to success.
channels.sort_by(
|(_, _, a_channel_info, a_channel_update_info),
(_, _, b_channel_info, b_channel_update_info)| {
b_channel_update_info
.inbound_liquidity
.cmp(&a_channel_update_info.inbound_liquidity)
.then(
b_channel_info
.capacity()
.cmp(&a_channel_info.capacity())
.then(
b_channel_info
.channel_last_update_time()
.cmp(&a_channel_info.channel_last_update_time()),
),
)
},
);
channels.into_iter()
}

Expand Down Expand Up @@ -904,20 +1070,10 @@ where
continue;
}

// if this is a direct channel, try to load the channel actor state for balance
if from == self.source || to == self.source {
if let Some(state) = self
.store
.get_channel_state_by_outpoint(&channel_info.out_point())
{
let balance = if from == self.source {
state.to_local_amount
} else {
state.to_remote_amount
};
if amount_to_send > balance {
continue;
}
// If we already know the balance of the channel, check if we can send the amount.
if let Some(balance) = channel_update.inbound_liquidity {
if amount_to_send > balance {
continue;
}
}

Expand Down
38 changes: 26 additions & 12 deletions src/fiber/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use tracing::{debug, error, info, trace, warn};
use super::channel::{
get_funding_and_reserved_amount, occupied_capacity, AcceptChannelParameter, ChannelActor,
ChannelActorMessage, ChannelActorStateStore, ChannelCommand, ChannelCommandWithId,
ChannelEvent, ChannelInitializationParameter, ChannelState, ChannelSubscribers,
ChannelEvent, ChannelInitializationParameter, ChannelState, ChannelSubscribers, ChannelTlcInfo,
OpenChannelParameter, ProcessingChannelError, ProcessingChannelResult, PublicChannelInfo,
RevocationData, SettlementData, ShuttingDownFlags, DEFAULT_COMMITMENT_FEE_RATE,
DEFAULT_FEE_RATE, DEFAULT_MAX_TLC_VALUE_IN_FLIGHT, MAX_COMMITMENT_DELAY_EPOCHS,
Expand All @@ -57,7 +57,7 @@ use super::channel::{
use super::config::{AnnouncedNodeName, MIN_TLC_EXPIRY_DELTA};
use super::fee::calculate_commitment_tx_fee;
use super::gossip::{GossipActorMessage, GossipMessageStore, GossipMessageUpdates};
use super::graph::{NetworkGraph, NetworkGraphStateStore, SessionRoute};
use super::graph::{NetworkGraph, NetworkGraphStateStore, OwnedChannelUpdateEvent, SessionRoute};
use super::key::blake2b_hash_with_salt;
use super::types::{
BroadcastMessage, BroadcastMessageQuery, BroadcastMessageWithTimestamp, EcdsaSignature,
Expand Down Expand Up @@ -652,6 +652,9 @@ pub enum NetworkActorEvent {
Option<(ProcessingChannelError, TlcErr)>,
Option<(Hash256, u64)>,
),

// An owned channel is updated.
OwnedChannelUpdateEvent(OwnedChannelUpdateEvent),
}

#[derive(Debug)]
Expand Down Expand Up @@ -954,6 +957,19 @@ where
let mut graph = self.network_graph.write().await;
graph.update_for_messages(gossip_message_updates.messages);
}
NetworkActorEvent::OwnedChannelUpdateEvent(owned_channel_update_event) => {
let mut graph = self.network_graph.write().await;
debug!(
"Received owned channel update event: {:?}",
owned_channel_update_event
);
let is_down =
matches!(owned_channel_update_event, OwnedChannelUpdateEvent::Down(_));
graph.process_owned_channel_update_event(owned_channel_update_event);
if is_down {
debug!("Owned channel is down");
}
}
}
Ok(())
}
Expand Down Expand Up @@ -1584,12 +1600,8 @@ where
payment_session: &mut PaymentSession,
payment_data: &SendPaymentData,
) -> Result<Vec<PaymentHopData>, Error> {
match self
.network_graph
.read()
.await
.build_route(payment_data.clone())
{
let graph = self.network_graph.read().await;
match graph.build_route(payment_data.clone()) {
Err(e) => {
let error = format!("Failed to build route, {}", e);
self.set_payment_fail_with_error(payment_session, &error);
Expand Down Expand Up @@ -2099,11 +2111,12 @@ where
ChannelInitializationParameter::OpenChannel(OpenChannelParameter {
funding_amount,
seed,
public_channel_info: public.then_some(PublicChannelInfo::new(
tlc_info: ChannelTlcInfo::new(
tlc_min_value.unwrap_or(self.tlc_min_value),
tlc_expiry_delta.unwrap_or(self.tlc_expiry_delta),
tlc_fee_proportional_millionths.unwrap_or(self.tlc_fee_proportional_millionths),
)),
),
public_channel_info: public.then_some(PublicChannelInfo::new()),
funding_udt_type_script,
shutdown_script,
channel_id_sender: tx,
Expand Down Expand Up @@ -2185,11 +2198,12 @@ where
ChannelInitializationParameter::AcceptChannel(AcceptChannelParameter {
funding_amount,
reserved_ckb_amount,
public_channel_info: open_channel.is_public().then_some(PublicChannelInfo::new(
tlc_info: ChannelTlcInfo::new(
min_tlc_value.unwrap_or(self.tlc_min_value),
tlc_expiry_delta.unwrap_or(self.tlc_expiry_delta),
tlc_fee_proportional_millionths.unwrap_or(self.tlc_fee_proportional_millionths),
)),
),
public_channel_info: open_channel.is_public().then_some(PublicChannelInfo::new()),
seed,
open_channel,
shutdown_script,
Expand Down
Loading

0 comments on commit 7f2ca14

Please sign in to comment.