From ab0ad5d056b8f31cbe58583887d02a46657e2749 Mon Sep 17 00:00:00 2001 From: dieriba Date: Tue, 28 Jan 2025 18:52:19 +0100 Subject: [PATCH 01/11] feat: :construction: add postgres trigger config in back --- backend/windmill-api/openapi.yaml | 2 +- backend/windmill-api/src/capture.rs | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/backend/windmill-api/openapi.yaml b/backend/windmill-api/openapi.yaml index 28abe897b448f..4ce412b2b7805 100644 --- a/backend/windmill-api/openapi.yaml +++ b/backend/windmill-api/openapi.yaml @@ -14582,7 +14582,7 @@ components: CaptureTriggerKind: type: string - enum: [webhook, http, websocket, kafka, email, nats] + enum: [webhook, http, websocket, kafka, email, nats, postgres] Capture: type: object diff --git a/backend/windmill-api/src/capture.rs b/backend/windmill-api/src/capture.rs index 22824791a116f..ff808acf0edaa 100644 --- a/backend/windmill-api/src/capture.rs +++ b/backend/windmill-api/src/capture.rs @@ -87,6 +87,7 @@ pub enum TriggerKind { Kafka, Email, Nats, + Postgres } impl fmt::Display for TriggerKind { @@ -98,6 +99,7 @@ impl fmt::Display for TriggerKind { TriggerKind::Kafka => "kafka", TriggerKind::Email => "email", TriggerKind::Nats => "nats", + TriggerKind::Postgres => "postgres" }; write!(f, "{}", s) } @@ -132,6 +134,15 @@ pub struct NatsTriggerConfig { pub use_jetstream: bool, } +#[cfg(feature = "postgres_trigger")] +#[derive(Serialize, Deserialize, Debug)] +pub struct PostgresTriggerConfig { + postgres_resource_path: String, + publication_name: Option, + replication_slot_name: Option +} + +#[cfg(feature = "websocket")] #[derive(Serialize, Deserialize, Debug)] pub struct WebsocketTriggerConfig { pub url: String, @@ -144,6 +155,9 @@ pub struct WebsocketTriggerConfig { enum TriggerConfig { #[cfg(feature = "http_trigger")] Http(HttpTriggerConfig), + #[cfg(feature = "postgres_trigger")] + Postgres(PostgresTriggerConfig), + #[cfg(feature = "websocket")] Websocket(WebsocketTriggerConfig), #[cfg(all(feature = "enterprise", feature = "kafka"))] Kafka(KafkaTriggerConfig), From c53a31633961db4984956e8a768fce2ba50aa589 Mon Sep 17 00:00:00 2001 From: dieriba Date: Tue, 28 Jan 2025 19:14:15 +0100 Subject: [PATCH 02/11] feat: add capture section for postgres --- backend/windmill-api/openapi.yaml | 28 +++++++++++ .../triggers/TestTriggerConnection.svelte | 9 +++- .../PostgresEditorConfigSection.svelte | 48 +++++++++++++++++++ 3 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 frontend/src/lib/components/triggers/postgres/PostgresEditorConfigSection.svelte diff --git a/backend/windmill-api/openapi.yaml b/backend/windmill-api/openapi.yaml index 4ce412b2b7805..0372423bf8555 100644 --- a/backend/windmill-api/openapi.yaml +++ b/backend/windmill-api/openapi.yaml @@ -8745,6 +8745,34 @@ paths: schema: type: string + /w/{workspace}/postgres_triggers/test: + post: + summary: test postgres connection + operationId: testPostgresConnection + tags: + - postgres_trigger + parameters: + - $ref: "#/components/parameters/WorkspaceId" + requestBody: + description: test postgres connection + required: true + content: + application/json: + schema: + type: object + properties: + database: + type: string + required: + - database + responses: + "200": + description: successfuly connected to postgres + content: + text/plain: + schema: + type: string + /groups/list: get: summary: list instance groups diff --git a/frontend/src/lib/components/triggers/TestTriggerConnection.svelte b/frontend/src/lib/components/triggers/TestTriggerConnection.svelte index 8c56f3627be0b..b929b843b6b0c 100644 --- a/frontend/src/lib/components/triggers/TestTriggerConnection.svelte +++ b/frontend/src/lib/components/triggers/TestTriggerConnection.svelte @@ -3,13 +3,14 @@ CancelablePromise, KafkaTriggerService, NatsTriggerService, + PostgresTriggerService, WebsocketTriggerService } from '$lib/gen' import { workspaceStore } from '$lib/stores' import { sendUserToast } from '$lib/toast' import Button from '../common/button/Button.svelte' - export let kind: 'websocket' | 'nats' | 'kafka' + export let kind: 'websocket' | 'nats' | 'kafka' | 'postgres' export let args: Record const kindToName: { [key: string]: string } = { @@ -44,6 +45,12 @@ requestBody: args as any }) } + else if (kind === 'postgres') { + promise = PostgresTriggerService.testPostgresConnection({ + workspace: $workspaceStore!, + requestBody: args as any + }) + } await promise sendUserToast(`Successfully connected to ${kindToName[kind]}`) } catch (err) { diff --git a/frontend/src/lib/components/triggers/postgres/PostgresEditorConfigSection.svelte b/frontend/src/lib/components/triggers/postgres/PostgresEditorConfigSection.svelte new file mode 100644 index 0000000000000..545ff6e0d3013 --- /dev/null +++ b/frontend/src/lib/components/triggers/postgres/PostgresEditorConfigSection.svelte @@ -0,0 +1,48 @@ + + +
+ {#if showCapture && captureInfo} + + {/if} +
+
+ { + url = ev.detail === 'runnable' ? '$script:' : '' + url_runnable_args = {} + }} + > + + + +
+ +
+
From d5ba797fde67325a6897daf91b96286f6e7e5d8e Mon Sep 17 00:00:00 2001 From: dieriba Date: Wed, 29 Jan 2025 02:48:47 +0100 Subject: [PATCH 03/11] feat: update postgres trigger panel, update capture migration --- .../20250102145420_more_captures.up.sql | 2 +- backend/windmill-api/src/capture.rs | 7 +- .../src/postgres_triggers/handler.rs | 34 +++++ .../windmill-api/src/postgres_triggers/mod.rs | 4 +- frontend/src/lib/components/triggers.ts | 2 + .../components/triggers/CaptureButton.svelte | 11 +- .../components/triggers/CaptureWrapper.svelte | 32 ++++- .../triggers/TestTriggerConnection.svelte | 7 +- .../components/triggers/TriggersEditor.svelte | 13 +- .../triggers/TriggersEditorSection.svelte | 3 +- .../triggers/TriggersWrapper.svelte | 8 ++ .../PostgresEditorConfigSection.svelte | 74 +++++++--- .../postgres/PostgresTriggerEditor.svelte | 4 +- .../PostgresTriggerEditorInner.svelte | 21 +-- .../postgres/PostgresTriggersPanel.svelte | 128 ++++++++++-------- .../triggers/postgres/RelationPicker.svelte | 78 +++++++++-- 16 files changed, 316 insertions(+), 112 deletions(-) diff --git a/backend/migrations/20250102145420_more_captures.up.sql b/backend/migrations/20250102145420_more_captures.up.sql index 21a15cec57ee9..2106ccd2d55bd 100644 --- a/backend/migrations/20250102145420_more_captures.up.sql +++ b/backend/migrations/20250102145420_more_captures.up.sql @@ -1,5 +1,5 @@ -- Add up migration script here -CREATE TYPE TRIGGER_KIND AS ENUM ('webhook', 'http', 'websocket', 'kafka', 'email'); +CREATE TYPE TRIGGER_KIND AS ENUM ('webhook', 'http', 'websocket', 'kafka', 'email', 'postgres'); ALTER TABLE capture ADD COLUMN is_flow BOOLEAN NOT NULL DEFAULT TRUE, ADD COLUMN trigger_kind TRIGGER_KIND NOT NULL DEFAULT 'webhook', ADD COLUMN trigger_extra JSONB; ALTER TABLE capture ALTER COLUMN is_flow DROP DEFAULT, ALTER COLUMN trigger_kind DROP DEFAULT; ALTER TABLE capture DROP CONSTRAINT capture_pkey; diff --git a/backend/windmill-api/src/capture.rs b/backend/windmill-api/src/capture.rs index ff808acf0edaa..6b2a8a74847a0 100644 --- a/backend/windmill-api/src/capture.rs +++ b/backend/windmill-api/src/capture.rs @@ -77,7 +77,7 @@ pub fn workspaced_unauthed_service() -> Router { } } -#[derive(sqlx::Type, Serialize, Deserialize)] +#[derive(sqlx::Type, Serialize, Deserialize, Debug)] #[sqlx(type_name = "TRIGGER_KIND", rename_all = "lowercase")] #[serde(rename_all = "lowercase")] pub enum TriggerKind { @@ -150,7 +150,7 @@ pub struct WebsocketTriggerConfig { pub url_runnable_args: Option, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] #[serde(untagged)] enum TriggerConfig { #[cfg(feature = "http_trigger")] @@ -165,7 +165,7 @@ enum TriggerConfig { Nats(NatsTriggerConfig), } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] struct NewCaptureConfig { trigger_kind: TriggerKind, path: String, @@ -212,7 +212,6 @@ async fn set_config( Json(nc): Json, ) -> Result<()> { let mut tx = user_db.begin(&authed).await?; - sqlx::query!( "INSERT INTO capture_config (workspace_id, path, is_flow, trigger_kind, trigger_config, owner, email) diff --git a/backend/windmill-api/src/postgres_triggers/handler.rs b/backend/windmill-api/src/postgres_triggers/handler.rs index 5bcfb69acc18e..e4656f6f885ae 100644 --- a/backend/windmill-api/src/postgres_triggers/handler.rs +++ b/backend/windmill-api/src/postgres_triggers/handler.rs @@ -112,6 +112,40 @@ pub struct NewPostgresTrigger { publication: Option, } +#[derive(Serialize, Deserialize)] +pub struct TestPostgres { + pub postgres_resource_path: String, +} + +pub async fn test_postgres_connection( + authed: ApiAuthed, + Extension(db): Extension, + Extension(user_db): Extension, + Path(workspace_id): Path, + Json(test_postgres): Json, +) -> error::Result<()> { + let connect_f = async { + get_database_connection( + authed, + Some(user_db), + &db, + &test_postgres.postgres_resource_path, + &workspace_id, + ) + .await + .map_err(|err| { + error::Error::BadConfig(format!("Error connecting to postgres: {}", err.to_string())) + }) + }; + tokio::time::timeout(tokio::time::Duration::from_secs(30), connect_f) + .await + .map_err(|_| { + error::Error::BadConfig(format!("Timeout connecting to websocket after 30 seconds")) + })??; + + Ok(()) +} + pub async fn get_database_connection( authed: ApiAuthed, user_db: Option, diff --git a/backend/windmill-api/src/postgres_triggers/mod.rs b/backend/windmill-api/src/postgres_triggers/mod.rs index 0bd87ed262771..be4a9ae818261 100644 --- a/backend/windmill-api/src/postgres_triggers/mod.rs +++ b/backend/windmill-api/src/postgres_triggers/mod.rs @@ -16,7 +16,8 @@ use handler::{ create_template_script, delete_postgres_trigger, delete_publication, drop_slot_name, exists_postgres_trigger, get_postgres_trigger, get_publication_info, get_template_script, is_database_in_logical_level, list_database_publication, list_postgres_triggers, - list_slot_name, set_enabled, update_postgres_trigger, Database, PostgresTrigger, + list_slot_name, set_enabled, test_postgres_connection, update_postgres_trigger, Database, + PostgresTrigger, }; use windmill_common::{db::UserDB, error::Error, utils::StripPath}; use windmill_queue::PushArgsOwned; @@ -86,6 +87,7 @@ fn slot_service() -> Router { pub fn workspaced_service() -> Router { Router::new() + .route("/test", post(test_postgres_connection)) .route("/create", post(create_postgres_trigger)) .route("/list", get(list_postgres_triggers)) .route("/get/*path", get(get_postgres_trigger)) diff --git a/frontend/src/lib/components/triggers.ts b/frontend/src/lib/components/triggers.ts index 03530d0e98a3c..ac4a905da59cc 100644 --- a/frontend/src/lib/components/triggers.ts +++ b/frontend/src/lib/components/triggers.ts @@ -65,6 +65,8 @@ export function captureTriggerKindToTriggerKind(kind: CaptureTriggerKind): Trigg return 'kafka' case 'nats': return 'nats' + case 'postgres': + return 'postgres' default: throw new Error(`Unknown CaptureTriggerKind: ${kind}`) } diff --git a/frontend/src/lib/components/triggers/CaptureButton.svelte b/frontend/src/lib/components/triggers/CaptureButton.svelte index 045f72ea378ac..b90070d2c13b0 100644 --- a/frontend/src/lib/components/triggers/CaptureButton.svelte +++ b/frontend/src/lib/components/triggers/CaptureButton.svelte @@ -1,7 +1,7 @@
@@ -30,19 +41,46 @@ bind:captureTable /> {/if} -
-
- { - url = ev.detail === 'runnable' ? '$script:' : '' - url_runnable_args = {} - }} - > - - - +
+
+
+

+ Pick a database to connect to +

+ + {#if postgres_resource_path} + + {/if} +
+
+

+ Choose the types of database transactions that should trigger a script or flow. You can + select from Insert, Update, + Delete, or any combination of these operations to define when the trigger + should activate. +

+ +
+
+

+ Select the tables to track. You can choose to track + all tables in your database, + all tables within a specific schema, + specific tables in a schema, or even + specific columns of a table. Additionally, you can apply a + filter to retrieve only rows that do not match the specified criteria. +

+ +
-
diff --git a/frontend/src/lib/components/triggers/postgres/PostgresTriggerEditor.svelte b/frontend/src/lib/components/triggers/postgres/PostgresTriggerEditor.svelte index d5f410e594588..9b46ac2b0baa8 100644 --- a/frontend/src/lib/components/triggers/postgres/PostgresTriggerEditor.svelte +++ b/frontend/src/lib/components/triggers/postgres/PostgresTriggerEditor.svelte @@ -9,10 +9,10 @@ drawer?.openEdit(ePath, isFlow) } - export async function openNew(is_flow: boolean, initial_script_path?: string) { + export async function openNew(is_flow: boolean, initial_script_path?: string, defaultValues?: Record) { open = true await tick() - drawer?.openNew(is_flow, initial_script_path) + drawer?.openNew(is_flow, initial_script_path, defaultValues) } let drawer: PostgresTriggerEditorInner diff --git a/frontend/src/lib/components/triggers/postgres/PostgresTriggerEditorInner.svelte b/frontend/src/lib/components/triggers/postgres/PostgresTriggerEditorInner.svelte index 14310d1e9d5c2..072b5634b5085 100644 --- a/frontend/src/lib/components/triggers/postgres/PostgresTriggerEditorInner.svelte +++ b/frontend/src/lib/components/triggers/postgres/PostgresTriggerEditorInner.svelte @@ -121,13 +121,17 @@ } } - export async function openNew(nis_flow: boolean, fixedScriptPath_?: string) { + export async function openNew( + nis_flow: boolean, + fixedScriptPath_?: string, + defaultValues?: Record + ) { drawerLoading = true try { selectedPublicationAction = 'create' selectedSlotAction = 'create' selectedTable = 'specific' - tab = 'basic' + tab = defaultValues ? 'advanced' : 'basic' drawer?.openDrawer() is_flow = nis_flow @@ -137,16 +141,15 @@ script_path = fixedScriptPath path = '' initialPath = '' - replication_slot_name = '' - publication_name = '' - postgres_resource_path = '' + postgres_resource_path = defaultValues?.postgres_resource_path ?? '' edit = false dirtyPath = false config.show = false - publication_name = `windmill_publication_${random_adj()}` - replication_slot_name = `windmill_replication_${random_adj()}` - transaction_to_track = ['Insert', 'Update', 'Delete'] - relations = [ + publication_name = defaultValues?.publication_name || `windmill_publication_${random_adj()}` + replication_slot_name = + defaultValues?.replication_slot_name || `windmill_replication_${random_adj()}` + transaction_to_track = defaultValues?.transaction_to_track || ['Insert', 'Update', 'Delete'] + relations = defaultValues?.relations || [ { schema_name: 'public', table_to_track: [] diff --git a/frontend/src/lib/components/triggers/postgres/PostgresTriggersPanel.svelte b/frontend/src/lib/components/triggers/postgres/PostgresTriggersPanel.svelte index 15742bd04c20d..727e57b48daad 100644 --- a/frontend/src/lib/components/triggers/postgres/PostgresTriggersPanel.svelte +++ b/frontend/src/lib/components/triggers/postgres/PostgresTriggersPanel.svelte @@ -1,18 +1,23 @@
@@ -23,7 +76,7 @@
diff --git a/frontend/src/lib/components/triggers/postgres/PostgresTriggerEditorInner.svelte b/frontend/src/lib/components/triggers/postgres/PostgresTriggerEditorInner.svelte index 072b5634b5085..ffaec5947559d 100644 --- a/frontend/src/lib/components/triggers/postgres/PostgresTriggerEditorInner.svelte +++ b/frontend/src/lib/components/triggers/postgres/PostgresTriggerEditorInner.svelte @@ -360,9 +360,14 @@ Pick a database to connect to

- + {#if postgres_resource_path}
{:else} {/if} - +
diff --git a/frontend/src/lib/components/triggers/postgres/PostgresTriggersPanel.svelte b/frontend/src/lib/components/triggers/postgres/PostgresTriggersPanel.svelte index 727e57b48daad..ba0403f8490e8 100644 --- a/frontend/src/lib/components/triggers/postgres/PostgresTriggersPanel.svelte +++ b/frontend/src/lib/components/triggers/postgres/PostgresTriggersPanel.svelte @@ -3,7 +3,7 @@ import { PostgresTriggerService, type PostgresTrigger } from '$lib/gen' import { canWrite, sendUserToast } from '$lib/utils' - import { getContext } from 'svelte' + import { getContext, onMount } from 'svelte' import { isCloudHosted } from '$lib/cloud' import { Alert, Skeleton } from '$lib/components/common' import type { TriggerContext } from '$lib/components/triggers' @@ -23,12 +23,25 @@ $: path && loadTriggers() - const { triggersCount } = getContext('TriggerContext') + const { triggersCount, selectedTrigger, defaultValues } = + getContext('TriggerContext') - let databaseTriggers: (PostgresTrigger & { canWrite: boolean })[] | undefined = undefined + + onMount(() => { + if ( + defaultValues && + $selectedTrigger === 'postgres' && + Object.keys($defaultValues ?? {}).length > 0 + ) { + postgresTriggerEditor.openNew(isFlow, path, $defaultValues) + defaultValues.set(undefined) + } + }) + + let postgresTriggers: (PostgresTrigger & { canWrite: boolean })[] | undefined = undefined export async function loadTriggers() { try { - databaseTriggers = ( + postgresTriggers = ( await PostgresTriggerService.listPostgresTriggers({ workspace: $workspaceStore ?? '', path, @@ -37,7 +50,7 @@ ).map((x) => { return { canWrite: canWrite(x.path, x.extra_perms!, $userStore), ...x } }) - $triggersCount = { ...($triggersCount ?? {}), postgres_count: databaseTriggers?.length } + $triggersCount = { ...($triggersCount ?? {}), postgres_count: postgresTriggers?.length } } catch (err) { sendUserToast(`Could not load postgres triggers ${err.body}`, true) } @@ -82,21 +95,21 @@ {#if !newItem}
- {#if databaseTriggers} - {#if databaseTriggers.length == 0} + {#if postgresTriggers} + {#if postgresTriggers.length == 0}
No Postgres triggers
{:else}
- {#each databaseTriggers as databaseTriggers (databaseTriggers.path)} + {#each postgresTriggers as postgresTriggers (postgresTriggers.path)}
-
{databaseTriggers.path}
+
{postgresTriggers.path}