From beef521f9841530fab5e6917047f9e31c07cc8e4 Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Tue, 6 Aug 2024 05:16:07 +0300 Subject: [PATCH] Reuse RPC threads and TCP connections Keep RPC TCP connections open between sync runs and reuse them, to reduce TCP connection initialization overhead. --- src/daemon.rs | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index 9f892bea3..6ebf2dcaa 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -283,13 +283,14 @@ impl Counter { pub struct Daemon { daemon_dir: PathBuf, - daemon_parallelism: usize, blocks_dir: PathBuf, network: Network, conn: Mutex, message_id: Counter, // for monotonic JSONRPC 'id' signal: Waiter, + rpc_threads: Arc, + // monitoring latency: HistogramVec, size: HistogramVec, @@ -308,7 +309,6 @@ impl Daemon { ) -> Result { let daemon = Daemon { daemon_dir: daemon_dir.clone(), - daemon_parallelism, blocks_dir: blocks_dir.clone(), network, conn: Mutex::new(Connection::new( @@ -318,6 +318,13 @@ impl Daemon { )?), message_id: Counter::new(), signal: signal.clone(), + rpc_threads: Arc::new( + rayon::ThreadPoolBuilder::new() + .num_threads(daemon_parallelism) + .thread_name(|i| format!("rpc-requests-{}", i)) + .build() + .unwrap(), + ), latency: metrics.histogram_vec( HistogramOpts::new("daemon_rpc", "Bitcoind RPC latency (in seconds)"), &["method"], @@ -361,12 +368,12 @@ impl Daemon { pub fn reconnect(&self) -> Result { Ok(Daemon { daemon_dir: self.daemon_dir.clone(), - daemon_parallelism: self.daemon_parallelism, blocks_dir: self.blocks_dir.clone(), network: self.network, conn: Mutex::new(self.conn.lock().unwrap().reconnect()?), message_id: Counter::new(), signal: self.signal.clone(), + rpc_threads: self.rpc_threads.clone(), latency: self.latency.clone(), size: self.size.clone(), }) @@ -455,16 +462,10 @@ impl Daemon { method: &'a str, params_list: Vec, ) -> impl ParallelIterator> + 'a { - let thread_pool = rayon::ThreadPoolBuilder::new() - .num_threads(self.daemon_parallelism) - .thread_name(|i| format!("rpc-requests-{}", i)) - .build() - .unwrap(); - - thread_pool.install(move || { + self.rpc_threads.install(move || { params_list.into_par_iter().map(move |params| { - // Store a local per-thread Daemon, each with its own TCP connection. These will get initialized as - // necessary for the threads managed by rayon, and get destroyed when the thread pool is dropped. + // Store a local per-thread Daemon, each with its own TCP connection. These will + // get initialized as necessary for the `rpc_threads` pool thread managed by rayon. thread_local!(static DAEMON_INSTANCE: OnceCell = OnceCell::new()); DAEMON_INSTANCE.with(|daemon| {