diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index c9a30009b0e27..9f0e241ed8e99 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -14,7 +14,6 @@ use std::{ Arc, }, thread::available_parallelism, - time::{Duration, Instant}, }; use anyhow::{bail, Result}; @@ -23,6 +22,7 @@ use dashmap::DashMap; use parking_lot::{Condvar, Mutex}; use rustc_hash::FxHasher; use smallvec::smallvec; +use tokio::time::{Duration, Instant}; use turbo_tasks::{ backend::{ Backend, BackendJobId, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot, @@ -54,6 +54,9 @@ use crate::{ utils::{bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc, sharded::Sharded}, }; +const BACKEND_JOB_INITIAL_SNAPSHOT: BackendJobId = unsafe { BackendJobId::new_unchecked(1) }; +const BACKEND_JOB_FOLLOW_UP_SNAPSHOT: BackendJobId = unsafe { BackendJobId::new_unchecked(2) }; + const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1); struct SnapshotRequest { @@ -123,7 +126,7 @@ struct TurboTasksBackendInner { /// Condition Variable that is triggered when a snapshot is completed and /// operations can continue. snapshot_completed: Condvar, - /// The timestamp of the last started snapshot. + /// The timestamp of the last started snapshot since [`Self::start_time`]. last_snapshot: AtomicU64, stopping: AtomicBool, @@ -291,10 +294,11 @@ impl TurboTasksBackendInner { child_task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi, ) { - operation::ConnectChildOperation::run(parent_task, child_task, unsafe { - // Safety: Passing `None` is safe. - self.execute_context_with_tx(None, turbo_tasks) - }); + operation::ConnectChildOperation::run( + parent_task, + child_task, + self.execute_context(turbo_tasks), + ); } fn try_read_task_output( @@ -541,13 +545,11 @@ impl TurboTasksBackendInner { snapshot_request.snapshot_requested = true; let active_operations = self .in_progress_operations - .fetch_or(SNAPSHOT_REQUESTED_BIT, std::sync::atomic::Ordering::Relaxed); + .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed); if active_operations != 0 { self.operations_suspended .wait_while(&mut snapshot_request, |_| { - self.in_progress_operations - .load(std::sync::atomic::Ordering::Relaxed) - != SNAPSHOT_REQUESTED_BIT + self.in_progress_operations.load(Ordering::Relaxed) != SNAPSHOT_REQUESTED_BIT }); } let suspended_operations = snapshot_request @@ -562,7 +564,7 @@ impl TurboTasksBackendInner { let mut snapshot_request = self.snapshot_request.lock(); snapshot_request.snapshot_requested = false; self.in_progress_operations - .fetch_sub(SNAPSHOT_REQUESTED_BIT, std::sync::atomic::Ordering::Relaxed); + .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed); self.snapshot_completed.notify_all(); let snapshot_time = Instant::now(); drop(snapshot_request); @@ -622,7 +624,7 @@ impl TurboTasksBackendInner { } // Schedule the snapshot job - turbo_tasks.schedule_backend_background_job(BackendJobId::from(1)); + turbo_tasks.schedule_backend_background_job(BACKEND_JOB_INITIAL_SNAPSHOT); } fn stopping(&self) { @@ -1157,7 +1159,7 @@ impl TurboTasksBackendInner { turbo_tasks: &'a dyn TurboTasksBackendApi, ) -> Pin + Send + 'a>> { Box::pin(async move { - if *id == 1 || *id == 2 { + if id == BACKEND_JOB_INITIAL_SNAPSHOT || id == BACKEND_JOB_FOLLOW_UP_SNAPSHOT { let last_snapshot = self.last_snapshot.load(Ordering::Relaxed); let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot); loop { @@ -1165,7 +1167,7 @@ impl TurboTasksBackendInner { const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(15); const IDLE_TIMEOUT: Duration = Duration::from_secs(1); - let time = if *id == 1 { + let time = if id == BACKEND_JOB_INITIAL_SNAPSHOT { FIRST_SNAPSHOT_WAIT } else { SNAPSHOT_INTERVAL @@ -1177,7 +1179,11 @@ impl TurboTasksBackendInner { if !self.stopping.load(Ordering::Acquire) { let mut idle_start_listener = self.idle_start_event.listen(); let mut idle_end_listener = self.idle_end_event.listen(); - let mut idle_time = until + IDLE_TIMEOUT; + let mut idle_time = if turbo_tasks.is_idle() { + Instant::now() + IDLE_TIMEOUT + } else { + far_future() + }; loop { tokio::select! { _ = &mut stop_listener => { @@ -1191,10 +1197,10 @@ impl TurboTasksBackendInner { idle_time = until + IDLE_TIMEOUT; idle_end_listener = self.idle_end_event.listen() }, - _ = tokio::time::sleep_until(until.into()) => { + _ = tokio::time::sleep_until(until) => { break; }, - _ = tokio::time::sleep_until(idle_time.into()) => { + _ = tokio::time::sleep_until(idle_time) => { if turbo_tasks.is_idle() { break; } @@ -1212,10 +1218,12 @@ impl TurboTasksBackendInner { continue; } let last_snapshot = last_snapshot.duration_since(self.start_time); - self.last_snapshot - .store(last_snapshot.as_millis() as u64, Ordering::Relaxed); + self.last_snapshot.store( + last_snapshot.as_millis().try_into().unwrap(), + Ordering::Relaxed, + ); - turbo_tasks.schedule_backend_background_job(BackendJobId::from(2)); + turbo_tasks.schedule_backend_background_job(BACKEND_JOB_FOLLOW_UP_SNAPSHOT); return; } } @@ -1525,3 +1533,12 @@ impl Backend for TurboTasksBackend { todo!() } } + +// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63 +fn far_future() -> Instant { + // Roughly 30 years from now. + // API does not provide a way to obtain max `Instant` + // or convert specific date in the future to instant. + // 1000 years overflows on macOS, 100 years overflows on FreeBSD. + Instant::now() + Duration::from_secs(86400 * 365 * 30) +} diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index 62285cb04436f..4500d7037e1a4 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -8,6 +8,26 @@ use turbo_tasks::{ use crate::backend::{indexed::Indexed, TaskDataCategory}; +// this traits are needed for the transient variants of `CachedDataItem` +// transient variants are never cloned or compared +macro_rules! transient_traits { + ($name:ident) => { + impl Clone for $name { + fn clone(&self) -> Self { + // this impl is needed for the transient variants of `CachedDataItem` + // transient variants are never cloned + panic!(concat!(stringify!($name), " cannot be cloned")); + } + } + + impl PartialEq for $name { + fn eq(&self, _other: &Self) -> bool { + panic!(concat!(stringify!($name), " cannot be compared")); + } + } + }; +} + #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] pub struct CellRef { pub task: TaskId, @@ -53,17 +73,7 @@ impl RootState { } } -impl Clone for RootState { - fn clone(&self) -> Self { - panic!("RootState cannot be cloned"); - } -} - -impl PartialEq for RootState { - fn eq(&self, _other: &Self) -> bool { - panic!("RootState cannot be compared"); - } -} +transient_traits!(RootState); impl Eq for RootState {} @@ -90,17 +100,7 @@ pub enum InProgressState { }, } -impl Clone for InProgressState { - fn clone(&self) -> Self { - panic!("InProgressState cannot be cloned"); - } -} - -impl PartialEq for InProgressState { - fn eq(&self, _other: &Self) -> bool { - panic!("InProgressState cannot be compared"); - } -} +transient_traits!(InProgressState); impl Eq for InProgressState {} @@ -109,17 +109,7 @@ pub struct InProgressCellState { pub event: Event, } -impl Clone for InProgressCellState { - fn clone(&self) -> Self { - panic!("InProgressCell cannot be cloned"); - } -} - -impl PartialEq for InProgressCellState { - fn eq(&self, _other: &Self) -> bool { - panic!("InProgressCell cannot be compared"); - } -} +transient_traits!(InProgressCellState); impl Eq for InProgressCellState {} diff --git a/turbopack/crates/turbo-tasks/src/id.rs b/turbopack/crates/turbo-tasks/src/id.rs index 566800b0dbb17..99c7b0997a047 100644 --- a/turbopack/crates/turbo-tasks/src/id.rs +++ b/turbopack/crates/turbo-tasks/src/id.rs @@ -30,7 +30,7 @@ macro_rules! define_id { /// # Safety /// /// The passed `id` must not be zero. - pub unsafe fn new_unchecked(id: $primitive) -> Self { + pub const unsafe fn new_unchecked(id: $primitive) -> Self { Self { id: unsafe { NonZero::<$primitive>::new_unchecked(id) } } } } diff --git a/turbopack/crates/turbo-tasks/src/key_value_pair.rs b/turbopack/crates/turbo-tasks/src/key_value_pair.rs index 6aceaea04d4f7..dfd0e7bdb92e0 100644 --- a/turbopack/crates/turbo-tasks/src/key_value_pair.rs +++ b/turbopack/crates/turbo-tasks/src/key_value_pair.rs @@ -1,6 +1,8 @@ +use std::fmt::Debug; + pub trait KeyValuePair { - type Key: PartialEq + Eq + std::hash::Hash; - type Value; + type Key: Debug + Clone + PartialEq + Eq + std::hash::Hash; + type Value: Debug + Clone + Default + PartialEq + Eq; fn key(&self) -> Self::Key; fn value(&self) -> Self::Value; fn from_key_and_value(key: Self::Key, value: Self::Value) -> Self; diff --git a/turbopack/crates/turbo-tasks/src/scope.rs b/turbopack/crates/turbo-tasks/src/scope.rs index e7bb8713bd4cb..6420a71e3591f 100644 --- a/turbopack/crates/turbo-tasks/src/scope.rs +++ b/turbopack/crates/turbo-tasks/src/scope.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use crate::{turbo_tasks, turbo_tasks_scope, TurboTasksApi}; +/// A wrapper around [`rayon::Scope`] that preserves the [`turbo_tasks_scope`]. pub struct Scope<'scope, 'a> { scope: &'a rayon::Scope<'scope>, handle: tokio::runtime::Handle, @@ -10,9 +11,9 @@ pub struct Scope<'scope, 'a> { } impl<'scope, 'a> Scope<'scope, 'a> { - pub fn spawn(&self, body: BODY) + pub fn spawn(&self, body: Body) where - BODY: FnOnce(&Scope<'scope, '_>) + Send + 'scope, + Body: FnOnce(&Scope<'scope, '_>) + Send + 'scope, { let span = self.span.clone(); let handle = self.handle.clone(); @@ -32,9 +33,10 @@ impl<'scope, 'a> Scope<'scope, 'a> { } } -pub fn scope<'scope, OP, R>(op: OP) -> R +/// A wrapper around [`rayon::in_place_scope`] that preserves the [`turbo_tasks_scope`]. +pub fn scope<'scope, Op, R>(op: Op) -> R where - OP: FnOnce(&Scope<'scope, '_>) -> R, + Op: FnOnce(&Scope<'scope, '_>) -> R, { let span = tracing::Span::current(); let handle = tokio::runtime::Handle::current();