Skip to content

Commit

Permalink
apply review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Oct 8, 2024
1 parent 041751d commit 594b0a8
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 60 deletions.
57 changes: 37 additions & 20 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use std::{
Arc,
},
thread::available_parallelism,
time::{Duration, Instant},
};

use anyhow::{bail, Result};
Expand All @@ -23,6 +22,7 @@ use dashmap::DashMap;
use parking_lot::{Condvar, Mutex};
use rustc_hash::FxHasher;
use smallvec::smallvec;
use tokio::time::{Duration, Instant};
use turbo_tasks::{
backend::{
Backend, BackendJobId, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
Expand Down Expand Up @@ -54,6 +54,9 @@ use crate::{
utils::{bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc, sharded::Sharded},
};

const BACKEND_JOB_INITIAL_SNAPSHOT: BackendJobId = unsafe { BackendJobId::new_unchecked(1) };
const BACKEND_JOB_FOLLOW_UP_SNAPSHOT: BackendJobId = unsafe { BackendJobId::new_unchecked(2) };

const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);

struct SnapshotRequest {
Expand Down Expand Up @@ -123,7 +126,7 @@ struct TurboTasksBackendInner {
/// Condition Variable that is triggered when a snapshot is completed and
/// operations can continue.
snapshot_completed: Condvar,
/// The timestamp of the last started snapshot.
/// The timestamp of the last started snapshot since [`Self::start_time`].
last_snapshot: AtomicU64,

stopping: AtomicBool,
Expand Down Expand Up @@ -291,10 +294,11 @@ impl TurboTasksBackendInner {
child_task: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend>,
) {
operation::ConnectChildOperation::run(parent_task, child_task, unsafe {
// Safety: Passing `None` is safe.
self.execute_context_with_tx(None, turbo_tasks)
});
operation::ConnectChildOperation::run(
parent_task,
child_task,
self.execute_context(turbo_tasks),
);
}

fn try_read_task_output(
Expand Down Expand Up @@ -541,13 +545,11 @@ impl TurboTasksBackendInner {
snapshot_request.snapshot_requested = true;
let active_operations = self
.in_progress_operations
.fetch_or(SNAPSHOT_REQUESTED_BIT, std::sync::atomic::Ordering::Relaxed);
.fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
if active_operations != 0 {
self.operations_suspended
.wait_while(&mut snapshot_request, |_| {
self.in_progress_operations
.load(std::sync::atomic::Ordering::Relaxed)
!= SNAPSHOT_REQUESTED_BIT
self.in_progress_operations.load(Ordering::Relaxed) != SNAPSHOT_REQUESTED_BIT
});
}
let suspended_operations = snapshot_request
Expand All @@ -562,7 +564,7 @@ impl TurboTasksBackendInner {
let mut snapshot_request = self.snapshot_request.lock();
snapshot_request.snapshot_requested = false;
self.in_progress_operations
.fetch_sub(SNAPSHOT_REQUESTED_BIT, std::sync::atomic::Ordering::Relaxed);
.fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
self.snapshot_completed.notify_all();
let snapshot_time = Instant::now();
drop(snapshot_request);
Expand Down Expand Up @@ -622,7 +624,7 @@ impl TurboTasksBackendInner {
}

// Schedule the snapshot job
turbo_tasks.schedule_backend_background_job(BackendJobId::from(1));
turbo_tasks.schedule_backend_background_job(BACKEND_JOB_INITIAL_SNAPSHOT);
}

fn stopping(&self) {
Expand Down Expand Up @@ -1157,15 +1159,15 @@ impl TurboTasksBackendInner {
turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
Box::pin(async move {
if *id == 1 || *id == 2 {
if id == BACKEND_JOB_INITIAL_SNAPSHOT || id == BACKEND_JOB_FOLLOW_UP_SNAPSHOT {
let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
loop {
const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(30);
const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(15);
const IDLE_TIMEOUT: Duration = Duration::from_secs(1);

let time = if *id == 1 {
let time = if id == BACKEND_JOB_INITIAL_SNAPSHOT {
FIRST_SNAPSHOT_WAIT
} else {
SNAPSHOT_INTERVAL
Expand All @@ -1177,7 +1179,11 @@ impl TurboTasksBackendInner {
if !self.stopping.load(Ordering::Acquire) {
let mut idle_start_listener = self.idle_start_event.listen();
let mut idle_end_listener = self.idle_end_event.listen();
let mut idle_time = until + IDLE_TIMEOUT;
let mut idle_time = if turbo_tasks.is_idle() {
Instant::now() + IDLE_TIMEOUT
} else {
far_future()
};
loop {
tokio::select! {
_ = &mut stop_listener => {
Expand All @@ -1191,10 +1197,10 @@ impl TurboTasksBackendInner {
idle_time = until + IDLE_TIMEOUT;
idle_end_listener = self.idle_end_event.listen()
},
_ = tokio::time::sleep_until(until.into()) => {
_ = tokio::time::sleep_until(until) => {
break;
},
_ = tokio::time::sleep_until(idle_time.into()) => {
_ = tokio::time::sleep_until(idle_time) => {
if turbo_tasks.is_idle() {
break;
}
Expand All @@ -1212,10 +1218,12 @@ impl TurboTasksBackendInner {
continue;
}
let last_snapshot = last_snapshot.duration_since(self.start_time);
self.last_snapshot
.store(last_snapshot.as_millis() as u64, Ordering::Relaxed);
self.last_snapshot.store(
last_snapshot.as_millis().try_into().unwrap(),
Ordering::Relaxed,
);

turbo_tasks.schedule_backend_background_job(BackendJobId::from(2));
turbo_tasks.schedule_backend_background_job(BACKEND_JOB_FOLLOW_UP_SNAPSHOT);
return;
}
}
Expand Down Expand Up @@ -1525,3 +1533,12 @@ impl Backend for TurboTasksBackend {
todo!()
}
}

// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
fn far_future() -> Instant {
// Roughly 30 years from now.
// API does not provide a way to obtain max `Instant`
// or convert specific date in the future to instant.
// 1000 years overflows on macOS, 100 years overflows on FreeBSD.
Instant::now() + Duration::from_secs(86400 * 365 * 30)
}
56 changes: 23 additions & 33 deletions turbopack/crates/turbo-tasks-backend/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,26 @@ use turbo_tasks::{

use crate::backend::{indexed::Indexed, TaskDataCategory};

// this traits are needed for the transient variants of `CachedDataItem`
// transient variants are never cloned or compared
macro_rules! transient_traits {
($name:ident) => {
impl Clone for $name {
fn clone(&self) -> Self {
// this impl is needed for the transient variants of `CachedDataItem`
// transient variants are never cloned
panic!(concat!(stringify!($name), " cannot be cloned"));
}
}

impl PartialEq for $name {
fn eq(&self, _other: &Self) -> bool {
panic!(concat!(stringify!($name), " cannot be compared"));
}
}
};
}

#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
pub struct CellRef {
pub task: TaskId,
Expand Down Expand Up @@ -53,17 +73,7 @@ impl RootState {
}
}

impl Clone for RootState {
fn clone(&self) -> Self {
panic!("RootState cannot be cloned");
}
}

impl PartialEq for RootState {
fn eq(&self, _other: &Self) -> bool {
panic!("RootState cannot be compared");
}
}
transient_traits!(RootState);

impl Eq for RootState {}

Expand All @@ -90,17 +100,7 @@ pub enum InProgressState {
},
}

impl Clone for InProgressState {
fn clone(&self) -> Self {
panic!("InProgressState cannot be cloned");
}
}

impl PartialEq for InProgressState {
fn eq(&self, _other: &Self) -> bool {
panic!("InProgressState cannot be compared");
}
}
transient_traits!(InProgressState);

impl Eq for InProgressState {}

Expand All @@ -109,17 +109,7 @@ pub struct InProgressCellState {
pub event: Event,
}

impl Clone for InProgressCellState {
fn clone(&self) -> Self {
panic!("InProgressCell cannot be cloned");
}
}

impl PartialEq for InProgressCellState {
fn eq(&self, _other: &Self) -> bool {
panic!("InProgressCell cannot be compared");
}
}
transient_traits!(InProgressCellState);

impl Eq for InProgressCellState {}

Expand Down
2 changes: 1 addition & 1 deletion turbopack/crates/turbo-tasks/src/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ macro_rules! define_id {
/// # Safety
///
/// The passed `id` must not be zero.
pub unsafe fn new_unchecked(id: $primitive) -> Self {
pub const unsafe fn new_unchecked(id: $primitive) -> Self {
Self { id: unsafe { NonZero::<$primitive>::new_unchecked(id) } }
}
}
Expand Down
6 changes: 4 additions & 2 deletions turbopack/crates/turbo-tasks/src/key_value_pair.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::fmt::Debug;

pub trait KeyValuePair {
type Key: PartialEq + Eq + std::hash::Hash;
type Value;
type Key: Debug + Clone + PartialEq + Eq + std::hash::Hash;
type Value: Debug + Clone + Default + PartialEq + Eq;
fn key(&self) -> Self::Key;
fn value(&self) -> Self::Value;
fn from_key_and_value(key: Self::Key, value: Self::Value) -> Self;
Expand Down
10 changes: 6 additions & 4 deletions turbopack/crates/turbo-tasks/src/scope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use crate::{turbo_tasks, turbo_tasks_scope, TurboTasksApi};

/// A wrapper around [`rayon::Scope`] that preserves the [`turbo_tasks_scope`].
pub struct Scope<'scope, 'a> {
scope: &'a rayon::Scope<'scope>,
handle: tokio::runtime::Handle,
Expand All @@ -10,9 +11,9 @@ pub struct Scope<'scope, 'a> {
}

impl<'scope, 'a> Scope<'scope, 'a> {

Check failure on line 13 in turbopack/crates/turbo-tasks/src/scope.rs

View workflow job for this annotation

GitHub Actions / rust check / build

the following explicit lifetimes could be elided: 'a
pub fn spawn<BODY>(&self, body: BODY)
pub fn spawn<Body>(&self, body: Body)
where
BODY: FnOnce(&Scope<'scope, '_>) + Send + 'scope,
Body: FnOnce(&Scope<'scope, '_>) + Send + 'scope,
{
let span = self.span.clone();
let handle = self.handle.clone();
Expand All @@ -32,9 +33,10 @@ impl<'scope, 'a> Scope<'scope, 'a> {
}
}

pub fn scope<'scope, OP, R>(op: OP) -> R
/// A wrapper around [`rayon::in_place_scope`] that preserves the [`turbo_tasks_scope`].
pub fn scope<'scope, Op, R>(op: Op) -> R
where
OP: FnOnce(&Scope<'scope, '_>) -> R,
Op: FnOnce(&Scope<'scope, '_>) -> R,
{
let span = tracing::Span::current();
let handle = tokio::runtime::Handle::current();
Expand Down

0 comments on commit 594b0a8

Please sign in to comment.