Skip to content

Commit

Permalink
fixup! backend: use v2 tables through views where possible (v2 phase 3)
Browse files Browse the repository at this point in the history
  • Loading branch information
uael committed Feb 1, 2025
1 parent 025d41b commit 442e4b3
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ SELECT
CASE WHEN j.trigger_kind = 'schedule'::job_trigger_kind THEN j.trigger END
AS schedule_path,
j.permissioned_as,
f.flow_status,
s.flow_status,
j.raw_flow,
j.flow_step_id IS NOT NULL AS is_flow_step,
j.script_lang AS language,
Expand All @@ -33,8 +33,8 @@ SELECT
j.permissioned_as_email AS email,
j.visible_to_owner,
r.memory_peak AS mem_peak,
j.flow_root_job AS root_job,
f.leaf_jobs,
j.flow_innermost_root_job AS root_job,
s.flow_leaf_jobs AS leaf_jobs,
j.tag,
j.concurrent_limit,
j.concurrency_time_window_s,
Expand All @@ -46,5 +46,5 @@ SELECT
FROM v2_job_queue q
JOIN v2_job j USING (id)
LEFT JOIN v2_job_runtime r USING (id)
LEFT JOIN v2_job_flow_runtime f USING (id)
LEFT JOIN v2_job_status s USING (id)
;
2 changes: 1 addition & 1 deletion backend/windmill-api/src/job_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ async fn set_job_progress(
if let Some(flow_job_id) = flow_job_id {
// TODO: Should this return an error when trying to set progress on an already-completed job?
sqlx::query!(
"UPDATE v2_job_flow_runtime
"UPDATE v2_job_status
SET flow_status = JSONB_SET(flow_status, ARRAY['modules', flow_status->>'step', 'progress'], $1)
WHERE id = $2",
serde_json::json!(percent.clamp(0, 99)),
Expand Down
12 changes: 6 additions & 6 deletions backend/windmill-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ async fn get_flow_job_debug_info(

let mut job_ids = vec![];
let jobs_with_root = sqlx::query_scalar!(
"SELECT id FROM v2_job WHERE workspace_id = $1 and flow_root_job = $2",
"SELECT id FROM v2_job WHERE workspace_id = $1 and flow_innermost_root_job = $2",
&w_id,
&id,
)
Expand Down Expand Up @@ -1504,7 +1504,7 @@ async fn cancel_jobs(
, $4
, $1
, 'cancel all'
, (SELECT flow_status FROM v2_job_flow_runtime WHERE id = q.id)
, (SELECT flow_status FROM v2_job_status WHERE id = q.id)
, 'canceled'::job_status
, worker
FROM v2_job_queue q
Expand Down Expand Up @@ -2061,7 +2061,7 @@ async fn get_suspended_parent_flow_info(job_id: Uuid, db: &DB) -> error::Result<
SELECT q.id, f.flow_status, q.suspend, j.runnable_path AS script_path
FROM v2_job_queue q
JOIN v2_job j USING (id)
JOIN v2_job_flow_runtime f USING (id)
JOIN v2_job_status f USING (id)
WHERE id = ( SELECT parent_job FROM v2_job WHERE id = $1 )
FOR UPDATE
"#,
Expand Down Expand Up @@ -2301,7 +2301,7 @@ pub async fn set_flow_user_state(
let mut tx = user_db.begin(&authed).await?;
let r = sqlx::query_scalar!(
r#"
UPDATE v2_job_flow_runtime f SET flow_status = JSONB_SET(flow_status, ARRAY['user_states'], JSONB_SET(COALESCE(flow_status->'user_states', '{}'::jsonb), ARRAY[$1], $2))
UPDATE v2_job_status f SET flow_status = JSONB_SET(flow_status, ARRAY['user_states'], JSONB_SET(COALESCE(flow_status->'user_states', '{}'::jsonb), ARRAY[$1], $2))
FROM v2_job j
WHERE f.id = $3 AND f.id = j.id AND j.workspace_id = $4 AND kind IN ('flow', 'flowpreview', 'flownode') RETURNING 1
"#,
Expand Down Expand Up @@ -3417,7 +3417,7 @@ pub async fn run_workflow_as_code(

if !wkflow_query.skip_update.unwrap_or(false) {
sqlx::query!(
"UPDATE v2_job_flow_runtime SET flow_status = jsonb_set(COALESCE(flow_status, '{}'::jsonb), array[$1], jsonb_set(jsonb_set('{}'::jsonb, '{scheduled_for}', to_jsonb(now()::text)), '{name}', to_jsonb($3::text))) WHERE id = $2",
"UPDATE v2_job_status SET flow_status = jsonb_set(COALESCE(flow_status, '{}'::jsonb), array[$1], jsonb_set(jsonb_set('{}'::jsonb, '{scheduled_for}', to_jsonb(now()::text)), '{name}', to_jsonb($3::text))) WHERE id = $2",
uuid.to_string(),
job_id,
entrypoint
Expand Down Expand Up @@ -4884,7 +4884,7 @@ async fn add_batch_jobs(

if let Some(flow_status) = flow_status {
sqlx::query!(
"INSERT INTO v2_job_flow_runtime (id, flow_status)
"INSERT INTO v2_job_status (id, flow_status)
SELECT unnest($1::uuid[]), $2",
&uuids,
sqlx::types::Json(flow_status) as sqlx::types::Json<FlowStatus>
Expand Down
14 changes: 10 additions & 4 deletions backend/windmill-common/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,11 @@ pub async fn benchmark_init(benchmark_jobs: i32, db: &DB) {
.execute(&mut *tx)
.await
.unwrap_or_else(|_e| panic!("failed to insert parallelflow jobs (3)"));
sqlx::query!("INSERT INTO v2_job_flow_runtime (id, flow_status) SELECT unnest($1::uuid[]), $2", &uuids,
serde_json::from_str::<serde_json::Value>(r#"
sqlx::query!(
"INSERT INTO v2_job_status (id, flow_status) SELECT unnest($1::uuid[]), $2",
&uuids,
serde_json::from_str::<serde_json::Value>(
r#"
{
"step": 0,
"modules": [
Expand All @@ -209,10 +212,13 @@ pub async fn benchmark_init(benchmark_jobs: i32, db: &DB) {
"preprocessor_module": null
}
"#).unwrap()
"#
)
.unwrap()
)
.execute(&mut *tx)
.await.unwrap_or_else(|_e| panic!("failed to insert parallelflow jobs (4)"));
.await
.unwrap_or_else(|_e| panic!("failed to insert parallelflow jobs (4)"));
}
_ => {
let uuids = sqlx::query_scalar!("INSERT INTO v2_job (id, runnable_id, runnable_path, kind, script_lang, tag, created_by, permissioned_as, permissioned_as_email, workspace_id) (SELECT gen_random_uuid(), $1, $2, $3, $4, $5, $6, $7, $8, $9 FROM generate_series(1, $10)) RETURNING id",
Expand Down
6 changes: 3 additions & 3 deletions backend/windmill-common/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ fn format_pull_query(peek: String) -> String {
runnable_path AS script_path, args, kind AS job_kind,
CASE WHEN trigger_kind = 'schedule' THEN trigger END AS schedule_path,
permissioned_as, permissioned_as_email AS email, script_lang AS language,
flow_root_job AS root_job, flow_step_id, flow_step_id IS NOT NULL AS is_flow_step,
flow_innermost_root_job AS root_job, flow_step_id, flow_step_id IS NOT NULL AS is_flow_step,
same_worker, pre_run_error, visible_to_owner, tag, concurrent_limit,
concurrency_time_window_s, timeout, cache_ttl, priority, raw_code, raw_lock,
raw_flow
Expand All @@ -139,11 +139,11 @@ fn format_pull_query(peek: String) -> String {
canceled_reason, last_ping, job_kind, schedule_path, permissioned_as,
flow_status, is_flow_step, language, suspend, suspend_until,
same_worker, pre_run_error, email, visible_to_owner, mem_peak,
root_job, leaf_jobs, tag, concurrent_limit, concurrency_time_window_s,
root_job, flow_leaf_jobs as leaf_jobs, tag, concurrent_limit, concurrency_time_window_s,
timeout, flow_step_id, cache_ttl, priority,
raw_code, raw_lock, raw_flow
FROM q, r, j
LEFT JOIN v2_job_flow_runtime f USING (id)",
LEFT JOIN v2_job_status f USING (id)",
peek
)
}
Expand Down
26 changes: 14 additions & 12 deletions backend/windmill-queue/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
|| queued_job.job_kind == JobKind::Preview)
{
if let Err(e) = sqlx::query!(
"UPDATE v2_job_completed SET flow_status = f.flow_status FROM v2_job_flow_runtime f WHERE v2_job_completed.id = $1 AND f.id = $1 AND v2_job_completed.workspace_id = $2",
"UPDATE v2_job_completed SET flow_status = f.flow_status FROM v2_job_status f WHERE v2_job_completed.id = $1 AND f.id = $1 AND v2_job_completed.workspace_id = $2",
&queued_job.id,
&queued_job.workspace_id
)
Expand All @@ -604,7 +604,7 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
}
if let Some(parent_job) = queued_job.parent_job {
if let Err(e) = sqlx::query_scalar!(
"UPDATE v2_job_flow_runtime SET
"UPDATE v2_job_status SET
flow_status = jsonb_set(
jsonb_set(
COALESCE(flow_status, '{}'::jsonb),
Expand Down Expand Up @@ -2187,7 +2187,7 @@ pub async fn get_result_by_id(
id As \"id!\",
flow_status->'restarted_from'->'flow_job_id' AS \"restarted_from: Json<Uuid>\"
FROM v2_as_queue
WHERE COALESCE((SELECT flow_root_job FROM v2_job WHERE id = $1), $1) = id AND workspace_id = $2",
WHERE COALESCE((SELECT flow_innermost_root_job FROM v2_job WHERE id = $1), $1) = id AND workspace_id = $2",
flow_id,
&w_id
)
Expand Down Expand Up @@ -2329,7 +2329,7 @@ pub async fn get_result_by_id_from_running_flow_inner(
let flow_job_result = sqlx::query!(
"SELECT leaf_jobs->$1::text AS \"leaf_jobs: Json<Box<RawValue>>\", parent_job
FROM v2_as_queue
WHERE COALESCE((SELECT flow_root_job FROM v2_job WHERE id = $2), $2) = id AND workspace_id = $3",
WHERE COALESCE((SELECT flow_innermost_root_job FROM v2_job WHERE id = $2), $2) = id AND workspace_id = $3",
node_id,
flow_id,
w_id,
Expand All @@ -2350,12 +2350,14 @@ pub async fn get_result_by_id_from_running_flow_inner(

if job_result.is_none() && flow_job_result.parent_job.is_some() {
let parent_job = flow_job_result.parent_job.unwrap();
let root_job =
sqlx::query_scalar!("SELECT flow_root_job FROM v2_job WHERE id = $1", parent_job)
.fetch_optional(db)
.await?
.flatten()
.unwrap_or(parent_job);
let root_job = sqlx::query_scalar!(
"SELECT flow_innermost_root_job FROM v2_job WHERE id = $1",
parent_job
)
.fetch_optional(db)
.await?
.flatten()
.unwrap_or(parent_job);
return get_result_by_id_from_running_flow_inner(db, w_id, &root_job, node_id).await;
}

Expand Down Expand Up @@ -3677,7 +3679,7 @@ pub async fn push<'c, 'd>(
"INSERT INTO v2_job (id, workspace_id, raw_code, raw_lock, raw_flow, tag, parent_job,
created_by, permissioned_as, runnable_id, runnable_path, args, kind, trigger,
script_lang, same_worker, pre_run_error, permissioned_as_email, visible_to_owner,
flow_root_job, concurrent_limit, concurrency_time_window_s, timeout, flow_step_id,
flow_innermost_root_job, concurrent_limit, concurrency_time_window_s, timeout, flow_step_id,
cache_ttl, priority, trigger_kind)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18,
$19, $20, $21, $22, $23, $24, $25, $26,
Expand Down Expand Up @@ -3740,7 +3742,7 @@ pub async fn push<'c, 'd>(
.await?;
if let Some(flow_status) = flow_status {
sqlx::query!(
"INSERT INTO v2_job_flow_runtime (id, flow_status) VALUES ($1, $2)",
"INSERT INTO v2_job_status (id, flow_status) VALUES ($1, $2)",
job_id,
Json(flow_status) as Json<FlowStatus>,
)
Expand Down
4 changes: 2 additions & 2 deletions backend/windmill-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1280,7 +1280,7 @@ pub async fn run_worker(
tokio::task::spawn(
(async move {
tracing::info!(worker = %worker_name, hostname = %hostname, "vacuuming queue");
if let Err(e) = sqlx::query!("VACUUM (skip_locked) v2_job_queue, v2_job_runtime, v2_job_flow_runtime")
if let Err(e) = sqlx::query!("VACUUM (skip_locked) v2_job_queue, v2_job_runtime, v2_job_status")
.execute(&db2)
.await
{
Expand Down Expand Up @@ -2005,7 +2005,7 @@ async fn handle_queued_job(
.await?;
} else if let Some(parent_job) = job.parent_job {
if let Err(e) = sqlx::query_scalar!(
"UPDATE v2_job_flow_runtime SET
"UPDATE v2_job_status SET
flow_status = jsonb_set(
jsonb_set(
COALESCE(flow_status, '{}'::jsonb),
Expand Down
Loading

0 comments on commit 442e4b3

Please sign in to comment.