diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 1d4141ac..efdfad8e 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -109,6 +109,8 @@ jobs: tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} file: ./dockerfiles/components/ordhook.dockerfile + build-args: | + GIT_COMMIT=${{ env.GITHUB_SHA_SHORT }} cache-from: type=gha cache-to: type=gha,mode=max # Only push if (there's a new release on main branch, or if building a non-main branch) and (Only run on non-PR events or only PRs that aren't from forks) diff --git a/Cargo.lock b/Cargo.lock index 0039fe44..17bcf435 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -259,6 +259,29 @@ dependencies = [ "syn 2.0.41", ] +[[package]] +name = "bindgen" +version = "0.68.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "726e4313eb6ec35d2730258ad4e15b547ee75d6afaa1361a922e78e59b7d8078" +dependencies = [ + "bitflags 2.4.1", + "cexpr", + "clang-sys", + "lazy_static", + "lazycell", + "log", + "peeking_take_while", + "prettyplease", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.41", + "which", +] + [[package]] name = "bitcoin" version = "0.31.0" @@ -454,12 +477,12 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chainhook-sdk" -version = "0.12.0" +version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4689cd74c5a15903b5e078b1c8247469dadbb835475793c1cd9cc93da24c5acd" +checksum = "a9f1d697633a4b8394f185f41634faaf555ba04f2881ac19e1db4b1bd0130155" dependencies = [ "base58 0.2.0", - "base64 0.13.1", + "base64 0.21.5", "bitcoincore-rpc", "bitcoincore-rpc-json", "chainhook-types", @@ -489,9 +512,9 @@ dependencies = [ [[package]] name = "chainhook-types" -version = "1.3.0" +version = "1.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03f5e0a721da223301c8043cdb9e54dae7731273584efda2ade15e0ea46516d4" +checksum = "44b67edc1e9b87382a973d203eada222554a774d9fae55e7d40fb2accb372716" dependencies = [ "hex", "schemars 0.8.16", @@ -515,6 +538,33 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "ciborium" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "effd91f6c78e5a4ace8a5d3c0b6bfaec9e2baaef55f3efc00e45fb2e477ee926" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdf919175532b369853f5d5e20b26b43112613fd6fe7aee757e35f7a44642656" + +[[package]] +name = "ciborium-ll" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "defaa24ecc093c77630e6c15e17c51f5e187bf35ee514f4e2d67baaa96dae22b" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "clang-sys" version = "1.6.1" @@ -1344,6 +1394,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" + [[package]] name = "hashbrown" version = "0.11.2" @@ -1504,6 +1560,15 @@ dependencies = [ "hmac 0.8.1", ] +[[package]] +name = "home" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "http" version = "0.2.11" @@ -1795,7 +1860,7 @@ version = "0.11.0+8.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3386f101bcb4bd252d8e9d2fb41ec3b0862a15a62b478c355b2982efa469e3e" dependencies = [ - "bindgen", + "bindgen 0.65.1", "bzip2-sys", "cc", "glob", @@ -2236,6 +2301,7 @@ dependencies = [ "anyhow", "atty", "chainhook-sdk", + "ciborium", "crossbeam-channel", "dashmap", "flate2", @@ -2280,6 +2346,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", + "tcmalloc2", "toml 0.5.11", ] @@ -3804,6 +3871,16 @@ version = "0.12.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c39fd04924ca3a864207c66fc2cd7d22d7c016007f9ce846cbb9326331930a" +[[package]] +name = "tcmalloc2" +version = "0.1.2+2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5af9a4fe36969c3ae05e5ba829b191d671fc1bb6569e0245f0b57970da9f04a5" +dependencies = [ + "bindgen 0.68.1", + "num_cpus", +] + [[package]] name = "tempfile" version = "3.8.1" @@ -4402,6 +4479,18 @@ version = "0.25.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10" +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/components/ordhook-cli/Cargo.toml b/components/ordhook-cli/Cargo.toml index 8ed97c01..83fb8634 100644 --- a/components/ordhook-cli/Cargo.toml +++ b/components/ordhook-cli/Cargo.toml @@ -24,9 +24,11 @@ clap = { version = "3.2.23", features = ["derive"], optional = true } clap_generate = { version = "3.0.3", optional = true } toml = { version = "0.5.6", features = ["preserve_order"], optional = true } ctrlc = { version = "3.2.2", optional = true } +tcmalloc2 = { version = "0.1.2+2.13", optional = true } [features] default = ["cli"] cli = ["clap", "clap_generate", "toml", "ctrlc", "hiro-system-kit/log"] debug = ["hiro-system-kit/debug"] release = ["hiro-system-kit/release"] +tcmalloc = ["tcmalloc2"] \ No newline at end of file diff --git a/components/ordhook-cli/build.rs b/components/ordhook-cli/build.rs new file mode 100644 index 00000000..71d6bd9a --- /dev/null +++ b/components/ordhook-cli/build.rs @@ -0,0 +1,29 @@ +use std::process::Command; + +fn current_git_hash() -> Option { + if option_env!("GIT_COMMIT") == None { + let commit = Command::new("git") + .arg("log") + .arg("-1") + .arg("--pretty=format:%h") // Abbreviated commit hash + .current_dir(env!("CARGO_MANIFEST_DIR")) + .output(); + + if let Ok(commit) = commit { + if let Ok(commit) = String::from_utf8(commit.stdout) { + return Some(commit); + } + } + } else { + return option_env!("GIT_COMMIT").map(String::from); + } + + None +} + +fn main() { + // note: add error checking yourself. + if let Some(git) = current_git_hash() { + println!("cargo:rustc-env=GIT_COMMIT={}", git); + } +} diff --git a/components/ordhook-cli/src/cli/mod.rs b/components/ordhook-cli/src/cli/mod.rs index 93ffbbbc..c4d5875b 100644 --- a/components/ordhook-cli/src/cli/mod.rs +++ b/components/ordhook-cli/src/cli/mod.rs @@ -32,6 +32,7 @@ use ordhook::db::{ use ordhook::download::download_ordinals_dataset_if_required; use ordhook::hex; use ordhook::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate; +use ordhook::service::observers::initialize_observers_db; use ordhook::service::{start_observer_forwarding, Service}; use reqwest::Client as HttpClient; use std::io::{BufReader, Read}; @@ -163,6 +164,8 @@ struct ScanTransactionCommand { pub block_height: u64, /// Inscription Id pub transaction_id: String, + /// Input index + pub input_index: usize, /// Target Regtest network #[clap( long = "regtest", @@ -342,6 +345,9 @@ struct StartCommand { /// Check blocks integrity #[clap(long = "check-blocks-integrity")] pub block_integrity_check: bool, + /// Stream indexing to observers + #[clap(long = "stream-indexing")] + pub stream_indexing_to_observers: bool, } #[derive(Subcommand, PartialEq, Clone, Debug)] @@ -558,6 +564,9 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { None, cmd.auth_token, )?; + + let _ = initialize_observers_db(&config.expected_cache_path(), ctx); + scan_bitcoin_chainstate_via_rpc_using_predicate( &predicate_spec, &config, @@ -575,15 +584,14 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { while let Some(block_height) = block_range.pop_front() { let inscriptions = find_all_inscriptions_in_block(&block_height, &inscriptions_db_conn, ctx); - let mut locations = + let locations = find_all_transfers_in_block(&block_height, &inscriptions_db_conn, ctx); let mut total_transfers_in_block = 0; for (_, inscription) in inscriptions.iter() { println!("Inscription {} revealed at block #{} (inscription_number {}, ordinal_number {})", inscription.get_inscription_id(), block_height, inscription.inscription_number.jubilee, inscription.ordinal_number); - if let Some(transfers) = locations.remove(&inscription.get_inscription_id()) - { + if let Some(transfers) = locations.get(&inscription.ordinal_number) { for t in transfers.iter().skip(1) { total_transfers_in_block += 1; println!( @@ -672,18 +680,21 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { .await?; let transaction_identifier = TransactionIdentifier::new(&cmd.transaction_id); let cache = new_traversals_lazy_cache(100); - let (res, mut back_trace) = compute_satoshi_number( + let (res, _, mut back_trace) = compute_satoshi_number( &config.get_ordhook_config().db_path, &block.block_identifier, &transaction_identifier, + cmd.input_index, 0, &Arc::new(cache), + config.resources.ulimit, + config.resources.memory_available, true, ctx, )?; back_trace.reverse(); - for (block_height, tx) in back_trace.iter() { - println!("{}\t{}", block_height, hex::encode(tx)); + for (block_height, tx, index) in back_trace.iter() { + println!("{}\t{}:{}", block_height, hex::encode(tx), index); } println!("{:?}", res); } @@ -707,12 +718,21 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { let last_known_block = find_latest_inscription_block_height(&inscriptions_db_conn, ctx)?; if last_known_block.is_none() { - open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx); + open_ordhook_db_conn_rocks_db_loop( + true, + &config.expected_cache_path(), + config.resources.ulimit, + config.resources.memory_available, + ctx, + ); } let ordhook_config = config.get_ordhook_config(); - - info!(ctx.expect_logger(), "Starting service...",); + let version = env!("GIT_COMMIT"); + info!( + ctx.expect_logger(), + "Starting service (git_commit = {})...", version + ); let start_block = match cmd.start_at_block { Some(entry) => entry, @@ -744,7 +764,12 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { let mut service = Service::new(config, ctx.clone()); return service - .run(predicates, None, cmd.block_integrity_check) + .run( + predicates, + None, + cmd.block_integrity_check, + cmd.stream_indexing_to_observers, + ) .await; } }, @@ -766,7 +791,13 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { Command::Db(OrdhookDbCommand::New(cmd)) => { let config = ConfigFile::default(false, false, false, &cmd.config_path)?; initialize_ordhook_db(&config.expected_cache_path(), ctx); - open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx); + open_ordhook_db_conn_rocks_db_loop( + true, + &config.expected_cache_path(), + config.resources.ulimit, + config.resources.memory_available, + ctx, + ); } Command::Db(OrdhookDbCommand::Sync(cmd)) => { let config = ConfigFile::default(false, false, false, &cmd.config_path)?; @@ -779,10 +810,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { let config = ConfigFile::default(false, false, false, &cmd.config_path)?; let mut ordhook_config = config.get_ordhook_config(); if let Some(network_threads) = cmd.network_threads { - ordhook_config.network_thread_max = network_threads; - } - if let Some(network_threads) = cmd.network_threads { - ordhook_config.network_thread_max = network_threads; + ordhook_config.resources.bitcoind_rpc_threads = network_threads; } let blocks = cmd.get_blocks(); let block_ingestion_processor = @@ -800,6 +828,8 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { let blocks_db = open_ordhook_db_conn_rocks_db_loop( false, &config.get_ordhook_config().db_path, + config.resources.ulimit, + config.resources.memory_available, ctx, ); for i in cmd.get_blocks().into_iter() { @@ -819,7 +849,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { let config = ConfigFile::default(false, false, false, &cmd.config_path)?; let mut ordhook_config = config.get_ordhook_config(); if let Some(network_threads) = cmd.network_threads { - ordhook_config.network_thread_max = network_threads; + ordhook_config.resources.bitcoind_rpc_threads = network_threads; } let block_post_processor = match cmd.repair_observers { Some(true) => { @@ -870,8 +900,12 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { Command::Db(OrdhookDbCommand::Check(cmd)) => { let config = ConfigFile::default(false, false, false, &cmd.config_path)?; { - let blocks_db = - open_readonly_ordhook_db_conn_rocks_db(&config.expected_cache_path(), ctx)?; + let blocks_db = open_readonly_ordhook_db_conn_rocks_db( + &config.expected_cache_path(), + config.resources.ulimit, + config.resources.memory_available, + ctx, + )?; let tip = find_last_block_inserted(&blocks_db); println!("Tip: {}", tip); let missing_blocks = find_missing_blocks(&blocks_db, 1, tip, ctx); @@ -880,11 +914,26 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { } Command::Db(OrdhookDbCommand::Drop(cmd)) => { let config = ConfigFile::default(false, false, false, &cmd.config_path)?; - let blocks_db = - open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx); + let blocks_db = open_ordhook_db_conn_rocks_db_loop( + true, + &config.expected_cache_path(), + config.resources.ulimit, + config.resources.memory_available, + ctx, + ); let inscriptions_db_conn_rw = open_readwrite_ordhook_db_conn(&config.expected_cache_path(), ctx)?; + println!( + "{} blocks will be deleted. Confirm? [Y/n]", + cmd.end_block - cmd.start_block + 1 + ); + let mut buffer = String::new(); + std::io::stdin().read_line(&mut buffer).unwrap(); + if buffer.starts_with('n') { + return Err("Deletion aborted".to_string()); + } + delete_data_in_ordhook_db( cmd.start_block, cmd.end_block, diff --git a/components/ordhook-cli/src/config/file.rs b/components/ordhook-cli/src/config/file.rs index 1234c725..5cc0a3c8 100644 --- a/components/ordhook-cli/src/config/file.rs +++ b/components/ordhook-cli/src/config/file.rs @@ -1,29 +1,24 @@ use ordhook::chainhook_sdk::indexer::IndexerConfig; +use ordhook::chainhook_sdk::observer::DEFAULT_INGESTION_PORT; use ordhook::chainhook_sdk::types::{ BitcoinBlockSignaling, BitcoinNetwork, StacksNetwork, StacksNodeConfig, }; use ordhook::config::{ - BootstrapConfig, Config, LimitsConfig, LogConfig, PredicatesApi, PredicatesApiConfig, - StorageConfig, + Config, LogConfig, PredicatesApi, PredicatesApiConfig, ResourcesConfig, SnapshotConfig, + StorageConfig, DEFAULT_BITCOIND_RPC_THREADS, DEFAULT_BITCOIND_RPC_TIMEOUT, + DEFAULT_CONTROL_PORT, DEFAULT_MEMORY_AVAILABLE, DEFAULT_ULIMIT, }; use std::fs::File; use std::io::{BufReader, Read}; -pub const DEFAULT_INGESTION_PORT: u16 = 20455; -pub const DEFAULT_CONTROL_PORT: u16 = 20456; -pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 10; -pub const BITCOIN_SCAN_THREAD_POOL_SIZE: usize = 10; -pub const STACKS_MAX_PREDICATE_REGISTRATION: usize = 50; -pub const BITCOIN_MAX_PREDICATE_REGISTRATION: usize = 50; - #[derive(Deserialize, Debug, Clone)] pub struct ConfigFile { pub storage: StorageConfigFile, pub http_api: Option, - pub limits: LimitsConfigFile, + pub resources: ResourcesConfigFile, pub network: NetworkConfigFile, pub logs: Option, - pub bootstrap: Option, + pub snapshot: Option, } impl ConfigFile { @@ -54,12 +49,12 @@ impl ConfigFile { _ => return Err("network.mode not supported".to_string()), }; - let bootstrap = match config_file.bootstrap { + let snapshot = match config_file.snapshot { Some(bootstrap) => match bootstrap.download_url { - Some(ref url) => BootstrapConfig::Download(url.to_string()), - None => BootstrapConfig::Build, + Some(ref url) => SnapshotConfig::Download(url.to_string()), + None => SnapshotConfig::Build, }, - None => BootstrapConfig::Build, + None => SnapshotConfig::Build, }; let config = Config { @@ -76,36 +71,29 @@ impl ConfigFile { }), }, }, - bootstrap, - limits: LimitsConfig { - max_number_of_stacks_predicates: config_file - .limits - .max_number_of_stacks_predicates - .unwrap_or(STACKS_MAX_PREDICATE_REGISTRATION), - max_number_of_bitcoin_predicates: config_file - .limits - .max_number_of_bitcoin_predicates - .unwrap_or(BITCOIN_MAX_PREDICATE_REGISTRATION), - max_number_of_concurrent_stacks_scans: config_file - .limits - .max_number_of_concurrent_stacks_scans - .unwrap_or(STACKS_SCAN_THREAD_POOL_SIZE), - max_number_of_concurrent_bitcoin_scans: config_file - .limits - .max_number_of_concurrent_bitcoin_scans - .unwrap_or(BITCOIN_SCAN_THREAD_POOL_SIZE), - max_number_of_processing_threads: config_file - .limits - .max_number_of_processing_threads - .unwrap_or(1.max(num_cpus::get().saturating_sub(1))), - bitcoin_concurrent_http_requests_max: config_file - .limits - .bitcoin_concurrent_http_requests_max - .unwrap_or(1.max(num_cpus::get().saturating_sub(1))), - max_caching_memory_size_mb: config_file - .limits - .max_caching_memory_size_mb - .unwrap_or(2048), + snapshot, + resources: ResourcesConfig { + ulimit: config_file.resources.ulimit.unwrap_or(DEFAULT_ULIMIT), + cpu_core_available: config_file + .resources + .cpu_core_available + .unwrap_or(num_cpus::get()), + memory_available: config_file + .resources + .memory_available + .unwrap_or(DEFAULT_MEMORY_AVAILABLE), + bitcoind_rpc_threads: config_file + .resources + .bitcoind_rpc_threads + .unwrap_or(DEFAULT_BITCOIND_RPC_THREADS), + bitcoind_rpc_timeout: config_file + .resources + .bitcoind_rpc_timeout + .unwrap_or(DEFAULT_BITCOIND_RPC_TIMEOUT), + expected_observers_count: config_file + .resources + .expected_observers_count + .unwrap_or(1), }, network: IndexerConfig { bitcoind_rpc_url: config_file.network.bitcoind_rpc_url.to_string(), @@ -176,19 +164,18 @@ pub struct PredicatesApiConfigFile { } #[derive(Deserialize, Debug, Clone)] -pub struct BootstrapConfigFile { +pub struct SnapshotConfigFile { pub download_url: Option, } #[derive(Deserialize, Debug, Clone)] -pub struct LimitsConfigFile { - pub max_number_of_bitcoin_predicates: Option, - pub max_number_of_concurrent_bitcoin_scans: Option, - pub max_number_of_stacks_predicates: Option, - pub max_number_of_concurrent_stacks_scans: Option, - pub max_number_of_processing_threads: Option, - pub max_caching_memory_size_mb: Option, - pub bitcoin_concurrent_http_requests_max: Option, +pub struct ResourcesConfigFile { + pub ulimit: Option, + pub cpu_core_available: Option, + pub memory_available: Option, + pub bitcoind_rpc_threads: Option, + pub bitcoind_rpc_timeout: Option, + pub expected_observers_count: Option, } #[derive(Deserialize, Debug, Clone)] diff --git a/components/ordhook-cli/src/config/generator.rs b/components/ordhook-cli/src/config/generator.rs index 53b6cce5..3db85656 100644 --- a/components/ordhook-cli/src/config/generator.rs +++ b/components/ordhook-cli/src/config/generator.rs @@ -26,16 +26,17 @@ bitcoind_zmq_url = "tcp://0.0.0.0:18543" # but stacks can also be used: # stacks_node_rpc_url = "http://0.0.0.0:20443" -[limits] -max_number_of_bitcoin_predicates = 100 -max_number_of_concurrent_bitcoin_scans = 100 -max_number_of_processing_threads = 16 -bitcoin_concurrent_http_requests_max = 16 -max_caching_memory_size_mb = 32000 +[resources] +ulimit = 2048 +cpu_core_available = 16 +memory_available = 32 +bitcoind_rpc_threads = 4 +bitcoind_rpc_timeout = 15 +expected_observers_count = 1 # Disable the following section if the state # must be built locally -[bootstrap] +[snapshot] download_url = "https://archive.hiro.so/mainnet/ordhook/mainnet-ordhook-sqlite-latest" [logs] diff --git a/components/ordhook-cli/src/main.rs b/components/ordhook-cli/src/main.rs index e71e95b8..0c65431d 100644 --- a/components/ordhook-cli/src/main.rs +++ b/components/ordhook-cli/src/main.rs @@ -7,6 +7,10 @@ extern crate hiro_system_kit; pub mod cli; pub mod config; +#[cfg(feature = "tcmalloc")] +#[global_allocator] +static GLOBAL: tcmalloc2::TcMalloc = tcmalloc2::TcMalloc; + fn main() { cli::main(); } diff --git a/components/ordhook-core/Cargo.toml b/components/ordhook-core/Cargo.toml index 0994a715..37bd5880 100644 --- a/components/ordhook-core/Cargo.toml +++ b/components/ordhook-core/Cargo.toml @@ -10,8 +10,8 @@ serde_json = "1" serde_derive = "1" hex = "0.4.3" rand = "0.8.5" -chainhook-sdk = { version = "=0.12.0", features = ["zeromq"] } -# chainhook-sdk = { version = "=0.12.0", path = "../../../chainhook/components/chainhook-sdk", features = ["zeromq"] } +chainhook-sdk = { version = "=0.12.5", features = ["zeromq"] } +# chainhook-sdk = { version = "=0.12.1", path = "../../../chainhook/components/chainhook-sdk", features = ["zeromq"] } hiro-system-kit = "0.3.1" reqwest = { version = "0.11", default-features = false, features = [ "stream", @@ -43,6 +43,7 @@ rocksdb = { version = "0.21.0", default-features = false, features = [ pprof = { version = "0.13.0", features = ["flamegraph"], optional = true } hyper = { version = "=0.14.27" } lazy_static = { version = "1.4.0" } +ciborium = "0.2.1" # [profile.release] # debug = true diff --git a/components/ordhook-core/src/config/mod.rs b/components/ordhook-core/src/config/mod.rs index 950e090e..fb782f69 100644 --- a/components/ordhook-core/src/config/mod.rs +++ b/components/ordhook-core/src/config/mod.rs @@ -11,18 +11,18 @@ const DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE: &str = pub const DEFAULT_INGESTION_PORT: u16 = 20455; pub const DEFAULT_CONTROL_PORT: u16 = 20456; -pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 10; -pub const BITCOIN_SCAN_THREAD_POOL_SIZE: usize = 10; -pub const STACKS_MAX_PREDICATE_REGISTRATION: usize = 50; -pub const BITCOIN_MAX_PREDICATE_REGISTRATION: usize = 50; +pub const DEFAULT_ULIMIT: usize = 2048; +pub const DEFAULT_MEMORY_AVAILABLE: usize = 8; +pub const DEFAULT_BITCOIND_RPC_THREADS: usize = 4; +pub const DEFAULT_BITCOIND_RPC_TIMEOUT: u32 = 15; #[derive(Clone, Debug)] pub struct Config { pub storage: StorageConfig, pub http_api: PredicatesApi, - pub limits: LimitsConfig, + pub resources: ResourcesConfig, pub network: IndexerConfig, - pub bootstrap: BootstrapConfig, + pub snapshot: SnapshotConfig, pub logs: LogConfig, } @@ -50,7 +50,7 @@ pub struct PredicatesApiConfig { } #[derive(Clone, Debug)] -pub enum BootstrapConfig { +pub enum SnapshotConfig { Build, Download(String), } @@ -65,15 +65,23 @@ pub struct UrlConfig { pub file_url: String, } -#[derive(Clone, Debug)] -pub struct LimitsConfig { - pub max_number_of_bitcoin_predicates: usize, - pub max_number_of_concurrent_bitcoin_scans: usize, - pub max_number_of_stacks_predicates: usize, - pub max_number_of_concurrent_stacks_scans: usize, - pub max_number_of_processing_threads: usize, - pub bitcoin_concurrent_http_requests_max: usize, - pub max_caching_memory_size_mb: usize, +#[derive(Deserialize, Debug, Clone)] +pub struct ResourcesConfig { + pub ulimit: usize, + pub cpu_core_available: usize, + pub memory_available: usize, + pub bitcoind_rpc_threads: usize, + pub bitcoind_rpc_timeout: u32, + pub expected_observers_count: usize, +} + +impl ResourcesConfig { + pub fn get_optimal_thread_pool_capacity(&self) -> usize { + // Generally speaking when dealing a pool, we need one thread for + // feeding the thread pool and eventually another thread for + // handling the "reduce" step. + self.cpu_core_available.saturating_sub(2).max(1) + } } impl Config { @@ -86,10 +94,7 @@ impl Config { pub fn get_ordhook_config(&self) -> OrdhookConfig { OrdhookConfig { - network_thread_max: self.limits.bitcoin_concurrent_http_requests_max, - ingestion_thread_max: self.limits.max_number_of_processing_threads, - ingestion_thread_queue_size: 4, - cache_size: self.limits.max_caching_memory_size_mb, + resources: self.resources.clone(), db_path: self.expected_cache_path(), first_inscription_height: match self.network.bitcoin_network { BitcoinNetwork::Mainnet => 767430, @@ -119,9 +124,9 @@ impl Config { } pub fn should_bootstrap_through_download(&self) -> bool { - match &self.bootstrap { - BootstrapConfig::Build => false, - BootstrapConfig::Download(_) => true, + match &self.snapshot { + SnapshotConfig::Build => false, + SnapshotConfig::Download(_) => true, } } @@ -139,9 +144,9 @@ impl Config { } fn expected_remote_ordinals_sqlite_base_url(&self) -> &str { - match &self.bootstrap { - BootstrapConfig::Build => unreachable!(), - BootstrapConfig::Download(url) => &url, + match &self.snapshot { + SnapshotConfig::Build => unreachable!(), + SnapshotConfig::Download(url) => &url, } } @@ -159,15 +164,14 @@ impl Config { working_dir: default_cache_path(), }, http_api: PredicatesApi::Off, - bootstrap: BootstrapConfig::Build, - limits: LimitsConfig { - max_number_of_bitcoin_predicates: BITCOIN_MAX_PREDICATE_REGISTRATION, - max_number_of_concurrent_bitcoin_scans: BITCOIN_SCAN_THREAD_POOL_SIZE, - max_number_of_stacks_predicates: STACKS_MAX_PREDICATE_REGISTRATION, - max_number_of_concurrent_stacks_scans: STACKS_SCAN_THREAD_POOL_SIZE, - max_number_of_processing_threads: 1.max(num_cpus::get().saturating_sub(1)), - bitcoin_concurrent_http_requests_max: 1.max(num_cpus::get().saturating_sub(1)), - max_caching_memory_size_mb: 2048, + snapshot: SnapshotConfig::Build, + resources: ResourcesConfig { + cpu_core_available: num_cpus::get(), + memory_available: DEFAULT_MEMORY_AVAILABLE, + ulimit: DEFAULT_ULIMIT, + bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS, + bitcoind_rpc_timeout: DEFAULT_BITCOIND_RPC_TIMEOUT, + expected_observers_count: 1, }, network: IndexerConfig { bitcoind_rpc_url: "http://0.0.0.0:18443".into(), @@ -192,15 +196,14 @@ impl Config { working_dir: default_cache_path(), }, http_api: PredicatesApi::Off, - bootstrap: BootstrapConfig::Build, - limits: LimitsConfig { - max_number_of_bitcoin_predicates: BITCOIN_MAX_PREDICATE_REGISTRATION, - max_number_of_concurrent_bitcoin_scans: BITCOIN_SCAN_THREAD_POOL_SIZE, - max_number_of_stacks_predicates: STACKS_MAX_PREDICATE_REGISTRATION, - max_number_of_concurrent_stacks_scans: STACKS_SCAN_THREAD_POOL_SIZE, - max_number_of_processing_threads: 1.max(num_cpus::get().saturating_sub(1)), - bitcoin_concurrent_http_requests_max: 1.max(num_cpus::get().saturating_sub(1)), - max_caching_memory_size_mb: 2048, + snapshot: SnapshotConfig::Build, + resources: ResourcesConfig { + cpu_core_available: num_cpus::get(), + memory_available: DEFAULT_MEMORY_AVAILABLE, + ulimit: DEFAULT_ULIMIT, + bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS, + bitcoind_rpc_timeout: DEFAULT_BITCOIND_RPC_TIMEOUT, + expected_observers_count: 1, }, network: IndexerConfig { bitcoind_rpc_url: "http://0.0.0.0:18332".into(), @@ -225,17 +228,14 @@ impl Config { working_dir: default_cache_path(), }, http_api: PredicatesApi::Off, - bootstrap: BootstrapConfig::Download( - DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE.to_string(), - ), - limits: LimitsConfig { - max_number_of_bitcoin_predicates: BITCOIN_MAX_PREDICATE_REGISTRATION, - max_number_of_concurrent_bitcoin_scans: BITCOIN_SCAN_THREAD_POOL_SIZE, - max_number_of_stacks_predicates: STACKS_MAX_PREDICATE_REGISTRATION, - max_number_of_concurrent_stacks_scans: STACKS_SCAN_THREAD_POOL_SIZE, - max_number_of_processing_threads: 1.max(num_cpus::get().saturating_sub(1)), - bitcoin_concurrent_http_requests_max: 1.max(num_cpus::get().saturating_sub(1)), - max_caching_memory_size_mb: 2048, + snapshot: SnapshotConfig::Download(DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE.to_string()), + resources: ResourcesConfig { + cpu_core_available: num_cpus::get(), + memory_available: DEFAULT_MEMORY_AVAILABLE, + ulimit: DEFAULT_ULIMIT, + bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS, + bitcoind_rpc_timeout: DEFAULT_BITCOIND_RPC_TIMEOUT, + expected_observers_count: 1, }, network: IndexerConfig { bitcoind_rpc_url: "http://0.0.0.0:8332".into(), diff --git a/components/ordhook-core/src/core/mod.rs b/components/ordhook-core/src/core/mod.rs index 8c510f43..9bf1cf59 100644 --- a/components/ordhook-core/src/core/mod.rs +++ b/components/ordhook-core/src/core/mod.rs @@ -13,7 +13,7 @@ use chainhook_sdk::{ }; use crate::{ - config::{Config, LogConfig}, + config::{Config, LogConfig, ResourcesConfig}, db::{find_pinned_block_bytes_at_block_height, open_ordhook_db_conn_rocks_db_loop}, }; @@ -26,10 +26,7 @@ use crate::db::TransactionBytesCursor; #[derive(Clone, Debug)] pub struct OrdhookConfig { - pub network_thread_max: usize, - pub ingestion_thread_max: usize, - pub ingestion_thread_queue_size: usize, - pub cache_size: usize, + pub resources: ResourcesConfig, pub db_path: PathBuf, pub first_inscription_height: u64, pub logs: LogConfig, @@ -59,43 +56,73 @@ pub enum SatPosition { Fee(u64), } +pub fn resolve_absolute_pointer(inputs: &Vec, absolute_pointer_value: u64) -> (usize, u64) { + let mut selected_index = 0; + let mut cumulated_input_value = 0; + // Check for overflow + let total: u64 = inputs.iter().sum(); + if absolute_pointer_value > total { + return (0, 0); + } + // Identify the input + satoshi offset being inscribed + for (index, input_value) in inputs.iter().enumerate() { + if (cumulated_input_value + input_value) > absolute_pointer_value { + selected_index = index; + break; + } + cumulated_input_value += input_value; + } + let relative_pointer_value = absolute_pointer_value - cumulated_input_value; + (selected_index, relative_pointer_value) +} + pub fn compute_next_satpoint_data( + _tx_index: usize, input_index: usize, - offset_intra_input: u64, inputs: &Vec, outputs: &Vec, + relative_pointer_value: u64, + _ctx: Option<&Context>, ) -> SatPosition { - let mut offset_cross_inputs = 0; + let mut absolute_offset_in_inputs = 0; for (index, input_value) in inputs.iter().enumerate() { if index == input_index { break; } - offset_cross_inputs += input_value; + absolute_offset_in_inputs += input_value; } - offset_cross_inputs += offset_intra_input; + absolute_offset_in_inputs += relative_pointer_value; - let mut offset_intra_outputs = 0; - let mut output_index = 0; + let mut absolute_offset_of_first_satoshi_in_selected_output = 0; + let mut selected_output_index = 0; let mut floating_bound = 0; for (index, output_value) in outputs.iter().enumerate() { floating_bound += output_value; - output_index = index; - if floating_bound > offset_cross_inputs { + selected_output_index = index; + if floating_bound > absolute_offset_in_inputs { break; } - offset_intra_outputs += output_value; + absolute_offset_of_first_satoshi_in_selected_output += output_value; } - if output_index == (outputs.len() - 1) && offset_cross_inputs >= floating_bound { + if selected_output_index == (outputs.len() - 1) && absolute_offset_in_inputs >= floating_bound { // Satoshi spent in fees - return SatPosition::Fee(offset_cross_inputs - floating_bound); + return SatPosition::Fee(absolute_offset_in_inputs - floating_bound); } - SatPosition::Output((output_index, (offset_cross_inputs - offset_intra_outputs))) + let relative_offset_in_selected_output = + absolute_offset_in_inputs - absolute_offset_of_first_satoshi_in_selected_output; + SatPosition::Output((selected_output_index, relative_offset_in_selected_output)) } pub fn should_sync_rocks_db(config: &Config, ctx: &Context) -> Result, String> { - let blocks_db = open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx); + let blocks_db = open_ordhook_db_conn_rocks_db_loop( + true, + &config.expected_cache_path(), + config.resources.ulimit, + config.resources.memory_available, + &ctx, + ); let inscriptions_db_conn = open_readonly_ordhook_db_conn(&config.expected_cache_path(), &ctx)?; let last_compressed_block = find_last_block_inserted(&blocks_db) as u64; let last_indexed_block = match find_latest_inscription_block_height(&inscriptions_db_conn, ctx)? @@ -128,7 +155,13 @@ pub fn should_sync_ordhook_db( } }; - let blocks_db = open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx); + let blocks_db = open_ordhook_db_conn_rocks_db_loop( + true, + &config.expected_cache_path(), + config.resources.ulimit, + config.resources.memory_available, + &ctx, + ); let mut start_block = find_last_block_inserted(&blocks_db) as u64; if start_block == 0 { @@ -185,53 +218,57 @@ pub fn should_sync_ordhook_db( #[test] fn test_identify_next_output_index_destination() { assert_eq!( - compute_next_satpoint_data(0, 10, &vec![20, 30, 45], &vec![20, 30, 45]), + compute_next_satpoint_data(0, 0, &vec![20, 30, 45], &vec![20, 30, 45], 10, None), SatPosition::Output((0, 10)) ); assert_eq!( - compute_next_satpoint_data(0, 20, &vec![20, 30, 45], &vec![20, 30, 45]), + compute_next_satpoint_data(0, 0, &vec![20, 30, 45], &vec![20, 30, 45], 20, None), SatPosition::Output((1, 0)) ); assert_eq!( - compute_next_satpoint_data(1, 5, &vec![20, 30, 45], &vec![20, 30, 45]), - SatPosition::Output((1, 5)) + compute_next_satpoint_data(0, 1, &vec![20, 30, 45], &vec![20, 30, 45], 25, None), + SatPosition::Output((1, 25)) ); assert_eq!( - compute_next_satpoint_data(1, 6, &vec![20, 30, 45], &vec![20, 5, 45]), - SatPosition::Output((2, 1)) + compute_next_satpoint_data(0, 1, &vec![20, 30, 45], &vec![20, 5, 45], 26, None), + SatPosition::Output((2, 21)) ); assert_eq!( - compute_next_satpoint_data(1, 10, &vec![10, 10, 10], &vec![30]), - SatPosition::Output((0, 20)) + compute_next_satpoint_data(0, 1, &vec![10, 10, 10], &vec![30], 20, None), + SatPosition::Fee(0) ); assert_eq!( - compute_next_satpoint_data(0, 30, &vec![10, 10, 10], &vec![30]), + compute_next_satpoint_data(0, 0, &vec![10, 10, 10], &vec![30], 30, None), SatPosition::Fee(0) ); assert_eq!( - compute_next_satpoint_data(0, 0, &vec![10, 10, 10], &vec![30]), + compute_next_satpoint_data(0, 0, &vec![10, 10, 10], &vec![30], 0, None), SatPosition::Output((0, 0)) ); assert_eq!( - compute_next_satpoint_data(2, 45, &vec![20, 30, 45], &vec![20, 30, 45]), - SatPosition::Fee(0) + compute_next_satpoint_data(0, 2, &vec![20, 30, 45], &vec![20, 30, 45], 95, None), + SatPosition::Fee(50) ); assert_eq!( compute_next_satpoint_data( - 2, 0, + 2, &vec![1000, 600, 546, 63034], - &vec![1600, 10000, 15000] + &vec![1600, 10000, 15000], + 1600, + None ), - SatPosition::Output((1, 0)) + SatPosition::Output((1, 1600)) ); assert_eq!( compute_next_satpoint_data( - 3, 0, + 3, &vec![6100, 148660, 103143, 7600], - &vec![81434, 173995] + &vec![81434, 173995], + 257903, + None ), - SatPosition::Fee(2474) + SatPosition::Fee(260377) ); } diff --git a/components/ordhook-core/src/core/pipeline/mod.rs b/components/ordhook-core/src/core/pipeline/mod.rs index 855ea6db..cf41c1eb 100644 --- a/components/ordhook-core/src/core/pipeline/mod.rs +++ b/components/ordhook-core/src/core/pipeline/mod.rs @@ -73,11 +73,27 @@ pub async fn download_and_pipeline_blocks( let end_block = *blocks.last().expect("no blocks to pipeline"); let mut block_heights = VecDeque::from(blocks); - for _ in 0..ordhook_config.ingestion_thread_queue_size { + // All the requests are being processed on the same thread. + // As soon as we are getting the bytes back from wire, the + // processing is moved to a thread pool, to defer the parsing, quite expensive. + // We are initially seeding the networking thread with N requests, + // with N being the number of threads in the pool handling the response. + // We need: + // - 1 thread for the thread handling networking + // - 1 thread for the thread handling disk serialization + let thread_pool_network_response_processing_capacity = + ordhook_config.resources.get_optimal_thread_pool_capacity(); + // For each worker in that pool, we want to bound the size of the queue to avoid OOM + // Blocks size can range from 1 to 4Mb (when packed with witness data). + // Start blocking networking when each worker has a backlog of 8 blocks seems reasonable. + let worker_queue_size = 2; + + for _ in 0..ordhook_config.resources.bitcoind_rpc_threads { if let Some(block_height) = block_heights.pop_front() { let config = moved_config.clone(); let ctx = moved_ctx.clone(); let http_client = moved_http_client.clone(); + // We interleave the initial requests to avoid DDOSing bitcoind from the get go. sleep(Duration::from_millis(500)); set.spawn(try_download_block_bytes_with_retry( http_client, @@ -95,8 +111,8 @@ pub async fn download_and_pipeline_blocks( let mut rx_thread_pool = vec![]; let mut thread_pool_handles = vec![]; - for _ in 0..ordhook_config.ingestion_thread_max { - let (tx, rx) = bounded::>>(ordhook_config.ingestion_thread_queue_size); + for _ in 0..thread_pool_network_response_processing_capacity { + let (tx, rx) = bounded::>>(worker_queue_size); tx_thread_pool.push(tx); rx_thread_pool.push(rx); } @@ -244,11 +260,22 @@ pub async fn download_and_pipeline_blocks( }) .expect("unable to spawn thread"); - let mut thread_index = 0; + let mut round_robin_worker_thread_index = 0; while let Some(res) = set.join_next().await { - let block = res.unwrap().unwrap(); + let block = res + .expect("unable to retrieve block") + .expect("unable to deserialize block"); + + loop { + let res = tx_thread_pool[round_robin_worker_thread_index].send(Some(block.clone())); + round_robin_worker_thread_index = (round_robin_worker_thread_index + 1) + % thread_pool_network_response_processing_capacity; + if res.is_ok() { + break; + } + sleep(Duration::from_millis(500)); + } - let _ = tx_thread_pool[thread_index].send(Some(block)); if let Some(block_height) = block_heights.pop_front() { let config = moved_config.clone(); let ctx = ctx.clone(); @@ -260,7 +287,6 @@ pub async fn download_and_pipeline_blocks( ctx, )); } - thread_index = (thread_index + 1) % ordhook_config.ingestion_thread_max; } ctx.try_log(|logger| { diff --git a/components/ordhook-core/src/core/pipeline/processors/block_archiving.rs b/components/ordhook-core/src/core/pipeline/processors/block_archiving.rs index 38453e52..557ca396 100644 --- a/components/ordhook-core/src/core/pipeline/processors/block_archiving.rs +++ b/components/ordhook-core/src/core/pipeline/processors/block_archiving.rs @@ -25,8 +25,13 @@ pub fn start_block_archiving_processor( let ctx = ctx.clone(); let handle: JoinHandle<()> = hiro_system_kit::thread_named("Processor Runloop") .spawn(move || { - let blocks_db_rw = - open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx); + let blocks_db_rw = open_ordhook_db_conn_rocks_db_loop( + true, + &config.expected_cache_path(), + config.resources.ulimit, + config.resources.memory_available, + &ctx, + ); let mut processed_blocks = 0; loop { diff --git a/components/ordhook-core/src/core/pipeline/processors/inscription_indexing.rs b/components/ordhook-core/src/core/pipeline/processors/inscription_indexing.rs index 9a95b9a8..fe6ebb5e 100644 --- a/components/ordhook-core/src/core/pipeline/processors/inscription_indexing.rs +++ b/components/ordhook-core/src/core/pipeline/processors/inscription_indexing.rs @@ -26,9 +26,10 @@ use crate::{ }, inscription_sequencing::{ augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx, + get_bitcoin_network, get_jubilee_block_height, parallelize_inscription_data_computations, SequenceCursor, }, - inscription_tracking::augment_block_with_ordinals_transfer_data, + satoshi_tracking::augment_block_with_ordinals_transfer_data, }, OrdhookConfig, }, @@ -107,6 +108,8 @@ pub fn start_inscription_indexing_processor( let blocks_db_rw = open_ordhook_db_conn_rocks_db_loop( true, &config.expected_cache_path(), + config.resources.ulimit, + config.resources.memory_available, &ctx, ); store_compacted_blocks( @@ -188,6 +191,13 @@ pub fn process_blocks( ctx, ); + // Invalidate and recompute cursor when crossing the jubilee height + let jubilee_height = + get_jubilee_block_height(&get_bitcoin_network(&block.metadata.network)); + if block.block_identifier.index == jubilee_height { + sequence_cursor.reset(); + } + let _ = process_block( &mut block, &next_blocks, @@ -260,7 +270,7 @@ pub fn process_block( block: &mut BitcoinBlockData, next_blocks: &Vec, sequence_cursor: &mut SequenceCursor, - cache_l1: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>, + cache_l1: &mut BTreeMap<(TransactionIdentifier, usize, u64), TraversalResult>, cache_l2: &Arc>>, inscriptions_db_tx: &Transaction, ordhook_config: &OrdhookConfig, diff --git a/components/ordhook-core/src/core/pipeline/processors/transfers_recomputing.rs b/components/ordhook-core/src/core/pipeline/processors/transfers_recomputing.rs index c7a17b41..8d5c12e0 100644 --- a/components/ordhook-core/src/core/pipeline/processors/transfers_recomputing.rs +++ b/components/ordhook-core/src/core/pipeline/processors/transfers_recomputing.rs @@ -12,11 +12,11 @@ use crate::{ pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent}, protocol::{ inscription_sequencing::consolidate_block_with_pre_computed_ordinals_data, - inscription_tracking::augment_block_with_ordinals_transfer_data, + satoshi_tracking::augment_block_with_ordinals_transfer_data, }, }, db::{ - insert_new_inscriptions_from_block_in_locations, open_readwrite_ordhook_db_conn, + insert_entries_from_block_in_inscriptions, open_readwrite_ordhook_db_conn, remove_entries_from_locations_at_block_height, }, }; @@ -83,11 +83,7 @@ pub fn start_transfers_recomputing_processor( &ctx, ); - insert_new_inscriptions_from_block_in_locations( - block, - &inscriptions_db_tx, - &ctx, - ); + insert_entries_from_block_in_inscriptions(block, &inscriptions_db_tx, &ctx); augment_block_with_ordinals_transfer_data( block, diff --git a/components/ordhook-core/src/core/protocol/inscription_parsing.rs b/components/ordhook-core/src/core/protocol/inscription_parsing.rs index 02d8e769..01ce655d 100644 --- a/components/ordhook-core/src/core/protocol/inscription_parsing.rs +++ b/components/ordhook-core/src/core/protocol/inscription_parsing.rs @@ -7,6 +7,7 @@ use chainhook_sdk::types::{ OrdinalOperation, }; use chainhook_sdk::utils::Context; +use serde_json::json; use std::collections::BTreeMap; use std::str::FromStr; @@ -20,6 +21,11 @@ pub fn parse_inscriptions_from_witness( witness_bytes: Vec>, txid: &str, ) -> Option> { + // Efficient debugging: Isolate one specific transaction + // if !txid.eq("aa2ab56587c7d6609c95157e6dff37c5c3fa6531702f41229a289a5613887077") { + // return None + // } + let witness = Witness::from_slice(&witness_bytes); let tapscript = witness.tapscript()?; let envelopes: Vec> = RawEnvelope::from_tapscript(tapscript, input_index) @@ -59,6 +65,17 @@ pub fn parse_inscriptions_from_witness( let mut content_bytes = "0x".to_string(); content_bytes.push_str(&hex::encode(&inscription_content_bytes)); + let parent = envelope.payload.parent().and_then(|i| Some(i.to_string())); + let delegate = envelope + .payload + .delegate() + .and_then(|i| Some(i.to_string())); + let metaprotocol = envelope + .payload + .metaprotocol() + .and_then(|p| Some(p.to_string())); + let metadata = envelope.payload.metadata().and_then(|m| Some(json!(m))); + let reveal_data = OrdinalInscriptionRevealData { content_type: envelope .payload @@ -71,9 +88,14 @@ pub fn parse_inscriptions_from_witness( inscription_input_index: input_index, tx_index: 0, inscription_output_value: 0, + inscription_pointer: envelope.payload.pointer(), inscription_fee: 0, inscription_number: OrdinalInscriptionNumber::zero(), inscriber_address: None, + parent, + delegate, + metaprotocol, + metadata, ordinal_number: 0, ordinal_block_height: 0, ordinal_offset: 0, diff --git a/components/ordhook-core/src/core/protocol/inscription_sequencing.rs b/components/ordhook-core/src/core/protocol/inscription_sequencing.rs index 911c6226..3aa5fe5a 100644 --- a/components/ordhook-core/src/core/protocol/inscription_sequencing.rs +++ b/components/ordhook-core/src/core/protocol/inscription_sequencing.rs @@ -1,15 +1,15 @@ use std::{ - collections::{BTreeMap, HashMap, VecDeque}, + collections::{BTreeMap, HashMap, HashSet, VecDeque}, hash::BuildHasherDefault, sync::Arc, }; use chainhook_sdk::{ - bitcoincore_rpc_json::bitcoin::{Address, Network, ScriptBuf}, + bitcoincore_rpc_json::bitcoin::Network, types::{ BitcoinBlockData, BitcoinNetwork, BitcoinTransactionData, BlockIdentifier, - OrdinalInscriptionCurseType, OrdinalInscriptionNumber, OrdinalOperation, - TransactionIdentifier, + OrdinalInscriptionCurseType, OrdinalInscriptionNumber, + OrdinalInscriptionTransferDestination, OrdinalOperation, TransactionIdentifier, }, utils::Context, }; @@ -19,26 +19,26 @@ use fxhash::FxHasher; use rusqlite::{Connection, Transaction}; use crate::{ - core::OrdhookConfig, + core::{resolve_absolute_pointer, OrdhookConfig}, db::{ find_blessed_inscription_with_ordinal_number, find_nth_classic_neg_number_at_block_height, find_nth_classic_pos_number_at_block_height, find_nth_jubilee_number_at_block_height, - format_inscription_id, format_satpoint_to_watch, update_inscriptions_with_block, - update_sequence_metadata_with_block, TransactionBytesCursor, TraversalResult, + format_inscription_id, update_ordinals_db_with_block, update_sequence_metadata_with_block, + TransactionBytesCursor, TraversalResult, }, ord::height::Height, }; -use rand::seq::SliceRandom; -use rand::thread_rng; use std::sync::mpsc::channel; use crate::db::find_all_inscriptions_in_block; use super::{ inscription_parsing::get_inscriptions_revealed_in_block, - inscription_tracking::augment_transaction_with_ordinals_transfers_data, satoshi_numbering::compute_satoshi_number, + satoshi_tracking::{ + augment_transaction_with_ordinals_transfers_data, compute_satpoint_post_transfer, + }, }; /// Parallelize the computation of ordinals numbers for inscriptions present in a block. @@ -67,7 +67,7 @@ use super::{ pub fn parallelize_inscription_data_computations( block: &BitcoinBlockData, next_blocks: &Vec, - cache_l1: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>, + cache_l1: &mut BTreeMap<(TransactionIdentifier, usize, u64), TraversalResult>, cache_l2: &Arc>>, inscriptions_db_tx: &Transaction, ordhook_config: &OrdhookConfig, @@ -86,12 +86,12 @@ pub fn parallelize_inscription_data_computations( ) }); - let (mut transactions_ids, l1_cache_hits) = + let (transactions_ids, l1_cache_hits) = get_transactions_to_process(block, cache_l1, inscriptions_db_tx, ctx); let has_transactions_to_process = !transactions_ids.is_empty() || !l1_cache_hits.is_empty(); - let thread_max = ordhook_config.ingestion_thread_max; + let thread_pool_capacity = ordhook_config.resources.get_optimal_thread_pool_capacity(); // Nothing to do? early return if !has_transactions_to_process { @@ -104,29 +104,41 @@ pub fn parallelize_inscription_data_computations( let mut tx_thread_pool = vec![]; let mut thread_pool_handles = vec![]; - for thread_index in 0..thread_max { + for thread_index in 0..thread_pool_capacity { let (tx, rx) = channel(); tx_thread_pool.push(tx); let moved_traversal_tx = traversal_tx.clone(); let moved_ctx = inner_ctx.clone(); let moved_ordhook_db_path = ordhook_config.db_path.clone(); + let ulimit = ordhook_config.resources.ulimit; + let memory_available = ordhook_config.resources.memory_available; + let local_cache = cache_l2.clone(); let handle = hiro_system_kit::thread_named("Worker") .spawn(move || { - while let Ok(Some((transaction_id, block_identifier, input_index, prioritary))) = - rx.recv() + while let Ok(Some(( + transaction_id, + block_identifier, + input_index, + inscription_pointer, + prioritary, + ))) = rx.recv() { - let traversal: Result<(TraversalResult, _), String> = compute_satoshi_number( - &moved_ordhook_db_path, - &block_identifier, - &transaction_id, - input_index, - &local_cache, - false, - &moved_ctx, - ); + let traversal: Result<(TraversalResult, u64, _), String> = + compute_satoshi_number( + &moved_ordhook_db_path, + &block_identifier, + &transaction_id, + input_index, + inscription_pointer, + &local_cache, + ulimit, + memory_available, + false, + &moved_ctx, + ); let _ = moved_traversal_tx.send((traversal, prioritary, thread_index)); } }) @@ -135,12 +147,16 @@ pub fn parallelize_inscription_data_computations( } // Consume L1 cache: if the traversal was performed in a previous round - // retrieve it and use it. - let mut thread_index = 0; + // retrieve it and inject it to the "reduce" worker (by-passing the "map" thread pool) + let mut round_robin_thread_index = 0; for key in l1_cache_hits.iter() { if let Some(entry) = cache_l1.get(key) { - let _ = traversal_tx.send((Ok((entry.clone(), vec![])), true, thread_index)); - thread_index = (thread_index + 1) % thread_max; + let _ = traversal_tx.send(( + Ok((entry.clone(), key.2, vec![])), + true, + round_robin_thread_index, + )); + round_robin_thread_index = (round_robin_thread_index + 1) % thread_pool_capacity; } } @@ -162,25 +178,24 @@ pub fn parallelize_inscription_data_computations( ) }); - let mut rng = thread_rng(); - transactions_ids.shuffle(&mut rng); let mut priority_queue = VecDeque::new(); let mut warmup_queue = VecDeque::new(); - for (transaction_id, input_index) in transactions_ids.into_iter() { + for (transaction_id, input_index, inscription_pointer) in transactions_ids.into_iter() { priority_queue.push_back(( transaction_id, block.block_identifier.clone(), input_index, + inscription_pointer, true, )); } - // Feed each workers with 2 workitems each - for thread_index in 0..thread_max { + // Feed each worker from the thread pool with 2 workitems each + for thread_index in 0..thread_pool_capacity { let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front()); } - for thread_index in 0..thread_max { + for thread_index in 0..thread_pool_capacity { let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front()); } @@ -191,13 +206,14 @@ pub fn parallelize_inscription_data_computations( traversals_received += 1; } match traversal_result { - Ok((traversal, _)) => { + Ok((traversal, inscription_pointer, _)) => { inner_ctx.try_log(|logger| { info!( logger, - "Completed ordinal number retrieval for Satpoint {}:{}:0 (block: #{}:{}, transfers: {}, progress: {traversals_received}/{expected_traversals}, priority queue: {prioritary}, thread: {thread_index})", + "Completed ordinal number retrieval for Satpoint {}:{}:{} (block: #{}:{}, transfers: {}, progress: {traversals_received}/{expected_traversals}, priority queue: {prioritary}, thread: {thread_index})", traversal.transaction_identifier_inscription.hash, traversal.inscription_input_index, + inscription_pointer, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers @@ -207,6 +223,7 @@ pub fn parallelize_inscription_data_computations( ( traversal.transaction_identifier_inscription.clone(), traversal.inscription_input_index, + inscription_pointer, ), traversal, ); @@ -229,7 +246,7 @@ pub fn parallelize_inscription_data_computations( let _ = tx_thread_pool[thread_index].send(Some(w)); } else { if let Some(next_block) = next_block_iter.next() { - let (mut transactions_ids, _) = + let (transactions_ids, _) = get_transactions_to_process(next_block, cache_l1, inscriptions_db_tx, ctx); inner_ctx.try_log(|logger| { @@ -241,12 +258,14 @@ pub fn parallelize_inscription_data_computations( ) }); - transactions_ids.shuffle(&mut rng); - for (transaction_id, input_index) in transactions_ids.into_iter() { + for (transaction_id, input_index, inscription_pointer) in + transactions_ids.into_iter() + { warmup_queue.push_back(( transaction_id, next_block.block_identifier.clone(), input_index, + inscription_pointer, false, )); } @@ -266,13 +285,14 @@ pub fn parallelize_inscription_data_computations( for tx in tx_thread_pool.iter() { // Empty the queue if let Ok((traversal_result, _prioritary, thread_index)) = traversal_rx.try_recv() { - if let Ok((traversal, _)) = traversal_result { + if let Ok((traversal, inscription_pointer, _)) = traversal_result { inner_ctx.try_log(|logger| { info!( logger, - "Completed ordinal number retrieval for Satpoint {}:{}:0 (block: #{}:{}, transfers: {}, pre-retrieval, thread: {thread_index})", + "Completed ordinal number retrieval for Satpoint {}:{}:{} (block: #{}:{}, transfers: {}, pre-retrieval, thread: {thread_index})", traversal.transaction_identifier_inscription.hash, traversal.inscription_input_index, + inscription_pointer, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers @@ -282,6 +302,7 @@ pub fn parallelize_inscription_data_computations( ( traversal.transaction_identifier_inscription.clone(), traversal.inscription_input_index, + inscription_pointer, ), traversal, ); @@ -292,12 +313,9 @@ pub fn parallelize_inscription_data_computations( let ctx_moved = inner_ctx.clone(); let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || { - ctx_moved.try_log(|logger| info!(logger, "Cleanup: threadpool deallocation started",)); - for handle in thread_pool_handles.into_iter() { let _ = handle.join(); } - ctx_moved.try_log(|logger| info!(logger, "Cleanup: threadpool deallocation ended",)); }); inner_ctx.try_log(|logger| { @@ -326,20 +344,27 @@ pub fn parallelize_inscription_data_computations( /// fn get_transactions_to_process( block: &BitcoinBlockData, - cache_l1: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>, + cache_l1: &mut BTreeMap<(TransactionIdentifier, usize, u64), TraversalResult>, inscriptions_db_tx: &Transaction, ctx: &Context, ) -> ( - Vec<(TransactionIdentifier, usize)>, - Vec<(TransactionIdentifier, usize)>, + HashSet<(TransactionIdentifier, usize, u64)>, + Vec<(TransactionIdentifier, usize, u64)>, ) { - let mut transactions_ids: Vec<(TransactionIdentifier, usize)> = vec![]; + let mut transactions_ids = HashSet::new(); let mut l1_cache_hits = vec![]; let known_transactions = find_all_inscriptions_in_block(&block.block_identifier.index, inscriptions_db_tx, ctx); for tx in block.transactions.iter().skip(1) { + let inputs = tx + .metadata + .inputs + .iter() + .map(|i| i.previous_output.value) + .collect::>(); + // Have a new inscription been revealed, if so, are looking at a re-inscription for ordinal_event in tx.metadata.ordinal_operations.iter() { let inscription_data = match ordinal_event { @@ -348,9 +373,16 @@ fn get_transactions_to_process( continue; } }; + + let (input_index, relative_offset) = match inscription_data.inscription_pointer { + Some(pointer) => resolve_absolute_pointer(&inputs, pointer), + None => (inscription_data.inscription_input_index, 0), + }; + let key = ( tx.transaction_identifier.clone(), - inscription_data.inscription_input_index, + input_index, + relative_offset, ); if cache_l1.contains_key(&key) { l1_cache_hits.push(key); @@ -361,11 +393,12 @@ fn get_transactions_to_process( continue; } + if transactions_ids.contains(&key) { + continue; + } + // Enqueue for traversals - transactions_ids.push(( - tx.transaction_identifier.clone(), - inscription_data.inscription_input_index, - )); + transactions_ids.insert(key); } } (transactions_ids, l1_cache_hits) @@ -420,14 +453,8 @@ impl<'a> SequenceCursor<'a> { true => self.pick_next_neg_classic(ctx), false => self.pick_next_pos_classic(ctx), }; - let jubilee_height = match network { - Network::Bitcoin => 824544, - Network::Regtest => 110, - Network::Signet => 175392, - Network::Testnet => 2544192, - _ => unreachable!(), - }; - let jubilee = if block_height >= jubilee_height { + + let jubilee = if block_height >= get_jubilee_block_height(&network) { self.pick_next_jubilee_number(ctx) } else { classic @@ -505,6 +532,25 @@ impl<'a> SequenceCursor<'a> { } } +pub fn get_jubilee_block_height(network: &Network) -> u64 { + match network { + Network::Bitcoin => 824544, + Network::Regtest => 110, + Network::Signet => 175392, + Network::Testnet => 2544192, + _ => unreachable!(), + } +} + +pub fn get_bitcoin_network(network: &BitcoinNetwork) -> Network { + match network { + BitcoinNetwork::Mainnet => Network::Bitcoin, + BitcoinNetwork::Regtest => Network::Regtest, + BitcoinNetwork::Testnet => Network::Testnet, + BitcoinNetwork::Signet => Network::Signet, + } +} + /// Given a `BitcoinBlockData` that have been augmented with the functions `parse_inscriptions_in_raw_tx`, `parse_inscriptions_in_standardized_tx` /// or `parse_inscriptions_and_standardize_block`, mutate the ordinals drafted informations with actual, consensus data. /// @@ -514,7 +560,7 @@ impl<'a> SequenceCursor<'a> { pub fn augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx( block: &mut BitcoinBlockData, sequence_cursor: &mut SequenceCursor, - inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>, + inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize, u64), TraversalResult>, inscriptions_db_tx: &Transaction, ctx: &Context, ) -> bool { @@ -541,7 +587,7 @@ pub fn augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx( ); // Store inscriptions - update_inscriptions_with_block(block, inscriptions_db_tx, ctx); + update_ordinals_db_with_block(block, inscriptions_db_tx, ctx); update_sequence_metadata_with_block(block, inscriptions_db_tx, ctx); any_events } @@ -557,7 +603,7 @@ pub fn augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx( pub fn augment_block_with_ordinals_inscriptions_data( block: &mut BitcoinBlockData, sequence_cursor: &mut SequenceCursor, - inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>, + inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize, u64), TraversalResult>, reinscriptions_data: &mut HashMap, ctx: &Context, ) -> bool { @@ -565,12 +611,10 @@ pub fn augment_block_with_ordinals_inscriptions_data( let mut sats_overflows = VecDeque::new(); let mut any_event = false; - let network = match block.metadata.network { - BitcoinNetwork::Mainnet => Network::Bitcoin, - BitcoinNetwork::Regtest => Network::Regtest, - BitcoinNetwork::Testnet => Network::Testnet, - BitcoinNetwork::Signet => Network::Signet, - }; + let network = get_bitcoin_network(&block.metadata.network); + let coinbase_subsidy = Height(block.block_identifier.index).subsidy(); + let coinbase_txid = &block.transactions[0].transaction_identifier.clone(); + let mut cumulated_fees = 0u64; for (tx_index, tx) in block.transactions.iter_mut().enumerate() { any_event |= augment_transaction_with_ordinals_inscriptions_data( @@ -580,6 +624,9 @@ pub fn augment_block_with_ordinals_inscriptions_data( sequence_cursor, &network, inscriptions_data, + coinbase_txid, + coinbase_subsidy, + &mut cumulated_fees, &mut sats_overflows, reinscriptions_data, ctx, @@ -598,6 +645,7 @@ pub fn augment_block_with_ordinals_inscriptions_data( sequence_cursor.pick_next(is_curse, block.block_identifier.index, &network, &ctx); inscription_data.inscription_number = inscription_number; + sequence_cursor.increment_jubilee_number(ctx); if is_curse { sequence_cursor.increment_neg_classic(ctx); } else { @@ -631,14 +679,26 @@ fn augment_transaction_with_ordinals_inscriptions_data( block_identifier: &BlockIdentifier, sequence_cursor: &mut SequenceCursor, network: &Network, - inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>, + inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize, u64), TraversalResult>, + coinbase_txid: &TransactionIdentifier, + coinbase_subsidy: u64, + cumulated_fees: &mut u64, sats_overflows: &mut VecDeque<(usize, usize)>, reinscriptions_data: &mut HashMap, ctx: &Context, ) -> bool { + let inputs = tx + .metadata + .inputs + .iter() + .map(|i| i.previous_output.value) + .collect::>(); + let any_event = tx.metadata.ordinal_operations.is_empty() == false; + let mut mutated_operations = vec![]; + mutated_operations.append(&mut tx.metadata.ordinal_operations); let mut inscription_subindex = 0; - for (op_index, op) in tx.metadata.ordinal_operations.iter_mut().enumerate() { + for (op_index, op) in mutated_operations.iter_mut().enumerate() { let (mut is_cursed, inscription) = match op { OrdinalOperation::InscriptionRevealed(inscription) => { (inscription.curse_type.as_ref().is_some(), inscription) @@ -646,23 +706,27 @@ fn augment_transaction_with_ordinals_inscriptions_data( OrdinalOperation::InscriptionTransferred(_) => continue, }; + let (input_index, relative_offset) = match inscription.inscription_pointer { + Some(pointer) => resolve_absolute_pointer(&inputs, pointer), + None => (inscription.inscription_input_index, 0), + }; + let transaction_identifier = tx.transaction_identifier.clone(); let inscription_id = format_inscription_id(&transaction_identifier, inscription_subindex); - let traversal = match inscriptions_data - .get(&(transaction_identifier, inscription.inscription_input_index)) - { - Some(traversal) => traversal, - None => { - let err_msg = format!( - "Unable to retrieve backward traversal result for inscription {}", - tx.transaction_identifier.hash - ); - ctx.try_log(|logger| { - error!(logger, "{}", err_msg); - }); - std::process::exit(1); - } - }; + let traversal = + match inscriptions_data.get(&(transaction_identifier, input_index, relative_offset)) { + Some(traversal) => traversal, + None => { + let err_msg = format!( + "Unable to retrieve backward traversal result for inscription {}", + tx.transaction_identifier.hash + ); + ctx.try_log(|logger| { + error!(logger, "{}", err_msg); + }); + std::process::exit(1); + } + }; // Do we need to curse the inscription? let mut inscription_number = @@ -692,7 +756,6 @@ fn augment_transaction_with_ordinals_inscriptions_data( inscription.inscription_id = inscription_id; inscription.inscription_number = inscription_number; - let outputs = &tx.metadata.outputs; inscription.ordinal_offset = traversal.get_ordinal_coinbase_offset(); inscription.ordinal_block_height = traversal.get_ordinal_coinbase_height(); inscription.ordinal_number = traversal.ordinal_number; @@ -703,41 +766,41 @@ fn augment_transaction_with_ordinals_inscriptions_data( Some(curse_type) => Some(curse_type), None => inscription.curse_type.take(), }; - inscription.satpoint_post_inscription = format_satpoint_to_watch( - &traversal.transfer_data.transaction_identifier_location, - traversal.transfer_data.output_index, - traversal.transfer_data.inscription_offset_intra_output, + + let (destination, satpoint_post_transfer, output_value) = compute_satpoint_post_transfer( + &&*tx, + tx_index, + input_index, + relative_offset, + network, + coinbase_txid, + coinbase_subsidy, + cumulated_fees, + ctx, ); - if let Some(output) = outputs.get(traversal.transfer_data.output_index) { - inscription.inscription_output_value = output.value; - inscription.inscriber_address = { - let script_pub_key = output.get_script_pubkey_hex(); - match ScriptBuf::from_hex(&script_pub_key) { - Ok(script) => match Address::from_script(&script, network.clone()) { - Ok(a) => Some(a.to_string()), - _ => None, - }, - _ => None, - } - }; - } else { - ctx.try_log(|logger| { - warn!( - logger, - "Database corrupted, skipping cursed inscription => {:?} / {:?}", - traversal, - outputs - ); - }); - } - if traversal.ordinal_number == 0 { - // If the satoshi inscribed correspond to a sat overflow, we will store the inscription - // and assign an inscription number after the other inscriptions, to mimick the - // bug in ord. - sats_overflows.push_back((tx_index, op_index)); - continue; - } + // Compute satpoint_post_inscription + inscription.satpoint_post_inscription = satpoint_post_transfer; + inscription_subindex += 1; + + match destination { + OrdinalInscriptionTransferDestination::SpentInFees => { + // Inscriptions are assigned inscription numbers starting at zero, first by the + // order reveal transactions appear in blocks, and the order that reveal envelopes + // appear in those transactions. + // Due to a historical bug in `ord` which cannot be fixed without changing a great + // many inscription numbers, inscriptions which are revealed and then immediately + // spent to fees are numbered as if they appear last in the block in which they + // are revealed. + sats_overflows.push_back((tx_index, op_index)); + continue; + } + OrdinalInscriptionTransferDestination::Burnt(_) => {} + OrdinalInscriptionTransferDestination::Transferred(address) => { + inscription.inscription_output_value = output_value.unwrap_or(0); + inscription.inscriber_address = Some(address); + } + }; // The reinscriptions_data needs to be augmented as we go, to handle transaction chaining. if !is_cursed { @@ -762,8 +825,11 @@ fn augment_transaction_with_ordinals_inscriptions_data( } else { sequence_cursor.increment_pos_classic(ctx); } - inscription_subindex += 1; } + tx.metadata + .ordinal_operations + .append(&mut mutated_operations); + any_event } @@ -773,12 +839,24 @@ fn consolidate_transaction_with_pre_computed_inscription_data( tx: &mut BitcoinTransactionData, tx_index: usize, coinbase_txid: &TransactionIdentifier, + coinbase_subsidy: u64, + cumulated_fees: &mut u64, network: &Network, inscriptions_data: &mut BTreeMap, - _ctx: &Context, + ctx: &Context, ) { let mut subindex = 0; - for operation in tx.metadata.ordinal_operations.iter_mut() { + let mut mutated_operations = vec![]; + mutated_operations.append(&mut tx.metadata.ordinal_operations); + + let inputs = tx + .metadata + .inputs + .iter() + .map(|i| i.previous_output.value) + .collect::>(); + + for operation in mutated_operations.iter_mut() { let inscription = match operation { OrdinalOperation::InscriptionRevealed(ref mut inscription) => inscription, OrdinalOperation::InscriptionTransferred(_) => continue, @@ -799,42 +877,42 @@ fn consolidate_transaction_with_pre_computed_inscription_data( inscription.transfers_pre_inscription = traversal.transfers; inscription.inscription_fee = tx.metadata.fee; inscription.tx_index = tx_index; - inscription.satpoint_post_inscription = format_satpoint_to_watch( - &traversal.transfer_data.transaction_identifier_location, - traversal.transfer_data.output_index, - traversal.transfer_data.inscription_offset_intra_output, + + let (input_index, relative_offset) = match inscription.inscription_pointer { + Some(pointer) => resolve_absolute_pointer(&inputs, pointer), + None => (traversal.inscription_input_index, 0), + }; + // Compute satpoint_post_inscription + let (destination, satpoint_post_transfer, output_value) = compute_satpoint_post_transfer( + tx, + tx_index, + input_index, + relative_offset, + network, + coinbase_txid, + coinbase_subsidy, + cumulated_fees, + ctx, ); + inscription.satpoint_post_inscription = satpoint_post_transfer; + if inscription.inscription_number.classic < 0 { inscription.curse_type = Some(OrdinalInscriptionCurseType::Generic); } - if traversal - .transfer_data - .transaction_identifier_location - .eq(coinbase_txid) - { - continue; - } - - if let Some(output) = tx - .metadata - .outputs - .get(traversal.transfer_data.output_index) - { - inscription.inscription_output_value = output.value; - inscription.inscriber_address = { - let script_pub_key = output.get_script_pubkey_hex(); - match ScriptBuf::from_hex(&script_pub_key) { - Ok(script) => match Address::from_script(&script, network.clone()) { - Ok(a) => Some(a.to_string()), - _ => None, - }, - _ => None, - } - }; + match destination { + OrdinalInscriptionTransferDestination::SpentInFees => continue, + OrdinalInscriptionTransferDestination::Burnt(_) => continue, + OrdinalInscriptionTransferDestination::Transferred(address) => { + inscription.inscription_output_value = output_value.unwrap_or(0); + inscription.inscriber_address = Some(address); + } } } + tx.metadata + .ordinal_operations + .append(&mut mutated_operations); } /// Best effort to re-augment a `BitcoinBlockData` with data coming from `inscriptions` and `locations` tables. @@ -845,13 +923,7 @@ pub fn consolidate_block_with_pre_computed_ordinals_data( include_transfers: bool, ctx: &Context, ) { - let network = match block.metadata.network { - BitcoinNetwork::Mainnet => Network::Bitcoin, - BitcoinNetwork::Regtest => Network::Regtest, - BitcoinNetwork::Testnet => Network::Testnet, - BitcoinNetwork::Signet => Network::Signet, - }; - + let network = get_bitcoin_network(&block.metadata.network); let coinbase_subsidy = Height(block.block_identifier.index).subsidy(); let coinbase_txid = &block.transactions[0].transaction_identifier.clone(); let mut cumulated_fees = 0; @@ -878,6 +950,8 @@ pub fn consolidate_block_with_pre_computed_ordinals_data( tx, tx_index, coinbase_txid, + coinbase_subsidy, + &mut cumulated_fees, &network, &mut inscriptions_data, ctx, @@ -888,7 +962,6 @@ pub fn consolidate_block_with_pre_computed_ordinals_data( let _ = augment_transaction_with_ordinals_transfers_data( tx, tx_index, - &block.block_identifier, &network, &coinbase_txid, coinbase_subsidy, diff --git a/components/ordhook-core/src/core/protocol/inscription_tracking.rs b/components/ordhook-core/src/core/protocol/inscription_tracking.rs deleted file mode 100644 index b2c1bbee..00000000 --- a/components/ordhook-core/src/core/protocol/inscription_tracking.rs +++ /dev/null @@ -1,215 +0,0 @@ -use chainhook_sdk::{ - bitcoincore_rpc_json::bitcoin::{Address, Network, ScriptBuf}, - types::{ - BitcoinBlockData, BitcoinNetwork, BitcoinTransactionData, BlockIdentifier, - OrdinalInscriptionTransferData, OrdinalInscriptionTransferDestination, OrdinalOperation, - TransactionIdentifier, - }, - utils::Context, -}; - -use crate::{ - core::{compute_next_satpoint_data, SatPosition}, - db::{ - find_inscriptions_at_wached_outpoint, format_outpoint_to_watch, - insert_transfer_in_locations_tx, - }, - ord::height::Height, -}; -use rusqlite::Transaction; - -pub fn augment_block_with_ordinals_transfer_data( - block: &mut BitcoinBlockData, - inscriptions_db_tx: &Transaction, - update_db_tx: bool, - ctx: &Context, -) -> bool { - let mut any_event = false; - - let network = match block.metadata.network { - BitcoinNetwork::Mainnet => Network::Bitcoin, - BitcoinNetwork::Regtest => Network::Regtest, - BitcoinNetwork::Testnet => Network::Testnet, - BitcoinNetwork::Signet => Network::Signet, - }; - - let coinbase_subsidy = Height(block.block_identifier.index).subsidy(); - let coinbase_txid = &block.transactions[0].transaction_identifier.clone(); - let mut cumulated_fees = 0; - for (tx_index, tx) in block.transactions.iter_mut().enumerate() { - let transfers = augment_transaction_with_ordinals_transfers_data( - tx, - tx_index, - &block.block_identifier, - &network, - &coinbase_txid, - coinbase_subsidy, - &mut cumulated_fees, - inscriptions_db_tx, - ctx, - ); - any_event |= !transfers.is_empty(); - - if update_db_tx { - // Store transfers between each iteration - for transfer_data in transfers.into_iter() { - insert_transfer_in_locations_tx( - &transfer_data, - &block.block_identifier, - &inscriptions_db_tx, - &ctx, - ); - } - } - } - - any_event -} - -pub fn augment_transaction_with_ordinals_transfers_data( - tx: &mut BitcoinTransactionData, - tx_index: usize, - block_identifier: &BlockIdentifier, - network: &Network, - coinbase_txid: &TransactionIdentifier, - coinbase_subsidy: u64, - cumulated_fees: &mut u64, - inscriptions_db_tx: &Transaction, - ctx: &Context, -) -> Vec { - let mut transfers = vec![]; - - for (input_index, input) in tx.metadata.inputs.iter().enumerate() { - let outpoint_pre_transfer = format_outpoint_to_watch( - &input.previous_output.txid, - input.previous_output.vout as usize, - ); - - let entries = - find_inscriptions_at_wached_outpoint(&outpoint_pre_transfer, &inscriptions_db_tx, ctx); - // For each satpoint inscribed retrieved, we need to compute the next - // outpoint to watch - for watched_satpoint in entries.into_iter() { - let satpoint_pre_transfer = - format!("{}:{}", outpoint_pre_transfer, watched_satpoint.offset); - - // Question is: are inscriptions moving to a new output, - // burnt or lost in fees and transfered to the miner? - - let inputs = tx - .metadata - .inputs - .iter() - .map(|o| o.previous_output.value) - .collect::<_>(); - let outputs = tx.metadata.outputs.iter().map(|o| o.value).collect::<_>(); - let post_transfer_data = - compute_next_satpoint_data(input_index, watched_satpoint.offset, &inputs, &outputs); - - let ( - outpoint_post_transfer, - offset_post_transfer, - destination, - post_transfer_output_value, - ) = match post_transfer_data { - SatPosition::Output((output_index, offset)) => { - let outpoint = - format_outpoint_to_watch(&tx.transaction_identifier, output_index); - let script_pub_key_hex = - tx.metadata.outputs[output_index].get_script_pubkey_hex(); - let updated_address = match ScriptBuf::from_hex(&script_pub_key_hex) { - Ok(script) => match Address::from_script(&script, network.clone()) { - Ok(address) => OrdinalInscriptionTransferDestination::Transferred( - address.to_string(), - ), - Err(e) => { - ctx.try_log(|logger| { - info!( - logger, - "unable to retrieve address from {script_pub_key_hex}: {}", - e.to_string() - ) - }); - OrdinalInscriptionTransferDestination::Burnt(script.to_string()) - } - }, - Err(e) => { - ctx.try_log(|logger| { - info!( - logger, - "unable to retrieve address from {script_pub_key_hex}: {}", - e.to_string() - ) - }); - OrdinalInscriptionTransferDestination::Burnt( - script_pub_key_hex.to_string(), - ) - } - }; - - // At this point we know that inscriptions are being moved. - ctx.try_log(|logger| { - info!( - logger, - "Inscription {} moved from {} to {} (block: {})", - watched_satpoint.inscription_id, - satpoint_pre_transfer, - outpoint, - block_identifier.index, - ) - }); - - ( - outpoint, - offset, - updated_address, - Some(tx.metadata.outputs[output_index].value), - ) - } - SatPosition::Fee(offset) => { - // Get Coinbase TX - let total_offset = coinbase_subsidy + *cumulated_fees + offset; - let outpoint = format_outpoint_to_watch(&coinbase_txid, 0); - ctx.try_log(|logger| { - info!( - logger, - "Inscription {} spent in fees ({}+{}+{})", - watched_satpoint.inscription_id, - coinbase_subsidy, - cumulated_fees, - offset - ) - }); - ( - outpoint, - total_offset, - OrdinalInscriptionTransferDestination::SpentInFees, - None, - ) - } - }; - - let satpoint_post_transfer = - format!("{}:{}", outpoint_post_transfer, offset_post_transfer); - - let transfer_data = OrdinalInscriptionTransferData { - inscription_id: watched_satpoint.inscription_id.clone(), - destination, - tx_index, - satpoint_pre_transfer, - satpoint_post_transfer, - post_transfer_output_value, - }; - - transfers.push(transfer_data.clone()); - - // Attach transfer event - tx.metadata - .ordinal_operations - .push(OrdinalOperation::InscriptionTransferred(transfer_data)); - } - } - *cumulated_fees += tx.metadata.fee; - - transfers -} diff --git a/components/ordhook-core/src/core/protocol/mod.rs b/components/ordhook-core/src/core/protocol/mod.rs index 48a2322b..b3f79063 100644 --- a/components/ordhook-core/src/core/protocol/mod.rs +++ b/components/ordhook-core/src/core/protocol/mod.rs @@ -1,4 +1,4 @@ pub mod inscription_parsing; pub mod inscription_sequencing; -pub mod inscription_tracking; pub mod satoshi_numbering; +pub mod satoshi_tracking; diff --git a/components/ordhook-core/src/core/protocol/satoshi_numbering.rs b/components/ordhook-core/src/core/protocol/satoshi_numbering.rs index 33ce0593..4cf1ccf0 100644 --- a/components/ordhook-core/src/core/protocol/satoshi_numbering.rs +++ b/components/ordhook-core/src/core/protocol/satoshi_numbering.rs @@ -8,7 +8,6 @@ use std::sync::Arc; use crate::db::{ find_pinned_block_bytes_at_block_height, open_ordhook_db_conn_rocks_db_loop, BlockBytesCursor, - TransferData, }; use crate::db::{TransactionBytesCursor, TraversalResult}; @@ -19,28 +18,33 @@ pub fn compute_satoshi_number( block_identifier: &BlockIdentifier, transaction_identifier: &TransactionIdentifier, inscription_input_index: usize, + inscription_pointer: u64, traversals_cache: &Arc< DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault>, >, + ulimit: usize, + memory_available: usize, _back_tracking: bool, ctx: &Context, -) -> Result<(TraversalResult, Vec<(u32, [u8; 8])>), String> { - let mut inscription_offset_intra_output = 0; - let mut inscription_output_index: usize = 0; - let mut ordinal_offset = 0; - let mut ordinal_block_number = block_identifier.index as u32; +) -> Result<(TraversalResult, u64, Vec<(u32, [u8; 8], usize)>), String> { + let mut ordinal_offset = inscription_pointer; + let ordinal_block_number = block_identifier.index as u32; let txid = transaction_identifier.get_8_hash_bytes(); let mut back_track = vec![]; - let blocks_db = open_ordhook_db_conn_rocks_db_loop(false, &blocks_db_dir, &ctx); + let blocks_db = + open_ordhook_db_conn_rocks_db_loop(false, &blocks_db_dir, ulimit, memory_available, &ctx); - let (sats_ranges, inscription_offset_cross_outputs) = match traversals_cache + let (mut tx_cursor, mut ordinal_block_number) = match traversals_cache .get(&(block_identifier.index as u32, txid.clone())) { Some(entry) => { let tx = entry.value(); ( - tx.get_sat_ranges(), - tx.get_cumulated_sats_in_until_input_index(inscription_input_index), + ( + tx.inputs[inscription_input_index].txin.clone(), + tx.inputs[inscription_input_index].vout.into(), + ), + tx.inputs[inscription_input_index].block_height, ) } None => loop { @@ -53,12 +57,13 @@ pub fn compute_satoshi_number( let cursor = BlockBytesCursor::new(&block_bytes.as_ref()); match cursor.find_and_serialize_transaction_with_txid(&txid) { Some(tx) => { - let sats_ranges = tx.get_sat_ranges(); - let inscription_offset_cross_outputs = - tx.get_cumulated_sats_in_until_input_index(inscription_input_index); - traversals_cache.insert((ordinal_block_number, txid.clone()), tx); - back_track.push((ordinal_block_number, txid.clone())); - break (sats_ranges, inscription_offset_cross_outputs); + break ( + ( + tx.inputs[inscription_input_index].txin.clone(), + tx.inputs[inscription_input_index].vout.into(), + ), + tx.inputs[inscription_input_index].block_height, + ); } None => return Err(format!("txid not in block #{ordinal_block_number}")), } @@ -67,23 +72,6 @@ pub fn compute_satoshi_number( }, }; - for (i, (min, max)) in sats_ranges.into_iter().enumerate() { - if inscription_offset_cross_outputs >= min && inscription_offset_cross_outputs < max { - inscription_output_index = i; - inscription_offset_intra_output = inscription_offset_cross_outputs - min; - } - } - ctx.try_log(|logger| { - debug!( - logger, - "Start ordinal number retrieval for Satpoint {}:{}:0 (block #{})", - transaction_identifier.hash, - inscription_input_index, - block_identifier.index - ) - }); - - let mut tx_cursor: ([u8; 8], usize) = (txid, inscription_input_index); let mut hops: u32 = 0; loop { @@ -140,13 +128,8 @@ pub fn compute_satoshi_number( transfers: 0, inscription_input_index, transaction_identifier_inscription: transaction_identifier.clone(), - transfer_data: TransferData { - inscription_offset_intra_output, - transaction_identifier_location: transaction_identifier.clone(), - output_index: inscription_output_index, - tx_index: 0, - }, }, + inscription_pointer, back_track, )); } @@ -228,7 +211,8 @@ pub fn compute_satoshi_number( } } else { // isolate the target transaction - let lazy_tx = match block_cursor.find_and_serialize_transaction_with_txid(&txid) { + let tx_bytes_cursor = match block_cursor.find_and_serialize_transaction_with_txid(&txid) + { Some(entry) => entry, None => { ctx.try_log(|logger| { @@ -244,7 +228,7 @@ pub fn compute_satoshi_number( }; let mut sats_out = 0; - for (index, output_value) in lazy_tx.outputs.iter().enumerate() { + for (index, output_value) in tx_bytes_cursor.outputs.iter().enumerate() { if index == tx_cursor.1 { break; } @@ -253,12 +237,13 @@ pub fn compute_satoshi_number( sats_out += ordinal_offset; let mut sats_in = 0; - for input in lazy_tx.inputs.iter() { + for input in tx_bytes_cursor.inputs.iter() { sats_in += input.txin_value; if sats_out < sats_in { - back_track.push((ordinal_block_number, tx_cursor.0.clone())); - traversals_cache.insert((ordinal_block_number, tx_cursor.0), lazy_tx.clone()); + back_track.push((ordinal_block_number, tx_cursor.0.clone(), tx_cursor.1)); + traversals_cache + .insert((ordinal_block_number, tx_cursor.0), tx_bytes_cursor.clone()); ordinal_offset = sats_out - (sats_in - input.txin_value); ordinal_block_number = input.block_height; tx_cursor = (input.txin.clone(), input.vout as usize); @@ -281,13 +266,8 @@ pub fn compute_satoshi_number( transfers: 0, inscription_input_index, transaction_identifier_inscription: transaction_identifier.clone(), - transfer_data: TransferData { - inscription_offset_intra_output, - transaction_identifier_location: transaction_identifier.clone(), - output_index: inscription_output_index, - tx_index: 0, - }, }, + inscription_pointer, back_track, )); } @@ -295,7 +275,7 @@ pub fn compute_satoshi_number( } let height = Height(ordinal_block_number.into()); - let ordinal_number = height.starting_sat().0 + ordinal_offset + inscription_offset_intra_output; + let ordinal_number = height.starting_sat().0 + ordinal_offset; Ok(( TraversalResult { @@ -304,13 +284,8 @@ pub fn compute_satoshi_number( transfers: hops, inscription_input_index, transaction_identifier_inscription: transaction_identifier.clone(), - transfer_data: TransferData { - inscription_offset_intra_output, - transaction_identifier_location: transaction_identifier.clone(), - output_index: inscription_output_index, - tx_index: 0, - }, }, + inscription_pointer, back_track, )) } diff --git a/components/ordhook-core/src/core/protocol/satoshi_tracking.rs b/components/ordhook-core/src/core/protocol/satoshi_tracking.rs new file mode 100644 index 00000000..5116eb06 --- /dev/null +++ b/components/ordhook-core/src/core/protocol/satoshi_tracking.rs @@ -0,0 +1,236 @@ +use std::collections::HashSet; + +use chainhook_sdk::{ + bitcoincore_rpc_json::bitcoin::{Address, Network, ScriptBuf}, + types::{ + BitcoinBlockData, BitcoinNetwork, BitcoinTransactionData, OrdinalInscriptionTransferData, + OrdinalInscriptionTransferDestination, OrdinalOperation, TransactionIdentifier, + }, + utils::Context, +}; + +use crate::{ + core::{compute_next_satpoint_data, SatPosition}, + db::{ + find_inscribed_ordinals_at_wached_outpoint, format_outpoint_to_watch, + insert_ordinal_transfer_in_locations_tx, parse_satpoint_to_watch, OrdinalLocation, + }, + ord::height::Height, +}; +use rusqlite::Transaction; + +use super::inscription_sequencing::get_bitcoin_network; + +pub fn augment_block_with_ordinals_transfer_data( + block: &mut BitcoinBlockData, + inscriptions_db_tx: &Transaction, + update_db_tx: bool, + ctx: &Context, +) -> bool { + let mut any_event = false; + + let network = get_bitcoin_network(&block.metadata.network); + let coinbase_subsidy = Height(block.block_identifier.index).subsidy(); + let coinbase_txid = &block.transactions[0].transaction_identifier.clone(); + let mut cumulated_fees = 0; + for (tx_index, tx) in block.transactions.iter_mut().enumerate() { + let transfers = augment_transaction_with_ordinals_transfers_data( + tx, + tx_index, + &network, + &coinbase_txid, + coinbase_subsidy, + &mut cumulated_fees, + inscriptions_db_tx, + ctx, + ); + any_event |= !transfers.is_empty(); + + if update_db_tx { + // Store transfers between each iteration + for transfer_data in transfers.into_iter() { + let (tx, output_index, offset) = + parse_satpoint_to_watch(&transfer_data.satpoint_post_transfer); + let outpoint_to_watch = format_outpoint_to_watch(&tx, output_index); + let data = OrdinalLocation { + offset, + block_height: block.block_identifier.index, + tx_index: transfer_data.tx_index, + }; + insert_ordinal_transfer_in_locations_tx( + transfer_data.ordinal_number, + &outpoint_to_watch, + data, + inscriptions_db_tx, + &ctx, + ); + } + } + } + + any_event +} + +pub fn compute_satpoint_post_transfer( + tx: &BitcoinTransactionData, + tx_index: usize, + input_index: usize, + relative_pointer_value: u64, + network: &Network, + coinbase_txid: &TransactionIdentifier, + coinbase_subsidy: u64, + cumulated_fees: &mut u64, + ctx: &Context, +) -> (OrdinalInscriptionTransferDestination, String, Option) { + let inputs: Vec = tx + .metadata + .inputs + .iter() + .map(|o| o.previous_output.value) + .collect::<_>(); + let outputs = tx.metadata.outputs.iter().map(|o| o.value).collect::<_>(); + let post_transfer_data = compute_next_satpoint_data( + tx_index, + input_index, + &inputs, + &outputs, + relative_pointer_value, + Some(ctx), + ); + + let (outpoint_post_transfer, offset_post_transfer, destination, post_transfer_output_value) = + match post_transfer_data { + SatPosition::Output((output_index, offset)) => { + let outpoint = format_outpoint_to_watch(&tx.transaction_identifier, output_index); + let script_pub_key_hex = tx.metadata.outputs[output_index].get_script_pubkey_hex(); + let updated_address = match ScriptBuf::from_hex(&script_pub_key_hex) { + Ok(script) => match Address::from_script(&script, network.clone()) { + Ok(address) => { + OrdinalInscriptionTransferDestination::Transferred(address.to_string()) + } + Err(e) => { + ctx.try_log(|logger| { + info!( + logger, + "unable to retrieve address from {script_pub_key_hex}: {}", + e.to_string() + ) + }); + OrdinalInscriptionTransferDestination::Burnt(script.to_string()) + } + }, + Err(e) => { + ctx.try_log(|logger| { + info!( + logger, + "unable to retrieve address from {script_pub_key_hex}: {}", + e.to_string() + ) + }); + OrdinalInscriptionTransferDestination::Burnt(script_pub_key_hex.to_string()) + } + }; + + ( + outpoint, + offset, + updated_address, + Some(tx.metadata.outputs[output_index].value), + ) + } + SatPosition::Fee(offset) => { + // Get Coinbase TX + let total_offset = coinbase_subsidy + *cumulated_fees + offset; + let outpoint = format_outpoint_to_watch(&coinbase_txid, 0); + ( + outpoint, + total_offset, + OrdinalInscriptionTransferDestination::SpentInFees, + None, + ) + } + }; + let satpoint_post_transfer = format!("{}:{}", outpoint_post_transfer, offset_post_transfer); + + ( + destination, + satpoint_post_transfer, + post_transfer_output_value, + ) +} + +pub fn augment_transaction_with_ordinals_transfers_data( + tx: &mut BitcoinTransactionData, + tx_index: usize, + network: &Network, + coinbase_txid: &TransactionIdentifier, + coinbase_subsidy: u64, + cumulated_fees: &mut u64, + inscriptions_db_tx: &Transaction, + ctx: &Context, +) -> Vec { + let mut transfers = vec![]; + + // The transfers are inserted in storage after the inscriptions. + // We have a unicity constraing, and can only have 1 ordinals per satpoint. + let mut updated_sats = HashSet::new(); + for op in tx.metadata.ordinal_operations.iter() { + if let OrdinalOperation::InscriptionRevealed(data) = op { + updated_sats.insert(data.ordinal_number); + } + } + + for (input_index, input) in tx.metadata.inputs.iter().enumerate() { + let outpoint_pre_transfer = format_outpoint_to_watch( + &input.previous_output.txid, + input.previous_output.vout as usize, + ); + + let entries = find_inscribed_ordinals_at_wached_outpoint( + &outpoint_pre_transfer, + &inscriptions_db_tx, + ctx, + ); + // For each satpoint inscribed retrieved, we need to compute the next + // outpoint to watch + for watched_satpoint in entries.into_iter() { + if updated_sats.contains(&watched_satpoint.ordinal_number) { + continue; + } + let satpoint_pre_transfer = + format!("{}:{}", outpoint_pre_transfer, watched_satpoint.offset); + + let (destination, satpoint_post_transfer, post_transfer_output_value) = + compute_satpoint_post_transfer( + &&*tx, + tx_index, + input_index, + watched_satpoint.offset, + network, + coinbase_txid, + coinbase_subsidy, + cumulated_fees, + ctx, + ); + + let transfer_data = OrdinalInscriptionTransferData { + ordinal_number: watched_satpoint.ordinal_number, + destination, + tx_index, + satpoint_pre_transfer, + satpoint_post_transfer, + post_transfer_output_value, + }; + + transfers.push(transfer_data.clone()); + + // Attach transfer event + tx.metadata + .ordinal_operations + .push(OrdinalOperation::InscriptionTransferred(transfer_data)); + } + } + *cumulated_fees += tx.metadata.fee; + + transfers +} diff --git a/components/ordhook-core/src/db/mod.rs b/components/ordhook-core/src/db/mod.rs index bf7bbef4..dcd305cd 100644 --- a/components/ordhook-core/src/db/mod.rs +++ b/components/ordhook-core/src/db/mod.rs @@ -1,5 +1,5 @@ use std::{ - collections::BTreeMap, + collections::{BTreeMap, BTreeSet, HashMap}, io::{Read, Write}, path::PathBuf, thread::sleep, @@ -63,7 +63,10 @@ pub fn initialize_ordhook_db(base_dir: &PathBuf, ctx: &Context) -> Connection { block_height INTEGER NOT NULL, ordinal_number INTEGER NOT NULL, jubilee_inscription_number INTEGER NOT NULL, - classic_inscription_number INTEGER NOT NULL + classic_inscription_number INTEGER NOT NULL, + CONSTRAINT inscription_id_uniqueness UNIQUE (inscription_id), + CONSTRAINT jubilee_inscription_number_uniqueness UNIQUE (inscription_id), + CONSTRAINT classic_inscription_number_uniqueness UNIQUE (inscription_id) )", [], ) { @@ -79,36 +82,37 @@ pub fn initialize_ordhook_db(base_dir: &PathBuf, ctx: &Context) -> Connection { "CREATE INDEX IF NOT EXISTS index_inscriptions_on_ordinal_number ON inscriptions(ordinal_number);", [], ) { - ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); + ctx.try_log(|logger| warn!(logger, "unable to create hord.sqlite: {}", e.to_string())); } if let Err(e) = conn.execute( "CREATE INDEX IF NOT EXISTS index_inscriptions_on_jubilee_inscription_number ON inscriptions(jubilee_inscription_number);", [], ) { - ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); + ctx.try_log(|logger| warn!(logger, "unable to create hord.sqlite: {}", e.to_string())); } if let Err(e) = conn.execute( "CREATE INDEX IF NOT EXISTS index_inscriptions_on_classic_inscription_number ON inscriptions(classic_inscription_number);", [], ) { - ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); + ctx.try_log(|logger| warn!(logger, "unable to create hord.sqlite: {}", e.to_string())); } if let Err(e) = conn.execute( "CREATE INDEX IF NOT EXISTS index_inscriptions_on_block_height ON inscriptions(block_height);", [], ) { - ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); + ctx.try_log(|logger| warn!(logger, "unable to create hord.sqlite: {}", e.to_string())); } } if let Err(e) = conn.execute( "CREATE TABLE IF NOT EXISTS locations ( - inscription_id TEXT NOT NULL, + ordinal_number INTEGER NOT NULL, block_height INTEGER NOT NULL, tx_index INTEGER NOT NULL, outpoint_to_watch TEXT NOT NULL, - offset INTEGER NOT NULL + offset INTEGER NOT NULL, + CONSTRAINT ordinal_number_outpoint_to_watch_offset_uniqueness UNIQUE (ordinal_number, outpoint_to_watch) )", [], ) { @@ -124,19 +128,19 @@ pub fn initialize_ordhook_db(base_dir: &PathBuf, ctx: &Context) -> Connection { "CREATE INDEX IF NOT EXISTS locations_indexed_on_block_height ON locations(block_height);", [], ) { - ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); + ctx.try_log(|logger| warn!(logger, "unable to create hord.sqlite: {}", e.to_string())); } if let Err(e) = conn.execute( "CREATE INDEX IF NOT EXISTS locations_indexed_on_outpoint_to_watch ON locations(outpoint_to_watch);", [], ) { - ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); + ctx.try_log(|logger| warn!(logger, "unable to create hord.sqlite: {}", e.to_string())); } if let Err(e) = conn.execute( - "CREATE INDEX IF NOT EXISTS locations_indexed_on_inscription_id ON locations(inscription_id);", + "CREATE INDEX IF NOT EXISTS locations_indexed_on_ordinal_number ON locations(ordinal_number);", [], ) { - ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); + ctx.try_log(|logger| warn!(logger, "unable to create hord.sqlite: {}", e.to_string())); } } @@ -161,7 +165,7 @@ pub fn initialize_ordhook_db(base_dir: &PathBuf, ctx: &Context) -> Connection { "CREATE INDEX IF NOT EXISTS sequence_metadata_indexed_on_block_height ON sequence_metadata(block_height);", [], ) { - ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); + ctx.try_log(|logger| warn!(logger, "unable to create hord.sqlite: {}", e.to_string())); } } @@ -248,7 +252,7 @@ fn get_default_ordhook_db_file_path_rocks_db(base_dir: &PathBuf) -> PathBuf { destination_path } -fn rocks_db_default_options() -> rocksdb::Options { +fn rocks_db_default_options(ulimit: usize, memory_available: usize) -> rocksdb::Options { let mut opts = rocksdb::Options::default(); // Per rocksdb's documentation: // If cache_index_and_filter_blocks is false (which is default), @@ -262,8 +266,12 @@ fn rocks_db_default_options() -> rocksdb::Options { // opts.set_write_buffer_size(64 * 1024 * 1024); // opts.set_blob_file_size(1 * 1024 * 1024 * 1024); // opts.set_target_file_size_base(64 * 1024 * 1024); - opts.set_max_open_files(2048); + opts.set_max_open_files(ulimit as i32); opts.create_if_missing(true); + // opts.set_allow_mmap_reads(true); + + // set_arena_block_size + // opts.optimize_for_point_lookup(1 * 1024 * 1024 * 1024); // opts.set_level_zero_stop_writes_trigger(64); // opts.set_level_zero_slowdown_writes_trigger(20); @@ -279,10 +287,12 @@ fn rocks_db_default_options() -> rocksdb::Options { pub fn open_readonly_ordhook_db_conn_rocks_db( base_dir: &PathBuf, + ulimit: usize, + memory_available: usize, _ctx: &Context, ) -> Result { let path = get_default_ordhook_db_file_path_rocks_db(&base_dir); - let mut opts = rocks_db_default_options(); + let mut opts = rocks_db_default_options(ulimit, memory_available); opts.set_disable_auto_compactions(true); opts.set_max_background_jobs(0); let db = DB::open_for_read_only(&opts, path, false) @@ -293,14 +303,16 @@ pub fn open_readonly_ordhook_db_conn_rocks_db( pub fn open_ordhook_db_conn_rocks_db_loop( readwrite: bool, base_dir: &PathBuf, + ulimit: usize, + memory_available: usize, ctx: &Context, ) -> DB { let mut retries = 0; let blocks_db = loop { let res = if readwrite { - open_readwrite_ordhook_db_conn_rocks_db(&base_dir, &ctx) + open_readwrite_ordhook_db_conn_rocks_db(&base_dir, ulimit, memory_available, &ctx) } else { - open_readonly_ordhook_db_conn_rocks_db(&base_dir, &ctx) + open_readonly_ordhook_db_conn_rocks_db(&base_dir, ulimit, memory_available, &ctx) }; match res { Ok(db) => break db, @@ -323,19 +335,24 @@ pub fn open_ordhook_db_conn_rocks_db_loop( pub fn open_readwrite_ordhook_dbs( base_dir: &PathBuf, + ulimit: usize, + memory_available: usize, ctx: &Context, ) -> Result<(DB, Connection), String> { - let blocks_db = open_ordhook_db_conn_rocks_db_loop(true, &base_dir, &ctx); + let blocks_db = + open_ordhook_db_conn_rocks_db_loop(true, &base_dir, ulimit, memory_available, &ctx); let inscriptions_db = open_readwrite_ordhook_db_conn(&base_dir, &ctx)?; Ok((blocks_db, inscriptions_db)) } fn open_readwrite_ordhook_db_conn_rocks_db( base_dir: &PathBuf, + ulimit: usize, + memory_available: usize, _ctx: &Context, ) -> Result { let path = get_default_ordhook_db_file_path_rocks_db(&base_dir); - let opts = rocks_db_default_options(); + let opts = rocks_db_default_options(ulimit, memory_available); let db = DB::open(&opts, path) .map_err(|e| format!("unable to read-write hord.rocksdb: {}", e.to_string()))?; Ok(db) @@ -396,7 +413,6 @@ pub fn find_pinned_block_bytes_at_block_height<'a>( // read_options.set_verify_checksums(false); let mut backoff: f64 = 1.0; let mut rng = thread_rng(); - loop { match blocks_db.get_pinned(block_height.to_be_bytes()) { Ok(Some(res)) => return Some(res), @@ -500,30 +516,19 @@ pub fn insert_entry_in_inscriptions( "INSERT INTO inscriptions (inscription_id, ordinal_number, jubilee_inscription_number, classic_inscription_number, block_height, input_index) VALUES (?1, ?2, ?3, ?4, ?5, ?6)", rusqlite::params![&inscription_data.inscription_id, &inscription_data.ordinal_number, &inscription_data.inscription_number.jubilee, &inscription_data.inscription_number.classic, &block_identifier.index, &inscription_data.inscription_input_index], ) { - ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); + ctx.try_log(|logger| warn!(logger, "unable to insert inscription in hord.sqlite: {} - {:?}", e.to_string(), inscription_data)); std::thread::sleep(std::time::Duration::from_secs(1)); } } -pub fn insert_inscription_in_locations( - inscription_data: &OrdinalInscriptionRevealData, - block_identifier: &BlockIdentifier, - inscriptions_db_conn_rw: &Connection, - ctx: &Context, -) { - let (tx, output_index, offset) = - parse_satpoint_to_watch(&inscription_data.satpoint_post_inscription); - let outpoint_to_watch = format_outpoint_to_watch(&tx, output_index); - while let Err(e) = inscriptions_db_conn_rw.execute( - "INSERT INTO locations (inscription_id, outpoint_to_watch, offset, block_height, tx_index) VALUES (?1, ?2, ?3, ?4, ?5)", - rusqlite::params![&inscription_data.inscription_id, &outpoint_to_watch, offset, &block_identifier.index, &inscription_data.tx_index], - ) { - ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); - std::thread::sleep(std::time::Duration::from_secs(1)); - } +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct OrdinalLocation { + pub offset: u64, + pub block_height: u64, + pub tx_index: usize, } -pub fn update_inscriptions_with_block( +pub fn insert_entries_from_block_in_inscriptions( block: &BitcoinBlockData, inscriptions_db_conn_rw: &Connection, ctx: &Context, @@ -535,24 +540,75 @@ pub fn update_inscriptions_with_block( inscriptions_db_conn_rw, &ctx, ); - insert_inscription_in_locations( - &inscription_data, - &block.block_identifier, - &inscriptions_db_conn_rw, - ctx, - ); } } -pub fn update_locations_with_block( +pub fn update_ordinals_db_with_block( block: &BitcoinBlockData, inscriptions_db_conn_rw: &Connection, ctx: &Context, ) { - for transfer_data in get_inscriptions_transferred_in_block(&block).iter() { - insert_transfer_in_locations( - &transfer_data, + let mut locations_to_insert = HashMap::new(); + + for inscription_data in get_inscriptions_revealed_in_block(&block).iter() { + insert_entry_in_inscriptions( + inscription_data, &block.block_identifier, + inscriptions_db_conn_rw, + &ctx, + ); + let (tx, output_index, offset) = + parse_satpoint_to_watch(&inscription_data.satpoint_post_inscription); + let outpoint_to_watch = format_outpoint_to_watch(&tx, output_index); + let insertion_res = locations_to_insert.insert( + (inscription_data.ordinal_number, outpoint_to_watch), + OrdinalLocation { + offset, + block_height: block.block_identifier.index, + tx_index: inscription_data.tx_index, + }, + ); + if let Some(prev_location) = insertion_res { + ctx.try_log(|logger| { + warn!( + logger, + "Ignoring location insertion from inscriptions: {}, {:?}", + inscription_data.ordinal_number, + prev_location + ) + }); + } + } + + for transfer_data in get_inscriptions_transferred_in_block(&block).iter() { + let (tx, output_index, offset) = + parse_satpoint_to_watch(&transfer_data.satpoint_post_transfer); + let outpoint_to_watch = format_outpoint_to_watch(&tx, output_index); + let insertion_res = locations_to_insert.insert( + (transfer_data.ordinal_number, outpoint_to_watch), + OrdinalLocation { + offset, + block_height: block.block_identifier.index, + tx_index: transfer_data.tx_index, + }, + ); + if let Some(prev_location) = insertion_res { + ctx.try_log(|logger| { + warn!( + logger, + "Ignoring location insertion from transfers: {}, {:?}", + transfer_data.ordinal_number, + prev_location + ) + }); + } + } + + for ((ordinal_number, outpoint_to_watch), location_data) in locations_to_insert { + insert_ordinal_transfer_in_locations_tx( + ordinal_number, + &outpoint_to_watch, + location_data, &inscriptions_db_conn_rw, ctx, ); @@ -598,52 +654,25 @@ pub fn update_sequence_metadata_with_block( } } -pub fn insert_new_inscriptions_from_block_in_locations( - block: &BitcoinBlockData, +pub fn insert_ordinal_transfer_in_locations_tx( + ordinal_number: u64, + outpoint_to_watch: &str, + data: OrdinalLocation, inscriptions_db_conn_rw: &Connection, ctx: &Context, ) { - for inscription_data in get_inscriptions_revealed_in_block(&block).iter() { - insert_inscription_in_locations( - inscription_data, - &block.block_identifier, - inscriptions_db_conn_rw, - &ctx, - ); - } -} - -pub fn insert_transfer_in_locations_tx( - transfer_data: &OrdinalInscriptionTransferData, - block_identifier: &BlockIdentifier, - inscriptions_db_conn_rw: &Transaction, - ctx: &Context, -) { - let (tx, output_index, offset) = parse_satpoint_to_watch(&transfer_data.satpoint_post_transfer); - let outpoint_to_watch = format_outpoint_to_watch(&tx, output_index); + let mut retry = 0; while let Err(e) = inscriptions_db_conn_rw.execute( - "INSERT INTO locations (inscription_id, outpoint_to_watch, offset, block_height, tx_index) VALUES (?1, ?2, ?3, ?4, ?5)", - rusqlite::params![&transfer_data.inscription_id, &outpoint_to_watch, offset, &block_identifier.index, &transfer_data.tx_index], - ) { - ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); - std::thread::sleep(std::time::Duration::from_secs(1)); - } -} - -pub fn insert_transfer_in_locations( - transfer_data: &OrdinalInscriptionTransferData, - block_identifier: &BlockIdentifier, - inscriptions_db_conn_rw: &Connection, - ctx: &Context, -) { - let (tx, output_index, offset) = parse_satpoint_to_watch(&transfer_data.satpoint_post_transfer); - let outpoint_to_watch = format_outpoint_to_watch(&tx, output_index); - while let Err(e) = inscriptions_db_conn_rw.execute( - "INSERT INTO locations (inscription_id, outpoint_to_watch, offset, block_height, tx_index) VALUES (?1, ?2, ?3, ?4, ?5)", - rusqlite::params![&transfer_data.inscription_id, &outpoint_to_watch, offset, &block_identifier.index, &transfer_data.tx_index], + "INSERT INTO locations (ordinal_number, outpoint_to_watch, offset, block_height, tx_index) VALUES (?1, ?2, ?3, ?4, ?5)", + rusqlite::params![&ordinal_number, &outpoint_to_watch, data.offset, data.block_height, &data.tx_index], ) { + retry += 1; ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); std::thread::sleep(std::time::Duration::from_secs(1)); + if retry > 2 { + ctx.try_log(|logger| error!(logger, "unable to insert inscription in location in hord.sqlite: {}", e.to_string())); + return + } } } @@ -777,12 +806,12 @@ pub fn find_latest_inscription_block_height( } pub fn find_initial_inscription_transfer_data( - inscription_id: &str, + ordinal_number: &u64, db_conn: &Connection, ctx: &Context, ) -> Result, String> { - let args: &[&dyn ToSql] = &[&inscription_id.to_sql().unwrap()]; - let query = "SELECT outpoint_to_watch, offset, tx_index FROM locations WHERE inscription_id = ? ORDER BY block_height ASC, tx_index ASC LIMIT 1"; + let args: &[&dyn ToSql] = &[&ordinal_number.to_sql().unwrap()]; + let query = "SELECT outpoint_to_watch, offset, tx_index FROM locations WHERE ordinal_number = ? ORDER BY block_height ASC, tx_index ASC LIMIT 1"; let entry = perform_query_one(query, args, db_conn, ctx, |row| { let outpoint_to_watch: String = row.get(0).unwrap(); let (transaction_identifier_location, output_index) = @@ -800,12 +829,12 @@ pub fn find_initial_inscription_transfer_data( } pub fn find_latest_inscription_transfer_data( - inscription_id: &str, + ordinal_number: &u64, db_conn: &Connection, ctx: &Context, ) -> Result, String> { - let args: &[&dyn ToSql] = &[&inscription_id.to_sql().unwrap()]; - let query = "SELECT outpoint_to_watch, offset, tx_index FROM locations WHERE inscription_id = ? ORDER BY block_height DESC, tx_index DESC LIMIT 1"; + let args: &[&dyn ToSql] = &[&ordinal_number.to_sql().unwrap()]; + let query = "SELECT outpoint_to_watch, offset, tx_index FROM locations WHERE ordinal_number = ? ORDER BY block_height DESC, tx_index DESC LIMIT 1"; let entry = perform_query_one(query, args, db_conn, ctx, |row| { let outpoint_to_watch: String = row.get(0).unwrap(); let (transaction_identifier_location, output_index) = @@ -844,11 +873,11 @@ pub fn find_all_transfers_in_block( block_height: &u64, db_conn: &Connection, ctx: &Context, -) -> BTreeMap> { +) -> BTreeMap> { let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()]; let mut stmt = loop { - match db_conn.prepare("SELECT inscription_id, offset, outpoint_to_watch, tx_index FROM locations WHERE block_height = ? ORDER BY tx_index ASC") + match db_conn.prepare("SELECT ordinal_number, offset, outpoint_to_watch, tx_index FROM locations WHERE block_height = ? ORDER BY tx_index ASC") { Ok(stmt) => break stmt, Err(e) => { @@ -860,7 +889,7 @@ pub fn find_all_transfers_in_block( } }; - let mut results: BTreeMap> = BTreeMap::new(); + let mut results: BTreeMap> = BTreeMap::new(); let mut rows = loop { match stmt.query(args) { Ok(rows) => break rows, @@ -875,7 +904,7 @@ pub fn find_all_transfers_in_block( loop { match rows.next() { Ok(Some(row)) => { - let inscription_id: String = row.get(0).unwrap(); + let ordinal_number: u64 = row.get(0).unwrap(); let inscription_offset_intra_output: u64 = row.get(1).unwrap(); let outpoint_to_watch: String = row.get(2).unwrap(); let tx_index: u64 = row.get(3).unwrap(); @@ -888,7 +917,7 @@ pub fn find_all_transfers_in_block( tx_index, }; results - .entry(inscription_id) + .entry(ordinal_number) .and_modify(|v| v.push(transfer.clone())) .or_insert(vec![transfer]); } @@ -1046,10 +1075,6 @@ pub fn find_inscription_with_id( db_conn: &Connection, ctx: &Context, ) -> Result, String> { - let Some(transfer_data) = find_initial_inscription_transfer_data(inscription_id, db_conn, ctx)? - else { - return Err(format!("unable to retrieve location for {inscription_id}")); - }; let args: &[&dyn ToSql] = &[&inscription_id.to_sql().unwrap()]; let query = "SELECT classic_inscription_number, jubilee_inscription_number, ordinal_number, block_height, input_index FROM inscriptions WHERE inscription_id = ?"; let entry = perform_query_one(query, args, db_conn, ctx, move |row| { @@ -1069,27 +1094,30 @@ pub fn find_inscription_with_id( block_height, ) }); - Ok(entry.map( - |( + + let Some(( + inscription_number, + ordinal_number, + inscription_input_index, + transaction_identifier_inscription, + block_height, + )) = entry + else { + return Err(format!( + "unable to retrieve inscription for {inscription_id}" + )); + }; + + Ok(Some(( + TraversalResult { inscription_number, ordinal_number, inscription_input_index, transaction_identifier_inscription, - block_height, - )| { - ( - TraversalResult { - inscription_number, - ordinal_number, - inscription_input_index, - transaction_identifier_inscription, - transfers: 0, - transfer_data, - }, - block_height, - ) + transfers: 0, }, - )) + block_height, + ))) } pub fn find_all_inscriptions_in_block( @@ -1097,8 +1125,6 @@ pub fn find_all_inscriptions_in_block( inscriptions_db_tx: &Connection, ctx: &Context, ) -> BTreeMap { - let transfers_data = find_all_transfers_in_block(block_height, inscriptions_db_tx, ctx); - let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()]; let mut stmt = loop { @@ -1138,26 +1164,12 @@ pub fn find_all_inscriptions_in_block( let inscription_input_index: usize = row.get(4).unwrap(); let (transaction_identifier_inscription, _) = { parse_inscription_id(&inscription_id) }; - let Some(transfer_data) = transfers_data - .get(&inscription_id) - .and_then(|entries| entries.first()) - else { - ctx.try_log(|logger| { - error!( - logger, - "unable to retrieve inscription genesis transfer data: {}", - inscription_id, - ) - }); - continue; - }; let traversal = TraversalResult { inscription_number, ordinal_number, inscription_input_index, transfers: 0, transaction_identifier_inscription: transaction_identifier_inscription.clone(), - transfer_data: transfer_data.clone(), }; results.insert(inscription_id, traversal); } @@ -1173,45 +1185,24 @@ pub fn find_all_inscriptions_in_block( return results; } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Ord, PartialOrd, PartialEq, Eq)] pub struct WatchedSatpoint { - pub inscription_id: String, + pub ordinal_number: u64, pub offset: u64, } -pub fn find_watched_satpoint_for_inscription( - inscription_id: &str, - db_conn: &Connection, - ctx: &Context, -) -> Option<(u64, WatchedSatpoint)> { - let args: &[&dyn ToSql] = &[&inscription_id.to_sql().unwrap()]; - let query = "SELECT inscription_id, offset, block_height FROM locations WHERE inscription_id = ? ORDER BY offset ASC"; - perform_query_one(query, args, db_conn, ctx, |row| { - let inscription_id: String = row.get(0).unwrap(); - let offset: u64 = row.get(1).unwrap(); - let block_height: u64 = row.get(2).unwrap(); - ( - block_height, - WatchedSatpoint { - inscription_id, - offset, - }, - ) - }) -} - -pub fn find_inscriptions_at_wached_outpoint( +pub fn find_inscribed_ordinals_at_wached_outpoint( outpoint: &str, db_conn: &Connection, ctx: &Context, ) -> Vec { let args: &[&dyn ToSql] = &[&outpoint.to_sql().unwrap()]; - let query = "SELECT inscription_id, offset FROM locations WHERE outpoint_to_watch = ? ORDER BY offset ASC"; + let query = "SELECT ordinal_number, offset FROM locations WHERE outpoint_to_watch = ? ORDER BY offset ASC"; perform_query_set(query, args, db_conn, ctx, |row| { - let inscription_id: String = row.get(0).unwrap(); + let ordinal_number: u64 = row.get(0).unwrap(); let offset: u64 = row.get(1).unwrap(); WatchedSatpoint { - inscription_id, + ordinal_number, offset, } }) @@ -1281,26 +1272,6 @@ pub fn remove_entries_from_locations_at_block_height( } } -pub fn insert_entry_in_locations( - inscription_id: &str, - block_height: u64, - transfer_data: &TransferData, - inscriptions_db_rw_conn: &Transaction, - ctx: &Context, -) { - let outpoint_to_watch = format_outpoint_to_watch( - &transfer_data.transaction_identifier_location, - transfer_data.output_index, - ); - while let Err(e) = inscriptions_db_rw_conn.execute( - "INSERT INTO locations (inscription_id, outpoint_to_watch, offset, block_height, tx_index) VALUES (?1, ?2, ?3, ?4, ?5)", - rusqlite::params![&inscription_id, &outpoint_to_watch, &transfer_data.inscription_offset_intra_output, &block_height, &transfer_data.tx_index], - ) { - ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); - std::thread::sleep(std::time::Duration::from_secs(1)); - } -} - pub fn delete_data_in_ordhook_db( start_block: u64, end_block: u64, @@ -1337,7 +1308,6 @@ pub struct TraversalResult { pub transaction_identifier_inscription: TransactionIdentifier, pub ordinal_number: u64, pub transfers: u32, - pub transfer_data: TransferData, } impl TraversalResult { diff --git a/components/ordhook-core/src/ord/envelope.rs b/components/ordhook-core/src/ord/envelope.rs index a28f9d1e..e1bb452d 100644 --- a/components/ordhook-core/src/ord/envelope.rs +++ b/components/ordhook-core/src/ord/envelope.rs @@ -25,6 +25,7 @@ pub const PARENT_TAG: [u8; 1] = [3]; pub const METADATA_TAG: [u8; 1] = [5]; pub const METAPROTOCOL_TAG: [u8; 1] = [7]; pub const CONTENT_ENCODING_TAG: [u8; 1] = [9]; +pub const DELEGATE_TAG: [u8; 1] = [11]; type Result = std::result::Result; pub type RawEnvelope = Envelope>>; @@ -91,6 +92,7 @@ impl From for ParsedEnvelope { let content_encoding = remove_field(&mut fields, &CONTENT_ENCODING_TAG); let content_type = remove_field(&mut fields, &CONTENT_TYPE_TAG); + let delegate = remove_field(&mut fields, &DELEGATE_TAG); let metadata = remove_and_concatenate_field(&mut fields, &METADATA_TAG); let metaprotocol = remove_field(&mut fields, &METAPROTOCOL_TAG); let parent = remove_field(&mut fields, &PARENT_TAG); @@ -109,13 +111,14 @@ impl From for ParsedEnvelope { .cloned() .collect() }), + metaprotocol, + parent, + delegate, content_encoding, content_type, duplicate_field, incomplete_field, metadata, - metaprotocol, - parent, pointer, unrecognized_even_field, }, @@ -475,25 +478,6 @@ mod tests { ); } - #[test] - fn with_unknown_tag() { - assert_eq!( - parse(&[envelope(&[ - b"ord", - &[1], - b"text/plain;charset=utf-8", - &[11], - b"bar", - &[], - b"ord", - ])]), - vec![ParsedEnvelope { - payload: inscription("text/plain;charset=utf-8", "ord"), - ..Default::default() - }] - ); - } - #[test] fn no_body() { assert_eq!( @@ -809,17 +793,6 @@ mod tests { ); } - #[test] - fn unknown_odd_fields_are_ignored() { - assert_eq!( - parse(&[envelope(&[b"ord", &[11], &[0]])]), - vec![ParsedEnvelope { - payload: Inscription::default(), - ..Default::default() - }], - ); - } - #[test] fn unknown_even_fields() { assert_eq!( diff --git a/components/ordhook-core/src/ord/inscription.rs b/components/ordhook-core/src/ord/inscription.rs index 45fa5534..a3fbc72c 100644 --- a/components/ordhook-core/src/ord/inscription.rs +++ b/components/ordhook-core/src/ord/inscription.rs @@ -1,3 +1,5 @@ +use std::io::Cursor; + use chainhook_sdk::bitcoin::{hashes::Hash, Txid}; use super::{inscription_id::InscriptionId, media::Media}; @@ -26,6 +28,7 @@ pub struct Inscription { pub parent: Option>, pub pointer: Option>, pub unrecognized_even_field: bool, + pub delegate: Option>, } impl Inscription { @@ -159,6 +162,49 @@ impl Inscription { str::from_utf8(self.metaprotocol.as_ref()?).ok() } + fn inscription_id_field(field: &Option>) -> Option { + let value = field.as_ref()?; + + if value.len() < Txid::LEN { + return None; + } + + if value.len() > Txid::LEN + 4 { + return None; + } + + let (txid, index) = value.split_at(Txid::LEN); + + if let Some(last) = index.last() { + // Accept fixed length encoding with 4 bytes (with potential trailing zeroes) + // or variable length (no trailing zeroes) + if index.len() != 4 && *last == 0 { + return None; + } + } + + let txid = Txid::from_slice(txid).unwrap(); + + let index = [ + index.first().copied().unwrap_or(0), + index.get(1).copied().unwrap_or(0), + index.get(2).copied().unwrap_or(0), + index.get(3).copied().unwrap_or(0), + ]; + + let index = u32::from_le_bytes(index); + + Some(InscriptionId { txid, index }) + } + + pub(crate) fn delegate(&self) -> Option { + Self::inscription_id_field(&self.delegate) + } + + pub(crate) fn metadata(&self) -> Option { + ciborium::from_reader(Cursor::new(self.metadata.as_ref()?)).ok() + } + pub(crate) fn parent(&self) -> Option { use chainhook_sdk::bitcoin::hash_types::Txid as TXID_LEN; let value = self.parent.as_ref()?; diff --git a/components/ordhook-core/src/scan/bitcoin.rs b/components/ordhook-core/src/scan/bitcoin.rs index 0baa9fe8..fec68e2d 100644 --- a/components/ordhook-core/src/scan/bitcoin.rs +++ b/components/ordhook-core/src/scan/bitcoin.rs @@ -178,16 +178,25 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( ) } if block_heights_to_scan.is_empty() && floating_end_block { - match bitcoin_rpc.get_blockchain_info() { - Ok(result) => { - for entry in (current_block_height + 1)..=result.blocks { - block_heights_to_scan.push_back(entry); + let new_tip = match bitcoin_rpc.get_blockchain_info() { + Ok(result) => match predicate_spec.end_block { + Some(end_block) => { + if end_block > result.blocks { + result.blocks + } else { + end_block + } } - } + None => result.blocks, + }, Err(_e) => { continue; } }; + + for entry in (current_block_height + 1)..new_tip { + block_heights_to_scan.push_back(entry); + } } } info!( diff --git a/components/ordhook-core/src/service/mod.rs b/components/ordhook-core/src/service/mod.rs index b8621867..4673fac6 100644 --- a/components/ordhook-core/src/service/mod.rs +++ b/components/ordhook-core/src/service/mod.rs @@ -16,8 +16,8 @@ use crate::core::protocol::inscription_sequencing::SequenceCursor; use crate::core::{new_traversals_lazy_cache, should_sync_ordhook_db, should_sync_rocks_db}; use crate::db::{ delete_data_in_ordhook_db, insert_entry_in_blocks, open_ordhook_db_conn_rocks_db_loop, - open_readwrite_ordhook_db_conn, open_readwrite_ordhook_dbs, update_inscriptions_with_block, - update_locations_with_block, BlockBytesCursor, TransactionBytesCursor, + open_readwrite_ordhook_db_conn, open_readwrite_ordhook_dbs, update_ordinals_db_with_block, + BlockBytesCursor, TransactionBytesCursor, }; use crate::db::{ find_last_block_inserted, find_missing_blocks, run_compaction, @@ -33,7 +33,8 @@ use crate::service::observers::{ use crate::service::runloops::start_bitcoin_scan_runloop; use chainhook_sdk::chainhooks::bitcoin::BitcoinChainhookOccurrencePayload; use chainhook_sdk::chainhooks::types::{ - BitcoinChainhookSpecification, ChainhookFullSpecification, ChainhookSpecification, + BitcoinChainhookSpecification, ChainhookConfig, ChainhookFullSpecification, + ChainhookSpecification, }; use chainhook_sdk::observer::{ start_event_observer, BitcoinBlockDataCached, DataHandlerEvent, EventObserverConfig, @@ -63,17 +64,33 @@ impl Service { pub async fn run( &mut self, - predicates: Vec, + observer_specs: Vec, predicate_activity_relayer: Option< crossbeam_channel::Sender, >, check_blocks_integrity: bool, + stream_indexing_to_observers: bool, ) -> Result<(), String> { let mut event_observer_config = self.config.get_event_observer_config(); + let block_post_processor = if stream_indexing_to_observers && !observer_specs.is_empty() { + let mut chainhook_config: ChainhookConfig = ChainhookConfig::new(); + let specs = observer_specs.clone(); + for mut observer_spec in specs.into_iter() { + observer_spec.enabled = true; + let spec = ChainhookSpecification::Bitcoin(observer_spec); + chainhook_config.register_specification(spec)?; + } + event_observer_config.chainhook_config = Some(chainhook_config); + let block_tx = start_observer_forwarding(&event_observer_config, &self.ctx); + Some(block_tx) + } else { + None + }; + // Catch-up with chain tip let chain_tip_height = self - .catch_up_with_chain_tip(false, check_blocks_integrity) + .catch_up_with_chain_tip(false, check_blocks_integrity, block_post_processor) .await?; info!( self.ctx.expect_logger(), @@ -98,7 +115,7 @@ impl Service { // 2) catch-up outdated observers by dispatching replays let (chainhook_config, outdated_observers) = create_and_consolidate_chainhook_config_with_predicates( - predicates, + observer_specs, chain_tip_height, predicate_activity_relayer.is_some(), &self.config, @@ -413,9 +430,7 @@ impl Service { bitcoin_blocks_mutator: Some((block_mutator_in_tx, block_mutator_out_rx)), bitcoin_chain_event_notifier: Some(chain_event_notifier_tx), }; - let cache_l2 = Arc::new(new_traversals_lazy_cache( - self.config.limits.max_caching_memory_size_mb, - )); + let cache_l2 = Arc::new(new_traversals_lazy_cache(100_000)); let ctx = self.ctx.clone(); let config = self.config.clone(); @@ -448,6 +463,7 @@ impl Service { &mut self, rebuild_from_scratch: bool, compact_and_check_rocksdb_integrity: bool, + block_post_processor: Option>, ) -> Result { { if compact_and_check_rocksdb_integrity { @@ -455,6 +471,8 @@ impl Service { let blocks_db = open_ordhook_db_conn_rocks_db_loop( false, &self.config.expected_cache_path(), + self.config.resources.ulimit, + self.config.resources.memory_available, &self.ctx, ); let tip = find_last_block_inserted(&blocks_db); @@ -486,6 +504,8 @@ impl Service { let blocks_db_rw = open_ordhook_db_conn_rocks_db_loop( false, &self.config.expected_cache_path(), + self.config.resources.ulimit, + self.config.resources.memory_available, &self.ctx, ); info!(self.ctx.expect_logger(), "Running database compaction",); @@ -496,6 +516,8 @@ impl Service { let blocks_db_rw = open_ordhook_db_conn_rocks_db_loop( false, &self.config.expected_cache_path(), + self.config.resources.ulimit, + self.config.resources.memory_available, &self.ctx, ); @@ -511,7 +533,7 @@ impl Service { )?; } } - self.update_state(None).await + self.update_state(block_post_processor).await } pub async fn update_state( @@ -616,14 +638,18 @@ impl Service { } fn chainhook_sidecar_mutate_ordhook_db(command: HandleBlock, config: &Config, ctx: &Context) { - let (blocks_db_rw, inscriptions_db_conn_rw) = - match open_readwrite_ordhook_dbs(&config.expected_cache_path(), &ctx) { - Ok(dbs) => dbs, - Err(e) => { - ctx.try_log(|logger| error!(logger, "Unable to open readwtite connection: {e}",)); - return; - } - }; + let (blocks_db_rw, inscriptions_db_conn_rw) = match open_readwrite_ordhook_dbs( + &config.expected_cache_path(), + config.resources.ulimit, + config.resources.memory_available, + &ctx, + ) { + Ok(dbs) => dbs, + Err(e) => { + ctx.try_log(|logger| error!(logger, "Unable to open readwtite connection: {e}",)); + return; + } + }; match command { HandleBlock::UndoBlock(block) => { @@ -672,9 +698,7 @@ fn chainhook_sidecar_mutate_ordhook_db(command: HandleBlock, config: &Config, ct ); let _ = blocks_db_rw.flush(); - update_inscriptions_with_block(&block, &inscriptions_db_conn_rw, &ctx); - - update_locations_with_block(&block, &inscriptions_db_conn_rw, &ctx); + update_ordinals_db_with_block(&block, &inscriptions_db_conn_rw, ctx); update_sequence_metadata_with_block(&block, &inscriptions_db_conn_rw, &ctx); } @@ -725,14 +749,18 @@ pub fn chainhook_sidecar_mutate_blocks( ) { let mut updated_blocks_ids = vec![]; - let (blocks_db_rw, mut inscriptions_db_conn_rw) = - match open_readwrite_ordhook_dbs(&config.expected_cache_path(), &ctx) { - Ok(dbs) => dbs, - Err(e) => { - ctx.try_log(|logger| error!(logger, "Unable to open readwtite connection: {e}",)); - return; - } - }; + let (blocks_db_rw, mut inscriptions_db_conn_rw) = match open_readwrite_ordhook_dbs( + &config.expected_cache_path(), + config.resources.ulimit, + config.resources.memory_available, + &ctx, + ) { + Ok(dbs) => dbs, + Err(e) => { + ctx.try_log(|logger| error!(logger, "Unable to open readwtite connection: {e}",)); + return; + } + }; let inscriptions_db_tx = inscriptions_db_conn_rw.transaction().unwrap(); @@ -781,8 +809,7 @@ pub fn chainhook_sidecar_mutate_blocks( let _ = blocks_db_rw.flush(); if cache.processed_by_sidecar { - update_inscriptions_with_block(&cache.block, &inscriptions_db_tx, &ctx); - update_locations_with_block(&cache.block, &inscriptions_db_tx, &ctx); + update_ordinals_db_with_block(&cache.block, &inscriptions_db_tx, &ctx); update_sequence_metadata_with_block(&cache.block, &inscriptions_db_tx, &ctx); } else { updated_blocks_ids.push(format!("{}", cache.block.block_identifier.index)); diff --git a/components/ordhook-core/src/service/observers.rs b/components/ordhook-core/src/service/observers.rs index 0172cacc..e8ac421b 100644 --- a/components/ordhook-core/src/service/observers.rs +++ b/components/ordhook-core/src/service/observers.rs @@ -35,7 +35,13 @@ pub fn update_observer_progress( "UPDATE observers SET last_block_height_update = ? WHERE uuid = ?", rusqlite::params![last_block_height_update, uuid], ) { - ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); + ctx.try_log(|logger| { + warn!( + logger, + "unable to query observers.sqlite: {}", + e.to_string() + ) + }); std::thread::sleep(std::time::Duration::from_secs(1)); } } @@ -50,7 +56,13 @@ pub fn update_observer_streaming_enabled( "UPDATE observers SET streaming_enabled = ? WHERE uuid = ?", rusqlite::params![streaming_enabled, uuid], ) { - ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); + ctx.try_log(|logger| { + warn!( + logger, + "unable to query observers.sqlite: {}", + e.to_string() + ) + }); std::thread::sleep(std::time::Duration::from_secs(1)); } } @@ -66,7 +78,7 @@ pub fn insert_entry_in_observers( "INSERT INTO observers (uuid, spec, streaming_enabled, last_block_height_update) VALUES (?1, ?2, ?3, ?4)", rusqlite::params![&spec.uuid(), json!(spec).to_string(), report.streaming_enabled, report.last_block_height_update], ) { - ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); + ctx.try_log(|logger| warn!(logger, "unable to query observers.sqlite: {}", e.to_string())); std::thread::sleep(std::time::Duration::from_secs(1)); } } @@ -177,7 +189,13 @@ pub fn remove_entry_from_observers(uuid: &str, db_conn: &Connection, ctx: &Conte "DELETE FROM observers WHERE uuid = ?1", rusqlite::params![&uuid], ) { - ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); + ctx.try_log(|logger| { + warn!( + logger, + "unable to query observers.sqlite: {}", + e.to_string() + ) + }); std::thread::sleep(std::time::Duration::from_secs(1)); } } diff --git a/components/ordhook-core/src/service/runloops.rs b/components/ordhook-core/src/service/runloops.rs index 2bea99a7..3ca9537c 100644 --- a/components/ordhook-core/src/service/runloops.rs +++ b/components/ordhook-core/src/service/runloops.rs @@ -21,7 +21,7 @@ pub fn start_bitcoin_scan_runloop( observer_command_tx: Sender, ctx: &Context, ) { - let bitcoin_scan_pool = ThreadPool::new(config.limits.max_number_of_concurrent_bitcoin_scans); + let bitcoin_scan_pool = ThreadPool::new(config.resources.expected_observers_count); while let Ok(predicate_spec) = bitcoin_scan_op_rx.recv() { let moved_ctx = ctx.clone(); diff --git a/components/ordhook-sdk-js/src/ordinals_indexer.rs b/components/ordhook-sdk-js/src/ordinals_indexer.rs index e83611e4..5ec496bf 100644 --- a/components/ordhook-sdk-js/src/ordinals_indexer.rs +++ b/components/ordhook-sdk-js/src/ordinals_indexer.rs @@ -159,7 +159,7 @@ impl OrdinalsIndexingRunloop { match cmd { IndexerCommand::StreamBlocks => { // We start the service as soon as the start() method is being called. - let future = service.catch_up_with_chain_tip(false, true); + let future = service.catch_up_with_chain_tip(false, true, None); let _ = hiro_system_kit::nestable_block_on(future).expect("unable to start indexer"); let future = service.start_event_observer(observer_sidecar); let (command_tx, event_rx) = diff --git a/dockerfiles/components/ordhook.dockerfile b/dockerfiles/components/ordhook.dockerfile index 1e2b85b8..68a9bb9a 100644 --- a/dockerfiles/components/ordhook.dockerfile +++ b/dockerfiles/components/ordhook.dockerfile @@ -1,8 +1,12 @@ FROM rust:bullseye as build +ARG GIT_COMMIT='0000000' + +ENV GIT_COMMIT=${GIT_COMMIT} + WORKDIR /src -RUN apt-get update && apt-get install -y ca-certificates pkg-config libssl-dev libclang-11-dev curl gnupg +RUN apt-get update && apt-get install -y ca-certificates pkg-config libssl-dev libclang-11-dev libunwind-dev libunwind8 curl gnupg RUN rustup update 1.72.0 && rustup default 1.72.0 @@ -50,7 +54,7 @@ FROM debian:bullseye-slim WORKDIR /ordhook-sdk-js -RUN apt-get update && apt-get install -y ca-certificates libssl-dev +RUN apt-get update && apt-get install -y ca-certificates libssl-dev libclang-11-dev libunwind-dev libunwind8 sqlite3 # COPY --from=build /out/*.node /ordhook-sdk-js/