Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Turbopack] add support for collectibles to new backend #70798

Open
wants to merge 12 commits into
base: sokra/add-persistent-caching-option
Choose a base branch
from
4 changes: 1 addition & 3 deletions crates/napi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ __internal_dhat-heap = ["dhat"]
# effectively does nothing.
__internal_dhat-ad-hoc = ["dhat"]

new-backend = ["dep:turbo-tasks-backend"]

# Enable specific tls features per-target.
[target.'cfg(all(target_os = "windows", target_arch = "aarch64"))'.dependencies]
next-core = { workspace = true, features = ["native-tls"] }
Expand Down Expand Up @@ -107,7 +105,7 @@ lightningcss-napi = { workspace = true }
tokio = { workspace = true, features = ["full"] }
turbo-tasks = { workspace = true }
turbo-tasks-memory = { workspace = true }
turbo-tasks-backend = { workspace = true, optional = true }
turbo-tasks-backend = { workspace = true }
turbo-tasks-fs = { workspace = true }
next-api = { workspace = true }
next-build = { workspace = true }
Expand Down
39 changes: 19 additions & 20 deletions crates/napi/src/next_api/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
use tokio::{io::AsyncWriteExt, time::Instant};
use tracing::Instrument;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Registry};
use turbo_tasks::{Completion, RcStr, ReadRef, TransientInstance, TurboTasks, UpdateInfo, Vc};
use turbo_tasks::{Completion, RcStr, ReadRef, TransientInstance, UpdateInfo, Vc};
use turbo_tasks_fs::{DiskFileSystem, FileContent, FileSystem, FileSystemPath};
use turbopack_core::{
diagnostics::PlainDiagnostic,
Expand All @@ -44,7 +44,7 @@
endpoint::ExternalEndpoint,
utils::{
create_turbo_tasks, get_diagnostics, get_issues, subscribe, NapiDiagnostic, NapiIssue,
NextBackend, RootTask, TurbopackResult, VcArc,
NextTurboTasks, RootTask, TurbopackResult, VcArc,
},
};
use crate::register;
Expand Down Expand Up @@ -188,6 +188,8 @@

#[napi(object)]
pub struct NapiTurboEngineOptions {
/// Use the new backend with persistent caching enabled.
pub persistent_caching: Option<bool>,
/// An upper bound of memory that turbopack will attempt to stay under.
pub memory_limit: Option<f64>,
}
Expand Down Expand Up @@ -272,7 +274,7 @@
}

pub struct ProjectInstance {
turbo_tasks: Arc<TurboTasks<NextBackend>>,
turbo_tasks: NextTurboTasks,
container: Vc<ProjectContainer>,
exit_receiver: tokio::sync::Mutex<Option<ExitReceiver>>,
}
Expand Down Expand Up @@ -338,13 +340,20 @@
.memory_limit
.map(|m| m as usize)
.unwrap_or(usize::MAX);
let turbo_tasks = create_turbo_tasks(PathBuf::from(&options.dist_dir), memory_limit)?;
#[cfg(not(feature = "new-backend"))]
{
let persistent_caching = turbo_engine_options.persistent_caching.unwrap_or_default();
let turbo_tasks = create_turbo_tasks(
PathBuf::from(&options.dist_dir),
persistent_caching,
memory_limit,
)?;
if !persistent_caching {
use std::io::Write;
let stats_path = std::env::var_os("NEXT_TURBOPACK_TASK_STATISTICS");
if let Some(stats_path) = stats_path {
let task_stats = turbo_tasks.backend().task_statistics().enable().clone();
let Some(backend) = turbo_tasks.memory_backend() else {
return Err(anyhow!("task statistics require a memory backend").into());
};
let task_stats = backend.task_statistics().enable().clone();
exit.on_exit(async move {
tokio::task::spawn_blocking(move || {
let mut file = std::fs::File::create(&stats_path)
Expand Down Expand Up @@ -498,11 +507,7 @@
}

impl NapiRoute {
fn from_route(
pathname: String,
value: Route,
turbo_tasks: &Arc<TurboTasks<NextBackend>>,
) -> Self {
fn from_route(pathname: String, value: Route, turbo_tasks: &NextTurboTasks) -> Self {
let convert_endpoint = |endpoint: Vc<Box<dyn Endpoint>>| {
Some(External::new(ExternalEndpoint(VcArc::new(
turbo_tasks.clone(),
Expand Down Expand Up @@ -566,10 +571,7 @@
}

impl NapiMiddleware {
fn from_middleware(
value: &Middleware,
turbo_tasks: &Arc<TurboTasks<NextBackend>>,
) -> Result<Self> {
fn from_middleware(value: &Middleware, turbo_tasks: &NextTurboTasks) -> Result<Self> {
Ok(NapiMiddleware {
endpoint: External::new(ExternalEndpoint(VcArc::new(
turbo_tasks.clone(),
Expand All @@ -586,10 +588,7 @@
}

impl NapiInstrumentation {
fn from_instrumentation(
value: &Instrumentation,
turbo_tasks: &Arc<TurboTasks<NextBackend>>,
) -> Result<Self> {
fn from_instrumentation(value: &Instrumentation, turbo_tasks: &NextTurboTasks) -> Result<Self> {
Ok(NapiInstrumentation {
node_js: External::new(ExternalEndpoint(VcArc::new(
turbo_tasks.clone(),
Expand Down Expand Up @@ -930,7 +929,7 @@
}
}

/// Subscribes to lifecycle events of the compilation.

Check warning on line 932 in crates/napi/src/next_api/project.rs

View workflow job for this annotation

GitHub Actions / rustdoc check / build

public documentation for `project_update_info_subscribe` links to private item `UpdateMessage::Start`

Check warning on line 932 in crates/napi/src/next_api/project.rs

View workflow job for this annotation

GitHub Actions / rustdoc check / build

public documentation for `project_update_info_subscribe` links to private item `UpdateMessage::End`

Check warning on line 932 in crates/napi/src/next_api/project.rs

View workflow job for this annotation

GitHub Actions / rustdoc check / build

public documentation for `project_update_info_subscribe` links to private item `UpdateMessage::End`

Check warning on line 932 in crates/napi/src/next_api/project.rs

View workflow job for this annotation

GitHub Actions / rustdoc check / build

public documentation for `project_update_info_subscribe` links to private item `UpdateMessage::Start`
///
/// Emits an [UpdateMessage::Start] event when any computation starts.
/// Emits an [UpdateMessage::End] event when there was no computation for the
Expand Down
139 changes: 119 additions & 20 deletions crates/napi/src/next_api/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::{collections::HashMap, future::Future, ops::Deref, path::PathBuf, sync::Arc};
use std::{
collections::HashMap, future::Future, ops::Deref, path::PathBuf, sync::Arc, time::Duration,
};

use anyhow::{anyhow, Context, Result};
use napi::{
Expand All @@ -7,7 +9,9 @@ use napi::{
JsFunction, JsObject, JsUnknown, NapiRaw, NapiValue, Status,
};
use serde::Serialize;
use turbo_tasks::{ReadRef, TaskId, TryJoinIterExt, TurboTasks, Vc};
use turbo_tasks::{
trace::TraceRawVcs, ReadRef, TaskId, TryJoinIterExt, TurboTasks, UpdateInfo, Vc,
};
use turbo_tasks_fs::FileContent;
use turbopack_core::{
diagnostics::{Diagnostic, DiagnosticContextExt, PlainDiagnostic},
Expand All @@ -18,41 +22,136 @@ use turbopack_core::{

use crate::util::log_internal_error_and_inform;

#[cfg(not(feature = "new-backend"))]
pub type NextBackend = turbo_tasks_memory::MemoryBackend;
#[cfg(feature = "new-backend")]
pub type NextBackend = turbo_tasks_backend::TurboTasksBackend;
/// Runtime-selected turbo-tasks engine: either the in-memory backend or the
/// new persistent-caching backend. Cloning is cheap — both variants wrap an
/// `Arc`.
#[derive(Clone)]
pub enum NextTurboTasks {
    /// In-memory backend; state is lost when the process exits.
    Memory(Arc<TurboTasks<turbo_tasks_memory::MemoryBackend>>),
    /// New backend with persistent caching (LMDB-backed storage).
    PersistentCaching(Arc<TurboTasks<turbo_tasks_backend::TurboTasksBackend>>),
}

impl NextTurboTasks {
    /// Disposes a previously spawned root task on whichever backend is active.
    pub fn dispose_root_task(&self, task: TaskId) {
        match self {
            Self::Memory(tt) => tt.dispose_root_task(task),
            Self::PersistentCaching(tt) => tt.dispose_root_task(task),
        }
    }

    /// Spawns a long-lived root task from `functor`, delegating to the
    /// active backend. Returns the id of the new root task.
    pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
    where
        T: Send,
        F: Fn() -> Fut + Send + Sync + Clone + 'static,
        Fut: Future<Output = Result<Vc<T>>> + Send,
    {
        match self {
            Self::Memory(tt) => tt.spawn_root_task(functor),
            Self::PersistentCaching(tt) => tt.spawn_root_task(functor),
        }
    }

    /// Runs `future` once inside the engine and awaits its result.
    pub async fn run_once<T: TraceRawVcs + Send + 'static>(
        &self,
        future: impl Future<Output = Result<T>> + Send + 'static,
    ) -> Result<T> {
        match self {
            Self::Memory(tt) => tt.run_once(future).await,
            Self::PersistentCaching(tt) => tt.run_once(future).await,
        }
    }

    /// Spawns a one-shot task for `future` and returns its task id.
    pub fn spawn_once_task<T, Fut>(&self, future: Fut) -> TaskId
    where
        T: Send,
        Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
    {
        match self {
            Self::Memory(tt) => tt.spawn_once_task(future),
            Self::PersistentCaching(tt) => tt.spawn_once_task(future),
        }
    }

    /// Aggregates update info over `aggregation`, waiting at most `timeout`;
    /// `None` when nothing was available within the timeout.
    pub async fn aggregated_update_info(
        &self,
        aggregation: Duration,
        timeout: Duration,
    ) -> Option<UpdateInfo> {
        match self {
            Self::Memory(tt) => tt.aggregated_update_info(aggregation, timeout).await,
            Self::PersistentCaching(tt) => {
                tt.aggregated_update_info(aggregation, timeout).await
            }
        }
    }

    /// Like [`Self::aggregated_update_info`] but blocks until update info is
    /// available.
    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
        match self {
            Self::Memory(tt) => tt.get_or_wait_aggregated_update_info(aggregation).await,
            Self::PersistentCaching(tt) => {
                tt.get_or_wait_aggregated_update_info(aggregation).await
            }
        }
    }

    /// Returns the memory backend when active; `None` for the
    /// persistent-caching backend. Used by callers that need memory-only
    /// features such as task statistics.
    pub fn memory_backend(&self) -> Option<&turbo_tasks_memory::MemoryBackend> {
        match self {
            Self::Memory(tt) => Some(tt.backend()),
            Self::PersistentCaching(_) => None,
        }
    }

    /// Stops the engine and waits for in-flight work to finish.
    pub async fn stop_and_wait(&self) {
        match self {
            Self::Memory(tt) => tt.stop_and_wait().await,
            Self::PersistentCaching(tt) => tt.stop_and_wait().await,
        }
    }
}

#[allow(unused_variables, reason = "feature-gated")]
pub fn create_turbo_tasks(
output_path: PathBuf,
persistent_caching: bool,
memory_limit: usize,
) -> Result<Arc<TurboTasks<NextBackend>>> {
#[cfg(not(feature = "new-backend"))]
let backend = TurboTasks::new(turbo_tasks_memory::MemoryBackend::new(memory_limit));
#[cfg(feature = "new-backend")]
let backend = TurboTasks::new(turbo_tasks_backend::TurboTasksBackend::new(Arc::new(
turbo_tasks_backend::LmdbBackingStorage::new(&output_path.join("cache/turbopack"))?,
)));
Ok(backend)
) -> Result<NextTurboTasks> {
Ok(if persistent_caching {
NextTurboTasks::PersistentCaching(TurboTasks::new(
turbo_tasks_backend::TurboTasksBackend::new(Arc::new(
turbo_tasks_backend::LmdbBackingStorage::new(&output_path.join("cache/turbopack"))?,
)),
))
} else {
NextTurboTasks::Memory(TurboTasks::new(turbo_tasks_memory::MemoryBackend::new(
memory_limit,
)))
})
}

/// A helper type to hold both a Vc operation and the TurboTasks root process.
/// Without this, we'd need to pass both individually all over the place
#[derive(Clone)]
pub struct VcArc<T> {
turbo_tasks: Arc<TurboTasks<NextBackend>>,
turbo_tasks: NextTurboTasks,
/// The Vc. Must be resolved, otherwise you are referencing an inactive
/// operation.
vc: T,
}

impl<T> VcArc<T> {
pub fn new(turbo_tasks: Arc<TurboTasks<NextBackend>>, vc: T) -> Self {
pub fn new(turbo_tasks: NextTurboTasks, vc: T) -> Self {
Self { turbo_tasks, vc }
}

pub fn turbo_tasks(&self) -> &Arc<TurboTasks<NextBackend>> {
pub fn turbo_tasks(&self) -> &NextTurboTasks {
&self.turbo_tasks
}
}
Expand All @@ -75,7 +174,7 @@ pub fn serde_enum_to_string<T: Serialize>(value: &T) -> Result<String> {
/// The root of our turbopack computation.
pub struct RootTask {
#[allow(dead_code)]
turbo_tasks: Arc<TurboTasks<NextBackend>>,
turbo_tasks: NextTurboTasks,
#[allow(dead_code)]
task_id: Option<TaskId>,
}
Expand Down Expand Up @@ -319,7 +418,7 @@ impl<T: ToNapiValue> ToNapiValue for TurbopackResult<T> {
}

pub fn subscribe<T: 'static + Send + Sync, F: Future<Output = Result<T>> + Send, V: ToNapiValue>(
turbo_tasks: Arc<TurboTasks<NextBackend>>,
turbo_tasks: NextTurboTasks,
func: JsFunction,
handler: impl 'static + Sync + Send + Clone + Fn() -> F,
mapper: impl 'static + Sync + Send + FnMut(ThreadSafeCallContext<T>) -> napi::Result<Vec<V>>,
Expand Down
48 changes: 27 additions & 21 deletions crates/napi/src/turbotrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,52 @@ use std::{path::PathBuf, sync::Arc};

use napi::bindgen_prelude::*;
use node_file_trace::{start, Args};
use turbo_tasks::TurboTasks;
use turbopack::{
module_options::{EcmascriptOptionsContext, ModuleOptionsContext},
resolve_options_context::ResolveOptionsContext,
};

use crate::next_api::utils::{self, NextBackend};
use crate::next_api::utils::{self, NextTurboTasks};

#[napi]
pub fn create_turbo_tasks(
output_path: String,
persistent_caching: bool,
memory_limit: Option<i64>,
) -> External<Arc<TurboTasks<NextBackend>>> {
) -> External<NextTurboTasks> {
let limit = memory_limit.map(|u| u as usize).unwrap_or(usize::MAX);
let turbo_tasks = utils::create_turbo_tasks(PathBuf::from(&output_path), limit)
.expect("Failed to create TurboTasks");
let turbo_tasks =
utils::create_turbo_tasks(PathBuf::from(&output_path), persistent_caching, limit)
.expect("Failed to create TurboTasks");
External::new_with_size_hint(turbo_tasks, limit)
}

#[napi]
pub async fn run_turbo_tracing(
options: Buffer,
turbo_tasks: External<Arc<TurboTasks<NextBackend>>>,
turbo_tasks: External<NextTurboTasks>,
) -> napi::Result<Vec<String>> {
let args: Args = serde_json::from_slice(options.as_ref())?;
let files = start(
Arc::new(args),
turbo_tasks.clone(),
Some(ModuleOptionsContext {
ecmascript: EcmascriptOptionsContext {
enable_types: true,
..Default::default()
},
enable_mdx: true,
let args = Arc::new(args);
let module_options = Some(ModuleOptionsContext {
ecmascript: EcmascriptOptionsContext {
enable_types: true,
..Default::default()
}),
Some(ResolveOptionsContext {
..Default::default()
}),
)
.await?;
},
enable_mdx: true,
..Default::default()
});
let resolve_options = Some(ResolveOptionsContext {
..Default::default()
});
let files = match &*turbo_tasks {
NextTurboTasks::Memory(turbo_tasks) => {
start(args, turbo_tasks.clone(), module_options, resolve_options).await?
}
NextTurboTasks::PersistentCaching(turbo_tasks) => {
start(args, turbo_tasks.clone(), module_options, resolve_options).await?
}
};

Ok(files.into_iter().map(|f| f.to_string()).collect())
}
Loading
Loading