Skip to content

Commit

Permalink
fix(sink): properly handle multiple reset templates separately in Sql…
Browse files Browse the repository at this point in the history
…Db sink
  • Loading branch information
Mercurial committed May 20, 2024
1 parent 9aaff9d commit beed16a
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions src/sinks/sql_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,34 +43,39 @@ impl gasket::framework::Worker<Stage> for Worker {
async fn execute(&mut self, unit: &ChainEvent, stage: &mut Stage) -> Result<(), WorkerError> {
let point = unit.point().clone();

let template = match unit {
let templates = match unit {
ChainEvent::Apply(p, r) => {
let data = hbs_data(p.clone(), Some(r.clone()));
match r {
let template = match r {
Record::CborBlock(_) => stage.templates.render("apply_cbor_block", &data),
Record::CborTx(_) => stage.templates.render("apply_cbor_tx", &data),
_ => stage.templates.render("apply", &data),
}
};
vec![template]
}
ChainEvent::Undo(p, r) => {
let data = hbs_data(p.clone(), Some(r.clone()));
match r {
let template = match r {
Record::CborBlock(_) => stage.templates.render("undo_cbor_block", &data),
Record::CborTx(_) => stage.templates.render("undo_cbor_tx", &data),
_ => stage.templates.render("undo", &data),
}
};
vec![template]
}
ChainEvent::Reset(p) => {
let data = hbs_data(p.clone(), None);
stage.templates.render("reset_cbor_block", &data).ok();
stage.templates.render("reset_cbor_tx", &data)
vec![
stage.templates.render("reset_cbor_block", &data),
stage.templates.render("reset_cbor_tx", &data),
]
}
};

let statement = template.or_panic()?;

let result = sqlx::query(&statement).execute(&self.db).await.or_retry()?;
debug!(rows = result.rows_affected(), "sql statement executed");
for template in templates {
let statement = template.or_panic()?;
let result = sqlx::query(&statement).execute(&self.db).await.or_retry()?;
debug!(rows = result.rows_affected(), "sql statement executed");
}

stage.ops_count.inc(1);
stage.latest_block.set(point.slot_or_default() as i64);
Expand Down

0 comments on commit beed16a

Please sign in to comment.