Update to rust-lightning:0.0.117 #15

Open · wants to merge 16 commits into base: split-tx-experiment-117

22 changes: 14 additions & 8 deletions .github/workflows/build.yml
@@ -1,9 +1,6 @@
name: Continuous Integration Checks

on:
push:
branches-ignore:
- master
pull_request:
branches-ignore:
- master
@@ -51,7 +48,7 @@ jobs:
shellcheck ci/ci-tests.sh
- name: Run CI script
shell: bash # Default on Winblows is powershell
run: ./ci/ci-tests.sh
run: CI_MINIMIZE_DISK_USAGE=1 ./ci/ci-tests.sh

coverage:
strategy:
@@ -141,15 +138,24 @@ jobs:
run: |
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --profile=minimal --default-toolchain ${{ env.TOOLCHAIN }}
rustup override set ${{ env.TOOLCHAIN }}
- name: Fetch full tree and rebase on upstream
- name: Get PR base branch
id: get-pr-base-branch
run: |
git remote add upstream https://github.com/lightningdevkit/rust-lightning
BRANCH=${{ github.event.pull_request.base.ref }}
echo "PR_BASE_BRANCH=$BRANCH" >> "$GITHUB_OUTPUT"
- name: Fetch full tree and rebase on PR base branch
env:
PR_BASE_BRANCH: ${{ steps.get-pr-base-branch.outputs.PR_BASE_BRANCH }}
run: |
git remote add upstream https://github.com/p2pderivatives/rust-lightning
git fetch upstream
export GIT_COMMITTER_EMAIL="[email protected]"
export GIT_COMMITTER_NAME="RL CI"
git rebase upstream/main
git rebase upstream/$PR_BASE_BRANCH
- name: For each commit, run cargo check (including in fuzz)
run: ci/check-each-commit.sh upstream/main
env:
PR_BASE_BRANCH: ${{ steps.get-pr-base-branch.outputs.PR_BASE_BRANCH }}
run: ci/check-each-commit.sh upstream/$PR_BASE_BRANCH

check_release:
runs-on: ubuntu-latest
3 changes: 3 additions & 0 deletions ci/ci-tests.sh
@@ -130,11 +130,14 @@ else
[ "$RUSTC_MINOR_VERSION" -lt 60 ] && cargo update -p memchr --precise "2.5.0" --verbose
cargo check --verbose --color always
fi
[ "$CI_MINIMIZE_DISK_USAGE" != "" ] && cargo clean
popd

# Test that we can build downstream code with only the "release pins".
pushd msrv-no-dev-deps-check
PIN_RELEASE_DEPS
# The memchr crate switched to an MSRV of 1.60 starting with v2.6.0
[ "$RUSTC_MINOR_VERSION" -lt 60 ] && cargo update -p memchr --precise "2.5.0" --verbose
cargo check
popd

5 changes: 5 additions & 0 deletions fuzz/src/chanmon_consistency.rs
@@ -162,6 +162,10 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
self.chain_monitor.update_channel(funding_txo, update)
}

fn update_channel_funding_txo(&self, _: OutPoint, _: OutPoint, _: u64) -> chain::ChannelMonitorUpdateStatus {
unimplemented!()
}

fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)> {
return self.chain_monitor.release_pending_monitor_events();
}
@@ -317,6 +321,7 @@ fn check_api_err(api_err: APIError, sendable_bounds_violated: bool) {
// We can (obviously) temp-fail a monitor update
},
APIError::IncompatibleShutdownScript { .. } => panic!("Cannot send an incompatible shutdown script"),
APIError::ExternalError { err } => panic!("{}", err),
}
}
#[inline]
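
The `update_channel_funding_txo` stub above follows the usual fuzz-harness convention: the `TestChainMonitor` wrapper forwards the calls the generated inputs actually reach (as `update_channel` does just above it) and panics on paths they never drive, so an unexpected call fails loudly rather than silently succeeding. A self-contained sketch of that pattern, with toy types standing in for the real `chain::Watch` trait and its signatures:

```rust
// Toy stand-ins for the lightning types; only the stubbing pattern is the point here.
#[derive(Clone, Copy, Debug, PartialEq)]
enum UpdateStatus { Completed }

trait Watch {
    fn update_channel(&self, channel_id: u64) -> UpdateStatus;
    fn update_channel_funding_txo(&self, channel_id: u64, new_vout: u32, value_sats: u64) -> UpdateStatus;
}

struct TestChainMonitor;

impl Watch for TestChainMonitor {
    fn update_channel(&self, _channel_id: u64) -> UpdateStatus {
        // Exercised by the fuzz input, so it returns a real status.
        UpdateStatus::Completed
    }

    fn update_channel_funding_txo(&self, _: u64, _: u32, _: u64) -> UpdateStatus {
        // Never reached by the generated inputs; failing loudly beats a silent no-op.
        unimplemented!()
    }
}

fn main() {
    let monitor = TestChainMonitor;
    assert_eq!(monitor.update_channel(0), UpdateStatus::Completed);
}
```
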
5 changes: 5 additions & 0 deletions fuzz/src/router.rs
@@ -242,6 +242,11 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
config: None,
feerate_sat_per_1000_weight: None,
channel_shutdown_state: Some(channelmanager::ChannelShutdownState::NotShuttingDown),
funding_redeemscript: None,
holder_funding_pubkey: PublicKey::from_slice(&hex::decode("02eec7245d6b7d2ccb30380bfbe2a3648cd7a942653f5aa340edcea1f283686619").unwrap()[..]).unwrap(),
counter_funding_pubkey: None,
original_funding_outpoint: None,
channel_keys_id: [0; 32],
});
}
Some(&$first_hops_vec[..])
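
The new `holder_funding_pubkey` field above is filled with a fixed, well-formed compressed key parsed from hex once at test setup. A standalone sketch of that parse; the crate names `secp256k1` and `hex` are assumptions here, since the fuzz crate reaches the equivalent types through its own re-exports:

```rust
use secp256k1::PublicKey;

fn main() {
    let hex_key = "02eec7245d6b7d2ccb30380bfbe2a3648cd7a942653f5aa340edcea1f283686619";
    // 33-byte compressed encoding: a 0x02/0x03 parity byte followed by the x coordinate.
    let bytes = hex::decode(hex_key).expect("valid hex");
    // from_slice rejects anything that is not a valid curve point.
    let pubkey = PublicKey::from_slice(&bytes).expect("valid compressed public key");
    assert_eq!(pubkey.serialize().to_vec(), bytes);
}
```
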
39 changes: 20 additions & 19 deletions lightning-block-sync/src/poll.rs
@@ -120,7 +120,7 @@ impl std::ops::Deref for ValidatedBlockHeader {
impl ValidatedBlockHeader {
/// Checks that the header correctly builds on previous_header: the claimed work differential
/// matches the actual PoW and the difficulty transition is possible, i.e., within 4x.
fn check_builds_on(&self, previous_header: &ValidatedBlockHeader, network: Network) -> BlockSourceResult<()> {
fn check_builds_on(&self, previous_header: &ValidatedBlockHeader, _network: Network) -> BlockSourceResult<()> {
if self.header.prev_blockhash != previous_header.block_hash {
return Err(BlockSourceError::persistent("invalid previous block hash"));
}
@@ -129,24 +129,25 @@ impl ValidatedBlockHeader {
return Err(BlockSourceError::persistent("invalid block height"));
}

let work = self.header.work();
if self.chainwork != previous_header.chainwork + work {
return Err(BlockSourceError::persistent("invalid chainwork"));
}

if let Network::Bitcoin = network {
if self.height % 2016 == 0 {
let target = self.header.target();
let previous_target = previous_header.header.target();
let min_target = previous_target >> 2;
let max_target = previous_target << 2;
if target > max_target || target < min_target {
return Err(BlockSourceError::persistent("invalid difficulty transition"))
}
} else if self.header.bits != previous_header.header.bits {
return Err(BlockSourceError::persistent("invalid difficulty"))
}
}
// let work = self.header.work();
// if self.chainwork != previous_header.chainwork + work {
// return Err(BlockSourceError::persistent("invalid chainwork"));
// }

// TODO(Tibo): This causes issues with Esplora, temporary fix.
// if let Network::Bitcoin = network {
// if self.height % 2016 == 0 {
// let target = self.header.target();
// let previous_target = previous_header.header.target();
// let min_target = previous_target >> 2;
// let max_target = previous_target << 2;
// if target > max_target || target < min_target {
// return Err(BlockSourceError::persistent("invalid difficulty transition"))
// }
// } else if self.header.bits != previous_header.header.bits {
// return Err(BlockSourceError::persistent("invalid difficulty"))
// }
// }

Ok(())
}
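
For reference, the disabled mainnet branch enforced the standard retarget bound: at each 2016-block boundary the new target may move by at most a factor of four in either direction, which the original code expressed as shifts on the previous target. A minimal sketch of that bound, using `u128` as a stand-in for the 256-bit `Target` type:

```rust
// Retarget sanity check as in the commented-out block above: reject any transition
// where the new target leaves the [previous/4, previous*4] window.
fn transition_within_bounds(previous_target: u128, target: u128) -> bool {
    let min_target = previous_target >> 2; // previous / 4 (hardest allowed)
    let max_target = previous_target << 2; // previous * 4 (easiest allowed)
    target >= min_target && target <= max_target
}

fn main() {
    let prev = 1u128 << 40;
    assert!(transition_within_bounds(prev, prev << 1));  // difficulty halved: allowed
    assert!(!transition_within_bounds(prev, prev << 3)); // difficulty cut 8x: rejected
}
```

A larger target means lower difficulty, so `min_target` bounds how much harder the chain may get in one step and `max_target` how much easier.
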
4 changes: 2 additions & 2 deletions lightning-net-tokio/Cargo.toml
@@ -17,8 +17,8 @@ rustdoc-args = ["--cfg", "docsrs"]
[dependencies]
bitcoin = "0.29.0"
lightning = { version = "0.0.117", path = "../lightning" }
tokio = { version = "1.0", features = [ "rt", "sync", "net", "time" ] }
tokio = { version = "1.0", features = [ "io-util", "rt", "sync", "net", "time" ] }

[dev-dependencies]
tokio = { version = "1.14", features = [ "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] }
tokio = { version = "1.14", features = [ "io-util", "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] }
lightning = { version = "0.0.117", path = "../lightning", features = ["_test_utils"] }
72 changes: 39 additions & 33 deletions lightning-net-tokio/src/lib.rs
@@ -32,8 +32,9 @@
use bitcoin::secp256k1::PublicKey;

use tokio::net::TcpStream;
use tokio::time;
use tokio::{io, time};
use tokio::sync::mpsc;
use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};

use lightning::ln::peer_handler;
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
@@ -58,7 +59,7 @@ static ID_COUNTER: AtomicU64 = AtomicU64::new(0);
// define a trivial two- and three- select macro with the specific types we need and just use that.

pub(crate) enum SelectorOutput {
A(Option<()>), B(Option<()>), C(tokio::io::Result<()>),
A(Option<()>), B(Option<()>), C(tokio::io::Result<usize>),
}

pub(crate) struct TwoSelector<
@@ -86,15 +87,15 @@ impl<
}

pub(crate) struct ThreeSelector<
A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<()>> + Unpin
A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<usize>> + Unpin
> {
pub a: A,
pub b: B,
pub c: C,
}

impl<
A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<()>> + Unpin
A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<usize>> + Unpin
> Future for ThreeSelector<A, B, C> {
type Output = SelectorOutput;
fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<SelectorOutput> {
@@ -118,7 +119,7 @@ impl<
/// Connection object (in an Arc<Mutex<>>) in each SocketDescriptor we create as well as in the
/// read future (which is returned by schedule_read).
struct Connection {
writer: Option<Arc<TcpStream>>,
writer: Option<io::WriteHalf<TcpStream>>,
// Because our PeerManager is templated by user-provided types, and we can't (as far as I can
// tell) have a const RawWakerVTable built out of templated functions, we need some indirection
// between being woken up with write-ready and calling PeerManager::write_buffer_space_avail.
@@ -155,7 +156,7 @@ impl Connection {
async fn schedule_read<PM: Deref + 'static + Send + Sync + Clone>(
peer_manager: PM,
us: Arc<Mutex<Self>>,
reader: Arc<TcpStream>,
mut reader: io::ReadHalf<TcpStream>,
mut read_wake_receiver: mpsc::Receiver<()>,
mut write_avail_receiver: mpsc::Receiver<()>,
) where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
@@ -199,7 +200,7 @@ impl Connection {
ThreeSelector {
a: Box::pin(write_avail_receiver.recv()),
b: Box::pin(read_wake_receiver.recv()),
c: Box::pin(reader.readable()),
c: Box::pin(reader.read(&mut buf)),
}.await
};
match select_result {
@@ -210,9 +211,8 @@
}
},
SelectorOutput::B(_) => {},
SelectorOutput::C(res) => {
if res.is_err() { break Disconnect::PeerDisconnected; }
match reader.try_read(&mut buf) {
SelectorOutput::C(read) => {
match read {
Ok(0) => break Disconnect::PeerDisconnected,
Ok(len) => {
let read_res = peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]);
Expand All @@ -226,10 +226,6 @@ impl Connection {
Err(_) => break Disconnect::CloseConnection,
}
},
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// readable() is allowed to spuriously wake, so we have to handle
// WouldBlock here.
},
Err(_) => break Disconnect::PeerDisconnected,
}
},
@@ -243,14 +239,18 @@
// here.
let _ = tokio::task::yield_now().await;
};
us.lock().unwrap().writer.take();
let writer_option = us.lock().unwrap().writer.take();
if let Some(mut writer) = writer_option {
// If the socket is already closed, shutdown() will fail, so just ignore it.
let _ = writer.shutdown().await;
}
if let Disconnect::PeerDisconnected = disconnect_type {
peer_manager.as_ref().socket_disconnected(&our_descriptor);
peer_manager.as_ref().process_events();
}
}

fn new(stream: StdTcpStream) -> (Arc<TcpStream>, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc<Mutex<Self>>) {
fn new(stream: StdTcpStream) -> (io::ReadHalf<TcpStream>, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc<Mutex<Self>>) {
// We only ever need a channel of depth 1 here: if we returned a non-full write to the
// PeerManager, we will eventually get notified that there is room in the socket to write
// new bytes, which will generate an event. That event will be popped off the queue before
@@ -262,11 +262,11 @@ impl Connection {
// false.
let (read_waker, read_receiver) = mpsc::channel(1);
stream.set_nonblocking(true).unwrap();
let tokio_stream = Arc::new(TcpStream::from_std(stream).unwrap());
let (reader, writer) = io::split(TcpStream::from_std(stream).unwrap());

(Arc::clone(&tokio_stream), write_receiver, read_receiver,
(reader, write_receiver, read_receiver,
Arc::new(Mutex::new(Self {
writer: Some(tokio_stream), write_avail, read_waker, read_paused: false,
writer: Some(writer), write_avail, read_waker, read_paused: false,
rl_requested_disconnect: false,
id: ID_COUNTER.fetch_add(1, Ordering::AcqRel)
})))
@@ -462,9 +462,9 @@ impl SocketDescriptor {
}
impl peer_handler::SocketDescriptor for SocketDescriptor {
fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
// To send data, we take a lock on our Connection to access the TcpStream, writing to it if
// there's room in the kernel buffer, or otherwise create a new Waker with a
// SocketDescriptor in it which can wake up the write_avail Sender, waking up the
// To send data, we take a lock on our Connection to access the WriteHalf of the TcpStream,
// writing to it if there's room in the kernel buffer, or otherwise create a new Waker with
// a SocketDescriptor in it which can wake up the write_avail Sender, waking up the
// processing future which will call write_buffer_space_avail and we'll end up back here.
let mut us = self.conn.lock().unwrap();
if us.writer.is_none() {
@@ -484,18 +484,24 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
let mut ctx = task::Context::from_waker(&waker);
let mut written_len = 0;
loop {
match us.writer.as_ref().unwrap().poll_write_ready(&mut ctx) {
task::Poll::Ready(Ok(())) => {
match us.writer.as_ref().unwrap().try_write(&data[written_len..]) {
Ok(res) => {
debug_assert_ne!(res, 0);
written_len += res;
if written_len == data.len() { return written_len; }
},
Err(_) => return written_len,
}
match std::pin::Pin::new(us.writer.as_mut().unwrap()).poll_write(&mut ctx, &data[written_len..]) {
task::Poll::Ready(Ok(res)) => {
// The tokio docs *seem* to indicate this can't happen, and I certainly don't
// know how to handle it if it does (cause it should be a Poll::Pending
// instead):
assert_ne!(res, 0);
written_len += res;
if written_len == data.len() { return written_len; }
},
task::Poll::Ready(Err(e)) => {
// The tokio docs *seem* to indicate this can't happen, and I certainly don't
// know how to handle it if it does (cause it should be a Poll::Pending
// instead):
assert_ne!(e.kind(), io::ErrorKind::WouldBlock);
// Probably we've already been closed, just return what we have and let the
// read thread handle closing logic.
return written_len;
},
task::Poll::Ready(Err(_)) => return written_len,
task::Poll::Pending => {
// We're queued up for a write event now, but we need to make sure we also
// pause read given we're now waiting on the remote end to ACK (and in
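
The changes in this file all follow from one decision: drop the shared `Arc<TcpStream>` with `try_read`/`try_write` in favour of tokio's `io::split`, so the read half can be polled as an ordinary future inside the selector (hence `SelectorOutput::C(tokio::io::Result<usize>)`) and the write half can be shut down explicitly when the connection is torn down. A minimal sketch of that pattern, separate from the PR's code (the echo logic and buffer size are placeholders):

```rust
// Requires tokio with the "net" and "io-util" features (the feature the Cargo.toml
// change above adds); io-util provides split(), AsyncReadExt and AsyncWriteExt.
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

async fn echo_once(stream: TcpStream) -> io::Result<()> {
    // One owned stream becomes independently usable read and write halves.
    let (mut reader, mut writer) = io::split(stream);

    let mut buf = [0u8; 8192];
    // `read` is a future, so it can live alongside other futures in a select loop;
    // it resolves with Ok(0) once the peer closes its side of the connection.
    let len = reader.read(&mut buf).await?;
    if len > 0 {
        writer.write_all(&buf[..len]).await?;
    }

    // Mirrors the new disconnect path: shutdown() may fail if the socket is already
    // closed, so the result is deliberately ignored.
    let _ = writer.shutdown().await;
    Ok(())
}
```

On the send side, `send_data` has to stay synchronous to satisfy `peer_handler::SocketDescriptor`, so instead of `.await`ing the write half it drives it manually with `Pin::new(us.writer.as_mut().unwrap()).poll_write(&mut ctx, ..)` against the hand-built waker, which is the loop shown in the hunk above.
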
1 change: 1 addition & 0 deletions lightning/rustfmt.toml
@@ -0,0 +1 @@
disable_all_formatting = true