Skip to content

Commit

Permalink
Reuse RPC threads and TCP connections
Browse files Browse the repository at this point in the history
Keep RPC TCP connections open between sync runs and reuse them,
to reduce TCP connection initialization overhead.
  • Loading branch information
shesek committed Aug 6, 2024
1 parent d5df231 commit beef521
Showing 1 changed file with 13 additions and 12 deletions.
25 changes: 13 additions & 12 deletions src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,14 @@ impl Counter {

pub struct Daemon {
daemon_dir: PathBuf,
daemon_parallelism: usize,
blocks_dir: PathBuf,
network: Network,
conn: Mutex<Connection>,
message_id: Counter, // for monotonic JSONRPC 'id'
signal: Waiter,

rpc_threads: Arc<rayon::ThreadPool>,

// monitoring
latency: HistogramVec,
size: HistogramVec,
Expand All @@ -308,7 +309,6 @@ impl Daemon {
) -> Result<Daemon> {
let daemon = Daemon {
daemon_dir: daemon_dir.clone(),
daemon_parallelism,
blocks_dir: blocks_dir.clone(),
network,
conn: Mutex::new(Connection::new(
Expand All @@ -318,6 +318,13 @@ impl Daemon {
)?),
message_id: Counter::new(),
signal: signal.clone(),
rpc_threads: Arc::new(
rayon::ThreadPoolBuilder::new()
.num_threads(daemon_parallelism)
.thread_name(|i| format!("rpc-requests-{}", i))
.build()
.unwrap(),
),
latency: metrics.histogram_vec(
HistogramOpts::new("daemon_rpc", "Bitcoind RPC latency (in seconds)"),
&["method"],
Expand Down Expand Up @@ -361,12 +368,12 @@ impl Daemon {
pub fn reconnect(&self) -> Result<Daemon> {
Ok(Daemon {
daemon_dir: self.daemon_dir.clone(),
daemon_parallelism: self.daemon_parallelism,
blocks_dir: self.blocks_dir.clone(),
network: self.network,
conn: Mutex::new(self.conn.lock().unwrap().reconnect()?),
message_id: Counter::new(),
signal: self.signal.clone(),
rpc_threads: self.rpc_threads.clone(),
latency: self.latency.clone(),
size: self.size.clone(),
})
Expand Down Expand Up @@ -455,16 +462,10 @@ impl Daemon {
method: &'a str,
params_list: Vec<Value>,
) -> impl ParallelIterator<Item = Result<Value>> + 'a {
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(self.daemon_parallelism)
.thread_name(|i| format!("rpc-requests-{}", i))
.build()
.unwrap();

thread_pool.install(move || {
self.rpc_threads.install(move || {
params_list.into_par_iter().map(move |params| {
// Store a local per-thread Daemon, each with its own TCP connection. These will get initialized as
// necessary for the threads managed by rayon, and get destroyed when the thread pool is dropped.
// Store a local per-thread Daemon, each with its own TCP connection. These will
// get initialized as necessary for the `rpc_threads` pool thread managed by rayon.
thread_local!(static DAEMON_INSTANCE: OnceCell<Daemon> = OnceCell::new());

DAEMON_INSTANCE.with(|daemon| {
Expand Down

0 comments on commit beef521

Please sign in to comment.