diff --git a/Cargo.lock b/Cargo.lock index 305926dac3..6b02ce3e2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9141,6 +9141,7 @@ dependencies = [ "starcoin-crypto", "starcoin-decrypt", "starcoin-logger", + "starcoin-schemadb", "starcoin-storage", "starcoin-types", "tempfile", @@ -10561,6 +10562,23 @@ dependencies = [ "vm-status-translator", ] +[[package]] +name = "starcoin-schemadb" +version = "0.1.0" +dependencies = [ + "anyhow", + "byteorder", + "coarsetime", + "num_enum", + "once_cell", + "parking_lot 0.12.1", + "rocksdb", + "starcoin-config", + "starcoin-metrics", + "starcoin-temppath", + "thiserror", +] + [[package]] name = "starcoin-service-registry" version = "1.13.11" @@ -10694,6 +10712,7 @@ dependencies = [ "starcoin-crypto", "starcoin-logger", "starcoin-metrics", + "starcoin-schemadb", "starcoin-state-store-api", "starcoin-types", "starcoin-uint", @@ -10812,6 +10831,14 @@ dependencies = [ "systemstat", ] +[[package]] +name = "starcoin-temppath" +version = "0.1.0" +dependencies = [ + "hex", + "rand 0.8.5", +] + [[package]] name = "starcoin-time-service" version = "1.13.11" diff --git a/Cargo.toml b/Cargo.toml index dabb326153..ce2f00405a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ members = [ "commons/forkable-jellyfish-merkle", "commons/time-service", "commons/infallible", + "commons/starcoin-temppath", "types", "types/uint", "genesis", @@ -28,6 +29,7 @@ members = [ "state/service", "config", "storage", + "storage/schemadb", "consensus", "consensus/cryptonight-rs", "testsuite", @@ -131,6 +133,7 @@ default-members = [ "commons/accumulator", "commons/forkable-jellyfish-merkle", "commons/infallible", + "commons/starcoin-temppath", "types", "types/uint", "genesis", @@ -141,6 +144,7 @@ default-members = [ "state/service", "config", "storage", + "storage/schemadb", "consensus", "consensus/cryptonight-rs", "testsuite", @@ -475,6 +479,7 @@ starcoin-state-store-api = { path = "state/state-store-api" } starcoin-state-tree = { path = "state/state-tree" } starcoin-statedb = { path = "state/statedb" } starcoin-storage = { path = "storage" } +starcoin-schemadb = { path = "storage/schemadb" } starcoin-stratum = { path = "stratum" } starcoin-sync = { path = "sync" } starcoin-sync-api = { path = "sync/api" } @@ -489,6 +494,7 @@ starcoin-types = { path = "types" } starcoin-uint = { path = "types/uint" } starcoin-vm-runtime = { path = "vm/vm-runtime" } starcoin-vm-types = { path = "vm/types" } +starcoin-temppath = { path = "commons/starcoin-temppath" } stdlib = { path = "vm/stdlib" } stest = { path = "commons/stest" } stest-macro = { path = "commons/stest/stest-macro" } diff --git a/account/Cargo.toml b/account/Cargo.toml index 76c29e1c5a..e4d9f8a9ea 100644 --- a/account/Cargo.toml +++ b/account/Cargo.toml @@ -15,6 +15,7 @@ starcoin-decrypt = { workspace = true } starcoin-logger = { workspace = true } starcoin-storage = { workspace = true } starcoin-types = { workspace = true } +starcoin-schemadb = { workspace = true } [dev-dependencies] hex = { workspace = true } diff --git a/account/src/account.rs b/account/src/account.rs index 780177b7b6..cdf1fe0db4 100644 --- a/account/src/account.rs +++ b/account/src/account.rs @@ -4,19 +4,19 @@ use crate::account_manager::gen_private_key; use crate::account_storage::AccountStorage; use anyhow::{format_err, Result}; -use starcoin_account_api::error::AccountError; use starcoin_account_api::{ - AccountInfo, AccountPrivateKey, AccountPublicKey, AccountResult, Setting, + error::AccountError, AccountInfo, AccountPrivateKey, AccountPublicKey, 
AccountResult, Setting, }; use starcoin_crypto::PrivateKey; use starcoin_logger::prelude::*; -use starcoin_storage::storage::StorageInstance; -use starcoin_types::account_address; -use starcoin_types::account_address::AccountAddress; -use starcoin_types::genesis_config::ChainId; -use starcoin_types::sign_message::{SignedMessage, SigningMessage}; -use starcoin_types::transaction::authenticator::AuthenticationKey; -use starcoin_types::transaction::{RawUserTransaction, SignedUserTransaction}; +use starcoin_types::{ + account_address, + account_address::AccountAddress, + genesis_config::ChainId, + sign_message::{SignedMessage, SigningMessage}, + transaction::authenticator::AuthenticationKey, + transaction::{RawUserTransaction, SignedUserTransaction}, +}; pub struct Account { addr: AccountAddress, @@ -172,7 +172,7 @@ impl Account { let private_key = gen_private_key(); let public_key = private_key.public_key(); let address = account_address::from_public_key(&public_key); - let storage = AccountStorage::new(StorageInstance::new_cache_instance()); + let storage = AccountStorage::mock(); Self::create(address, private_key.into(), "".to_string(), storage).map_err(|e| e.into()) } } diff --git a/account/src/account_manager.rs b/account/src/account_manager.rs index 7833c71e0d..24c819c88f 100644 --- a/account/src/account_manager.rs +++ b/account/src/account_manager.rs @@ -148,6 +148,7 @@ impl AccountManager { None => Account::create_readonly(address, public_key, self.store.clone())?, }; + // todo: merge following single writes to one schema-batch-write: add_address + set_default self.store.add_address(*account.address())?; // if it's the first address, set it default. @@ -195,7 +196,8 @@ impl AccountManager { "Can not find account_info by address:{}, clear it from address list.", account ); - self.store.remove_address(account)?; + // todo: merge single writes to one batch-write + self.store.remove_address_from_all(account)?; } } Ok(res) @@ -254,6 +256,7 @@ impl AccountManager { } } + // todo: merge single writes to one batch-write pub fn set_default_account(&self, address: AccountAddress) -> AccountResult { let mut account_info = self .account_info(address)? 
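The `todo` comments above ask for the consecutive single-key writes to be merged into one atomic batch. A minimal sketch of the shape that could take with the new `SchemaBatch` API, for the first-account path (the helper name and body are illustrative, not part of this change, and for brevity it writes through the schemas directly instead of the cache-updating `put_batch` wrappers):

use crate::account_schemadb::{GlobalSetting, GlobalSettingKey, GlobalValue};
use crate::account_storage::AccountStorage;
use anyhow::Result;
use starcoin_schemadb::SchemaBatch;
use starcoin_types::account_address::AccountAddress;

// Hypothetical helper: make the new address the default and append it to the
// address list in one atomic write instead of two separate puts.
fn register_first_address(storage: &AccountStorage, addr: AccountAddress) -> Result<()> {
    let batch = SchemaBatch::default();
    batch.put::<GlobalSetting>(
        &GlobalSettingKey::AllAddresses,
        &GlobalValue { addresses: vec![addr] },
    )?;
    batch.put::<GlobalSetting>(
        &GlobalSettingKey::DefaultAddress,
        &GlobalValue { addresses: vec![addr] },
    )?;
    // A single write_schemas call commits both rows in one RocksDB write batch,
    // mirroring what destroy_account does for its deletions.
    storage.write_schemas(batch)
}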
diff --git a/account/src/account_schemadb/accepted_token.rs b/account/src/account_schemadb/accepted_token.rs new file mode 100644 index 0000000000..9ffc5f2249 --- /dev/null +++ b/account/src/account_schemadb/accepted_token.rs @@ -0,0 +1,42 @@ +use anyhow::Result; +use bcs_ext::BCSCodec; +use serde::{Deserialize, Serialize}; +use starcoin_schemadb::{ + define_schema, + schema::{KeyCodec, ValueCodec}, + ColumnFamilyName, +}; +use starcoin_types::account_address::AccountAddress; +use starcoin_types::account_config::token_code::TokenCode; + +pub const ACCEPTED_TOKEN_PREFIX_NAME: ColumnFamilyName = "accepted_token"; + +define_schema!( + AcceptedToken, + AccountAddress, + AcceptedTokens, + ACCEPTED_TOKEN_PREFIX_NAME +); + +impl KeyCodec for AccountAddress { + fn encode_key(&self) -> Result> { + Ok(self.to_vec()) + } + + fn decode_key(data: &[u8]) -> Result { + AccountAddress::try_from(data).map_err(Into::into) + } +} + +#[derive(Default, Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] +pub struct AcceptedTokens(pub Vec); + +impl ValueCodec for AcceptedTokens { + fn encode_value(&self) -> Result> { + self.0.encode() + } + + fn decode_value(data: &[u8]) -> Result { + >::decode(data).map(AcceptedTokens) + } +} diff --git a/account/src/account_schemadb/global_setting.rs b/account/src/account_schemadb/global_setting.rs new file mode 100644 index 0000000000..04b8d8c813 --- /dev/null +++ b/account/src/account_schemadb/global_setting.rs @@ -0,0 +1,50 @@ +use anyhow::Result; +use bcs_ext::BCSCodec; +use serde::{Deserialize, Serialize}; +use starcoin_schemadb::{ + define_schema, + schema::{KeyCodec, ValueCodec}, + ColumnFamilyName, +}; +use starcoin_types::account_address::AccountAddress; + +pub const GLOBAL_PREFIX_NAME: ColumnFamilyName = "global"; + +define_schema!( + GlobalSetting, + GlobalSettingKey, + GlobalValue, + GLOBAL_PREFIX_NAME +); + +#[derive(Hash, Copy, Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] +pub enum GlobalSettingKey { + DefaultAddress, + /// FIXME: once db support iter, remove this. 
+ AllAddresses, +} + +#[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct GlobalValue { + pub(crate) addresses: Vec, +} + +impl KeyCodec for GlobalSettingKey { + fn encode_key(&self) -> Result> { + self.encode() + } + + fn decode_key(data: &[u8]) -> Result { + GlobalSettingKey::decode(data) + } +} + +impl ValueCodec for GlobalValue { + fn encode_value(&self) -> Result> { + self.addresses.encode() + } + + fn decode_value(data: &[u8]) -> Result { + >::decode(data).map(|addresses| GlobalValue { addresses }) + } +} diff --git a/account/src/account_schemadb/mod.rs b/account/src/account_schemadb/mod.rs new file mode 100644 index 0000000000..a48b0006bb --- /dev/null +++ b/account/src/account_schemadb/mod.rs @@ -0,0 +1,78 @@ +use anyhow::Result; +use starcoin_schemadb::schema::Schema; +use starcoin_storage::cache_storage::GCacheStorage; +use std::sync::Arc; + +mod accepted_token; +mod global_setting; +mod private_key; +mod public_key; +mod setting; + +pub(crate) use accepted_token::*; +pub(crate) use global_setting::*; +pub(crate) use private_key::*; +pub(crate) use public_key::*; +pub(crate) use setting::*; +use starcoin_schemadb::{db::DBStorage as DB, SchemaBatch}; + +#[derive(Clone)] +pub(super) struct AccountStore { + cache: Arc>, + db: Option>, +} + +impl AccountStore { + // create an memory-based store + pub fn new() -> Self { + Self { + cache: Arc::new(GCacheStorage::::new(None)), + db: None, + } + } + pub fn new_with_db(db: &Arc) -> Self { + Self { + cache: Arc::new(GCacheStorage::::new(None)), + db: Some(Arc::clone(db)), + } + } + + pub fn get(&self, key: &S::Key) -> Result> { + self.cache + .get_inner(key) + .map(|val| Ok(Some(val))) + .unwrap_or_else(|| { + self.db + .as_ref() + .map_or_else(|| Ok(None), |db| db.get::(key)) + }) + } + + pub fn put(&self, key: S::Key, value: S::Value) -> Result<()> { + self.db + .as_ref() + .map_or_else(|| Ok(()), |db| db.put::(&key, &value)) + .map(|_| { + self.cache.put_inner(key, value); + }) + } + + pub fn remove(&self, key: &S::Key) -> Result<()> { + self.db + .as_ref() + .map_or_else(|| Ok(()), |db| db.remove::(key)) + .map(|_| { + self.cache.remove_inner(key); + }) + } + + pub fn put_batch(&self, key: S::Key, value: S::Value, batch: &SchemaBatch) -> Result<()> { + batch.put::(&key, &value)?; + self.put(key, value) + } + + pub fn remove_batch(&self, key: &S::Key, batch: &SchemaBatch) -> Result<()> { + batch.delete::(key)?; + self.remove(key) + } +} diff --git a/account/src/account_schemadb/private_key.rs b/account/src/account_schemadb/private_key.rs new file mode 100644 index 0000000000..95fdc2c3d2 --- /dev/null +++ b/account/src/account_schemadb/private_key.rs @@ -0,0 +1,44 @@ +use anyhow::Result; +use starcoin_schemadb::{ + define_schema, + schema::{KeyCodec, ValueCodec}, + ColumnFamilyName, +}; +use starcoin_types::account_address::AccountAddress; + +pub const ENCRYPTED_PRIVATE_KEY_PREFIX_NAME: ColumnFamilyName = "encrypted_private_key"; + +define_schema!( + PrivateKey, + AccountAddress, + EncryptedPrivateKey, + ENCRYPTED_PRIVATE_KEY_PREFIX_NAME +); + +impl KeyCodec for AccountAddress { + fn encode_key(&self) -> Result> { + Ok(self.to_vec()) + } + + fn decode_key(data: &[u8]) -> Result { + AccountAddress::try_from(data).map_err(Into::into) + } +} + +#[derive(Default, Debug, Clone, PartialEq, Eq)] +pub struct EncryptedPrivateKey(pub Vec); +impl From> for EncryptedPrivateKey { + fn from(s: Vec) -> Self { + Self(s) + } +} + +impl ValueCodec for EncryptedPrivateKey { + fn encode_value(&self) -> Result> { + 
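+ // The encrypted payload is already opaque bytes, so encoding is a plain copy.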
Ok(self.0.clone()) + } + + fn decode_value(data: &[u8]) -> Result { + Ok(EncryptedPrivateKey(data.to_vec())) + } +} diff --git a/account/src/account_schemadb/public_key.rs b/account/src/account_schemadb/public_key.rs new file mode 100644 index 0000000000..e8ec0d1275 --- /dev/null +++ b/account/src/account_schemadb/public_key.rs @@ -0,0 +1,38 @@ +use anyhow::Result; +use bcs_ext::BCSCodec; +use starcoin_account_api::AccountPublicKey; +use starcoin_schemadb::{ + define_schema, + schema::{KeyCodec, ValueCodec}, + ColumnFamilyName, +}; +use starcoin_types::account_address::AccountAddress; + +pub const PUBLIC_KEY_PREFIX_NAME: ColumnFamilyName = "public_key"; + +define_schema!( + PublicKey, + AccountAddress, + AccountPublicKey, + PUBLIC_KEY_PREFIX_NAME +); + +impl KeyCodec for AccountAddress { + fn encode_key(&self) -> Result> { + Ok(self.to_vec()) + } + + fn decode_key(data: &[u8]) -> Result { + AccountAddress::try_from(data).map_err(Into::into) + } +} + +impl ValueCodec for AccountPublicKey { + fn encode_value(&self) -> Result> { + self.encode() + } + + fn decode_value(data: &[u8]) -> Result { + bcs_ext::from_bytes::(data) + } +} diff --git a/account/src/account_schemadb/setting.rs b/account/src/account_schemadb/setting.rs new file mode 100644 index 0000000000..e5005974df --- /dev/null +++ b/account/src/account_schemadb/setting.rs @@ -0,0 +1,32 @@ +use anyhow::Result; +use starcoin_account_api::Setting; +use starcoin_schemadb::{ + define_schema, + schema::{KeyCodec, ValueCodec}, + ColumnFamilyName, +}; +use starcoin_types::account_address::AccountAddress; + +pub const SETTING_PREFIX_NAME: ColumnFamilyName = "account_settings"; + +define_schema!(AccountSetting, AccountAddress, Setting, SETTING_PREFIX_NAME); + +impl KeyCodec for AccountAddress { + fn encode_key(&self) -> Result> { + Ok(self.to_vec()) + } + + fn decode_key(data: &[u8]) -> Result { + AccountAddress::try_from(data).map_err(Into::into) + } +} +/// Setting use json encode/decode for support more setting field in the future. 
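Using JSON rather than BCS here buys forward compatibility: serde ignores unknown fields and can default missing ones, so rows written by older code keep decoding as `Setting` grows. A self-contained sketch of that property (the `ExampleSetting` type and its fields are hypothetical):

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct ExampleSetting {
    default_expiration_timeout: u64,
    // Added in a later release; rows written before it existed fall back here.
    #[serde(default)]
    auto_accept_token: bool,
}

fn main() {
    let old_row = r#"{"default_expiration_timeout":3600}"#;
    let setting: ExampleSetting = serde_json::from_str(old_row).unwrap();
    assert!(!setting.auto_accept_token);
}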
+impl ValueCodec for Setting { + fn encode_value(&self) -> Result> { + serde_json::to_vec(&self).map_err(Into::into) + } + + fn decode_value(data: &[u8]) -> Result { + serde_json::from_slice::(data).map_err(Into::into) + } +} diff --git a/account/src/account_storage.rs b/account/src/account_storage.rs index da72f1360e..3fcc394078 100644 --- a/account/src/account_storage.rs +++ b/account/src/account_storage.rs @@ -1,196 +1,35 @@ // Copyright (c) The Starcoin Core Contributors // SPDX-License-Identifier: Apache-2.0 +use crate::account_schemadb::{ + AcceptedToken, AcceptedTokens, AccountSetting, AccountStore, EncryptedPrivateKey, + GlobalSetting, GlobalSettingKey, GlobalValue, PrivateKey, PublicKey, + ACCEPTED_TOKEN_PREFIX_NAME, ENCRYPTED_PRIVATE_KEY_PREFIX_NAME, GLOBAL_PREFIX_NAME, + PUBLIC_KEY_PREFIX_NAME, SETTING_PREFIX_NAME, +}; use anyhow::{Error, Result}; -use bcs_ext::BCSCodec; -use serde::Deserialize; -use serde::Serialize; use starcoin_account_api::{AccountPrivateKey, AccountPublicKey, Setting}; use starcoin_config::RocksdbConfig; use starcoin_crypto::ValidCryptoMaterial; use starcoin_decrypt::{decrypt, encrypt}; -use starcoin_storage::cache_storage::CacheStorage; -use starcoin_storage::db_storage::DBStorage; -use starcoin_storage::storage::{KeyCodec, ValueCodec}; -use starcoin_storage::{ - define_storage, - storage::{CodecKVStore, ColumnFamilyName, StorageInstance}, -}; -use starcoin_types::account_address::AccountAddress; -use starcoin_types::account_config::token_code::TokenCode; -use std::convert::TryFrom; -use std::path::Path; - -pub const SETTING_PREFIX_NAME: ColumnFamilyName = "account_settings"; -pub const ENCRYPTED_PRIVATE_KEY_PREFIX_NAME: ColumnFamilyName = "encrypted_private_key"; -pub const PUBLIC_KEY_PREFIX_NAME: ColumnFamilyName = "public_key"; -pub const ACCEPTED_TOKEN_PREFIX_NAME: ColumnFamilyName = "accepted_token"; -pub const GLOBAL_PREFIX_NAME: ColumnFamilyName = "global"; - -define_storage!( - AccountSettingStore, - AccountAddressWrapper, - SettingWrapper, - SETTING_PREFIX_NAME -); - -define_storage!( - PrivateKeyStore, - AccountAddressWrapper, - EncryptedPrivateKey, - ENCRYPTED_PRIVATE_KEY_PREFIX_NAME -); -define_storage!( - PublicKeyStore, - AccountAddressWrapper, - PublicKeyWrapper, - PUBLIC_KEY_PREFIX_NAME -); - -define_storage!( - GlobalSettingStore, - GlobalSettingKey, - GlobalValue, - GLOBAL_PREFIX_NAME -); - -define_storage!( - AcceptedTokenStore, - AccountAddressWrapper, - AcceptedTokens, - ACCEPTED_TOKEN_PREFIX_NAME -); - -#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] -pub struct AcceptedTokens(pub Vec); - -impl ValueCodec for AcceptedTokens { - fn encode_value(&self) -> Result, Error> { - self.0.encode() - } - - fn decode_value(data: &[u8]) -> Result { - >::decode(data).map(AcceptedTokens) - } -} - -#[derive(Copy, Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] -pub enum GlobalSettingKey { - DefaultAddress, - /// FIXME: once db support iter, remove this. 
- AllAddresses, -} - -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct GlobalValue { - addresses: Vec, -} - -impl KeyCodec for GlobalSettingKey { - fn encode_key(&self) -> Result, Error> { - self.encode() - } - - fn decode_key(data: &[u8]) -> Result { - GlobalSettingKey::decode(data) - } -} - -impl ValueCodec for GlobalValue { - fn encode_value(&self) -> Result, Error> { - self.addresses.encode() - } - - fn decode_value(data: &[u8]) -> Result { - >::decode(data).map(|addresses| GlobalValue { addresses }) - } -} - -#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Hash, Clone, Copy)] -pub struct AccountAddressWrapper(AccountAddress); -impl From for AccountAddressWrapper { - fn from(addr: AccountAddress) -> Self { - Self(addr) - } -} -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct SettingWrapper(Setting); -impl From for SettingWrapper { - fn from(setting: Setting) -> Self { - Self(setting) - } -} - -impl KeyCodec for AccountAddressWrapper { - fn encode_key(&self) -> Result, Error> { - Ok(self.0.to_vec()) - } - - fn decode_key(data: &[u8]) -> Result { - AccountAddress::try_from(data) - .map(AccountAddressWrapper) - .map_err(anyhow::Error::new) - } -} -/// Setting use json encode/decode for support more setting field in the future. -impl ValueCodec for SettingWrapper { - fn encode_value(&self) -> Result, Error> { - Ok(serde_json::to_vec(&self.0)?) - } - - fn decode_value(data: &[u8]) -> Result { - Ok(SettingWrapper(serde_json::from_slice(data)?)) - } -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct EncryptedPrivateKey(pub Vec); -impl From> for EncryptedPrivateKey { - fn from(s: Vec) -> Self { - Self(s) - } -} - -impl ValueCodec for EncryptedPrivateKey { - fn encode_value(&self) -> Result, Error> { - Ok(self.0.clone()) - } - - fn decode_value(data: &[u8]) -> Result { - Ok(EncryptedPrivateKey(data.to_vec())) - } -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct PublicKeyWrapper(AccountPublicKey); -impl From for PublicKeyWrapper { - fn from(s: AccountPublicKey) -> Self { - Self(s) - } -} - -impl ValueCodec for PublicKeyWrapper { - fn encode_value(&self) -> Result, Error> { - bcs_ext::to_bytes(&self.0) - } - - fn decode_value(data: &[u8]) -> Result { - Ok(Self(bcs_ext::from_bytes::(data)?)) - } -} +use starcoin_schemadb::{db::DBStorage as DB, SchemaBatch}; +use starcoin_types::{account_address::AccountAddress, account_config::token_code::TokenCode}; +use std::{convert::TryFrom, path::Path, sync::Arc}; #[derive(Clone)] pub struct AccountStorage { - setting_store: AccountSettingStore, - private_key_store: PrivateKeyStore, - public_key_store: PublicKeyStore, - global_value_store: GlobalSettingStore, - accepted_token_store: AcceptedTokenStore, + db: Option>, + setting_store: AccountStore, + private_key_store: AccountStore, + public_key_store: AccountStore, + global_value_store: AccountStore, + accepted_token_store: AccountStore, } impl AccountStorage { pub fn create_from_path(p: impl AsRef, rocksdb_config: RocksdbConfig) -> Result { - let db = DBStorage::open_with_cfs( + let db = DB::open_with_cfs( + "accountdb", p, vec![ SETTING_PREFIX_NAME, @@ -203,103 +42,141 @@ impl AccountStorage { rocksdb_config, None, )?; - let storage_instance = - StorageInstance::new_cache_and_db_instance(CacheStorage::default(), db); - Ok(Self::new(storage_instance)) + Ok(Self::new(Arc::new(db))) } - pub fn new(store: StorageInstance) -> Self { + pub fn new(db: Arc) -> Self { Self { - setting_store: AccountSettingStore::new(store.clone()), - private_key_store: 
PrivateKeyStore::new(store.clone()), - public_key_store: PublicKeyStore::new(store.clone()), - accepted_token_store: AcceptedTokenStore::new(store.clone()), - global_value_store: GlobalSettingStore::new(store), + db: Some(Arc::clone(&db)), + setting_store: AccountStore::::new_with_db(&db), + private_key_store: AccountStore::::new_with_db(&db), + public_key_store: AccountStore::::new_with_db(&db), + accepted_token_store: AccountStore::::new_with_db(&db), + global_value_store: AccountStore::::new_with_db(&db), } } pub fn mock() -> Self { - let storage_instance = StorageInstance::new_cache_instance(); - Self::new(storage_instance) + Self { + db: None, + setting_store: AccountStore::::new(), + private_key_store: AccountStore::::new(), + public_key_store: AccountStore::::new(), + accepted_token_store: AccountStore::::new(), + global_value_store: AccountStore::::new(), + } } } impl AccountStorage { pub fn default_address(&self) -> Result> { - let value = self - .global_value_store - .get(GlobalSettingKey::DefaultAddress)?; + let value = self.get_addresses(&GlobalSettingKey::DefaultAddress)?; Ok(value.and_then(|mut v| v.addresses.pop())) } /// Update or remove default address settings pub fn set_default_address(&self, address: Option) -> Result<()> { + let key = GlobalSettingKey::DefaultAddress; match address { - Some(addr) => self.global_value_store.put( - GlobalSettingKey::DefaultAddress, - GlobalValue { + Some(addr) => { + let val = GlobalValue { addresses: vec![addr], - }, - ), - None => self - .global_value_store - .remove(GlobalSettingKey::DefaultAddress), + }; + self.put_addresses(key, val) + } + None => self.remove_address(&key), } } pub fn contain_address(&self, address: AccountAddress) -> Result { - self.public_key_store - .get(address.into()) - .map(|w| w.is_some()) + match self.get_public_key(&address)? { + Some(v) => { + let _ = Into::::into(v); + Ok(true) + } + None => Ok(false), + } + } + + fn get_addresses(&self, global_setting_key: &GlobalSettingKey) -> Result> { + self.global_value_store + .get(global_setting_key)? + .map(|v| Ok(Some(v))) + .unwrap_or_else(|| { + if global_setting_key != &GlobalSettingKey::AllAddresses { + self.global_value_store.get(&GlobalSettingKey::AllAddresses) + } else { + Ok(None) + } + }) + } + + fn put_addresses(&self, key: GlobalSettingKey, value: GlobalValue) -> Result<()> { + self.global_value_store.put(key, value) + } + + fn remove_address(&self, key: &GlobalSettingKey) -> Result<()> { + self.global_value_store.remove(key) } /// FIXME: once storage support iter, we can remove this. 
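This PR does ship a `SchemaIterator` (see `storage/schemadb/src/iterator.rs`), so the `AllAddresses` bookkeeping could eventually become a plain key scan. A sketch of the replacement, assuming direct access to the underlying `DBStorage` handle (helper name hypothetical):

use anyhow::Result;
use rocksdb::ReadOptions;
use starcoin_schemadb::db::DBStorage as DB;
use starcoin_types::account_address::AccountAddress;

// Every stored account has a public_key row, so that column family's keys
// double as the authoritative address list.
fn list_addresses_by_scan(db: &DB) -> Result<Vec<AccountAddress>> {
    let mut iter = db.iter::<PublicKey>(ReadOptions::default())?;
    iter.seek_to_first();
    iter.map(|entry| entry.map(|(addr, _public_key)| addr)).collect()
}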
pub fn add_address(&self, address: AccountAddress) -> Result<()> { - let value = self - .global_value_store - .get(GlobalSettingKey::AllAddresses)?; + let value = self.get_addresses(&GlobalSettingKey::AllAddresses)?; let mut addrs = value.map(|v| v.addresses).unwrap_or_default(); if !addrs.contains(&address) { addrs.push(address); } - self.global_value_store.put( + self.put_addresses( GlobalSettingKey::AllAddresses, GlobalValue { addresses: addrs }, ) } - pub fn remove_address(&self, address: AccountAddress) -> Result<()> { - let value = self - .global_value_store - .get(GlobalSettingKey::AllAddresses)?; + pub fn remove_address_from_all(&self, address: AccountAddress) -> Result<()> { + let value = self.get_addresses(&GlobalSettingKey::AllAddresses)?; let mut addrs = value.map(|v| v.addresses).unwrap_or_default(); addrs.retain(|a| a != &address); - self.global_value_store.put( + self.put_addresses( GlobalSettingKey::AllAddresses, GlobalValue { addresses: addrs }, ) } pub fn list_addresses(&self) -> Result> { - let value = self - .global_value_store - .get(GlobalSettingKey::AllAddresses)?; + let value = self.get_addresses(&GlobalSettingKey::AllAddresses)?; Ok(value.map(|v| v.addresses).unwrap_or_default()) } + fn get_public_key(&self, address: &AccountAddress) -> Result> { + self.public_key_store.get(address) + } + + fn put_public_key(&self, key: AccountAddress, value: AccountPublicKey) -> Result<()> { + self.public_key_store.put(key, value) + } + pub fn public_key(&self, address: AccountAddress) -> Result> { - self.public_key_store - .get(address.into()) - .map(|w| w.map(|p| p.0)) + self.get_public_key(&address) } + fn get_private_key(&self, address: &AccountAddress) -> Result> { + self.private_key_store.get(address) + } + + //fn put_private_key(&self, key: AccountAddress, value: EncryptedPrivateKey) -> Result<()> { + // let key: AccountAddressWrapper = key.into(); + // self.db + // .put::(&key, &value) + // .and_then(|_| self.private_key_store.put(key, value)) + //} + pub fn decrypt_private_key( &self, address: AccountAddress, password: impl AsRef, ) -> Result> { - match self.private_key_store.get(address.into())? { + match self.get_private_key(&address)? 
{ None => Ok(None), Some(encrypted_key) => { let plain_key_data = decrypt(password.as_ref().as_bytes(), &encrypted_key.0)?; @@ -314,9 +191,7 @@ impl AccountStorage { address: AccountAddress, public_key: AccountPublicKey, ) -> Result<()> { - self.public_key_store - .put(address.into(), public_key.into())?; - Ok(()) + self.put_public_key(address, public_key) } /// Update private and public key @@ -326,59 +201,92 @@ impl AccountStorage { private_key: &AccountPrivateKey, password: impl AsRef, ) -> Result<()> { + let batch = SchemaBatch::default(); let encrypted_prikey = encrypt(password.as_ref().as_bytes(), &private_key.to_bytes()); self.private_key_store - .put(address.into(), encrypted_prikey.into())?; + .put_batch(address, encrypted_prikey.into(), &batch)?; let public_key = private_key.public_key(); - self.update_public_key(address, public_key)?; + self.public_key_store + .put_batch(address, public_key, &batch)?; + self.write_schemas(batch)?; Ok(()) } + fn put_setting(&self, address: AccountAddress, setting: Setting) -> Result<()> { + self.setting_store.put(address, setting) + } + pub fn update_setting(&self, address: AccountAddress, setting: Setting) -> Result<()> { - self.setting_store.put(address.into(), setting.into()) + self.put_setting(address, setting) } pub fn load_setting(&self, address: AccountAddress) -> Result { - Ok(self - .setting_store - .get(address.into())? - .map(|setting| setting.0) - .unwrap_or_else(Setting::default)) + Ok(self.setting_store.get(&address)?.unwrap_or_default()) } pub fn destroy_account(&self, address: AccountAddress) -> Result<()> { + let batch = SchemaBatch::default(); + if self.default_address()?.filter(|a| a == &address).is_some() { - self.set_default_address(None)?; + // clean up default address + // self.set_default_address(None)?; + self.global_value_store + .remove_batch(&GlobalSettingKey::DefaultAddress, &batch)?; } - self.remove_address(address)?; - self.private_key_store.remove(address.into())?; - self.public_key_store.remove(address.into())?; - self.setting_store.remove(address.into())?; - self.accepted_token_store.remove(address.into())?; + + //self.remove_address_from_all(address)?; + { + if let Some(GlobalValue { + addresses: mut addrs, + }) = self.get_addresses(&GlobalSettingKey::AllAddresses)? + { + addrs.retain(|a| a != &address); + self.global_value_store.put_batch( + GlobalSettingKey::AllAddresses, + GlobalValue { addresses: addrs }, + &batch, + )?; + } + } + + self.private_key_store.remove_batch(&address, &batch)?; + self.public_key_store.remove_batch(&address, &batch)?; + self.setting_store.remove_batch(&address, &batch)?; + self.accepted_token_store.remove_batch(&address, &batch)?; + + // persist updates to underlying storage + self.db + .as_ref() + .map_or_else(|| Ok(()), |db| db.write_schemas(batch))?; Ok(()) } pub fn get_accepted_tokens(&self, address: AccountAddress) -> Result> { - let ts = self.accepted_token_store.get(address.into())?; + let ts = self.accepted_token_store.get(&address)?; Ok(ts.map(|t| t.0).unwrap_or_default()) } + fn put_accepted_tokens(&self, key: AccountAddress, value: AcceptedTokens) -> Result<()> { + self.accepted_token_store.put(key, value) + } + pub fn add_accepted_token( &self, address: AccountAddress, token_code: TokenCode, ) -> Result<(), Error> { - let mut tokens = self - .accepted_token_store - .get(address.into())? 
- .map(|l| l.0) - .unwrap_or_default(); + let mut tokens = self.get_accepted_tokens(address)?; if !tokens.contains(&token_code) { tokens.push(token_code); - self.accepted_token_store - .put(address.into(), AcceptedTokens(tokens))?; + self.put_accepted_tokens(address, AcceptedTokens(tokens))?; } Ok(()) } + + pub fn write_schemas(&self, batch: SchemaBatch) -> Result<()> { + self.db + .as_ref() + .map_or_else(|| Ok(()), |db| db.write_schemas(batch)) + } } diff --git a/account/src/lib.rs b/account/src/lib.rs index c661195255..6d02a3fd3c 100644 --- a/account/src/lib.rs +++ b/account/src/lib.rs @@ -8,5 +8,6 @@ pub use account::Account; pub use account_manager::AccountManager; pub mod account_storage; +mod account_schemadb; #[cfg(test)] mod account_test; diff --git a/cmd/db-exporter/src/main.rs b/cmd/db-exporter/src/main.rs index 1c4b68c484..a982ba9d47 100644 --- a/cmd/db-exporter/src/main.rs +++ b/cmd/db-exporter/src/main.rs @@ -30,7 +30,7 @@ use starcoin_storage::{ block::FailedBlock, block_info::BlockInfoStore, cache_storage::CacheStorage, - db_storage::DBStorage, + db_storage::{ClassicIter, DBStorage}, storage::{ColumnFamilyName, InnerStore, StorageInstance, ValueCodec}, BlockStore, Storage, StorageVersion, Store, BLOCK_ACCUMULATOR_NODE_PREFIX_NAME, BLOCK_HEADER_PREFIX_NAME, BLOCK_INFO_PREFIX_NAME, BLOCK_PREFIX_NAME, FAILED_BLOCK_PREFIX_NAME, @@ -85,6 +85,7 @@ pub fn export( schema: DbSchema, ) -> anyhow::Result<()> { let db_storage = DBStorage::open_with_cfs( + "starcoindb", db, StorageVersion::current_version() .get_column_family_names() @@ -93,7 +94,7 @@ pub fn export( Default::default(), None, )?; - let mut iter = db_storage.iter::, Vec>(schema.to_string().as_str())?; + let mut iter = db_storage.iter_raw::, Vec>(schema.to_string().as_str())?; iter.seek_to_first(); let key_codec = schema.get_key_codec(); let value_codec = schema.get_value_codec(); @@ -595,6 +596,7 @@ async fn main() -> anyhow::Result<()> { } Cmd::Checkkey(option) => { let db = DBStorage::open_with_cfs( + "starcoindb", option.db_path.display().to_string().as_str(), StorageVersion::current_version() .get_column_family_names() @@ -604,7 +606,7 @@ async fn main() -> anyhow::Result<()> { None, )?; - let result = db.get(option.cf_name.as_str(), option.block_hash.to_vec())?; + let result = db.get_raw(option.cf_name.as_str(), option.block_hash.to_vec())?; if result.is_some() { println!("{} block_hash {} exist", option.cf_name, option.block_hash); } else { @@ -754,6 +756,7 @@ pub fn export_block_range( ) -> anyhow::Result<()> { let net = ChainNetwork::new_builtin(network); let db_storage = DBStorage::open_with_cfs( + "starcoindb", from_dir.join("starcoindb/db/starcoindb"), StorageVersion::current_version() .get_column_family_names() @@ -1411,6 +1414,7 @@ pub fn export_snapshot( let start_time = SystemTime::now(); let net = ChainNetwork::new_builtin(network); let db_storage = DBStorage::open_with_cfs( + "starcoindb", from_dir.join("starcoindb/db/starcoindb"), StorageVersion::current_version() .get_column_family_names() @@ -1959,6 +1963,7 @@ pub fn export_resource( fields: &[String], ) -> anyhow::Result<()> { let db_storage = DBStorage::open_with_cfs( + "starcoindb", db, StorageVersion::current_version() .get_column_family_names() diff --git a/cmd/resource-exporter/src/main.rs b/cmd/resource-exporter/src/main.rs index c9a08012ce..7036f50885 100644 --- a/cmd/resource-exporter/src/main.rs +++ b/cmd/resource-exporter/src/main.rs @@ -33,6 +33,7 @@ pub fn export( fields: &[String], ) -> anyhow::Result<()> { let db_storage = 
DBStorage::open_with_cfs( + "starcoindb", db, StorageVersion::current_version() .get_column_family_names() diff --git a/commons/starcoin-temppath/Cargo.toml b/commons/starcoin-temppath/Cargo.toml new file mode 100644 index 0000000000..471d4050cb --- /dev/null +++ b/commons/starcoin-temppath/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "starcoin-temppath" +description = "Temporary path utilities" +version = "0.1.0" + +# Workspace inherited keys +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +repository = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +hex = { workspace = true } +rand = { workspace = true } + + diff --git a/commons/starcoin-temppath/src/lib.rs b/commons/starcoin-temppath/src/lib.rs new file mode 100644 index 0000000000..2bc76a556e --- /dev/null +++ b/commons/starcoin-temppath/src/lib.rs @@ -0,0 +1,84 @@ +// Copyright (c) The Starcoin Core Contributors +// SPDX-License-Identifier: Apache-2 + +#![forbid(unsafe_code)] + +use rand::RngCore; +use std::{ + fs, io, + path::{Path, PathBuf}, +}; + +/// A simple wrapper for creating a temporary directory that is automatically deleted when it's +/// dropped. Used in lieu of tempfile due to the large number of dependencies. +#[derive(Debug, PartialEq, Eq)] +pub struct TempPath { + path_buf: PathBuf, + persist: bool, +} + +impl Drop for TempPath { + fn drop(&mut self) { + if !self.persist { + fs::remove_dir_all(&self.path_buf) + .or_else(|_| fs::remove_file(&self.path_buf)) + .unwrap_or(()); + } + } +} + +impl TempPath { + /// Create new, uninitialized temporary path in the system temp directory. + pub fn new() -> Self { + Self::new_with_temp_dir(std::env::temp_dir()) + } + + /// Create new, uninitialized temporary path in the specified directory. + pub fn new_with_temp_dir(temp_dir: PathBuf) -> Self { + let mut temppath = temp_dir; + let mut rng = rand::thread_rng(); + let mut bytes = [0_u8; 16]; + rng.fill_bytes(&mut bytes); + temppath.push(hex::encode(bytes)); + + TempPath { + path_buf: temppath, + persist: false, + } + } + + /// Return the underlying path to this temporary directory. + pub fn path(&self) -> &Path { + &self.path_buf + } + + /// Keep the temp path + pub fn persist(&mut self) { + self.persist = true; + } + + pub fn create_as_file(&self) -> io::Result<()> { + let mut builder = fs::OpenOptions::new(); + builder.write(true).create_new(true); + builder.open(self.path())?; + Ok(()) + } + + pub fn create_as_dir(&self) -> io::Result<()> { + let builder = fs::DirBuilder::new(); + builder.create(self.path())?; + Ok(()) + } +} + +impl std::convert::AsRef for TempPath { + fn as_ref(&self) -> &Path { + self.path() + } +} + +impl Default for TempPath { + fn default() -> Self { + Self::new() + } +} diff --git a/node/src/lib.rs b/node/src/lib.rs index 3c52be3b13..7ef96c6004 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -186,7 +186,7 @@ impl NodeHandle { let receiver = bus.oneshot::().await?; bus.broadcast(GenerateBlockEvent::new_break(true))?; let block = if let Ok(Ok(event)) = - async_std::future::timeout(Duration::from_secs(5), receiver).await + async_std::future::timeout(Duration::from_secs(30), receiver).await { //wait for new block event to been processed. 
Delay::new(Duration::from_millis(100)).await; diff --git a/storage/Cargo.toml b/storage/Cargo.toml index b7ece6ca02..ce97d05f14 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -24,6 +24,7 @@ starcoin-config = { workspace = true } starcoin-metrics = { workspace = true } starcoin-uint = { workspace = true } starcoin-vm-types = { workspace = true } +starcoin-schemadb = { workspace = true } [dev-dependencies] proptest = { workspace = true } diff --git a/storage/schemadb/Cargo.toml b/storage/schemadb/Cargo.toml new file mode 100644 index 0000000000..b2c953888b --- /dev/null +++ b/storage/schemadb/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "starcoin-schemadb" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +thiserror = { workspace = true } +starcoin-config = { workspace = true } +starcoin-metrics = { workspace = true } +parking_lot = { workspace = true } +anyhow = { workspace = true } +coarsetime = { workspace = true } +num_enum = { workspace = true } +once_cell = { workspace = true } + +[dependencies.rocksdb] +default-features = false +features = ["lz4"] +workspace = true + +[dev-dependencies] +byteorder = { workspace = true } +starcoin-temppath = { workspace = true } diff --git a/storage/schemadb/src/db/mod.rs b/storage/schemadb/src/db/mod.rs new file mode 100644 index 0000000000..386b100fdd --- /dev/null +++ b/storage/schemadb/src/db/mod.rs @@ -0,0 +1,452 @@ +// Copyright (c) The Starcoin Core Contributors +// SPDX-License-Identifier: Apache-2 + +mod version; + +use crate::iterator::ScanDirection; +use crate::{ + error::StorageInitError, + metrics::StorageMetrics, + schema::{KeyCodec, Schema, ValueCodec}, + ColumnFamilyName, SchemaBatch, SchemaIterator, WriteOp, +}; +use anyhow::{ensure, format_err, Error, Result}; +use rocksdb::{ + DBIterator, IteratorMode, Options, ReadOptions, WriteBatch as DBWriteBatch, WriteOptions, DB, +}; +use starcoin_config::{check_open_fds_limit, RocksdbConfig}; +use std::collections::HashSet; +use std::path::Path; +pub use version::*; + +const RES_FDS: u64 = 4096; + +#[allow(clippy::upper_case_acronyms)] +pub struct DBStorage { + name: String, // for logging + db: DB, + cfs: Vec, + metrics: Option, +} +impl DBStorage { + pub fn db(&self) -> &DB { + &self.db + } + + pub fn metrics(&self) -> Option<&StorageMetrics> { + self.metrics.as_ref() + } + + pub fn new + Clone>( + db_root_path: P, + rocksdb_config: RocksdbConfig, + metrics: Option, + ) -> Result { + //TODO find a compat way to remove the `starcoindb` path + let path = db_root_path.as_ref().join("starcoindb"); + Self::open_with_cfs( + "default", + path, + StorageVersion::current_version() + .get_column_family_names() + .to_vec(), + false, + rocksdb_config, + metrics, + ) + } + + pub fn open_with_cfs
<P>
( + name: &str, + root_path: P, + column_families: Vec<ColumnFamilyName>, + readonly: bool, + rocksdb_config: RocksdbConfig, + metrics: Option<StorageMetrics>, + ) -> Result<Self> + where + P: AsRef<Path>, + { + Self::open_with_cfs_as_secondary( + name, + root_path, + None, + column_families, + readonly, + rocksdb_config, + metrics, + ) + } + + fn open_with_cfs_as_secondary
<P>
( + name: &str, + primary_path: P, + secondary_path: Option<P>
, + column_families: Vec, + readonly: bool, + rocksdb_config: RocksdbConfig, + metrics: Option, + ) -> Result + where + P: AsRef, + { + let path = primary_path.as_ref(); + + let cfs_set: HashSet<_> = column_families.iter().collect(); + { + ensure!( + cfs_set.len() == column_families.len(), + "Duplicate column family name found.", + ); + } + if Self::db_exists(path) { + let cf_vec = Self::list_cf(path)?; + let mut db_cfs_set: HashSet<_> = cf_vec.iter().collect(); + db_cfs_set.remove(&DEFAULT_PREFIX_NAME.to_string()); + ensure!( + db_cfs_set.len() <= cfs_set.len(), + StorageInitError::StorageCheckError(format_err!( + "ColumnFamily in db ({:?}) not same as ColumnFamily in code {:?}.", + column_families, + cf_vec + )) + ); + let mut remove_cf_vec = Vec::new(); + db_cfs_set.iter().for_each(|k| { + if !cfs_set.contains(&k.as_str()) { + remove_cf_vec.push(<&String>::clone(k)); + } + }); + ensure!( + remove_cf_vec.is_empty(), + StorageInitError::StorageCheckError(format_err!( + "Can not remove ColumnFamily, ColumnFamily in db ({:?}) not in code {:?}.", + remove_cf_vec, + cf_vec + )) + ); + } + + let mut rocksdb_opts = Self::gen_rocksdb_options(&rocksdb_config); + + let db = if readonly { + if let Some(secondary_path) = secondary_path { + Self::open_inner( + &rocksdb_opts, + path, + Some(secondary_path.as_ref()), + column_families.clone(), + )? + } else { + Self::open_readonly_inner(&rocksdb_opts, path, column_families.clone())? + } + } else { + rocksdb_opts.create_if_missing(true); + rocksdb_opts.create_missing_column_families(true); + Self::open_inner(&rocksdb_opts, path, None, column_families.clone())? + }; + check_open_fds_limit(rocksdb_config.max_open_files as u64 + RES_FDS)?; + Ok(DBStorage { + name: name.to_string(), + db, + cfs: column_families, + metrics, + }) + } + + fn open_inner
<P>
( + opts: &Options, + primary_path: P, + secondary_path: Option<P>
, + column_families: Vec, + ) -> Result + where + P: AsRef, + { + let cfs = column_families.iter().map(|cf_name| { + let mut cf_opts = Options::default(); + cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4); + /* + cf_opts.set_compression_per_level(&[ + rocksdb::DBCompressionType::None, + rocksdb::DBCompressionType::None, + rocksdb::DBCompressionType::Lz4, + rocksdb::DBCompressionType::Lz4, + rocksdb::DBCompressionType::Lz4, + rocksdb::DBCompressionType::Lz4, + rocksdb::DBCompressionType::Lz4, + ]); + */ + rocksdb::ColumnFamilyDescriptor::new((*cf_name).to_string(), cf_opts) + }); + + let inner = if let Some(secondary_path) = secondary_path { + DB::open_cf_descriptors_as_secondary(opts, primary_path, secondary_path, cfs) + } else { + DB::open_cf_descriptors(opts, primary_path, cfs) + }; + + Ok(inner?) + } + + fn open_readonly_inner( + db_opts: &Options, + path: impl AsRef, + column_families: Vec, + ) -> Result { + let error_if_log_file_exists = false; + let inner = + DB::open_cf_for_read_only(db_opts, path, column_families, error_if_log_file_exists)?; + Ok(inner) + } + + pub fn drop_cf(&mut self) -> Result<(), Error> { + for cf in self.cfs.clone() { + self.db.drop_cf(cf)?; + } + Ok(()) + } + + pub fn drop_unused_cfs(&mut self, names: Vec<&str>) -> Result<(), Error> { + // https://github.com/facebook/rocksdb/issues/1295 + for name in names { + for cf in &self.cfs { + if cf == &name { + self.db.drop_cf(name)?; + let opt = Options::default(); + self.db.create_cf(name, &opt)?; + break; + } + } + } + Ok(()) + } + + /// Flushes all memtable data. This is only used for testing `get_approximate_sizes_cf` in unit + /// tests. + pub fn flush_all(&self) -> Result<()> { + for cf_name in &self.cfs { + self.flush_cf(cf_name)? + } + Ok(()) + } + + pub fn flush_cf(&self, cf_name: &str) -> Result<()> { + let cf_handle = self.get_cf_handle(cf_name)?; + Ok(self.db.flush_cf(cf_handle)?) + } + + pub fn write_batch_inner(&self, prefix_name: &str, rows: &[WriteOp], sync: bool) -> Result<()> { + let mut db_batch = DBWriteBatch::default(); + let cf_handle = self.get_cf_handle(prefix_name)?; + for write_op in rows { + match write_op { + WriteOp::Value(key, value) => db_batch.put_cf(cf_handle, key, value), + WriteOp::Deletion(key) => db_batch.delete_cf(cf_handle, key), + }; + } + + let write_opts = if sync { + Self::sync_write_options() + } else { + Self::default_write_options() + }; + + self.db.write_opt(db_batch, &write_opts)?; + Ok(()) + } + + /// List cf + pub fn list_cf(path: impl AsRef) -> Result, Error> { + Ok(DB::list_cf(&Options::default(), path)?) 
+ } + + fn db_exists(path: &Path) -> bool { + let rocksdb_current_file = path.join("CURRENT"); + rocksdb_current_file.is_file() + } + + pub fn get_cf_handle(&self, cf_name: &str) -> Result<&rocksdb::ColumnFamily> { + self.db.cf_handle(cf_name).ok_or_else(|| { + format_err!( + "DB::cf_handle not found for column family name: {}", + cf_name + ) + }) + } + + pub fn default_write_options() -> WriteOptions { + let mut opts = WriteOptions::new(); + opts.set_sync(false); + opts + } + + fn gen_rocksdb_options(config: &RocksdbConfig) -> Options { + let mut db_opts = Options::default(); + db_opts.set_max_open_files(config.max_open_files); + db_opts.set_max_total_wal_size(config.max_total_wal_size); + db_opts.set_wal_bytes_per_sync(config.wal_bytes_per_sync); + db_opts.set_bytes_per_sync(config.bytes_per_sync); + // db_opts.enable_statistics(); + // write buffer size + db_opts.set_max_write_buffer_number(5); + db_opts.set_max_background_jobs(5); + // todo: configure parallelism for backend rocksdb + //if config.parallelism > 1 { + // db_opts.increase_parallelism(config.parallelism as i32); + //} + // cache + // let cache = Cache::new_lru_cache(2 * 1024 * 1024 * 1024); + // db_opts.set_row_cache(&cache.unwrap()); + db_opts + } + + pub fn raw_iterator_cf_opt( + &self, + prefix_name: &str, + mode: IteratorMode, + readopts: ReadOptions, + ) -> Result { + let cf_handle = self.get_cf_handle(prefix_name)?; + Ok(self.db.iterator_cf_opt(cf_handle, readopts, mode)) + } + + pub fn sync_write_options() -> WriteOptions { + let mut opts = WriteOptions::new(); + opts.set_sync(true); + opts + } +} + +// The new Apis +impl DBStorage { + pub fn open( + name: &str, + root_path: impl AsRef, + column_families: Vec, + rocksdb_config: RocksdbConfig, + metrics: Option, + ) -> Result { + Self::open_with_cfs( + name, + root_path, + column_families, + false, + rocksdb_config, + metrics, + ) + } + + pub fn open_readonly( + name: &str, + root_path: impl AsRef, + column_families: Vec, + rocksdb_config: RocksdbConfig, + metrics: Option, + ) -> Result { + Self::open_with_cfs( + name, + root_path, + column_families, + true, + rocksdb_config, + metrics, + ) + } + + pub fn open_cf_as_secondary
<P>
( + name: &str, + primary_path: P, + secondary_path: P, + column_families: Vec, + rocksdb_config: RocksdbConfig, + metrics: Option, + ) -> Result + where + P: AsRef, + { + DBStorage::open_with_cfs_as_secondary( + name, + primary_path, + Some(secondary_path), + column_families, + true, + rocksdb_config, + metrics, + ) + } + + pub fn write_schemas(&self, batch: SchemaBatch) -> Result<()> { + let rows_locked = batch.rows.lock(); + + for row in rows_locked.iter() { + self.write_batch_inner(row.0, row.1, false /*normal write*/)? + } + + Ok(()) + } + + pub fn get(&self, key: &S::Key) -> Result> { + let raw_key = >::encode_key(key)?; + let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY)?; + self.db + .get_pinned_cf(cf_handle, raw_key) + .map_err(Into::into) + .and_then(|raw_value| { + raw_value + .map(|v| >::decode_value(&v)) + .transpose() + }) + } + + pub fn put(&self, key: &S::Key, value: &S::Value) -> Result<()> { + let raw_key = >::encode_key(key)?; + let raw_value = >::encode_value(value)?; + let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY)?; + + self.db.put_cf(cf_handle, raw_key, raw_value)?; + + Ok(()) + } + + pub fn remove(&self, key: &S::Key) -> Result<()> { + let raw_key = >::encode_key(key)?; + let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY)?; + + self.db.delete_cf(cf_handle, raw_key)?; + Ok(()) + } + + pub fn iter(&self, opts: ReadOptions) -> Result> { + self.iter_with_direction(opts, ScanDirection::Forward) + } + + pub fn rev_iter(&self, opts: ReadOptions) -> Result> { + self.iter_with_direction(opts, ScanDirection::Backward) + } + + fn iter_with_direction( + &self, + opts: ReadOptions, + direction: ScanDirection, + ) -> Result> { + let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY)?; + Ok(SchemaIterator::new( + self.db.raw_iterator_cf_opt(cf_handle, opts), + direction, + )) + } + + pub fn get_property(&self, cf_name: &str, property_name: &str) -> Result { + self.db + .property_int_value_cf(self.get_cf_handle(cf_name)?, property_name)? 
+ .ok_or_else(|| { + format_err!( + "Unable to get property \"{}\" of column family \"{}\" in db \"{}\".", + property_name, + cf_name, + self.name, + ) + }) + } +} diff --git a/storage/schemadb/src/db/version.rs b/storage/schemadb/src/db/version.rs new file mode 100644 index 0000000000..349db46a9c --- /dev/null +++ b/storage/schemadb/src/db/version.rs @@ -0,0 +1,112 @@ +// Copyright (c) The Starcoin Core Contributors +// SPDX-License-Identifier: Apache-2 + +use crate::ColumnFamilyName; +use num_enum::{IntoPrimitive, TryFromPrimitive}; +use once_cell::sync::Lazy; + +pub const DEFAULT_PREFIX_NAME: ColumnFamilyName = "default"; +pub const BLOCK_ACCUMULATOR_NODE_PREFIX_NAME: ColumnFamilyName = "acc_node_block"; +pub const TRANSACTION_ACCUMULATOR_NODE_PREFIX_NAME: ColumnFamilyName = "acc_node_transaction"; +pub const BLOCK_PREFIX_NAME: ColumnFamilyName = "block"; +pub const BLOCK_HEADER_PREFIX_NAME: ColumnFamilyName = "block_header"; +pub const BLOCK_BODY_PREFIX_NAME: ColumnFamilyName = "block_body"; +pub const BLOCK_INFO_PREFIX_NAME: ColumnFamilyName = "block_info"; +pub const BLOCK_TRANSACTIONS_PREFIX_NAME: ColumnFamilyName = "block_txns"; +pub const BLOCK_TRANSACTION_INFOS_PREFIX_NAME: ColumnFamilyName = "block_txn_infos"; +pub const STATE_NODE_PREFIX_NAME: ColumnFamilyName = "state_node"; +pub const STATE_NODE_PREFIX_NAME_PREV: ColumnFamilyName = "state_node_prev"; +pub const CHAIN_INFO_PREFIX_NAME: ColumnFamilyName = "chain_info"; +pub const TRANSACTION_PREFIX_NAME: ColumnFamilyName = "transaction"; +pub const TRANSACTION_INFO_PREFIX_NAME: ColumnFamilyName = "transaction_info"; +pub const TRANSACTION_INFO_PREFIX_NAME_V2: ColumnFamilyName = "transaction_info_v2"; +pub const TRANSACTION_INFO_HASH_PREFIX_NAME: ColumnFamilyName = "transaction_info_hash"; +pub const CONTRACT_EVENT_PREFIX_NAME: ColumnFamilyName = "contract_event"; +pub const FAILED_BLOCK_PREFIX_NAME: ColumnFamilyName = "failed_block"; +pub const TABLE_INFO_PREFIX_NAME: ColumnFamilyName = "table_info"; + +///db storage use prefix_name vec to init +/// Please note that adding a prefix needs to be added in vec simultaneously, remember!! 
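One way to make that reminder mechanical is a guard test that cross-checks the declared prefix constants against the current version's list; a sketch (not part of this change):

#[test]
fn current_version_lists_expected_prefixes() {
    let cfs = StorageVersion::current_version().get_column_family_names();
    for cf in [
        BLOCK_PREFIX_NAME,
        BLOCK_INFO_PREFIX_NAME,
        TRANSACTION_INFO_PREFIX_NAME_V2,
        TABLE_INFO_PREFIX_NAME,
    ] {
        assert!(cfs.contains(&cf), "{} missing from current version", cf);
    }
}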
+static VEC_PREFIX_NAME_V1: Lazy> = Lazy::new(|| { + vec![ + BLOCK_ACCUMULATOR_NODE_PREFIX_NAME, + TRANSACTION_ACCUMULATOR_NODE_PREFIX_NAME, + BLOCK_PREFIX_NAME, + BLOCK_HEADER_PREFIX_NAME, + BLOCK_BODY_PREFIX_NAME, + BLOCK_INFO_PREFIX_NAME, + BLOCK_TRANSACTIONS_PREFIX_NAME, + BLOCK_TRANSACTION_INFOS_PREFIX_NAME, + STATE_NODE_PREFIX_NAME, + CHAIN_INFO_PREFIX_NAME, + TRANSACTION_PREFIX_NAME, + TRANSACTION_INFO_PREFIX_NAME, + TRANSACTION_INFO_HASH_PREFIX_NAME, + CONTRACT_EVENT_PREFIX_NAME, + FAILED_BLOCK_PREFIX_NAME, + ] +}); + +static VEC_PREFIX_NAME_V2: Lazy> = Lazy::new(|| { + vec![ + BLOCK_ACCUMULATOR_NODE_PREFIX_NAME, + TRANSACTION_ACCUMULATOR_NODE_PREFIX_NAME, + BLOCK_PREFIX_NAME, + BLOCK_HEADER_PREFIX_NAME, + BLOCK_BODY_PREFIX_NAME, + BLOCK_INFO_PREFIX_NAME, + BLOCK_TRANSACTIONS_PREFIX_NAME, + BLOCK_TRANSACTION_INFOS_PREFIX_NAME, + STATE_NODE_PREFIX_NAME, + CHAIN_INFO_PREFIX_NAME, + TRANSACTION_PREFIX_NAME, + TRANSACTION_INFO_PREFIX_NAME, + TRANSACTION_INFO_PREFIX_NAME_V2, + TRANSACTION_INFO_HASH_PREFIX_NAME, + CONTRACT_EVENT_PREFIX_NAME, + FAILED_BLOCK_PREFIX_NAME, + ] +}); + +static VEC_PREFIX_NAME_V3: Lazy> = Lazy::new(|| { + vec![ + BLOCK_ACCUMULATOR_NODE_PREFIX_NAME, + TRANSACTION_ACCUMULATOR_NODE_PREFIX_NAME, + BLOCK_PREFIX_NAME, + BLOCK_HEADER_PREFIX_NAME, + BLOCK_BODY_PREFIX_NAME, // unused column + BLOCK_INFO_PREFIX_NAME, + BLOCK_TRANSACTIONS_PREFIX_NAME, + BLOCK_TRANSACTION_INFOS_PREFIX_NAME, + STATE_NODE_PREFIX_NAME, + CHAIN_INFO_PREFIX_NAME, + TRANSACTION_PREFIX_NAME, + TRANSACTION_INFO_PREFIX_NAME, // unused column + TRANSACTION_INFO_PREFIX_NAME_V2, + TRANSACTION_INFO_HASH_PREFIX_NAME, + CONTRACT_EVENT_PREFIX_NAME, + FAILED_BLOCK_PREFIX_NAME, + TABLE_INFO_PREFIX_NAME, + ] +}); +#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, IntoPrimitive, TryFromPrimitive)] +#[repr(u8)] +pub enum StorageVersion { + V1 = 1, + V2 = 2, + V3 = 3, +} + +impl StorageVersion { + pub fn current_version() -> StorageVersion { + StorageVersion::V3 + } + + pub fn get_column_family_names(&self) -> &'static [ColumnFamilyName] { + match self { + StorageVersion::V1 => &VEC_PREFIX_NAME_V1, + StorageVersion::V2 => &VEC_PREFIX_NAME_V2, + StorageVersion::V3 => &VEC_PREFIX_NAME_V3, + } + } +} diff --git a/storage/schemadb/src/error.rs b/storage/schemadb/src/error.rs new file mode 100644 index 0000000000..75ebb4e4ed --- /dev/null +++ b/storage/schemadb/src/error.rs @@ -0,0 +1,77 @@ +// Copyright (c) The Starcoin Core Contributors +// SPDX-License-Identifier: Apache-2 + +use anyhow::Error; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum StorageInitError { + #[error("Storage check error {0:?}.")] + StorageCheckError(Error), +} + +#[derive(Error, Debug)] +pub enum StoreError { + #[error("key {0} not found in store")] + KeyNotFound(String), + + #[error("key {0} already exists in store")] + KeyAlreadyExists(String), + + #[error("column family {0} not exist in db")] + CFNotExist(String), + + #[error("IO error {0}")] + DBIoError(String), + + #[error("rocksdb error {0}")] + DbError(#[from] rocksdb::Error), + + #[error("encode error {0}")] + EncodeError(String), + + #[error("decode error {0}")] + DecodeError(String), + + #[error("ghostdag {0} duplicate blocks")] + DAGDupBlocksError(String), + + #[error("error {0}")] + AnyError(String), +} + +impl From for StoreError { + fn from(value: Error) -> Self { + StoreError::AnyError(value.to_string()) + } +} + +pub type StoreResult = std::result::Result; + +pub trait StoreResultExtensions { + fn unwrap_option(self) -> Option; +} + +impl 
<T>
StoreResultExtensions for StoreResult { + fn unwrap_option(self) -> Option { + match self { + Ok(value) => Some(value), + Err(StoreError::KeyNotFound(_)) => None, + Err(err) => panic!("Unexpected store error: {err:?}"), + } + } +} + +pub trait StoreResultEmptyTuple { + fn unwrap_and_ignore_key_already_exists(self); +} + +impl StoreResultEmptyTuple for StoreResult<()> { + fn unwrap_and_ignore_key_already_exists(self) { + match self { + Ok(_) => (), + Err(StoreError::KeyAlreadyExists(_)) => (), + Err(err) => panic!("Unexpected store error: {err:?}"), + } + } +} diff --git a/storage/schemadb/src/iterator.rs b/storage/schemadb/src/iterator.rs new file mode 100644 index 0000000000..1431490220 --- /dev/null +++ b/storage/schemadb/src/iterator.rs @@ -0,0 +1,85 @@ +// Copyright (c) The Starcoin Core Contributors +// SPDX-License-Identifier: Apache-2 + +use crate::schema::{KeyCodec, Schema, SeekKeyCodec, ValueCodec}; +use anyhow::Result; +use std::marker::PhantomData; + +pub enum ScanDirection { + Forward, + Backward, +} + +pub struct SchemaIterator<'a, S: Schema> { + db_iter: rocksdb::DBRawIterator<'a>, + direction: ScanDirection, + phantom: PhantomData, +} + +impl<'a, S> SchemaIterator<'a, S> +where + S: Schema, +{ + pub(crate) fn new(db_iter: rocksdb::DBRawIterator<'a>, direction: ScanDirection) -> Self { + SchemaIterator { + db_iter, + direction, + phantom: PhantomData, + } + } + + /// Seeks to the first key. + pub fn seek_to_first(&mut self) { + self.db_iter.seek_to_first(); + } + + /// Seeks to the last key. + pub fn seek_to_last(&mut self) { + self.db_iter.seek_to_last(); + } + + /// Seeks to the first key whose binary representation is equal to or greater than that of the + /// `seek_key`. + pub fn seek>(&mut self, seek_key: &SK) -> Result<()> { + let raw_key = >::encode_seek_key(seek_key)?; + self.db_iter.seek(&raw_key); + Ok(()) + } + + /// Seeks to the last key whose binary representation is less than or equal to that of the + /// `seek_key`. 
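+ /// For example, with keys 1, 3, and 5 stored, seek_for_prev(4) positions the iterator on 3.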
+ pub fn seek_for_prev>(&mut self, seek_key: &SK) -> Result<()> { + let raw_key = >::encode_seek_key(seek_key)?; + self.db_iter.seek_for_prev(&raw_key); + Ok(()) + } + + fn next_impl(&mut self) -> Result> { + if !self.db_iter.valid() { + self.db_iter.status()?; + return Ok(None); + } + + let raw_key = self.db_iter.key().expect("Iterator must be valid."); + let raw_value = self.db_iter.value().expect("Iterator must be valid."); + let key = >::decode_key(raw_key)?; + let value = >::decode_value(raw_value)?; + match self.direction { + ScanDirection::Forward => self.db_iter.next(), + ScanDirection::Backward => self.db_iter.prev(), + } + + Ok(Some((key, value))) + } +} + +impl<'a, S> Iterator for SchemaIterator<'a, S> +where + S: Schema, +{ + type Item = Result<(S::Key, S::Value)>; + + fn next(&mut self) -> Option { + self.next_impl().transpose() + } +} diff --git a/storage/schemadb/src/lib.rs b/storage/schemadb/src/lib.rs new file mode 100644 index 0000000000..a90530eff8 --- /dev/null +++ b/storage/schemadb/src/lib.rs @@ -0,0 +1,69 @@ +// Copyright (c) The Starcoin Core Contributors +// SPDX-License-Identifier: Apache-2 + +pub mod db; +pub mod error; +pub mod iterator; +pub mod metrics; +pub mod schema; + +use crate::{ + iterator::SchemaIterator, + schema::{KeyCodec, Schema, ValueCodec}, +}; +use anyhow::Result; +use parking_lot::Mutex; +use std::collections::HashMap; + +pub type ColumnFamilyName = &'static str; + +#[derive(Debug, Clone)] +pub enum GWriteOp { + Value(K, V), + Deletion(K), +} + +pub type WriteOp = GWriteOp, Vec>; + +#[derive(Debug)] +pub struct SchemaBatch { + rows: Mutex>>, +} + +impl Default for SchemaBatch { + fn default() -> Self { + Self { + rows: Mutex::new(HashMap::new()), + } + } +} + +impl SchemaBatch { + pub fn new() -> Self { + Self::default() + } + + pub fn put(&self, key: &S::Key, val: &S::Value) -> Result<()> { + let key = >::encode_key(key)?; + let value = >::encode_value(val)?; + self.rows + .lock() + .entry(S::COLUMN_FAMILY) + .or_insert_with(Vec::new) + .push(WriteOp::Value(key, value)); + + Ok(()) + } + + pub fn delete(&self, key: &S::Key) -> Result<()> { + let key = >::encode_key(key)?; + + self.rows + .lock() + .entry(S::COLUMN_FAMILY) + .or_insert_with(Vec::new) + .push(WriteOp::Deletion(key)); + + Ok(()) + } +} diff --git a/storage/src/metrics.rs b/storage/schemadb/src/metrics.rs similarity index 100% rename from storage/src/metrics.rs rename to storage/schemadb/src/metrics.rs diff --git a/storage/schemadb/src/schema.rs b/storage/schemadb/src/schema.rs new file mode 100644 index 0000000000..d49cb76a6a --- /dev/null +++ b/storage/schemadb/src/schema.rs @@ -0,0 +1,57 @@ +// Copyright (c) The Starcoin Core Contributors +// SPDX-License-Identifier: Apache-2 + +use anyhow::Result; +use core::hash::Hash; +use std::fmt::Debug; + +pub trait KeyCodec: Clone + Sized + Debug + Send + Sync { + /// Converts `self` to bytes to be stored in DB. + fn encode_key(&self) -> Result>; + /// Converts bytes fetched from DB to `Self`. + fn decode_key(data: &[u8]) -> Result; +} + +pub trait ValueCodec: Clone + Sized + Debug + Send + Sync { + /// Converts `self` to bytes to be stored in DB. + fn encode_value(&self) -> Result>; + /// Converts bytes fetched from DB to `Self`. 
+ fn decode_value(data: &[u8]) -> Result; +} + +pub trait SeekKeyCodec { + fn encode_seek_key(&self) -> Result>; +} + +// auto implements for all keys +impl SeekKeyCodec for K +where + S: Schema, + K: KeyCodec, +{ + fn encode_seek_key(&self) -> Result> { + >::encode_key(self) + } +} + +pub trait Schema: Debug + Send + Sync + 'static { + const COLUMN_FAMILY: &'static str; + + type Key: KeyCodec + Hash + Eq; + type Value: ValueCodec; +} + +#[macro_export] +macro_rules! define_schema { + ($schema_type: ident, $key_type: ty, $value_type: ty, $cf_name: expr) => { + #[derive(Clone, Debug)] + pub(crate) struct $schema_type; + + impl $crate::schema::Schema for $schema_type { + type Key = $key_type; + type Value = $value_type; + + const COLUMN_FAMILY: &'static str = $cf_name; + } + }; +} diff --git a/storage/schemadb/tests/db.rs b/storage/schemadb/tests/db.rs new file mode 100644 index 0000000000..ce6afbc4b4 --- /dev/null +++ b/storage/schemadb/tests/db.rs @@ -0,0 +1,395 @@ +// Copyright (c) The Starcoin Core Contributors +// SPDX-License-Identifier: Apache-2 + +use anyhow::Result; +use byteorder::{LittleEndian, ReadBytesExt}; +use rocksdb::DEFAULT_COLUMN_FAMILY_NAME; +use starcoin_config::RocksdbConfig; +use starcoin_schemadb::{ + db::DBStorage as DB, + define_schema, + schema::{KeyCodec, Schema, ValueCodec}, + ColumnFamilyName, SchemaBatch, +}; + +// Creating two schemas that share exactly the same structure but are stored in different column +// families. Also note that the key and value are of the same type `TestField`. By implementing +// both the `KeyCodec<>` and `ValueCodec<>` traits for both schemas, we are able to use it +// everywhere. +define_schema!(TestSchema1, TestField, TestField, "TestCF1"); +define_schema!(TestSchema2, TestField, TestField, "TestCF2"); + +#[derive(Debug, Eq, PartialEq, Clone, Default, Hash)] +struct TestField(u32); + +impl TestField { + fn to_bytes(&self) -> Vec { + self.0.to_le_bytes().to_vec() + } + + fn from_bytes(data: &[u8]) -> Result { + let mut reader = std::io::Cursor::new(data); + Ok(TestField(reader.read_u32::()?)) + } +} + +impl KeyCodec for TestField { + fn encode_key(&self) -> Result> { + Ok(self.to_bytes()) + } + + fn decode_key(data: &[u8]) -> Result { + Self::from_bytes(data) + } +} + +impl ValueCodec for TestField { + fn encode_value(&self) -> Result> { + Ok(self.to_bytes()) + } + + fn decode_value(data: &[u8]) -> Result { + Self::from_bytes(data) + } +} + +impl KeyCodec for TestField { + fn encode_key(&self) -> Result> { + Ok(self.to_bytes()) + } + + fn decode_key(data: &[u8]) -> Result { + Self::from_bytes(data) + } +} + +impl ValueCodec for TestField { + fn encode_value(&self) -> Result> { + Ok(self.to_bytes()) + } + + fn decode_value(data: &[u8]) -> Result { + Self::from_bytes(data) + } +} + +fn get_column_families() -> Vec { + vec![ + DEFAULT_COLUMN_FAMILY_NAME, + TestSchema1::COLUMN_FAMILY, + TestSchema2::COLUMN_FAMILY, + ] +} + +fn open_db(dir: &starcoin_temppath::TempPath) -> DB { + let db_opts = RocksdbConfig::default(); + DB::open("test", dir.path(), get_column_families(), db_opts, None).expect("Failed to open DB.") +} + +fn open_db_read_only(dir: &starcoin_temppath::TempPath) -> DB { + DB::open_readonly( + "test", + dir.path(), + get_column_families(), + RocksdbConfig::default(), + None, + ) + .expect("Failed to open DB.") +} + +fn open_db_as_secondary( + dir: &starcoin_temppath::TempPath, + dir_sec: &starcoin_temppath::TempPath, +) -> DB { + DB::open_cf_as_secondary( + "test", + dir.path(), + dir_sec.path(), + get_column_families(), + 
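The test schemas hinge on symmetric codecs: `encode_*` and `decode_*` must round-trip exactly. A stand-alone sketch of the tests' little-endian `TestField` round-trip, using `Option` instead of the crate's `anyhow::Result` to stay dependency-free:

```rust
#[derive(Debug, PartialEq)]
struct TestField(u32);

impl TestField {
    /// Encode as 4 little-endian bytes, as in the tests' `to_bytes`.
    fn to_bytes(&self) -> Vec<u8> {
        self.0.to_le_bytes().to_vec()
    }

    /// Decode, rejecting inputs shorter than 4 bytes.
    fn from_bytes(data: &[u8]) -> Option<Self> {
        let bytes: [u8; 4] = data.get(..4)?.try_into().ok()?;
        Some(TestField(u32::from_le_bytes(bytes)))
    }
}

fn main() {
    let field = TestField(0xDEAD_BEEF);
    let encoded = field.to_bytes();
    assert_eq!(TestField::from_bytes(&encoded), Some(field));
    assert_eq!(TestField::from_bytes(&encoded[..2]), None); // truncated input
}
```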
RocksdbConfig::default(), + None, + ) + .expect("Failed to open DB.") +} + +struct TestDB { + _tmpdir: starcoin_temppath::TempPath, + db: DB, +} + +impl TestDB { + fn new() -> Self { + let tmpdir = starcoin_temppath::TempPath::new(); + let db = open_db(&tmpdir); + + TestDB { + _tmpdir: tmpdir, + db, + } + } +} + +impl std::ops::Deref for TestDB { + type Target = DB; + + fn deref(&self) -> &Self::Target { + &self.db + } +} + +#[test] +fn test_schema_put_get() { + let db = TestDB::new(); + + db.put::(&TestField(0), &TestField(0)).unwrap(); + db.put::(&TestField(1), &TestField(1)).unwrap(); + db.put::(&TestField(2), &TestField(2)).unwrap(); + db.put::(&TestField(2), &TestField(3)).unwrap(); + db.put::(&TestField(3), &TestField(4)).unwrap(); + db.put::(&TestField(4), &TestField(5)).unwrap(); + + assert_eq!( + db.get::(&TestField(0)).unwrap(), + Some(TestField(0)), + ); + assert_eq!( + db.get::(&TestField(1)).unwrap(), + Some(TestField(1)), + ); + assert_eq!( + db.get::(&TestField(2)).unwrap(), + Some(TestField(2)), + ); + assert_eq!(db.get::(&TestField(3)).unwrap(), None); + + assert_eq!(db.get::(&TestField(1)).unwrap(), None); + assert_eq!( + db.get::(&TestField(2)).unwrap(), + Some(TestField(3)), + ); + assert_eq!( + db.get::(&TestField(3)).unwrap(), + Some(TestField(4)), + ); + assert_eq!( + db.get::(&TestField(4)).unwrap(), + Some(TestField(5)), + ); +} + +fn collect_values(db: &TestDB) -> Vec<(S::Key, S::Value)> { + let mut iter = db + .iter::(Default::default()) + .expect("Failed to create iterator."); + iter.seek_to_first(); + iter.collect::>>().unwrap() +} + +fn gen_expected_values(values: &[(u32, u32)]) -> Vec<(TestField, TestField)> { + values + .iter() + .cloned() + .map(|(x, y)| (TestField(x), TestField(y))) + .collect() +} + +#[test] +fn test_single_schema_batch() { + let db = TestDB::new(); + + let db_batch = SchemaBatch::new(); + db_batch + .put::(&TestField(0), &TestField(0)) + .unwrap(); + db_batch + .put::(&TestField(1), &TestField(1)) + .unwrap(); + db_batch + .put::(&TestField(2), &TestField(2)) + .unwrap(); + db_batch + .put::(&TestField(3), &TestField(3)) + .unwrap(); + db_batch.delete::(&TestField(4)).unwrap(); + db_batch.delete::(&TestField(3)).unwrap(); + db_batch + .put::(&TestField(4), &TestField(4)) + .unwrap(); + db_batch + .put::(&TestField(5), &TestField(5)) + .unwrap(); + + db.write_schemas(db_batch).unwrap(); + + assert_eq!( + collect_values::(&db), + gen_expected_values(&[(0, 0), (1, 1), (2, 2)]), + ); + assert_eq!( + collect_values::(&db), + gen_expected_values(&[(4, 4), (5, 5)]), + ); +} + +#[test] +fn test_two_schema_batches() { + let db = TestDB::new(); + + let db_batch1 = SchemaBatch::new(); + db_batch1 + .put::(&TestField(0), &TestField(0)) + .unwrap(); + db_batch1 + .put::(&TestField(1), &TestField(1)) + .unwrap(); + db_batch1 + .put::(&TestField(2), &TestField(2)) + .unwrap(); + db_batch1.delete::(&TestField(2)).unwrap(); + db.write_schemas(db_batch1).unwrap(); + + assert_eq!( + collect_values::(&db), + gen_expected_values(&[(0, 0), (1, 1)]), + ); + + let db_batch2 = SchemaBatch::new(); + db_batch2.delete::(&TestField(3)).unwrap(); + db_batch2 + .put::(&TestField(3), &TestField(3)) + .unwrap(); + db_batch2 + .put::(&TestField(4), &TestField(4)) + .unwrap(); + db_batch2 + .put::(&TestField(5), &TestField(5)) + .unwrap(); + db.write_schemas(db_batch2).unwrap(); + + assert_eq!( + collect_values::(&db), + gen_expected_values(&[(0, 0), (1, 1)]), + ); + assert_eq!( + collect_values::(&db), + gen_expected_values(&[(3, 3), (4, 4), (5, 5)]), + ); +} + +#[test] 
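`test_single_schema_batch` relies on ops inside one batch applying in insertion order: a put followed by a delete of the same key leaves it absent, while a delete followed by a put leaves it present. A sketch of that ordering over a plain map (the `apply` helper is hypothetical):

```rust
use std::collections::HashMap;

enum Op {
    Put(u32, u32),
    Delete(u32),
}

/// Apply ops strictly in insertion order, as RocksDB does within one WriteBatch.
fn apply(ops: Vec<Op>) -> HashMap<u32, u32> {
    let mut map = HashMap::new();
    for op in ops {
        match op {
            Op::Put(k, v) => {
                map.insert(k, v);
            }
            Op::Delete(k) => {
                map.remove(&k);
            }
        }
    }
    map
}

fn main() {
    // Mirrors the TestSchema2 ops in test_single_schema_batch:
    // put(3,3); delete(4); delete(3); put(4,4); put(5,5)
    let map = apply(vec![
        Op::Put(3, 3),
        Op::Delete(4),
        Op::Delete(3),
        Op::Put(4, 4),
        Op::Put(5, 5),
    ]);
    assert_eq!(map.get(&3), None); // deleted after the put
    assert_eq!(map.get(&4), Some(&4)); // put after the delete
    assert_eq!(map.get(&5), Some(&5));
}
```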
+fn test_reopen() { + let tmpdir = starcoin_temppath::TempPath::new(); + { + let db = open_db(&tmpdir); + db.put::(&TestField(0), &TestField(0)).unwrap(); + assert_eq!( + db.get::(&TestField(0)).unwrap(), + Some(TestField(0)), + ); + } + { + let db = open_db(&tmpdir); + assert_eq!( + db.get::(&TestField(0)).unwrap(), + Some(TestField(0)), + ); + } +} + +#[test] +fn test_open_read_only() { + let tmpdir = starcoin_temppath::TempPath::new(); + { + let db = open_db(&tmpdir); + db.put::(&TestField(0), &TestField(0)).unwrap(); + } + { + let db = open_db_read_only(&tmpdir); + assert_eq!( + db.get::(&TestField(0)).unwrap(), + Some(TestField(0)), + ); + assert!(db.put::(&TestField(1), &TestField(1)).is_err()); + } +} + +#[test] +fn test_open_as_secondary() { + let tmpdir = starcoin_temppath::TempPath::new(); + let tmpdir_sec = starcoin_temppath::TempPath::new(); + + let db = open_db(&tmpdir); + db.put::(&TestField(0), &TestField(0)).unwrap(); + + let db_sec = open_db_as_secondary(&tmpdir, &tmpdir_sec); + assert_eq!( + db_sec.get::(&TestField(0)).unwrap(), + Some(TestField(0)), + ); +} + +#[test] +fn test_report_size() { + let db = TestDB::new(); + + for i in 0..1000 { + let db_batch = SchemaBatch::new(); + db_batch + .put::(&TestField(i), &TestField(i)) + .unwrap(); + db_batch + .put::(&TestField(i), &TestField(i)) + .unwrap(); + db.write_schemas(db_batch).unwrap(); + } + + db.flush_cf("TestCF1").unwrap(); + db.flush_cf("TestCF2").unwrap(); + + assert!( + db.get_property("TestCF1", "rocksdb.estimate-live-data-size") + .unwrap() + > 0 + ); + assert!( + db.get_property("TestCF2", "rocksdb.estimate-live-data-size") + .unwrap() + > 0 + ); + assert_eq!( + db.get_property("default", "rocksdb.estimate-live-data-size") + .unwrap(), + 0 + ); +} + +//#[test] +//#[ignore] +//fn test_checkpoint() { +// let tmpdir = starcoin_temppath::TempPath::new(); +// let checkpoint = starcoin_temppath::TempPath::new(); +// { +// let db = open_db(&tmpdir); +// db.put::(&TestField(0), &TestField(0)).unwrap(); +// db.create_checkpoint(&checkpoint).unwrap(); +// } +// { +// let db = open_db(&tmpdir); +// assert_eq!( +// db.get::(&TestField(0)).unwrap(), +// Some(TestField(0)), +// ); +// +// let cp = open_db(&checkpoint); +// assert_eq!( +// cp.get::(&TestField(0)).unwrap(), +// Some(TestField(0)), +// ); +// cp.put::(&TestField(1), &TestField(1)).unwrap(); +// assert_eq!( +// cp.get::(&TestField(1)).unwrap(), +// Some(TestField(1)), +// ); +// assert_eq!(db.get::(&TestField(1)).unwrap(), None); +// } +//} diff --git a/storage/schemadb/tests/iterator.rs b/storage/schemadb/tests/iterator.rs new file mode 100644 index 0000000000..0e104ebbd9 --- /dev/null +++ b/storage/schemadb/tests/iterator.rs @@ -0,0 +1,268 @@ +// Copyright (c) The Starcoin Core Contributors +// SPDX-License-Identifier: Apache-2 + +use anyhow::Result; +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use rocksdb::DEFAULT_COLUMN_FAMILY_NAME; +use starcoin_config::RocksdbConfig; +use starcoin_schemadb::{ + db::DBStorage as DB, + define_schema, + iterator::SchemaIterator, + schema::{KeyCodec, Schema, SeekKeyCodec, ValueCodec}, +}; + +define_schema!(TestSchema, TestKey, TestValue, "TestCF"); + +#[derive(Debug, Eq, PartialEq, Clone, Default, Hash)] +struct TestKey(u32, u32, u32); + +#[derive(Debug, Eq, PartialEq, Clone, Default)] +struct TestValue(u32); + +impl KeyCodec for TestKey { + fn encode_key(&self) -> Result> { + let mut bytes = vec![]; + bytes.write_u32::(self.0)?; + bytes.write_u32::(self.1)?; + bytes.write_u32::(self.2)?; + Ok(bytes) + } + + fn 
decode_key(data: &[u8]) -> Result { + let mut reader = std::io::Cursor::new(data); + Ok(TestKey( + reader.read_u32::()?, + reader.read_u32::()?, + reader.read_u32::()?, + )) + } +} + +impl ValueCodec for TestValue { + fn encode_value(&self) -> Result> { + Ok(self.0.to_be_bytes().to_vec()) + } + + fn decode_value(data: &[u8]) -> Result { + let mut reader = std::io::Cursor::new(data); + Ok(TestValue(reader.read_u32::()?)) + } +} + +pub struct KeyPrefix1(u32); + +impl SeekKeyCodec for KeyPrefix1 { + fn encode_seek_key(&self) -> Result> { + Ok(self.0.to_be_bytes().to_vec()) + } +} + +pub struct KeyPrefix2(u32, u32); + +impl SeekKeyCodec for KeyPrefix2 { + fn encode_seek_key(&self) -> Result> { + let mut bytes = vec![]; + bytes.write_u32::(self.0)?; + bytes.write_u32::(self.1)?; + Ok(bytes) + } +} + +fn collect_values(iter: SchemaIterator) -> Vec { + iter.map(|row| (row.unwrap().1).0).collect() +} + +struct TestDB { + _tmpdir: starcoin_temppath::TempPath, + db: DB, +} + +impl TestDB { + fn new() -> Self { + let tmpdir = starcoin_temppath::TempPath::new(); + let column_families = vec![DEFAULT_COLUMN_FAMILY_NAME, TestSchema::COLUMN_FAMILY]; + let db_opts = RocksdbConfig::default(); + let db = DB::open("test", tmpdir.path(), column_families, db_opts, None).unwrap(); + + db.put::(&TestKey(1, 0, 0), &TestValue(100)) + .unwrap(); + db.put::(&TestKey(1, 0, 2), &TestValue(102)) + .unwrap(); + db.put::(&TestKey(1, 0, 4), &TestValue(104)) + .unwrap(); + db.put::(&TestKey(1, 1, 0), &TestValue(110)) + .unwrap(); + db.put::(&TestKey(1, 1, 2), &TestValue(112)) + .unwrap(); + db.put::(&TestKey(1, 1, 4), &TestValue(114)) + .unwrap(); + db.put::(&TestKey(2, 0, 0), &TestValue(200)) + .unwrap(); + db.put::(&TestKey(2, 0, 2), &TestValue(202)) + .unwrap(); + + TestDB { + _tmpdir: tmpdir, + db, + } + } +} + +impl TestDB { + fn iter(&self) -> SchemaIterator { + self.db + .iter(Default::default()) + .expect("Failed to create iterator.") + } + + fn rev_iter(&self) -> SchemaIterator { + self.db + .rev_iter(Default::default()) + .expect("Failed to create iterator.") + } +} + +impl std::ops::Deref for TestDB { + type Target = DB; + + fn deref(&self) -> &Self::Target { + &self.db + } +} + +#[test] +fn test_seek_to_first() { + let db = TestDB::new(); + + let mut iter = db.iter(); + iter.seek_to_first(); + assert_eq!( + collect_values(iter), + [100, 102, 104, 110, 112, 114, 200, 202] + ); + + let mut iter = db.rev_iter(); + iter.seek_to_first(); + assert_eq!(collect_values(iter), [100]); +} + +#[test] +fn test_seek_to_last() { + let db = TestDB::new(); + + let mut iter = db.iter(); + iter.seek_to_last(); + assert_eq!(collect_values(iter), [202]); + + let mut iter = db.rev_iter(); + iter.seek_to_last(); + assert_eq!( + collect_values(iter), + [202, 200, 114, 112, 110, 104, 102, 100] + ); +} + +#[test] +fn test_seek_by_existing_key() { + let db = TestDB::new(); + + let mut iter = db.iter(); + iter.seek(&TestKey(1, 1, 0)).unwrap(); + assert_eq!(collect_values(iter), [110, 112, 114, 200, 202]); + + let mut iter = db.rev_iter(); + iter.seek(&TestKey(1, 1, 0)).unwrap(); + assert_eq!(collect_values(iter), [110, 104, 102, 100]); +} + +#[test] +fn test_seek_by_nonexistent_key() { + let db = TestDB::new(); + + let mut iter = db.iter(); + iter.seek(&TestKey(1, 1, 1)).unwrap(); + assert_eq!(collect_values(iter), [112, 114, 200, 202]); + + let mut iter = db.rev_iter(); + iter.seek(&TestKey(1, 1, 1)).unwrap(); + assert_eq!(collect_values(iter), [112, 110, 104, 102, 100]); +} + +#[test] +fn test_seek_for_prev_by_existing_key() { + let db = 
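The seek tests pass because `TestKey` is encoded big-endian, so byte-wise comparison (RocksDB's default comparator) matches numeric order, and a 4- or 8-byte prefix such as `KeyPrefix1(2)` sorts immediately before every full key it prefixes. Both facts in miniature:

```rust
/// Big-endian encoding of (u32, u32, u32), as in TestKey::encode_key.
fn encode(k: (u32, u32, u32)) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(12);
    bytes.extend_from_slice(&k.0.to_be_bytes());
    bytes.extend_from_slice(&k.1.to_be_bytes());
    bytes.extend_from_slice(&k.2.to_be_bytes());
    bytes
}

fn main() {
    // Byte-wise order equals numeric order only because the encoding is big-endian.
    assert!(encode((1, 1, 0)) < encode((1, 1, 2)));
    assert!(encode((1, 1, 4)) < encode((2, 0, 0)));

    // A 4-byte prefix sorts before every 12-byte key it prefixes, which is
    // why seek(&KeyPrefix1(2)) lands on TestKey(2, 0, 0).
    let prefix = 2u32.to_be_bytes().to_vec();
    assert!(prefix < encode((2, 0, 0)));
    assert!(prefix > encode((1, 1, 4)));
}
```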
TestDB::new(); + + let mut iter = db.iter(); + iter.seek_for_prev(&TestKey(1, 1, 0)).unwrap(); + assert_eq!(collect_values(iter), [110, 112, 114, 200, 202]); + + let mut iter = db.rev_iter(); + iter.seek_for_prev(&TestKey(1, 1, 0)).unwrap(); + assert_eq!(collect_values(iter), [110, 104, 102, 100]); +} + +#[test] +fn test_seek_for_prev_by_nonexistent_key() { + let db = TestDB::new(); + + let mut iter = db.iter(); + iter.seek_for_prev(&TestKey(1, 1, 1)).unwrap(); + assert_eq!(collect_values(iter), [110, 112, 114, 200, 202]); + + let mut iter = db.rev_iter(); + iter.seek_for_prev(&TestKey(1, 1, 1)).unwrap(); + assert_eq!(collect_values(iter), [110, 104, 102, 100]); +} + +#[test] +fn test_seek_by_1prefix() { + let db = TestDB::new(); + + let mut iter = db.iter(); + iter.seek(&KeyPrefix1(2)).unwrap(); + assert_eq!(collect_values(iter), [200, 202]); + + let mut iter = db.rev_iter(); + iter.seek(&KeyPrefix1(2)).unwrap(); + assert_eq!(collect_values(iter), [200, 114, 112, 110, 104, 102, 100]); +} + +#[test] +fn test_seek_for_prev_by_1prefix() { + let db = TestDB::new(); + + let mut iter = db.iter(); + iter.seek_for_prev(&KeyPrefix1(2)).unwrap(); + assert_eq!(collect_values(iter), [114, 200, 202]); + + let mut iter = db.rev_iter(); + iter.seek_for_prev(&KeyPrefix1(2)).unwrap(); + assert_eq!(collect_values(iter), [114, 112, 110, 104, 102, 100]); +} + +#[test] +fn test_seek_by_2prefix() { + let db = TestDB::new(); + + let mut iter = db.iter(); + iter.seek(&KeyPrefix2(2, 0)).unwrap(); + assert_eq!(collect_values(iter), [200, 202]); + + let mut iter = db.rev_iter(); + iter.seek(&KeyPrefix2(2, 0)).unwrap(); + assert_eq!(collect_values(iter), [200, 114, 112, 110, 104, 102, 100]); +} + +#[test] +fn test_seek_for_prev_by_2prefix() { + let db = TestDB::new(); + + let mut iter = db.iter(); + iter.seek_for_prev(&KeyPrefix2(2, 0)).unwrap(); + assert_eq!(collect_values(iter), [114, 200, 202]); + + let mut iter = db.rev_iter(); + iter.seek_for_prev(&KeyPrefix2(2, 0)).unwrap(); + assert_eq!(collect_values(iter), [114, 112, 110, 104, 102, 100]); +} diff --git a/storage/src/batch/mod.rs b/storage/src/batch/mod.rs index 60e463274e..2c0e0487b7 100644 --- a/storage/src/batch/mod.rs +++ b/storage/src/batch/mod.rs @@ -5,30 +5,32 @@ use crate::storage::{CodecWriteBatch, KeyCodec, ValueCodec, WriteOp}; use anyhow::Result; use std::convert::TryFrom; +pub type WriteBatch = GWriteBatch, Vec>; + #[derive(Debug, Default, Clone)] -pub struct WriteBatch { - pub rows: Vec<(Vec, WriteOp>)>, +pub struct GWriteBatch { + pub rows: Vec>, } -impl WriteBatch { +impl GWriteBatch { /// Creates an empty batch. pub fn new() -> Self { Self::default() } - pub fn new_with_rows(rows: Vec<(Vec, WriteOp>)>) -> Self { + pub fn new_with_rows(rows: Vec>) -> Self { Self { rows } } /// Adds an insert/update operation to the batch. - pub fn put(&mut self, key: Vec, value: Vec) -> Result<()> { - self.rows.push((key, WriteOp::Value(value))); + pub fn put(&mut self, key: K, value: V) -> Result<()> { + self.rows.push(WriteOp::Value(key, value)); Ok(()) } /// Adds a delete operation to the batch. 
- pub fn delete(&mut self, key: Vec) -> Result<()> { - self.rows.push((key, WriteOp::Deletion)); + pub fn delete(&mut self, key: K) -> Result<()> { + self.rows.push(WriteOp::Deletion(key)); Ok(()) } @@ -39,6 +41,13 @@ impl WriteBatch { } } +fn into_raw_op(op: WriteOp) -> Result, Vec>> { + Ok(match op { + WriteOp::Value(k, v) => WriteOp::Value(k.encode_key()?, v.encode_value()?), + WriteOp::Deletion(k) => WriteOp::Deletion(k.encode_key()?), + }) +} + impl TryFrom> for WriteBatch where K: KeyCodec, @@ -47,10 +56,8 @@ where type Error = anyhow::Error; fn try_from(batch: CodecWriteBatch) -> Result { - let rows: Result, WriteOp>)>> = batch - .into_iter() - .map(|(key, op)| Ok((KeyCodec::encode_key(&key)?, op.into_raw_op()?))) - .collect(); + let rows: Result, Vec>>> = + batch.into_iter().map(|op| into_raw_op(op)).collect(); Ok(WriteBatch::new_with_rows(rows?)) } } diff --git a/storage/src/cache_storage/mod.rs b/storage/src/cache_storage/mod.rs index 46001ba401..5d2722bbc4 100644 --- a/storage/src/cache_storage/mod.rs +++ b/storage/src/cache_storage/mod.rs @@ -1,88 +1,95 @@ // Copyright (c) The Starcoin Core Contributors // SPDX-License-Identifier: Apache-2.0 -use crate::batch::WriteBatch; -use crate::metrics::{record_metrics, StorageMetrics}; -use crate::storage::{InnerStore, WriteOp}; +use crate::batch::GWriteBatch; +use crate::{ + batch::WriteBatch, + metrics::{record_metrics, StorageMetrics}, + storage::{InnerStore, WriteOp}, +}; use anyhow::{Error, Result}; +use core::hash::Hash; use lru::LruCache; use parking_lot::Mutex; use starcoin_config::DEFAULT_CACHE_SIZE; -pub struct CacheStorage { - cache: Mutex, Vec>>, + +pub type CacheStorage = GCacheStorage, Vec>; + +pub struct GCacheStorage { + cache: Mutex>, metrics: Option, } -impl CacheStorage { +impl GCacheStorage { pub fn new(metrics: Option) -> Self { - CacheStorage { - cache: Mutex::new(LruCache::new(DEFAULT_CACHE_SIZE)), + GCacheStorage { + cache: Mutex::new(LruCache::::new(DEFAULT_CACHE_SIZE)), metrics, } } pub fn new_with_capacity(size: usize, metrics: Option) -> Self { - CacheStorage { - cache: Mutex::new(LruCache::new(size)), + GCacheStorage { + cache: Mutex::new(LruCache::::new(size)), metrics, } } + pub fn remove_all(&self) { + self.cache.lock().clear(); + } } -impl Default for CacheStorage { +impl Default for GCacheStorage { fn default() -> Self { Self::new(None) } } impl InnerStore for CacheStorage { - fn get(&self, prefix_name: &str, key: Vec) -> Result>> { - record_metrics("cache", prefix_name, "get", self.metrics.as_ref()).call(|| { - Ok(self - .cache - .lock() - .get(&compose_key(prefix_name.to_string(), key)) - .cloned()) - }) + fn get_raw(&self, prefix_name: &str, key: Vec) -> Result>> { + let composed_key = compose_key(Some(prefix_name), key); + record_metrics("cache", prefix_name, "get", self.metrics.as_ref()) + .call(|| Ok(self.get_inner(&composed_key))) } - fn put(&self, prefix_name: &str, key: Vec, value: Vec) -> Result<()> { + fn put_raw(&self, prefix_name: &str, key: Vec, value: Vec) -> Result<()> { // remove record_metrics for performance // record_metrics add in write_batch to reduce Instant::now system call - let mut cache = self.cache.lock(); - cache.put(compose_key(prefix_name.to_string(), key), value); + let composed_key = compose_key(Some(prefix_name), key); + let len = self.put_inner(composed_key, value); if let Some(metrics) = self.metrics.as_ref() { - metrics.cache_items.set(cache.len() as u64); + metrics.cache_items.set(len as u64); } Ok(()) } fn contains_key(&self, prefix_name: &str, key: Vec) -> Result { 
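The batch refactor moves the key inside the op, so `GWriteBatch<K, V>` is just a vector of keyed ops and encoding becomes a per-op map, as in `into_raw_op`. A sketch with `u32` stand-ins for the `KeyCodec`/`ValueCodec` types:

```rust
#[derive(Debug)]
enum WriteOp<K, V> {
    Value(K, V),
    Deletion(K),
}

struct GWriteBatch<K, V> {
    rows: Vec<WriteOp<K, V>>,
}

/// Per-op encoding, as in batch/mod.rs's into_raw_op; little-endian u32
/// bytes stand in for the real codec implementations.
fn into_raw_op(op: WriteOp<u32, u32>) -> WriteOp<Vec<u8>, Vec<u8>> {
    match op {
        WriteOp::Value(k, v) => {
            WriteOp::Value(k.to_le_bytes().to_vec(), v.to_le_bytes().to_vec())
        }
        WriteOp::Deletion(k) => WriteOp::Deletion(k.to_le_bytes().to_vec()),
    }
}

fn main() {
    let batch = GWriteBatch {
        rows: vec![WriteOp::Value(1u32, 2u32), WriteOp::Deletion(3u32)],
    };
    let raw: Vec<WriteOp<Vec<u8>, Vec<u8>>> =
        batch.rows.into_iter().map(into_raw_op).collect();
    assert!(matches!(raw[0], WriteOp::Value(_, _)));
    assert!(matches!(raw[1], WriteOp::Deletion(_)));
}
```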
- record_metrics("cache", prefix_name, "contains_key", self.metrics.as_ref()).call(|| { - Ok(self - .cache - .lock() - .contains(&compose_key(prefix_name.to_string(), key))) - }) + let composed_key = compose_key(Some(prefix_name), key); + record_metrics("cache", prefix_name, "contains_key", self.metrics.as_ref()) + .call(|| Ok(self.contains_key_inner(&composed_key))) } - fn remove(&self, prefix_name: &str, key: Vec) -> Result<()> { + fn remove_raw(&self, prefix_name: &str, key: Vec) -> Result<()> { // remove record_metrics for performance // record_metrics add in write_batch to reduce Instant::now system call - let mut cache = self.cache.lock(); - cache.pop(&compose_key(prefix_name.to_string(), key)); + let composed_key = compose_key(Some(prefix_name), key); + let len = self.remove_inner(&composed_key); if let Some(metrics) = self.metrics.as_ref() { - metrics.cache_items.set(cache.len() as u64); + metrics.cache_items.set(len as u64); } Ok(()) } fn write_batch(&self, prefix_name: &str, batch: WriteBatch) -> Result<()> { + let rows = batch + .rows + .into_iter() + .map(|op| match op { + WriteOp::Value(k, v) => WriteOp::Value(compose_key(Some(prefix_name), k), v), + WriteOp::Deletion(k) => WriteOp::Deletion(compose_key(Some(prefix_name), k)), + }) + .collect(); + let batch = WriteBatch { rows }; record_metrics("cache", prefix_name, "write_batch", self.metrics.as_ref()).call(|| { - for (key, write_op) in &batch.rows { - match write_op { - WriteOp::Value(value) => self.put(prefix_name, key.to_vec(), value.to_vec())?, - WriteOp::Deletion => self.remove(prefix_name, key.to_vec())?, - }; - } + self.write_batch_inner(batch); Ok(()) }) } @@ -100,7 +107,7 @@ impl InnerStore for CacheStorage { } fn put_sync(&self, prefix_name: &str, key: Vec, value: Vec) -> Result<()> { - self.put(prefix_name, key, value) + self.put_raw(prefix_name, key, value) } fn write_batch_sync(&self, prefix_name: &str, batch: WriteBatch) -> Result<()> { @@ -108,22 +115,76 @@ impl InnerStore for CacheStorage { } fn multi_get(&self, prefix_name: &str, keys: Vec>) -> Result>>> { + let composed_keys = keys + .into_iter() + .map(|k| compose_key(Some(prefix_name), k)) + .collect::>(); + Ok(self.multi_get_inner(composed_keys.as_slice())) + } +} + +fn compose_key(prefix_name: Option<&str>, source_key: Vec) -> Vec { + match prefix_name { + Some(prefix_name) => { + let temp_vec = prefix_name.as_bytes().to_vec(); + let mut compose = Vec::with_capacity(temp_vec.len() + source_key.len()); + compose.extend(temp_vec); + compose.extend(source_key); + compose + } + None => source_key, + } +} + +impl GCacheStorage { + pub fn get_inner(&self, key: &K) -> Option { + self.cache.lock().get(key).cloned() + } + + pub fn put_inner(&self, key: K, value: V) -> usize { + let mut cache = self.cache.lock(); + cache.put(key, value); + cache.len() + } + + pub fn contains_key_inner(&self, key: &K) -> bool { + self.cache.lock().contains(key) + } + + pub fn remove_inner(&self, key: &K) -> usize { + let mut cache = self.cache.lock(); + cache.pop(key); + cache.len() + } + + pub fn write_batch_inner(&self, batch: GWriteBatch) { + for write_op in batch.rows { + match write_op { + WriteOp::Value(key, value) => { + self.put_inner(key, value); + } + WriteOp::Deletion(key) => { + self.remove_inner(&key); + } + }; + } + } + + pub fn put_sync_inner(&self, key: K, value: V) -> usize { + self.put_inner(key, value) + } + + pub fn write_batch_sync_inner(&self, batch: GWriteBatch) { + self.write_batch_inner(batch) + } + + pub fn multi_get_inner(&self, keys: &[K]) -> Vec> { let 
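`compose_key` is how the single LRU cache multiplexes column families: the prefix name is prepended to the raw key, and `None` (used by the generic cache-only paths) passes the key through untouched. Note the scheme implicitly relies on prefix names not being byte-prefixes of one another:

```rust
/// Prefix the raw key with the column-family name; `None` passes it through,
/// matching the compose_key in cache_storage/mod.rs.
fn compose_key(prefix_name: Option<&str>, source_key: Vec<u8>) -> Vec<u8> {
    match prefix_name {
        Some(prefix) => {
            let mut composed = Vec::with_capacity(prefix.len() + source_key.len());
            composed.extend_from_slice(prefix.as_bytes());
            composed.extend(source_key);
            composed
        }
        None => source_key,
    }
}

fn main() {
    assert_eq!(
        compose_key(Some("block"), vec![0x01, 0x02]),
        vec![b'b', b'l', b'o', b'c', b'k', 0x01, 0x02]
    );
    assert_eq!(compose_key(None, vec![0x01, 0x02]), vec![0x01, 0x02]);
}
```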
mut cache = self.cache.lock(); let mut result = vec![]; - for key in keys.into_iter() { - let item = cache - .get(&compose_key(prefix_name.to_string(), key)) - .cloned(); + for key in keys { + let item = cache.get(key).cloned(); result.push(item); } - Ok(result) + result } } - -fn compose_key(prefix_name: String, source_key: Vec) -> Vec { - let temp_vec = prefix_name.as_bytes().to_vec(); - let mut compose = Vec::with_capacity(temp_vec.len() + source_key.len()); - compose.extend(temp_vec); - compose.extend(source_key); - compose -} diff --git a/storage/src/db_storage/iterator.rs b/storage/src/db_storage/iterator.rs new file mode 100644 index 0000000000..49cc4d5137 --- /dev/null +++ b/storage/src/db_storage/iterator.rs @@ -0,0 +1,84 @@ +use crate::storage::{KeyCodec, ValueCodec}; +use anyhow::Result; +use std::marker::PhantomData; + +pub enum ScanDirection { + Forward, + Backward, +} + +pub struct SchemaIterator<'a, K, V> { + db_iter: rocksdb::DBRawIterator<'a>, + direction: ScanDirection, + phantom_k: PhantomData, + phantom_v: PhantomData, +} + +impl<'a, K, V> SchemaIterator<'a, K, V> +where + K: KeyCodec, + V: ValueCodec, +{ + pub(crate) fn new(db_iter: rocksdb::DBRawIterator<'a>, direction: ScanDirection) -> Self { + SchemaIterator { + db_iter, + direction, + phantom_k: PhantomData, + phantom_v: PhantomData, + } + } + + /// Seeks to the first key. + pub fn seek_to_first(&mut self) { + self.db_iter.seek_to_first(); + } + + /// Seeks to the last key. + pub fn seek_to_last(&mut self) { + self.db_iter.seek_to_last(); + } + + /// Seeks to the first key whose binary representation is equal to or greater than that of the + /// `seek_key`. + pub fn seek(&mut self, seek_key: Vec) -> Result<()> { + self.db_iter.seek(&seek_key); + Ok(()) + } + + /// Seeks to the last key whose binary representation is less than or equal to that of the + /// `seek_key`. 
+ pub fn seek_for_prev(&mut self, seek_key: Vec) -> Result<()> { + self.db_iter.seek_for_prev(&seek_key); + Ok(()) + } + + fn next_impl(&mut self) -> Result> { + if !self.db_iter.valid() { + self.db_iter.status()?; + return Ok(None); + } + + let raw_key = self.db_iter.key().expect("Iterator must be valid."); + let raw_value = self.db_iter.value().expect("Iterator must be valid."); + let key = K::decode_key(raw_key)?; + let value = V::decode_value(raw_value)?; + match self.direction { + ScanDirection::Forward => self.db_iter.next(), + ScanDirection::Backward => self.db_iter.prev(), + } + + Ok(Some((key, value))) + } +} + +impl<'a, K, V> Iterator for SchemaIterator<'a, K, V> +where + K: KeyCodec, + V: ValueCodec, +{ + type Item = Result<(K, V)>; + + fn next(&mut self) -> Option { + self.next_impl().transpose() + } +} diff --git a/storage/src/db_storage/mod.rs b/storage/src/db_storage/mod.rs index 20e6f82dbc..3e3cb2cb65 100644 --- a/storage/src/db_storage/mod.rs +++ b/storage/src/db_storage/mod.rs @@ -1,223 +1,36 @@ // Copyright (c) The Starcoin Core Contributors // SPDX-License-Identifier: Apache-2.0 -use crate::batch::WriteBatch; -use crate::errors::StorageInitError; -use crate::metrics::{record_metrics, StorageMetrics}; -use crate::storage::{ColumnFamilyName, InnerStore, KeyCodec, ValueCodec, WriteOp}; -use crate::{StorageVersion, DEFAULT_PREFIX_NAME}; -use anyhow::{ensure, format_err, Error, Result}; -use rocksdb::{Options, ReadOptions, WriteBatch as DBWriteBatch, WriteOptions, DB}; -use starcoin_config::{check_open_fds_limit, RocksdbConfig}; -use std::collections::HashSet; +mod iterator; + +use crate::{ + batch::WriteBatch, + storage::{InnerStore, KeyCodec, ValueCodec}, +}; +use anyhow::Result; +use rocksdb::ReadOptions; +use starcoin_schemadb::metrics::record_metrics; use std::iter; -use std::marker::PhantomData; -use std::path::Path; +pub use {iterator::*, starcoin_schemadb::db::DBStorage}; -const RES_FDS: u64 = 4096; +pub trait ClassicIter { + fn iter_with_direction( + &self, + prefix_name: &str, + direction: ScanDirection, + ) -> Result>; -#[allow(clippy::upper_case_acronyms)] -pub struct DBStorage { - db: DB, - cfs: Vec, - metrics: Option, + fn iter_raw( + &self, + prefix_name: &str, + ) -> Result>; + fn rev_iter_raw( + &self, + prefix_name: &str, + ) -> Result>; } -impl DBStorage { - pub fn new + Clone>( - db_root_path: P, - rocksdb_config: RocksdbConfig, - metrics: Option, - ) -> Result { - //TODO find a compat way to remove the `starcoindb` path - let path = db_root_path.as_ref().join("starcoindb"); - Self::open_with_cfs( - path, - StorageVersion::current_version() - .get_column_family_names() - .to_vec(), - false, - rocksdb_config, - metrics, - ) - } - - pub fn open_with_cfs( - root_path: impl AsRef, - column_families: Vec, - readonly: bool, - rocksdb_config: RocksdbConfig, - metrics: Option, - ) -> Result { - let path = root_path.as_ref(); - - let cfs_set: HashSet<_> = column_families.iter().collect(); - { - ensure!( - cfs_set.len() == column_families.len(), - "Duplicate column family name found.", - ); - } - if Self::db_exists(path) { - let cf_vec = Self::list_cf(path)?; - let mut db_cfs_set: HashSet<_> = cf_vec.iter().collect(); - db_cfs_set.remove(&DEFAULT_PREFIX_NAME.to_string()); - ensure!( - db_cfs_set.len() <= cfs_set.len(), - StorageInitError::StorageCheckError(format_err!( - "ColumnFamily in db ({:?}) not same as ColumnFamily in code {:?}.", - column_families, - cf_vec - )) - ); - let mut remove_cf_vec = Vec::new(); - db_cfs_set.iter().for_each(|k| { - if 
!cfs_set.contains(&k.as_str()) { - remove_cf_vec.push(<&std::string::String>::clone(k)); - } - }); - ensure!( - remove_cf_vec.is_empty(), - StorageInitError::StorageCheckError(format_err!( - "Can not remove ColumnFamily, ColumnFamily in db ({:?}) not in code {:?}.", - remove_cf_vec, - cf_vec - )) - ); - } - - let mut rocksdb_opts = Self::gen_rocksdb_options(&rocksdb_config); - - let db = if readonly { - Self::open_readonly(&rocksdb_opts, path, column_families.clone())? - } else { - rocksdb_opts.create_if_missing(true); - rocksdb_opts.create_missing_column_families(true); - Self::open_inner(&rocksdb_opts, path, column_families.clone())? - }; - check_open_fds_limit(rocksdb_config.max_open_files as u64 + RES_FDS)?; - Ok(DBStorage { - db, - cfs: column_families, - metrics, - }) - } - - fn open_inner( - opts: &Options, - path: impl AsRef, - column_families: Vec, - ) -> Result { - let inner = rocksdb::DB::open_cf_descriptors( - opts, - path, - column_families.iter().map(|cf_name| { - let mut cf_opts = rocksdb::Options::default(); - cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4); - /* - cf_opts.set_compression_per_level(&[ - rocksdb::DBCompressionType::None, - rocksdb::DBCompressionType::None, - rocksdb::DBCompressionType::Lz4, - rocksdb::DBCompressionType::Lz4, - rocksdb::DBCompressionType::Lz4, - rocksdb::DBCompressionType::Lz4, - rocksdb::DBCompressionType::Lz4, - ]); - */ - rocksdb::ColumnFamilyDescriptor::new((*cf_name).to_string(), cf_opts) - }), - )?; - Ok(inner) - } - - fn open_readonly( - db_opts: &Options, - path: impl AsRef, - column_families: Vec, - ) -> Result { - let error_if_log_file_exists = false; - let inner = rocksdb::DB::open_cf_for_read_only( - db_opts, - path, - column_families, - error_if_log_file_exists, - )?; - Ok(inner) - } - - pub fn drop_cf(&mut self) -> Result<(), Error> { - for cf in self.cfs.clone() { - self.db.drop_cf(cf)?; - } - Ok(()) - } - - pub fn drop_unused_cfs(&mut self, names: Vec<&str>) -> Result<(), Error> { - // https://github.com/facebook/rocksdb/issues/1295 - for name in names { - for cf in &self.cfs { - if cf == &name { - self.db.drop_cf(name)?; - let opt = Options::default(); - self.db.create_cf(name, &opt)?; - break; - } - } - } - Ok(()) - } - - /// Flushes all memtable data. This is only used for testing `get_approximate_sizes_cf` in unit - /// tests. - pub fn flush_all(&self) -> Result<()> { - for cf_name in &self.cfs { - let cf_handle = self.get_cf_handle(cf_name)?; - self.db.flush_cf(cf_handle)?; - } - Ok(()) - } - - /// List cf - pub fn list_cf(path: impl AsRef) -> Result, Error> { - Ok(rocksdb::DB::list_cf(&rocksdb::Options::default(), path)?) 
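The `open_with_cfs` checks removed here (they move into schemadb's `DBStorage`) enforce that every column family already present in the database is still declared in code, so opening fails loudly instead of silently orphaning a CF. The core of that check, sketched:

```rust
use std::collections::HashSet;

/// Every CF found in the db (other than "default") must still be declared
/// in code; otherwise opening fails instead of orphaning the CF.
fn check_cfs(in_code: &[&str], in_db: &[&str]) -> Result<(), String> {
    let declared: HashSet<&str> = in_code.iter().copied().collect();
    let stale: Vec<&str> = in_db
        .iter()
        .copied()
        .filter(|cf| *cf != "default" && !declared.contains(cf))
        .collect();
    if stale.is_empty() {
        Ok(())
    } else {
        Err(format!("ColumnFamily in db not in code: {stale:?}"))
    }
}

fn main() {
    assert!(check_cfs(&["block", "state_node"], &["default", "block"]).is_ok());
    assert!(check_cfs(&["block"], &["default", "state_node"]).is_err());
}
```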
- } - - fn db_exists(path: &Path) -> bool { - let rocksdb_current_file = path.join("CURRENT"); - rocksdb_current_file.is_file() - } - - fn get_cf_handle(&self, cf_name: &str) -> Result<&rocksdb::ColumnFamily> { - self.db.cf_handle(cf_name).ok_or_else(|| { - format_err!( - "DB::cf_handle not found for column family name: {}", - cf_name - ) - }) - } - - fn default_write_options() -> WriteOptions { - let mut opts = WriteOptions::new(); - opts.set_sync(false); - opts - } - - fn gen_rocksdb_options(config: &RocksdbConfig) -> Options { - let mut db_opts = Options::default(); - db_opts.set_max_open_files(config.max_open_files); - db_opts.set_max_total_wal_size(config.max_total_wal_size); - db_opts.set_wal_bytes_per_sync(config.wal_bytes_per_sync); - db_opts.set_bytes_per_sync(config.bytes_per_sync); - // db_opts.enable_statistics(); - // write buffer size - db_opts.set_max_write_buffer_number(5); - db_opts.set_max_background_jobs(5); - // cache - // let cache = Cache::new_lru_cache(2 * 1024 * 1024 * 1024); - // db_opts.set_row_cache(&cache.unwrap()); - db_opts - } +impl ClassicIter for DBStorage { fn iter_with_direction( &self, prefix_name: &str, @@ -229,14 +42,13 @@ impl DBStorage { { let cf_handle = self.get_cf_handle(prefix_name)?; Ok(SchemaIterator::new( - self.db + self.db() .raw_iterator_cf_opt(cf_handle, ReadOptions::default()), direction, )) } - /// Returns a forward [`SchemaIterator`] on a certain schema. - pub fn iter(&self, prefix_name: &str) -> Result> + fn iter_raw(&self, prefix_name: &str) -> Result> where K: KeyCodec, V: ValueCodec, @@ -245,157 +57,64 @@ impl DBStorage { } /// Returns a backward [`SchemaIterator`] on a certain schema. - pub fn rev_iter(&self, prefix_name: &str) -> Result> + fn rev_iter_raw(&self, prefix_name: &str) -> Result> where K: KeyCodec, V: ValueCodec, { self.iter_with_direction(prefix_name, ScanDirection::Backward) } - - fn sync_write_options() -> WriteOptions { - let mut opts = WriteOptions::new(); - opts.set_sync(true); - opts - } -} - -pub enum ScanDirection { - Forward, - Backward, -} - -pub struct SchemaIterator<'a, K, V> { - db_iter: rocksdb::DBRawIterator<'a>, - direction: ScanDirection, - phantom_k: PhantomData, - phantom_v: PhantomData, -} - -impl<'a, K, V> SchemaIterator<'a, K, V> -where - K: KeyCodec, - V: ValueCodec, -{ - fn new(db_iter: rocksdb::DBRawIterator<'a>, direction: ScanDirection) -> Self { - SchemaIterator { - db_iter, - direction, - phantom_k: PhantomData, - phantom_v: PhantomData, - } - } - - /// Seeks to the first key. - pub fn seek_to_first(&mut self) { - self.db_iter.seek_to_first(); - } - - /// Seeks to the last key. - pub fn seek_to_last(&mut self) { - self.db_iter.seek_to_last(); - } - - /// Seeks to the first key whose binary representation is equal to or greater than that of the - /// `seek_key`. - pub fn seek(&mut self, seek_key: Vec) -> Result<()> { - self.db_iter.seek(&seek_key); - Ok(()) - } - - /// Seeks to the last key whose binary representation is less than or equal to that of the - /// `seek_key`. 
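The option builders deleted here survive largely unchanged inside `starcoin-schemadb`. A sketch of the two write paths, assuming the `rocksdb` crate API already used throughout this diff: `set_sync(false)` for the default path, `set_sync(true)` for `put_sync`/`write_batch_sync`:

```rust
use rocksdb::{Options, WriteOptions};

/// Mirrors the removed gen_rocksdb_options: resource limits plus
/// write-buffer/background-job tuning (values copied from the diff).
fn gen_rocksdb_options(max_open_files: i32) -> Options {
    let mut db_opts = Options::default();
    db_opts.set_max_open_files(max_open_files);
    db_opts.set_max_write_buffer_number(5);
    db_opts.set_max_background_jobs(5);
    db_opts
}

/// Fast path: rely on the WAL, no fsync per write.
fn default_write_options() -> WriteOptions {
    let mut opts = WriteOptions::new();
    opts.set_sync(false);
    opts
}

/// Durable path used by put_sync / write_batch_sync.
fn sync_write_options() -> WriteOptions {
    let mut opts = WriteOptions::new();
    opts.set_sync(true);
    opts
}

fn main() {
    let _db_opts = gen_rocksdb_options(4096);
    let _fast = default_write_options();
    let _durable = sync_write_options();
}
```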
- pub fn seek_for_prev(&mut self, seek_key: Vec) -> Result<()> { - self.db_iter.seek_for_prev(&seek_key); - Ok(()) - } - - fn next_impl(&mut self) -> Result> { - if !self.db_iter.valid() { - self.db_iter.status()?; - return Ok(None); - } - - let raw_key = self.db_iter.key().expect("Iterator must be valid."); - let raw_value = self.db_iter.value().expect("Iterator must be valid."); - let key = K::decode_key(raw_key)?; - let value = V::decode_value(raw_value)?; - match self.direction { - ScanDirection::Forward => self.db_iter.next(), - ScanDirection::Backward => self.db_iter.prev(), - } - - Ok(Some((key, value))) - } -} - -impl<'a, K, V> Iterator for SchemaIterator<'a, K, V> -where - K: KeyCodec, - V: ValueCodec, -{ - type Item = Result<(K, V)>; - - fn next(&mut self) -> Option { - self.next_impl().transpose() - } } impl InnerStore for DBStorage { - fn get(&self, prefix_name: &str, key: Vec) -> Result>> { - record_metrics("db", prefix_name, "get", self.metrics.as_ref()).call(|| { + fn get_raw(&self, prefix_name: &str, key: Vec) -> Result>> { + record_metrics("db", prefix_name, "get", self.metrics()).call(|| { let cf_handle = self.get_cf_handle(prefix_name)?; - let result = self.db.get_cf(cf_handle, key.as_slice())?; + let result = self.db().get_cf(cf_handle, key.as_slice())?; Ok(result) }) } - fn put(&self, prefix_name: &str, key: Vec, value: Vec) -> Result<()> { - if let Some(metrics) = self.metrics.as_ref() { + fn put_raw(&self, prefix_name: &str, key: Vec, value: Vec) -> Result<()> { + if let Some(metrics) = self.metrics() { metrics .storage_item_bytes .with_label_values(&[prefix_name]) .observe((key.len() + value.len()) as f64); } - record_metrics("db", prefix_name, "put", self.metrics.as_ref()).call(|| { + record_metrics("db", prefix_name, "put", self.metrics()).call(|| { let cf_handle = self.get_cf_handle(prefix_name)?; - self.db + self.db() .put_cf_opt(cf_handle, &key, &value, &Self::default_write_options())?; Ok(()) }) } fn contains_key(&self, prefix_name: &str, key: Vec) -> Result { - record_metrics("db", prefix_name, "contains_key", self.metrics.as_ref()).call(|| match self - .get(prefix_name, key) - { - Ok(Some(_)) => Ok(true), - _ => Ok(false), + record_metrics("db", prefix_name, "contains_key", self.metrics()).call(|| { + match self.get_raw(prefix_name, key) { + Ok(Some(_)) => Ok(true), + _ => Ok(false), + } }) } - fn remove(&self, prefix_name: &str, key: Vec) -> Result<()> { - record_metrics("db", prefix_name, "remove", self.metrics.as_ref()).call(|| { + fn remove_raw(&self, prefix_name: &str, key: Vec) -> Result<()> { + record_metrics("db", prefix_name, "remove", self.metrics()).call(|| { let cf_handle = self.get_cf_handle(prefix_name)?; - self.db.delete_cf(cf_handle, &key)?; + self.db().delete_cf(cf_handle, &key)?; Ok(()) }) } /// Writes a group of records wrapped in a WriteBatch. 
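The `InnerStore` renames (`get` to `get_raw`, and so on) make explicit that this trait deals in already-encoded bytes; the typed codecs sit a layer above. A minimal in-memory implementation of the renamed surface (`MemStore` is hypothetical, with the trait simplified to drop `Result`):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// The renamed raw-byte surface of InnerStore.
trait InnerStore: Send + Sync {
    fn get_raw(&self, prefix_name: &str, key: Vec<u8>) -> Option<Vec<u8>>;
    fn put_raw(&self, prefix_name: &str, key: Vec<u8>, value: Vec<u8>);
    fn remove_raw(&self, prefix_name: &str, key: Vec<u8>);
}

#[derive(Default)]
struct MemStore {
    map: Mutex<HashMap<(String, Vec<u8>), Vec<u8>>>,
}

impl InnerStore for MemStore {
    fn get_raw(&self, prefix_name: &str, key: Vec<u8>) -> Option<Vec<u8>> {
        self.map
            .lock()
            .unwrap()
            .get(&(prefix_name.to_string(), key))
            .cloned()
    }

    fn put_raw(&self, prefix_name: &str, key: Vec<u8>, value: Vec<u8>) {
        self.map
            .lock()
            .unwrap()
            .insert((prefix_name.to_string(), key), value);
    }

    fn remove_raw(&self, prefix_name: &str, key: Vec<u8>) {
        self.map.lock().unwrap().remove(&(prefix_name.to_string(), key));
    }
}

fn main() {
    let store = MemStore::default();
    store.put_raw("block", vec![1], vec![9]);
    assert_eq!(store.get_raw("block", vec![1]), Some(vec![9]));
    assert_eq!(store.get_raw("block_info", vec![1]), None); // prefixes are isolated
    store.remove_raw("block", vec![1]);
    assert_eq!(store.get_raw("block", vec![1]), None);
}
```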
fn write_batch(&self, prefix_name: &str, batch: WriteBatch) -> Result<()> { - record_metrics("db", prefix_name, "write_batch", self.metrics.as_ref()).call(|| { - let mut db_batch = DBWriteBatch::default(); - let cf_handle = self.get_cf_handle(prefix_name)?; - for (key, write_op) in &batch.rows { - match write_op { - WriteOp::Value(value) => db_batch.put_cf(cf_handle, key, value), - WriteOp::Deletion => db_batch.delete_cf(cf_handle, key), - }; - } - self.db - .write_opt(db_batch, &Self::default_write_options())?; - Ok(()) + record_metrics("db", prefix_name, "write_batch", self.metrics()).call(|| { + self.write_batch_inner( + prefix_name, + batch.rows.as_slice(), + false, /*normal write*/ + ) }) } @@ -408,38 +127,28 @@ impl InnerStore for DBStorage { } fn put_sync(&self, prefix_name: &str, key: Vec, value: Vec) -> Result<()> { - if let Some(metrics) = self.metrics.as_ref() { + if let Some(metrics) = self.metrics() { metrics .storage_item_bytes .with_label_values(&[prefix_name]) .observe((key.len() + value.len()) as f64); } - record_metrics("db", prefix_name, "put_sync", self.metrics.as_ref()).call(|| { + record_metrics("db", prefix_name, "put_sync", self.metrics()).call(|| { let cf_handle = self.get_cf_handle(prefix_name)?; - self.db + self.db() .put_cf_opt(cf_handle, &key, &value, &Self::sync_write_options())?; Ok(()) }) } fn write_batch_sync(&self, prefix_name: &str, batch: WriteBatch) -> Result<()> { - record_metrics("db", prefix_name, "write_batch_sync", self.metrics.as_ref()).call(|| { - let mut db_batch = DBWriteBatch::default(); - let cf_handle = self.get_cf_handle(prefix_name)?; - for (key, write_op) in &batch.rows { - match write_op { - WriteOp::Value(value) => db_batch.put_cf(cf_handle, key, value), - WriteOp::Deletion => db_batch.delete_cf(cf_handle, key), - }; - } - self.db.write_opt(db_batch, &Self::sync_write_options())?; - Ok(()) - }) + record_metrics("db", prefix_name, "write_batch_sync", self.metrics()) + .call(|| self.write_batch_inner(prefix_name, batch.rows.as_slice(), true)) } fn multi_get(&self, prefix_name: &str, keys: Vec>) -> Result>>> { - record_metrics("db", prefix_name, "multi_get", self.metrics.as_ref()).call(|| { + record_metrics("db", prefix_name, "multi_get", self.metrics()).call(|| { let cf_handle = self.get_cf_handle(prefix_name)?; let cf_handles = iter::repeat(&cf_handle) .take(keys.len()) @@ -450,7 +159,7 @@ impl InnerStore for DBStorage { .map(|(key, handle)| (handle, key.as_slice())) .collect::>(); - let result = self.db.multi_get_cf(keys_multi); + let result = self.db().multi_get_cf(keys_multi); let mut res = vec![]; for item in result { let item = item?; diff --git a/storage/src/errors.rs b/storage/src/errors.rs deleted file mode 100644 index 668319bc0e..0000000000 --- a/storage/src/errors.rs +++ /dev/null @@ -1,11 +0,0 @@ -// Copyright (c) The Starcoin Core Contributors -// SPDX-License-Identifier: Apache-2.0 - -use anyhow::Error; -use thiserror::Error; - -#[derive(Debug, Error)] -pub enum StorageInitError { - #[error("Storage check error {0:?}.")] - StorageCheckError(Error), -} diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 0246b6e7f4..c11a60c0e5 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -1,41 +1,46 @@ // Copyright (c) The Starcoin Core Contributors // SPDX-License-Identifier: Apache-2.0 -use crate::accumulator::{ - AccumulatorStorage, BlockAccumulatorStorage, TransactionAccumulatorStorage, +use crate::{ + accumulator::{AccumulatorStorage, BlockAccumulatorStorage, TransactionAccumulatorStorage}, + block::BlockStorage, + 
block_info::{BlockInfoStorage, BlockInfoStore}, + chain_info::ChainInfoStorage, + contract_event::ContractEventStorage, + state_node::StateStorage, + storage::{CodecKVStore, CodecWriteBatch, StorageInstance}, + table_info::{TableInfoStorage, TableInfoStore}, + transaction::TransactionStorage, + transaction_info::{TransactionInfoHashStorage, TransactionInfoStorage}, }; -use crate::block::BlockStorage; -use crate::block_info::{BlockInfoStorage, BlockInfoStore}; -use crate::chain_info::ChainInfoStorage; -use crate::contract_event::ContractEventStorage; -use crate::state_node::StateStorage; -use crate::storage::{CodecKVStore, CodecWriteBatch, ColumnFamilyName, StorageInstance}; -use crate::table_info::{TableInfoStorage, TableInfoStore}; -use crate::transaction::TransactionStorage; -use crate::transaction_info::{TransactionInfoHashStorage, TransactionInfoStorage}; use anyhow::{bail, format_err, Error, Result}; use network_p2p_types::peer_id::PeerId; -use num_enum::{IntoPrimitive, TryFromPrimitive}; -use once_cell::sync::Lazy; -use starcoin_accumulator::node::AccumulatorStoreType; -use starcoin_accumulator::AccumulatorTreeStore; +use starcoin_accumulator::{node::AccumulatorStoreType, AccumulatorTreeStore}; use starcoin_crypto::HashValue; +pub use starcoin_schemadb::db::{ + StorageVersion, BLOCK_ACCUMULATOR_NODE_PREFIX_NAME, BLOCK_BODY_PREFIX_NAME, + BLOCK_HEADER_PREFIX_NAME, BLOCK_INFO_PREFIX_NAME, BLOCK_PREFIX_NAME, + BLOCK_TRANSACTIONS_PREFIX_NAME, BLOCK_TRANSACTION_INFOS_PREFIX_NAME, CHAIN_INFO_PREFIX_NAME, + CONTRACT_EVENT_PREFIX_NAME, DEFAULT_PREFIX_NAME, FAILED_BLOCK_PREFIX_NAME, + STATE_NODE_PREFIX_NAME, STATE_NODE_PREFIX_NAME_PREV, TABLE_INFO_PREFIX_NAME, + TRANSACTION_ACCUMULATOR_NODE_PREFIX_NAME, TRANSACTION_INFO_HASH_PREFIX_NAME, + TRANSACTION_INFO_PREFIX_NAME, TRANSACTION_INFO_PREFIX_NAME_V2, TRANSACTION_PREFIX_NAME, +}; use starcoin_state_store_api::{StateNode, StateNodeStore}; -use starcoin_types::contract_event::ContractEvent; -use starcoin_types::startup_info::{ChainInfo, ChainStatus, SnapshotRange}; -use starcoin_types::transaction::{RichTransactionInfo, Transaction}; use starcoin_types::{ + account_address::AccountAddress, block::{Block, BlockBody, BlockHeader, BlockInfo}, - startup_info::StartupInfo, + contract_event::ContractEvent, + startup_info::{ChainInfo, ChainStatus, SnapshotRange, StartupInfo}, + transaction::{RichTransactionInfo, Transaction}, }; -//use starcoin_vm_types::state_store::table::{TableHandle, TableInfo}; -use starcoin_types::account_address::AccountAddress; use starcoin_vm_types::state_store::table::{TableHandle, TableInfo}; -use std::collections::BTreeMap; -use std::fmt::{Debug, Display, Formatter}; -use std::sync::Arc; -pub use upgrade::BARNARD_HARD_FORK_HASH; -pub use upgrade::BARNARD_HARD_FORK_HEIGHT; +use std::{ + collections::BTreeMap, + fmt::{Debug, Display, Formatter}, + sync::Arc, +}; +pub use upgrade::{BARNARD_HARD_FORK_HASH, BARNARD_HARD_FORK_HEIGHT}; pub mod accumulator; pub mod batch; @@ -45,8 +50,12 @@ pub mod cache_storage; pub mod chain_info; pub mod contract_event; pub mod db_storage; -pub mod errors; -pub mod metrics; +pub mod errors { + pub use starcoin_schemadb::error::StorageInitError; +} +pub mod metrics { + pub use starcoin_schemadb::metrics::*; +} pub mod state_node; pub mod storage; pub mod table_info; @@ -59,112 +68,6 @@ mod upgrade; #[macro_use] pub mod storage_macros; -pub const DEFAULT_PREFIX_NAME: ColumnFamilyName = "default"; -pub const BLOCK_ACCUMULATOR_NODE_PREFIX_NAME: ColumnFamilyName = "acc_node_block"; -pub const 
TRANSACTION_ACCUMULATOR_NODE_PREFIX_NAME: ColumnFamilyName = "acc_node_transaction"; -pub const BLOCK_PREFIX_NAME: ColumnFamilyName = "block"; -pub const BLOCK_HEADER_PREFIX_NAME: ColumnFamilyName = "block_header"; -pub const BLOCK_BODY_PREFIX_NAME: ColumnFamilyName = "block_body"; -pub const BLOCK_INFO_PREFIX_NAME: ColumnFamilyName = "block_info"; -pub const BLOCK_TRANSACTIONS_PREFIX_NAME: ColumnFamilyName = "block_txns"; -pub const BLOCK_TRANSACTION_INFOS_PREFIX_NAME: ColumnFamilyName = "block_txn_infos"; -pub const STATE_NODE_PREFIX_NAME: ColumnFamilyName = "state_node"; -pub const STATE_NODE_PREFIX_NAME_PREV: ColumnFamilyName = "state_node_prev"; -pub const CHAIN_INFO_PREFIX_NAME: ColumnFamilyName = "chain_info"; -pub const TRANSACTION_PREFIX_NAME: ColumnFamilyName = "transaction"; -pub const TRANSACTION_INFO_PREFIX_NAME: ColumnFamilyName = "transaction_info"; -pub const TRANSACTION_INFO_PREFIX_NAME_V2: ColumnFamilyName = "transaction_info_v2"; -pub const TRANSACTION_INFO_HASH_PREFIX_NAME: ColumnFamilyName = "transaction_info_hash"; -pub const CONTRACT_EVENT_PREFIX_NAME: ColumnFamilyName = "contract_event"; -pub const FAILED_BLOCK_PREFIX_NAME: ColumnFamilyName = "failed_block"; -pub const TABLE_INFO_PREFIX_NAME: ColumnFamilyName = "table_info"; - -///db storage use prefix_name vec to init -/// Please note that adding a prefix needs to be added in vec simultaneously, remember!! -static VEC_PREFIX_NAME_V1: Lazy> = Lazy::new(|| { - vec![ - BLOCK_ACCUMULATOR_NODE_PREFIX_NAME, - TRANSACTION_ACCUMULATOR_NODE_PREFIX_NAME, - BLOCK_PREFIX_NAME, - BLOCK_HEADER_PREFIX_NAME, - BLOCK_BODY_PREFIX_NAME, - BLOCK_INFO_PREFIX_NAME, - BLOCK_TRANSACTIONS_PREFIX_NAME, - BLOCK_TRANSACTION_INFOS_PREFIX_NAME, - STATE_NODE_PREFIX_NAME, - CHAIN_INFO_PREFIX_NAME, - TRANSACTION_PREFIX_NAME, - TRANSACTION_INFO_PREFIX_NAME, - TRANSACTION_INFO_HASH_PREFIX_NAME, - CONTRACT_EVENT_PREFIX_NAME, - FAILED_BLOCK_PREFIX_NAME, - ] -}); - -static VEC_PREFIX_NAME_V2: Lazy> = Lazy::new(|| { - vec![ - BLOCK_ACCUMULATOR_NODE_PREFIX_NAME, - TRANSACTION_ACCUMULATOR_NODE_PREFIX_NAME, - BLOCK_PREFIX_NAME, - BLOCK_HEADER_PREFIX_NAME, - BLOCK_BODY_PREFIX_NAME, - BLOCK_INFO_PREFIX_NAME, - BLOCK_TRANSACTIONS_PREFIX_NAME, - BLOCK_TRANSACTION_INFOS_PREFIX_NAME, - STATE_NODE_PREFIX_NAME, - CHAIN_INFO_PREFIX_NAME, - TRANSACTION_PREFIX_NAME, - TRANSACTION_INFO_PREFIX_NAME, - TRANSACTION_INFO_PREFIX_NAME_V2, - TRANSACTION_INFO_HASH_PREFIX_NAME, - CONTRACT_EVENT_PREFIX_NAME, - FAILED_BLOCK_PREFIX_NAME, - ] -}); - -static VEC_PREFIX_NAME_V3: Lazy> = Lazy::new(|| { - vec![ - BLOCK_ACCUMULATOR_NODE_PREFIX_NAME, - TRANSACTION_ACCUMULATOR_NODE_PREFIX_NAME, - BLOCK_PREFIX_NAME, - BLOCK_HEADER_PREFIX_NAME, - BLOCK_BODY_PREFIX_NAME, // unused column - BLOCK_INFO_PREFIX_NAME, - BLOCK_TRANSACTIONS_PREFIX_NAME, - BLOCK_TRANSACTION_INFOS_PREFIX_NAME, - STATE_NODE_PREFIX_NAME, - CHAIN_INFO_PREFIX_NAME, - TRANSACTION_PREFIX_NAME, - TRANSACTION_INFO_PREFIX_NAME, // unused column - TRANSACTION_INFO_PREFIX_NAME_V2, - TRANSACTION_INFO_HASH_PREFIX_NAME, - CONTRACT_EVENT_PREFIX_NAME, - FAILED_BLOCK_PREFIX_NAME, - TABLE_INFO_PREFIX_NAME, - ] -}); -#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, IntoPrimitive, TryFromPrimitive)] -#[repr(u8)] -pub enum StorageVersion { - V1 = 1, - V2 = 2, - V3 = 3, -} - -impl StorageVersion { - pub fn current_version() -> StorageVersion { - StorageVersion::V3 - } - - pub fn get_column_family_names(&self) -> &'static [ColumnFamilyName] { - match self { - StorageVersion::V1 => &VEC_PREFIX_NAME_V1, - StorageVersion::V2 => 
&VEC_PREFIX_NAME_V2, - StorageVersion::V3 => &VEC_PREFIX_NAME_V3, - } - } -} - pub trait BlockStore { fn get_startup_info(&self) -> Result>; fn save_startup_info(&self, startup_info: StartupInfo) -> Result<()>; diff --git a/storage/src/storage.rs b/storage/src/storage.rs index bd012da98b..029cee3a86 100644 --- a/storage/src/storage.rs +++ b/storage/src/storage.rs @@ -1,23 +1,22 @@ // Copyright (c) The Starcoin Core Contributors // SPDX-License-Identifier: Apache-2.0 -pub use crate::batch::WriteBatch; -use crate::cache_storage::CacheStorage; -use crate::db_storage::{DBStorage, SchemaIterator}; -use crate::upgrade::DBUpgrade; +use crate::{ + cache_storage::GCacheStorage, + db_storage::{ClassicIter, SchemaIterator}, + upgrade::DBUpgrade, +}; use anyhow::{bail, format_err, Result}; use byteorder::{BigEndian, ReadBytesExt}; use starcoin_config::NodeConfig; use starcoin_crypto::HashValue; use starcoin_logger::prelude::info; use starcoin_vm_types::state_store::table::TableHandle; -use std::convert::TryInto; -use std::fmt::Debug; -use std::marker::PhantomData; -use std::sync::Arc; - -/// Type alias to improve readability. -pub type ColumnFamilyName = &'static str; +use std::{convert::TryInto, fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc}; +pub use { + crate::batch::WriteBatch, + starcoin_schemadb::{db::DBStorage, ColumnFamilyName, GWriteOp as WriteOp}, +}; #[allow(clippy::upper_case_acronyms)] pub trait KVStore: Send + Sync { @@ -34,10 +33,10 @@ pub trait KVStore: Send + Sync { } pub trait InnerStore: Send + Sync { - fn get(&self, prefix_name: &str, key: Vec) -> Result>>; - fn put(&self, prefix_name: &str, key: Vec, value: Vec) -> Result<()>; + fn get_raw(&self, prefix_name: &str, key: Vec) -> Result>>; + fn put_raw(&self, prefix_name: &str, key: Vec, value: Vec) -> Result<()>; fn contains_key(&self, prefix_name: &str, key: Vec) -> Result; - fn remove(&self, prefix_name: &str, key: Vec) -> Result<()>; + fn remove_raw(&self, prefix_name: &str, key: Vec) -> Result<()>; fn write_batch(&self, prefix_name: &str, batch: WriteBatch) -> Result<()>; fn get_len(&self) -> Result; fn keys(&self) -> Result>>; @@ -46,53 +45,59 @@ pub trait InnerStore: Send + Sync { fn multi_get(&self, prefix_name: &str, keys: Vec>) -> Result>>>; } -///Storage instance type define +pub type StorageInstance = GStorageInstance, Vec>; + +///Generic Storage instance type define #[derive(Clone)] #[allow(clippy::upper_case_acronyms)] -pub enum StorageInstance { +pub enum GStorageInstance +where + K: Hash + Eq + Default, + V: Default, +{ CACHE { - cache: Arc, + cache: Arc>, }, DB { db: Arc, }, CacheAndDb { - cache: Arc, + cache: Arc>, db: Arc, }, } -impl StorageInstance { +impl GStorageInstance +where + K: Hash + Eq + Default, + V: Default, +{ pub fn new_cache_instance() -> Self { - StorageInstance::CACHE { - cache: Arc::new(CacheStorage::new(None)), + GStorageInstance::CACHE { + cache: Arc::new(GCacheStorage::default()), } } pub fn new_db_instance(db: DBStorage) -> Self { Self::DB { db: Arc::new(db) } } - pub fn new_cache_and_db_instance(cache: CacheStorage, db: DBStorage) -> Self { + pub fn new_cache_and_db_instance(cache: GCacheStorage, db: DBStorage) -> Self { Self::CacheAndDb { cache: Arc::new(cache), db: Arc::new(db), } } - pub fn cache(&self) -> Option> { + pub fn cache(&self) -> Option>> { match self { - StorageInstance::CACHE { cache } | StorageInstance::CacheAndDb { cache, db: _ } => { - Some(cache.clone()) - } + Self::CACHE { cache } | Self::CacheAndDb { cache, db: _ } => Some(cache.clone()), _ => None, } } - pub 
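`GStorageInstance` generalizes the old three-variant enum; the accessors return the cache for `CACHE`/`CacheAndDb` and the db handle (now `&Arc<DBStorage>` rather than `&DBStorage`) for `DB`/`CacheAndDb`. A shape-only sketch with stand-in layer types and camel-case variant names:

```rust
use std::sync::Arc;

// Stand-in layer types; the real enum is generic over K, V and holds
// GCacheStorage<K, V> / DBStorage.
struct CacheLayer;
struct DbLayer;

enum GStorageInstance {
    Cache { cache: Arc<CacheLayer> },
    Db { db: Arc<DbLayer> },
    CacheAndDb { cache: Arc<CacheLayer>, db: Arc<DbLayer> },
}

impl GStorageInstance {
    fn cache(&self) -> Option<Arc<CacheLayer>> {
        match self {
            Self::Cache { cache } | Self::CacheAndDb { cache, .. } => Some(cache.clone()),
            _ => None,
        }
    }

    fn db(&self) -> Option<&Arc<DbLayer>> {
        match self {
            Self::Db { db } | Self::CacheAndDb { db, .. } => Some(db),
            _ => None,
        }
    }
}

fn main() {
    let instance = GStorageInstance::CacheAndDb {
        cache: Arc::new(CacheLayer),
        db: Arc::new(DbLayer),
    };
    assert!(instance.cache().is_some());
    assert!(instance.db().is_some());
    assert!(GStorageInstance::Db { db: Arc::new(DbLayer) }.cache().is_none());
}
```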
fn db(&self) -> Option<&DBStorage> { + pub fn db(&self) -> Option<&Arc> { match self { - StorageInstance::DB { db } | StorageInstance::CacheAndDb { cache: _, db } => { - Some(db.as_ref()) - } + Self::DB { db } | Self::CacheAndDb { cache: _, db } => Some(db), _ => None, } } @@ -100,13 +105,13 @@ impl StorageInstance { // make sure Arc::strong_count(&db) == 1 unless will get None pub fn db_mut(&mut self) -> Option<&mut DBStorage> { match self { - StorageInstance::DB { db } | StorageInstance::CacheAndDb { cache: _, db } => { - Arc::get_mut(db) - } + Self::DB { db } | Self::CacheAndDb { cache: _, db } => Arc::get_mut(db), _ => None, } } +} +impl StorageInstance { pub fn check_upgrade(&mut self) -> Result<()> { DBUpgrade::check_upgrade(self) } @@ -129,17 +134,17 @@ impl StorageInstance { } impl InnerStore for StorageInstance { - fn get(&self, prefix_name: &str, key: Vec) -> Result>> { + fn get_raw(&self, prefix_name: &str, key: Vec) -> Result>> { match self { - StorageInstance::CACHE { cache } => cache.get(prefix_name, key), - StorageInstance::DB { db } => db.get(prefix_name, key), + StorageInstance::CACHE { cache } => cache.get_raw(prefix_name, key), + StorageInstance::DB { db } => db.get_raw(prefix_name, key), StorageInstance::CacheAndDb { cache, db } => { // first get from cache // if from cache get non-existent, query from db - if let Ok(Some(value)) = cache.get(prefix_name, key.clone()) { + if let Ok(Some(value)) = cache.get_raw(prefix_name, key.clone()) { Ok(Some(value)) } else { - match db.get(prefix_name, key)? { + match db.get_raw(prefix_name, key)? { Some(value) => { // cache.put_obj(prefix_name, key, CacheObject::Value(value.clone()))?; Ok(Some(value)) @@ -155,13 +160,13 @@ impl InnerStore for StorageInstance { } } - fn put(&self, prefix_name: &str, key: Vec, value: Vec) -> Result<()> { + fn put_raw(&self, prefix_name: &str, key: Vec, value: Vec) -> Result<()> { match self { - StorageInstance::CACHE { cache } => cache.put(prefix_name, key, value), - StorageInstance::DB { db } => db.put(prefix_name, key, value), + StorageInstance::CACHE { cache } => cache.put_raw(prefix_name, key, value), + StorageInstance::DB { db } => db.put_raw(prefix_name, key, value), StorageInstance::CacheAndDb { cache, db } => db - .put(prefix_name, key.clone(), value.clone()) - .and_then(|_| cache.put(prefix_name, key, value)), + .put_raw(prefix_name, key.clone(), value.clone()) + .and_then(|_| cache.put_raw(prefix_name, key, value)), } } @@ -178,13 +183,13 @@ impl InnerStore for StorageInstance { } } - fn remove(&self, prefix_name: &str, key: Vec) -> Result<()> { + fn remove_raw(&self, prefix_name: &str, key: Vec) -> Result<()> { match self { - StorageInstance::CACHE { cache } => cache.remove(prefix_name, key), - StorageInstance::DB { db } => db.remove(prefix_name, key), + StorageInstance::CACHE { cache } => cache.remove_raw(prefix_name, key), + StorageInstance::DB { db } => db.remove_raw(prefix_name, key), StorageInstance::CacheAndDb { cache, db } => { - match db.remove(prefix_name, key.clone()) { - Ok(_) => cache.remove(prefix_name, key), + match db.remove_raw(prefix_name, key.clone()) { + Ok(_) => cache.remove_raw(prefix_name, key), _ => bail!("db storage remove error."), } } @@ -221,11 +226,11 @@ impl InnerStore for StorageInstance { fn put_sync(&self, prefix_name: &str, key: Vec, value: Vec) -> Result<()> { match self { - StorageInstance::CACHE { cache } => cache.put(prefix_name, key, value), + StorageInstance::CACHE { cache } => cache.put_raw(prefix_name, key, value), StorageInstance::DB { db } => 
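For `CacheAndDb`, `get_raw` is a read-through: consult the cache first and fall back to the db on a miss (the cache back-fill on a db hit remains commented out in this diff). The lookup order in miniature:

```rust
use std::collections::HashMap;

/// CacheAndDb lookup order: a cache hit wins, otherwise fall through to the db.
fn get_raw(
    cache: &HashMap<Vec<u8>, Vec<u8>>,
    db: &HashMap<Vec<u8>, Vec<u8>>,
    key: &[u8],
) -> Option<Vec<u8>> {
    cache.get(key).cloned().or_else(|| db.get(key).cloned())
}

fn main() {
    let mut cache = HashMap::new();
    let mut db = HashMap::new();

    db.insert(vec![1u8], vec![42u8]);
    assert_eq!(get_raw(&cache, &db, &[1]), Some(vec![42])); // cache miss, db hit

    cache.insert(vec![1u8], vec![7u8]);
    assert_eq!(get_raw(&cache, &db, &[1]), Some(vec![7])); // cache wins once populated
}
```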
db.put_sync(prefix_name, key, value), StorageInstance::CacheAndDb { cache, db } => db .put_sync(prefix_name, key.clone(), value.clone()) - .and_then(|_| cache.put(prefix_name, key, value)), + .and_then(|_| cache.put_raw(prefix_name, key, value)), } } @@ -323,7 +328,7 @@ where CF: ColumnFamily, { fn get(&self, key: &[u8]) -> Result>> { - self.instance.get(self.prefix_name, key.to_vec()) + self.instance.get_raw(self.prefix_name, key.to_vec()) } fn multiple_get(&self, keys: Vec>) -> Result>>> { @@ -331,7 +336,7 @@ where } fn put(&self, key: Vec, value: Vec) -> Result<()> { - self.instance.put(self.prefix_name, key, value) + self.instance.put_raw(self.prefix_name, key, value) } fn contains_key(&self, key: Vec) -> Result { @@ -339,7 +344,7 @@ where } fn remove(&self, key: Vec) -> Result<()> { - self.instance.remove(self.prefix_name, key) + self.instance.remove_raw(self.prefix_name, key) } fn write_batch(&self, batch: WriteBatch) -> Result<()> { @@ -381,31 +386,13 @@ pub trait ValueCodec: Clone + Sized + Debug + std::marker::Send + std::marker::S fn decode_value(data: &[u8]) -> Result; } -#[derive(Debug, Clone)] -pub enum WriteOp { - Value(V), - Deletion, -} - -impl WriteOp -where - V: ValueCodec, -{ - pub fn into_raw_op(self) -> Result>> { - Ok(match self { - WriteOp::Value(v) => WriteOp::Value(v.encode_value()?), - WriteOp::Deletion => WriteOp::Deletion, - }) - } -} - #[derive(Debug, Clone)] pub struct CodecWriteBatch where K: KeyCodec, V: ValueCodec, { - rows: Vec<(K, WriteOp)>, + rows: Vec>, } impl Default for CodecWriteBatch @@ -430,25 +417,25 @@ where pub fn new_puts(kvs: Vec<(K, V)>) -> Self { let mut rows = Vec::new(); - rows.extend(kvs.into_iter().map(|(k, v)| (k, WriteOp::Value(v)))); + rows.extend(kvs.into_iter().map(|(k, v)| (WriteOp::Value(k, v)))); Self { rows } } pub fn new_deletes(ks: Vec) -> Self { let mut rows = Vec::new(); - rows.extend(ks.into_iter().map(|k| (k, WriteOp::Deletion))); + rows.extend(ks.into_iter().map(|k| WriteOp::Deletion(k))); Self { rows } } /// Adds an insert/update operation to the batch. pub fn put(&mut self, key: K, value: V) -> Result<()> { - self.rows.push((key, WriteOp::Value(value))); + self.rows.push(WriteOp::Value(key, value)); Ok(()) } /// Adds a delete operation to the batch. 
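With keyed ops, the `CodecWriteBatch` constructors reduce to mapping pairs and keys into ops, matching the new `IntoIterator` item type `WriteOp<K, V>`. A sketch:

```rust
#[derive(Debug)]
enum WriteOp<K, V> {
    Value(K, V),
    Deletion(K),
}

struct CodecWriteBatch<K, V> {
    rows: Vec<WriteOp<K, V>>,
}

impl<K, V> CodecWriteBatch<K, V> {
    fn new_puts(kvs: Vec<(K, V)>) -> Self {
        Self {
            rows: kvs.into_iter().map(|(k, v)| WriteOp::Value(k, v)).collect(),
        }
    }

    fn new_deletes(ks: Vec<K>) -> Self {
        Self {
            rows: ks.into_iter().map(WriteOp::Deletion).collect(),
        }
    }
}

fn main() {
    let puts = CodecWriteBatch::new_puts(vec![(1u32, 10u32), (2, 20)]);
    let dels = CodecWriteBatch::<u32, u32>::new_deletes(vec![3]);
    assert_eq!(puts.rows.len(), 2);
    assert!(matches!(dels.rows[0], WriteOp::Deletion(3)));
}
```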
@@ -430,25 +417,25 @@
 
     pub fn new_puts(kvs: Vec<(K, V)>) -> Self {
         let mut rows = Vec::new();
-        rows.extend(kvs.into_iter().map(|(k, v)| (k, WriteOp::Value(v))));
+        rows.extend(kvs.into_iter().map(|(k, v)| (WriteOp::Value(k, v))));
         Self { rows }
     }
 
     pub fn new_deletes(ks: Vec<K>) -> Self {
         let mut rows = Vec::new();
-        rows.extend(ks.into_iter().map(|k| (k, WriteOp::Deletion)));
+        rows.extend(ks.into_iter().map(|k| WriteOp::Deletion(k)));
         Self { rows }
     }
 
     /// Adds an insert/update operation to the batch.
     pub fn put(&mut self, key: K, value: V) -> Result<()> {
-        self.rows.push((key, WriteOp::Value(value)));
+        self.rows.push(WriteOp::Value(key, value));
         Ok(())
     }
 
     /// Adds a delete operation to the batch.
     pub fn delete(&mut self, key: K) -> Result<()> {
-        self.rows.push((key, WriteOp::Deletion));
+        self.rows.push(WriteOp::Deletion(key));
         Ok(())
     }
 
@@ -464,8 +451,8 @@ where
     K: KeyCodec,
     V: ValueCodec,
 {
-    type Item = (K, WriteOp<V>);
-    type IntoIter = std::vec::IntoIter<(K, WriteOp<V>)>;
+    type Item = WriteOp<K, V>;
+    type IntoIter = std::vec::IntoIter<WriteOp<K, V>>;
 
     fn into_iter(self) -> Self::IntoIter {
         self.rows.into_iter()
@@ -648,6 +635,6 @@ where
             .storage()
             .db()
             .ok_or_else(|| format_err!("Only support scan on db storage instance"))?;
-        db.iter::<K, V>(self.get_store().prefix_name)
+        db.iter_raw::<K, V>(self.get_store().prefix_name)
     }
 }
diff --git a/storage/src/tests/test_batch.rs b/storage/src/tests/test_batch.rs
index caeaaf5acf..5b4ebcfa5b 100644
--- a/storage/src/tests/test_batch.rs
+++ b/storage/src/tests/test_batch.rs
@@ -56,7 +56,7 @@ fn test_db_batch() {
     assert_eq!(
         RichTransactionInfo::decode_value(
             &db_storage
-                .get(DEFAULT_PREFIX_NAME, id.to_vec())
+                .get_raw(DEFAULT_PREFIX_NAME, id.to_vec())
                 .unwrap()
                 .unwrap()
         )
@@ -66,7 +66,7 @@ fn test_db_batch() {
     assert_eq!(
         RichTransactionInfo::decode_value(
             &db_storage
-                .get(DEFAULT_PREFIX_NAME, id2.to_vec())
+                .get_raw(DEFAULT_PREFIX_NAME, id2.to_vec())
                 .unwrap()
                 .unwrap()
         )
@@ -115,7 +115,7 @@ fn test_cache_batch() {
     assert_eq!(
         RichTransactionInfo::decode_value(
             &cache_storage
-                .get(DEFAULT_PREFIX_NAME, id.to_vec())
+                .get_raw(DEFAULT_PREFIX_NAME, id.to_vec())
                 .unwrap()
                 .unwrap()
         )
@@ -125,7 +125,7 @@ fn test_cache_batch() {
     assert_eq!(
         RichTransactionInfo::decode_value(
             &cache_storage
-                .get(DEFAULT_PREFIX_NAME, id2.to_vec())
+                .get_raw(DEFAULT_PREFIX_NAME, id2.to_vec())
                 .unwrap()
                 .unwrap()
         )
@@ -145,7 +145,7 @@ fn test_batch_comm() {
     write_batch.delete(key.to_vec()).unwrap();
     let result = db.write_batch(DEFAULT_PREFIX_NAME, write_batch.clone());
     assert!(result.is_ok());
-    let result = db.get(DEFAULT_PREFIX_NAME, key.to_vec()).unwrap();
+    let result = db.get_raw(DEFAULT_PREFIX_NAME, key.to_vec()).unwrap();
     assert_eq!(result, None);
     let mut key_vec = vec![];
     write_batch.clone().clear().unwrap();
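`test_batch_comm` (last hunk above) depends on batch ordering: a put followed by a delete of the same key within one batch must leave the key absent. A self-contained sketch of that semantics, with an in-memory `Store` standing in for the rocksdb-backed `DBStorage`:

```rust
use std::collections::HashMap;

// Hypothetical in-memory stand-in for the rocksdb-backed DBStorage.
enum Op {
    Put(Vec<u8>, Vec<u8>),
    Delete(Vec<u8>),
}

struct Store(HashMap<Vec<u8>, Vec<u8>>);

impl Store {
    // Ops apply in insertion order, so a later Delete wins over an
    // earlier Put of the same key.
    fn write_batch(&mut self, ops: Vec<Op>) {
        for op in ops {
            match op {
                Op::Put(k, v) => {
                    self.0.insert(k, v);
                }
                Op::Delete(k) => {
                    self.0.remove(&k);
                }
            }
        }
    }

    fn get_raw(&self, key: &[u8]) -> Option<Vec<u8>> {
        self.0.get(key).cloned()
    }
}

fn main() {
    let mut store = Store(HashMap::new());
    let key = b"key".to_vec();
    store.write_batch(vec![
        Op::Put(key.clone(), b"value".to_vec()),
        Op::Delete(key.clone()),
    ]);
    // Matches the assertion in test_batch_comm: the key is gone.
    assert_eq!(store.get_raw(&key), None);
}
```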
diff --git a/storage/src/tests/test_storage.rs b/storage/src/tests/test_storage.rs
index be7a2eaa44..b1a40ca63b 100644
--- a/storage/src/tests/test_storage.rs
+++ b/storage/src/tests/test_storage.rs
@@ -36,17 +36,17 @@ fn test_reopen() {
     let value = HashValue::zero();
     {
         let db = DBStorage::new(tmpdir.path(), RocksdbConfig::default(), None).unwrap();
-        db.put(DEFAULT_PREFIX_NAME, key.to_vec(), value.to_vec())
+        db.put_raw(DEFAULT_PREFIX_NAME, key.to_vec(), value.to_vec())
             .unwrap();
         assert_eq!(
-            db.get(DEFAULT_PREFIX_NAME, key.to_vec()).unwrap(),
+            db.get_raw(DEFAULT_PREFIX_NAME, key.to_vec()).unwrap(),
             Some(value.to_vec())
         );
     }
     {
         let db = DBStorage::new(tmpdir.path(), RocksdbConfig::default(), None).unwrap();
         assert_eq!(
-            db.get(DEFAULT_PREFIX_NAME, key.to_vec()).unwrap(),
+            db.get_raw(DEFAULT_PREFIX_NAME, key.to_vec()).unwrap(),
             Some(value.to_vec())
         );
     }
@@ -58,10 +58,11 @@ fn test_open_read_only() {
     let db = DBStorage::new(tmpdir.path(), RocksdbConfig::default(), None).unwrap();
     let key = HashValue::random();
     let value = HashValue::zero();
-    let result = db.put(DEFAULT_PREFIX_NAME, key.to_vec(), value.to_vec());
+    let result = db.put_raw(DEFAULT_PREFIX_NAME, key.to_vec(), value.to_vec());
     assert!(result.is_ok());
     let path = tmpdir.as_ref().join("starcoindb");
     let db = DBStorage::open_with_cfs(
+        "test",
         path,
         StorageVersion::current_version()
             .get_column_family_names()
@@ -71,9 +72,9 @@
         None,
     )
     .unwrap();
-    let result = db.put(DEFAULT_PREFIX_NAME, key.to_vec(), value.to_vec());
+    let result = db.put_raw(DEFAULT_PREFIX_NAME, key.to_vec(), value.to_vec());
     assert!(result.is_err());
-    let result = db.get(DEFAULT_PREFIX_NAME, key.to_vec()).unwrap();
+    let result = db.get_raw(DEFAULT_PREFIX_NAME, key.to_vec()).unwrap();
     assert_eq!(result, Some(value.to_vec()));
 }
 
@@ -176,14 +177,14 @@ fn test_two_level_storage() {
     assert_eq!(transaction_info1, transaction_info2.unwrap());
     //verfiy cache storage
     let value3 = cache_storage
-        .get(TRANSACTION_INFO_PREFIX_NAME_V2, id.to_vec())
+        .get_raw(TRANSACTION_INFO_PREFIX_NAME_V2, id.to_vec())
         .unwrap()
         .unwrap();
     let transaction_info3 = RichTransactionInfo::decode_value(&value3).unwrap();
     assert_eq!(transaction_info3, transaction_info1);
     // // verify db storage
     let value4 = db_storage
-        .get(TRANSACTION_INFO_PREFIX_NAME_V2, id.to_vec())
+        .get_raw(TRANSACTION_INFO_PREFIX_NAME_V2, id.to_vec())
         .unwrap()
         .unwrap();
     let transaction_info4 = RichTransactionInfo::decode_value(&value4).unwrap();
@@ -194,11 +195,11 @@ fn test_two_level_storage() {
     assert_eq!(transaction_info5, None);
     // verify cache storage is null
     let value6 = cache_storage
-        .get(TRANSACTION_INFO_PREFIX_NAME_V2, id.to_vec())
+        .get_raw(TRANSACTION_INFO_PREFIX_NAME_V2, id.to_vec())
         .unwrap();
     assert!(value6.is_none());
     let value7 = db_storage
-        .get(TRANSACTION_INFO_PREFIX_NAME_V2, id.to_vec())
+        .get_raw(TRANSACTION_INFO_PREFIX_NAME_V2, id.to_vec())
         .unwrap();
     assert_eq!(value7, None);
 }
@@ -245,7 +246,7 @@ fn test_two_level_storage_read_through() -> Result<()> {
     let transaction_info_data = storage_instance
         .cache()
         .unwrap()
-        .get(TRANSACTION_INFO_PREFIX_NAME, id.to_vec())?;
+        .get_raw(TRANSACTION_INFO_PREFIX_NAME, id.to_vec())?;
     assert!(transaction_info_data.is_none());
 
     //let transaction_info3 =
@@ -267,14 +268,14 @@ fn test_missing_key_handle() -> Result<()> {
     let key = HashValue::random();
     let result = storage.get_transaction_info(key)?;
     assert!(result.is_none());
-    let value2 = cache_storage.get(TRANSACTION_INFO_PREFIX_NAME, key.clone().to_vec())?;
+    let value2 = cache_storage.get_raw(TRANSACTION_INFO_PREFIX_NAME, key.clone().to_vec())?;
     assert!(value2.is_none());
-    let value3 = db_storage.get(TRANSACTION_INFO_PREFIX_NAME, key.clone().to_vec())?;
+    let value3 = db_storage.get_raw(TRANSACTION_INFO_PREFIX_NAME, key.clone().to_vec())?;
     assert!(value3.is_none());
     // test remove
-    let result2 = instance.remove(TRANSACTION_INFO_PREFIX_NAME, key.clone().to_vec());
+    let result2 = instance.remove_raw(TRANSACTION_INFO_PREFIX_NAME, key.clone().to_vec());
     assert!(result2.is_ok());
-    let value4 = cache_storage.get(TRANSACTION_INFO_PREFIX_NAME, key.clone().to_vec())?;
+    let value4 = cache_storage.get_raw(TRANSACTION_INFO_PREFIX_NAME, key.clone().to_vec())?;
     assert!(value4.is_none());
     let contains = instance.contains_key(TRANSACTION_INFO_PREFIX_NAME, key.clone().to_vec())?;
     assert!(!contains);
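`test_open_read_only` now passes a db name (`"test"`) to `open_with_cfs` and still asserts the read-only contract: writes through the handle fail while earlier data stays readable. A minimal sketch of that contract under stated assumptions — `ReadOnlyDb` and its error message are hypothetical, not the real rocksdb-backed type:

```rust
use std::collections::HashMap;

// Hypothetical read-only wrapper; the real handle is a DBStorage
// opened in read-only mode.
struct ReadOnlyDb(HashMap<Vec<u8>, Vec<u8>>);

impl ReadOnlyDb {
    // Writes are rejected unconditionally...
    fn put_raw(&self, _key: Vec<u8>, _value: Vec<u8>) -> Result<(), String> {
        Err("database opened read-only".to_string())
    }

    // ...while reads still see data written before the read-only open.
    fn get_raw(&self, key: &[u8]) -> Option<Vec<u8>> {
        self.0.get(key).cloned()
    }
}

fn main() {
    let mut data = HashMap::new();
    data.insert(b"key".to_vec(), b"value".to_vec());
    let db = ReadOnlyDb(data);
    assert!(db.put_raw(b"key".to_vec(), b"other".to_vec()).is_err());
    assert_eq!(db.get_raw(b"key"), Some(b"value".to_vec()));
}
```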