From 4e673c806e09171e9729e0f27b9def4347e406d7 Mon Sep 17 00:00:00 2001 From: paulobressan Date: Wed, 12 Jul 2023 11:07:53 -0300 Subject: [PATCH] chore(sinks): removed aws_s3 as pending --- src/sinks/_pending/aws_s3/mod.rs | 4 - src/sinks/_pending/aws_s3/run.rs | 138 ----------------------------- src/sinks/_pending/aws_s3/setup.rs | 81 ----------------- 3 files changed, 223 deletions(-) delete mode 100644 src/sinks/_pending/aws_s3/mod.rs delete mode 100644 src/sinks/_pending/aws_s3/run.rs delete mode 100644 src/sinks/_pending/aws_s3/setup.rs diff --git a/src/sinks/_pending/aws_s3/mod.rs b/src/sinks/_pending/aws_s3/mod.rs deleted file mode 100644 index 0a447c1d..00000000 --- a/src/sinks/_pending/aws_s3/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod run; -mod setup; - -pub use setup::*; diff --git a/src/sinks/_pending/aws_s3/run.rs b/src/sinks/_pending/aws_s3/run.rs deleted file mode 100644 index 6d9a1502..00000000 --- a/src/sinks/_pending/aws_s3/run.rs +++ /dev/null @@ -1,138 +0,0 @@ -use aws_sdk_s3::{types::ByteStream, Client}; -use std::sync::Arc; - -use crate::{ - framework::StageReceiver, - model::{BlockRecord, EventData}, - utils::Utils, - Error, -}; - -use super::{ContentType, Naming}; - -async fn send_s3_object( - client: Arc, - bucket: &str, - key: &str, - content: ByteStream, - content_type: &ContentType, - record: &BlockRecord, -) -> Result<(), Error> { - let req = client - .put_object() - .bucket(bucket) - .key(key) - .body(content) - .metadata("era", record.era.to_string()) - .metadata("issuer_vkey", &record.issuer_vkey) - .metadata("tx_count", record.tx_count.to_string()) - .metadata("slot", record.slot.to_string()) - .metadata("hash", &record.hash) - .metadata("number", record.number.to_string()) - .metadata("previous_hash", &record.previous_hash) - .content_type(content_type); - - let res = req.send().await?; - - log::trace!("S3 put response: {:?}", res); - - Ok(()) -} - -fn define_obj_key(prefix: &str, policy: &Naming, record: &BlockRecord) -> String { - match policy { - Naming::Hash => format!("{}{}", prefix, record.hash), - Naming::SlotHash => format!("{}{}.{}", prefix, record.slot, record.hash), - Naming::BlockHash => format!("{}{}.{}", prefix, record.number, record.hash), - Naming::EpochHash => format!( - "{}{}.{}", - prefix, - record.epoch.unwrap_or_default(), - record.hash - ), - Naming::EpochSlotHash => format!( - "{}{}.{}.{}", - prefix, - record.epoch.unwrap_or_default(), - record.slot, - record.hash - ), - Naming::EpochBlockHash => { - format!( - "{}{}.{}.{}", - prefix, - record.epoch.unwrap_or_default(), - record.number, - record.hash - ) - } - } -} - -fn define_content(content_type: &ContentType, record: &BlockRecord) -> ByteStream { - let hex = match record.cbor_hex.as_ref() { - Some(x) => x, - None => { - log::error!( - "found block record without CBOR, please enable CBOR in source mapper options" - ); - panic!() - } - }; - - match content_type { - ContentType::Cbor => { - let cbor = hex::decode(hex).expect("valid hex value"); - ByteStream::from(cbor) - } - ContentType::CborHex => ByteStream::from(hex.as_bytes().to_vec()), - } -} - -pub fn writer_loop( - input: StageReceiver, - client: Client, - bucket: &str, - prefix: &str, - naming: Naming, - content_type: ContentType, - utils: Arc, -) -> Result<(), Error> { - let client = Arc::new(client); - - let rt = tokio::runtime::Builder::new_current_thread() - .enable_time() - .enable_io() - .build()?; - - for event in input.iter() { - if let EventData::Block(record) = &event.data { - let key = define_obj_key(prefix, &naming, record); - let content = define_content(&content_type, record); - - let client = client.clone(); - - let result = rt.block_on(send_s3_object( - client, - bucket, - &key, - content, - &content_type, - record, - )); - - match result { - Ok(_) => { - // notify the pipeline where we are - utils.track_sink_progress(&event); - } - Err(err) => { - log::error!("unrecoverable error sending block to S3: {:?}", err); - return Err(err); - } - } - } - } - - Ok(()) -} diff --git a/src/sinks/_pending/aws_s3/setup.rs b/src/sinks/_pending/aws_s3/setup.rs deleted file mode 100644 index 65c78dab..00000000 --- a/src/sinks/_pending/aws_s3/setup.rs +++ /dev/null @@ -1,81 +0,0 @@ -use aws_sdk_s3::{Client, Region, RetryConfig}; -use serde::Deserialize; - -use crate::{ - framework::{BootstrapResult, SinkProvider, StageReceiver}, - utils::WithUtils, -}; - -use super::run::writer_loop; - -const DEFAULT_MAX_RETRIES: u32 = 5; - -#[derive(Deserialize, Debug, Clone)] -pub enum Naming { - Hash, - SlotHash, - BlockHash, - EpochHash, - EpochSlotHash, - EpochBlockHash, -} - -#[derive(Deserialize, Debug, Clone)] -pub enum ContentType { - Cbor, - CborHex, -} - -impl From<&ContentType> for String { - fn from(other: &ContentType) -> Self { - match other { - ContentType::Cbor => "application/cbor".to_string(), - ContentType::CborHex => "text/plain".to_string(), - } - } -} - -#[derive(Default, Debug, Deserialize)] -pub struct Config { - pub region: String, - pub bucket: String, - pub prefix: Option, - pub naming: Option, - pub content: Option, - pub max_retries: Option, -} - -impl SinkProvider for WithUtils { - fn bootstrap(&self, input: StageReceiver) -> BootstrapResult { - let explicit_region = self.inner.region.to_owned(); - - let aws_config = tokio::runtime::Builder::new_current_thread() - .build()? - .block_on( - aws_config::from_env() - .region(Region::new(explicit_region)) - .load(), - ); - - let retry_config = RetryConfig::new() - .with_max_attempts(self.inner.max_retries.unwrap_or(DEFAULT_MAX_RETRIES)); - - let s3_config = aws_sdk_s3::config::Builder::from(&aws_config) - .retry_config(retry_config) - .build(); - - let client = Client::from_conf(s3_config); - let bucket = self.inner.bucket.clone(); - let prefix = self.inner.prefix.clone().unwrap_or_default(); - let naming = self.inner.naming.clone().unwrap_or(Naming::Hash); - let content = self.inner.content.clone().unwrap_or(ContentType::Cbor); - let utils = self.utils.clone(); - - let handle = std::thread::spawn(move || { - writer_loop(input, client, &bucket, &prefix, naming, content, utils) - .expect("writer loop failed") - }); - - Ok(handle) - } -}