Skip to content

Commit

Permalink
feat: added filter by slot
Browse files Browse the repository at this point in the history
  • Loading branch information
paulobressan committed Aug 17, 2023
1 parent 98cca15 commit 1dbc1f2
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 6 deletions.
28 changes: 28 additions & 0 deletions examples/match_pattern/daemon.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[source]
type = "N2N"
peers = ["relays-new.cardano-mainnet.iohk.io:3001"]

[intersect]
type = "Point"
value = [
4493860,
"ce7f821d2140419fea1a7900cf71b0c0a0e94afbb1f814a6717cff071c3b6afc",
]

[[filters]]
type = "SplitBlock"

[[filters]]
type = "ParseCbor"

[[filters]]
type = "MatchPattern"

[filters.predicate.block]
slot_after = 4493920
slot_before = 4494020

[sink]
type = "Redis"
url = "redis://127.0.0.1/1"
stream_name = "my-stream"
89 changes: 83 additions & 6 deletions src/filters/match_pattern.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use gasket::framework::*;
use pallas::network::miniprotocols::Point;
use serde::Deserialize;
use tracing::error;

use crate::framework::*;

#[derive(Default, Stage)]
#[derive(Stage)]
#[stage(name = "filter-match-pattern", unit = "ChainEvent", worker = "Worker")]
pub struct Stage {
predicate: Predicate,

pub input: FilterInputPort,
pub output: FilterOutputPort,

Expand All @@ -22,17 +26,90 @@ impl From<&Stage> for Worker {
}
}

gasket::impl_mapper!(|_worker: Worker, stage: Stage, unit: ChainEvent| => {
let out = unit.clone();
gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => {
let out = match unit {
ChainEvent::Apply(point, record) => match record {
Record::ParsedTx(tx) => {
if stage.predicate.tx_match(point, tx) {
Ok(Some(unit.to_owned()))
} else {
Ok(None)
}
},
_ => {
error!("The MatchPattern filter is valid only with the ParsedTx record");
Err(WorkerError::Panic)
}
},
_ => Ok(Some(unit.to_owned()))
}?;

stage.ops_count.inc(1);

out
});

#[derive(Default, Deserialize)]
pub struct Config {}
#[derive(Deserialize, Clone)]
pub struct AddressPattern {
pub exact_hex: Option<String>,
pub exact_bech32: Option<String>,
pub payment_hex: Option<String>,
pub payment_bech32: Option<String>,
pub stake_hex: Option<String>,
pub stake_bech32: Option<String>,
pub is_script: Option<bool>,
}

#[derive(Deserialize, Clone)]
pub struct BlockPattern {
pub slot_before: Option<u64>,
pub slot_after: Option<u64>,
}

#[derive(Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub enum Predicate {
Block(BlockPattern),
}

impl Predicate {
fn tx_match(&self, point: &Point, _: &ParsedTx) -> bool {
match self {
Predicate::Block(block_pattern) => self.slot_match(point, block_pattern),
}
}

fn slot_match(&self, point: &Point, block_pattern: &BlockPattern) -> bool {
if let Some(slot_after) = block_pattern.slot_after {
if point.slot_or_default() <= slot_after {
return false;
}
}

if let Some(slot_before) = block_pattern.slot_before {
if point.slot_or_default() >= slot_before {
return false;
}
}

true
}
}

#[derive(Deserialize)]
pub struct Config {
pub predicate: Predicate,
}

impl Config {
pub fn bootstrapper(self, _ctx: &Context) -> Result<Stage, Error> {
Ok(Stage::default())
let stage = Stage {
predicate: self.predicate,
ops_count: Default::default(),
input: Default::default(),
output: Default::default(),
};

Ok(stage)
}
}

0 comments on commit 1dbc1f2

Please sign in to comment.