From 37bcaa2265e56463c340c120c18002a2e00ec8fe Mon Sep 17 00:00:00 2001
From: Alejandro del Castillo
Date: Fri, 12 Jun 2020 14:27:48 -0500
Subject: [PATCH] add bestEffortDeduplication config option

The connector currently adds an insertId per row, which BigQuery uses to
de-duplicate rows that carry the same insertId (within a one-minute
window). Using insertIds throttles the ingestion rate to a maximum of
100k rows per second and 100 MB/s. Insertions without an insertId disable
best-effort de-duplication [1], which raises the ingestion quota to a
maximum of 1 GB/s. For high-throughput applications, it is desirable to
disable de-duplication and handle duplicates on the query side.

[1] https://cloud.google.com/bigquery/streaming-data-into-bigquery#disabling_best_effort_de-duplication

Signed-off-by: Alejandro del Castillo
---
 .../connect/bigquery/BigQuerySinkTask.java    |  6 +++++-
 .../bigquery/config/BigQuerySinkConfig.java   | 17 ++++++++++++++++-
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
index bf4b43d6d..e2fb32caa 100644
--- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
+++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
@@ -173,7 +173,11 @@ private RowToInsert getRecordRow(SinkRecord record) {
     if (config.getBoolean(config.SANITIZE_FIELD_NAME_CONFIG)) {
       convertedRecord = FieldNameSanitizer.replaceInvalidKeys(convertedRecord);
     }
-    return RowToInsert.of(getRowId(record), convertedRecord);
+    if (config.getBoolean(config.BEST_EFFORT_DEDUPLICATION_CONFIG)) {
+      return RowToInsert.of(getRowId(record), convertedRecord);
+    } else {
+      return RowToInsert.of(convertedRecord);
+    }
   }
 
   private String getRowId(SinkRecord record) {
diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java
index d4362c381..06ecce9b4 100644
--- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java
+++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java
@@ -233,6 +233,15 @@ public class BigQuerySinkConfig extends AbstractConfig {
   private static final String TABLE_CREATE_DOC =
       "Automatically create BigQuery tables if they don't already exist";
 
+  public static final String BEST_EFFORT_DEDUPLICATION_CONFIG = "bestEffortDeduplication";
+  private static final ConfigDef.Type BEST_EFFORT_DEDUPLICATION_TYPE = ConfigDef.Type.BOOLEAN;
+  public static final Boolean BEST_EFFORT_DEDUPLICATION_DEFAULT = true;
+  private static final ConfigDef.Importance BEST_EFFORT_DEDUPLICATION_IMPORTANCE =
+      ConfigDef.Importance.MEDIUM;
+  private static final String BEST_EFFORT_DEDUPLICATION_DOC =
+      "If false, BigQuery best-effort de-duplication will be disabled, which increases "
+      + "the streaming ingest quota at the expense of not checking for duplicates";
+
   static {
     config = new ConfigDef()
         .define(
@@ -365,7 +374,13 @@ public class BigQuerySinkConfig extends AbstractConfig {
         TABLE_CREATE_DEFAULT,
         TABLE_CREATE_IMPORTANCE,
         TABLE_CREATE_DOC
-    );
+    ).define(
+        BEST_EFFORT_DEDUPLICATION_CONFIG,
+        BEST_EFFORT_DEDUPLICATION_TYPE,
+        BEST_EFFORT_DEDUPLICATION_DEFAULT,
+        BEST_EFFORT_DEDUPLICATION_IMPORTANCE,
+        BEST_EFFORT_DEDUPLICATION_DOC
+    );
   }
 
   /**
    * Throw an exception if the passed-in properties do not constitute a valid sink.
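
Note: the two row forms this patch toggles between correspond to the two
RowToInsert factory methods in the google-cloud-bigquery client. The
following is a minimal standalone sketch, not connector code; the dataset,
table, field, and row-id values are made up.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;

import java.util.Collections;
import java.util.Map;

public class StreamingInsertSketch {
  public static void main(String[] args) {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    TableId table = TableId.of("my_dataset", "my_table"); // hypothetical names
    Map<String, Object> content = Collections.singletonMap("field", "value");

    // bestEffortDeduplication=true (default): the row carries an insertId, so
    // BigQuery drops duplicates within a ~1 minute window, but streaming is
    // capped at 100k rows/s and 100 MB/s.
    RowToInsert withId = RowToInsert.of("my-topic-0-42", content);

    // bestEffortDeduplication=false: no insertId, de-duplication is disabled,
    // and the streaming quota rises to 1 GB/s.
    RowToInsert withoutId = RowToInsert.of(content);

    InsertAllResponse response = bigquery.insertAll(
        InsertAllRequest.newBuilder(table)
            .addRow(withoutId)
            .build());
    if (response.hasErrors()) {
      System.err.println("insert errors: " + response.getInsertErrors());
    }
  }
}

With the patch applied, the connector uses the insertId form when
bestEffortDeduplication is true (the default) and the id-less form when it
is false, so existing deployments keep today's behavior unless they opt in
by setting bestEffortDeduplication=false in the connector properties.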
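When de-duplication is disabled, duplicate handling moves to the query
side, as the commit message notes. One common pattern keeps a single row
per logical key using ROW_NUMBER(); the sketch below is illustrative only,
and the event_id and ingest_time columns are hypothetical.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;

public class QuerySideDedupSketch {
  public static void main(String[] args) throws InterruptedException {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Keep exactly one row per event_id, preferring the latest ingest_time.
    String sql =
        "SELECT * EXCEPT(rn) FROM ("
        + " SELECT *, ROW_NUMBER() OVER ("
        + "   PARTITION BY event_id ORDER BY ingest_time DESC) AS rn"
        + " FROM `my_dataset.my_table`)"
        + " WHERE rn = 1";

    TableResult result = bigquery.query(QueryJobConfiguration.newBuilder(sql).build());
    result.iterateAll().forEach(row -> System.out.println(row));
  }
}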