Skip to content

Commit

Permalink
sink-postgres: add conditions to invalidate query
Browse files Browse the repository at this point in the history
**Summary**
This commit adds an option to conditionally invalidate rows.
This is useful when multiple indexers write to the same table.
  • Loading branch information
fracek committed Nov 6, 2023
1 parent c6b46c6 commit 9f6ba8e
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 18 deletions.
24 changes: 12 additions & 12 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions sinks/sink-postgres/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{path::PathBuf, str::FromStr};
use apibara_sink_common::SinkOptions;
use clap::Args;
use error_stack::{Result, ResultExt};
use serde::Deserialize;
use tokio_postgres::Config;

use crate::sink::SinkPostgresError;
Expand All @@ -24,6 +25,7 @@ pub struct SinkPostgresConfiguration {
pub pg: Config,
pub table_name: String,
pub tls: TlsConfiguration,
pub invalidate: Vec<InvalidateColumn>,
}

#[derive(Debug, Args, Default, SinkOptions)]
Expand Down Expand Up @@ -56,6 +58,17 @@ pub struct SinkPostgresOptions {
/// Use Server Name Indication (SNI).
#[arg(long, env = "POSTGRES_TLS_USE_SNI")]
pub tls_use_sni: Option<bool>,
/// Additional conditions for the invalidate query.
#[clap(skip)]
pub invalidate: Option<Vec<InvalidateColumn>>,
}

#[derive(Debug, Default, Deserialize)]
pub struct InvalidateColumn {
/// Column name.
pub column: String,
/// Column value.
pub value: String,
}

impl SinkOptions for SinkPostgresOptions {
Expand All @@ -75,6 +88,7 @@ impl SinkOptions for SinkPostgresOptions {
.tls_accept_invalid_hostnames
.or(other.tls_accept_invalid_hostnames),
tls_use_sni: self.tls_use_sni.or(other.tls_use_sni),
invalidate: self.invalidate.or(other.invalidate),
}
}
}
Expand Down Expand Up @@ -105,10 +119,13 @@ impl SinkPostgresOptions {
}
};

let invalidate = self.invalidate.unwrap_or_default();

Ok(SinkPostgresConfiguration {
pg,
table_name,
tls,
invalidate,
})
}
}
2 changes: 1 addition & 1 deletion sinks/sink-postgres/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod configuration;
mod sink;

pub use self::configuration::{SinkPostgresConfiguration, SinkPostgresOptions};
pub use self::configuration::{InvalidateColumn, SinkPostgresConfiguration, SinkPostgresOptions};
pub use self::sink::{PostgresSink, SinkPostgresError};
26 changes: 23 additions & 3 deletions sinks/sink-postgres/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio_postgres::types::Json;
use tokio_postgres::{Client, NoTls, Statement};
use tracing::{info, warn};

use crate::configuration::TlsConfiguration;
use crate::configuration::{InvalidateColumn, TlsConfiguration};
use crate::SinkPostgresOptions;

#[derive(Debug)]
Expand Down Expand Up @@ -116,8 +116,28 @@ impl Sink for PostgresSink {
"INSERT INTO {} SELECT * FROM json_populate_recordset(NULL::{}, $1::json)",
&table_name, &table_name
);
let delete_query = format!("DELETE FROM {} WHERE _cursor > $1", &table_name);
let delete_all_query = format!("DELETE FROM {}", &table_name);

let additional_conditions: String = if config.invalidate.is_empty() {
"".into()
} else {
// TODO: this is quite fragile. It should properly format the column name
// and value.
config.invalidate.iter().fold(
String::default(),
|acc, InvalidateColumn { column, value }| {
format!("{acc} AND \"{column}\" = '{value}'")
},
)
};

let delete_query = format!(
"DELETE FROM {} WHERE _cursor > $1 {}",
table_name, additional_conditions
);
let delete_all_query = format!(
"DELETE FROM {} WHERE true {}",
table_name, additional_conditions
);

let insert_statement = client
.prepare(&query)
Expand Down
87 changes: 85 additions & 2 deletions sinks/sink-postgres/tests/test_sink.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use apibara_core::node::v1alpha2::{Cursor, DataFinality};
use apibara_sink_common::{CursorAction, Sink};
use apibara_sink_postgres::{PostgresSink, SinkPostgresError, SinkPostgresOptions};
use apibara_sink_postgres::{
InvalidateColumn, PostgresSink, SinkPostgresError, SinkPostgresOptions,
};
use error_stack::Result;
use serde_json::{json, Value};
use testcontainers::{clients, core::WaitFor, GenericImage};
Expand All @@ -24,6 +26,15 @@ fn new_cursor(order_key: u64) -> Cursor {
}

fn new_batch(start_cursor: &Option<Cursor>, end_cursor: &Cursor) -> Value {
new_batch_with_additional_columns(start_cursor, end_cursor, None, None)
}

fn new_batch_with_additional_columns(
start_cursor: &Option<Cursor>,
end_cursor: &Cursor,
col1: Option<String>,
col2: Option<String>,
) -> Value {
let mut batch = Vec::new();

let start_block_num = match start_cursor {
Expand All @@ -37,6 +48,8 @@ fn new_batch(start_cursor: &Option<Cursor>, end_cursor: &Cursor) -> Value {
batch.push(json!({
"block_num": i,
"block_str": format!("block_{}", i),
"col1": col1,
"col2": col2,
}));
}
json!(batch)
Expand Down Expand Up @@ -98,7 +111,7 @@ async fn get_num_rows(client: &Client) -> i64 {

async fn create_test_table(port: u16) {
let create_table_query =
"CREATE TABLE test ( block_num int primary key, block_str varchar(10), _cursor bigint);";
"CREATE TABLE test(block_num int, block_str varchar(10), col1 text, col2 text, _cursor bigint);";

let connection_string = format!("postgresql://postgres@localhost:{}", port);
let (client, connection) = tokio_postgres::connect(&connection_string, NoTls)
Expand All @@ -115,10 +128,18 @@ async fn create_test_table(port: u16) {
}

async fn new_sink(port: u16) -> PostgresSink {
new_sink_with_invalidate(port, None).await
}

async fn new_sink_with_invalidate(
port: u16,
invalidate: Option<Vec<InvalidateColumn>>,
) -> PostgresSink {
let options = SinkPostgresOptions {
connection_string: Some(format!("postgresql://postgres@localhost:{}", port)),
table_name: Some("test".into()),
no_tls: Some(true),
invalidate,
..Default::default()
};
PostgresSink::from_options(options).await.unwrap()
Expand Down Expand Up @@ -276,3 +297,65 @@ async fn test_handle_invalidate() -> Result<(), SinkPostgresError> {

Ok(())
}

#[tokio::test]
#[ignore]
async fn test_handle_invalidate_with_additional_condition() -> Result<(), SinkPostgresError> {
let docker = clients::Cli::default();
let postgres = docker.run(new_postgres_image());
let port = postgres.get_host_port_ipv4(5432);

create_test_table(port).await;

let invalidate = vec![
InvalidateColumn {
column: "col1".into(),
value: "a".into(),
},
InvalidateColumn {
column: "col2".into(),
value: "a".into(),
},
];
let mut sink = new_sink_with_invalidate(port, Some(invalidate)).await;

let batch_size = 2;
let num_batches = 5;

for order_key in 0..num_batches {
let cursor = Some(new_cursor(order_key * batch_size));
let end_cursor = new_cursor((order_key + 1) * batch_size);
let finality = DataFinality::DataStatusFinalized;

let batch = new_batch_with_additional_columns(
&cursor,
&end_cursor,
Some("a".into()),
Some("b".into()),
);
let action = sink
.handle_data(&cursor, &end_cursor, &finality, &batch)
.await?;
assert_eq!(action, CursorAction::Persist);

let batch = new_batch_with_additional_columns(
&cursor,
&end_cursor,
Some("a".into()),
Some("a".into()),
);
let action = sink
.handle_data(&cursor, &end_cursor, &finality, &batch)
.await?;
assert_eq!(action, CursorAction::Persist);
}

sink.handle_invalidate(&Some(new_cursor(2))).await?;

let rows = get_all_rows(&sink.client).await;
// 10 rows with col1 = "a" and col2 = "b"
// 2 rows with col1 = "a" and col2 = "a"
assert_eq!(rows.len(), 12);

Ok(())
}

0 comments on commit 9f6ba8e

Please sign in to comment.