Skip to content

Commit

Permalink
Convert coordinator/substrate/db to use create_db macro (#436)
Browse files Browse the repository at this point in the history
* chore: implement create_db for substrate (fix broken branch)

* Correct rebase artifacts

* chore: remove todo statement

* chore: rename BlockDb to NextBlock

* chore: return empty tuple instead of empty array for event storage

* Finish rebasing

* .Minor tweaks to remove leftover variables

These may be rebase artifacts.

---------

Co-authored-by: Luke Parker <[email protected]>
  • Loading branch information
davidjohnbell and kayabaNerve authored Dec 8, 2023
1 parent a6947d6 commit 16b22dd
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 98 deletions.
6 changes: 3 additions & 3 deletions coordinator/src/cosign_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use processor_messages::coordinator::cosign_block_msg;

use crate::{
p2p::{CosignedBlock, P2pMessageKind, P2p},
substrate::SubstrateDb,
substrate::LatestCosignedBlock,
};

create_db! {
Expand Down Expand Up @@ -67,9 +67,9 @@ impl<D: Db> CosignEvaluator<D> {

let mut db_lock = self.db.lock().await;
let mut txn = db_lock.txn();
if highest_block > SubstrateDb::<D>::latest_cosigned_block(&txn) {
if highest_block > LatestCosignedBlock::latest_cosigned_block(&txn) {
log::info!("setting latest cosigned block to {}", highest_block);
SubstrateDb::<D>::set_latest_cosigned_block(&mut txn, highest_block);
LatestCosignedBlock::set(&mut txn, &highest_block);
}
txn.commit();
}
Expand Down
94 changes: 24 additions & 70 deletions coordinator/src/substrate/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ use serai_client::{

pub use serai_db::*;

create_db! {
create_db!(
SubstrateDb {
CosignTriggered: () -> (),
IntendedCosign: () -> (u64, Option<u64>),
BlockHasEvents: (block: u64) -> u8,
LatestCosignedBlock: () -> u64,
NextBlock: () -> u64,
EventDb: (id: &[u8], index: u32) -> (),
BatchInstructionsHashDb: (network: NetworkId, id: u32) -> [u8; 32]
}
}
);

impl IntendedCosign {
pub fn set_intended_cosign(txn: &mut impl DbTxn, intended: u64) {
Expand All @@ -26,82 +30,32 @@ impl IntendedCosign {
}
}

db_channel! {
SubstrateDb {
CosignTransactions: (network: NetworkId) -> (Session, u64, [u8; 32]),
impl LatestCosignedBlock {
pub fn latest_cosigned_block(getter: &impl Get) -> u64 {
Self::get(getter).unwrap_or_default().max(1)
}
}
impl CosignTransactions {
// Append a cosign transaction.
pub fn append_cosign(txn: &mut impl DbTxn, set: ValidatorSet, number: u64, hash: [u8; 32]) {
CosignTransactions::send(txn, set.network, &(set.session, number, hash))
}
}

#[derive(Debug)]
pub struct SubstrateDb<D: Db>(pub D);
impl<D: Db> SubstrateDb<D> {
pub fn new(db: D) -> Self {
Self(db)
}

fn substrate_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
D::key(b"coordinator_substrate", dst, key)
}

fn next_block_key() -> Vec<u8> {
Self::substrate_key(b"next_block", [])
}
pub fn set_next_block(&mut self, block: u64) {
let mut txn = self.0.txn();
txn.put(Self::next_block_key(), block.to_le_bytes());
txn.commit();
}
pub fn next_block(&self) -> u64 {
u64::from_le_bytes(self.0.get(Self::next_block_key()).unwrap_or(vec![0; 8]).try_into().unwrap())
impl EventDb {
pub fn is_unhandled(getter: &impl Get, id: &[u8], index: u32) -> bool {
Self::get(getter, id, index).is_none()
}

fn latest_cosigned_block_key() -> Vec<u8> {
Self::substrate_key(b"latest_cosigned_block", [])
}
pub fn set_latest_cosigned_block(txn: &mut D::Transaction<'_>, latest_cosigned_block: u64) {
txn.put(Self::latest_cosigned_block_key(), latest_cosigned_block.to_le_bytes());
}
pub fn latest_cosigned_block<G: Get>(getter: &G) -> u64 {
let db = u64::from_le_bytes(
getter.get(Self::latest_cosigned_block_key()).unwrap_or(vec![0; 8]).try_into().unwrap(),
);
// Mark the genesis as cosigned
db.max(1)
pub fn handle_event(txn: &mut impl DbTxn, id: &[u8], index: u32) {
assert!(Self::is_unhandled(txn, id, index));
Self::set(txn, id, index, &());
}
}

fn event_key(id: &[u8], index: u32) -> Vec<u8> {
Self::substrate_key(b"event", [id, index.to_le_bytes().as_ref()].concat())
}
pub fn handled_event<G: Get>(getter: &G, id: [u8; 32], index: u32) -> bool {
getter.get(Self::event_key(&id, index)).is_some()
}
pub fn handle_event(txn: &mut D::Transaction<'_>, id: [u8; 32], index: u32) {
assert!(!Self::handled_event(txn, id, index));
txn.put(Self::event_key(&id, index), []);
db_channel! {
SubstrateDbChannels {
CosignTransactions: (network: NetworkId) -> (Session, u64, [u8; 32]),
}
}

fn batch_instructions_key(network: NetworkId, id: u32) -> Vec<u8> {
Self::substrate_key(b"batch", (network, id).encode())
}
pub fn batch_instructions_hash<G: Get>(
getter: &G,
network: NetworkId,
id: u32,
) -> Option<[u8; 32]> {
getter.get(Self::batch_instructions_key(network, id)).map(|bytes| bytes.try_into().unwrap())
}
pub fn save_batch_instructions_hash(
txn: &mut D::Transaction<'_>,
network: NetworkId,
id: u32,
hash: [u8; 32],
) {
txn.put(Self::batch_instructions_key(network, id), hash);
impl CosignTransactions {
// Append a cosign transaction.
pub fn append_cosign(txn: &mut impl DbTxn, set: ValidatorSet, number: u64, hash: [u8; 32]) {
CosignTransactions::send(txn, set.network, &(set.session, number, hash))
}
}
50 changes: 25 additions & 25 deletions coordinator/src/substrate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ async fn handle_batch_and_burns<D: Db, Pro: Processors>(
network_had_event(&mut burns, &mut batches, network);

let mut txn = db.txn();
SubstrateDb::<D>::save_batch_instructions_hash(&mut txn, network, id, instructions_hash);
BatchInstructionsHashDb::set(&mut txn, network, id, &instructions_hash);
txn.commit();

// Make sure this is the only Batch event for this network in this Block
Expand Down Expand Up @@ -239,7 +239,7 @@ async fn handle_batch_and_burns<D: Db, Pro: Processors>(
// Handle a specific Substrate block, returning an error when it fails to get data
// (not blocking / holding)
async fn handle_block<D: Db, Pro: Processors>(
db: &mut SubstrateDb<D>,
db: &mut D,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
new_tributary_spec: &mpsc::UnboundedSender<TributarySpec>,
tributary_retired: &mpsc::UnboundedSender<ValidatorSet>,
Expand Down Expand Up @@ -268,27 +268,27 @@ async fn handle_block<D: Db, Pro: Processors>(
continue;
}

if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
if EventDb::is_unhandled(db, &hash, event_id) {
log::info!("found fresh new set event {:?}", new_set);
let mut txn = db.0.txn();
let mut txn = db.txn();
handle_new_set::<D>(&mut txn, key, new_tributary_spec, serai, &block, set).await?;
SubstrateDb::<D>::handle_event(&mut txn, hash, event_id);
EventDb::handle_event(&mut txn, &hash, event_id);
txn.commit();
}
event_id += 1;
}

// If a key pair was confirmed, inform the processor
for key_gen in serai.as_of(hash).validator_sets().key_gen_events().await? {
if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
if EventDb::is_unhandled(db, &hash, event_id) {
log::info!("found fresh key gen event {:?}", key_gen);
if let ValidatorSetsEvent::KeyGen { set, key_pair } = key_gen {
handle_key_gen(processors, serai, &block, set, key_pair).await?;
} else {
panic!("KeyGen event wasn't KeyGen: {key_gen:?}");
}
let mut txn = db.0.txn();
SubstrateDb::<D>::handle_event(&mut txn, hash, event_id);
let mut txn = db.txn();
EventDb::handle_event(&mut txn, &hash, event_id);
txn.commit();
}
event_id += 1;
Expand All @@ -303,12 +303,12 @@ async fn handle_block<D: Db, Pro: Processors>(
continue;
}

if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
if EventDb::is_unhandled(db, &hash, event_id) {
log::info!("found fresh set retired event {:?}", retired_set);
let mut txn = db.0.txn();
let mut txn = db.txn();
crate::ActiveTributaryDb::retire_tributary(&mut txn, set);
tributary_retired.send(set).unwrap();
SubstrateDb::<D>::handle_event(&mut txn, hash, event_id);
EventDb::handle_event(&mut txn, &hash, event_id);
txn.commit();
}
event_id += 1;
Expand All @@ -319,18 +319,18 @@ async fn handle_block<D: Db, Pro: Processors>(
// following events share data collection
// This does break the uniqueness of (hash, event_id) -> one event, yet
// (network, (hash, event_id)) remains valid as a unique ID for an event
if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
handle_batch_and_burns(&mut db.0, processors, serai, &block).await?;
if EventDb::is_unhandled(db, &hash, event_id) {
handle_batch_and_burns(db, processors, serai, &block).await?;
}
let mut txn = db.0.txn();
SubstrateDb::<D>::handle_event(&mut txn, hash, event_id);
let mut txn = db.txn();
EventDb::handle_event(&mut txn, &hash, event_id);
txn.commit();

Ok(())
}

async fn handle_new_blocks<D: Db, Pro: Processors>(
db: &mut SubstrateDb<D>,
db: &mut D,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
new_tributary_spec: &mpsc::UnboundedSender<TributarySpec>,
tributary_retired: &mpsc::UnboundedSender<ValidatorSet>,
Expand Down Expand Up @@ -394,7 +394,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
}
}

let mut txn = db.0.txn();
let mut txn = db.txn();
let Some((last_intended_to_cosign_block, mut skipped_block)) = IntendedCosign::get(&txn) else {
IntendedCosign::set_intended_cosign(&mut txn, 1);
txn.commit();
Expand Down Expand Up @@ -506,7 +506,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
// cosigned
if let Some(has_no_cosigners) = has_no_cosigners {
log::debug!("{} had no cosigners available, marking as cosigned", has_no_cosigners.number());
SubstrateDb::<D>::set_latest_cosigned_block(&mut txn, has_no_cosigners.number());
LatestCosignedBlock::set(&mut txn, &has_no_cosigners.number());
} else {
CosignTriggered::set(&mut txn, &());
for (set, block, hash) in cosign {
Expand All @@ -518,7 +518,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
}

// Reduce to the latest cosigned block
let latest_number = latest_number.min(SubstrateDb::<D>::latest_cosigned_block(&db.0));
let latest_number = latest_number.min(LatestCosignedBlock::latest_cosigned_block(db));

if latest_number < *next_block {
return Ok(());
Expand All @@ -540,25 +540,25 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
)
.await?;
*next_block += 1;
db.set_next_block(*next_block);
let mut txn = db.txn();
NextBlock::set(&mut txn, next_block);
txn.commit();
log::info!("handled substrate block {b}");
}

Ok(())
}

pub async fn scan_task<D: Db, Pro: Processors>(
db: D,
mut db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
processors: Pro,
serai: Arc<Serai>,
new_tributary_spec: mpsc::UnboundedSender<TributarySpec>,
tributary_retired: mpsc::UnboundedSender<ValidatorSet>,
) {
log::info!("scanning substrate");

let mut db = SubstrateDb::new(db);
let mut next_substrate_block = db.next_block();
let mut next_substrate_block = NextBlock::get(&db).unwrap_or_default();

/*
let new_substrate_block_notifier = {
Expand Down Expand Up @@ -680,7 +680,7 @@ pub(crate) async fn verify_published_batches<D: Db>(
// TODO: Localize from MainDb to SubstrateDb
let last = crate::LastVerifiedBatchDb::get(txn, network);
for id in last.map(|last| last + 1).unwrap_or(0) ..= optimistic_up_to {
let Some(on_chain) = SubstrateDb::<D>::batch_instructions_hash(txn, network, id) else {
let Some(on_chain) = BatchInstructionsHashDb::get(txn, network, id) else {
break;
};
let off_chain = crate::ExpectedBatchDb::get(txn, network, id).unwrap();
Expand Down

0 comments on commit 16b22dd

Please sign in to comment.