From b63661041bf53ad2b3d1b1b9862c9d56d46e246f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20Correa=20Casablanca?= Date: Wed, 17 Apr 2024 19:13:25 +0200 Subject: [PATCH] new: Add `mutex` option to task configuration (#1416) * feat: add mutex option to task configuration Signed-off-by: Andres Correa Casablanca * docs: add mutex section to task.options docs Signed-off-by: Andres Correa Casablanca * refactor: remove useless impls Signed-off-by: Andres Correa Casablanca * refactor: use dashmap Signed-off-by: Andres Correa Casablanca * perf: remove allocation Signed-off-by: Andres Correa Casablanca * refactor: fix code style issues Signed-off-by: Andres Correa Casablanca --------- Signed-off-by: Andres Correa Casablanca --- Cargo.lock | 2 + Cargo.toml | 1 + crates/core/action-context/Cargo.toml | 2 + crates/core/action-context/src/lib.rs | 6 ++- .../action-pipeline/src/actions/run_task.rs | 39 +++++++++++++------ .../config/src/project/task_options_config.rs | 4 ++ nextgen/task-builder/src/tasks_builder.rs | 4 ++ nextgen/task/src/task_options.rs | 3 ++ packages/types/src/project.ts | 1 + packages/types/src/tasks-config.ts | 10 +++++ website/docs/config/project.mdx | 25 ++++++++++++ website/static/schemas/project.json | 13 +++++++ website/static/schemas/tasks.json | 13 +++++++ website/tsconfig.json | 9 ++++- 14 files changed, 117 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e9138042961..9615058c106 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3059,11 +3059,13 @@ name = "moon_action_context" version = "0.0.1" dependencies = [ "clap", + "dashmap", "moon_common", "moon_target", "relative-path", "rustc-hash", "serde", + "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 558722123fd..ff738708ee7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ clap = { version = "4.5.4", default-features = false, features = [ ] } clap_complete = "4.5.2" console = "0.15.8" +dashmap = "5.5.3" dirs = "5.0.1" miette = "7.2.0" once_cell = "1.19.0" diff --git a/crates/core/action-context/Cargo.toml b/crates/core/action-context/Cargo.toml index 134d73e369a..eff7fa137c9 100644 --- a/crates/core/action-context/Cargo.toml +++ b/crates/core/action-context/Cargo.toml @@ -8,6 +8,8 @@ publish = false moon_common = { path = "../../../nextgen/common" } moon_target = { path = "../../../nextgen/target" } clap = { workspace = true } +dashmap = { workspace = true } relative-path = { workspace = true, features = ["serde"] } rustc-hash = { workspace = true } serde = { workspace = true } +tokio = { workspace = true } diff --git a/crates/core/action-context/src/lib.rs b/crates/core/action-context/src/lib.rs index f09b74f5db7..980ec6e01fa 100644 --- a/crates/core/action-context/src/lib.rs +++ b/crates/core/action-context/src/lib.rs @@ -1,9 +1,10 @@ use clap::ValueEnum; +use dashmap::DashMap; use moon_common::path::WorkspaceRelativePathBuf; use moon_target::{Target, TargetLocator}; use rustc_hash::{FxHashMap, FxHashSet}; use serde::{Deserialize, Serialize}; -use std::path::PathBuf; +use std::{path::PathBuf, sync::Arc}; #[derive(ValueEnum, Clone, Debug, Deserialize, Serialize)] pub enum ProfileType { @@ -33,6 +34,9 @@ pub struct ActionContext { pub initial_targets: FxHashSet, + #[serde(skip)] + pub named_mutexes: DashMap>>, + pub passthrough_args: Vec, pub primary_targets: FxHashSet, diff --git a/crates/core/action-pipeline/src/actions/run_task.rs b/crates/core/action-pipeline/src/actions/run_task.rs index d4a422d0d8f..44c655ef9af 100644 --- a/crates/core/action-pipeline/src/actions/run_task.rs +++ b/crates/core/action-pipeline/src/actions/run_task.rs @@ -94,18 +94,33 @@ pub async fn run_task( .insert(target.clone(), TargetState::Passthrough); } - let attempts_result = if is_cache_enabled { - let context = context.read().await; - - runner.create_and_run_command(&context, runtime).await - } else { - // Concurrent long-running tasks will cause a deadlock, as some threads will - // attempt to write to context while others are reading from it, and long-running - // tasks may never release the lock. Unfortuantely we have to clone here to work - // around it, so revisit in the future. - let context = (context.read().await).clone(); - - runner.create_and_run_command(&context, runtime).await + let attempts_result = { + let _ctx: RwLock; + let ctx = if is_cache_enabled { + context.read().await + } else { + // Concurrent long-running tasks will cause a deadlock, as some threads will + // attempt to write to context while others are reading from it, and long-running + // tasks may never release the lock. Unfortuantely we have to clone here to work + // around it, so revisit in the future. + _ctx = RwLock::new(context.read().await.clone()); + _ctx.read().await + }; + + if let Some(mutex_name) = &task.options.mutex { + if let Some(named_mutex) = ctx.named_mutexes.get(mutex_name) { + let _guard = named_mutex.lock().await; + + runner.create_and_run_command(&ctx, runtime).await + } else { + Result::Err(miette::Report::msg(format!( + "Unable to acquire named mutex \"{}\"", + mutex_name + ))) + } + } else { + runner.create_and_run_command(&ctx, runtime).await + } }; match attempts_result { diff --git a/nextgen/config/src/project/task_options_config.rs b/nextgen/config/src/project/task_options_config.rs index 434636c9b86..f3125d10b41 100644 --- a/nextgen/config/src/project/task_options_config.rs +++ b/nextgen/config/src/project/task_options_config.rs @@ -201,6 +201,10 @@ cacheable!( /// The strategy to use when merging `outputs` with an inherited task. pub merge_outputs: Option, + /// Creates an exclusive lock on a virtual resource, preventing other + /// tasks using the same resource from running concurrently. + pub mutex: Option, + /// The style in which task output will be printed to the console. #[setting(env = "MOON_OUTPUT_STYLE")] pub output_style: Option, diff --git a/nextgen/task-builder/src/tasks_builder.rs b/nextgen/task-builder/src/tasks_builder.rs index d671a3f25a4..5c84c4b3f14 100644 --- a/nextgen/task-builder/src/tasks_builder.rs +++ b/nextgen/task-builder/src/tasks_builder.rs @@ -516,6 +516,10 @@ impl<'proj> TasksBuilder<'proj> { options.merge_outputs = *merge_outputs; } + if let Some(mutex) = &config.mutex { + options.mutex = Some(mutex.clone()); + } + if let Some(output_style) = &config.output_style { options.output_style = Some(*output_style); } diff --git a/nextgen/task/src/task_options.rs b/nextgen/task/src/task_options.rs index d7e1a32913f..16bfaa59dac 100644 --- a/nextgen/task/src/task_options.rs +++ b/nextgen/task/src/task_options.rs @@ -31,6 +31,8 @@ cacheable!( pub merge_outputs: TaskMergeStrategy, + pub mutex: Option, + pub output_style: Option, pub persistent: bool, @@ -67,6 +69,7 @@ impl Default for TaskOptions { merge_env: TaskMergeStrategy::Append, merge_inputs: TaskMergeStrategy::Append, merge_outputs: TaskMergeStrategy::Append, + mutex: None, output_style: None, persistent: false, retry_count: 0, diff --git a/packages/types/src/project.ts b/packages/types/src/project.ts index e50f9807ddd..3f06d549ca2 100644 --- a/packages/types/src/project.ts +++ b/packages/types/src/project.ts @@ -39,6 +39,7 @@ export interface TaskOptions { mergeInputs: TaskMergeStrategy; mergeOutputs: TaskMergeStrategy; outputStyle: TaskOutputStyle | null; + mutex: string | null; persistent: boolean; retryCount: number; runDepsInParallel: boolean; diff --git a/packages/types/src/tasks-config.ts b/packages/types/src/tasks-config.ts index 2ff7aecbe6e..cee8f7be1bf 100644 --- a/packages/types/src/tasks-config.ts +++ b/packages/types/src/tasks-config.ts @@ -88,6 +88,11 @@ export interface TaskOptionsConfig { * @default 'append' */ mergeOutputs: TaskMergeStrategy | null; + /** + * Creates an exclusive lock on a virtual resource, preventing other tasks + * using the same resource from running concurrently. + */ + mutex: string | null; /** * The style in which task output will be printed to the console. * @@ -302,6 +307,11 @@ export interface PartialTaskOptionsConfig { * @default 'append' */ mergeOutputs?: TaskMergeStrategy | null; + /** + * Creates an exclusive lock on a virtual resource, preventing other tasks + * using the same resource from running concurrently. + */ + mutex?: string | null; /** * The style in which task output will be printed to the console. * diff --git a/website/docs/config/project.mdx b/website/docs/config/project.mdx index 2a37d5f4c42..62c920f2fe2 100644 --- a/website/docs/config/project.mdx +++ b/website/docs/config/project.mdx @@ -981,6 +981,31 @@ The [strategy](../concepts/task-inheritance#merge-strategies) to use when mergin The [strategy](../concepts/task-inheritance#merge-strategies) to use when merging the [`outputs`](#outputs) list with an inherited task. Defaults to "append". +#### `mutex` + + + +Creates an exclusive lock on a "virtual resource", preventing other tasks using the same "virtual +resource" from running concurrently. + +If you have many tasks that require exclusive access to a resource that can't be tracked by Moon +(like a database, an ignored file, a file that's not part of the project, or a remote resource) you +can use the `mutex` option to prevent them from running at the same time. + +```yaml title="moon.yml" {5} +tasks: + task_a: + # ... + options: + mutex: 'virtual_resource_name' + + # task_b doesn't necessarily have to be in the same project + task_b: + # ... + options: + mutex: 'virtual_resource_name' +``` + #### `outputStyle` diff --git a/website/static/schemas/project.json b/website/static/schemas/project.json index 292c1ba51b2..5fca5f9b3d7 100644 --- a/website/static/schemas/project.json +++ b/website/static/schemas/project.json @@ -1098,6 +1098,19 @@ ], "markdownDescription": "The strategy to use when merging `outputs` with an inherited task." }, + "mutex": { + "title": "mutex", + "description": "Creates an exclusive lock on a virtual resource, preventing other tasks using the same resource from running concurrently.", + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "markdownDescription": "Creates an exclusive lock on a virtual resource, preventing other tasks using the same resource from running concurrently." + }, "outputStyle": { "title": "outputStyle", "description": "The style in which task output will be printed to the console.", diff --git a/website/static/schemas/tasks.json b/website/static/schemas/tasks.json index 73e409387a7..049886758b9 100644 --- a/website/static/schemas/tasks.json +++ b/website/static/schemas/tasks.json @@ -503,6 +503,19 @@ ], "markdownDescription": "The strategy to use when merging `outputs` with an inherited task." }, + "mutex": { + "title": "mutex", + "description": "Creates an exclusive lock on a virtual resource, preventing other tasks using the same resource from running concurrently.", + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "markdownDescription": "Creates an exclusive lock on a virtual resource, preventing other tasks using the same resource from running concurrently." + }, "outputStyle": { "title": "outputStyle", "description": "The style in which task output will be printed to the console.", diff --git a/website/tsconfig.json b/website/tsconfig.json index 6f4159813d4..f9cccdc77c0 100644 --- a/website/tsconfig.json +++ b/website/tsconfig.json @@ -1,5 +1,8 @@ { - "extends": ["../tsconfig.options.json", "@docusaurus/tsconfig"], + "extends": [ + "../tsconfig.options.json", + "@docusaurus/tsconfig" + ], "compilerOptions": { "emitDeclarationOnly": true, "noEmit": false, @@ -7,7 +10,9 @@ "verbatimModuleSyntax": false, "baseUrl": ".", "paths": { - "@site/*": ["./*"] + "@site/*": [ + "./*" + ] } }, "include": [