diff --git a/crates/sui-core/src/authority/execution_time_estimator.rs b/crates/sui-core/src/authority/execution_time_estimator.rs
index 643494c890b2c..8e20e5f5d94a9 100644
--- a/crates/sui-core/src/authority/execution_time_estimator.rs
+++ b/crates/sui-core/src/authority/execution_time_estimator.rs
@@ -5,7 +5,7 @@ use std::{
     collections::HashMap,
     num::NonZeroUsize,
     sync::{Arc, Weak},
-    time::{Duration, Instant},
+    time::Duration,
 };

 use super::authority_per_epoch_store::AuthorityPerEpochStore;
@@ -17,6 +17,7 @@ use mysten_metrics::{monitored_scope, spawn_monitored_task};
 use simple_moving_average::{SingleSumSMA, SMA};
 use sui_protocol_config::PerObjectCongestionControlMode;
 use sui_types::{
+    base_types::ObjectID,
     committee::Committee,
     error::SuiError,
     execution::{ExecutionTimeObservationKey, ExecutionTiming},
@@ -26,24 +27,41 @@ use sui_types::{
         TransactionDataAPI, TransactionKind,
     },
 };
-use tokio::sync::mpsc;
+use tokio::{sync::mpsc, time::Instant};
 use tracing::{debug, info, warn};

+// TODO: Move all these consts into protocol configs once design stabilizes.
+
 const LOCAL_OBSERVATION_WINDOW_SIZE: usize = 10;

-// If our current local observation differs from the last one we shared by more than
-// this percent, we share a new one.
+// We won't share a new observation with consensus unless our current local observation differs
+// from the last one we shared by more than this percentage.
 const OBSERVATION_SHARING_DIFF_THRESHOLD: f64 = 0.05;

+// We won't share a new observation with consensus unless target object utilization is exceeded
+// by at least this amount.
+const OBSERVATION_SHARING_OBJECT_UTILIZATION_THRESHOLD: Duration = Duration::from_millis(500);
+
 // Minimum interval between sharing multiple observations of the same key.
 const OBSERVATION_SHARING_MIN_INTERVAL: Duration = Duration::from_secs(5);

+const OBJECT_UTILIZATION_TRACKER_CAPACITY: usize = 50_000;
+
+// TODO: source from time-based utilization target param in ProtocolConfig when available.
+const TARGET_OBJECT_UTILIZATION: f64 = 0.5;
+
 // Collects local execution time estimates to share via consensus.
 pub struct ExecutionTimeObserver {
     epoch_store: Weak<AuthorityPerEpochStore>,
     consensus_adapter: Box<dyn SubmitToConsensus>,
+    observation_sharing_object_utilization_threshold: Duration,

     local_observations: LruCache<ExecutionTimeObservationKey, LocalObservations>,
+
+    // For each object, tracks the amount of time above our utilization target that we spent
+    // executing transactions. This is used to decide which observations should be shared
+    // via consensus.
+    object_utilization_tracker: LruCache<ObjectID, ObjectUtilization>,
 }

 #[derive(Debug, Clone)]
@@ -52,13 +70,19 @@ pub struct LocalObservations {
     last_shared: Option<(Duration, Instant)>,
 }

+#[derive(Debug, Clone)]
+pub struct ObjectUtilization {
+    excess_execution_time: Duration,
+    last_measured: Option<Instant>,
+}
+
 // Tracks local execution time observations and shares them via consensus.
 impl ExecutionTimeObserver {
     pub fn spawn(
         epoch_store: Arc<AuthorityPerEpochStore>,
         consensus_adapter: Box<dyn SubmitToConsensus>,
         channel_size: usize,
-        lru_cache_size: NonZeroUsize,
+        observation_cache_size: NonZeroUsize,
     ) {
         if epoch_store
             .protocol_config()
@@ -76,7 +100,12 @@ impl ExecutionTimeObserver {
         let mut observer = Self {
             epoch_store: Arc::downgrade(&epoch_store),
             consensus_adapter,
-            local_observations: LruCache::new(lru_cache_size),
+            local_observations: LruCache::new(observation_cache_size),
+            object_utilization_tracker: LruCache::new(
+                NonZeroUsize::new(OBJECT_UTILIZATION_TRACKER_CAPACITY).unwrap(),
+            ),
+            observation_sharing_object_utilization_threshold:
+                OBSERVATION_SHARING_OBJECT_UTILIZATION_THRESHOLD,
         };
         spawn_monitored_task!(epoch_store.within_alive_epoch(async move {
             while let Some((tx, timings, total_duration)) = rx_local_execution_time.recv().await {
@@ -92,11 +121,16 @@ impl ExecutionTimeObserver {
     fn new_for_testing(
         epoch_store: Arc<AuthorityPerEpochStore>,
         consensus_adapter: Box<dyn SubmitToConsensus>,
+        observation_sharing_object_utilization_threshold: Duration,
     ) -> Self {
         Self {
             epoch_store: Arc::downgrade(&epoch_store),
             consensus_adapter,
             local_observations: LruCache::new(NonZeroUsize::new(10000).unwrap()),
+            object_utilization_tracker: LruCache::new(
+                NonZeroUsize::new(OBJECT_UTILIZATION_TRACKER_CAPACITY).unwrap(),
+            ),
+            observation_sharing_object_utilization_threshold,
         }
     }

@@ -114,6 +148,43 @@ impl ExecutionTimeObserver {
         assert!(tx.commands.len() >= timings.len());

+        // Update the accumulated excess execution time for each mutable shared object
+        // used in this transaction, and determine the max overage.
+        let max_excess_per_object_execution_time = tx
+            .shared_input_objects()
+            .filter_map(|obj| obj.mutable.then_some(obj.id))
+            .map(|id| {
+                // For each object:
+                //   - add the execution time of the current transaction to the tracker
+                //   - subtract the maximum amount of time available for execution according
+                //     to our utilization target since the last measurement
+                //     (clamping to zero)
+                //
+                // What remains is the amount of excess time spent executing transactions on
+                // the object above the intended limit. If this value is greater than zero,
+                // it means the object is overutilized.
+                let now = Instant::now();
+                let utilization =
+                    self.object_utilization_tracker
+                        .get_or_insert_mut(id, || ObjectUtilization {
+                            excess_execution_time: Duration::ZERO,
+                            last_measured: None,
+                        });
+                utilization.excess_execution_time += total_duration;
+                utilization.excess_execution_time =
+                    utilization.excess_execution_time.saturating_sub(
+                        utilization
+                            .last_measured
+                            .map(|last_measured| now.duration_since(last_measured))
+                            .unwrap_or(Duration::MAX)
+                            .mul_f64(TARGET_OBJECT_UTILIZATION),
+                    );
+                utilization.last_measured = Some(now);
+                utilization.excess_execution_time
+            })
+            .max()
+            .unwrap_or(Duration::ZERO);
+
         let total_command_duration: Duration = timings.iter().map(|t| t.duration()).sum();
         let extra_overhead = total_duration - total_command_duration;

@@ -148,19 +219,22 @@ impl ExecutionTimeObserver {
                 .moving_average
                 .add_sample(command_duration);

-            // Send a new observation through consensus if our current moving average
-            // differs too much from the last one we shared.
-            // TODO: Consider only sharing observations for entrypoints with congestion.
+            // Send a new observation through consensus if:
+            // - our current moving average differs too much from the last one we shared, and
+            // - the tx has at least one mutable shared object with utilization that's too high
             // TODO: Consider only sharing observations that disagree with consensus estimate.
             let new_average = local_observation.moving_average.get_average();
-            if local_observation
-                .last_shared
-                .is_none_or(|(last_shared, last_shared_timestamp)| {
-                    let diff = last_shared.abs_diff(new_average);
-                    diff > new_average.mul_f64(OBSERVATION_SHARING_DIFF_THRESHOLD)
-                        && last_shared_timestamp.elapsed() > OBSERVATION_SHARING_MIN_INTERVAL
-                })
-            {
+            let diff_exceeds_threshold =
+                local_observation
+                    .last_shared
+                    .is_none_or(|(last_shared, last_shared_timestamp)| {
+                        let diff = last_shared.abs_diff(new_average);
+                        diff >= new_average.mul_f64(OBSERVATION_SHARING_DIFF_THRESHOLD)
+                            && last_shared_timestamp.elapsed() >= OBSERVATION_SHARING_MIN_INTERVAL
+                    });
+            let utilization_exceeds_threshold = max_excess_per_object_execution_time
+                >= self.observation_sharing_object_utilization_threshold;
+            if diff_exceeds_threshold && utilization_exceeds_threshold {
                 debug!("sharing new execution time observation for {key:?}: {new_average:?}");
                 to_share.push((key, new_average));
                 local_observation.last_shared = Some((new_average, Instant::now()));
@@ -403,8 +477,8 @@ mod tests {
         ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
         MockConsensusClient,
     };
-    use sui_types::base_types::{ObjectID, SuiAddress};
-    use sui_types::transaction::{Argument, ProgrammableMoveCall};
+    use sui_types::base_types::{ObjectID, SequenceNumber, SuiAddress};
+    use sui_types::transaction::{Argument, CallArg, ObjectArg, ProgrammableMoveCall};

     #[tokio::test]
     async fn test_record_local_observations() {
@@ -428,6 +502,7 @@
         let mut observer = ExecutionTimeObserver::new_for_testing(
             epoch_store.clone(),
             Box::new(consensus_adapter.clone()),
+            Duration::ZERO, // disable object utilization thresholds for this test
         );

         // Create a simple PTB with one move call
@@ -553,6 +628,7 @@
         let mut observer = ExecutionTimeObserver::new_for_testing(
             epoch_store.clone(),
             Box::new(consensus_adapter.clone()),
+            Duration::ZERO, // disable object utilization thresholds for this test
         );

         // Create a PTB with multiple commands.
@@ -612,6 +688,122 @@ mod tests {
         );
     }

+    #[tokio::test]
+    async fn test_record_local_observations_with_object_utilization_threshold() {
+        telemetry_subscribers::init_for_testing();
+
+        let mock_consensus_client = MockConsensusClient::new();
+        let authority = TestAuthorityBuilder::new().build().await;
+        let epoch_store = authority.epoch_store_for_testing();
+        let consensus_adapter = Arc::new(ConsensusAdapter::new(
+            Arc::new(mock_consensus_client),
+            CheckpointStore::new_for_tests(),
+            authority.name,
+            Arc::new(ConnectionMonitorStatusForTests {}),
+            100_000,
+            100_000,
+            None,
+            None,
+            ConsensusAdapterMetrics::new_test(),
+            epoch_store.protocol_config().clone(),
+        ));
+        let mut observer = ExecutionTimeObserver::new_for_testing(
+            epoch_store.clone(),
+            Box::new(consensus_adapter.clone()),
+            Duration::from_millis(500), // only share observations with excess utilization >= 500ms
+        );
+
+        // Create a simple PTB with one move call and one mutable shared input
+        let package = ObjectID::random();
+        let module = "test_module".to_string();
+        let function = "test_function".to_string();
+        let ptb = ProgrammableTransaction {
+            inputs: vec![CallArg::Object(ObjectArg::SharedObject {
+                id: ObjectID::random(),
+                initial_shared_version: SequenceNumber::new(),
+                mutable: true,
+            })],
+            commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
+                package,
+                module: module.clone(),
+                function: function.clone(),
+                type_arguments: vec![],
+                arguments: vec![],
+            }))],
+        };
+        let key = ExecutionTimeObservationKey::MoveEntryPoint {
+            package,
+            module: module.clone(),
+            function: function.clone(),
+            type_arguments: vec![],
+        };
+
+        tokio::time::pause();
+
+        // First observation - should not share due to low utilization
+        let timings = vec![ExecutionTiming::Success(Duration::from_secs(1))];
+        observer
+            .record_local_observations(&ptb, &timings, Duration::from_secs(2))
+            .await;
+        assert!(observer
+            .local_observations
+            .get(&key)
+            .unwrap()
+            .last_shared
+            .is_none());
+
+        // Second observation - no time has passed, so utilization is now high; should share
+        let timings = vec![ExecutionTiming::Success(Duration::from_secs(1))];
+        observer
+            .record_local_observations(&ptb, &timings, Duration::from_secs(2))
+            .await;
+        assert_eq!(
+            observer
+                .local_observations
+                .get(&key)
+                .unwrap()
+                .last_shared
+                .unwrap()
+                .0,
+            Duration::from_secs(2)
+        );
+
+        // Third execution still with high utilization - time has passed, but not enough to
+        // clear the excess when accounting for the new observation; should share
+        tokio::time::advance(Duration::from_secs(5)).await;
+        let timings = vec![ExecutionTiming::Success(Duration::from_secs(3))];
+        observer
+            .record_local_observations(&ptb, &timings, Duration::from_secs(5))
+            .await;
+        assert_eq!(
+            observer
+                .local_observations
+                .get(&key)
+                .unwrap()
+                .last_shared
+                .unwrap()
+                .0,
+            Duration::from_secs(3)
+        );
+
+        // Fourth execution after utilization drops - should not share, even though the diff is still high
+        tokio::time::advance(Duration::from_secs(60)).await;
+        let timings = vec![ExecutionTiming::Success(Duration::from_secs(11))];
+        observer
+            .record_local_observations(&ptb, &timings, Duration::from_secs(11))
+            .await;
+        assert_eq!(
+            observer
+                .local_observations
+                .get(&key)
+                .unwrap()
+                .last_shared
+                .unwrap()
+                .0,
+            Duration::from_secs(3) // still the old value
+        );
+    }
+
     #[tokio::test]
     async fn test_stake_weighted_median() {
         telemetry_subscribers::init_for_testing();
diff --git a/crates/sui-types/src/execution.rs b/crates/sui-types/src/execution.rs
index c05700f7309e5..62e3873076129 100644
--- a/crates/sui-types/src/execution.rs
+++ b/crates/sui-types/src/execution.rs
@@ -183,6 +183,7 @@ pub enum ExecutionTimeObservationKey {
         /// The function to be called.
         function: String,
         /// The type arguments to the function.
+        /// NOTE: This field is currently not populated.
         type_arguments: Vec<TypeInput>,
     },
     TransferObjects,
diff --git a/crates/sui-types/src/transaction.rs b/crates/sui-types/src/transaction.rs
index fd692b4a8c4c8..4b1be5d2aa1d9 100644
--- a/crates/sui-types/src/transaction.rs
+++ b/crates/sui-types/src/transaction.rs
@@ -1094,24 +1094,21 @@ impl ProgrammableTransaction {
         Ok(())
     }

-    fn shared_input_objects(&self) -> impl Iterator<Item = SharedInputObject> + '_ {
-        self.inputs
-            .iter()
-            .filter_map(|arg| match arg {
-                CallArg::Pure(_)
-                | CallArg::Object(ObjectArg::Receiving(_))
-                | CallArg::Object(ObjectArg::ImmOrOwnedObject(_)) => None,
-                CallArg::Object(ObjectArg::SharedObject {
-                    id,
-                    initial_shared_version,
-                    mutable,
-                }) => Some(vec![SharedInputObject {
-                    id: *id,
-                    initial_shared_version: *initial_shared_version,
-                    mutable: *mutable,
-                }]),
-            })
-            .flatten()
+    pub fn shared_input_objects(&self) -> impl Iterator<Item = SharedInputObject> + '_ {
+        self.inputs.iter().filter_map(|arg| match arg {
+            CallArg::Pure(_)
+            | CallArg::Object(ObjectArg::Receiving(_))
+            | CallArg::Object(ObjectArg::ImmOrOwnedObject(_)) => None,
+            CallArg::Object(ObjectArg::SharedObject {
+                id,
+                initial_shared_version,
+                mutable,
+            }) => Some(SharedInputObject {
+                id: *id,
+                initial_shared_version: *initial_shared_version,
+                mutable: *mutable,
+            }),
+        })
     }

     fn move_calls(&self) -> Vec<(&ObjectID, &str, &str)> {
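Reviewer note (not part of the patch): the standalone sketch below illustrates the excess-utilization bookkeeping that the new ObjectUtilization tracker performs in record_local_observations. Each execution adds its duration to the accumulated excess, then the time budget earned since the last measurement (elapsed wall time multiplied by TARGET_OBJECT_UTILIZATION) is subtracted with saturation at zero; an object is considered overutilized while the remainder meets the sharing threshold. The names mirror the patch, but this is an illustration only: it uses std::time::Instant rather than tokio::time::Instant, and it takes `now` as a parameter (the patch calls Instant::now() inline) so the example is deterministic.

use std::time::{Duration, Instant};

// Illustrative mirror of the patch's per-object tracker state.
struct ObjectUtilization {
    excess_execution_time: Duration,
    last_measured: Option<Instant>,
}

// Fraction of wall-clock time we are willing to spend executing on one object.
const TARGET_OBJECT_UTILIZATION: f64 = 0.5;

impl ObjectUtilization {
    // Record one execution of `duration` on the object at time `now`,
    // returning the updated excess execution time.
    fn record(&mut self, now: Instant, duration: Duration) -> Duration {
        // Add the execution time of the current transaction.
        self.excess_execution_time += duration;
        // Subtract the execution-time budget accrued since the last measurement,
        // clamping at zero. With no prior measurement, the budget is effectively
        // unbounded, so the excess starts from zero.
        let budget = self
            .last_measured
            .map(|last| now.duration_since(last))
            .unwrap_or(Duration::MAX)
            .mul_f64(TARGET_OBJECT_UTILIZATION);
        self.excess_execution_time = self.excess_execution_time.saturating_sub(budget);
        self.last_measured = Some(now);
        self.excess_execution_time
    }
}

fn main() {
    let mut u = ObjectUtilization {
        excess_execution_time: Duration::ZERO,
        last_measured: None,
    };
    let t0 = Instant::now();
    // First execution: no prior measurement, so the excess clamps to zero (no sharing).
    assert_eq!(u.record(t0, Duration::from_secs(1)), Duration::ZERO);
    // Second execution at the same instant: no budget accrued, excess grows to 1s.
    assert_eq!(u.record(t0, Duration::from_secs(1)), Duration::from_secs(1));
    // After 4s of wall time, 2s of budget (50% target) offsets the 1s of excess plus
    // a new 1s execution, bringing the excess back to zero.
    assert_eq!(
        u.record(t0 + Duration::from_secs(4), Duration::from_secs(1)),
        Duration::ZERO
    );
}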