Skip to content

Commit

Permalink
feat(derive): Move to tracing for telemetry (#94)
Browse files Browse the repository at this point in the history
  • Loading branch information
clabby authored Apr 8, 2024
1 parent 9f9a724 commit 56fa3ba
Show file tree
Hide file tree
Showing 16 changed files with 266 additions and 343 deletions.
94 changes: 94 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ homepage.workspace = true
[dependencies]
# Workspace
anyhow.workspace = true
tracing.workspace = true

# External
alloy-primitives = { version = "0.7.0", default-features = false, features = ["rlp"] }
Expand All @@ -30,6 +31,7 @@ serde = { version = "1.0.197", default-features = false, features = ["derive"],
tokio = { version = "1.36", features = ["full"] }
proptest = "1.4.0"
spin = { version = "0.9.8", features = ["mutex"] } # Spin is used for testing synchronization primitives
tracing-subscriber = "0.3.18"

[features]
default = ["serde", "k256"]
Expand Down
9 changes: 2 additions & 7 deletions crates/derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ extern crate alloc;

use alloc::sync::Arc;
use core::fmt::Debug;
use traits::{ChainProvider, TelemetryProvider};
use traits::ChainProvider;
use types::RollupConfig;

mod params;
Expand All @@ -29,14 +29,9 @@ pub struct DerivationPipeline;

impl DerivationPipeline {
/// Creates a new instance of the [DerivationPipeline].
pub fn new<P, T>(
_rollup_config: Arc<RollupConfig>,
_chain_provider: P,
_telemetry: Arc<T>,
) -> Self
pub fn new<P>(_rollup_config: Arc<RollupConfig>, _chain_provider: P) -> Self
where
P: ChainProvider + Clone + Debug + Send,
T: TelemetryProvider + Clone + Debug + Send + Sync,
{
// let l1_traversal = L1Traversal::new(chain_provider, rollup_config.clone(),
// telemetry.clone()); let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source,
Expand Down
46 changes: 16 additions & 30 deletions crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
//! Contains the logic for the `AttributesQueue` stage.

use crate::{
traits::{LogLevel, OriginProvider, ResettableStage, TelemetryProvider},
traits::{OriginProvider, ResettableStage},
types::{
AttributesWithParent, BlockID, BlockInfo, L2BlockInfo, PayloadAttributes, ResetError,
RollupConfig, SingleBatch, StageError, StageResult, SystemConfig,
},
};
use alloc::boxed::Box;
use alloy_primitives::Bytes;
use async_trait::async_trait;
use core::fmt::Debug;
use tracing::info;

pub trait AttributesBuilder {
/// Prepare the payload attributes.
Expand Down Expand Up @@ -42,18 +42,15 @@ pub trait AttributesProvider {
/// This stage can be reset by clearing its batch buffer.
/// This stage does not need to retain any references to L1 blocks.
#[derive(Debug)]
pub struct AttributesQueue<P, T, AB>
pub struct AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Debug,
T: TelemetryProvider + Debug,
AB: AttributesBuilder + Debug,
{
/// The rollup config.
cfg: RollupConfig,
/// The previous stage of the derivation pipeline.
prev: P,
/// Telemetry provider.
telemetry: T,
/// Whether the current batch is the last in its span.
is_last_in_span: bool,
/// The current batch being processed.
Expand All @@ -62,15 +59,14 @@ where
builder: AB,
}

impl<P, T, AB> AttributesQueue<P, T, AB>
impl<P, AB> AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Debug,
T: TelemetryProvider + Debug,
AB: AttributesBuilder + Debug,
{
/// Create a new [AttributesQueue] stage.
pub fn new(cfg: RollupConfig, prev: P, telemetry: T, builder: AB) -> Self {
Self { cfg, prev, telemetry, is_last_in_span: false, batch: None, builder }
pub fn new(cfg: RollupConfig, prev: P, builder: AB) -> Self {
Self { cfg, prev, is_last_in_span: false, batch: None, builder }
}

/// Loads a [SingleBatch] from the [AttributesProvider] if needed.
Expand Down Expand Up @@ -132,23 +128,18 @@ where
attributes.no_tx_pool = true;
attributes.transactions.extend(batch.transactions);

self.telemetry.write(
Bytes::from(alloc::format!(
"generated attributes in payload queue: txs={}, timestamp={}",
tx_count,
batch.timestamp,
)),
LogLevel::Info,
info!(
"generated attributes in payload queue: txs={}, timestamp={}",
tx_count, batch.timestamp
);

Ok(attributes)
}
}

impl<P, T, AB> OriginProvider for AttributesQueue<P, T, AB>
impl<P, AB> OriginProvider for AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Debug,
T: TelemetryProvider + Debug,
AB: AttributesBuilder + Debug,
{
fn origin(&self) -> Option<&BlockInfo> {
Expand All @@ -157,14 +148,13 @@ where
}

#[async_trait]
impl<P, T, AB> ResettableStage for AttributesQueue<P, T, AB>
impl<P, AB> ResettableStage for AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Send + Debug,
T: TelemetryProvider + Send + Debug,
AB: AttributesBuilder + Send + Debug,
{
async fn reset(&mut self, _: BlockInfo, _: &SystemConfig) -> StageResult<()> {
self.telemetry.write(Bytes::from("resetting attributes queue"), LogLevel::Info);
info!("resetting attributes queue");
// TODO: metrice the reset using telemetry
// telemetry can provide a method of logging and metricing
self.batch = None;
Expand All @@ -183,7 +173,6 @@ mod tests {
stages::test_utils::{
new_attributes_provider, MockAttributesBuilder, MockAttributesProvider,
},
traits::test_utils::TestTelemetry,
types::RawTransaction,
};
use alloc::{vec, vec::Vec};
Expand All @@ -193,12 +182,11 @@ mod tests {
cfg: Option<RollupConfig>,
origin: Option<BlockInfo>,
batches: Vec<StageResult<SingleBatch>>,
) -> AttributesQueue<MockAttributesProvider, TestTelemetry, MockAttributesBuilder> {
) -> AttributesQueue<MockAttributesProvider, MockAttributesBuilder> {
let cfg = cfg.unwrap_or_default();
let telemetry = TestTelemetry::new();
let mock_batch_queue = new_attributes_provider(origin, batches);
let mock_attributes_builder = MockAttributesBuilder::default();
AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_attributes_builder)
AttributesQueue::new(cfg, mock_batch_queue, mock_attributes_builder)
}

#[tokio::test]
Expand Down Expand Up @@ -283,12 +271,11 @@ mod tests {
#[tokio::test]
async fn test_create_next_attributes_success() {
let cfg = RollupConfig::default();
let telemetry = TestTelemetry::new();
let mock = new_attributes_provider(None, vec![]);
let mut payload_attributes = PayloadAttributes::default();
let mock_builder =
MockAttributesBuilder { attributes: vec![Ok(payload_attributes.clone())] };
let mut aq = AttributesQueue::new(cfg, mock, telemetry, mock_builder);
let mut aq = AttributesQueue::new(cfg, mock, mock_builder);
let parent = L2BlockInfo::default();
let txs = vec![RawTransaction::default(), RawTransaction::default()];
let batch = SingleBatch { transactions: txs.clone(), ..Default::default() };
Expand All @@ -310,11 +297,10 @@ mod tests {
#[tokio::test]
async fn test_next_attributes_load_batch_last_in_span() {
let cfg = RollupConfig::default();
let telemetry = TestTelemetry::new();
let mock = new_attributes_provider(None, vec![Ok(Default::default())]);
let mut pa = PayloadAttributes::default();
let mock_builder = MockAttributesBuilder { attributes: vec![Ok(pa.clone())] };
let mut aq = AttributesQueue::new(cfg, mock, telemetry, mock_builder);
let mut aq = AttributesQueue::new(cfg, mock, mock_builder);
// If we load the batch, we should get the last in span.
// But it won't take it so it will be available in the next_attributes call.
let _ = aq.load_batch(L2BlockInfo::default()).await.unwrap();
Expand Down
Loading

0 comments on commit 56fa3ba

Please sign in to comment.