From 414e008338aa856845561bb8d4b5e6ae3afd1d4e Mon Sep 17 00:00:00 2001 From: marshallyale <19430240+marshallyale@users.noreply.github.com> Date: Wed, 22 Jan 2025 13:46:41 -0800 Subject: [PATCH 1/2] fix: fix batch ordering issue Fixes issue #75 Raw client waiting map was using the same channel for every request/response. When items were put back into the channel inside of _reader_thread the waiting receiver in recv would just take the next response without validating it on request id request. This fixes this by using unique channels for each request response inside of the waiting map. --- src/raw_client.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/raw_client.rs b/src/raw_client.rs index eb78185..f7a1d95 100644 --- a/src/raw_client.rs +++ b/src/raw_client.rs @@ -3,7 +3,7 @@ //! This module contains the definition of the raw client that wraps the transport method use std::borrow::Borrow; -use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; +use std::collections::{BTreeMap, HashMap, VecDeque}; use std::io::{BufRead, BufReader, Read, Write}; use std::mem::drop; use std::net::{TcpStream, ToSocketAddrs}; @@ -762,12 +762,10 @@ impl ElectrumApi for RawClient { fn batch_call(&self, batch: &Batch) -> Result, Error> { let mut raw = Vec::new(); - let mut missing_responses = BTreeSet::new(); + let mut missing_responses = Vec::new(); let mut answers = BTreeMap::new(); - // Add our listener to the map before we send the request, Here we will clone the sender - // for every request id, so that we only have to monitor one receiver. - let (sender, receiver) = channel(); + // Add our listener to the map before we send the request for (method, params) in batch.iter() { let req = Request::new_id( @@ -775,9 +773,12 @@ impl ElectrumApi for RawClient { method, params.to_vec(), ); - missing_responses.insert(req.id); + // Add distinct channel to each request so when we remove our request id (and sender) from the waiting_map + // we can be sure that the response gets sent to the correct channel in self.recv + let (sender, receiver) = channel(); + missing_responses.push((req.id, receiver)); - self.waiting_map.lock()?.insert(req.id, sender.clone()); + self.waiting_map.lock()?.insert(req.id, sender); raw.append(&mut serde_json::to_vec(&req)?); raw.extend_from_slice(b"\n"); @@ -796,7 +797,7 @@ impl ElectrumApi for RawClient { self.increment_calls(); - for req_id in missing_responses.iter() { + for (req_id, receiver) in missing_responses.iter() { match self.recv(&receiver, *req_id) { Ok(mut resp) => answers.insert(req_id, resp["result"].take()), Err(e) => { @@ -805,7 +806,7 @@ impl ElectrumApi for RawClient { warn!("got error for req_id {}: {:?}", req_id, e); warn!("removing all waiting req of this batch"); let mut guard = self.waiting_map.lock()?; - for req_id in missing_responses.iter() { + for (req_id, _) in missing_responses.iter() { guard.remove(req_id); } return Err(e); From 2d3f4a5c429c7bb8f22828e52f352b392e612914 Mon Sep 17 00:00:00 2001 From: marshall-yale Date: Fri, 24 Jan 2025 11:54:36 -0800 Subject: [PATCH 2/2] fix: fix clippy error --- src/raw_client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/raw_client.rs b/src/raw_client.rs index f7a1d95..d0dfca5 100644 --- a/src/raw_client.rs +++ b/src/raw_client.rs @@ -798,7 +798,7 @@ impl ElectrumApi for RawClient { self.increment_calls(); for (req_id, receiver) in missing_responses.iter() { - match self.recv(&receiver, *req_id) { + match self.recv(receiver, *req_id) { Ok(mut resp) => answers.insert(req_id, resp["result"].take()), Err(e) => { // In case of error our sender could still be left in the map, depending on where