Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add event trigger to handle job cleanup on table drop in vectorize schema #178

Merged
27 changes: 27 additions & 0 deletions extension/sql/meta.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,33 @@ GRANT SELECT ON ALL SEQUENCES IN SCHEMA vectorize TO pg_monitor;
ALTER DEFAULT PRIVILEGES IN SCHEMA vectorize GRANT SELECT ON TABLES TO pg_monitor;
ALTER DEFAULT PRIVILEGES IN SCHEMA vectorize GRANT SELECT ON SEQUENCES TO pg_monitor;

CREATE OR REPLACE FUNCTION handle_table_drop()
RETURNS event_trigger AS $$
DECLARE
obj RECORD;
schema_name TEXT;
table_name TEXT;
BEGIN
FOR obj IN SELECT * FROM pg_event_trigger_dropped_objects() LOOP
IF obj.object_type = 'table' THEN
schema_name := split_part(obj.object_identity, '.', 1);
table_name := split_part(obj.object_identity, '.', 2);

-- Perform cleanup: delete the associated job from the vectorize.job table
DELETE FROM vectorize.job
WHERE params ->> 'table' = table_name
AND params ->> 'schema' = schema_name;
END IF;
END LOOP;
END;
$$ LANGUAGE plpgsql;

DROP EVENT TRIGGER IF EXISTS vectorize_job_drop_trigger;

CREATE EVENT TRIGGER vectorize_job_drop_trigger
ON sql_drop
WHEN TAG IN ('DROP TABLE')
EXECUTE FUNCTION handle_table_drop();

INSERT INTO vectorize.prompts (prompt_type, sys_prompt, user_prompt)
VALUES (
Expand Down
63 changes: 63 additions & 0 deletions extension/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,3 +860,66 @@ async fn test_cohere() {
.unwrap();
assert_eq!(search_results.len(), 3);
}

#[ignore]
#[tokio::test]
async fn test_event_trigger_on_table_drop() {
let conn = common::init_database().await;
let mut rng = rand::thread_rng();
let test_num = rng.gen_range(1..100000);
let test_table_name = format!("products_test_{}", test_num);
let job_name = format!("job_{}", test_num);

// Initialize the test table and job
common::init_test_table(&test_table_name, &conn).await;
common::init_embedding_svc_url(&conn).await;

let _ = sqlx::query(&format!(
"SELECT vectorize.table(
job_name => '{job_name}',
\"table\" => '{test_table_name}',
primary_key => 'product_id',
columns => ARRAY['product_name'],
transformer => 'sentence-transformers/all-MiniLM-L6-v2'
);"
))
.execute(&conn)
.await
.expect("failed to initialize vectorize job");

// Check the job table before dropping the test table
let job_count_before = common::row_count("vectorize.job", &conn).await;
assert_eq!(job_count_before, 1);

// Drop the test table
let drop_result = sqlx::query(&format!("DROP TABLE {test_table_name} CASCADE;"))
.execute(&conn)
.await;
assert!(drop_result.is_ok(), "Failed to drop the test table");

// Debug: Check job table after dropping the test table
let job_count_after = common::row_count("vectorize.job", &conn).await;
assert_eq!(job_count_after, 0, "Job entry was not removed after table drop");

// Check if the job was deleted
let deleted_job = sqlx::query("SELECT * FROM vectorize.job WHERE params->>'table' = $1 AND params->>'schema' = $2")
.bind(test_table_name)
.bind("public")
.fetch_optional(&conn)
.await
.expect("Failed to fetch job");

assert!(deleted_job.is_none(), "Job was not deleted after table drop");

// Attempt to drop a non-associated table and verify no action is taken
let unrelated_table_name = format!("unrelated_test_{}", test_num);
common::init_test_table(&unrelated_table_name, &conn).await;
let _ = sqlx::query(&format!("DROP TABLE {unrelated_table_name};"))
.execute(&conn)
.await
.expect("Failed to drop the unrelated test table");

// Ensure vectorize.job is unaffected
let final_job_count = common::row_count("vectorize.job", &conn).await;
assert_eq!(final_job_count, 0, "vectorize.job should remain unaffected by unrelated table drops");
}
Loading