From 9f6ba8e0702361d5b0313ff8c5ffd564801bb95f Mon Sep 17 00:00:00 2001 From: Francesco Ceccon Date: Mon, 6 Nov 2023 12:42:03 +0100 Subject: [PATCH] sink-postgres: add conditions to invalidate query **Summary** This commit adds an option to conditionally invalidate rows. This is useful when multiple indexers write to the same table. --- flake.lock | 24 +++---- sinks/sink-postgres/src/configuration.rs | 17 +++++ sinks/sink-postgres/src/lib.rs | 2 +- sinks/sink-postgres/src/sink.rs | 26 ++++++- sinks/sink-postgres/tests/test_sink.rs | 87 +++++++++++++++++++++++- 5 files changed, 138 insertions(+), 18 deletions(-) diff --git a/flake.lock b/flake.lock index c1643390..a3721864 100644 --- a/flake.lock +++ b/flake.lock @@ -7,11 +7,11 @@ ] }, "locked": { - "lastModified": 1699030822, - "narHash": "sha256-a25bCHvTPJfAvK3qLoi5uI2pvwnOYhMQLRpJYNEt55o=", + "lastModified": 1699218802, + "narHash": "sha256-5l0W4Q7z7A4BCstaF5JuBqXOVrZ3Vqst5+hUnP7EdUc=", "owner": "ipetkov", "repo": "crane", - "rev": "2c89c36bffac32d8267e719f73b0d06e313ede30", + "rev": "2d6c2aaff5a05e443eb15efddc21f9c73720340c", "type": "github" }, "original": { @@ -113,11 +113,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1698931758, - "narHash": "sha256-pwl9xS9JFMXXR1lUP/QOqO9hiZKukEcVUU1A0DKQwi4=", + "lastModified": 1699186365, + "narHash": "sha256-Pxrw5U8mBsL3NlrJ6q1KK1crzvSUcdfwb9083sKDrcU=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "b644d97bda6dae837d577e28383c10aa51e5e2d2", + "rev": "a0b3b06b7a82c965ae0bb1d59f6e386fe755001d", "type": "github" }, "original": { @@ -168,11 +168,11 @@ "nixpkgs-stable": "nixpkgs-stable" }, "locked": { - "lastModified": 1698852633, - "narHash": "sha256-Hsc/cCHud8ZXLvmm8pxrXpuaPEeNaaUttaCvtdX/Wug=", + "lastModified": 1699271226, + "narHash": "sha256-8Jt1KW3xTjolD6c6OjJm9USx/jmL+VVmbooADCkdDfU=", "owner": "cachix", "repo": "pre-commit-hooks.nix", - "rev": "dec10399e5b56aa95fcd530e0338be72ad6462a0", + "rev": "ea758da1a6dcde6dc36db348ed690d09b9864128", "type": "github" }, "original": { @@ -198,11 +198,11 @@ ] }, "locked": { - "lastModified": 1698977568, - "narHash": "sha256-bnbCqPDFdOUcSANJv9Br3q/b1LyK9vyB1I7os5T4jXI=", + "lastModified": 1699236891, + "narHash": "sha256-J0uhoYlufJncIFbM/pAoggzHK/qERB9KfQRkmYD56yo=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "321affd863e3e4e669990a1db5fdabef98387b95", + "rev": "a7f9bf91dc5065d470cd57169a9f2ebdbdfe1f24", "type": "github" }, "original": { diff --git a/sinks/sink-postgres/src/configuration.rs b/sinks/sink-postgres/src/configuration.rs index fad90753..79182374 100644 --- a/sinks/sink-postgres/src/configuration.rs +++ b/sinks/sink-postgres/src/configuration.rs @@ -3,6 +3,7 @@ use std::{path::PathBuf, str::FromStr}; use apibara_sink_common::SinkOptions; use clap::Args; use error_stack::{Result, ResultExt}; +use serde::Deserialize; use tokio_postgres::Config; use crate::sink::SinkPostgresError; @@ -24,6 +25,7 @@ pub struct SinkPostgresConfiguration { pub pg: Config, pub table_name: String, pub tls: TlsConfiguration, + pub invalidate: Vec, } #[derive(Debug, Args, Default, SinkOptions)] @@ -56,6 +58,17 @@ pub struct SinkPostgresOptions { /// Use Server Name Indication (SNI). #[arg(long, env = "POSTGRES_TLS_USE_SNI")] pub tls_use_sni: Option, + /// Additional conditions for the invalidate query. + #[clap(skip)] + pub invalidate: Option>, +} + +#[derive(Debug, Default, Deserialize)] +pub struct InvalidateColumn { + /// Column name. + pub column: String, + /// Column value. + pub value: String, } impl SinkOptions for SinkPostgresOptions { @@ -75,6 +88,7 @@ impl SinkOptions for SinkPostgresOptions { .tls_accept_invalid_hostnames .or(other.tls_accept_invalid_hostnames), tls_use_sni: self.tls_use_sni.or(other.tls_use_sni), + invalidate: self.invalidate.or(other.invalidate), } } } @@ -105,10 +119,13 @@ impl SinkPostgresOptions { } }; + let invalidate = self.invalidate.unwrap_or_default(); + Ok(SinkPostgresConfiguration { pg, table_name, tls, + invalidate, }) } } diff --git a/sinks/sink-postgres/src/lib.rs b/sinks/sink-postgres/src/lib.rs index 79d15e19..019bf7b4 100644 --- a/sinks/sink-postgres/src/lib.rs +++ b/sinks/sink-postgres/src/lib.rs @@ -1,5 +1,5 @@ mod configuration; mod sink; -pub use self::configuration::{SinkPostgresConfiguration, SinkPostgresOptions}; +pub use self::configuration::{InvalidateColumn, SinkPostgresConfiguration, SinkPostgresOptions}; pub use self::sink::{PostgresSink, SinkPostgresError}; diff --git a/sinks/sink-postgres/src/sink.rs b/sinks/sink-postgres/src/sink.rs index a6baec14..38e6e83d 100644 --- a/sinks/sink-postgres/src/sink.rs +++ b/sinks/sink-postgres/src/sink.rs @@ -11,7 +11,7 @@ use tokio_postgres::types::Json; use tokio_postgres::{Client, NoTls, Statement}; use tracing::{info, warn}; -use crate::configuration::TlsConfiguration; +use crate::configuration::{InvalidateColumn, TlsConfiguration}; use crate::SinkPostgresOptions; #[derive(Debug)] @@ -116,8 +116,28 @@ impl Sink for PostgresSink { "INSERT INTO {} SELECT * FROM json_populate_recordset(NULL::{}, $1::json)", &table_name, &table_name ); - let delete_query = format!("DELETE FROM {} WHERE _cursor > $1", &table_name); - let delete_all_query = format!("DELETE FROM {}", &table_name); + + let additional_conditions: String = if config.invalidate.is_empty() { + "".into() + } else { + // TODO: this is quite fragile. It should properly format the column name + // and value. + config.invalidate.iter().fold( + String::default(), + |acc, InvalidateColumn { column, value }| { + format!("{acc} AND \"{column}\" = '{value}'") + }, + ) + }; + + let delete_query = format!( + "DELETE FROM {} WHERE _cursor > $1 {}", + table_name, additional_conditions + ); + let delete_all_query = format!( + "DELETE FROM {} WHERE true {}", + table_name, additional_conditions + ); let insert_statement = client .prepare(&query) diff --git a/sinks/sink-postgres/tests/test_sink.rs b/sinks/sink-postgres/tests/test_sink.rs index cd3d2555..47f12b4b 100644 --- a/sinks/sink-postgres/tests/test_sink.rs +++ b/sinks/sink-postgres/tests/test_sink.rs @@ -1,6 +1,8 @@ use apibara_core::node::v1alpha2::{Cursor, DataFinality}; use apibara_sink_common::{CursorAction, Sink}; -use apibara_sink_postgres::{PostgresSink, SinkPostgresError, SinkPostgresOptions}; +use apibara_sink_postgres::{ + InvalidateColumn, PostgresSink, SinkPostgresError, SinkPostgresOptions, +}; use error_stack::Result; use serde_json::{json, Value}; use testcontainers::{clients, core::WaitFor, GenericImage}; @@ -24,6 +26,15 @@ fn new_cursor(order_key: u64) -> Cursor { } fn new_batch(start_cursor: &Option, end_cursor: &Cursor) -> Value { + new_batch_with_additional_columns(start_cursor, end_cursor, None, None) +} + +fn new_batch_with_additional_columns( + start_cursor: &Option, + end_cursor: &Cursor, + col1: Option, + col2: Option, +) -> Value { let mut batch = Vec::new(); let start_block_num = match start_cursor { @@ -37,6 +48,8 @@ fn new_batch(start_cursor: &Option, end_cursor: &Cursor) -> Value { batch.push(json!({ "block_num": i, "block_str": format!("block_{}", i), + "col1": col1, + "col2": col2, })); } json!(batch) @@ -98,7 +111,7 @@ async fn get_num_rows(client: &Client) -> i64 { async fn create_test_table(port: u16) { let create_table_query = - "CREATE TABLE test ( block_num int primary key, block_str varchar(10), _cursor bigint);"; + "CREATE TABLE test(block_num int, block_str varchar(10), col1 text, col2 text, _cursor bigint);"; let connection_string = format!("postgresql://postgres@localhost:{}", port); let (client, connection) = tokio_postgres::connect(&connection_string, NoTls) @@ -115,10 +128,18 @@ async fn create_test_table(port: u16) { } async fn new_sink(port: u16) -> PostgresSink { + new_sink_with_invalidate(port, None).await +} + +async fn new_sink_with_invalidate( + port: u16, + invalidate: Option>, +) -> PostgresSink { let options = SinkPostgresOptions { connection_string: Some(format!("postgresql://postgres@localhost:{}", port)), table_name: Some("test".into()), no_tls: Some(true), + invalidate, ..Default::default() }; PostgresSink::from_options(options).await.unwrap() @@ -276,3 +297,65 @@ async fn test_handle_invalidate() -> Result<(), SinkPostgresError> { Ok(()) } + +#[tokio::test] +#[ignore] +async fn test_handle_invalidate_with_additional_condition() -> Result<(), SinkPostgresError> { + let docker = clients::Cli::default(); + let postgres = docker.run(new_postgres_image()); + let port = postgres.get_host_port_ipv4(5432); + + create_test_table(port).await; + + let invalidate = vec![ + InvalidateColumn { + column: "col1".into(), + value: "a".into(), + }, + InvalidateColumn { + column: "col2".into(), + value: "a".into(), + }, + ]; + let mut sink = new_sink_with_invalidate(port, Some(invalidate)).await; + + let batch_size = 2; + let num_batches = 5; + + for order_key in 0..num_batches { + let cursor = Some(new_cursor(order_key * batch_size)); + let end_cursor = new_cursor((order_key + 1) * batch_size); + let finality = DataFinality::DataStatusFinalized; + + let batch = new_batch_with_additional_columns( + &cursor, + &end_cursor, + Some("a".into()), + Some("b".into()), + ); + let action = sink + .handle_data(&cursor, &end_cursor, &finality, &batch) + .await?; + assert_eq!(action, CursorAction::Persist); + + let batch = new_batch_with_additional_columns( + &cursor, + &end_cursor, + Some("a".into()), + Some("a".into()), + ); + let action = sink + .handle_data(&cursor, &end_cursor, &finality, &batch) + .await?; + assert_eq!(action, CursorAction::Persist); + } + + sink.handle_invalidate(&Some(new_cursor(2))).await?; + + let rows = get_all_rows(&sink.client).await; + // 10 rows with col1 = "a" and col2 = "b" + // 2 rows with col1 = "a" and col2 = "a" + assert_eq!(rows.len(), 12); + + Ok(()) +}