Skip to content

Commit

Permalink
Improve Bitcoin syncer and namings
Browse files Browse the repository at this point in the history
  • Loading branch information
ekrembal committed Feb 8, 2025
1 parent 543c7fb commit f9fce5b
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 182 deletions.
145 changes: 145 additions & 0 deletions core/src/bitcoin_syncer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use std::time::Duration;

use bitcoin::{block::Header, BlockHash, Transaction};
use bitcoincore_rpc::RpcApi;
use futures::future::try_join_all;
use tokio::{sync::broadcast, task::JoinHandle, time::sleep};

use crate::{database::Database, errors::BridgeError, extended_rpc::ExtendedRpc};

type ChainHeadPollingResult = (
broadcast::Sender<BitcoinSyncerEvent>,
JoinHandle<Result<(), BridgeError>>,
);

#[derive(Clone, Debug)]
pub struct BlockInfo {
pub block_hash: BlockHash,
pub block_header: Header,
pub block_height: u64,
}

#[derive(Clone, Debug)]
pub struct BlockInfoWithTxs {
pub block_info: BlockInfo,
pub txs: Vec<Transaction>,
}

#[derive(Clone, Debug)]
pub enum BitcoinSyncerEvent {
NewBlocks(Vec<BlockInfo>),
NewBlocksWithTxs(Vec<BlockInfoWithTxs>),
ReorgedBlocks(Vec<BlockHash>),
}

#[derive(Clone, Debug, PartialEq)]
pub enum BitcoinSyncerPollingMode {
SyncOnly,
SyncAndPollTxs,
}

pub async fn get_block_info_from_height(
rpc: &ExtendedRpc,
height: u64,
) -> Result<BlockInfo, BridgeError> {
let block_hash = rpc.client.get_block_hash(height).await?;
let block_header = rpc.client.get_block_header(&block_hash).await?;
Ok(BlockInfo {
block_hash,
block_header,
block_height: height,
})
}

pub async fn start_bitcoin_syncer(
db: Database,
rpc: ExtendedRpc,
poll_delay: Duration,
mode: BitcoinSyncerPollingMode,
) -> Result<ChainHeadPollingResult, BridgeError> {
let (tx, _) = broadcast::channel(100);

let returned_tx = tx.clone();

let mut block_height = db
.get_max_height(None)
.await?
.ok_or(BridgeError::BlockNotFound)?;

let handle = tokio::spawn(async move {
loop {
let mut block_hash = rpc.client.get_block_hash(block_height + 1).await?;
let mut block_header = rpc.client.get_block_header(&block_hash).await?;

let mut new_blocks = vec![BlockInfo {
block_hash,
block_header,
block_height: block_height + 1,
}];

for _ in 0..100 {
// if the previous block is in the db, do nothing
let height = db
.get_height_from_block_hash(None, block_header.prev_blockhash)
.await?;
if height.is_some() {
break;
}

// if the previous block is not in the db, we need to get the previous block

block_hash = block_header.prev_blockhash;
block_header = rpc.client.get_block_header(&block_hash).await?;

let block_info = BlockInfo {
block_hash,
block_header,
block_height,
};
new_blocks.push(block_info);

block_height -= 1;
}

// If we haven't found a match after 100 blocks, the database is too far out of sync
if new_blocks.len() == 100 {
return Err(BridgeError::BlockgazerTooDeep(block_height));
}

// check the reorg blocks
let reorg_blocks = db.delete_chain_head_from_height(None, block_height).await?;

if !reorg_blocks.is_empty() {
tx.send(BitcoinSyncerEvent::ReorgedBlocks(reorg_blocks))?;
}

let mut dbtx = db.begin_transaction().await?;
for block_info in new_blocks.iter() {
db.set_chain_head(Some(&mut dbtx), block_info).await?;
}
dbtx.commit().await?;

if !new_blocks.is_empty() {
match mode {
BitcoinSyncerPollingMode::SyncOnly => {
tx.send(BitcoinSyncerEvent::NewBlocks(new_blocks))?;
}
BitcoinSyncerPollingMode::SyncAndPollTxs => {
let block_futures = new_blocks.iter().map(|block_info| async {
let block = rpc.client.get_block(&block_info.block_hash).await?;
Ok::<_, BridgeError>(BlockInfoWithTxs {
block_info: block_info.clone(),
txs: block.txdata,
})
});
let block_info_with_txs = try_join_all(block_futures).await?;
tx.send(BitcoinSyncerEvent::NewBlocksWithTxs(block_info_with_txs))?;
}
}
}

sleep(poll_delay).await;
}
});
Ok((returned_tx, handle))
}
Original file line number Diff line number Diff line change
@@ -1,60 +1,37 @@
use super::{Database, DatabaseTransaction};
use crate::{errors::BridgeError, execute_query_with_tx};
use super::{wrapper::BlockHashDB, Database, DatabaseTransaction};
use crate::{bitcoin_syncer::BlockInfo, errors::BridgeError, execute_query_with_tx};
use bitcoin::BlockHash;
use std::str::FromStr;

// pub async fn set_tx_sender_chain_head(
// &self,
// tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
// block_hash: BlockHash,
// height: u64,
// ) -> Result<(BlockHash, u64), BridgeError> {
// sqlx::query("DELETE FROM tx_sender_block_info")
// .execute(tx.deref_mut())
// .await?;
// sqlx::query("INSERT INTO tx_sender_block_info (block_hash, height) VALUES ($1, $2)")
// .bind(block_hash.to_string())
// .bind(height as i64)
// .execute(tx.deref_mut())
// .await?;
// Ok((block_hash, height))
// }

// pub async fn get_tx_sender_chain_head(&self) -> Result<Option<(BlockHash, u64)>, BridgeError> {
// let mut tx = self.begin_transaction().await?;
// let ret: Option<(String, i64)> =
// sqlx::query_as("SELECT block_hash, height FROM tx_sender_block_info LIMIT 1")
// .fetch_optional(tx.deref_mut())
// .await?;
// if let Some((block_hash, height)) = ret {
// let block_hash = BlockHash::from_str(&block_hash)?;
// let height = height as u64;
// Ok(Some((block_hash, height)))
// } else {
// Ok(None)
// }
// }

impl Database {
pub async fn set_chain_head(
&self,
tx: Option<DatabaseTransaction<'_, '_>>,
block_hash: BlockHash,
prev_block_hash: BlockHash,
height: u64,
block_info: &BlockInfo,
) -> Result<(), BridgeError> {
let query = sqlx::query(
"INSERT INTO tx_sender_block_info (block_hash, prev_block_hash, height) VALUES ($1, $2, $3)",
"INSERT INTO bitcoin_syncer (blockhash, prev_blockhash, height) VALUES ($1, $2, $3)",
)
.bind(block_hash.to_string())
.bind(prev_block_hash.to_string())
.bind(height as i64);
.bind(BlockHashDB(block_info.block_hash))
.bind(BlockHashDB(block_info.block_header.prev_blockhash))
.bind(block_info.block_height as i64);

execute_query_with_tx!(self.connection, tx, query, execute)?;

Ok(())
}

pub async fn get_max_height(
&self,
tx: Option<DatabaseTransaction<'_, '_>>,
) -> Result<Option<u64>, BridgeError> {
let query =
sqlx::query_as("SELECT height FROM bitcoin_syncer ORDER BY height DESC LIMIT 1");
let result: Option<(i64,)> =
execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
Ok(result.map(|(height,)| height as u64))
}

/// Gets the height from the block hash.
pub async fn get_height_from_block_hash(
&self,
Expand Down
2 changes: 1 addition & 1 deletion core/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use crate::{config::BridgeConfig, errors::BridgeError};
use sqlx::{Pool, Postgres};

mod chain_sycner;
mod bitcoin_syncer;
mod header_chain_prover;
mod operator;
mod tx_sender;
Expand Down
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod tx_sender;
pub mod utils;
pub mod verifier;
pub mod watchtower;
pub mod bitcoin_syncer;

#[cfg(test)]
mod test_utils;
Expand Down
43 changes: 14 additions & 29 deletions core/src/operator.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::time::Duration;

use crate::actor::{Actor, WinternitzDerivationPath};
use crate::bitcoin_syncer::{self, get_block_info_from_height, BitcoinSyncerPollingMode};
use crate::config::BridgeConfig;
use crate::database::Database;
use crate::errors::BridgeError;
use crate::extended_rpc::ExtendedRpc;
use crate::musig2::AggregateFromPublicKeys;
use crate::tx_sender::chain_head::{self};
use crate::tx_sender::TxSender;
use bitcoin::{Amount, OutPoint, Txid, XOnlyPublicKey};
use bitcoincore_rpc::RpcApi;
Expand Down Expand Up @@ -46,20 +46,20 @@ impl Operator {

let db = Database::new(&config).await?;

let current_height = rpc.client.get_block_count().await?;
let current_block_hash = rpc.client.get_block_hash(current_height).await?;
let current_block_header = rpc.client.get_block_header(&current_block_hash).await?;
db.set_chain_head(
None,
current_block_hash,
current_block_header.prev_blockhash,
current_height,
)
.await?;
if db.get_max_height(None).await?.is_none() {
let current_height = rpc.client.get_block_count().await?;
let current_block_info = get_block_info_from_height(&rpc, current_height).await?;
db.set_chain_head(None, &current_block_info).await?;
}

// Store sender in a variable to keep it alive
let (sender, _handle) =
chain_head::start_polling(db.clone(), rpc.clone(), Duration::from_secs(1))?;
let (sender, _handle) = bitcoin_syncer::start_bitcoin_syncer(
db.clone(),
rpc.clone(),
Duration::from_secs(1),
BitcoinSyncerPollingMode::SyncOnly,
)
.await?;

let mut receiver = sender.subscribe();

Expand All @@ -68,7 +68,7 @@ impl Operator {
let tx_sender_clone = tx_sender.clone();
tokio::spawn(async move {
tx_sender_clone
.apply_new_chain_event(&mut receiver)
.bitcoin_syncer_event_handler(&mut receiver)
.await
.expect("Failed to apply new chain event");
});
Expand All @@ -88,21 +88,6 @@ impl Operator {
return Err(BridgeError::OperatorWithdrawalFeeNotSet);
}

// let mut tx = db.begin_transaction().await?;
// // check if funding utxo is already set
// if db.get_funding_utxo(Some(&mut tx)).await?.is_none() {
// let outpoint = rpc.send_to_address(&signer.address, Amount::from_sat(200_000_000))?; // TODO: Is this OK to be a fixed value
// let funding_utxo = UTXO {
// outpoint,
// txout: TxOut {
// value: bitcoin::Amount::from_sat(200_000_000),
// script_pubkey: signer.address.script_pubkey(),
// },
// };
// db.set_funding_utxo(Some(&mut tx), funding_utxo).await?;
// }
// tx.commit().await?;

let mut tx = db.begin_transaction().await?;
// check if there is any sequential collateral tx from the current operator
let sequential_collateral_txs = db
Expand Down
Loading

0 comments on commit f9fce5b

Please sign in to comment.