From 3ff2170e2de8b323009050b9bc3bf578d4555a89 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Wed, 4 Sep 2024 09:07:13 +0200 Subject: [PATCH] aggregation improvements --- .../backend/operation/aggregation_update.rs | 199 ++++++++++-------- .../backend/operation/cleanup_old_edges.rs | 22 +- .../src/backend/operation/connect_child.rs | 4 +- .../src/backend/storage.rs | 36 ++-- .../crates/turbo-tasks-backend/src/data.rs | 10 +- 5 files changed, 151 insertions(+), 120 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs index 3f26af76aa9e7..71d39e7663013 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs @@ -27,7 +27,7 @@ fn get_followers_with_aggregation_number( aggregation_number: u32, ) -> Vec { if is_aggregating_node(aggregation_number) { - get_many!(task, Follower { task } => task) + get_many!(task, Follower { task } count if count > 0 => task) } else { get_many!(task, Child { task } => task) } @@ -37,11 +37,19 @@ fn get_followers(task: &TaskGuard<'_>) -> Vec { get_followers_with_aggregation_number(task, get_aggregation_number(task)) } +pub fn get_uppers(task: &TaskGuard<'_>) -> Vec { + get_many!(task, Upper { task } count if count > 0 => task) +} + +fn iter_uppers<'a>(task: &'a TaskGuard<'a>) -> impl Iterator + 'a { + iter_many!(task, Upper { task } count if count > 0 => task) +} + pub fn get_aggregation_number(task: &TaskGuard<'_>) -> u32 { get!(task, AggregationNumber).copied().unwrap_or_default() } -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub enum AggregationUpdateJob { UpdateAggregationNumber { task_id: TaskId, @@ -80,9 +88,9 @@ pub enum AggregationUpdateJob { }, } -#[derive(Default, Serialize, Deserialize, Clone)] +#[derive(Default, Serialize, Deserialize, Clone, Debug)] pub struct AggregatedDataUpdate { - unfinished: i32, + dirty_task_count: i32, dirty_tasks_update: HashMap, // TODO collectibles } @@ -92,7 +100,7 @@ impl AggregatedDataUpdate { let aggregation = get_aggregation_number(task); let dirty = get!(task, Dirty).is_some(); if is_aggregating_node(aggregation) { - let mut unfinished = get!(task, AggregatedDirtyTaskCount).copied().unwrap_or(0); + let mut dirty_task_count = get!(task, AggregatedDirtyTaskCount).copied().unwrap_or(0); let mut dirty_tasks_update = task .iter() .filter_map(|(key, _)| match *key { @@ -101,11 +109,11 @@ impl AggregatedDataUpdate { }) .collect::>(); if dirty { - unfinished += 1; + dirty_task_count += 1; dirty_tasks_update.insert(task.id(), 1); } Self { - unfinished: unfinished as i32, + dirty_task_count: dirty_task_count as i32, dirty_tasks_update, } } else if dirty { @@ -116,7 +124,7 @@ impl AggregatedDataUpdate { } fn invert(mut self) -> Self { - self.unfinished = -self.unfinished; + self.dirty_task_count = -self.dirty_task_count; for value in self.dirty_tasks_update.values_mut() { *value = -*value; } @@ -129,27 +137,22 @@ impl AggregatedDataUpdate { queue: &mut AggregationUpdateQueue, ) -> AggregatedDataUpdate { let Self { - unfinished, + dirty_task_count, dirty_tasks_update, } = self; let mut result = Self::default(); - if *unfinished != 0 { - update!(task, AggregatedDirtyTaskCount, |old: Option| { + if *dirty_task_count != 0 { + update!(task, AggregatedDirtyTaskCount, |old: Option| { let old = old.unwrap_or(0); - let new = old as i32 + *unfinished; - debug_assert!(new >= 0); - let new = new as u32; - if new == 0 { - result.unfinished = -1; - None - } else { - if old <= 0 && new > 0 { - result.unfinished = 1; - } - Some(new) + let new = old + *dirty_task_count; + if old <= 0 && new > 0 { + result.dirty_task_count = 1; + } else if old > 0 && new <= 0 { + result.dirty_task_count = -1; } + (new != 0).then_some(new) }); - if result.unfinished == -1 { + if result.dirty_task_count == -1 { if let Some(root_state) = get!(task, AggregateRoot) { root_state.all_clean_event.notify(usize::MAX); } @@ -162,23 +165,18 @@ impl AggregatedDataUpdate { update!( task, AggregatedDirtyTask { task: *task_id }, - |old: Option| { + |old: Option| { let old = old.unwrap_or(0); - let new = old as i32 + *count; - debug_assert!(new >= 0); - let new = new as u32; - if new == 0 { - result.dirty_tasks_update.insert(*task_id, -1); - None - } else { - if old <= 0 && new > 0 { - if root { - task_to_schedule.push(*task_id); - } - result.dirty_tasks_update.insert(*task_id, 1); + let new = old + *count; + if old <= 0 && new > 0 { + if root { + task_to_schedule.push(*task_id); } - Some(new) + result.dirty_tasks_update.insert(*task_id, 1); + } else if old > 0 && new <= 0 { + result.dirty_tasks_update.insert(*task_id, -1); } + (new != 0).then_some(new) } ); } @@ -193,22 +191,22 @@ impl AggregatedDataUpdate { fn is_empty(&self) -> bool { let Self { - unfinished, + dirty_task_count, dirty_tasks_update, } = self; - *unfinished == 0 && dirty_tasks_update.is_empty() + *dirty_task_count == 0 && dirty_tasks_update.is_empty() } pub fn dirty_task(task_id: TaskId) -> Self { Self { - unfinished: 1, + dirty_task_count: 1, dirty_tasks_update: HashMap::from([(task_id, 1)]), } } pub fn no_longer_dirty_task(task_id: TaskId) -> Self { Self { - unfinished: -1, + dirty_task_count: -1, dirty_tasks_update: HashMap::from([(task_id, -1)]), } } @@ -236,7 +234,7 @@ impl Add for AggregatedDataUpdate { } } Self { - unfinished: self.unfinished + rhs.unfinished, + dirty_task_count: self.dirty_task_count + rhs.dirty_task_count, dirty_tasks_update, } } @@ -297,13 +295,21 @@ impl AggregationUpdateQueue { if is_aggregating_node(aggregation_number) { // followers might become inner nodes when the aggregation number is // increased - let followers = iter_many!(task, Follower { task } => task); + let followers = + iter_many!(task, Follower { task } count if count > 0 => task); for follower_id in followers { self.jobs.push_back(AggregationUpdateJob::BalanceEdge { upper_id: task_id, task_id: follower_id, }); } + let uppers = iter_uppers(&task); + for upper_id in uppers { + self.jobs.push_back(AggregationUpdateJob::BalanceEdge { + upper_id, + task_id, + }); + } } else { let children = iter_many!(task, Child { task } => task); for child_id in children { @@ -383,18 +389,20 @@ impl AggregationUpdateQueue { let children: Vec<_> = get_followers(&follower); drop(follower); - for upper_id in upper_ids.iter() { - // add data to upper - let mut upper = ctx.task(*upper_id); - let diff = data.apply(&mut upper, self); - if !diff.is_empty() { - let upper_ids = get_many!(upper, Upper { task } => task); - self.jobs.push_back( - AggregationUpdateJob::AggregatedDataUpdate { - upper_ids, - update: diff, - }, - ) + if !data.is_empty() { + for upper_id in upper_ids.iter() { + // add data to upper + let mut upper = ctx.task(*upper_id); + let diff = data.apply(&mut upper, self); + if !diff.is_empty() { + let upper_ids = get_uppers(&upper); + self.jobs.push_back( + AggregationUpdateJob::AggregatedDataUpdate { + upper_ids, + update: diff, + }, + ) + } } } if !children.is_empty() { @@ -471,17 +479,20 @@ impl AggregationUpdateQueue { let children: Vec<_> = get_followers(&follower); drop(follower); - for upper_id in upper_ids.iter() { - // remove data from upper - let mut upper = ctx.task(*upper_id); - let diff = data.apply(&mut upper, self); - if !diff.is_empty() { - let upper_ids = get_many!(upper, Upper { task } => task); - self.jobs - .push_back(AggregationUpdateJob::AggregatedDataUpdate { - upper_ids, - update: diff, - }) + if !data.is_empty() { + for upper_id in upper_ids.iter() { + // remove data from upper + let mut upper = ctx.task(*upper_id); + let diff = data.apply(&mut upper, self); + if !diff.is_empty() { + let upper_ids = get_uppers(&upper); + self.jobs.push_back( + AggregationUpdateJob::AggregatedDataUpdate { + upper_ids, + update: diff, + }, + ) + } } } if !children.is_empty() { @@ -517,7 +528,7 @@ impl AggregationUpdateQueue { let mut upper = ctx.task(upper_id); let diff = update.apply(&mut upper, self); if !diff.is_empty() { - let upper_ids: Vec<_> = get_many!(upper, Upper { task } => task); + let upper_ids = get_uppers(&upper); if !upper_ids.is_empty() { self.jobs .push_back(AggregationUpdateJob::AggregatedDataUpdate { @@ -529,17 +540,14 @@ impl AggregationUpdateQueue { } } AggregationUpdateJob::DataUpdate { task_id, update } => { - let mut task = ctx.task(task_id); - let diff = update.apply(&mut task, self); - if !diff.is_empty() { - let upper_ids: Vec<_> = get_many!(task, Upper { task } => task); - if !upper_ids.is_empty() { - self.jobs - .push_back(AggregationUpdateJob::AggregatedDataUpdate { - upper_ids, - update: diff, - }); - } + let task = ctx.task(task_id); + let upper_ids: Vec<_> = get_uppers(&task); + if !upper_ids.is_empty() { + self.jobs + .push_back(AggregationUpdateJob::AggregatedDataUpdate { + upper_ids, + update: update.clone(), + }); } } AggregationUpdateJob::ScheduleWhenDirty { task_ids } => { @@ -565,16 +573,13 @@ impl AggregationUpdateQueue { if should_be_inner { // remove all follower edges let count = remove!(upper, Follower { task: task_id }).unwrap_or_default(); - if count > 0 { - // notify uppers about lost follower - let upper_ids: Vec<_> = get_many!(upper, Upper { task } => task); - if !upper_ids.is_empty() { - self.jobs - .push_back(AggregationUpdateJob::InnerLostFollower { - upper_ids, - lost_follower_id: task_id, - }); - } + if count < 0 { + upper.add_new(CachedDataItem::Follower { + task: task_id, + value: count, + }) + } else if count > 0 { + let upper_ids = get_uppers(&upper); // Add the same amount of upper edges if update_count!(task, Upper { task: upper_id }, count as i32) { @@ -584,7 +589,6 @@ impl AggregationUpdateQueue { let followers = get_followers(&task); let diff = data.apply(&mut upper, self); - let upper_ids: Vec<_> = get_many!(upper, Upper { task } => task); if !upper_ids.is_empty() { if !diff.is_empty() { // Notify uppers about changed aggregated data @@ -598,26 +602,36 @@ impl AggregationUpdateQueue { if !followers.is_empty() { self.jobs.push_back( AggregationUpdateJob::InnerHasNewFollowers { - upper_ids, + upper_ids: upper_ids.clone(), new_follower_ids: followers, }, ); } } } + + // notify uppers about lost follower + if !upper_ids.is_empty() { + self.jobs + .push_back(AggregationUpdateJob::InnerLostFollower { + upper_ids, + lost_follower_id: task_id, + }); + } } } else if should_be_follower { // Remove the upper edge let count = remove!(task, Upper { task: upper_id }).unwrap_or_default(); if count > 0 { + let upper_ids: Vec<_> = get_uppers(&upper); + // Add the same amount of follower edges if update_count!(upper, Follower { task: task_id }, count as i32) { // notify uppers about new follower - let upper_ids: Vec<_> = get_many!(upper, Upper { task } => task); if !upper_ids.is_empty() { self.jobs.push_back( AggregationUpdateJob::InnerHasNewFollower { - upper_ids, + upper_ids: upper_ids.clone(), new_follower_id: task_id, }, ); @@ -629,7 +643,6 @@ impl AggregationUpdateQueue { let data = AggregatedDataUpdate::from_task(&mut task).invert(); let followers = get_followers(&task); let diff = data.apply(&mut upper, self); - let upper_ids: Vec<_> = get_many!(upper, Upper { task } => task); if !upper_ids.is_empty() { if !diff.is_empty() { self.jobs.push_back( diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs index faaf54b6203fa..04a266c98d150 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs @@ -4,7 +4,10 @@ use serde::{Deserialize, Serialize}; use turbo_tasks::TaskId; use super::{ - aggregation_update::{AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue}, + aggregation_update::{ + get_aggregation_number, get_uppers, is_aggregating_node, AggregatedDataUpdate, + AggregationUpdateJob, AggregationUpdateQueue, + }, invalidate::make_task_dirty, ExecuteContext, Operation, }; @@ -74,11 +77,18 @@ impl Operation for CleanupOldEdgesOperation { OutdatedEdge::Child(child_id) => { let mut task = ctx.task(task_id); task.remove(&CachedDataItemKey::Child { task: child_id }); - let upper_ids = get_many!(task, Upper { task } => task); - queue.push(AggregationUpdateJob::InnerLostFollower { - upper_ids, - lost_follower_id: child_id, - }); + if is_aggregating_node(get_aggregation_number(&task)) { + queue.push(AggregationUpdateJob::InnerLostFollower { + upper_ids: vec![task_id], + lost_follower_id: child_id, + }); + } else { + let upper_ids = get_uppers(&task); + queue.push(AggregationUpdateJob::InnerLostFollower { + upper_ids, + lost_follower_id: child_id, + }); + } } OutdatedEdge::CellDependency(CellRef { task: cell_task_id, diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs index be8db5749a175..7fc8107ea5471 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs @@ -3,7 +3,7 @@ use turbo_tasks::TaskId; use super::{ aggregation_update::{ - is_aggregating_node, is_root_node, AggregationUpdateJob, AggregationUpdateQueue, + get_uppers, is_aggregating_node, is_root_node, AggregationUpdateJob, AggregationUpdateQueue, }, ExecuteContext, Operation, }; @@ -59,7 +59,7 @@ impl ConnectChildOperation { new_follower_id: child_task_id, }); } else { - let upper_ids = get_many!(parent_task, Upper { task } => task); + let upper_ids = get_uppers(&parent_task); queue.push(AggregationUpdateJob::InnerHasNewFollower { upper_ids, new_follower_id: child_task_id, diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs index a6c4a5dd8c200..78a7ab9e23b90 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs @@ -182,7 +182,7 @@ macro_rules! iter_many { $task .iter() .filter_map(|(key, _)| match *key { - CachedDataItemKey::$key $input => Some($value), + $crate::data::CachedDataItemKey::$key $input => Some($value), _ => None, }) }; @@ -190,7 +190,15 @@ macro_rules! iter_many { $task .iter() .filter_map(|(key, value)| match (key, value) { - (&CachedDataItemKey::$key $input, &CachedDataItemValue::$key { value: $value_ident }) => Some($value), + (&$crate::data::CachedDataItemKey::$key $input, &$crate::data::CachedDataItemValue::$key { value: $value_ident }) => Some($value), + _ => None, + }) + }; + ($task:ident, $key:ident $input:tt $value_ident:ident if $cond:expr => $value:expr) => { + $task + .iter() + .filter_map(|(key, value)| match (key, value) { + (&$crate::data::CachedDataItemKey::$key $input, &$crate::data::CachedDataItemValue::$key { value: $value_ident }) if $cond => Some($value), _ => None, }) }; @@ -198,8 +206,8 @@ macro_rules! iter_many { $task .iter() .filter_map(|(key, _)| match *key { - CachedDataItemKey::$key1 $input1 => Some($value1), - CachedDataItemKey::$key2 $input2 => Some($value2), + $crate::data::CachedDataItemKey::$key1 $input1 => Some($value1), + $crate::data::CachedDataItemKey::$key2 $input2 => Some($value2), _ => None, }) }; @@ -213,6 +221,9 @@ macro_rules! get_many { ($task:ident, $key:ident $input:tt $value_ident:ident => $value:expr) => { $crate::iter_many!($task, $key $input $value_ident => $value).collect() }; + ($task:ident, $key:ident $input:tt $value_ident:ident if $cond:expr => $value:expr) => { + $crate::iter_many!($task, $key $input $value_ident if $cond => $value).collect() + }; ($task:ident, $key1:ident $input1:tt => $value1:ident, $key2:ident $input2:tt => $value2:ident) => { $crate::iter_many!($task, $key1 $input1 => $value1, $key2 $input2 => $value2).collect() }; @@ -258,17 +269,14 @@ macro_rules! update_count { match $update { update => { let mut state_change = false; - $crate::update!($task, $key $input, |old: Option| { - if old.is_none() { - state_change = true; - } - let old = old.unwrap_or(0); - let new = old as i32 + update; - if new == 0 { - state_change = true; - None + $crate::update!($task, $key $input, |old: Option| { + if let Some(old) = old { + let new = old + update; + state_change = old <= 0 && new > 0 || old > 0 && new <= 0; + (new != 0).then_some(new) } else { - Some(new as u32) + state_change = update > 0; + (update != 0).then_some(update) } }); state_change diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index a9173e5db55ee..a33d8263acf3f 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -173,24 +173,24 @@ pub enum CachedDataItem { }, Follower { task: TaskId, - value: u32, + value: i32, }, Upper { task: TaskId, - value: u32, + value: i32, }, // Aggregated Data AggregatedDirtyTask { task: TaskId, - value: u32, + value: i32, }, AggregatedCollectible { collectible: CellRef, - value: u32, + value: i32, }, AggregatedDirtyTaskCount { - value: u32, + value: i32, }, // Transient Root Type