Skip to content

Commit

Permalink
feat: logging blobs (#109)
Browse files Browse the repository at this point in the history
Co-authored-by: hal3e <[email protected]>
Co-authored-by: segfault-magnet <[email protected]>
  • Loading branch information
3 people authored Aug 30, 2024
1 parent b6656cc commit df6b28a
Show file tree
Hide file tree
Showing 30 changed files with 535 additions and 176 deletions.
231 changes: 222 additions & 9 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,13 @@ async-trait = { version = "0.1", default-features = false }
c-kzg = { version = "1.0", default-features = false }
clap = { version = "4.5", default-features = false }
config = { version = "0.14", default-features = false }
fs_extra = { version = "1.3", default-features = false }
fuel-core-chain-config = { version = "0.31", default-features = false }
fuel-core-client = { version = "0.31", default-features = false }
fuel-core-types = { version = "0.31", default-features = false }
fuel-crypto = { version = "0.55", default-features = false }
futures = { version = "0.3", default-features = false }
futures-util = { version = "0.3", default-features = false }
hex = { version = "0.4", default-features = false }
humantime = { version = "2.1", default-features = false }
impl-tools = { version = "0.10.0", default-features = false }
Expand Down
1 change: 0 additions & 1 deletion committer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ rust-version = { workspace = true }

[dependencies]
actix-web = { workspace = true, features = ["macros"] }
alloy-chains = { workspace = true, features = [ "serde" ] }
clap = { workspace = true, features = ["default", "derive"] }
config = { workspace = true, features = ["toml", "async"] }
eth = { workspace = true }
Expand Down
23 changes: 4 additions & 19 deletions committer/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{net::Ipv4Addr, path::PathBuf, str::FromStr, time::Duration};

use alloy_chains::NamedChain;
use clap::{command, Parser};
use eth::Address;
use serde::Deserialize;
Expand All @@ -16,8 +15,8 @@ pub struct Config {

impl Config {
pub fn validate(&self) -> crate::errors::Result<()> {
if let Some(blob_pool_wallet_key) = &self.eth.blob_pool_key_id {
if blob_pool_wallet_key == &self.eth.main_key_id {
if let Some(blob_pool_wallet_key) = &self.eth.blob_pool_key_arn {
if blob_pool_wallet_key == &self.eth.main_key_arn {
return Err(crate::errors::Error::Other(
"Wallet key and blob pool wallet key must be different".to_string(),
));
Expand All @@ -40,30 +39,16 @@ pub struct Fuel {
#[derive(Debug, Clone, Deserialize)]
pub struct Eth {
/// The AWS KMS key ID authorized by the L1 bridging contracts to post block commitments.
pub main_key_id: String,
pub main_key_arn: String,
/// The AWS KMS key ID for posting L2 state to L1.
pub blob_pool_key_id: Option<String>,
pub blob_pool_key_arn: Option<String>,
/// URL to a Ethereum RPC endpoint.
#[serde(deserialize_with = "parse_url")]
pub rpc: Url,
/// Chain id of the ethereum network.
#[serde(deserialize_with = "deserialize_named_chain")]
pub chain_id: NamedChain,
/// Ethereum address of the fuel chain state contract.
pub state_contract_address: Address,
}

fn deserialize_named_chain<'de, D>(deserializer: D) -> Result<NamedChain, D::Error>
where
D: serde::Deserializer<'de>,
{
let chain_str: String = Deserialize::deserialize(deserializer).unwrap();
NamedChain::from_str(&chain_str).map_err(|_| {
let msg = format!("Failed to parse chain from '{chain_str}'");
serde::de::Error::custom(msg)
})
}

fn parse_url<'de, D>(deserializer: D) -> Result<Url, D::Error>
where
D: serde::Deserializer<'de>,
Expand Down
1 change: 1 addition & 0 deletions committer/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl From<ports::fuel::Error> for Error {
fn from(error: ports::fuel::Error) -> Self {
match error {
ports::fuel::Error::Network(e) => Self::Network(e),
ports::fuel::Error::Other(e) => Self::Other(e),
}
}
}
Expand Down
15 changes: 7 additions & 8 deletions committer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,26 +79,25 @@ async fn main() -> Result<()> {

// If the blob pool wallet key is set, we need to start
// the state committer and state importer
if config.eth.blob_pool_key_id.is_some() {
if config.eth.blob_pool_key_arn.is_some() {
let state_committer_handle = setup::state_committer(
ethereum_rpc.clone(),
storage.clone(),
&metrics_registry,
cancel_token.clone(),
&config,
);

let state_importer_handle = setup::state_importer(
fuel_adapter,
let state_importer_handle =
setup::state_importer(fuel_adapter, storage.clone(), cancel_token.clone(), &config);

let state_listener_handle = setup::state_listener(
ethereum_rpc,
storage.clone(),
&metrics_registry,
cancel_token.clone(),
&metrics_registry,
&config,
);

let state_listener_handle =
setup::state_listener(ethereum_rpc, storage.clone(), cancel_token.clone(), &config);

handles.push(state_committer_handle);
handles.push(state_importer_handle);
handles.push(state_listener_handle);
Expand Down
10 changes: 5 additions & 5 deletions committer/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ pub fn block_committer(
pub fn state_committer(
l1: L1,
storage: impl Storage + 'static,
_registry: &Registry,
cancel_token: CancellationToken,
config: &config::Config,
) -> tokio::task::JoinHandle<()> {
Expand All @@ -90,7 +89,6 @@ pub fn state_committer(
pub fn state_importer(
fuel: FuelApi,
storage: impl Storage + 'static,
_registry: &Registry,
cancel_token: CancellationToken,
config: &config::Config,
) -> tokio::task::JoinHandle<()> {
Expand All @@ -109,11 +107,14 @@ pub fn state_listener(
l1: L1,
storage: impl Storage + 'static,
cancel_token: CancellationToken,
registry: &Registry,
config: &config::Config,
) -> tokio::task::JoinHandle<()> {
let state_listener =
services::StateListener::new(l1, storage, config.app.num_blocks_to_finalize_tx);

state_listener.register_metrics(registry);

schedule_polling(
config.app.block_check_interval,
state_listener,
Expand All @@ -133,10 +134,9 @@ pub async fn l1_adapter(

let l1 = L1::connect(
config.eth.rpc.clone(),
config.eth.chain_id.into(),
config.eth.state_contract_address,
config.eth.main_key_id.clone(),
config.eth.blob_pool_key_id.clone(),
config.eth.main_key_arn.clone(),
config.eth.blob_pool_key_arn.clone(),
internal_config.eth_errors_before_unhealthy,
aws_client,
)
Expand Down
3 changes: 1 addition & 2 deletions configurations/development/config.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
[eth]
chain_id = "anvil"
state_contract_address = "0xDc64a140Aa3E981100a9becA4E685f962f0cF6C9"
rpc = "ws://localhost:8545"

Expand All @@ -11,7 +10,7 @@ block_producer_public_key = "0x73dc6cc8cc0041e4924954b35a71a22ccb520664c522198a6
port = 8080
host = "0.0.0.0"
block_check_interval = "1s"
num_blocks_to_finalize_tx = "12"
num_blocks_to_finalize_tx = "3"

[app.db]
host = "localhost"
Expand Down
14 changes: 12 additions & 2 deletions e2e/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,23 @@ walkdir = { workspace = true }
zip = { workspace = true, features = ["deflate"] }

[dev-dependencies]
alloy = { workspace = true, features = [ "signer-aws", "signer-mnemonic", "serde" ] }
alloy-chains = { workspace = true }
fs_extra = { workspace = true }
alloy = { workspace = true, features = [
"signer-aws",
"signer-mnemonic",
"serde",
] }
anyhow = { workspace = true, features = ["std"] }
aws-sdk-kms = { workspace = true, features = ["rustls"] }
aws-config = { workspace = true, features = ["rustls"] }
eth = { workspace = true, features = ["test-helpers"] }
fuel = { workspace = true, features = ["test-helpers"] }
fuel-core-chain-config = { workspace = true, features = [
"std",
"test-helpers",
] }
fuel-core-types = { workspace = true }
futures-util = { workspace = true }
hex = { workspace = true }
portpicker = { workspace = true }
ports = { workspace = true, features = ["fuel", "l1"] }
Expand Down
46 changes: 33 additions & 13 deletions e2e/src/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use url::Url;
#[derive(Default)]
pub struct Committer {
show_logs: bool,
main_key_id: Option<String>,
blob_key_id: Option<String>,
main_key_arn: Option<String>,
blob_key_arn: Option<String>,
state_contract_address: Option<String>,
eth_rpc: Option<Url>,
fuel_rpc: Option<Url>,
Expand Down Expand Up @@ -36,10 +36,10 @@ impl Committer {
let mut cmd = tokio::process::Command::new("fuel-block-committer");
cmd.arg(config)
.env("E2E_TEST_AWS_ENDPOINT", kms_url)
.env("AWS_ACCESS_KEY_ID", "test")
.env("AWS_REGION", "us-east-1")
.env("AWS_ACCESS_KEY_ID", "test")
.env("AWS_SECRET_ACCESS_KEY", "test")
.env("COMMITTER__ETH__MAIN_KEY_ID", get_field!(main_key_id))
.env("COMMITTER__ETH__MAIN_KEY_ARN", get_field!(main_key_arn))
.env("COMMITTER__ETH__RPC", get_field!(eth_rpc).as_str())
.env(
"COMMITTER__ETH__STATE_CONTRACT_ADDRESS",
Expand All @@ -56,12 +56,11 @@ impl Committer {
.env("COMMITTER__APP__DB__PORT", get_field!(db_port).to_string())
.env("COMMITTER__APP__DB__DATABASE", get_field!(db_name))
.env("COMMITTER__APP__PORT", unused_port.to_string())
.env("COMMITTER__AWS__ALLOW_HTTP", "true")
.current_dir(Path::new(env!("CARGO_MANIFEST_DIR")).parent().unwrap())
.kill_on_drop(true);

if let Some(blob_wallet_key_id) = self.blob_key_id {
cmd.env("COMMITTER__ETH__BLOB_POOL_KEY_ID", blob_wallet_key_id);
if let Some(blob_wallet_key_arn) = self.blob_key_arn {
cmd.env("COMMITTER__ETH__BLOB_POOL_KEY_ARN", blob_wallet_key_arn);
}

let sink = if self.show_logs {
Expand All @@ -79,8 +78,8 @@ impl Committer {
})
}

pub fn with_main_key_id(mut self, wallet_id: String) -> Self {
self.main_key_id = Some(wallet_id);
pub fn with_main_key_arn(mut self, wallet_arn: String) -> Self {
self.main_key_arn = Some(wallet_arn);
self
}

Expand All @@ -89,8 +88,8 @@ impl Committer {
self
}

pub fn with_blob_key_id(mut self, blob_wallet_id: String) -> Self {
self.blob_key_id = Some(blob_wallet_id);
pub fn with_blob_key_arn(mut self, blob_wallet_arn: String) -> Self {
self.blob_key_arn = Some(blob_wallet_arn);
self
}

Expand Down Expand Up @@ -149,7 +148,28 @@ impl CommitterProcess {
Ok(())
}

pub async fn wait_for_committed_blob(&self) -> anyhow::Result<()> {
loop {
match self.fetch_latest_blob_block().await {
Ok(_) => break,
_ => {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
}
}
Ok(())
}

async fn fetch_latest_committed_block(&self) -> anyhow::Result<u64> {
self.fetch_metric_value("latest_committed_block").await
}

async fn fetch_latest_blob_block(&self) -> anyhow::Result<u64> {
self.fetch_metric_value("last_eth_block_w_blob").await
}

async fn fetch_metric_value(&self, metric_name: &str) -> anyhow::Result<u64> {
let response = reqwest::get(format!("http://localhost:{}/metrics", self.port))
.await?
.error_for_status()?
Expand All @@ -158,8 +178,8 @@ impl CommitterProcess {

let height_line = response
.lines()
.find(|line| line.starts_with("latest_committed_block"))
.ok_or_else(|| anyhow::anyhow!("couldn't find latest_committed_block metric"))?;
.find(|line| line.starts_with(metric_name))
.ok_or_else(|| anyhow::anyhow!("couldn't find {} metric", metric_name))?;

Ok(height_line
.split_whitespace()
Expand Down
22 changes: 3 additions & 19 deletions e2e/src/eth_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@ use alloy::{
network::{EthereumWallet, TransactionBuilder},
providers::{Provider, ProviderBuilder, WsConnect},
rpc::types::TransactionRequest,
signers::{
local::{coins_bip39::English, MnemonicBuilder, PrivateKeySigner},
Signer,
},
signers::local::{coins_bip39::English, MnemonicBuilder, PrivateKeySigner},
};
use alloy_chains::NamedChain;
use eth::Address;
use ports::types::U256;
use state_contract::CreateTransactions;
Expand Down Expand Up @@ -53,12 +49,7 @@ impl EthNode {

let child = cmd.spawn()?;

Ok(EthNodeProcess::new(
child,
unused_port,
NamedChain::AnvilHardhat.into(),
mnemonic,
))
Ok(EthNodeProcess::new(child, unused_port, mnemonic))
}

pub fn with_show_logs(mut self, show_logs: bool) -> Self {
Expand All @@ -69,18 +60,16 @@ impl EthNode {

pub struct EthNodeProcess {
_child: tokio::process::Child,
chain_id: u64,
port: u16,
mnemonic: String,
}

impl EthNodeProcess {
fn new(child: tokio::process::Child, port: u16, chain_id: u64, mnemonic: String) -> Self {
fn new(child: tokio::process::Child, port: u16, mnemonic: String) -> Self {
Self {
_child: child,
mnemonic,
port,
chain_id,
}
}

Expand Down Expand Up @@ -108,7 +97,6 @@ impl EthNodeProcess {
.expect("Should generate a valid derivation path")
.build()
.expect("phrase to be correct")
.with_chain_id(Some(self.chain_id))
}

pub fn ws_url(&self) -> Url {
Expand All @@ -117,10 +105,6 @@ impl EthNodeProcess {
.expect("URL to be well formed")
}

pub fn chain_id(&self) -> u64 {
self.chain_id
}

pub async fn fund(&self, address: Address, amount: U256) -> anyhow::Result<()> {
let wallet = EthereumWallet::from(self.wallet(0));
let ws = WsConnect::new(self.ws_url());
Expand Down
Loading

0 comments on commit df6b28a

Please sign in to comment.