From 1dbc1f2383b9ed86557f20398c737aa4ed94481b Mon Sep 17 00:00:00 2001 From: paulobressan Date: Thu, 17 Aug 2023 16:55:20 -0300 Subject: [PATCH] feat: added filter by slot --- examples/match_pattern/daemon.toml | 28 ++++++++++ src/filters/match_pattern.rs | 89 ++++++++++++++++++++++++++++-- 2 files changed, 111 insertions(+), 6 deletions(-) create mode 100644 examples/match_pattern/daemon.toml diff --git a/examples/match_pattern/daemon.toml b/examples/match_pattern/daemon.toml new file mode 100644 index 00000000..3a43b119 --- /dev/null +++ b/examples/match_pattern/daemon.toml @@ -0,0 +1,28 @@ +[source] +type = "N2N" +peers = ["relays-new.cardano-mainnet.iohk.io:3001"] + +[intersect] +type = "Point" +value = [ + 4493860, + "ce7f821d2140419fea1a7900cf71b0c0a0e94afbb1f814a6717cff071c3b6afc", +] + +[[filters]] +type = "SplitBlock" + +[[filters]] +type = "ParseCbor" + +[[filters]] +type = "MatchPattern" + +[filters.predicate.block] +slot_after = 4493920 +slot_before = 4494020 + +[sink] +type = "Redis" +url = "redis://127.0.0.1/1" +stream_name = "my-stream" diff --git a/src/filters/match_pattern.rs b/src/filters/match_pattern.rs index 6659118d..2706dcff 100644 --- a/src/filters/match_pattern.rs +++ b/src/filters/match_pattern.rs @@ -1,11 +1,15 @@ use gasket::framework::*; +use pallas::network::miniprotocols::Point; use serde::Deserialize; +use tracing::error; use crate::framework::*; -#[derive(Default, Stage)] +#[derive(Stage)] #[stage(name = "filter-match-pattern", unit = "ChainEvent", worker = "Worker")] pub struct Stage { + predicate: Predicate, + pub input: FilterInputPort, pub output: FilterOutputPort, @@ -22,17 +26,90 @@ impl From<&Stage> for Worker { } } -gasket::impl_mapper!(|_worker: Worker, stage: Stage, unit: ChainEvent| => { - let out = unit.clone(); +gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => { + let out = match unit { + ChainEvent::Apply(point, record) => match record { + Record::ParsedTx(tx) => { + if stage.predicate.tx_match(point, tx) { + Ok(Some(unit.to_owned())) + } else { + Ok(None) + } + }, + _ => { + error!("The MatchPattern filter is valid only with the ParsedTx record"); + Err(WorkerError::Panic) + } + }, + _ => Ok(Some(unit.to_owned())) + }?; + stage.ops_count.inc(1); + out }); -#[derive(Default, Deserialize)] -pub struct Config {} +#[derive(Deserialize, Clone)] +pub struct AddressPattern { + pub exact_hex: Option, + pub exact_bech32: Option, + pub payment_hex: Option, + pub payment_bech32: Option, + pub stake_hex: Option, + pub stake_bech32: Option, + pub is_script: Option, +} + +#[derive(Deserialize, Clone)] +pub struct BlockPattern { + pub slot_before: Option, + pub slot_after: Option, +} + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "snake_case")] +pub enum Predicate { + Block(BlockPattern), +} + +impl Predicate { + fn tx_match(&self, point: &Point, _: &ParsedTx) -> bool { + match self { + Predicate::Block(block_pattern) => self.slot_match(point, block_pattern), + } + } + + fn slot_match(&self, point: &Point, block_pattern: &BlockPattern) -> bool { + if let Some(slot_after) = block_pattern.slot_after { + if point.slot_or_default() <= slot_after { + return false; + } + } + + if let Some(slot_before) = block_pattern.slot_before { + if point.slot_or_default() >= slot_before { + return false; + } + } + + true + } +} + +#[derive(Deserialize)] +pub struct Config { + pub predicate: Predicate, +} impl Config { pub fn bootstrapper(self, _ctx: &Context) -> Result { - Ok(Stage::default()) + let stage = Stage { + predicate: self.predicate, + ops_count: Default::default(), + input: Default::default(), + output: Default::default(), + }; + + Ok(stage) } }