Only share execution time estimates to consensus for tx with overutilized objects #21350

Open · wants to merge 4 commits into base: main
230 changes: 211 additions & 19 deletions crates/sui-core/src/authority/execution_time_estimator.rs
@@ -5,7 +5,7 @@ use std::{
collections::HashMap,
num::NonZeroUsize,
sync::{Arc, Weak},
time::{Duration, Instant},
time::Duration,
};

use super::authority_per_epoch_store::AuthorityPerEpochStore;
@@ -17,6 +17,7 @@ use mysten_metrics::{monitored_scope, spawn_monitored_task};
use simple_moving_average::{SingleSumSMA, SMA};
use sui_protocol_config::PerObjectCongestionControlMode;
use sui_types::{
base_types::ObjectID,
committee::Committee,
error::SuiError,
execution::{ExecutionTimeObservationKey, ExecutionTiming},
@@ -26,24 +27,41 @@ use sui_types::{
TransactionDataAPI, TransactionKind,
},
};
use tokio::sync::mpsc;
use tokio::{sync::mpsc, time::Instant};
use tracing::{debug, info, warn};

// TODO: Move all these consts into protocol configs once design stabilizes.

const LOCAL_OBSERVATION_WINDOW_SIZE: usize = 10;

// If our current local observation differs from the last one we shared by more than
// this percent, we share a new one.
// We won't share a new observation with consensus unless our current local observation differs
// from the last one we shared by more than this percentage.
const OBSERVATION_SHARING_DIFF_THRESHOLD: f64 = 0.05;
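// (Illustrative numbers: if the last shared average was 1000ms and the new
// average is 1200ms, the 200ms difference exceeds 5% of the new average (60ms),
// so the diff check passes.)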

// We won't share a new observation with consensus unless target object utilization is exceeded
// by at least this amount.
const OBSERVATION_SHARING_OBJECT_UTILIZATION_THRESHOLD: Duration = Duration::from_millis(500);

// Minimum interval between sharing multiple observations of the same key.
const OBSERVATION_SHARING_MIN_INTERVAL: Duration = Duration::from_secs(5);

const OBJECT_UTILIZATION_TRACKER_CAPACITY: usize = 50_000;

// TODO: source from time-based utilization target param in ProtocolConfig when available.
const TARGET_OBJECT_UTILIZATION: f64 = 0.5;

// Collects local execution time estimates to share via consensus.
pub struct ExecutionTimeObserver {
epoch_store: Weak<AuthorityPerEpochStore>,
consensus_adapter: Box<dyn SubmitToConsensus>,
observation_sharing_object_utilization_threshold: Duration,

local_observations: LruCache<ExecutionTimeObservationKey, LocalObservations>,

// For each object, tracks the amount of time above our utilization target that we spent
// executing transactions. This is used to decide which observations should be shared
// via consensus.
object_utilization_tracker: LruCache<ObjectID, ObjectUtilization>,
}

#[derive(Debug, Clone)]
@@ -52,13 +70,19 @@ pub struct LocalObservations {
last_shared: Option<(Duration, Instant)>,
}

#[derive(Debug, Clone)]
pub struct ObjectUtilization {
excess_execution_time: Duration,
last_measured: Option<Instant>,
}

// Tracks local execution time observations and shares them via consensus.
impl ExecutionTimeObserver {
pub fn spawn(
epoch_store: Arc<AuthorityPerEpochStore>,
consensus_adapter: Box<dyn SubmitToConsensus>,
channel_size: usize,
lru_cache_size: NonZeroUsize,
observation_cache_size: NonZeroUsize,
) {
if epoch_store
.protocol_config()
@@ -76,7 +100,12 @@ impl ExecutionTimeObserver {
let mut observer = Self {
epoch_store: Arc::downgrade(&epoch_store),
consensus_adapter,
local_observations: LruCache::new(lru_cache_size),
local_observations: LruCache::new(observation_cache_size),
object_utilization_tracker: LruCache::new(
NonZeroUsize::new(OBJECT_UTILIZATION_TRACKER_CAPACITY).unwrap(),
),
observation_sharing_object_utilization_threshold:
OBSERVATION_SHARING_OBJECT_UTILIZATION_THRESHOLD,
};
spawn_monitored_task!(epoch_store.within_alive_epoch(async move {
while let Some((tx, timings, total_duration)) = rx_local_execution_time.recv().await {
@@ -92,11 +121,16 @@
fn new_for_testing(
epoch_store: Arc<AuthorityPerEpochStore>,
consensus_adapter: Box<dyn SubmitToConsensus>,
observation_sharing_object_utilization_threshold: Duration,
) -> Self {
Self {
epoch_store: Arc::downgrade(&epoch_store),
consensus_adapter,
local_observations: LruCache::new(NonZeroUsize::new(10000).unwrap()),
object_utilization_tracker: LruCache::new(
NonZeroUsize::new(OBJECT_UTILIZATION_TRACKER_CAPACITY).unwrap(),
),
observation_sharing_object_utilization_threshold,
}
}

@@ -114,6 +148,43 @@ impl ExecutionTimeObserver {

assert!(tx.commands.len() >= timings.len());

// Update the accumulated excess execution time for each mutable shared object
// used in this transaction, and determine the max overage.
let max_excess_per_object_execution_time = tx
.shared_input_objects()
.filter_map(|obj| obj.mutable.then_some(obj.id))
.map(|id| {
// For each object:
// - add the execution time of the current transaction to the tracker
// - subtract the maximum amount of time available for execution according
// to our utilization target since the last measurement was taken
// (clamping to zero)
//
// What remains is the amount of excess time spent executing transactions on
// the object above the intended limit. If this value is greater than zero,
// it means the object is overutilized.
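// Example with illustrative numbers: at TARGET_OBJECT_UTILIZATION = 0.5, a
// transaction that executed for 2s, arriving 3s after the last measurement,
// adds 2s to the tracker and subtracts 3s * 0.5 = 1.5s, leaving 500ms of
// excess execution time for the object.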
let now = Instant::now();
let utilization =
self.object_utilization_tracker
.get_or_insert_mut(id, || ObjectUtilization {
excess_execution_time: Duration::ZERO,
last_measured: None,
});
utilization.excess_execution_time += total_duration;
utilization.excess_execution_time =
utilization.excess_execution_time.saturating_sub(
utilization
.last_measured
.map(|last_measured| now.duration_since(last_measured))
.unwrap_or(Duration::MAX)
.mul_f64(TARGET_OBJECT_UTILIZATION),
);
utilization.last_measured = Some(now);
utilization.excess_execution_time
})
.max()
.unwrap_or(Duration::ZERO);

let total_command_duration: Duration = timings.iter().map(|t| t.duration()).sum();
let extra_overhead = total_duration - total_command_duration;

@@ -148,19 +219,22 @@ impl ExecutionTimeObserver {
.moving_average
.add_sample(command_duration);

// Send a new observation through consensus if our current moving average
// differs too much from the last one we shared.
// TODO: Consider only sharing observations for entrypoints with congestion.
// Send a new observation through consensus if:
// - our current moving average differs too much from the last one we shared, and
// - the tx has at least one mutable shared object with utilization that's too high
// TODO: Consider only sharing observations that disagree with consensus estimate.
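// (Illustrative: with the constants above, an average that moved from a shared
// 1s to 2s passes the diff check; it is then shared only if at least 5s have
// elapsed since the last share and some mutable shared input has accumulated
// >= 500ms of excess execution time.)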
let new_average = local_observation.moving_average.get_average();
if local_observation
.last_shared
.is_none_or(|(last_shared, last_shared_timestamp)| {
let diff = last_shared.abs_diff(new_average);
diff > new_average.mul_f64(OBSERVATION_SHARING_DIFF_THRESHOLD)
&& last_shared_timestamp.elapsed() > OBSERVATION_SHARING_MIN_INTERVAL
})
{
let diff_exceeds_threshold =
local_observation
.last_shared
.is_none_or(|(last_shared, last_shared_timestamp)| {
let diff = last_shared.abs_diff(new_average);
diff >= new_average.mul_f64(OBSERVATION_SHARING_DIFF_THRESHOLD)
&& last_shared_timestamp.elapsed() >= OBSERVATION_SHARING_MIN_INTERVAL
});
let utilization_exceeds_threshold = max_excess_per_object_execution_time
>= self.observation_sharing_object_utilization_threshold;
if diff_exceeds_threshold && utilization_exceeds_threshold {
debug!("sharing new execution time observation for {key:?}: {new_average:?}");
to_share.push((key, new_average));
local_observation.last_shared = Some((new_average, Instant::now()));
@@ -403,8 +477,8 @@ mod tests {
ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
MockConsensusClient,
};
use sui_types::base_types::{ObjectID, SuiAddress};
use sui_types::transaction::{Argument, ProgrammableMoveCall};
use sui_types::base_types::{ObjectID, SequenceNumber, SuiAddress};
use sui_types::transaction::{Argument, CallArg, ObjectArg, ProgrammableMoveCall};

#[tokio::test]
async fn test_record_local_observations() {
@@ -428,6 +502,7 @@ mod tests {
let mut observer = ExecutionTimeObserver::new_for_testing(
epoch_store.clone(),
Box::new(consensus_adapter.clone()),
Duration::ZERO, // disable object utilization thresholds for this test
);

// Create a simple PTB with one move call
@@ -553,6 +628,7 @@
let mut observer = ExecutionTimeObserver::new_for_testing(
epoch_store.clone(),
Box::new(consensus_adapter.clone()),
Duration::ZERO, // disable object utilization thresholds for this test
);

// Create a PTB with multiple commands.
@@ -612,6 +688,122 @@
);
}

#[tokio::test]
async fn test_record_local_observations_with_object_utilization_threshold() {
telemetry_subscribers::init_for_testing();

let mock_consensus_client = MockConsensusClient::new();
let authority = TestAuthorityBuilder::new().build().await;
let epoch_store = authority.epoch_store_for_testing();
let consensus_adapter = Arc::new(ConsensusAdapter::new(
Arc::new(mock_consensus_client),
CheckpointStore::new_for_tests(),
authority.name,
Arc::new(ConnectionMonitorStatusForTests {}),
100_000,
100_000,
None,
None,
ConsensusAdapterMetrics::new_test(),
epoch_store.protocol_config().clone(),
));
let mut observer = ExecutionTimeObserver::new_for_testing(
epoch_store.clone(),
Box::new(consensus_adapter.clone()),
Duration::from_millis(500), // only share observations with excess utilization >= 500ms
);

// Create a simple PTB with one move call and one mutable shared input
let package = ObjectID::random();
let module = "test_module".to_string();
let function = "test_function".to_string();
let ptb = ProgrammableTransaction {
inputs: vec![CallArg::Object(ObjectArg::SharedObject {
id: ObjectID::random(),
initial_shared_version: SequenceNumber::new(),
mutable: true,
})],
commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
package,
module: module.clone(),
function: function.clone(),
type_arguments: vec![],
arguments: vec![],
}))],
};
let key = ExecutionTimeObservationKey::MoveEntryPoint {
package,
module: module.clone(),
function: function.clone(),
type_arguments: vec![],
};

tokio::time::pause();

// First observation - should not share due to low utilization
let timings = vec![ExecutionTiming::Success(Duration::from_secs(1))];
observer
.record_local_observations(&ptb, &timings, Duration::from_secs(2))
.await;
assert!(observer
.local_observations
.get(&key)
.unwrap()
.last_shared
.is_none());

// Second observation - no time has passed, so now utilization is high; should share
let timings = vec![ExecutionTiming::Success(Duration::from_secs(1))];
observer
.record_local_observations(&ptb, &timings, Duration::from_secs(2))
.await;
assert_eq!(
observer
.local_observations
.get(&key)
.unwrap()
.last_shared
.unwrap()
.0,
Duration::from_secs(2)
);

// Third execution still with high utilization - time has passed but not enough to clear excess
// when accounting for the new observation; should share
tokio::time::advance(Duration::from_secs(5)).await;
let timings = vec![ExecutionTiming::Success(Duration::from_secs(3))];
observer
.record_local_observations(&ptb, &timings, Duration::from_secs(5))
.await;
assert_eq!(
observer
.local_observations
.get(&key)
.unwrap()
.last_shared
.unwrap()
.0,
Duration::from_secs(3)
);

// Fourth execution after utilization drops - should not share, even though diff still high
tokio::time::advance(Duration::from_secs(60)).await;
let timings = vec![ExecutionTiming::Success(Duration::from_secs(11))];
observer
.record_local_observations(&ptb, &timings, Duration::from_secs(11))
.await;
assert_eq!(
observer
.local_observations
.get(&key)
.unwrap()
.last_shared
.unwrap()
.0,
Duration::from_secs(3) // still the old value
);
}

#[tokio::test]
async fn test_stake_weighted_median() {
telemetry_subscribers::init_for_testing();
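To make the per-object bookkeeping above concrete, here is a self-contained sketch of the same arithmetic (illustrative only, not part of this PR; the Utilization type, method names, and numbers are made up):

use std::time::{Duration, Instant};

// Minimal re-creation of the per-object bookkeeping above, for illustration.
struct Utilization {
    excess_execution_time: Duration,
    last_measured: Option<Instant>,
}

impl Utilization {
    // Add one transaction's execution time, then credit back the budget earned
    // since the last measurement (elapsed wall clock * target utilization),
    // clamping at zero. Returns the remaining excess.
    fn record(&mut self, execution_time: Duration, target: f64) -> Duration {
        let now = Instant::now();
        self.excess_execution_time += execution_time;
        self.excess_execution_time = self.excess_execution_time.saturating_sub(
            self.last_measured
                .map(|last| now.duration_since(last))
                .unwrap_or(Duration::MAX)
                .mul_f64(target),
        );
        self.last_measured = Some(now);
        self.excess_execution_time
    }
}

fn main() {
    let mut u = Utilization {
        excess_execution_time: Duration::ZERO,
        last_measured: None,
    };
    // First transaction: with no prior measurement the credit is effectively
    // unbounded, so the excess clamps to zero.
    assert_eq!(u.record(Duration::from_secs(2), 0.5), Duration::ZERO);
    // A second transaction recorded immediately afterwards earns almost no
    // credit, so nearly all of its 2s counts as excess.
    assert!(u.record(Duration::from_secs(2), 0.5) > Duration::from_millis(1900));
}

The clamp to zero on the first call mirrors the unwrap_or(Duration::MAX) in the PR: an object with no prior measurement starts with no excess.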
1 change: 1 addition & 0 deletions crates/sui-types/src/execution.rs
@@ -183,6 +183,7 @@ pub enum ExecutionTimeObservationKey {
/// The function to be called.
function: String,
/// The type arguments to the function.
/// NOTE: This field is currently not populated.
type_arguments: Vec<TypeInput>,
},
TransferObjects,
33 changes: 15 additions & 18 deletions crates/sui-types/src/transaction.rs
@@ -1094,24 +1094,21 @@ impl ProgrammableTransaction {
Ok(())
}

fn shared_input_objects(&self) -> impl Iterator<Item = SharedInputObject> + '_ {
self.inputs
.iter()
.filter_map(|arg| match arg {
CallArg::Pure(_)
| CallArg::Object(ObjectArg::Receiving(_))
| CallArg::Object(ObjectArg::ImmOrOwnedObject(_)) => None,
CallArg::Object(ObjectArg::SharedObject {
id,
initial_shared_version,
mutable,
}) => Some(vec![SharedInputObject {
id: *id,
initial_shared_version: *initial_shared_version,
mutable: *mutable,
}]),
})
.flatten()
pub fn shared_input_objects(&self) -> impl Iterator<Item = SharedInputObject> + '_ {
self.inputs.iter().filter_map(|arg| match arg {
CallArg::Pure(_)
| CallArg::Object(ObjectArg::Receiving(_))
| CallArg::Object(ObjectArg::ImmOrOwnedObject(_)) => None,
CallArg::Object(ObjectArg::SharedObject {
id,
initial_shared_version,
mutable,
}) => Some(SharedInputObject {
id: *id,
initial_shared_version: *initial_shared_version,
mutable: *mutable,
}),
})
}

fn move_calls(&self) -> Vec<(&ObjectID, &str, &str)> {
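For context, a minimal sketch of how a caller outside this module might use the newly public shared_input_objects(), applying the same mutable-object filter the observer uses (illustrative only; the helper name is made up):

use sui_types::{base_types::ObjectID, transaction::ProgrammableTransaction};

// Collect the IDs of mutable shared inputs from a transaction.
fn mutable_shared_ids(ptb: &ProgrammableTransaction) -> Vec<ObjectID> {
    ptb.shared_input_objects()
        .filter_map(|obj| obj.mutable.then_some(obj.id))
        .collect()
}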