Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert coordinator/substrate/db to use create_db macro #436

Merged
merged 7 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions coordinator/src/cosign_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use processor_messages::coordinator::cosign_block_msg;

use crate::{
p2p::{CosignedBlock, P2pMessageKind, P2p},
substrate::SubstrateDb,
substrate::LatestCosignedBlock,
};

create_db! {
Expand Down Expand Up @@ -67,9 +67,9 @@ impl<D: Db> CosignEvaluator<D> {

let mut db_lock = self.db.lock().await;
let mut txn = db_lock.txn();
if highest_block > SubstrateDb::<D>::latest_cosigned_block(&txn) {
if highest_block > LatestCosignedBlock::latest_cosigned_block(&txn) {
log::info!("setting latest cosigned block to {}", highest_block);
SubstrateDb::<D>::set_latest_cosigned_block(&mut txn, highest_block);
LatestCosignedBlock::set(&mut txn, &highest_block);
}
txn.commit();
}
Expand Down
94 changes: 24 additions & 70 deletions coordinator/src/substrate/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ use serai_client::{

pub use serai_db::*;

create_db! {
create_db!(
SubstrateDb {
CosignTriggered: () -> (),
IntendedCosign: () -> (u64, Option<u64>),
BlockHasEvents: (block: u64) -> u8,
LatestCosignedBlock: () -> u64,
NextBlock: () -> u64,
EventDb: (id: &[u8], index: u32) -> (),
BatchInstructionsHashDb: (network: NetworkId, id: u32) -> [u8; 32]
}
}
);

impl IntendedCosign {
pub fn set_intended_cosign(txn: &mut impl DbTxn, intended: u64) {
Expand All @@ -26,82 +30,32 @@ impl IntendedCosign {
}
}

db_channel! {
SubstrateDb {
CosignTransactions: (network: NetworkId) -> (Session, u64, [u8; 32]),
impl LatestCosignedBlock {
pub fn latest_cosigned_block(getter: &impl Get) -> u64 {
Self::get(getter).unwrap_or_default().max(1)
}
}
impl CosignTransactions {
// Append a cosign transaction.
pub fn append_cosign(txn: &mut impl DbTxn, set: ValidatorSet, number: u64, hash: [u8; 32]) {
CosignTransactions::send(txn, set.network, &(set.session, number, hash))
}
}

#[derive(Debug)]
pub struct SubstrateDb<D: Db>(pub D);
impl<D: Db> SubstrateDb<D> {
pub fn new(db: D) -> Self {
Self(db)
}

fn substrate_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
D::key(b"coordinator_substrate", dst, key)
}

fn next_block_key() -> Vec<u8> {
Self::substrate_key(b"next_block", [])
}
pub fn set_next_block(&mut self, block: u64) {
let mut txn = self.0.txn();
txn.put(Self::next_block_key(), block.to_le_bytes());
txn.commit();
}
pub fn next_block(&self) -> u64 {
u64::from_le_bytes(self.0.get(Self::next_block_key()).unwrap_or(vec![0; 8]).try_into().unwrap())
impl EventDb {
pub fn is_unhandled(getter: &impl Get, id: &[u8], index: u32) -> bool {
Self::get(getter, id, index).is_none()
}

fn latest_cosigned_block_key() -> Vec<u8> {
Self::substrate_key(b"latest_cosigned_block", [])
}
pub fn set_latest_cosigned_block(txn: &mut D::Transaction<'_>, latest_cosigned_block: u64) {
txn.put(Self::latest_cosigned_block_key(), latest_cosigned_block.to_le_bytes());
}
pub fn latest_cosigned_block<G: Get>(getter: &G) -> u64 {
let db = u64::from_le_bytes(
getter.get(Self::latest_cosigned_block_key()).unwrap_or(vec![0; 8]).try_into().unwrap(),
);
// Mark the genesis as cosigned
db.max(1)
pub fn handle_event(txn: &mut impl DbTxn, id: &[u8], index: u32) {
assert!(Self::is_unhandled(txn, id, index));
Self::set(txn, id, index, &());
}
}

fn event_key(id: &[u8], index: u32) -> Vec<u8> {
Self::substrate_key(b"event", [id, index.to_le_bytes().as_ref()].concat())
}
pub fn handled_event<G: Get>(getter: &G, id: [u8; 32], index: u32) -> bool {
getter.get(Self::event_key(&id, index)).is_some()
}
pub fn handle_event(txn: &mut D::Transaction<'_>, id: [u8; 32], index: u32) {
assert!(!Self::handled_event(txn, id, index));
txn.put(Self::event_key(&id, index), []);
db_channel! {
SubstrateDbChannels {
CosignTransactions: (network: NetworkId) -> (Session, u64, [u8; 32]),
}
}

fn batch_instructions_key(network: NetworkId, id: u32) -> Vec<u8> {
Self::substrate_key(b"batch", (network, id).encode())
}
pub fn batch_instructions_hash<G: Get>(
getter: &G,
network: NetworkId,
id: u32,
) -> Option<[u8; 32]> {
getter.get(Self::batch_instructions_key(network, id)).map(|bytes| bytes.try_into().unwrap())
}
pub fn save_batch_instructions_hash(
txn: &mut D::Transaction<'_>,
network: NetworkId,
id: u32,
hash: [u8; 32],
) {
txn.put(Self::batch_instructions_key(network, id), hash);
impl CosignTransactions {
// Append a cosign transaction.
pub fn append_cosign(txn: &mut impl DbTxn, set: ValidatorSet, number: u64, hash: [u8; 32]) {
CosignTransactions::send(txn, set.network, &(set.session, number, hash))
}
}
50 changes: 25 additions & 25 deletions coordinator/src/substrate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ async fn handle_batch_and_burns<D: Db, Pro: Processors>(
network_had_event(&mut burns, &mut batches, network);

let mut txn = db.txn();
SubstrateDb::<D>::save_batch_instructions_hash(&mut txn, network, id, instructions_hash);
BatchInstructionsHashDb::set(&mut txn, network, id, &instructions_hash);
txn.commit();

// Make sure this is the only Batch event for this network in this Block
Expand Down Expand Up @@ -239,7 +239,7 @@ async fn handle_batch_and_burns<D: Db, Pro: Processors>(
// Handle a specific Substrate block, returning an error when it fails to get data
// (not blocking / holding)
async fn handle_block<D: Db, Pro: Processors>(
db: &mut SubstrateDb<D>,
db: &mut D,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
new_tributary_spec: &mpsc::UnboundedSender<TributarySpec>,
tributary_retired: &mpsc::UnboundedSender<ValidatorSet>,
Expand Down Expand Up @@ -268,27 +268,27 @@ async fn handle_block<D: Db, Pro: Processors>(
continue;
}

if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
if EventDb::is_unhandled(db, &hash, event_id) {
log::info!("found fresh new set event {:?}", new_set);
let mut txn = db.0.txn();
let mut txn = db.txn();
handle_new_set::<D>(&mut txn, key, new_tributary_spec, serai, &block, set).await?;
SubstrateDb::<D>::handle_event(&mut txn, hash, event_id);
EventDb::handle_event(&mut txn, &hash, event_id);
txn.commit();
}
event_id += 1;
}

// If a key pair was confirmed, inform the processor
for key_gen in serai.as_of(hash).validator_sets().key_gen_events().await? {
if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
if EventDb::is_unhandled(db, &hash, event_id) {
log::info!("found fresh key gen event {:?}", key_gen);
if let ValidatorSetsEvent::KeyGen { set, key_pair } = key_gen {
handle_key_gen(processors, serai, &block, set, key_pair).await?;
} else {
panic!("KeyGen event wasn't KeyGen: {key_gen:?}");
}
let mut txn = db.0.txn();
SubstrateDb::<D>::handle_event(&mut txn, hash, event_id);
let mut txn = db.txn();
EventDb::handle_event(&mut txn, &hash, event_id);
txn.commit();
}
event_id += 1;
Expand All @@ -303,12 +303,12 @@ async fn handle_block<D: Db, Pro: Processors>(
continue;
}

if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
if EventDb::is_unhandled(db, &hash, event_id) {
log::info!("found fresh set retired event {:?}", retired_set);
let mut txn = db.0.txn();
let mut txn = db.txn();
crate::ActiveTributaryDb::retire_tributary(&mut txn, set);
tributary_retired.send(set).unwrap();
SubstrateDb::<D>::handle_event(&mut txn, hash, event_id);
EventDb::handle_event(&mut txn, &hash, event_id);
txn.commit();
}
event_id += 1;
Expand All @@ -319,18 +319,18 @@ async fn handle_block<D: Db, Pro: Processors>(
// following events share data collection
// This does break the uniqueness of (hash, event_id) -> one event, yet
// (network, (hash, event_id)) remains valid as a unique ID for an event
if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
handle_batch_and_burns(&mut db.0, processors, serai, &block).await?;
if EventDb::is_unhandled(db, &hash, event_id) {
handle_batch_and_burns(db, processors, serai, &block).await?;
}
let mut txn = db.0.txn();
SubstrateDb::<D>::handle_event(&mut txn, hash, event_id);
let mut txn = db.txn();
EventDb::handle_event(&mut txn, &hash, event_id);
txn.commit();

Ok(())
}

async fn handle_new_blocks<D: Db, Pro: Processors>(
db: &mut SubstrateDb<D>,
db: &mut D,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
new_tributary_spec: &mpsc::UnboundedSender<TributarySpec>,
tributary_retired: &mpsc::UnboundedSender<ValidatorSet>,
Expand Down Expand Up @@ -394,7 +394,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
}
}

let mut txn = db.0.txn();
let mut txn = db.txn();
let Some((last_intended_to_cosign_block, mut skipped_block)) = IntendedCosign::get(&txn) else {
IntendedCosign::set_intended_cosign(&mut txn, 1);
txn.commit();
Expand Down Expand Up @@ -506,7 +506,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
// cosigned
if let Some(has_no_cosigners) = has_no_cosigners {
log::debug!("{} had no cosigners available, marking as cosigned", has_no_cosigners.number());
SubstrateDb::<D>::set_latest_cosigned_block(&mut txn, has_no_cosigners.number());
LatestCosignedBlock::set(&mut txn, &has_no_cosigners.number());
} else {
CosignTriggered::set(&mut txn, &());
for (set, block, hash) in cosign {
Expand All @@ -518,7 +518,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
}

// Reduce to the latest cosigned block
let latest_number = latest_number.min(SubstrateDb::<D>::latest_cosigned_block(&db.0));
let latest_number = latest_number.min(LatestCosignedBlock::latest_cosigned_block(db));

if latest_number < *next_block {
return Ok(());
Expand All @@ -540,25 +540,25 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
)
.await?;
*next_block += 1;
db.set_next_block(*next_block);
let mut txn = db.txn();
NextBlock::set(&mut txn, next_block);
txn.commit();
log::info!("handled substrate block {b}");
}

Ok(())
}

pub async fn scan_task<D: Db, Pro: Processors>(
db: D,
mut db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
processors: Pro,
serai: Arc<Serai>,
new_tributary_spec: mpsc::UnboundedSender<TributarySpec>,
tributary_retired: mpsc::UnboundedSender<ValidatorSet>,
) {
log::info!("scanning substrate");

let mut db = SubstrateDb::new(db);
let mut next_substrate_block = db.next_block();
let mut next_substrate_block = NextBlock::get(&db).unwrap_or_default();

/*
let new_substrate_block_notifier = {
Expand Down Expand Up @@ -680,7 +680,7 @@ pub(crate) async fn verify_published_batches<D: Db>(
// TODO: Localize from MainDb to SubstrateDb
let last = crate::LastVerifiedBatchDb::get(txn, network);
for id in last.map(|last| last + 1).unwrap_or(0) ..= optimistic_up_to {
let Some(on_chain) = SubstrateDb::<D>::batch_instructions_hash(txn, network, id) else {
let Some(on_chain) = BatchInstructionsHashDb::get(txn, network, id) else {
break;
};
let off_chain = crate::ExpectedBatchDb::get(txn, network, id).unwrap();
Expand Down