diff --git a/src/sinks/sql_db.rs b/src/sinks/sql_db.rs index 55eaed52..892c0c00 100644 --- a/src/sinks/sql_db.rs +++ b/src/sinks/sql_db.rs @@ -43,34 +43,39 @@ impl gasket::framework::Worker for Worker { async fn execute(&mut self, unit: &ChainEvent, stage: &mut Stage) -> Result<(), WorkerError> { let point = unit.point().clone(); - let template = match unit { + let templates = match unit { ChainEvent::Apply(p, r) => { let data = hbs_data(p.clone(), Some(r.clone())); - match r { + let template = match r { Record::CborBlock(_) => stage.templates.render("apply_cbor_block", &data), Record::CborTx(_) => stage.templates.render("apply_cbor_tx", &data), _ => stage.templates.render("apply", &data), - } + }; + vec![template] } ChainEvent::Undo(p, r) => { let data = hbs_data(p.clone(), Some(r.clone())); - match r { + let template = match r { Record::CborBlock(_) => stage.templates.render("undo_cbor_block", &data), Record::CborTx(_) => stage.templates.render("undo_cbor_tx", &data), _ => stage.templates.render("undo", &data), - } + }; + vec![template] } ChainEvent::Reset(p) => { let data = hbs_data(p.clone(), None); - stage.templates.render("reset_cbor_block", &data).ok(); - stage.templates.render("reset_cbor_tx", &data) + vec![ + stage.templates.render("reset_cbor_block", &data), + stage.templates.render("reset_cbor_tx", &data), + ] } }; - let statement = template.or_panic()?; - - let result = sqlx::query(&statement).execute(&self.db).await.or_retry()?; - debug!(rows = result.rows_affected(), "sql statement executed"); + for template in templates { + let statement = template.or_panic()?; + let result = sqlx::query(&statement).execute(&self.db).await.or_retry()?; + debug!(rows = result.rows_affected(), "sql statement executed"); + } stage.ops_count.inc(1); stage.latest_block.set(point.slot_or_default() as i64);