Don't use RPC batching with bitcoind
This actually hurts performance because the batched response has to be
buffered, as @TheBlueMatt explains at romanz#373 (comment).

Instead, send multiple RPC requests in parallel using a thread pool.

Also see romanz#374
shesek committed May 23, 2024
1 parent 5db7d2b commit 0ac2dbc
Showing 1 changed file with 14 additions and 26 deletions.
40 changes: 14 additions & 26 deletions src/daemon.rs
@@ -8,7 +8,7 @@ use std::time::Duration;
 
 use base64::prelude::{Engine, BASE64_STANDARD};
 use hex::FromHex;
-use itertools::Itertools;
+use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 use serde_json::{from_str, from_value, Value};
 
 #[cfg(not(feature = "liquid"))]
@@ -377,31 +377,16 @@ impl Daemon {
         Ok(result)
     }
 
-    fn handle_request_batch(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
+    fn handle_request(&self, method: &str, params: &Value) -> Result<Value> {
         let id = self.message_id.next();
-        let chunks = params_list
-            .iter()
-            .map(|params| json!({"method": method, "params": params, "id": id}))
-            .chunks(50_000); // Max amount of batched requests
-        let mut results = vec![];
-        for chunk in &chunks {
-            let reqs = chunk.collect();
-            let mut replies = self.call_jsonrpc(method, &reqs)?;
-            if let Some(replies_vec) = replies.as_array_mut() {
-                for reply in replies_vec {
-                    results.push(parse_jsonrpc_reply(reply.take(), method, id)?)
-                }
-            } else {
-                bail!("non-array replies: {:?}", replies);
-            }
-        }
-
-        Ok(results)
+        let req = json!({"method": method, "params": params, "id": id});
+        let reply = self.call_jsonrpc(method, &req)?;
+        parse_jsonrpc_reply(reply, method, id)
     }
 
-    fn retry_request_batch(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
+    fn retry_request(&self, method: &str, params: &Value) -> Result<Value> {
         loop {
-            match self.handle_request_batch(method, params_list) {
+            match self.handle_request(method, &params) {
                 Err(Error(ErrorKind::Connection(msg), _)) => {
                     warn!("reconnecting to bitcoind: {}", msg);
                     self.signal.wait(Duration::from_secs(3), false)?;
@@ -415,13 +400,16 @@
     }
 
     fn request(&self, method: &str, params: Value) -> Result<Value> {
-        let mut values = self.retry_request_batch(method, &[params])?;
-        assert_eq!(values.len(), 1);
-        Ok(values.remove(0))
+        self.retry_request(method, &params)
     }
 
     fn requests(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
-        self.retry_request_batch(method, params_list)
+        // Send in parallel as individual JSONRPC requests, with no batching.
+        // See https://github.com/Blockstream/electrs/pull/33
+        params_list
+            .par_iter()
+            .map(|params| self.retry_request(method, params))
+            .collect()
     }
 
     // bitcoind JSONRPC API:
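For illustration, below is a minimal standalone sketch of the pattern the new requests method relies on: rayon's par_iter() fans individual JSON-RPC requests out across a thread pool, and collecting into a Result short-circuits on the first failure. The send_one helper, the String error type, and the num_threads(4) cap are assumptions made for this sketch and are not part of the commit; in electrs the real per-request call is Daemon::retry_request.

// Standalone sketch (not electrs code).
// [dependencies] rayon = "1", serde_json = "1"
use rayon::prelude::*;
use serde_json::{json, Value};

// Hypothetical stand-in for a single, non-batched JSON-RPC round trip.
fn send_one(method: &str, params: &Value) -> Result<Value, String> {
    // A real client would write one {"method", "params", "id"} object and read
    // back exactly one reply, so bitcoind never buffers a large batched response.
    Ok(json!({ "method": method, "params": params.clone(), "result": null }))
}

fn requests(method: &str, params_list: &[Value]) -> Result<Vec<Value>, String> {
    // One request per item, issued concurrently on rayon's thread pool.
    params_list
        .par_iter()
        .map(|params| send_one(method, params))
        .collect() // collects into Result<Vec<Value>, String>, stopping at the first error
}

fn main() -> Result<(), String> {
    // Illustrative cap on parallelism; not something this commit configures.
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .map_err(|e| e.to_string())?;

    let params: Vec<Value> = (0..10).map(|height| json!([height])).collect();
    let replies = pool.install(|| requests("getblockhash", &params))?;
    println!("got {} replies", replies.len());
    Ok(())
}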
