Skip to content

Commit

Permalink
[kv store] add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
phoenix-o committed Mar 2, 2025
1 parent 5828f40 commit 09053bb
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 5 deletions.
4 changes: 4 additions & 0 deletions crates/sui-data-ingestion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ async fn main() -> Result<()> {
kv_config.instance_id.clone(),
false,
Some(Duration::from_secs(kv_config.timeout_secs as u64)),
"ingestion".to_string(),
&registry,
)
.await?;
bigtable_store = Some(BigTableProgressStore::new(bigtable_client));
Expand Down Expand Up @@ -173,6 +175,8 @@ async fn main() -> Result<()> {
kv_config.instance_id,
false,
Some(Duration::from_secs(kv_config.timeout_secs as u64)),
"ingestion".to_string(),
&registry,
)
.await?;
let worker_pool = WorkerPool::new(
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt-jsonrpc/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl Context {
let pg_loader = Arc::new(pg_reader.as_data_loader());

let kv_loader = if let Some(config) = config.bigtable.clone() {
let bigtable_reader = BigtableReader::new(config.instance_id).await?;
let bigtable_reader = BigtableReader::new(config.instance_id, registry).await?;
KvLoader::new_with_bigtable(Arc::new(bigtable_reader.as_data_loader()))
} else {
KvLoader::new_with_pg(pg_loader.clone())
Expand Down
15 changes: 11 additions & 4 deletions crates/sui-indexer-alt-jsonrpc/src/data/bigtable_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use async_graphql::dataloader::DataLoader;
use prometheus::Registry;
use sui_kvstore::BigTableClient;

use crate::data::error::Error;
Expand All @@ -13,15 +14,21 @@ use crate::data::error::Error;
pub struct BigtableReader(pub(crate) BigTableClient);

impl BigtableReader {
pub(crate) async fn new(instance_id: String) -> Result<Self, Error> {
pub(crate) async fn new(instance_id: String, registry: &Registry) -> Result<Self, Error> {
if std::env::var("GOOGLE_APPLICATION_CREDENTIALS").is_err() {
return Err(Error::BigtableCreate(anyhow::anyhow!(
"Environment variable GOOGLE_APPLICATION_CREDENTIALS is not set"
)));
}
let client = BigTableClient::new_remote(instance_id, true, None)
.await
.map_err(Error::BigtableCreate)?;
let client = BigTableClient::new_remote(
instance_id,
true,
None,
"indexer-alt-jsonrpc".to_string(),
registry,
)
.await
.map_err(Error::BigtableCreate)?;
Ok(Self(client))
}

Expand Down
59 changes: 59 additions & 0 deletions crates/sui-kvstore/src/bigtable/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::bigtable::metrics::KvMetrics;
use crate::bigtable::proto::bigtable::v2::bigtable_client::BigtableClient as BigtableInternalClient;
use crate::bigtable::proto::bigtable::v2::mutate_rows_request::Entry;
use crate::bigtable::proto::bigtable::v2::mutation::SetCell;
Expand All @@ -14,11 +15,13 @@ use anyhow::{anyhow, Result};
use async_trait::async_trait;
use gcp_auth::{Token, TokenProvider};
use http::{HeaderValue, Request, Response};
use prometheus::Registry;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use std::time::Duration;
use std::time::Instant;
use sui_types::base_types::{ObjectID, TransactionDigest};
use sui_types::digests::CheckpointDigest;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -62,6 +65,8 @@ struct AuthChannel {
pub struct BigTableClient {
table_prefix: String,
client: BigtableInternalClient<AuthChannel>,
client_name: String,
metrics: Option<Arc<KvMetrics>>,
}

#[async_trait]
Expand Down Expand Up @@ -281,13 +286,17 @@ impl BigTableClient {
Ok(Self {
table_prefix: format!("projects/emulator/instances/{}/tables/", instance_id),
client: BigtableInternalClient::new(auth_channel),
client_name: "local".to_string(),
metrics: None,
})
}

pub async fn new_remote(
instance_id: String,
is_read_only: bool,
timeout: Option<Duration>,
client_name: String,
registry: &Registry,
) -> Result<Self> {
let policy = if is_read_only {
"https://www.googleapis.com/auth/bigtable.data.readonly"
Expand Down Expand Up @@ -319,6 +328,8 @@ impl BigTableClient {
Ok(Self {
table_prefix,
client: BigtableInternalClient::new(auth_channel),
client_name,
metrics: Some(KvMetrics::new(registry)),
})
}

Expand Down Expand Up @@ -423,6 +434,54 @@ impl BigTableClient {
&mut self,
table_name: &str,
keys: Vec<Vec<u8>>,
) -> Result<Vec<Vec<(Bytes, Bytes)>>> {
let elapsed = Instant::now().elapsed();
let num_keys_requested = keys.len();
let result = self.multi_get_internal(table_name, keys).await;
let labels = [&self.client_name, table_name];
match &self.metrics {
None => result,
Some(metrics) => match result {
Err(e) => {
metrics.kv_get_errors.with_label_values(&labels).inc();
Err(e)
}
Ok(result) => {
metrics
.kv_get_batch_size
.with_label_values(&labels)
.observe(num_keys_requested as f64);
if num_keys_requested > result.len() {
metrics
.kv_get_not_found
.with_label_values(&labels)
.inc_by((num_keys_requested - result.len()) as u64);
}
metrics
.kv_get_success
.with_label_values(&labels)
.inc_by(result.len() as u64);
let elapsed_ms = elapsed.as_millis() as f64;
metrics
.kv_get_latency_ms
.with_label_values(&labels)
.observe(elapsed_ms);
if num_keys_requested > 0 {
metrics
.kv_get_latency_ms_per_key
.with_label_values(&labels)
.observe(elapsed_ms / num_keys_requested as f64);
}
Ok(result)
}
},
}
}

pub async fn multi_get_internal(
&mut self,
table_name: &str,
keys: Vec<Vec<u8>>,
) -> Result<Vec<Vec<(Bytes, Bytes)>>> {
let request = ReadRowsRequest {
table_name: format!("{}{}", self.table_prefix, table_name),
Expand Down
75 changes: 75 additions & 0 deletions crates/sui-kvstore/src/bigtable/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use prometheus::{
register_histogram_vec_with_registry, register_int_counter_vec_with_registry, HistogramVec,
IntCounterVec, Registry,
};
use std::sync::Arc;

pub(crate) struct KvMetrics {
pub kv_get_success: IntCounterVec,
pub kv_get_not_found: IntCounterVec,
pub kv_get_errors: IntCounterVec,
pub kv_get_latency_ms: HistogramVec,
pub kv_get_batch_size: HistogramVec,
pub kv_get_latency_ms_per_key: HistogramVec,
}

impl KvMetrics {
pub(crate) fn new(registry: &Registry) -> Arc<Self> {
Arc::new(Self {
kv_get_success: register_int_counter_vec_with_registry!(
"kv_get_success",
"Number of successful fetches from kv store",
&["client", "table"],
registry,
)
.unwrap(),
kv_get_not_found: register_int_counter_vec_with_registry!(
"kv_get_not_found",
"Number of fetches from kv store that returned not found",
&["client", "table"],
registry,
)
.unwrap(),
kv_get_errors: register_int_counter_vec_with_registry!(
"kv_get_errors",
"Number of fetches from kv store that returned an error",
&["client", "table"],
registry,
)
.unwrap(),
kv_get_latency_ms: register_histogram_vec_with_registry!(
"kv_get_latency_ms",
"Latency of fetches from kv store",
&["client", "table"],
prometheus::exponential_buckets(1.0, 1.6, 24)
.unwrap()
.to_vec(),
registry,
)
.unwrap(),
kv_get_batch_size: register_histogram_vec_with_registry!(
"kv_get_batch_size",
"Number of keys fetched per batch from kv store",
&["client", "table"],
prometheus::exponential_buckets(1.0, 1.6, 20)
.unwrap()
.to_vec(),
registry,
)
.unwrap(),
kv_get_latency_ms_per_key: register_histogram_vec_with_registry!(
"kv_get_latency_ms_per_key",
"Latency of fetches from kv store per key",
&["client", "table"],
prometheus::exponential_buckets(1.0, 1.6, 24)
.unwrap()
.to_vec(),
registry,
)
.unwrap(),
})
}
}
1 change: 1 addition & 0 deletions crates/sui-kvstore/src/bigtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

pub(crate) mod client;
mod metrics;
pub(crate) mod progress_store;
mod proto;
pub(crate) mod worker;

0 comments on commit 09053bb

Please sign in to comment.