From 2531b8da8e8f2a839484adef62dd93f1deff12dd Mon Sep 17 00:00:00 2001 From: Yonghui Lin Date: Wed, 3 Jun 2020 10:25:40 -0500 Subject: [PATCH] feat: supported storage metrics (#307) --- common/apm/src/metrics.rs | 10 +- common/apm/src/metrics/consensus.rs | 8 +- common/apm/src/metrics/network.rs | 34 +++++-- common/apm/src/metrics/storage.rs | 127 ++++++++++++++++++++++++++ core/consensus/src/engine.rs | 1 - core/consensus/src/synchronization.rs | 3 + core/network/src/outbound/gossip.rs | 5 +- core/storage/src/adapter/rocks.rs | 24 +++-- core/storage/src/lib.rs | 11 ++- 9 files changed, 200 insertions(+), 23 deletions(-) create mode 100644 common/apm/src/metrics/storage.rs diff --git a/common/apm/src/metrics.rs b/common/apm/src/metrics.rs index 68ffdf0bd..70f281eea 100644 --- a/common/apm/src/metrics.rs +++ b/common/apm/src/metrics.rs @@ -2,13 +2,17 @@ pub mod api; pub mod consensus; pub mod mempool; pub mod network; +pub mod storage; -pub use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec}; +pub use prometheus::{ + CounterVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, +}; use derive_more::Display; use prometheus::{ - exponential_buckets, register_histogram, register_histogram_vec, register_int_counter, - register_int_counter_vec, register_int_gauge, register_int_gauge_vec, Encoder, TextEncoder, + exponential_buckets, register_counter_vec, register_histogram, register_histogram_vec, + register_int_counter, register_int_counter_vec, register_int_gauge, register_int_gauge_vec, + Encoder, TextEncoder, }; use prometheus_static_metric::{auto_flush_from, make_auto_flush_static_metric}; use protocol::{ProtocolError, ProtocolErrorKind, ProtocolResult}; diff --git a/common/apm/src/metrics/consensus.rs b/common/apm/src/metrics/consensus.rs index 1f9574ef8..965f35510 100644 --- a/common/apm/src/metrics/consensus.rs +++ b/common/apm/src/metrics/consensus.rs @@ -81,9 +81,15 @@ lazy_static! { "The counter for sync blocks from remote" ) .unwrap(); + pub static ref ENGINE_SYNC_BLOCK_HISTOGRAM: Histogram = register_histogram!( + "muta_consensus_sync_block_duration", + "Histogram of consensus sync duration", + exponential_buckets(0.5, 1.2, 20).expect("consensus duration time exponential") + ) + .unwrap(); pub static ref ENGINE_CONSENSUS_COST_TIME: Histogram = register_histogram!( "muta_consensus_duration_seconds", - "Consensus duration from last block", + "Histogram of consensus duration from last block", exponential_buckets(1.0, 1.2, 15).expect("consensus duration time exponential") ) .unwrap(); diff --git a/common/apm/src/metrics/network.rs b/common/apm/src/metrics/network.rs index a31d9c3ad..44bcbe650 100644 --- a/common/apm/src/metrics/network.rs +++ b/common/apm/src/metrics/network.rs @@ -1,11 +1,11 @@ +use lazy_static::lazy_static; + use crate::metrics::{ auto_flush_from, exponential_buckets, make_auto_flush_static_metric, register_histogram_vec, register_int_counter_vec, register_int_gauge, register_int_gauge_vec, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, }; -use lazy_static::lazy_static; - make_auto_flush_static_metric! { pub label_enum MessageDirection { sent, @@ -21,6 +21,12 @@ make_auto_flush_static_metric! { timeout, } + pub label_enum MessageTaret { + single, + multi, + all + } + pub struct MessageCounterVec: LocalIntCounter { "direction" => MessageDirection, } @@ -38,7 +44,7 @@ lazy_static! { pub static ref NETWORK_MESSAGE_COUNT_VEC: IntCounterVec = register_int_counter_vec!( "muta_network_message_total", "Total number of network message", - &["direction", "type", "module", "action"] + &["direction", "target", "type", "module", "action"] ) .expect("network message total"); pub static ref NETWORK_RPC_RESULT_COUNT_VEC: IntCounterVec = register_int_counter_vec!( @@ -79,21 +85,33 @@ lazy_static! { .expect("network ip pending data size"); } -fn on_network_message(direction: &str, url: &str) { +fn on_network_message(direction: &str, target: &str, url: &str, inc: i64) { let spliced: Vec<&str> = url.split('/').collect(); if spliced.len() < 4 { return; } + let network_type = spliced[1]; + let module = spliced[2]; + let action = spliced[3]; + NETWORK_MESSAGE_COUNT_VEC - .with_label_values(&[direction, spliced[1], spliced[2], spliced[3]]) - .inc(); + .with_label_values(&[direction, target, network_type, module, action]) + .inc_by(inc); +} + +pub fn on_network_message_sent_all_target(url: &str) { + on_network_message("sent", "all", url, 1) +} + +pub fn on_network_message_sent_multi_target(url: &str, target_count: i64) { + on_network_message("sent", "single", url, target_count); } pub fn on_network_message_sent(url: &str) { - on_network_message("sent", url); + on_network_message("sent", "single", url, 1); } pub fn on_network_message_received(url: &str) { - on_network_message("received", url); + on_network_message("received", "single", url, 1); } diff --git a/common/apm/src/metrics/storage.rs b/common/apm/src/metrics/storage.rs new file mode 100644 index 000000000..1a6c4e56d --- /dev/null +++ b/common/apm/src/metrics/storage.rs @@ -0,0 +1,127 @@ +use std::time::Duration; + +use lazy_static::lazy_static; +use protocol::traits::StorageCategory; + +use crate::metrics::{ + auto_flush_from, duration_to_sec, make_auto_flush_static_metric, register_counter_vec, + register_int_counter_vec, CounterVec, IntCounterVec, +}; + +make_auto_flush_static_metric! { + pub label_enum COLUMN_FAMILY_TYPES { + block, + receipt, + signed_tx, + wal, + hash_height, + } + + pub struct StoragePutCfTimeUsageVec: LocalCounter { + "cf" => COLUMN_FAMILY_TYPES + } + + pub struct StoragePutCfBytesVec: LocalIntCounter { + "cf" => COLUMN_FAMILY_TYPES + } + + pub struct StorageGetCfTimeUsageVec: LocalCounter { + "cf" => COLUMN_FAMILY_TYPES + } + + pub struct StorageGetCfTotalVec: LocalIntCounter { + "cf" => COLUMN_FAMILY_TYPES + } +} + +lazy_static! { + pub static ref STORAGE_PUT_CF_TIME_USAGE_VEC: CounterVec = register_counter_vec!( + "muta_storage_put_cf_seconds", + "Storage put_cf time usage", + &["cf"] + ) + .unwrap(); + pub static ref STORAGE_PUT_CF_BYTES_COUNTER_VEC: IntCounterVec = register_int_counter_vec!( + "muta_storage_put_cf_bytes", + "Storage total insert bytes", + &["cf"] + ) + .unwrap(); + pub static ref STORAGE_GET_CF_TIME_USAGE_VEC: CounterVec = register_counter_vec!( + "muta_storage_get_cf_seconds", + "Storage get_cf time usage", + &["cf"] + ) + .unwrap(); + pub static ref STORAGE_GET_CF_COUNTER_VEC: IntCounterVec = register_int_counter_vec!( + "muta_storage_get_cf_total", + "Storage total get_cf keys number", + &["cf"] + ) + .unwrap(); +} + +lazy_static! { + pub static ref STORAGE_PUT_CF_TIME_USAGE: StoragePutCfTimeUsageVec = + auto_flush_from!(STORAGE_PUT_CF_TIME_USAGE_VEC, StoragePutCfTimeUsageVec); + pub static ref STORAGE_PUT_CF_BYTES_COUNTER: StoragePutCfBytesVec = + auto_flush_from!(STORAGE_PUT_CF_BYTES_COUNTER_VEC, StoragePutCfBytesVec); + pub static ref STORAGE_GET_CF_TIME_USAGE: StorageGetCfTimeUsageVec = + auto_flush_from!(STORAGE_GET_CF_TIME_USAGE_VEC, StorageGetCfTimeUsageVec); + pub static ref STORAGE_GET_CF_COUNTER: StorageGetCfTotalVec = + auto_flush_from!(STORAGE_GET_CF_COUNTER_VEC, StorageGetCfTotalVec); +} + +pub fn on_storage_get_cf(sc: StorageCategory, duration: Duration, keys: i64) { + let seconds = duration_to_sec(duration); + + match sc { + StorageCategory::Block => { + STORAGE_GET_CF_TIME_USAGE.block.inc_by(seconds); + STORAGE_GET_CF_COUNTER.block.inc_by(keys); + } + StorageCategory::Receipt => { + STORAGE_GET_CF_TIME_USAGE.receipt.inc_by(seconds); + STORAGE_GET_CF_COUNTER.receipt.inc_by(keys); + } + StorageCategory::Wal => { + STORAGE_GET_CF_TIME_USAGE.wal.inc_by(seconds); + STORAGE_GET_CF_COUNTER.wal.inc_by(keys); + } + StorageCategory::SignedTransaction => { + STORAGE_GET_CF_TIME_USAGE.signed_tx.inc_by(seconds); + STORAGE_GET_CF_COUNTER.signed_tx.inc_by(keys); + } + StorageCategory::HashHeight => { + STORAGE_GET_CF_TIME_USAGE.hash_height.inc_by(seconds); + STORAGE_GET_CF_COUNTER.hash_height.inc_by(keys); + } + } +} + +pub fn on_storage_put_cf(sc: StorageCategory, duration: Duration, size: i64) { + let seconds = duration_to_sec(duration); + + match sc { + StorageCategory::Block => { + STORAGE_PUT_CF_TIME_USAGE.block.inc_by(seconds); + STORAGE_PUT_CF_BYTES_COUNTER.block.inc_by(size); + } + StorageCategory::Receipt => { + STORAGE_PUT_CF_TIME_USAGE.receipt.inc_by(seconds); + STORAGE_PUT_CF_BYTES_COUNTER.receipt.inc_by(size); + } + StorageCategory::Wal => { + STORAGE_PUT_CF_TIME_USAGE.wal.inc_by(seconds); + STORAGE_PUT_CF_BYTES_COUNTER.wal.inc_by(size); + } + StorageCategory::SignedTransaction => { + STORAGE_PUT_CF_TIME_USAGE.signed_tx.inc_by(seconds); + STORAGE_PUT_CF_BYTES_COUNTER.signed_tx.inc_by(size); + } + StorageCategory::HashHeight => { + STORAGE_PUT_CF_TIME_USAGE.hash_height.inc_by(seconds); + STORAGE_PUT_CF_BYTES_COUNTER.hash_height.inc_by(size); + } + } +} diff --git a/core/consensus/src/engine.rs b/core/consensus/src/engine.rs index acfa1ebf1..75bf29596 100644 --- a/core/consensus/src/engine.rs +++ b/core/consensus/src/engine.rs @@ -416,7 +416,6 @@ impl Engine for ConsensusEngine< common_apm::metrics::consensus::ENGINE_CONSENSUS_COST_TIME.observe(elapsed / 1e3); let mut last_commit_time = self.last_commit_time.write(); *last_commit_time = now; - // pill.block.header.timestamp Ok(status) } diff --git a/core/consensus/src/synchronization.rs b/core/consensus/src/synchronization.rs index fc7cdf3db..696c10018 100644 --- a/core/consensus/src/synchronization.rs +++ b/core/consensus/src/synchronization.rs @@ -45,6 +45,7 @@ impl Synchronization for OverlordSynchronizatio logs = "{'remote_height': 'remote_height'}" )] async fn receive_remote_block(&self, ctx: Context, remote_height: u64) -> ProtocolResult<()> { + let inst = Instant::now(); let syncing_lock = self.syncing.try_lock(); if syncing_lock.is_none() { return Ok(()); @@ -93,6 +94,8 @@ impl Synchronization for OverlordSynchronizatio common_apm::metrics::consensus::ENGINE_SYNC_BLOCK_COUNTER .inc_by((remote_height - current_height) as i64); + common_apm::metrics::consensus::ENGINE_SYNC_BLOCK_HISTOGRAM + .observe(common_apm::metrics::duration_to_sec(inst.elapsed())); self.status.replace(sync_status.clone()); self.adapter.update_status( diff --git a/core/network/src/outbound/gossip.rs b/core/network/src/outbound/gossip.rs index df6493122..3974575f2 100644 --- a/core/network/src/outbound/gossip.rs +++ b/core/network/src/outbound/gossip.rs @@ -43,7 +43,6 @@ where headers.set_span_id(state.span_id()); log::info!("no trace id found for gossip {}", endpoint.full_url()); } - common_apm::metrics::network::on_network_message_sent(endpoint.full_url()); let net_msg = NetworkMessage::new(endpoint, data, headers) .encode() .await?; @@ -85,7 +84,7 @@ where { let msg = self.package_message(cx.clone(), end, msg).await?; self.send(cx, TargetSession::All, msg, p)?; - + common_apm::metrics::network::on_network_message_sent_all_target(end); Ok(()) } @@ -101,7 +100,9 @@ where M: MessageCodec, { let msg = self.package_message(cx.clone(), end, msg).await?; + let user_count = users.len(); self.users_send(cx, users, msg, p).await?; + common_apm::metrics::network::on_network_message_sent_multi_target(end, user_count as i64); Ok(()) } diff --git a/core/storage/src/adapter/rocks.rs b/core/storage/src/adapter/rocks.rs index 75aded553..0e0eec6b7 100644 --- a/core/storage/src/adapter/rocks.rs +++ b/core/storage/src/adapter/rocks.rs @@ -2,11 +2,14 @@ use std::error::Error; use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; +use std::time::Instant; -use async_trait::async_trait; use derive_more::{Display, From}; use rocksdb::{ColumnFamily, DBIterator, Options, WriteBatch, DB}; +use async_trait::async_trait; + +use common_apm::metrics::storage::on_storage_put_cf; use protocol::codec::ProtocolCodecSync; use protocol::traits::{ IntoIteratorByRef, StorageAdapter, StorageBatchModify, StorageCategory, StorageIterator, @@ -105,16 +108,16 @@ impl<'c, S: StorageSchema, P: AsRef<[u8]>> IntoIteratorByRef for RocksIntoIte #[async_trait] impl StorageAdapter for RocksAdapter { - async fn insert( - &self, - key: ::Key, - val: ::Value, - ) -> ProtocolResult<()> { + async fn insert(&self, key: S::Key, val: S::Value) -> ProtocolResult<()> { + let inst = Instant::now(); + let column = get_column::(&self.db)?; let key = key.encode_sync()?.to_vec(); let val = val.encode_sync()?.to_vec(); + let size = val.len() as i64; db!(self.db, put_cf, column, key, val)?; + on_storage_put_cf(S::category(), inst.elapsed(), size); Ok(()) } @@ -182,13 +185,20 @@ impl StorageAdapter for RocksAdapter { } let mut batch = WriteBatch::default(); + let mut insert_size = 0usize; + let inst = Instant::now(); for (key, value) in pairs.into_iter() { match value { - Some(value) => batch.put_cf(column, key, value), + Some(value) => { + insert_size += value.len(); + batch.put_cf(column, key, value) + } None => batch.delete_cf(column, key), } } + on_storage_put_cf(S::category(), inst.elapsed(), insert_size as i64); + self.db.write(batch).map_err(RocksAdapterError::from)?; Ok(()) } diff --git a/core/storage/src/lib.rs b/core/storage/src/lib.rs index 7116752ae..6071ab4f2 100644 --- a/core/storage/src/lib.rs +++ b/core/storage/src/lib.rs @@ -10,14 +10,15 @@ use std::convert::From; use std::error::Error; use std::str::FromStr; use std::sync::Arc; +use std::time::Instant; use async_trait::async_trait; use derive_more::{Display, From}; use lazy_static::lazy_static; use tokio::sync::RwLock; +use common_apm::metrics::storage::on_storage_get_cf; use common_apm::muta_apm; - use protocol::codec::ProtocolCodecSync; use protocol::traits::{ Context, Storage, StorageAdapter, StorageBatchModify, StorageCategory, StorageSchema, @@ -280,6 +281,7 @@ impl Storage for ImplStorage { let mut found = Vec::with_capacity(hashes.len()); { + let inst = Instant::now(); let prepare_iter = self .adapter .prepare_iter::(&key_prefix)?; @@ -287,6 +289,11 @@ impl Storage for ImplStorage { let set = hashes.iter().collect::>(); let mut count = hashes.len(); + on_storage_get_cf( + StorageCategory::SignedTransaction, + inst.elapsed(), + count as i64, + ); while count > 0 { let (key, stx_bytes) = match iter.next() { @@ -404,6 +411,7 @@ impl Storage for ImplStorage { let mut found = Vec::with_capacity(hashes.len()); { + let inst = Instant::now(); let prepare_iter = self .adapter .prepare_iter::(&key_prefix)?; @@ -411,6 +419,7 @@ impl Storage for ImplStorage { let set = hashes.iter().collect::>(); let mut count = hashes.len(); + on_storage_get_cf(StorageCategory::Receipt, inst.elapsed(), count as i64); while count > 0 { let (key, stx_bytes) = match iter.next() {