From 420a35e58fe8dd7164a74c25156e7bfb84908e07 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Mon, 23 Sep 2024 16:23:55 +0200 Subject: [PATCH] optimize aggregation number based on number of children --- .../turbo-tasks-backend/src/backend/mod.rs | 22 +++++--- .../backend/operation/aggregation_update.rs | 49 ++++++++++++---- .../src/backend/operation/connect_child.rs | 56 ++++++++++++++++--- .../src/backend/operation/mod.rs | 4 +- .../crates/turbo-tasks-backend/src/data.rs | 9 ++- 5 files changed, 113 insertions(+), 27 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index 2e06ccfc9217d..77377117d7f9c 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -21,8 +21,8 @@ use auto_hash_map::{AutoMap, AutoSet}; use dashmap::DashMap; pub use operation::AnyOperation; use operation::{ - is_root_node, AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue, - CleanupOldEdgesOperation, ConnectChildOperation, OutdatedEdge, + get_aggregation_number, is_root_node, AggregatedDataUpdate, AggregationUpdateJob, + AggregationUpdateQueue, CleanupOldEdgesOperation, ConnectChildOperation, OutdatedEdge, }; use parking_lot::{Condvar, Mutex}; use rustc_hash::FxHasher; @@ -43,8 +43,9 @@ use self::{operation::ExecuteContext, storage::Storage}; use crate::{ backing_storage::BackingStorage, data::{ - ActiveType, CachedDataItem, CachedDataItemIndex, CachedDataItemKey, CachedDataItemValue, - CachedDataUpdate, CellRef, InProgressCellState, InProgressState, OutputValue, RootState, + ActiveType, AggregationNumber, CachedDataItem, CachedDataItemIndex, CachedDataItemKey, + CachedDataItemValue, CachedDataUpdate, CellRef, InProgressCellState, InProgressState, + OutputValue, RootState, }, get, get_many, remove, utils::{bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc}, @@ -265,7 +266,7 @@ impl TurboTasksBackend { if matches!(consistency, ReadConsistency::Strong) { // Ensure it's an root node loop { - let aggregation_number = get!(task, AggregationNumber).copied().unwrap_or_default(); + let aggregation_number = get_aggregation_number(&task); if is_root_node(aggregation_number) { break; } @@ -273,7 +274,8 @@ impl TurboTasksBackend { AggregationUpdateQueue::run( AggregationUpdateJob::UpdateAggregationNumber { task_id, - aggregation_number: u32::MAX, + base_aggregation_number: u32::MAX, + distance: None, }, &ctx, ); @@ -1207,7 +1209,13 @@ impl Backend for TurboTasksBackend { ); { let mut task = self.storage.access_mut(task_id); - let _ = task.add(CachedDataItem::AggregationNumber { value: u32::MAX }); + let _ = task.add(CachedDataItem::AggregationNumber { + value: AggregationNumber { + base: u32::MAX, + distance: 0, + effective: u32::MAX, + }, + }); let _ = task.add(CachedDataItem::AggregateRoot { value: RootState::new(root_type), }); diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs index b6383bb6d2a32..574ed8f8c0e50 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs @@ -1,11 +1,11 @@ -use std::collections::VecDeque; +use std::{cmp::max, collections::VecDeque, num::NonZeroU32}; use serde::{Deserialize, Serialize}; use turbo_tasks::TaskId; use super::{ExecuteContext, Operation, TaskGuard}; use crate::{ - data::{ActiveType, CachedDataItem, CachedDataItemKey, RootState}, + data::{ActiveType, AggregationNumber, CachedDataItem, CachedDataItemKey, RootState}, get, get_many, iter_many, remove, update, update_count, }; @@ -43,14 +43,17 @@ fn iter_uppers<'a>(task: &'a TaskGuard<'a>) -> impl Iterator + 'a } pub fn get_aggregation_number(task: &TaskGuard<'_>) -> u32 { - get!(task, AggregationNumber).copied().unwrap_or_default() + get!(task, AggregationNumber) + .map(|a| a.effective) + .unwrap_or_default() } #[derive(Serialize, Deserialize, Clone, Debug)] pub enum AggregationUpdateJob { UpdateAggregationNumber { task_id: TaskId, - aggregation_number: u32, + base_aggregation_number: u32, + distance: Option, }, InnerHasNewFollower { upper_ids: Vec, @@ -266,14 +269,36 @@ impl AggregationUpdateQueue { match job { AggregationUpdateJob::UpdateAggregationNumber { task_id, - aggregation_number, + base_aggregation_number, + distance: base_effective_distance, } => { self.stats.update_aggregation_number += 1; let mut task = ctx.task(task_id); - let old = get_aggregation_number(&task); - if old < aggregation_number { + let current = get!(task, AggregationNumber).copied().unwrap_or_default(); + // The wanted new distance is either the provided one or the old distance + let distance = base_effective_distance.map_or(current.distance, |d| d.get()); + // The base aggregation number can only increase + let base_aggregation_number = max(current.base, base_aggregation_number); + let old = current.effective; + // The new target effecive aggregation number is base + distance + let aggregation_number = base_aggregation_number.saturating_add(distance); + if old >= aggregation_number { + if base_aggregation_number != current.base && distance != current.distance { + task.insert(CachedDataItem::AggregationNumber { + value: AggregationNumber { + base: base_aggregation_number, + distance, + effective: old, + }, + }); + } + } else { task.insert(CachedDataItem::AggregationNumber { - value: aggregation_number, + value: AggregationNumber { + base: base_aggregation_number, + distance, + effective: aggregation_number, + }, }); if !is_aggregating_node(old) && is_aggregating_node(aggregation_number) { @@ -308,7 +333,8 @@ impl AggregationUpdateQueue { for child_id in children { self.push(AggregationUpdateJob::UpdateAggregationNumber { task_id: child_id, - aggregation_number: aggregation_number + 1, + base_aggregation_number: aggregation_number + 1, + distance: None, }); } } @@ -656,10 +682,11 @@ impl AggregationUpdateQueue { } else { // both nodes have the same aggregation number // We need to change the aggregation number of the task - let new_aggregation_number = upper_aggregation_number + 1; + let current = get!(task, AggregationNumber).copied().unwrap_or_default(); self.push(AggregationUpdateJob::UpdateAggregationNumber { task_id, - aggregation_number: new_aggregation_number, + base_aggregation_number: current.base + 1, + distance: None, }); } } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs index 0339ae4f631a7..8dcc9c0924354 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs @@ -1,3 +1,5 @@ +use std::{cmp::max, num::NonZeroU32}; + use serde::{Deserialize, Serialize}; use turbo_tasks::TaskId; @@ -8,7 +10,8 @@ use super::{ ExecuteContext, Operation, }; use crate::{ - data::{CachedDataItem, CachedDataItemKey}, + backend::operation::is_root_node, + data::{CachedDataItem, CachedDataItemIndex, CachedDataItemKey}, get, }; @@ -26,13 +29,19 @@ pub enum ConnectChildOperation { impl ConnectChildOperation { pub fn run(parent_task_id: TaskId, child_task_id: TaskId, ctx: ExecuteContext<'_>) { let mut parent_task = ctx.task(parent_task_id); - parent_task.remove(&CachedDataItemKey::OutdatedChild { - task: child_task_id, - }); if parent_task.add(CachedDataItem::Child { task: child_task_id, value: (), }) { + // Quick skip if the child was already connected before + if parent_task + .remove(&CachedDataItemKey::OutdatedChild { + task: child_task_id, + }) + .is_some() + { + return; + } // When task is added to a AggregateRoot is need to be scheduled, // indirect connections are handled by the aggregation update. let mut should_schedule = false; @@ -41,19 +50,52 @@ impl ConnectChildOperation { } // Update the task aggregation let mut queue = AggregationUpdateQueue::new(); - let parent_aggregation = get!(parent_task, AggregationNumber) + + // Compute new parent aggregation number based on the number of children + let current_parent_aggregation = get!(parent_task, AggregationNumber) .copied() .unwrap_or_default(); + let parent_aggregation = if is_root_node(current_parent_aggregation.base) { + u32::MAX + } else { + let children_count = parent_task + .iter(CachedDataItemIndex::Children) + .filter(|(k, _)| { + matches!( + *k, + CachedDataItemKey::Child { .. } + | CachedDataItemKey::OutdatedChild { .. } + ) + }) + .count(); + let target_distance = children_count.ilog2() as u32 * 2; + let parent_aggregation = current_parent_aggregation + .base + .saturating_add(target_distance); + if target_distance != current_parent_aggregation.distance { + queue.push(AggregationUpdateJob::UpdateAggregationNumber { + task_id: parent_task_id, + base_aggregation_number: 0, + distance: NonZeroU32::new(target_distance), + }) + } + max(current_parent_aggregation.effective, parent_aggregation) + }; + + // Update child aggregation number based on parent aggregation number let is_aggregating_node = is_aggregating_node(parent_aggregation); if parent_task_id.is_transient() && !child_task_id.is_transient() { queue.push(AggregationUpdateJob::UpdateAggregationNumber { task_id: child_task_id, - aggregation_number: u32::MAX, + base_aggregation_number: u32::MAX, + distance: None, }); } else if !is_aggregating_node { queue.push(AggregationUpdateJob::UpdateAggregationNumber { task_id: child_task_id, - aggregation_number: parent_aggregation + AGGREGATION_NUMBER_BUFFER_SPACE + 1, + base_aggregation_number: parent_aggregation + .saturating_add(AGGREGATION_NUMBER_BUFFER_SPACE), + distance: None, }); } if is_aggregating_node { diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index 8ef495be391e8..a1da9d122ae51 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -460,7 +460,9 @@ impl_operation!(Invalidate invalidate::InvalidateOperation); impl_operation!(CleanupOldEdges cleanup_old_edges::CleanupOldEdgesOperation); impl_operation!(AggregationUpdate aggregation_update::AggregationUpdateQueue); -pub use aggregation_update::{is_root_node, AggregatedDataUpdate, AggregationUpdateJob}; +pub use aggregation_update::{ + get_aggregation_number, is_root_node, AggregatedDataUpdate, AggregationUpdateJob, +}; pub use cleanup_old_edges::OutdatedEdge; pub use update_cell::UpdateCellOperation; pub use update_output::UpdateOutputOperation; diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index 3c635767854df..2b49683805756 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -109,6 +109,13 @@ impl InProgressCellState { } } +#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] +pub struct AggregationNumber { + pub base: u32, + pub distance: u32, + pub effective: u32, +} + #[derive(Debug, Clone, KeyValuePair, Serialize, Deserialize)] pub enum CachedDataItem { // Output @@ -176,7 +183,7 @@ pub enum CachedDataItem { // Aggregation Graph AggregationNumber { - value: u32, + value: AggregationNumber, }, Follower { task: TaskId,