From e126a75e7bd2e4e87f43c0ca2cc5edb5b94cf8b5 Mon Sep 17 00:00:00 2001
From: Tobias Koppers
Date: Tue, 13 Aug 2024 13:29:50 +0200
Subject: [PATCH] more aggregation operations

---
 .../turbo-tasks-backend/src/backend/mod.rs    |   3 +
 .../backend/operation/aggregation_update.rs   | 272 ++++++++++++++++--
 .../backend/operation/cleanup_old_edges.rs    |  36 ++-
 .../src/backend/operation/connect_child.rs    |  38 +--
 .../src/backend/operation/invalidate.rs       | 116 ++++----
 .../src/backend/operation/mod.rs              |  51 ++++
 .../src/backend/storage.rs                    | 110 +++++++
 .../crates/turbo-tasks-backend/src/data.rs    |  28 +-
 .../src/derive/key_value_pair_macro.rs        |   4 +-
 9 files changed, 556 insertions(+), 102 deletions(-)

diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
index 40997588f4cc9..a01b918155810 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
@@ -752,6 +752,8 @@ impl Backend for TurboTasksBackend {
                 })
                 .collect::<Vec<_>>();
 
+            task.remove(&CachedDataItemKey::Dirty {});
+
             done_event.notify(usize::MAX);
             drop(task);
 
@@ -912,6 +914,7 @@ impl Backend for TurboTasksBackend {
             let mut task = self.storage.access_mut(task_id);
             task.add(CachedDataItem::new_scheduled(task_id));
             task.add(CachedDataItem::AggregateRootType { value: root_type });
+            task.add(CachedDataItem::AggregationNumber { value: u32::MAX });
         }
         turbo_tasks.schedule(task_id);
         task_id
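Two things in the hunks above are easy to miss: a finishing task now clears its own `Dirty` item so the flag stays consistent with the aggregated dirty counters introduced in this patch, and a task that becomes an aggregation root is pinned to the top of the aggregation graph with the maximum aggregation number. The inner-vs-follower decision itself is still a TODO further down; the standalone sketch below (an illustration of the usual comparison rule aggregation numbers enable, not code from this patch) shows why `u32::MAX` is the natural value for roots:

```rust
// Illustrative only: how an aggregation-number ordering can decide whether a
// node is aggregated directly ("inner") or merely tracked ("follower").
// Roots get u32::MAX, so they compare as larger than every other node and
// aggregate everything reachable below them.
fn is_inner(upper_aggregation_number: u32, node_aggregation_number: u32) -> bool {
    // The more aggregating side (higher number) absorbs the less aggregating one.
    upper_aggregation_number > node_aggregation_number
}

fn main() {
    let root = u32::MAX; // AggregationNumber { value: u32::MAX }
    let leaf = 0;
    assert!(is_inner(root, leaf));
    assert!(!is_inner(leaf, root));
}
```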
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs
index 9b01fc286cac8..72fa2a37705fb 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs
@@ -1,18 +1,166 @@
+use std::{collections::HashMap, ops::Add};
+
 use serde::{Deserialize, Serialize};
 use turbo_tasks::TaskId;
 
-use super::ExecuteContext;
-use crate::data::{CachedDataItem, CachedDataItemKey};
+use super::{ExecuteContext, TaskGuard};
+use crate::{
+    data::{CachedDataItem, CachedDataItemKey},
+    get, get_many, update, update_count,
+};
 
 #[derive(Serialize, Deserialize, Clone)]
 pub enum AggregationUpdateJob {
     InnerHasNewFollower {
         upper_ids: Vec<TaskId>,
         new_follower_id: TaskId,
-        new_follower_data: (),
+    },
+    InnerHasNewFollowers {
+        upper_ids: Vec<TaskId>,
+        new_follower_ids: Vec<TaskId>,
+    },
+    InnerLostFollower {
+        upper_ids: Vec<TaskId>,
+        lost_follower_id: TaskId,
+    },
+    AggregatedDataUpdate {
+        upper_ids: Vec<TaskId>,
+        update: AggregatedDataUpdate,
+    },
+    DataUpdate {
+        task_id: TaskId,
+        update: AggregatedDataUpdate,
+    },
+    ScheduleWhenDirty {
+        task_ids: Vec<TaskId>,
     },
 }
 
+#[derive(Default, Serialize, Deserialize, Clone)]
+pub struct AggregatedDataUpdate {
+    unfinished: i32,
+    dirty_tasks_update: HashMap<TaskId, i32>,
+    // TODO collectibles
+}
+
+impl AggregatedDataUpdate {
+    fn from_task(task: &mut TaskGuard<'_>) -> Self {
+        let aggregation = get!(task, AggregationNumber);
+        if aggregation.is_some() {
+            let unfinished = get!(task, AggregatedUnfinishedTasks);
+            let dirty_tasks_update = task
+                .iter()
+                .filter_map(|(key, _)| match *key {
+                    CachedDataItemKey::AggregatedDirtyTask { task } => Some((task, 1)),
+                    _ => None,
+                })
+                .collect();
+            Self {
+                unfinished: unfinished.copied().unwrap_or(0) as i32,
+                dirty_tasks_update,
+            }
+        } else {
+            let dirty = get!(task, Dirty);
+            if dirty.is_some() {
+                Self::dirty_task(task.id())
+            } else {
+                Self::default()
+            }
+        }
+    }
+
+    fn apply(
+        &self,
+        task: &mut TaskGuard<'_>,
+        queue: &mut AggregationUpdateQueue,
+    ) -> AggregatedDataUpdate {
+        let Self {
+            unfinished,
+            dirty_tasks_update,
+        } = self;
+        let mut result = Self::default();
+        if *unfinished != 0 {
+            update!(task, AggregatedUnfinishedTasks, |old: Option<u32>| {
+                let old = old.unwrap_or(0);
+                let new = (old as i32 + *unfinished) as u32;
+                if new == 0 {
+                    result.unfinished = -1;
+                    None
+                } else {
+                    if old == 0 {
+                        result.unfinished = 1;
+                    }
+                    Some(new)
+                }
+            });
+        }
+        if !dirty_tasks_update.is_empty() {
+            let mut task_to_schedule = Vec::new();
+            let root_type = get!(task, AggregateRootType).copied();
+            for (task_id, count) in dirty_tasks_update {
+                update!(
+                    task,
+                    AggregatedDirtyTask { task: *task_id },
+                    |old: Option<u32>| {
+                        let old = old.unwrap_or(0);
+                        if old == 0 && root_type.is_some() {
+                            task_to_schedule.push(*task_id);
+                        }
+                        let new = (old as i32 + *count) as u32;
+                        if new == 0 {
+                            result.dirty_tasks_update.insert(*task_id, -1);
+                            None
+                        } else {
+                            if old == 0 {
+                                result.dirty_tasks_update.insert(*task_id, 1);
+                            }
+                            Some(new)
+                        }
+                    }
+                );
+            }
+            if !task_to_schedule.is_empty() {
+                queue.push(AggregationUpdateJob::ScheduleWhenDirty {
+                    task_ids: task_to_schedule,
+                })
+            }
+        }
+        result
+    }
+
+    fn is_empty(&self) -> bool {
+        let Self {
+            unfinished,
+            dirty_tasks_update,
+        } = self;
+        *unfinished == 0 && dirty_tasks_update.is_empty()
+    }
+
+    pub fn dirty_task(task_id: TaskId) -> Self {
+        Self {
+            unfinished: 1,
+            dirty_tasks_update: HashMap::from([(task_id, 1)]),
+        }
+    }
+}
+
+impl Add for AggregatedDataUpdate {
+    type Output = Self;
+
+    fn add(self, rhs: Self) -> Self::Output {
+        let mut dirty_tasks_update = self.dirty_tasks_update;
+        for (task, count) in rhs.dirty_tasks_update {
+            *dirty_tasks_update.entry(task).or_default() += count;
+        }
+        Self {
+            unfinished: self.unfinished + rhs.unfinished,
+            dirty_tasks_update,
+        }
+    }
+}
+
 #[derive(Default, Serialize, Deserialize, Clone)]
 pub struct AggregationUpdateQueue {
     jobs: Vec<AggregationUpdateJob>,
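The first hunk ends with the core data type of this patch: `AggregatedDataUpdate` carries signed deltas, where `unfinished` is the change in the number of aggregated children with unfinished work and `dirty_tasks_update` maps a task to the change in its dirty count. `impl Add` merges two updates by summing the deltas. A minimal standalone model of those semantics (hypothetical types, not the backend's):

```rust
use std::collections::HashMap;

// Signed-delta model of AggregatedDataUpdate: a +1 and a -1 for the same task
// cancel out when two updates are merged.
#[derive(Debug, Default)]
struct Update {
    unfinished: i32,
    dirty_tasks_update: HashMap<u32, i32>, // task id -> dirty-count delta
}

fn merge(a: Update, b: Update) -> Update {
    let mut dirty_tasks_update = a.dirty_tasks_update;
    for (task, count) in b.dirty_tasks_update {
        *dirty_tasks_update.entry(task).or_default() += count;
    }
    Update {
        unfinished: a.unfinished + b.unfinished,
        dirty_tasks_update,
    }
}

fn main() {
    let became_dirty = Update {
        unfinished: 1,
        dirty_tasks_update: HashMap::from([(7, 1)]),
    };
    let became_clean = Update {
        unfinished: -1,
        dirty_tasks_update: HashMap::from([(7, -1)]),
    };
    let merged = merge(became_dirty, became_clean);
    assert_eq!(merged.unfinished, 0);
    // Like the patch's Add impl, merging keeps a zero entry in the map;
    // only `apply` turns a zero count into an actual removal.
    assert_eq!(merged.dirty_tasks_update.get(&7), Some(&0));
}
```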
@@ -23,6 +171,10 @@ impl AggregationUpdateQueue {
         Self { jobs: Vec::new() }
     }
 
+    pub fn is_empty(&self) -> bool {
+        self.jobs.is_empty()
+    }
+
     pub fn push(&mut self, job: AggregationUpdateJob) {
         self.jobs.push(job);
     }
@@ -30,12 +182,33 @@ impl AggregationUpdateQueue {
     pub fn process(&mut self, ctx: &ExecuteContext<'_>) -> bool {
         if let Some(job) = self.jobs.pop() {
             match job {
+                AggregationUpdateJob::InnerHasNewFollowers {
+                    upper_ids,
+                    mut new_follower_ids,
+                } => {
+                    if let Some(new_follower_id) = new_follower_ids.pop() {
+                        if new_follower_ids.is_empty() {
+                            self.jobs.push(AggregationUpdateJob::InnerHasNewFollower {
+                                upper_ids,
+                                new_follower_id,
+                            });
+                        } else {
+                            self.jobs.push(AggregationUpdateJob::InnerHasNewFollowers {
+                                upper_ids: upper_ids.clone(),
+                                new_follower_ids,
+                            });
+                            self.jobs.push(AggregationUpdateJob::InnerHasNewFollower {
+                                upper_ids,
+                                new_follower_id,
+                            });
+                        }
+                    }
+                }
                 AggregationUpdateJob::InnerHasNewFollower {
                     mut upper_ids,
                     new_follower_id,
-                    new_follower_data,
                 } => {
-                    upper_ids.retain(|&upper_id| {
+                    upper_ids.retain(|&_upper_id| {
                         // let mut upper = ctx.task(upper_id);
                         // TODO decide if it should be an inner or follower
                         // TODO for now: always inner
@@ -46,15 +219,12 @@ impl AggregationUpdateQueue {
                         // TODO return true for inner, false for follower
                         true
                     });
-                    let children;
+                    let children: Vec<TaskId>;
                     let data;
                     {
                         let mut follower = ctx.task(new_follower_id);
                         upper_ids.retain(|&upper_id| {
-                            if follower.add(CachedDataItem::Upper {
-                                task: upper_id,
-                                value: (),
-                            }) {
+                            if update_count!(follower, Upper { task: upper_id }, 1) {
                                 // It's a new upper
                                 true
                             } else {
@@ -63,31 +233,79 @@ impl AggregationUpdateQueue {
                             }
                         });
                         if !upper_ids.is_empty() {
-                            // TODO get data
-                            data = ();
-                            children = follower
-                                .iter()
-                                .filter_map(|(key, _)| match *key {
-                                    CachedDataItemKey::Child { task } => Some(task),
-                                    _ => None,
-                                })
-                                .collect::<Vec<_>>();
+                            data = AggregatedDataUpdate::from_task(&mut follower);
+                            children = get_many!(follower, Child { task } => task);
                         } else {
                             data = Default::default();
                             children = Default::default();
                         }
                     }
                     for upper_id in upper_ids.iter() {
-                        // TODO add data to upper
+                        // add data to upper
+                        let mut upper = ctx.task(*upper_id);
+                        let diff = data.apply(&mut upper, self);
+                        if !diff.is_empty() {
+                            let upper_ids = get_many!(upper, Upper { task } => task);
+                            self.jobs.push(AggregationUpdateJob::AggregatedDataUpdate {
+                                upper_ids,
+                                update: diff,
+                            })
+                        }
                     }
-                    for child_id in children {
-                        let child = ctx.task(child_id);
-                        // TODO get child data
-                        self.jobs.push(AggregationUpdateJob::InnerHasNewFollower {
+                    if !children.is_empty() {
+                        self.jobs.push(AggregationUpdateJob::InnerHasNewFollowers {
                             upper_ids: upper_ids.clone(),
-                            new_follower_id: child_id,
-                            new_follower_data: (),
-                        })
+                            new_follower_ids: children,
+                        });
                     }
                 }
+                AggregationUpdateJob::InnerLostFollower {
+                    upper_ids,
+                    lost_follower_id,
+                } => {
+                    for upper_id in upper_ids {
+                        let mut upper = ctx.task(upper_id);
+                        upper.remove(&CachedDataItemKey::Upper {
+                            task: lost_follower_id,
+                        });
+                        let diff = AggregatedDataUpdate::dirty_task(lost_follower_id);
+                        let upper_ids = get_many!(upper, Upper { task } => task);
+                        self.jobs.push(AggregationUpdateJob::AggregatedDataUpdate {
+                            upper_ids,
+                            update: diff,
+                        });
+                    }
+                }
+                AggregationUpdateJob::AggregatedDataUpdate { upper_ids, update } => {
+                    for upper_id in upper_ids {
+                        let mut upper = ctx.task(upper_id);
+                        let diff = update.apply(&mut upper, self);
+                        if !diff.is_empty() {
+                            let upper_ids = get_many!(upper, Upper { task } => task);
+                            self.jobs.push(AggregationUpdateJob::AggregatedDataUpdate {
+                                upper_ids,
+                                update: diff,
+                            });
+                        }
+                    }
+                }
+                AggregationUpdateJob::DataUpdate { task_id, update } => {
+                    let mut task = ctx.task(task_id);
+                    let diff = update.apply(&mut task, self);
+                    if !diff.is_empty() {
+                        let upper_ids = get_many!(task, Upper { task } => task);
+                        self.jobs.push(AggregationUpdateJob::AggregatedDataUpdate {
+                            upper_ids,
+                            update: diff,
+                        });
+                    }
+                }
+                AggregationUpdateJob::ScheduleWhenDirty { task_ids } => {
+                    for task_id in task_ids {
+                        let mut task = ctx.task(task_id);
+                        if task.add(CachedDataItem::new_scheduled(task_id)) {
+                            ctx.turbo_tasks.schedule(task_id);
+                        }
+                    }
+                }
             }
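`process` handles exactly one job per call and reports whether the queue is drained, so callers can interleave work with suspend points. Note how `InnerHasNewFollowers` is consumed: one follower is split off into an `InnerHasNewFollower` job and the shrunken batch is re-queued behind it. A standalone sketch of that splitting pattern (illustrative, not the backend's types):

```rust
// One unit of work per step: pop a job, split a batch if needed, re-queue the rest.
enum Job {
    Batch(Vec<u32>),
    Single(u32),
}

/// Returns true once the queue is drained (mirrors `process`'s return value).
fn process(jobs: &mut Vec<Job>) -> bool {
    match jobs.pop() {
        Some(Job::Batch(mut ids)) => {
            if let Some(id) = ids.pop() {
                if !ids.is_empty() {
                    jobs.push(Job::Batch(ids)); // re-queue the remainder
                }
                jobs.push(Job::Single(id)); // handle one follower next
            }
            false
        }
        Some(Job::Single(id)) => {
            println!("adding follower {id} to uppers");
            false
        }
        None => true,
    }
}

fn main() {
    let mut jobs = vec![Job::Batch(vec![1, 2, 3])];
    while !process(&mut jobs) {}
}
```

Splitting keeps each step small and bounded, which matters because the whole queue is serialized whenever the surrounding operation is suspended.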
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs
index e9eb3738d84ee..e4dc93f6a6c2c 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs
@@ -1,14 +1,26 @@
+use std::mem::take;
+
 use serde::{Deserialize, Serialize};
 use turbo_tasks::TaskId;
 
-use super::{ExecuteContext, Operation};
-use crate::data::{CachedDataItemKey, CellRef};
+use super::{
+    aggregation_update::{AggregationUpdateJob, AggregationUpdateQueue},
+    ExecuteContext, Operation,
+};
+use crate::{
+    data::{CachedDataItemKey, CellRef},
+    get_many,
+};
 
 #[derive(Serialize, Deserialize, Clone, Default)]
 pub enum CleanupOldEdgesOperation {
     RemoveEdges {
         task_id: TaskId,
         outdated: Vec<OutdatedEdge>,
+        queue: AggregationUpdateQueue,
+    },
+    AggregationUpdate {
+        queue: AggregationUpdateQueue,
     },
     #[default]
     Done,
@@ -24,7 +36,12 @@ pub enum OutdatedEdge {
 
 impl CleanupOldEdgesOperation {
     pub fn run(task_id: TaskId, outdated: Vec<OutdatedEdge>, ctx: ExecuteContext<'_>) {
-        CleanupOldEdgesOperation::RemoveEdges { task_id, outdated }.execute(&ctx);
+        CleanupOldEdgesOperation::RemoveEdges {
+            task_id,
+            outdated,
+            queue: AggregationUpdateQueue::new(),
+        }
+        .execute(&ctx);
     }
 }
 
@@ -36,13 +53,18 @@ impl Operation for CleanupOldEdgesOperation {
                 CleanupOldEdgesOperation::RemoveEdges {
                     task_id,
                     ref mut outdated,
+                    ref mut queue,
                 } => {
                     if let Some(edge) = outdated.pop() {
                         match edge {
                             OutdatedEdge::Child(child_id) => {
                                 let mut task = ctx.task(task_id);
                                 task.remove(&CachedDataItemKey::Child { task: child_id });
-                                // TODO remove aggregated edge
+                                let upper_ids = get_many!(task, Upper { task } => task);
+                                queue.push(AggregationUpdateJob::InnerLostFollower {
+                                    upper_ids,
+                                    lost_follower_id: child_id,
+                                });
                             }
                             OutdatedEdge::CellDependency(CellRef {
                                 task: cell_task_id,
@@ -83,9 +105,13 @@ impl Operation for CleanupOldEdgesOperation {
                     }
 
                     if outdated.is_empty() {
+                        self = CleanupOldEdgesOperation::AggregationUpdate { queue: take(queue) };
+                    }
+                }
+                CleanupOldEdgesOperation::AggregationUpdate { ref mut queue } => {
+                    if queue.process(ctx) {
                         self = CleanupOldEdgesOperation::Done;
                     }
-                    continue;
                 }
                 CleanupOldEdgesOperation::Done => {
                     return;
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs
index ee60a6cdc124d..8f9a4da1c135b 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs
@@ -5,7 +5,10 @@ use super::{
     aggregation_update::{AggregationUpdateJob, AggregationUpdateQueue},
     ExecuteContext, Operation,
 };
-use crate::data::{CachedDataItem, CachedDataItemKey};
+use crate::{
+    data::{CachedDataItem, CachedDataItemKey},
+    get, get_many,
+};
 
 #[derive(Serialize, Deserialize, Clone, Default)]
 pub enum ConnectChildOperation {
@@ -22,28 +25,29 @@ pub enum ConnectChildOperation {
 }
 
 impl ConnectChildOperation {
-    pub fn run(parent_task: TaskId, child_task: TaskId, ctx: ExecuteContext<'_>) {
-        let mut parent_task = ctx.task(parent_task);
+    pub fn run(parent_task_id: TaskId, child_task_id: TaskId, ctx: ExecuteContext<'_>) {
+        let mut parent_task = ctx.task(parent_task_id);
         if parent_task.add(CachedDataItem::Child {
-            task: child_task,
+            task: child_task_id,
             value: (),
         }) {
             // Update the task aggregation
-            let upper_ids = parent_task
-                .iter()
-                .filter_map(|(key, _)| match *key {
-                    CachedDataItemKey::Upper { task } => Some(task),
-                    _ => None,
-                })
-                .collect::<Vec<_>>();
             let mut queue = AggregationUpdateQueue::new();
-            queue.push(AggregationUpdateJob::InnerHasNewFollower {
-                upper_ids,
-                new_follower_id: child_task,
-                new_follower_data: (),
-            });
+            if get!(parent_task, AggregationNumber).is_some() {
+                queue.push(AggregationUpdateJob::InnerHasNewFollower {
+                    upper_ids: vec![parent_task_id],
+                    new_follower_id: child_task_id,
+                });
+            } else {
+                let upper_ids = get_many!(parent_task, Upper { task } => task);
+                queue.push(AggregationUpdateJob::InnerHasNewFollower {
+                    upper_ids,
+                    new_follower_id: child_task_id,
+                });
+            }
+            drop(parent_task);
             ConnectChildOperation::UpdateAggregation {
-                task_id: child_task,
+                task_id: child_task_id,
                 aggregation_update: queue,
             }
             .execute(&ctx);
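Both operations above follow the same pattern: an `Operation` is a serializable state machine that advances in small steps and can persist itself at every `ctx.operation_suspend_point(&self)`. `CleanupOldEdgesOperation` now threads an `AggregationUpdateQueue` through its states and drains it in a dedicated `AggregationUpdate` state before reaching `Done`. A simplified sketch of that pattern (names and types are illustrative, not the backend's):

```rust
// Resumable operation as a state machine: each call to `step` does a bounded
// amount of work; between calls the state could be serialized and resumed.
enum CleanupState {
    RemoveEdges { outdated: Vec<u32>, queued: Vec<u32> },
    AggregationUpdate { queued: Vec<u32> },
    Done,
}

fn step(state: CleanupState) -> CleanupState {
    match state {
        CleanupState::RemoveEdges { mut outdated, mut queued } => {
            if let Some(edge) = outdated.pop() {
                queued.push(edge); // removing an edge enqueues aggregation work
                CleanupState::RemoveEdges { outdated, queued }
            } else {
                // hand the accumulated queue to the follow-up state (cf. take(queue))
                CleanupState::AggregationUpdate { queued }
            }
        }
        CleanupState::AggregationUpdate { mut queued } => {
            if queued.pop().is_none() {
                CleanupState::Done
            } else {
                CleanupState::AggregationUpdate { queued }
            }
        }
        CleanupState::Done => CleanupState::Done,
    }
}

fn main() {
    let mut state = CleanupState::RemoveEdges { outdated: vec![1, 2], queued: vec![] };
    while !matches!(state, CleanupState::Done) {
        // a real operation would hit a suspend point here
        state = step(state);
    }
}
```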
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs
index a76899fc95196..07fb67e5fe180 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs
@@ -2,10 +2,13 @@ use serde::{Deserialize, Serialize};
 use smallvec::SmallVec;
 use turbo_tasks::TaskId;
 
-use super::{ExecuteContext, Operation};
+use super::{
+    aggregation_update::{AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue},
+    ExecuteContext, Operation,
+};
 use crate::{
     data::{CachedDataItem, InProgressState},
-    get, remove,
+    get, update,
 };
 
 #[derive(Serialize, Deserialize, Clone, Default)]
@@ -14,6 +17,9 @@ pub enum InvalidateOperation {
     MakeDirty {
         task_ids: SmallVec<[TaskId; 4]>,
     },
+    AggregationUpdate {
+        queue: AggregationUpdateQueue,
+    },
     // TODO Add to dirty tasks list
     #[default]
     Done,
@@ -31,61 +37,75 @@ impl Operation for InvalidateOperation {
             ctx.operation_suspend_point(&self);
             match self {
                 InvalidateOperation::MakeDirty { task_ids } => {
+                    let mut queue = AggregationUpdateQueue::new();
                     for task_id in task_ids {
                         let mut task = ctx.task(task_id);
-                        let in_progress = match get!(task, InProgress) {
-                            Some(InProgressState::Scheduled { clean, .. }) => {
-                                if *clean {
-                                    let Some(InProgressState::Scheduled {
-                                        clean: _,
-                                        done_event,
-                                        start_event,
-                                    }) = remove!(task, InProgress)
-                                    else {
-                                        unreachable!();
-                                    };
-                                    task.insert(CachedDataItem::InProgress {
-                                        value: InProgressState::Scheduled {
-                                            clean: false,
-                                            done_event,
-                                            start_event,
-                                        },
-                                    });
+
+                        if task.add(CachedDataItem::Dirty { value: () }) {
+                            let in_progress = match get!(task, InProgress) {
+                                Some(InProgressState::Scheduled { clean, .. }) => {
+                                    if *clean {
+                                        update!(task, InProgress, |in_progress| {
+                                            let Some(InProgressState::Scheduled {
+                                                clean: _,
+                                                done_event,
+                                                start_event,
+                                            }) = in_progress
+                                            else {
+                                                unreachable!();
+                                            };
+                                            Some(InProgressState::Scheduled {
+                                                clean: false,
+                                                done_event,
+                                                start_event,
+                                            })
+                                        });
+                                    }
+                                    true
                                 }
-                                true
-                            }
-                            Some(InProgressState::InProgress { clean, stale, .. }) => {
-                                if *clean || !*stale {
-                                    let Some(InProgressState::InProgress {
-                                        clean: _,
-                                        stale: _,
-                                        done_event,
-                                    }) = remove!(task, InProgress)
-                                    else {
-                                        unreachable!();
-                                    };
-                                    task.insert(CachedDataItem::InProgress {
-                                        value: InProgressState::InProgress {
-                                            clean: false,
-                                            stale: true,
-                                            done_event,
-                                        },
-                                    });
+                                Some(InProgressState::InProgress { clean, stale, .. }) => {
+                                    if *clean || !*stale {
+                                        update!(task, InProgress, |in_progress| {
+                                            let Some(InProgressState::InProgress {
+                                                clean: _,
+                                                stale: _,
+                                                done_event,
+                                            }) = in_progress
+                                            else {
+                                                unreachable!();
+                                            };
+                                            Some(InProgressState::InProgress {
+                                                clean: false,
+                                                stale: true,
+                                                done_event,
+                                            })
+                                        });
+                                    }
+                                    true
                                 }
-                                true
+                                None => false,
+                            };
+                            if !in_progress && task.add(CachedDataItem::new_scheduled(task_id)) {
+                                ctx.turbo_tasks.schedule(task_id)
                             }
-                            None => false,
-                        };
-                        if task.add(CachedDataItem::Dirty { value: () })
-                            && !in_progress
-                            && task.add(CachedDataItem::new_scheduled(task_id))
-                        {
-                            ctx.turbo_tasks.schedule(task_id)
+                            queue.push(AggregationUpdateJob::DataUpdate {
+                                task_id,
+                                update: AggregatedDataUpdate::dirty_task(task_id),
+                            })
                         }
                     }
-                    self = InvalidateOperation::Done;
+                    if queue.is_empty() {
+                        self = InvalidateOperation::Done
+                    } else {
+                        self = InvalidateOperation::AggregationUpdate { queue }
+                    }
                     continue;
                 }
+                InvalidateOperation::AggregationUpdate { ref mut queue } => {
+                    if queue.process(ctx) {
+                        self = InvalidateOperation::Done
+                    }
+                }
                 InvalidateOperation::Done => {
                     return;
                 }
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
index a218aeff24d91..e23f62df69da5 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
@@ -139,6 +139,10 @@ impl<'a> Debug for TaskGuard<'a> {
 }
 
 impl<'a> TaskGuard<'a> {
+    pub fn id(&self) -> TaskId {
+        self.task_id
+    }
+
     pub fn add(&mut self, item: CachedDataItem) -> bool {
         if !item.is_persistent() {
             self.task.add(item)
@@ -200,6 +204,53 @@ impl<'a> TaskGuard<'a> {
         }
     }
 
+    pub fn update(
+        &mut self,
+        key: &CachedDataItemKey,
+        update: impl FnOnce(Option<CachedDataItemValue>) -> Option<CachedDataItemValue>,
+    ) {
+        if !key.is_persistent() {
+            self.task.update(key, update);
+            return;
+        }
+        let Self {
+            task,
+            task_id,
+            backend,
+        } = self;
+        let mut add_persisting_item = false;
+        task.update(key, |old| {
+            let old_persistent = old.as_ref().map(|old| old.is_persistent()).unwrap_or(false);
+            let new = update(old);
+            let new_persistent = new.as_ref().map(|new| new.is_persistent()).unwrap_or(false);
+
+            match (old_persistent, new_persistent) {
+                (false, false) => {}
+                (true, false) => {
+                    add_persisting_item = true;
+                    backend.persisted_storage_log.lock().push(CachedDataUpdate {
+                        key: key.clone(),
+                        task: *task_id,
+                        value: None,
+                    });
+                }
+                (_, true) => {
+                    add_persisting_item = true;
+                    backend.persisted_storage_log.lock().push(CachedDataUpdate {
+                        key: key.clone(),
+                        task: *task_id,
+                        value: new.clone(),
+                    });
+                }
+            }
+
+            new
+        });
+        if add_persisting_item {
+            task.persistance_state.add_persisting_item();
+        }
+    }
+
     pub fn remove(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValue> {
         let old_value = self.task.remove(key);
         if let Some(value) = old_value {
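`TaskGuard::update` is the write path that keeps in-memory state and persisted state in sync: any transition involving a persistent value is mirrored into `persisted_storage_log`, either as the new value or as a `None` tombstone for a deletion. A reduced model of that rule (single hard-coded key and assumed types, purely for illustration):

```rust
// Write-ahead-style logging of updates: every persistent transition is recorded.
#[derive(Debug, PartialEq)]
struct LogEntry {
    key: &'static str,
    value: Option<u32>, // None = deletion tombstone
}

fn update(
    store: &mut Option<u32>,
    log: &mut Vec<LogEntry>,
    f: impl FnOnce(Option<u32>) -> Option<u32>,
) {
    let old_present = store.is_some();
    let new = f(store.take());
    match (old_present, new.is_some()) {
        (false, false) => {} // value never existed and still doesn't: nothing to log
        (true, false) => log.push(LogEntry { key: "k", value: None }), // tombstone
        (_, true) => log.push(LogEntry { key: "k", value: new }),      // insert/overwrite
    }
    *store = new;
}

fn main() {
    let (mut store, mut log) = (None, Vec::new());
    update(&mut store, &mut log, |_| Some(1));          // insert -> logs Some(1)
    update(&mut store, &mut log, |v| v.map(|v| v + 1)); // overwrite -> logs Some(2)
    update(&mut store, &mut log, |_| None);             // delete -> logs tombstone
    assert_eq!(log.last(), Some(&LogEntry { key: "k", value: None }));
}
```

The `(false, false)` arm is the escape hatch for purely transient values, which never touch the log.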
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
index 1b4a3c58fef9b..4832f29ffbc59 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
@@ -91,6 +91,31 @@ impl<T: KeyValuePair> InnerStorage<T> {
     }
 }
 
+impl<T: KeyValuePair> InnerStorage<T>
+where
+    T::Value: Default,
+    T::Key: Clone,
+{
+    pub fn update(
+        &mut self,
+        key: &T::Key,
+        update: impl FnOnce(Option<T::Value>) -> Option<T::Value>,
+    ) {
+        if let Some(value) = self.map.get_mut(key) {
+            let v = take(value);
+            if let Some(v) = update(Some(v)) {
+                *value = v;
+            } else {
+                self.map.remove(key);
+            }
+        } else {
+            if let Some(v) = update(None) {
+                self.map.insert(key.clone(), v);
+            }
+        }
+    }
+}
+
 impl<T: KeyValuePair> InnerStorage<T>
 where
     T::Value: PartialEq,
@@ -171,6 +196,91 @@ macro_rules! get {
     };
 }
 
+#[macro_export]
+macro_rules! get_many {
+    ($task:ident, $key:ident $input:tt => $value:ident) => {
+        $task
+            .iter()
+            .filter_map(|(key, _)| match *key {
+                CachedDataItemKey::$key $input => Some($value),
+                _ => None,
+            })
+            .collect()
+    };
+    ($task:ident, $key1:ident $input1:tt => $value1:ident, $key2:ident $input2:tt => $value2:ident) => {
+        $task
+            .iter()
+            .filter_map(|(key, _)| match *key {
+                CachedDataItemKey::$key1 $input1 => Some($value1),
+                CachedDataItemKey::$key2 $input2 => Some($value2),
+                _ => None,
+            })
+            .collect()
+    };
+}
+
+#[macro_export]
+macro_rules! update {
+    ($task:ident, $key:ident $input:tt, $update:expr) => {
+        #[allow(unused_mut)]
+        match $update {
+            mut update => $task.update(&$crate::data::CachedDataItemKey::$key $input, |old| {
+                update(old.and_then(|old| {
+                    if let $crate::data::CachedDataItemValue::$key { value } = old {
+                        Some(value)
+                    } else {
+                        None
+                    }
+                }))
+                .map(|new| $crate::data::CachedDataItemValue::$key { value: new })
+            })
+        }
+    };
+    ($task:ident, $key:ident, $update:expr) => {
+        #[allow(unused_mut)]
+        match $update {
+            mut update => $task.update(&$crate::data::CachedDataItemKey::$key {}, |old| {
+                update(old.and_then(|old| {
+                    if let $crate::data::CachedDataItemValue::$key { value } = old {
+                        Some(value)
+                    } else {
+                        None
+                    }
+                }))
+                .map(|new| $crate::data::CachedDataItemValue::$key { value: new })
+            })
+        }
+    };
+}
+
+#[macro_export]
+macro_rules! update_count {
+    ($task:ident, $key:ident $input:tt, $update:expr) => {
+        match $update {
+            update => {
+                let mut state_change = false;
+                $crate::update!($task, $key $input, |old: Option<u32>| {
+                    if old.is_none() {
+                        state_change = true;
+                    }
+                    let old = old.unwrap_or(0);
+                    let new = old as i32 + update;
+                    if new == 0 {
+                        state_change = true;
+                        None
+                    } else {
+                        Some(new as u32)
+                    }
+                });
+                state_change
+            }
+        }
+    };
+    ($task:ident, $key:ident, $update:expr) => {
+        $crate::update_count!($task, $key {}, $update)
+    };
+}
+
 #[macro_export]
 macro_rules! remove {
     ($task:ident, $key:ident $input:tt) => {
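The new macros wrap the `update` primitive: `update!` projects the matching `CachedDataItemValue` variant in and out of the closure, and `update_count!` layers reference-count semantics on top, returning whether the count crossed zero. A standalone model of what `update_count!` computes (not the macro itself; `Option<u32>` stands in for the stored item):

```rust
// Counter stored as Option<u32>: created on first increment, removed at zero.
// The return value reports a 0 <-> non-zero state change.
fn update_count(slot: &mut Option<u32>, delta: i32) -> bool {
    let mut state_change = false;
    let old = slot.take();
    if old.is_none() {
        state_change = true;
    }
    let new = old.unwrap_or(0) as i32 + delta;
    if new == 0 {
        state_change = true;
        *slot = None;
    } else {
        *slot = Some(new as u32);
    }
    state_change
}

fn main() {
    let mut upper_count = None;
    assert!(update_count(&mut upper_count, 1));   // 0 -> 1: new upper edge
    assert!(!update_count(&mut upper_count, 1));  // 1 -> 2: edge already existed
    assert!(!update_count(&mut upper_count, -1)); // 2 -> 1
    assert!(update_count(&mut upper_count, -1));  // 1 -> 0: edge removed
    assert_eq!(upper_count, None);
}
```

That boolean is exactly what the `InnerHasNewFollower` job uses to decide whether an `Upper` edge is new (and data must be aggregated) or already present.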
diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs
index c079b035247e1..95a7538328855 100644
--- a/turbopack/crates/turbo-tasks-backend/src/data.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/data.rs
@@ -1,6 +1,7 @@
 use serde::{Deserialize, Serialize};
 use turbo_tasks::{
     event::Event, util::SharedError, CellId, KeyValuePair, TaskId, TypedSharedReference,
+    ValueTypeId,
 };
 
 #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
@@ -9,6 +10,12 @@ pub struct CellRef {
     pub cell: CellId,
 }
 
+#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
+pub struct CollectiblesRef {
+    pub task: TaskId,
+    pub collectible_type: ValueTypeId,
+}
+
 #[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub enum OutputValue {
     Cell(CellRef),
@@ -94,6 +101,10 @@ pub enum CachedDataItem {
         target: CellRef,
         value: (),
     },
+    CollectiblesDependency {
+        target: CollectiblesRef,
+        value: (),
+    },
 
     // Dependent
     OutputDependent {
@@ -105,6 +116,11 @@ pub enum CachedDataItem {
         task: TaskId,
         value: (),
     },
+    CollectiblesDependent {
+        collectibles_type: ValueTypeId,
+        task: TaskId,
+        value: (),
+    },
 
     // Aggregation Graph
     AggregationNumber {
@@ -112,21 +128,21 @@ pub enum CachedDataItem {
     },
     Follower {
         task: TaskId,
-        value: (),
+        value: u32,
     },
     Upper {
         task: TaskId,
-        value: (),
+        value: u32,
     },
 
     // Aggregated Data
     AggregatedDirtyTask {
         task: TaskId,
-        value: (),
+        value: u32,
     },
     AggregatedCollectible {
         collectible: CellRef,
-        value: (),
+        value: u32,
     },
     AggregatedUnfinishedTasks {
         value: u32,
@@ -178,8 +194,10 @@ impl CachedDataItem {
             CachedDataItem::CellData { .. } => true,
             CachedDataItem::OutputDependency { target, .. } => !target.is_transient(),
             CachedDataItem::CellDependency { target, .. } => !target.task.is_transient(),
+            CachedDataItem::CollectiblesDependency { target, .. } => !target.task.is_transient(),
             CachedDataItem::OutputDependent { task, .. } => !task.is_transient(),
             CachedDataItem::CellDependent { task, .. } => !task.is_transient(),
+            CachedDataItem::CollectiblesDependent { task, .. } => !task.is_transient(),
             CachedDataItem::AggregationNumber { .. } => true,
             CachedDataItem::Follower { task, .. } => !task.is_transient(),
             CachedDataItem::Upper { task, .. } => !task.is_transient(),
@@ -220,8 +238,10 @@ impl CachedDataItemKey {
             CachedDataItemKey::CellData { .. } => true,
             CachedDataItemKey::OutputDependency { target, .. } => !target.is_transient(),
             CachedDataItemKey::CellDependency { target, .. } => !target.task.is_transient(),
+            CachedDataItemKey::CollectiblesDependency { target, .. } => !target.task.is_transient(),
             CachedDataItemKey::OutputDependent { task, .. } => !task.is_transient(),
             CachedDataItemKey::CellDependent { task, .. } => !task.is_transient(),
+            CachedDataItemKey::CollectiblesDependent { task, .. } => !task.is_transient(),
             CachedDataItemKey::AggregationNumber { .. } => true,
             CachedDataItemKey::Follower { task, .. } => !task.is_transient(),
             CachedDataItemKey::Upper { task, .. } => !task.is_transient(),
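The parallel `is_persistent` tables for items and keys encode one rule: an item may only be persisted if every task it references survives a restart, so a transient `TaskId` poisons any edge that points at it, while task-local scalars are always persistable. A compact sketch of the rule (simplified, assumed types):

```rust
// Simplified: persistence of an item follows the transience of the tasks it references.
#[derive(Clone, Copy)]
struct TaskId {
    transient: bool,
}

impl TaskId {
    fn is_transient(&self) -> bool {
        self.transient
    }
}

enum Item {
    Upper { task: TaskId }, // graph edge: follows the referenced task
    AggregationNumber,      // task-local scalar: always persistable
}

fn is_persistent(item: &Item) -> bool {
    match item {
        Item::Upper { task } => !task.is_transient(),
        Item::AggregationNumber => true,
    }
}

fn main() {
    let durable = TaskId { transient: false };
    let session_only = TaskId { transient: true };
    assert!(is_persistent(&Item::Upper { task: durable }));
    assert!(!is_persistent(&Item::Upper { task: session_only }));
    assert!(is_persistent(&Item::AggregationNumber));
}
```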
diff --git a/turbopack/crates/turbo-tasks-macros/src/derive/key_value_pair_macro.rs b/turbopack/crates/turbo-tasks-macros/src/derive/key_value_pair_macro.rs
index 3026d73f5dcd3..a3a0d192ec9a2 100644
--- a/turbopack/crates/turbo-tasks-macros/src/derive/key_value_pair_macro.rs
+++ b/turbopack/crates/turbo-tasks-macros/src/derive/key_value_pair_macro.rs
@@ -106,13 +106,15 @@ pub fn derive_key_value_pair(input: TokenStream) -> TokenStream {
             )*
         }
 
-        #[derive(Debug, Clone)]
+        #[derive(Debug, Clone, Default)]
         #vis enum #value_name {
             #(
                 #variant_names {
                     #value_decl
                 },
             )*
+            #[default]
+            Reserved,
         }
     }
     .into()
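The derive-macro change closes the loop with `InnerStorage::update` above: the generated value enum gains a `Default` impl via a hidden `Reserved` variant, which exists so `std::mem::take` can cheaply move a value out of the map while an update closure runs. In isolation (illustrative enum, not the generated one):

```rust
use std::mem::take;

// The #[default] Reserved variant is a cheap placeholder for take-based updates.
#[derive(Debug, Clone, Default, PartialEq)]
enum Value {
    Count(u32),
    #[default]
    Reserved,
}

fn main() {
    let mut slot = Value::Count(3);
    // Temporarily move the value out, leaving the Reserved placeholder...
    let old = take(&mut slot);
    assert_eq!(slot, Value::Reserved);
    // ...then write the updated value back.
    slot = match old {
        Value::Count(c) => Value::Count(c + 1),
        Value::Reserved => Value::Reserved,
    };
    assert_eq!(slot, Value::Count(4));
}
```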