Skip to content

Commit

Permalink
Apply mempool dust prevention patch + version prep (#286)
Browse files Browse the repository at this point in the history
* reject spam transactions

* allow multiple grpc client connections from the same ip

* log utxo set size

* improve spam logs

* rollback virtual utxoset sweep on startup

* block dust txs only on mainnet

* bump version to 0.1.7

* fine tune DB file limits for non-consensus DBs
  • Loading branch information
michaelsutton authored Sep 28, 2023
1 parent 79bc631 commit 1c2f766
Show file tree
Hide file tree
Showing 12 changed files with 201 additions and 116 deletions.
99 changes: 50 additions & 49 deletions Cargo.lock

Large diffs are not rendered by default.

94 changes: 47 additions & 47 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ members = [
]

[workspace.package]
version = "0.1.6"
version = "0.1.7"
authors = ["Kaspa developers"]
license = "MIT/Apache-2.0"
edition = "2021"
Expand All @@ -70,53 +70,53 @@ include = [
]

[workspace.dependencies]
# kaspa-testing-integration = { version = "0.1.2", path = "testing/integration" }
kaspa-os = { version = "0.1.6", path = "kaspa-os" }
kaspa-daemon = { version = "0.1.6", path = "daemon" }
kaspa-addresses = { version = "0.1.6", path = "crypto/addresses" }
kaspa-addressmanager = { version = "0.1.6", path = "components/addressmanager" }
kaspa-bip32 = { version = "0.1.6", path = "wallet/bip32" }
kaspa-connectionmanager = { version = "0.1.6", path = "components/connectionmanager" }
kaspa-consensus = { version = "0.1.6", path = "consensus" }
kaspa-consensus-core = { version = "0.1.6", path = "consensus/core" }
kaspa-consensus-notify = { version = "0.1.6", path = "consensus/notify" }
kaspa-consensus-wasm = { version = "0.1.6", path = "consensus/wasm" }
kaspa-consensusmanager = { version = "0.1.6", path = "components/consensusmanager" }
kaspa-core = { version = "0.1.6", path = "core" }
kaspa-database = { version = "0.1.6", path = "database" }
kaspa-grpc-client = { version = "0.1.6", path = "rpc/grpc/client" }
kaspa-grpc-core = { version = "0.1.6", path = "rpc/grpc/core" }
kaspa-grpc-server = { version = "0.1.6", path = "rpc/grpc/server" }
kaspa-hashes = { version = "0.1.6", path = "crypto/hashes" }
kaspa-index-core = { version = "0.1.6", path = "indexes/core" }
kaspa-index-processor = { version = "0.1.6", path = "indexes/processor" }
kaspa-math = { version = "0.1.6", path = "math" }
kaspa-merkle = { version = "0.1.6", path = "crypto/merkle" }
kaspa-mining = { version = "0.1.6", path = "mining" }
# kaspa-testing-integration = { version = "0.1.7", path = "testing/integration" }
kaspa-os = { version = "0.1.7", path = "kaspa-os" }
kaspa-daemon = { version = "0.1.7", path = "daemon" }
kaspa-addresses = { version = "0.1.7", path = "crypto/addresses" }
kaspa-addressmanager = { version = "0.1.7", path = "components/addressmanager" }
kaspa-bip32 = { version = "0.1.7", path = "wallet/bip32" }
kaspa-connectionmanager = { version = "0.1.7", path = "components/connectionmanager" }
kaspa-consensus = { version = "0.1.7", path = "consensus" }
kaspa-consensus-core = { version = "0.1.7", path = "consensus/core" }
kaspa-consensus-notify = { version = "0.1.7", path = "consensus/notify" }
kaspa-consensus-wasm = { version = "0.1.7", path = "consensus/wasm" }
kaspa-consensusmanager = { version = "0.1.7", path = "components/consensusmanager" }
kaspa-core = { version = "0.1.7", path = "core" }
kaspa-database = { version = "0.1.7", path = "database" }
kaspa-grpc-client = { version = "0.1.7", path = "rpc/grpc/client" }
kaspa-grpc-core = { version = "0.1.7", path = "rpc/grpc/core" }
kaspa-grpc-server = { version = "0.1.7", path = "rpc/grpc/server" }
kaspa-hashes = { version = "0.1.7", path = "crypto/hashes" }
kaspa-index-core = { version = "0.1.7", path = "indexes/core" }
kaspa-index-processor = { version = "0.1.7", path = "indexes/processor" }
kaspa-math = { version = "0.1.7", path = "math" }
kaspa-merkle = { version = "0.1.7", path = "crypto/merkle" }
kaspa-mining = { version = "0.1.7", path = "mining" }
kaspa-mining-errors = { path = "mining/errors" }
kaspa-muhash = { version = "0.1.6", path = "crypto/muhash" }
kaspa-notify = { version = "0.1.6", path = "notify" }
kaspa-p2p-flows = { version = "0.1.6", path = "protocol/flows" }
kaspa-p2p-lib = { version = "0.1.6", path = "protocol/p2p" }
kaspa-pow = { version = "0.1.6", path = "consensus/pow" }
kaspa-rpc-core = { version = "0.1.6", path = "rpc/core" }
kaspa-rpc-macros = { version = "0.1.6", path = "rpc/macros" }
kaspa-rpc-service = { version = "0.1.6", path = "rpc/service" }
kaspa-txscript = { version = "0.1.6", path = "crypto/txscript" }
kaspa-txscript-errors = { version = "0.1.6", path = "crypto/txscript/errors" }
kaspa-utils = { version = "0.1.6", path = "utils" }
kaspa-utxoindex = { version = "0.1.6", path = "indexes/utxoindex" }
kaspa-wallet = { version = "0.1.6", path = "wallet/native" }
kaspa-cli = { version = "0.1.6", path = "cli" }
kaspa-wallet-cli-wasm = { version = "0.1.6", path = "wallet/wasm" }
kaspa-wallet-core = { version = "0.1.6", path = "wallet/core" }
kaspa-wasm = { version = "0.1.6", path = "wasm" }
kaspa-wrpc-core = { version = "0.1.6", path = "rpc/wrpc/core" }
kaspa-wrpc-client = { version = "0.1.6", path = "rpc/wrpc/client" }
kaspa-wrpc-proxy = { version = "0.1.6", path = "rpc/wrpc/proxy" }
kaspa-wrpc-server = { version = "0.1.6", path = "rpc/wrpc/server" }
kaspa-wrpc-wasm = { version = "0.1.6", path = "rpc/wrpc/wasm" }
kaspad = { version = "0.1.6", path = "kaspad" }
kaspa-muhash = { version = "0.1.7", path = "crypto/muhash" }
kaspa-notify = { version = "0.1.7", path = "notify" }
kaspa-p2p-flows = { version = "0.1.7", path = "protocol/flows" }
kaspa-p2p-lib = { version = "0.1.7", path = "protocol/p2p" }
kaspa-pow = { version = "0.1.7", path = "consensus/pow" }
kaspa-rpc-core = { version = "0.1.7", path = "rpc/core" }
kaspa-rpc-macros = { version = "0.1.7", path = "rpc/macros" }
kaspa-rpc-service = { version = "0.1.7", path = "rpc/service" }
kaspa-txscript = { version = "0.1.7", path = "crypto/txscript" }
kaspa-txscript-errors = { version = "0.1.7", path = "crypto/txscript/errors" }
kaspa-utils = { version = "0.1.7", path = "utils" }
kaspa-utxoindex = { version = "0.1.7", path = "indexes/utxoindex" }
kaspa-wallet = { version = "0.1.7", path = "wallet/native" }
kaspa-cli = { version = "0.1.7", path = "cli" }
kaspa-wallet-cli-wasm = { version = "0.1.7", path = "wallet/wasm" }
kaspa-wallet-core = { version = "0.1.7", path = "wallet/core" }
kaspa-wasm = { version = "0.1.7", path = "wasm" }
kaspa-wrpc-core = { version = "0.1.7", path = "rpc/wrpc/core" }
kaspa-wrpc-client = { version = "0.1.7", path = "rpc/wrpc/client" }
kaspa-wrpc-proxy = { version = "0.1.7", path = "rpc/wrpc/proxy" }
kaspa-wrpc-server = { version = "0.1.7", path = "rpc/wrpc/server" }
kaspa-wrpc-wasm = { version = "0.1.7", path = "rpc/wrpc/wasm" }
kaspad = { version = "0.1.7", path = "kaspad" }
kaspa-perf-monitor = { path = "metrics/perf_monitor" }

# external
Expand Down
4 changes: 4 additions & 0 deletions consensus/core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ impl NetworkId {
self.network_type
}

pub fn is_mainnet(&self) -> bool {
self.network_type == NetworkType::Mainnet
}

pub fn suffix(&self) -> Option<u32> {
self.suffix
}
Expand Down
22 changes: 17 additions & 5 deletions kaspad/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const DEFAULT_DATA_DIR: &str = "datadir";
const CONSENSUS_DB: &str = "consensus";
const UTXOINDEX_DB: &str = "utxoindex";
const META_DB: &str = "meta";
const UTXO_INDEX_DB_FILE_LIMIT: i32 = 100;
const META_DB_FILE_LIMIT: i32 = 5;
const DEFAULT_LOG_DIR: &str = "logs";

fn get_home_dir() -> PathBuf {
Expand Down Expand Up @@ -192,7 +194,8 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
}

// DB used for addresses store and for multi-consensus management
let mut meta_db = kaspa_database::prelude::ConnBuilder::default().with_db_path(meta_db_dir.clone()).build();
let mut meta_db =
kaspa_database::prelude::ConnBuilder::default().with_files_limit(META_DB_FILE_LIMIT).with_db_path(meta_db_dir.clone()).build();

// TEMP: upgrade from Alpha version or any version before this one
if meta_db.get_pinned(b"multi-consensus-metadata-key").is_ok_and(|r| r.is_some()) {
Expand All @@ -213,7 +216,8 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
fs::create_dir_all(utxoindex_db_dir.as_path()).unwrap();

// Reopen the DB
meta_db = kaspa_database::prelude::ConnBuilder::default().with_db_path(meta_db_dir).build();
meta_db =
kaspa_database::prelude::ConnBuilder::default().with_files_limit(META_DB_FILE_LIMIT).with_db_path(meta_db_dir).build();
}

let connect_peers = args.connect_peers.iter().map(|x| x.normalize(config.default_p2p_port())).collect::<Vec<_>>();
Expand Down Expand Up @@ -266,7 +270,10 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
let notify_service = Arc::new(NotifyService::new(notification_root.clone(), notification_recv));
let index_service: Option<Arc<IndexService>> = if args.utxoindex {
// Use only a single thread for none-consensus databases
let utxoindex_db = kaspa_database::prelude::ConnBuilder::default().with_db_path(utxoindex_db_dir).build();
let utxoindex_db = kaspa_database::prelude::ConnBuilder::default()
.with_files_limit(UTXO_INDEX_DB_FILE_LIMIT)
.with_db_path(utxoindex_db_dir)
.build();
let utxoindex = UtxoIndexProxy::new(UtxoIndex::new(consensus_manager.clone(), utxoindex_db).unwrap());
let index_service = Arc::new(IndexService::new(&notify_service.notifier(), Some(utxoindex)));
Some(index_service)
Expand All @@ -275,8 +282,13 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
};

let address_manager = AddressManager::new(config.clone(), meta_db);
let mining_manager =
MiningManagerProxy::new(Arc::new(MiningManager::new(config.target_time_per_block, false, config.max_block_mass, None)));
let mining_manager = MiningManagerProxy::new(Arc::new(MiningManager::new_with_spam_blocking_option(
network.is_mainnet(),
config.target_time_per_block,
false,
config.max_block_mass,
None,
)));

let flow_context = Arc::new(FlowContext::new(
consensus_manager.clone(),
Expand Down
3 changes: 3 additions & 0 deletions mining/errors/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ pub enum RuleError {
// TODO: This error is added for the tx_relay flow but is never constructed neither in the golang nor in this version. Discuss if it can be removed.
#[error("transaction {0} is invalid")]
RejectInvalid(TransactionId),

#[error("Rejected spam tx {0} from mempool")]
RejectSpamTransaction(TransactionId),
}

impl From<NonStandardError> for RuleError {
Expand Down
16 changes: 16 additions & 0 deletions mining/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,22 @@ impl MiningManager {
Self::with_config(config, cache_lifetime)
}

pub fn new_with_spam_blocking_option(
block_spam_txs: bool,
target_time_per_block: u64,
relay_non_std_transactions: bool,
max_block_mass: u64,
cache_lifetime: Option<u64>,
) -> Self {
let config = Config::build_default_with_spam_blocking_option(
block_spam_txs,
target_time_per_block,
relay_non_std_transactions,
max_block_mass,
);
Self::with_config(config, cache_lifetime)
}

pub(crate) fn with_config(config: Config, cache_lifetime: Option<u64>) -> Self {
let block_template_builder = BlockTemplateBuilder::new(config.maximum_mass_per_block);
let mempool = RwLock::new(Mempool::new(config));
Expand Down
17 changes: 16 additions & 1 deletion mining/src/mempool/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub struct Config {
pub minimum_relay_transaction_fee: u64,
pub minimum_standard_transaction_version: u16,
pub maximum_standard_transaction_version: u16,
pub block_spam_txs: bool,
}

impl Config {
Expand All @@ -56,6 +57,7 @@ impl Config {
minimum_relay_transaction_fee: u64,
minimum_standard_transaction_version: u16,
maximum_standard_transaction_version: u16,
block_spam_txs: bool,
) -> Self {
Self {
maximum_transaction_count,
Expand All @@ -71,12 +73,13 @@ impl Config {
minimum_relay_transaction_fee,
minimum_standard_transaction_version,
maximum_standard_transaction_version,
block_spam_txs,
}
}

/// Build a default config.
/// The arguments should be obtained from the current consensus [`kaspa_consensus_core::config::params::Params`] instance.
pub fn build_default(target_milliseconds_per_block: u64, relay_non_std_transactions: bool, max_block_mass: u64) -> Self {
pub const fn build_default(target_milliseconds_per_block: u64, relay_non_std_transactions: bool, max_block_mass: u64) -> Self {
Self {
maximum_transaction_count: DEFAULT_MAXIMUM_TRANSACTION_COUNT,
transaction_expire_interval_daa_score: DEFAULT_TRANSACTION_EXPIRE_INTERVAL_SECONDS * 1000 / target_milliseconds_per_block,
Expand All @@ -92,6 +95,18 @@ impl Config {
minimum_relay_transaction_fee: DEFAULT_MINIMUM_RELAY_TRANSACTION_FEE,
minimum_standard_transaction_version: DEFAULT_MINIMUM_STANDARD_TRANSACTION_VERSION,
maximum_standard_transaction_version: DEFAULT_MAXIMUM_STANDARD_TRANSACTION_VERSION,
block_spam_txs: false,
}
}

/// Build a default config with optional spam blocking.
/// The arguments should be obtained from the current consensus [`kaspa_consensus_core::config::params::Params`] instance.
pub const fn build_default_with_spam_blocking_option(
block_spam_txs: bool,
target_milliseconds_per_block: u64,
relay_non_std_transactions: bool,
max_block_mass: u64,
) -> Self {
Self { block_spam_txs, ..Self::build_default(target_milliseconds_per_block, relay_non_std_transactions, max_block_mass) }
}
}
17 changes: 16 additions & 1 deletion mining/src/mempool/validate_and_insert_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::mempool::{
};
use kaspa_consensus_core::{
api::ConsensusApi,
constants::UNACCEPTED_DAA_SCORE,
constants::{SOMPI_PER_KASPA, UNACCEPTED_DAA_SCORE},
tx::{MutableTransaction, Transaction, TransactionId, TransactionOutpoint, UtxoEntry},
};
use kaspa_core::info;
Expand Down Expand Up @@ -84,6 +84,21 @@ impl Mempool {
}

fn validate_transaction_in_context(&self, transaction: &MutableTransaction) -> RuleResult<()> {
if self.config.block_spam_txs {
// TEMP: apply go-kaspad mempool dust prevention patch
// Note: we do not apply the part of the patch which modifies BBT since
// we do not support BBT on mainnet yet
let has_coinbase_input = transaction.entries.iter().any(|e| e.as_ref().unwrap().is_coinbase);
let num_extra_outs = transaction.tx.outputs.len() as i64 - transaction.tx.inputs.len() as i64;
if !has_coinbase_input
&& num_extra_outs > 2
&& transaction.calculated_fee.unwrap() < num_extra_outs as u64 * SOMPI_PER_KASPA
{
kaspa_core::trace!("Rejected spam tx {} from mempool ({} outputs)", transaction.id(), transaction.tx.outputs.len());
return Err(RuleError::RejectSpamTransaction(transaction.id()));
}
}

if !self.config.accept_non_standard {
self.check_transaction_standard_in_context(transaction)?;
}
Expand Down
20 changes: 16 additions & 4 deletions protocol/flows/src/v5/txrelay/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub struct RelayTransactionsFlow {
invs_route: IncomingRoute,
/// A route for other messages such as Transaction and TransactionNotFound
msg_route: IncomingRoute,

/// Track the number of spam txs coming from this peer
spam_counter: u64,
}

#[async_trait::async_trait]
Expand All @@ -59,7 +62,7 @@ impl Flow for RelayTransactionsFlow {

impl RelayTransactionsFlow {
pub fn new(ctx: FlowContext, router: Arc<Router>, invs_route: IncomingRoute, msg_route: IncomingRoute) -> Self {
Self { ctx, router, invs_route, msg_route }
Self { ctx, router, invs_route, msg_route, spam_counter: 0 }
}

pub fn invs_channel_size() -> usize {
Expand Down Expand Up @@ -190,9 +193,18 @@ impl RelayTransactionsFlow {
self.ctx.broadcast_transactions(accepted_transactions.iter().map(|x| x.id())).await?;
}
Err(MiningManagerError::MempoolError(err)) => {
if let RuleError::RejectInvalid(_) = err {
// TODO: discuss a banning process
return Err(ProtocolError::MisbehavingPeer(format!("rejected invalid transaction {}", transaction_id)));
match err {
RuleError::RejectInvalid(_) => {
// TODO: discuss a banning process
return Err(ProtocolError::MisbehavingPeer(format!("rejected invalid transaction {}", transaction_id)));
}
RuleError::RejectSpamTransaction(_) => {
self.spam_counter += 1;
if self.spam_counter % 100 == 0 {
kaspa_core::warn!("Peer {} has shared {} spam txs", self.router, self.spam_counter);
}
}
_ => (),
}
continue;
}
Expand Down
1 change: 1 addition & 0 deletions rpc/grpc/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ kaspa-grpc-core.workspace = true
kaspa-utils.workspace = true
kaspa-core.workspace = true

uuid.workspace = true
faster-hex.workspace = true
async-channel.workspace = true
parking_lot.workspace = true
Expand Down
16 changes: 13 additions & 3 deletions rpc/grpc/server/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@ use tokio::sync::{
oneshot::{channel as oneshot_channel, Sender as OneshotSender},
};
use tonic::Streaming;
use uuid::Uuid;

pub type GrpcSender = MpscSender<StatusResult<KaspadResponse>>;
pub type StatusResult<T> = Result<T, tonic::Status>;

#[derive(Debug)]
struct Inner {
/// The internal id of this client
id: Uuid,

/// The socket address of this client
net_address: SocketAddr,

Expand Down Expand Up @@ -67,7 +71,13 @@ impl Connection {
) -> Self {
let (shutdown_sender, mut shutdown_receiver) = oneshot_channel();
let connection = Self {
inner: Arc::new(Inner { net_address, outgoing_route, manager, shutdown_signal: Mutex::new(Some(shutdown_sender)) }),
inner: Arc::new(Inner {
id: Uuid::new_v4(),
net_address,
outgoing_route,
manager,
shutdown_signal: Mutex::new(Some(shutdown_sender)),
}),
};
let connection_clone = connection.clone();
let outgoing_route = connection.inner.outgoing_route.clone();
Expand Down Expand Up @@ -150,8 +160,8 @@ impl Connection {
self.inner.net_address
}

pub fn identity(&self) -> SocketAddr {
self.inner.net_address
pub fn identity(&self) -> Uuid {
self.inner.id
}

async fn handle_request(request: KaspadRequest, core_service: &DynRpcService) -> GrpcServerResult<KaspadResponse> {
Expand Down
Loading

0 comments on commit 1c2f766

Please sign in to comment.