From 6931294a3923d26a9c2bc95c4f66d79a08e0ee86 Mon Sep 17 00:00:00 2001 From: Mariusz Reichert Date: Fri, 20 Sep 2024 12:00:00 +0200 Subject: [PATCH] Opentelemetry instrumentation implementation --- Cargo.lock | 33 +------------ Cargo.toml | 24 +++++++--- src/bin/electrs.rs | 30 ++++++++++-- src/bin/tx-fingerprint-stats.rs | 16 ++++--- src/config.rs | 1 + src/daemon.rs | 39 ++++++++++++++- src/electrum/discovery.rs | 44 ++++++++++------- src/electrum/server.rs | 23 +++++++++ src/lib.rs | 9 ++++ src/new_index/db.rs | 6 +++ src/new_index/fetch.rs | 8 ++++ src/new_index/mempool.rs | 43 ++++++++++++++--- src/new_index/precache.rs | 4 ++ src/new_index/query.rs | 20 ++++++++ src/new_index/schema.rs | 60 ++++++++++++++++++++++- src/otlp_trace.rs | 85 +++++++++++++++++++++++++++++++++ src/rest.rs | 4 ++ src/util/block.rs | 7 +++ src/util/electrum_merkle.rs | 6 ++- src/util/fees.rs | 3 ++ 20 files changed, 387 insertions(+), 78 deletions(-) create mode 100644 src/otlp_trace.rs diff --git a/Cargo.lock b/Cargo.lock index 9db85e225..52ff2ba9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1655,16 +1655,6 @@ dependencies = [ "minimal-lexical", ] -[[package]] -name = "nu-ansi-term" -version = "0.46.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" -dependencies = [ - "overload", - "winapi 0.3.9", -] - [[package]] name = "num-conv" version = "0.1.0" @@ -1824,12 +1814,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "overload" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" - [[package]] name = "page_size" version = "0.6.0" @@ -3141,7 +3125,6 @@ version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ - "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -3179,17 +3162,6 @@ dependencies = [ "tracing-core", ] -[[package]] -name = "tracing-log" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" -dependencies = [ - "log", - "once_cell", - "tracing-core", -] - [[package]] name = "tracing-opentelemetry" version = "0.21.0" @@ -3202,7 +3174,7 @@ dependencies = [ "smallvec", "tracing", "tracing-core", - "tracing-log 0.1.4", + "tracing-log", "tracing-subscriber", ] @@ -3213,15 +3185,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ "matchers", - "nu-ansi-term", "once_cell", "regex", "sharded-slab", - "smallvec", "thread_local", "tracing", "tracing-core", - "tracing-log 0.2.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 701b40d50..6392d8d48 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,18 @@ edition = "2018" [features] liquid = [ "elements" ] electrum-discovery = [ "electrum-client"] +default = ["no-otlp-tracing"] +otlp-tracing = [ + "tracing/max_level_trace", + "tracing-subscriber", + "opentelemetry", + "tracing-opentelemetry", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions" +] +no-otlp-tracing = [ + "tracing/max_level_off" +] [dependencies] arraydeque = "0.5.1" @@ -52,12 +64,12 @@ hyper = "0.14" hyperlocal = "0.8" # close to same tokio version as dependent by hyper v0.14 and hyperlocal 0.8 -- things can go awry if they mismatch tokio = { version = "1", features = ["sync", "macros", "rt-multi-thread", "rt"] } -opentelemetry = { version = "0.20.0", features = ["rt-tokio"] } -tracing-opentelemetry = "0.21.0" -opentelemetry-otlp = { version = "0.13.0", default-features = false, features = ["http-proto", "reqwest-client"] } -tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } -opentelemetry-semantic-conventions = "0.12.0" -tracing = { version = "0.1.40", features = ["async-await", "log"] } +opentelemetry = { version = "0.20.0", features = ["rt-tokio"], optional = true } +tracing-opentelemetry = { version = "0.21.0", optional = true } +opentelemetry-otlp = { version = "0.13.0", default-features = false, features = ["http-proto", "reqwest-client"], optional = true } +tracing-subscriber = { version = "0.3.17", default-features = false, features = ["env-filter", "fmt"], optional = true } +opentelemetry-semantic-conventions = { version = "0.12.0", optional = true } +tracing = { version = "0.1.40", default-features = false, features = ["attributes"], optional = true } # optional dependencies for electrum-discovery electrum-client = { version = "0.8", optional = true } diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index fb25e68a8..a2b6765e2 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -20,6 +20,9 @@ use electrs::{ signal::Waiter, }; +#[cfg(feature = "otlp-tracing")] +use electrs::otlp_trace; + #[cfg(feature = "liquid")] use electrs::elements::AssetRegistry; use electrs::metrics::MetricOpts; @@ -85,9 +88,12 @@ fn run_server(config: Arc) -> Result<()> { match Mempool::update(&mempool, &daemon) { Ok(_) => break, Err(e) => { - warn!("Error performing initial mempool update, trying again in 5 seconds: {}", e.display_chain()); + warn!( + "Error performing initial mempool update, trying again in 5 seconds: {}", + e.display_chain() + ); signal.wait(Duration::from_secs(5), false)?; - }, + } } } @@ -117,7 +123,6 @@ fn run_server(config: Arc) -> Result<()> { )); loop { - main_loop_count.inc(); if let Err(err) = signal.wait(Duration::from_secs(5), true) { @@ -137,7 +142,10 @@ fn run_server(config: Arc) -> Result<()> { // Update mempool if let Err(e) = Mempool::update(&mempool, &daemon) { // Log the error if the result is an Err - warn!("Error updating mempool, skipping mempool update: {}", e.display_chain()); + warn!( + "Error updating mempool, skipping mempool update: {}", + e.display_chain() + ); } // Update subscribed clients @@ -147,10 +155,22 @@ fn run_server(config: Arc) -> Result<()> { Ok(()) } -fn main() { +fn main_() { let config = Arc::new(Config::from_args()); if let Err(e) = run_server(config) { error!("server failed: {}", e.display_chain()); process::exit(1); } } + +#[cfg(not(feature = "otlp-tracing"))] +fn main() { + main_(); +} + +#[cfg(feature = "otlp-tracing")] +#[tokio::main] +async fn main() { + let _tracing_guard = otlp_trace::init_tracing("electrs"); + main_() +} diff --git a/src/bin/tx-fingerprint-stats.rs b/src/bin/tx-fingerprint-stats.rs index afe980f8c..aca888111 100644 --- a/src/bin/tx-fingerprint-stats.rs +++ b/src/bin/tx-fingerprint-stats.rs @@ -82,13 +82,15 @@ fn main() { //info!("{:?},{:?}", txid, blockid); - let prevouts = chain.lookup_txos( - tx.input - .iter() - .filter(|txin| has_prevout(txin)) - .map(|txin| txin.previous_output) - .collect(), - ).unwrap(); + let prevouts = chain + .lookup_txos( + tx.input + .iter() + .filter(|txin| has_prevout(txin)) + .map(|txin| txin.previous_output) + .collect(), + ) + .unwrap(); let total_out: u64 = tx.output.iter().map(|out| out.value.to_sat()).sum(); let small_out = tx diff --git a/src/config.rs b/src/config.rs index 8696ecf8f..0bfb66d03 100644 --- a/src/config.rs +++ b/src/config.rs @@ -379,6 +379,7 @@ impl Config { stderrlog::Timestamp::Off }); log.init().expect("logging initialization failed"); + let config = Config { log, network_type, diff --git a/src/daemon.rs b/src/daemon.rs index 457bf4230..3d4d8aacd 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -17,6 +17,8 @@ use bitcoin::consensus::encode::{deserialize, serialize_hex}; #[cfg(feature = "liquid")] use elements::encode::{deserialize, serialize_hex}; +use tracing::instrument; + use crate::chain::{Block, BlockHash, BlockHeader, Network, Transaction, Txid}; use crate::metrics::{HistogramOpts, HistogramVec, Metrics}; use crate::signal::Waiter; @@ -36,6 +38,7 @@ lazy_static! { ); } +#[instrument(skip_all, name = "Daemon::parse_hash")] fn parse_hash(value: &Value) -> Result where T: FromStr, @@ -49,6 +52,7 @@ where .chain_err(|| format!("non-hex value: {}", value))?) } +#[instrument(skip_all, name = "Daemon::header_from_value")] fn header_from_value(value: Value) -> Result { let header_hex = value .as_str() @@ -140,6 +144,7 @@ struct Connection { signal: Waiter, } +#[instrument(skip_all, name = "Daemon::tcp_connect")] fn tcp_connect(addr: SocketAddr, signal: &Waiter) -> Result { loop { match TcpStream::connect_timeout(&addr, *DAEMON_CONNECTION_TIMEOUT) { @@ -162,6 +167,7 @@ fn tcp_connect(addr: SocketAddr, signal: &Waiter) -> Result { } impl Connection { + #[instrument(skip_all, name = "Daemon::Connection::new")] fn new( addr: SocketAddr, cookie_getter: Arc, @@ -181,10 +187,12 @@ impl Connection { }) } + #[instrument(skip(self))] fn reconnect(&self) -> Result { Connection::new(self.addr, self.cookie_getter.clone(), self.signal.clone()) } + #[instrument(skip_all, name = "Daemon::Connection::send")] fn send(&mut self, request: &str) -> Result<()> { let cookie = &self.cookie_getter.get()?; let msg = format!( @@ -198,6 +206,7 @@ impl Connection { }) } + #[instrument(skip_all, name = "Daemon::Connection::recv")] fn recv(&mut self) -> Result { // TODO: use proper HTTP parser. let mut in_header = true; @@ -353,6 +362,7 @@ impl Daemon { Ok(daemon) } + #[instrument(skip(self))] pub fn reconnect(&self) -> Result { Ok(Daemon { daemon_dir: self.daemon_dir.clone(), @@ -366,6 +376,7 @@ impl Daemon { }) } + #[instrument(skip_all, name = "Daemon::list_blk_files")] pub fn list_blk_files(&self) -> Result> { let path = self.blocks_dir.join("blk*.dat"); debug!("listing block files at {:?}", path); @@ -381,6 +392,7 @@ impl Daemon { self.network.magic() } + #[instrument(skip_all, name = "Daemon::call_jsonrpc")] fn call_jsonrpc(&self, method: &str, request: &Value) -> Result { let mut conn = self.conn.lock().unwrap(); let timer = self.latency.with_label_values(&[method]).start_timer(); @@ -398,6 +410,7 @@ impl Daemon { Ok(result) } + #[instrument(skip_all, name = "Daemon::handle_request_batch")] fn handle_request_batch(&self, method: &str, params_list: &[Value]) -> Result> { let id = self.message_id.next(); let chunks = params_list @@ -420,6 +433,7 @@ impl Daemon { Ok(results) } + #[instrument(skip_all, name = "Daemon::retry_request_batch")] fn retry_request_batch(&self, method: &str, params_list: &[Value]) -> Result> { loop { match self.handle_request_batch(method, params_list) { @@ -435,36 +449,43 @@ impl Daemon { } } + #[instrument(skip_all, name = "Daemon::request")] fn request(&self, method: &str, params: Value) -> Result { let mut values = self.retry_request_batch(method, &[params])?; assert_eq!(values.len(), 1); Ok(values.remove(0)) } + #[instrument(skip_all, name = "Daemon::requests")] fn requests(&self, method: &str, params_list: &[Value]) -> Result> { self.retry_request_batch(method, params_list) } // bitcoind JSONRPC API: + #[instrument(skip_all, name = "Daemon::getblockchaininfo")] pub fn getblockchaininfo(&self) -> Result { let info: Value = self.request("getblockchaininfo", json!([]))?; Ok(from_value(info).chain_err(|| "invalid blockchain info")?) } + #[instrument(skip_all, name = "Daemon::getnetworkinfo")] fn getnetworkinfo(&self) -> Result { let info: Value = self.request("getnetworkinfo", json!([]))?; Ok(from_value(info).chain_err(|| "invalid network info")?) } + #[instrument(skip_all, name = "Daemon::getbestblockhash")] pub fn getbestblockhash(&self) -> Result { parse_hash(&self.request("getbestblockhash", json!([]))?) } + #[instrument(skip_all, name = "Daemon::getblockheader")] pub fn getblockheader(&self, blockhash: &BlockHash) -> Result { header_from_value(self.request("getblockheader", json!([blockhash, /*verbose=*/ false]))?) } + #[instrument(skip_all, name = "Daemon::getblockheaders")] pub fn getblockheaders(&self, heights: &[usize]) -> Result> { let heights: Vec = heights.iter().map(|height| json!([height])).collect(); let params_list: Vec = self @@ -479,6 +500,7 @@ impl Daemon { Ok(result) } + #[instrument(skip_all, name = "Daemon::getblock")] pub fn getblock(&self, blockhash: &BlockHash) -> Result { let block = block_from_value(self.request("getblock", json!([blockhash, /*verbose=*/ false]))?)?; @@ -486,10 +508,12 @@ impl Daemon { Ok(block) } + #[instrument(skip_all, name = "Daemon::getblock_raw")] pub fn getblock_raw(&self, blockhash: &BlockHash, verbose: u32) -> Result { self.request("getblock", json!([blockhash, verbose])) } + #[instrument(skip_all, name = "Daemon::getblocks")] pub fn getblocks(&self, blockhashes: &[BlockHash]) -> Result> { let params_list: Vec = blockhashes .iter() @@ -503,6 +527,7 @@ impl Daemon { Ok(blocks) } + #[instrument(skip_all, name = "Daemon::gettransactions")] pub fn gettransactions(&self, txhashes: &[&Txid]) -> Result> { let params_list: Vec = txhashes .iter() @@ -518,6 +543,7 @@ impl Daemon { Ok(txs) } + #[instrument(skip_all, name = "Daemon::gettransaction_raw")] pub fn gettransaction_raw( &self, txid: &Txid, @@ -527,20 +553,24 @@ impl Daemon { self.request("getrawtransaction", json!([txid, verbose, blockhash])) } + #[instrument(skip_all, name = "getmempooltx")] pub fn getmempooltx(&self, txhash: &Txid) -> Result { let value = self.request("getrawtransaction", json!([txhash, /*verbose=*/ false]))?; tx_from_value(value) } + #[instrument(skip_all, name = "getmempooltxids")] pub fn getmempooltxids(&self) -> Result> { let res = self.request("getrawmempool", json!([/*verbose=*/ false]))?; Ok(serde_json::from_value(res).chain_err(|| "invalid getrawmempool reply")?) } + #[instrument(skip_all, name = "broadcast")] pub fn broadcast(&self, tx: &Transaction) -> Result { self.broadcast_raw(&serialize_hex(tx)) } + #[instrument(skip_all, name = "broadcast_raw")] pub fn broadcast_raw(&self, txhex: &str) -> Result { let txid = self.request("sendrawtransaction", json!([txhex]))?; Ok( @@ -552,8 +582,12 @@ impl Daemon { // Get estimated feerates for the provided confirmation targets using a batch RPC request // Missing estimates are logged but do not cause a failure, whatever is available is returned #[allow(clippy::float_cmp)] + #[instrument(skip_all, name = "Daemon::estimatesmartfee_batch")] pub fn estimatesmartfee_batch(&self, conf_targets: &[u16]) -> Result> { - let params_list: Vec = conf_targets.iter().map(|t| json!([t, "ECONOMICAL"])).collect(); + let params_list: Vec = conf_targets + .iter() + .map(|t| json!([t, "ECONOMICAL"])) + .collect(); Ok(self .requests("estimatesmartfee", ¶ms_list)? @@ -583,6 +617,7 @@ impl Daemon { .collect()) } + #[instrument(skip_all, name = "Daemon::get_all_headers")] fn get_all_headers(&self, tip: &BlockHash) -> Result> { let info: Value = self.request("getblockheader", json!([tip]))?; let tip_height = info @@ -610,6 +645,7 @@ impl Daemon { } // Returns a list of BlockHeaders in ascending height (i.e. the tip is last). + #[instrument(skip_all, name = "Daemon::get_new_headers")] pub fn get_new_headers( &self, indexed_headers: &HeaderList, @@ -642,6 +678,7 @@ impl Daemon { Ok(new_headers) } + #[instrument(skip_all, name = "Daemon::get_relayfee")] pub fn get_relayfee(&self) -> Result { let relayfee = self.getnetworkinfo()?.relayfee; diff --git a/src/electrum/discovery.rs b/src/electrum/discovery.rs index cf70221f6..f5bb85588 100644 --- a/src/electrum/discovery.rs +++ b/src/electrum/discovery.rs @@ -547,23 +547,33 @@ mod tests { false, None, )); - discovery.add_default_server( - "electrum.blockstream.info".into(), - vec![Service::Tcp(60001)], - ).unwrap(); - discovery.add_default_server("testnet.hsmiths.com".into(), vec![Service::Ssl(53012)]).unwrap(); - discovery.add_default_server( - "tn.not.fyi".into(), - vec![Service::Tcp(55001), Service::Ssl(55002)], - ).unwrap(); - discovery.add_default_server( - "electrum.blockstream.info".into(), - vec![Service::Tcp(60001), Service::Ssl(60002)], - ).unwrap(); - discovery.add_default_server( - "explorerzydxu5ecjrkwceayqybizmpjjznk5izmitf2modhcusuqlid.onion".into(), - vec![Service::Tcp(143)], - ).unwrap(); + discovery + .add_default_server( + "electrum.blockstream.info".into(), + vec![Service::Tcp(60001)], + ) + .unwrap(); + discovery + .add_default_server("testnet.hsmiths.com".into(), vec![Service::Ssl(53012)]) + .unwrap(); + discovery + .add_default_server( + "tn.not.fyi".into(), + vec![Service::Tcp(55001), Service::Ssl(55002)], + ) + .unwrap(); + discovery + .add_default_server( + "electrum.blockstream.info".into(), + vec![Service::Tcp(60001), Service::Ssl(60002)], + ) + .unwrap(); + discovery + .add_default_server( + "explorerzydxu5ecjrkwceayqybizmpjjznk5izmitf2modhcusuqlid.onion".into(), + vec![Service::Tcp(143)], + ) + .unwrap(); debug!("{:#?}", discovery); diff --git a/src/electrum/server.rs b/src/electrum/server.rs index 6129cbbe8..e8aba2708 100644 --- a/src/electrum/server.rs +++ b/src/electrum/server.rs @@ -13,6 +13,8 @@ use error_chain::ChainedError; use hex::{self, DisplayHex}; use serde_json::{from_str, Value}; +use tracing::instrument; + #[cfg(not(feature = "liquid"))] use bitcoin::consensus::encode::serialize_hex; #[cfg(feature = "liquid")] @@ -69,6 +71,7 @@ fn bool_from_value_or(val: Option<&Value>, name: &str, default: bool) -> Result< } // TODO: implement caching and delta updates +#[instrument(skip_all, name = "electrum::server::get_status_hash")] fn get_status_hash(txs: Vec<(Txid, Option)>, query: &Query) -> Option { if txs.is_empty() { None @@ -203,6 +206,7 @@ impl Connection { Ok(json!(&self.query.mempool().backlog_stats().fee_histogram)) } + #[instrument(skip_all, name = "electrum::server::blockchain_block_header")] fn blockchain_block_header(&self, params: &[Value]) -> Result { let height = usize_from_value(params.get(0), "height")?; let cp_height = usize_from_value_or(params.get(1), "cp_height", 0)?; @@ -226,6 +230,7 @@ impl Connection { })) } + #[instrument(skip_all, name = "electrum::serer::blockchain_block_headers")] fn blockchain_block_headers(&self, params: &[Value]) -> Result { let start_height = usize_from_value(params.get(0), "start_height")?; let count = MAX_HEADERS.min(usize_from_value(params.get(1), "count")?); @@ -261,6 +266,7 @@ impl Connection { })) } + #[instrument(skip_all, name = "electrum::server::blockchain_estimatefee")] fn blockchain_estimatefee(&self, params: &[Value]) -> Result { let conf_target = usize_from_value(params.get(0), "blocks_count")?; let fee_rate = self @@ -271,12 +277,14 @@ impl Connection { Ok(json!(fee_rate / 100_000f64)) } + #[instrument(skip_all, name = "electrum::server::blockchain_relayfee")] fn blockchain_relayfee(&self) -> Result { let relayfee = self.query.get_relayfee()?; // convert from sat/b to BTC/kB, as expected by Electrum clients Ok(json!(relayfee / 100_000f64)) } + #[instrument(skip_all, name = "electrum::server::blockchain_scripthash_subscribe")] fn blockchain_scripthash_subscribe(&mut self, params: &[Value]) -> Result { let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?; @@ -291,6 +299,7 @@ impl Connection { } #[cfg(not(feature = "liquid"))] + #[instrument(skip_all, name = "electrum::server::blockchain_scripthash_get_balance")] fn blockchain_scripthash_get_balance(&self, params: &[Value]) -> Result { let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?; let (chain_stats, mempool_stats) = self.query.stats(&script_hash[..]); @@ -301,6 +310,7 @@ impl Connection { })) } + #[instrument(skip_all, name = "electrum::server::blockchain_scripthash_get_history")] fn blockchain_scripthash_get_history(&self, params: &[Value]) -> Result { let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?; let history_txids = get_history(&self.query, &script_hash[..], self.txs_limit)?; @@ -319,6 +329,7 @@ impl Connection { .collect::>())) } + #[instrument(skip_all, name = "electrum::server::blockchain_scripthash_listunspent")] fn blockchain_scripthash_listunspent(&self, params: &[Value]) -> Result { let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?; let utxos = self.query.utxo(&script_hash[..])?; @@ -347,6 +358,7 @@ impl Connection { ))) } + #[instrument(skip_all, name = "electrum::server::blockchain_transaction_broadcast")] fn blockchain_transaction_broadcast(&self, params: &[Value]) -> Result { let tx = params.get(0).chain_err(|| "missing tx")?; let tx = tx.as_str().chain_err(|| "non-string tx")?.to_string(); @@ -357,6 +369,7 @@ impl Connection { Ok(json!(txid)) } + #[instrument(skip_all, name = "electrum::server::blockchain_transaction_get")] fn blockchain_transaction_get(&self, params: &[Value]) -> Result { let tx_hash = Txid::from(hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?); let verbose = match params.get(1) { @@ -376,6 +389,7 @@ impl Connection { Ok(json!(rawtx.to_lower_hex_string())) } + #[instrument(skip_all, name = "electrum::server::blockchain_transaction_get_merkle")] fn blockchain_transaction_get_merkle(&self, params: &[Value]) -> Result { let txid = Txid::from(hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?); let height = usize_from_value(params.get(1), "height")?; @@ -396,6 +410,10 @@ impl Connection { })) } + #[instrument( + skip_all, + name = "electrum::server::blockchain_transaction_id_from_pos" + )] fn blockchain_transaction_id_from_pos(&self, params: &[Value]) -> Result { let height = usize_from_value(params.get(0), "height")?; let tx_pos = usize_from_value(params.get(1), "tx_pos")?; @@ -413,6 +431,7 @@ impl Connection { })) } + #[instrument(skip(self, params, id), name = "electrum::server::handle_command")] fn handle_command(&mut self, method: &str, params: &[Value], id: &Value) -> Result { let timer = self .stats @@ -467,6 +486,7 @@ impl Connection { }) } + #[instrument(skip_all, name = "electrum::server::update_subscriptions")] fn update_subscriptions(&mut self) -> Result> { let timer = self .stats @@ -524,6 +544,7 @@ impl Connection { Ok(()) } + #[instrument(skip_all, name = "electrum::server::handle_replies")] fn handle_replies(&mut self, receiver: Receiver) -> Result<()> { let empty_params = json!([]); loop { @@ -588,6 +609,7 @@ impl Connection { } } + #[instrument(skip_all, name = "electrum::server::parse_requests")] fn parse_requests(mut reader: BufReader, tx: &SyncSender) -> Result<()> { loop { let mut line = Vec::::new(); @@ -650,6 +672,7 @@ impl Connection { } } +#[instrument(skip_all, name = "electrum::server::get_history")] fn get_history( query: &Query, scripthash: &[u8], diff --git a/src/lib.rs b/src/lib.rs index 9dbc58153..249f13d15 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,3 +35,12 @@ pub mod util; #[cfg(feature = "liquid")] pub mod elements; + +#[cfg(not(any(feature = "otlp-tracing", feature = "no-otlp-tracing")))] +compile_error!("Must enable one of the 'otlp-tracing' or 'no-otlp-tracing' features"); + +#[cfg(all(feature = "otlp-tracing", feature = "no-otlp-tracing"))] +compile_error!("Cannot enable both the 'otlp-tracing' and 'no-otlp-tracing' (default) features"); + +#[cfg(feature = "otlp-tracing")] +pub mod otlp_trace; diff --git a/src/new_index/db.rs b/src/new_index/db.rs index f68da233c..0b67e7a56 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -2,6 +2,8 @@ use rocksdb; use std::path::Path; +use tracing::instrument; + use crate::config::Config; use crate::util::{bincode, Bytes}; @@ -106,6 +108,7 @@ impl DB { db } + #[instrument(skip(self))] pub fn full_compaction(&self) { // TODO: make sure this doesn't fail silently debug!("starting full compaction on {:?}", self.db); @@ -113,6 +116,7 @@ impl DB { debug!("finished full compaction on {:?}", self.db); } + #[instrument(skip(self))] pub fn enable_auto_compaction(&self) { let opts = [("disable_auto_compactions", "false")]; self.db.set_options(&opts).unwrap(); @@ -178,6 +182,7 @@ impl DB { self.db.write_opt(batch, &opts).unwrap(); } + #[instrument(skip(self))] pub fn flush(&self) { self.db.flush().unwrap(); } @@ -186,6 +191,7 @@ impl DB { self.db.put(key, value).unwrap(); } + #[instrument(skip(self, key, value))] pub fn put_sync(&self, key: &[u8], value: &[u8]) { let mut opts = rocksdb::WriteOptions::new(); opts.set_sync(true); diff --git a/src/new_index/fetch.rs b/src/new_index/fetch.rs index 54369b5e5..6c3954781 100644 --- a/src/new_index/fetch.rs +++ b/src/new_index/fetch.rs @@ -14,6 +14,8 @@ use std::path::PathBuf; use std::sync::mpsc::Receiver; use std::thread; +use tracing::instrument; + use crate::chain::{Block, BlockHash}; use crate::daemon::Daemon; use crate::errors::*; @@ -25,6 +27,7 @@ pub enum FetchFrom { BlkFiles, } +#[instrument(skip(from, daemon, new_headers))] pub fn start_fetcher( from: FetchFrom, daemon: &Daemon, @@ -66,6 +69,7 @@ impl Fetcher { } } +#[instrument(skip_all, name = "fetch::bitcoind_fetcher")] fn bitcoind_fetcher( daemon: &Daemon, new_headers: Vec, @@ -103,6 +107,7 @@ fn bitcoind_fetcher( )) } +#[instrument(skip_all, name = "fetch::blkfiles_fetcher")] fn blkfiles_fetcher( daemon: &Daemon, new_headers: Vec, @@ -149,6 +154,7 @@ fn blkfiles_fetcher( )) } +#[instrument(skip_all, name = "fetch::blkfiles_reader")] fn blkfiles_reader(blk_files: Vec) -> Fetcher> { let chan = SyncChannel::new(1); let sender = chan.sender(); @@ -168,6 +174,7 @@ fn blkfiles_reader(blk_files: Vec) -> Fetcher> { ) } +#[instrument(skip_all, name = "fetch::blkfiles_parser")] fn blkfiles_parser(blobs: Fetcher>, magic: u32) -> Fetcher> { let chan = SyncChannel::new(1); let sender = chan.sender(); @@ -186,6 +193,7 @@ fn blkfiles_parser(blobs: Fetcher>, magic: u32) -> Fetcher, magic: u32) -> Result> { let mut cursor = Cursor::new(&blob); let mut slices = vec![]; diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index 94ba7a41d..da457c82b 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -11,6 +11,8 @@ use std::iter::FromIterator; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; +use tracing::instrument; + use crate::chain::{deserialize, Network, OutPoint, Transaction, TxOut, Txid}; use crate::config::Config; use crate::daemon::Daemon; @@ -107,6 +109,7 @@ impl Mempool { self.txstore.get(txid).map(serialize) } + #[instrument(skip_all, name = "Mempool::lookup_spend")] pub fn lookup_spend(&self, outpoint: &OutPoint) -> Option { self.edges.get(outpoint).map(|(txid, vin)| SpendingInput { txid: *txid, @@ -123,6 +126,7 @@ impl Mempool { Some(self.feeinfo.get(txid)?.fee) } + #[instrument(skip_all, name = "Mempool::has_unconfirmed_parents")] pub fn has_unconfirmed_parents(&self, txid: &Txid) -> bool { let tx = match self.txstore.get(txid) { Some(tx) => tx, @@ -133,6 +137,7 @@ impl Mempool { .any(|txin| self.txstore.contains_key(&txin.previous_output.txid)) } + #[instrument(skip_all, name = "Mempool::history")] pub fn history(&self, scripthash: &[u8], limit: usize) -> Vec { let _timer = self.latency.with_label_values(&["history"]).start_timer(); self.history @@ -140,6 +145,7 @@ impl Mempool { .map_or_else(|| vec![], |entries| self._history(entries, limit)) } + #[instrument(skip_all, name = "Mempool::_history")] fn _history(&self, entries: &[TxHistoryInfo], limit: usize) -> Vec { entries .iter() @@ -151,6 +157,7 @@ impl Mempool { .collect() } + #[instrument(skip_all, name = "Mempool::history_txids")] pub fn history_txids(&self, scripthash: &[u8], limit: usize) -> Vec { let _timer = self .latency @@ -167,6 +174,7 @@ impl Mempool { } } + #[instrument(skip_all, name = "Mempool::utxo")] pub fn utxo(&self, scripthash: &[u8]) -> Vec { let _timer = self.latency.with_label_values(&["utxo"]).start_timer(); let entries = match self.history.get(scripthash) { @@ -209,6 +217,7 @@ impl Mempool { .collect() } + #[instrument(skip_all, name = "Mempool::stats")] // @XXX avoid code duplication with ChainQuery::stats()? pub fn stats(&self, scripthash: &[u8]) -> ScriptStats { let _timer = self.latency.with_label_values(&["stats"]).start_timer(); @@ -258,12 +267,14 @@ impl Mempool { stats } + #[instrument(skip_all, name = "Mempool::txids")] // Get all txids in the mempool pub fn txids(&self) -> Vec<&Txid> { let _timer = self.latency.with_label_values(&["txids"]).start_timer(); self.txstore.keys().collect() } + #[instrument(skip_all, name = "Mempool::recent_txs_overview")] // Get an overview of the most recent transactions pub fn recent_txs_overview(&self) -> Vec<&TxOverview> { // We don't bother ever deleting elements from the recent list. @@ -272,14 +283,17 @@ impl Mempool { self.recent.iter().collect() } + #[instrument(skip_all, name = "Mempool::backlog_stats")] pub fn backlog_stats(&self) -> &BacklogStats { &self.backlog_stats.0 } + #[instrument(skip_all, name = "Mempool::old_txids")] pub fn old_txids(&self) -> HashSet { return HashSet::from_iter(self.txstore.keys().cloned()); } + #[instrument(skip_all, name = "Mempool::update_backlog_stats")] pub fn update_backlog_stats(&mut self) { let _timer = self .latency @@ -288,6 +302,7 @@ impl Mempool { self.backlog_stats = (BacklogStats::new(&self.feeinfo), Instant::now()); } + #[instrument(skip_all, name = "Mempool::add_by_txid")] pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) { if self.txstore.get(txid).is_none() { if let Ok(tx) = daemon.getmempooltx(&txid) { @@ -296,6 +311,7 @@ impl Mempool { } } + #[instrument(skip_all, name = "Mempool::add")] fn add(&mut self, txs: Vec) { self.delta .with_label_values(&["add"]) @@ -397,12 +413,14 @@ impl Mempool { } } + #[instrument(skip_all, name = "Mempool::lookup_txo")] fn lookup_txo(&self, outpoint: &OutPoint) -> Option { self.txstore .get(&outpoint.txid) .and_then(|tx| tx.output.get(outpoint.vout as usize).cloned()) } + #[instrument(skip_all, name = "Mempool::lookup_txos")] pub fn lookup_txos(&self, outpoints: BTreeSet) -> Result> { let _timer = self .latency @@ -410,12 +428,13 @@ impl Mempool { .start_timer(); // Get the txos available in the mempool, skipping over (and collecting) missing ones - let (mut txos, remain_outpoints): (HashMap<_, _>, _) = outpoints - .into_iter() - .partition_map(|outpoint| match self.lookup_txo(&outpoint) { - Some(txout) => Either::Left((outpoint, txout)), - None => Either::Right(outpoint), - }); + let (mut txos, remain_outpoints): (HashMap<_, _>, _) = + outpoints + .into_iter() + .partition_map(|outpoint| match self.lookup_txo(&outpoint) { + Some(txout) => Either::Left((outpoint, txout)), + None => Either::Right(outpoint), + }); // Get the remaining txos from the chain (fails if any are missing) txos.extend(self.chain.lookup_txos(remain_outpoints)?); @@ -423,6 +442,7 @@ impl Mempool { Ok(txos) } + #[instrument(skip_all, name = "Mempool::get_prevous")] fn get_prevouts(&self, txids: &[Txid]) -> BTreeSet { let _timer = self .latency @@ -441,6 +461,7 @@ impl Mempool { .collect() } + #[instrument(skip_all, name = "Mempool::remove")] fn remove(&mut self, to_remove: HashSet<&Txid>) { self.delta .with_label_values(&["remove"]) @@ -476,6 +497,7 @@ impl Mempool { } #[cfg(feature = "liquid")] + #[instrument(skip_all, name = "Mempool::remove")] pub fn asset_history(&self, asset_id: &AssetId, limit: usize) -> Vec { let _timer = self .latency @@ -486,8 +508,14 @@ impl Mempool { .map_or_else(|| vec![], |entries| self._history(entries, limit)) } + #[instrument(skip_all, name = "Mempool::update")] pub fn update(mempool: &Arc>, daemon: &Daemon) -> Result<()> { - let _timer = mempool.read().unwrap().latency.with_label_values(&["update"]).start_timer(); + let _timer = mempool + .read() + .unwrap() + .latency + .with_label_values(&["update"]) + .start_timer(); // 1. Determine which transactions are no longer in the daemon's mempool and which ones have newly entered it let old_txids = mempool.read().unwrap().old_txids(); @@ -545,6 +573,7 @@ impl BacklogStats { } } + #[instrument(skip_all, name = "Mempool::new")] fn new(feeinfo: &HashMap) -> Self { let (count, vsize, total_fee) = feeinfo .values() diff --git a/src/new_index/precache.rs b/src/new_index/precache.rs index 4db40c27d..de78757d9 100644 --- a/src/new_index/precache.rs +++ b/src/new_index/precache.rs @@ -13,6 +13,9 @@ use std::io; use std::io::prelude::*; use std::str::FromStr; +use tracing::instrument; + +#[instrument(skip_all, name = "precache::precache")] pub fn precache(chain: &ChainQuery, scripthashes: Vec) { let total = scripthashes.len(); info!("Pre-caching stats and utxo set for {} scripthashes", total); @@ -36,6 +39,7 @@ pub fn precache(chain: &ChainQuery, scripthashes: Vec) { }); } +#[instrument(skip_all, name = "precache::scripthashes_from_file")] pub fn scripthashes_from_file(path: String) -> Result> { let reader = io::BufReader::new(File::open(path).chain_err(|| "cannot open precache scripthash file")?); diff --git a/src/new_index/query.rs b/src/new_index/query.rs index 60d7510ab..e8b1cffe5 100644 --- a/src/new_index/query.rs +++ b/src/new_index/query.rs @@ -11,6 +11,8 @@ use crate::errors::*; use crate::new_index::{ChainQuery, Mempool, ScriptStats, SpendingInput, Utxo}; use crate::util::{is_spendable, BlockId, Bytes, TransactionStatus}; +use tracing::instrument; + #[cfg(feature = "liquid")] use crate::{ chain::AssetId, @@ -69,6 +71,7 @@ impl Query { self.mempool.read().unwrap() } + #[instrument(skip_all, name = "query::Query::broadcast_raw")] pub fn broadcast_raw(&self, txhex: &str) -> Result { let txid = self.daemon.broadcast_raw(txhex)?; self.mempool @@ -78,6 +81,7 @@ impl Query { Ok(txid) } + #[instrument(skip_all, name = "query::Query::utxo")] pub fn utxo(&self, scripthash: &[u8]) -> Result> { let mut utxos = self.chain.utxo(scripthash, self.config.utxos_limit)?; let mempool = self.mempool(); @@ -86,6 +90,7 @@ impl Query { Ok(utxos) } + #[instrument(skip_all, name = "query::Query::history_txids")] pub fn history_txids(&self, scripthash: &[u8], limit: usize) -> Vec<(Txid, Option)> { let confirmed_txids = self.chain.history_txids(scripthash, limit); let confirmed_len = confirmed_txids.len(); @@ -107,17 +112,21 @@ impl Query { ) } + #[instrument(skip_all, name = "query::Query::lookup_txn")] pub fn lookup_txn(&self, txid: &Txid) -> Option { self.chain .lookup_txn(txid, None) .or_else(|| self.mempool().lookup_txn(txid)) } + + #[instrument(skip_all, name = "query::Query::lookup_raw_txn")] pub fn lookup_raw_txn(&self, txid: &Txid) -> Option { self.chain .lookup_raw_txn(txid, None) .or_else(|| self.mempool().lookup_raw_txn(txid)) } + #[instrument(skip_all, name = "query::Query::lookup_txos")] pub fn lookup_txos(&self, outpoints: BTreeSet) -> HashMap { // the mempool lookup_txos() internally looks up confirmed txos as well self.mempool() @@ -125,12 +134,14 @@ impl Query { .expect("failed loading txos") } + #[instrument(skip_all, name = "query::Query::lookup_spend")] pub fn lookup_spend(&self, outpoint: &OutPoint) -> Option { self.chain .lookup_spend(outpoint) .or_else(|| self.mempool().lookup_spend(outpoint)) } + #[instrument(skip_all, name = "query::Query::lookup_tx_spends")] pub fn lookup_tx_spends(&self, tx: Transaction) -> Vec> { let txid = tx.txid(); @@ -150,18 +161,22 @@ impl Query { .collect() } + #[instrument(skip_all, name = "query::Query::get_tx_status")] pub fn get_tx_status(&self, txid: &Txid) -> TransactionStatus { TransactionStatus::from(self.chain.tx_confirming_block(txid)) } + #[instrument(skip_all, name = "query::Query::get_mempool_tx_fee")] pub fn get_mempool_tx_fee(&self, txid: &Txid) -> Option { self.mempool().get_tx_fee(txid) } + #[instrument(skip_all, name = "query::Query::has_unconfirmed_parents")] pub fn has_unconfirmed_parents(&self, txid: &Txid) -> bool { self.mempool().has_unconfirmed_parents(txid) } + #[instrument(skip_all, name = "query::Query::estimate_fee")] pub fn estimate_fee(&self, conf_target: u16) -> Option { if self.config.network_type.is_regtest() { return self.get_relayfee().ok(); @@ -181,6 +196,7 @@ impl Query { .copied() } + #[instrument(skip_all, name = "query::Query::estimate_fee_map")] pub fn estimate_fee_map(&self) -> HashMap { if let (ref cache, Some(cache_time)) = *self.cached_estimates.read().unwrap() { if cache_time.elapsed() < Duration::from_secs(FEE_ESTIMATES_TTL) { @@ -192,6 +208,7 @@ impl Query { self.cached_estimates.read().unwrap().0.clone() } + #[instrument(skip_all, name = "query::Query::update_fee_estimates")] fn update_fee_estimates(&self) { match self.daemon.estimatesmartfee_batch(&CONF_TARGETS) { Ok(estimates) => { @@ -203,6 +220,7 @@ impl Query { } } + #[instrument(skip_all, name = "query::Query::get_relayfee")] pub fn get_relayfee(&self) -> Result { if let Some(cached) = *self.cached_relayfee.read().unwrap() { return Ok(cached); @@ -233,11 +251,13 @@ impl Query { } #[cfg(feature = "liquid")] + #[instrument(skip_all, name = "query::Query::lookup_asset")] pub fn lookup_asset(&self, asset_id: &AssetId) -> Result> { lookup_asset(&self, self.asset_db.as_ref(), asset_id, None) } #[cfg(feature = "liquid")] + #[instrument(skip_all, name = "query::Query::list_registry_assets")] pub fn list_registry_assets( &self, start_index: usize, diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index bbb2e2946..e600db162 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -21,6 +21,8 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use std::path::Path; use std::sync::{Arc, RwLock}; +use tracing::instrument; + use crate::chain::{ BlockHash, BlockHeader, Network, OutPoint, Script, Transaction, TxOut, Txid, Value, }; @@ -220,6 +222,7 @@ impl Indexer { self.duration.with_label_values(&[name]).start_timer() } + #[instrument(skip_all, name = "schema::Indexer::headers_to_add")] fn headers_to_add(&self, new_headers: &[HeaderEntry]) -> Vec { let added_blockhashes = self.store.added_blockhashes.read().unwrap(); new_headers @@ -229,6 +232,7 @@ impl Indexer { .collect() } + #[instrument(skip_all, name = "schema::Indexer::headers_to_index")] fn headers_to_index(&self, new_headers: &[HeaderEntry]) -> Vec { let indexed_blockhashes = self.store.indexed_blockhashes.read().unwrap(); new_headers @@ -238,6 +242,7 @@ impl Indexer { .collect() } + #[instrument(skip_all, name = "schema::start_auto_compactions")] fn start_auto_compactions(&self, db: &DB) { let key = b"F".to_vec(); if db.get(&key).is_none() { @@ -248,6 +253,7 @@ impl Indexer { db.enable_auto_compaction(); } + #[instrument(skip_all, name = "schema::get_new_headers")] fn get_new_headers(&self, daemon: &Daemon, tip: &BlockHash) -> Result> { let headers = self.store.indexed_headers.read().unwrap(); let new_headers = daemon.get_new_headers(&headers, &tip)?; @@ -259,6 +265,7 @@ impl Indexer { Ok(result) } + #[instrument(skip_all, name = "schema::update")] pub fn update(&mut self, daemon: &Daemon) -> Result { let daemon = daemon.reconnect()?; let tip = daemon.getbestblockhash()?; @@ -306,6 +313,7 @@ impl Indexer { Ok(tip) } + #[instrument(skip_all, name = "schema::add")] fn add(&self, blocks: &[BlockEntry]) { // TODO: skip orphaned blocks? let rows = { @@ -324,6 +332,7 @@ impl Indexer { .extend(blocks.iter().map(|b| b.entry.hash())); } + #[instrument(skip_all, name = "schema::index")] fn index(&self, blocks: &[BlockEntry]) { let previous_txos_map = { let _timer = self.start_timer("index_lookup"); @@ -375,9 +384,8 @@ impl ChainQuery { self.duration.with_label_values(&[name]).start_timer() } + #[instrument(skip_all, name = "sdchema::Indexer::get_block_txids")] pub fn get_block_txids(&self, hash: &BlockHash) -> Option> { - let _timer = self.start_timer("get_block_txids"); - if self.light_mode { // TODO fetch block as binary from REST API instead of as hex let mut blockinfo = self.daemon.getblock_raw(hash, 1).ok()?; @@ -390,6 +398,7 @@ impl ChainQuery { } } + #[instrument(skip_all, name = "schema::ChainQuery::get_block_meta")] pub fn get_block_meta(&self, hash: &BlockHash) -> Option { let _timer = self.start_timer("get_block_meta"); @@ -404,6 +413,7 @@ impl ChainQuery { } } + #[instrument(skip_all, name = "schema::ChainQuery::get_block_raw")] pub fn get_block_raw(&self, hash: &BlockHash) -> Option> { let _timer = self.start_timer("get_block_raw"); @@ -432,16 +442,19 @@ impl ChainQuery { } } + #[instrument(skip_all, name = "schema::ChainQuery::get_block_header")] pub fn get_block_header(&self, hash: &BlockHash) -> Option { let _timer = self.start_timer("get_block_header"); Some(self.header_by_hash(hash)?.header().clone()) } + #[instrument(skip_all, name = "schema::ChainQuery::get_mtp")] pub fn get_mtp(&self, height: usize) -> u32 { let _timer = self.start_timer("get_block_mtp"); self.store.indexed_headers.read().unwrap().get_mtp(height) } + #[instrument(skip_all, name = "schema::ChainQuery::get_block_with_meta")] pub fn get_block_with_meta(&self, hash: &BlockHash) -> Option { let _timer = self.start_timer("get_block_with_meta"); let header_entry = self.header_by_hash(hash)?; @@ -452,12 +465,15 @@ impl ChainQuery { }) } + #[instrument(skip_all, name = "schema::ChainQuery::history_iter_scan")] pub fn history_iter_scan(&self, code: u8, hash: &[u8], start_height: usize) -> ScanIterator { self.store.history_db.iter_scan_from( &TxHistoryRow::filter(code, &hash[..]), &TxHistoryRow::prefix_height(code, &hash[..], start_height as u32), ) } + + #[instrument(skip_all, name = "schema::ChainQuery::history_iter_scan_reverse")] fn history_iter_scan_reverse(&self, code: u8, hash: &[u8]) -> ReverseScanIterator { self.store.history_db.iter_scan_reverse( &TxHistoryRow::filter(code, &hash[..]), @@ -465,6 +481,7 @@ impl ChainQuery { ) } + #[instrument(skip_all, name = "schema::ChainQuery::history")] pub fn history( &self, scripthash: &[u8], @@ -475,6 +492,7 @@ impl ChainQuery { self._history(b'H', scripthash, last_seen_txid, limit) } + #[instrument(skip_all, name = "schema::ChainQuery::_history")] fn _history( &self, code: u8, @@ -509,11 +527,13 @@ impl ChainQuery { .collect() } + #[instrument(skip_all, name = "schema::ChainQuery::history_txids")] pub fn history_txids(&self, scripthash: &[u8], limit: usize) -> Vec<(Txid, BlockId)> { // scripthash lookup self._history_txids(b'H', scripthash, limit) } + #[instrument(skip_all, name = "schema::ChainQuery::_history_txids")] fn _history_txids(&self, code: u8, hash: &[u8], limit: usize) -> Vec<(Txid, BlockId)> { let _timer = self.start_timer("history_txids"); self.history_iter_scan(code, hash, 0) @@ -525,6 +545,7 @@ impl ChainQuery { } // TODO: avoid duplication with stats/stats_delta? + #[instrument(skip_all, name = "schema::ChainQuery::utxo")] pub fn utxo(&self, scripthash: &[u8], limit: usize) -> Result> { let _timer = self.start_timer("utxo"); @@ -585,6 +606,7 @@ impl ChainQuery { .collect()) } + #[instrument(skip_all, name = "schema::ChainQuery::utxo_delta")] fn utxo_delta( &self, scripthash: &[u8], @@ -630,6 +652,7 @@ impl ChainQuery { Ok((utxos, lastblock, processed_items)) } + #[instrument(skip_all, name = "schema::ChainQuery::stats")] pub fn stats(&self, scripthash: &[u8]) -> ScriptStats { let _timer = self.start_timer("stats"); @@ -664,6 +687,7 @@ impl ChainQuery { newstats } + #[instrument(skip_all, name = "schema::ChainQuery::stats_delta")] fn stats_delta( &self, scripthash: &[u8], @@ -731,6 +755,7 @@ impl ChainQuery { (stats, lastblock) } + #[instrument(skip_all, name = "schema::ChainQuery::address_search")] pub fn address_search(&self, prefix: &str, limit: usize) -> Vec { let _timer_scan = self.start_timer("address_search"); self.store @@ -741,6 +766,7 @@ impl ChainQuery { .collect() } + #[instrument(skip_all, name = "schema::ChainQuery::header_by_hash")] fn header_by_hash(&self, hash: &BlockHash) -> Option { self.store .indexed_headers @@ -750,6 +776,7 @@ impl ChainQuery { .cloned() } + #[instrument(skip_all, name = "schema::ChainQuery::height_by_hash")] // Get the height of a blockhash, only if its part of the best chain pub fn height_by_hash(&self, hash: &BlockHash) -> Option { self.store @@ -760,6 +787,7 @@ impl ChainQuery { .map(|header| header.height()) } + #[instrument(skip_all, name = "schema::ChainQuery::header_by_height")] pub fn header_by_height(&self, height: usize) -> Option { self.store .indexed_headers @@ -769,6 +797,7 @@ impl ChainQuery { .cloned() } + #[instrument(skip_all, name = "schema::ChainQuery::hash_by_height")] pub fn hash_by_height(&self, height: usize) -> Option { self.store .indexed_headers @@ -778,6 +807,7 @@ impl ChainQuery { .map(|entry| *entry.hash()) } + #[instrument(skip_all, name = "schema::ChainQuery::blockid_by_height")] pub fn blockid_by_height(&self, height: usize) -> Option { self.store .indexed_headers @@ -787,6 +817,7 @@ impl ChainQuery { .map(BlockId::from) } + #[instrument(skip_all, name = "schema::ChainQuery::blockid_by_hash")] // returns None for orphaned blocks pub fn blockid_by_hash(&self, hash: &BlockHash) -> Option { self.store @@ -797,14 +828,17 @@ impl ChainQuery { .map(BlockId::from) } + #[instrument(skip_all, name = "schema::ChainQuery::bests_height")] pub fn best_height(&self) -> usize { self.store.indexed_headers.read().unwrap().len() - 1 } + #[instrument(skip_all, name = "schema::ChainQuery::best_hash")] pub fn best_hash(&self) -> BlockHash { *self.store.indexed_headers.read().unwrap().tip() } + #[instrument(skip_all, name = "schema::ChainQuery::best_header")] pub fn best_header(&self) -> HeaderEntry { let headers = self.store.indexed_headers.read().unwrap(); headers @@ -815,6 +849,7 @@ impl ChainQuery { // TODO: can we pass txids as a "generic iterable"? // TODO: should also use a custom ThreadPoolBuilder? + #[instrument(skip_all, name = "schema::ChainQuery::lookup_txns")] pub fn lookup_txns(&self, txids: &[(Txid, BlockId)]) -> Result> { let _timer = self.start_timer("lookup_txns"); txids @@ -826,6 +861,7 @@ impl ChainQuery { .collect::>>() } + #[instrument(skip_all, name = "schema::ChainQuery::lookup_txn")] pub fn lookup_txn(&self, txid: &Txid, blockhash: Option<&BlockHash>) -> Option { let _timer = self.start_timer("lookup_txn"); self.lookup_raw_txn(txid, blockhash).map(|rawtx| { @@ -835,6 +871,7 @@ impl ChainQuery { }) } + #[instrument(skip_all, name = "schema::ChainQuery::lookup_raw_txn")] pub fn lookup_raw_txn(&self, txid: &Txid, blockhash: Option<&BlockHash>) -> Option { let _timer = self.start_timer("lookup_raw_txn"); @@ -854,16 +891,19 @@ impl ChainQuery { } } + #[instrument(skip_all, name = "schema::ChainQuery::lookup_txo")] pub fn lookup_txo(&self, outpoint: &OutPoint) -> Option { let _timer = self.start_timer("lookup_txo"); lookup_txo(&self.store.txstore_db, outpoint) } + #[instrument(skip_all, name = "schema::ChainQuery::lookup_txos")] pub fn lookup_txos(&self, outpoints: BTreeSet) -> Result> { let _timer = self.start_timer("lookup_txos"); lookup_txos(&self.store.txstore_db, outpoints) } + #[instrument(skip_all, name = "schema::ChainQuery::lookup_spend")] pub fn lookup_spend(&self, outpoint: &OutPoint) -> Option { let _timer = self.start_timer("lookup_spend"); self.store @@ -879,6 +919,8 @@ impl ChainQuery { }) }) } + + #[instrument(skip_all, name = "schema::ChainQuery::tx_confirming_blocks")] pub fn tx_confirming_block(&self, txid: &Txid) -> Option { let _timer = self.start_timer("tx_confirming_block"); let headers = self.store.indexed_headers.read().unwrap(); @@ -895,6 +937,7 @@ impl ChainQuery { .map(BlockId::from) } + #[instrument(skip_all, name = "schema::ChainQuery::get_block_status")] pub fn get_block_status(&self, hash: &BlockHash) -> BlockStatus { // TODO differentiate orphaned and non-existing blocks? telling them apart requires // an additional db read. @@ -916,6 +959,7 @@ impl ChainQuery { } #[cfg(not(feature = "liquid"))] + #[instrument(skip_all, name = "schema::ChainQuery::get_merkleblock_proof")] pub fn get_merkleblock_proof(&self, txid: &Txid) -> Option { let _timer = self.start_timer("get_merkleblock_proof"); let blockid = self.tx_confirming_block(txid)?; @@ -930,6 +974,7 @@ impl ChainQuery { } #[cfg(feature = "liquid")] + #[instrument(skip_all, name = "schema::ChainQuery::asset_history")] pub fn asset_history( &self, asset_id: &AssetId, @@ -940,11 +985,13 @@ impl ChainQuery { } #[cfg(feature = "liquid")] + #[instrument(skip_all, name = "schema::ChainQuery::assets_history_txids")] pub fn asset_history_txids(&self, asset_id: &AssetId, limit: usize) -> Vec<(Txid, BlockId)> { self._history_txids(b'I', &asset_id.into_inner()[..], limit) } } +#[instrument(skip_all, name = "schema::ChainQuery::load_blockhashes")] fn load_blockhashes(db: &DB, prefix: &[u8]) -> HashSet { db.iter_scan(prefix) .map(BlockRow::from_row) @@ -952,6 +999,7 @@ fn load_blockhashes(db: &DB, prefix: &[u8]) -> HashSet { .collect() } +#[instrument(skip_all, name = "schema::ChainQuery::load_blockheaders")] fn load_blockheaders(db: &DB) -> HashMap { db.iter_scan(&BlockRow::header_filter()) .map(BlockRow::from_row) @@ -963,6 +1011,7 @@ fn load_blockheaders(db: &DB) -> HashMap { .collect() } +#[instrument(skip_all, name = "schema::add_blocks")] fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec { // persist individual transactions: // T{txid} → {rawtx} @@ -1015,6 +1064,7 @@ fn add_transaction( } } +#[instrument(skip_all, name = "schema::get_previous_txos")] fn get_previous_txos(block_entries: &[BlockEntry]) -> BTreeSet { block_entries .iter() @@ -1028,6 +1078,7 @@ fn get_previous_txos(block_entries: &[BlockEntry]) -> BTreeSet { .collect() } +#[instrument(skip_all, name = "schema::lookup_txos")] fn lookup_txos(txstore_db: &DB, outpoints: BTreeSet) -> Result> { let keys = outpoints.iter().map(TxOutRow::key).collect::>(); txstore_db @@ -1049,6 +1100,7 @@ fn lookup_txo(txstore_db: &DB, outpoint: &OutPoint) -> Option { .map(|val| deserialize(&val).expect("failed to parse TxOut")) } +#[instrument(skip_all, name = "schema::index_blocks")] fn index_blocks( block_entries: &[BlockEntry], previous_txos_map: &HashMap, @@ -1070,6 +1122,7 @@ fn index_blocks( } // TODO: return an iterator? +#[instrument(skip_all, name = "schema::index_transaction")] fn index_transaction( tx: &Transaction, confirmed_height: u32, @@ -1144,6 +1197,7 @@ fn index_transaction( ); } +#[instrument(skip_all, name = "schema::addr_search_row")] fn addr_search_row(spk: &Script, network: Network) -> Option { spk.to_address_str(network).map(|address| DBRow { key: [b"a", address.as_bytes()].concat(), @@ -1598,6 +1652,7 @@ impl UtxoCacheRow { } } +#[instrument(skip_all, name = "schema::make_utxo_cache")] // keep utxo cache with just the block height (the hash/timestamp are read later from the headers to reconstruct BlockId) // and use a (txid,vout) tuple instead of OutPoints (they don't play nicely with bincode serialization) fn make_utxo_cache(utxos: &UtxoMap) -> CachedUtxoMap { @@ -1612,6 +1667,7 @@ fn make_utxo_cache(utxos: &UtxoMap) -> CachedUtxoMap { .collect() } +#[instrument(skip_all, name = "schema::from_utxo_cache")] fn from_utxo_cache(utxos_cache: CachedUtxoMap, chain: &ChainQuery) -> UtxoMap { utxos_cache .into_iter() diff --git a/src/otlp_trace.rs b/src/otlp_trace.rs new file mode 100644 index 000000000..834d56737 --- /dev/null +++ b/src/otlp_trace.rs @@ -0,0 +1,85 @@ +use opentelemetry::{ + runtime, + sdk::{ + trace::{BatchConfig, RandomIdGenerator, Sampler, Tracer}, + Resource, + }, + KeyValue, +}; +use opentelemetry_otlp::{ExportConfig, Protocol, WithExportConfig}; +use opentelemetry_semantic_conventions::{ + resource::{SERVICE_NAME, SERVICE_VERSION}, + SCHEMA_URL, +}; +use std::env::var; +use std::time::Duration; +use tracing_opentelemetry::OpenTelemetryLayer; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; + +fn init_tracer(resource: Resource, endpoint: &str) -> Tracer { + let export_config = ExportConfig { + endpoint: endpoint.to_string(), + timeout: Duration::from_secs(3), + protocol: Protocol::Grpc, + }; + + opentelemetry_otlp::new_pipeline() + .tracing() + .with_trace_config( + opentelemetry::sdk::trace::Config::default() + .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased( + 1.0, + )))) + .with_id_generator(RandomIdGenerator::default()) + .with_resource(resource), + ) + .with_batch_config(BatchConfig::default()) + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(endpoint) + .with_export_config(export_config), + ) + .install_batch(runtime::Tokio) + .unwrap() +} + +fn init_tracing_subscriber(service_name: &str) -> OtelGuard { + let resource = Resource::from_schema_url( + [ + KeyValue::new(SERVICE_NAME, service_name.to_owned()), + KeyValue::new(SERVICE_VERSION, "0.4.1"), + ], + SCHEMA_URL, + ); + + let env_filter = EnvFilter::from_default_env(); + + let reg = tracing_subscriber::registry().with(env_filter).with( + tracing_subscriber::fmt::layer() + .with_thread_ids(true) + .with_ansi(false) + .compact(), + ); + let _ = if let Ok(endpoint) = var("OTLP_ENDPOINT") { + reg.with(OpenTelemetryLayer::new(init_tracer(resource, &endpoint))) + .try_init() + } else { + reg.try_init() + }; + + log::debug!("Initialized tracing"); + + OtelGuard {} +} + +pub fn init_tracing(service_name: &str) -> OtelGuard { + init_tracing_subscriber(service_name) +} + +pub struct OtelGuard {} +impl Drop for OtelGuard { + fn drop(&mut self) { + opentelemetry::global::shutdown_tracer_provider(); + } +} diff --git a/src/rest.rs b/src/rest.rs index 43eab5c60..928d90f74 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -24,6 +24,8 @@ use tokio::sync::oneshot; use std::fs; use std::str::FromStr; +use tracing::instrument; + #[cfg(feature = "liquid")] use { crate::elements::{ebcompact::*, peg::PegoutValue, AssetSorting, IssuanceValue}, @@ -579,6 +581,7 @@ impl Handle { } } +#[instrument(skip_all, name = "rest::handle_request")] fn handle_request( method: Method, uri: hyper::Uri, @@ -1154,6 +1157,7 @@ fn json_response(value: T, ttl: u32) -> Result, Htt .unwrap()) } +#[instrument(skip_all, name = "rest::blocks")] fn blocks(query: &Query, start_height: Option) -> Result, HttpError> { let mut values = Vec::new(); let mut current_hash = match start_height { diff --git a/src/util/block.rs b/src/util/block.rs index 18dc91c27..8173f7ed1 100644 --- a/src/util/block.rs +++ b/src/util/block.rs @@ -9,6 +9,8 @@ use std::slice; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime as DateTime; +use tracing::instrument; + const MTP_SPAN: usize = 11; lazy_static! { @@ -84,6 +86,7 @@ impl HeaderList { } } + #[instrument(skip_all, name = "block::new")] pub fn new( mut headers_map: HashMap, tip_hash: BlockHash, @@ -121,6 +124,7 @@ impl HeaderList { headers } + #[instrument(skip_all, name = "block::HeaderList::order")] pub fn order(&self, new_headers: Vec) -> Vec { // header[i] -> header[i-1] (i.e. header.last() is the tip) struct HashedHeader { @@ -160,6 +164,7 @@ impl HeaderList { .collect() } + #[instrument(skip_all, name = "block::HeaderList::apply")] pub fn apply(&mut self, new_headers: Vec) { // new_headers[i] -> new_headers[i - 1] (i.e. new_headers.last() is the tip) for i in 1..new_headers.len() { @@ -197,6 +202,7 @@ impl HeaderList { } } + #[instrument(skip_all, name = "block::HeaderList::header_by_blockhash")] pub fn header_by_blockhash(&self, blockhash: &BlockHash) -> Option<&HeaderEntry> { let height = self.heights.get(blockhash)?; let header = self.headers.get(*height)?; @@ -207,6 +213,7 @@ impl HeaderList { } } + #[instrument(skip_all, name = "block::HeaderList::header_by_height")] pub fn header_by_height(&self, height: usize) -> Option<&HeaderEntry> { self.headers.get(height).map(|entry| { assert_eq!(entry.height(), height); diff --git a/src/util/electrum_merkle.rs b/src/util/electrum_merkle.rs index 954782a7d..ef4a0e936 100644 --- a/src/util/electrum_merkle.rs +++ b/src/util/electrum_merkle.rs @@ -3,6 +3,9 @@ use crate::errors::*; use crate::new_index::ChainQuery; use bitcoin::hashes::{sha256d::Hash as Sha256dHash, Hash}; +use tracing::instrument; + +#[instrument(skip_all, name = "electrum_merkle::get_tx_merkleproof")] pub fn get_tx_merkle_proof( chain: &ChainQuery, tx_hash: &Txid, @@ -21,6 +24,7 @@ pub fn get_tx_merkle_proof( Ok((branch, pos)) } +#[instrument(skip_all, name = "electrum_merkle::get_header_merkle_proof")] pub fn get_header_merkle_proof( chain: &ChainQuery, height: usize, @@ -49,7 +53,7 @@ pub fn get_header_merkle_proof( let header_hashes = header_hashes.into_iter().map(Sha256dHash::from).collect(); Ok(create_merkle_branch_and_root(header_hashes, height)) } - +#[instrument(skip_all, name = "electrum_merkle::get_id_from_pos")] pub fn get_id_from_pos( chain: &ChainQuery, height: usize, diff --git a/src/util/fees.rs b/src/util/fees.rs index 9cbe6c1d7..5827dd751 100644 --- a/src/util/fees.rs +++ b/src/util/fees.rs @@ -1,6 +1,8 @@ use crate::chain::{Network, Transaction, TxOut}; use std::collections::HashMap; +use tracing::instrument; + const VSIZE_BIN_WIDTH: u64 = 50_000; // in vbytes pub struct TxFeeInfo { @@ -46,6 +48,7 @@ pub fn get_tx_fee(tx: &Transaction, _prevouts: &HashMap, network: N tx.fee_in(*network.native_asset()) } +#[instrument(skip_all, name = "fees::make_fee_histogram")] pub fn make_fee_histogram(mut entries: Vec<&TxFeeInfo>) -> Vec<(f64, u64)> { entries.sort_unstable_by(|e1, e2| e1.fee_per_vbyte.partial_cmp(&e2.fee_per_vbyte).unwrap());