From 2c0adf1d21d0f88af1266b9126919b71e6f479a4 Mon Sep 17 00:00:00 2001 From: Haidong Date: Mon, 11 Sep 2023 15:21:25 +0800 Subject: [PATCH] fix unit test and add snapshot_fetch_size for mongoDB to increase snapshot performance. Signed-off-by: Haidong --- .../plugins/kafkaconnect/configuration/MongoDBConfig.java | 4 ++++ .../kafka-plugins/src/test/resources/sample-pipelines.yaml | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/data-prepper-plugins/kafka-connect-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafkaconnect/configuration/MongoDBConfig.java b/data-prepper-plugins/kafka-connect-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafkaconnect/configuration/MongoDBConfig.java index b2446d9912..44a7919217 100644 --- a/data-prepper-plugins/kafka-connect-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafkaconnect/configuration/MongoDBConfig.java +++ b/data-prepper-plugins/kafka-connect-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafkaconnect/configuration/MongoDBConfig.java @@ -21,6 +21,7 @@ public class MongoDBConfig extends ConnectorConfig { private static final String DEFAULT_PORT = "27017"; private static final String DEFAULT_SNAPSHOT_MODE = "initial"; private static final Boolean SSL_ENABLED = false; + private static final String DEFAULT_SNAPSHOT_FETCH_SIZE = "1000"; @JsonProperty("hostname") @NotNull private String hostname; @@ -30,6 +31,8 @@ public class MongoDBConfig extends ConnectorConfig { private CredentialsConfig credentialsConfig; @JsonProperty("snapshot_mode") private String snapshotMode = DEFAULT_SNAPSHOT_MODE; + @JsonProperty("snapshot_fetch_size") + private String snapshotFetchSize = DEFAULT_SNAPSHOT_FETCH_SIZE; @JsonProperty("collections") private List collections = new ArrayList<>(); @JsonProperty("ssl") @@ -51,6 +54,7 @@ private Map buildConfig(final CollectionConfig collection) { config.put("mongodb.user", credentialsConfig.getUsername()); config.put("mongodb.password", credentialsConfig.getPassword()); config.put("snapshot.mode", snapshotMode); + config.put("snapshot.fetch.size", snapshotFetchSize); config.put("topic.prefix", collection.getTopicPrefix()); config.put("collection.include.list", collection.getCollectionName()); config.put("mongodb.ssl.enabled", ssl.toString()); diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml index 18eb541d87..f656dde93f 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml @@ -4,7 +4,8 @@ log-pipeline: bootstrap_servers: - 127.0.0.1:9093 client_dns_lookup: use_all_dns_ips - encryption: plaintext + encryption: + type: none topics: - name: my-topic-1 group_id: my-test-group