Skip to content

Commit

Permalink
added unique colums option to sink postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
ametel01 authored and fracek committed Mar 14, 2024
1 parent 598e2f7 commit 5a1c572
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
7 changes: 7 additions & 0 deletions sinks/sink-postgres/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub struct SinkPostgresConfiguration {
pub entity_mode: bool,
pub invalidate: Vec<InvalidateColumn>,
pub batch_seconds: u64,
pub unique_columns: bool,
}

#[derive(Debug, Args, Default, SinkOptions)]
Expand Down Expand Up @@ -67,6 +68,9 @@ pub struct SinkPostgresOptions {
pub invalidate: Option<Vec<InvalidateColumn>>,
#[arg(long, env = "POSTGRES_BATCH_SECONDS")]
pub batch_seconds: Option<u64>,
/// Enable unique columns .
#[clap(skip)]
pub unique_columns: Option<bool>,
}

#[derive(Debug, Default, Deserialize)]
Expand Down Expand Up @@ -97,6 +101,7 @@ impl SinkOptions for SinkPostgresOptions {
entity_mode: self.entity_mode.or(other.entity_mode),
invalidate: self.invalidate.or(other.invalidate),
batch_seconds: self.batch_seconds.or(other.batch_seconds),
unique_columns: self.unique_columns.or(other.unique_columns),
}
}
}
Expand Down Expand Up @@ -125,6 +130,7 @@ impl SinkPostgresOptions {
let entity_mode = self.entity_mode.unwrap_or(false);
let invalidate = self.invalidate.unwrap_or_default();
let batch_seconds = self.batch_seconds.unwrap_or(0);
let unique_columns = self.unique_columns.unwrap_or(false);

Ok(SinkPostgresConfiguration {
pg,
Expand All @@ -133,6 +139,7 @@ impl SinkPostgresOptions {
entity_mode,
invalidate,
batch_seconds,
unique_columns,
})
}
}
16 changes: 12 additions & 4 deletions sinks/sink-postgres/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,18 @@ struct StandardSink {
impl StandardSink {
async fn new(client: Client, config: &SinkPostgresConfiguration) -> Result<Self, SinkError> {
let table_name = &config.table_name;
let query = format!(
"INSERT INTO {} SELECT * FROM json_populate_recordset(NULL::{}, $1::json)",
&table_name, &table_name
);

let query: String = if config.unique_columns {
format!(
"INSERT INTO {} SELECT * FROM json_populate_recordset(NULL::{}, $1::json) ON CONFLICT DO NOTHING",
&table_name, &table_name
)
} else {
format!(
"INSERT INTO {} SELECT * FROM json_populate_recordset(NULL::{}, $1::json)",
&table_name, &table_name
)
};

let additional_conditions: String = if config.invalidate.is_empty() {
"".into()
Expand Down

0 comments on commit 5a1c572

Please sign in to comment.