Skip to content

Commit

Permalink
Extract shared logic for matching on stdout line
Browse files Browse the repository at this point in the history
Avoid repetition by introducing `wait_for_stdout_match` which reads from
stdout until a match is found against the given pattern.

Note: it's important that `child_stdout` lives long enough, until
`sigint` is called on the child process. Otherwise the interrupt handler
would try to `println!` to a dropped stdout and panic.
  • Loading branch information
spacebear21 committed Feb 7, 2025
1 parent 9ebf027 commit 4f4432b
Showing 1 changed file with 54 additions and 77 deletions.
131 changes: 54 additions & 77 deletions payjoin-cli/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ mod e2e {
use std::path::PathBuf;

use payjoin_test_utils::{init_tracing, TestServices};
use tokio::process::Child;
use tokio::process::{Child, ChildStdout};

type Result<T> = std::result::Result<T, BoxError>;

Expand Down Expand Up @@ -266,117 +266,94 @@ mod e2e {
}

async fn get_bip21_from_receiver(mut cli_receiver: Child) -> String {
let stdout =
cli_receiver.stdout.take().expect("Failed to take stdout of child process");
let reader = BufReader::new(stdout);
let mut stdout = tokio::io::stdout();
let mut bip21 = String::new();

let mut lines = reader.lines();

while let Some(line) = lines.next_line().await.expect("Failed to read line from stdout")
{
// Write to stdout regardless
stdout
.write_all(format!("{}\n", line).as_bytes())
.await
.expect("Failed to write to stdout");

if line.to_ascii_uppercase().starts_with("BITCOIN") {
bip21 = line;
break;
}
}
let mut stdout =
cli_receiver.stdout.take().expect("failed to take stdout of child process");
let bip21 = wait_for_stdout_match(&mut stdout, |line| {
line.to_ascii_uppercase().starts_with("BITCOIN")
})
.await
.expect("payjoin-cli receiver should output a bitcoin URI");
log::debug!("Got bip21 {}", &bip21);

sigint(cli_receiver).await.expect("Failed to kill payjoin-cli");
bip21
}

async fn send_until_request_timeout(mut cli_sender: Child) -> Result<()> {
let stdout = cli_sender.stdout.take().expect("Failed to take stdout of child process");
let reader = BufReader::new(stdout);
let mut stdout = tokio::io::stdout();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);

let mut lines = reader.lines();
while let Some(line) = lines.next_line().await.expect("Failed to read line from stdout")
{
stdout
.write_all(format!("{}\n", line).as_bytes())
.await
.expect("Failed to write to stdout");
if line.contains("No response yet.") {
let _ = tx.send(true).await;
break;
}
}

let mut stdout =
cli_sender.stdout.take().expect("failed to take stdout of child process");
let timeout = tokio::time::Duration::from_secs(35);
let fallback_sent = tokio::time::timeout(timeout, rx.recv()).await?;
let res = tokio::time::timeout(
timeout,
wait_for_stdout_match(&mut stdout, |line| line.contains("No response yet.")),
)
.await?;

sigint(cli_sender).await.expect("Failed to kill payjoin-cli initial sender");

assert!(fallback_sent.unwrap_or(false), "Fallback send was not detected");
assert!(res.is_some(), "Fallback send was not detected");
Ok(())
}

async fn respond_with_payjoin(mut cli_receive_resumer: Child) -> Result<()> {
let stdout =
let mut stdout =
cli_receive_resumer.stdout.take().expect("Failed to take stdout of child process");
let reader = BufReader::new(stdout);
let mut stdout = tokio::io::stdout();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);

let mut lines = reader.lines();
while let Some(line) = lines.next_line().await.expect("Failed to read line from stdout")
{
stdout
.write_all(format!("{}\n", line).as_bytes())
.await
.expect("Failed to write to stdout");
if line.contains("Response successful") {
let _ = tx.send(true).await;
break;
}
}

let timeout = tokio::time::Duration::from_secs(10);
let response_successful = tokio::time::timeout(timeout, rx.recv()).await?;
let res = tokio::time::timeout(
timeout,
wait_for_stdout_match(&mut stdout, |line| line.contains("Response successful")),
)
.await?;

sigint(cli_receive_resumer).await.expect("Failed to kill payjoin-cli");

assert!(response_successful.unwrap_or(false), "Did not respond with Payjoin PSBT");
assert!(res.is_some(), "Did not respond with Payjoin PSBT");
Ok(())
}

async fn check_payjoin_sent(mut cli_send_resumer: Child) -> Result<()> {
let stdout =
let mut stdout =
cli_send_resumer.stdout.take().expect("Failed to take stdout of child process");
let reader = BufReader::new(stdout);
let mut stdout = tokio::io::stdout();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let timeout = tokio::time::Duration::from_secs(10);
let res = tokio::time::timeout(
timeout,
wait_for_stdout_match(&mut stdout, |line| line.contains("Payjoin sent")),
)
.await?;

sigint(cli_send_resumer).await.expect("Failed to kill payjoin-cli");
assert!(res.is_some(), "Payjoin send was not detected");
Ok(())
}

/// Read lines from `child_stdout` until `match_pattern` is found and the corresponding
/// line is returned.
/// Also writes every read line to tokio::io::stdout();
async fn wait_for_stdout_match<F>(
child_stdout: &mut ChildStdout,
match_pattern: F,
) -> Option<String>
where
F: Fn(&str) -> bool,
{
let reader = BufReader::new(child_stdout);
let mut lines = reader.lines();
let mut res = None;

let mut stdout = tokio::io::stdout();
while let Some(line) = lines.next_line().await.expect("Failed to read line from stdout")
{
// Write all output to tests stdout
stdout
.write_all(format!("{}\n", line).as_bytes())
.await
.expect("Failed to write to stdout");
if line.contains("Payjoin sent") {
let _ = tx.send(true).await;

if match_pattern(&line) {
res = Some(line);
break;
}
}

let timeout = tokio::time::Duration::from_secs(10);
let payjoin_sent = tokio::time::timeout(timeout, rx.recv()).await?;

sigint(cli_send_resumer).await.expect("Failed to kill payjoin-cli");

assert!(payjoin_sent.unwrap_or(false), "Payjoin send was not detected");
Ok(())
res
}

Ok(())
Expand Down

0 comments on commit 4f4432b

Please sign in to comment.