From 86c55bda2a4bf95c28ffbb38deeeb2b1f0afa932 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Sat, 25 Jan 2025 11:31:59 +0800
Subject: [PATCH 1/3] save

---
 .../elasticsearch_opensearch_config.rs        | 45 +++++++++++--------
 1 file changed, 27 insertions(+), 18 deletions(-)

diff --git a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs
index 24f79dd84030b..01e1395867320 100644
--- a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs
+++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs
@@ -46,10 +46,10 @@ pub struct ElasticSearchOpenSearchConfig {
     pub delimiter: Option<String>,
     /// The username of elasticsearch or openserach
     #[serde(rename = "username")]
-    pub username: String,
+    pub username: Option<String>,
     /// The username of elasticsearch or openserach
     #[serde(rename = "password")]
-    pub password: String,
+    pub password: Option<String>,
     /// It is used for dynamic index, if it is be set, the value of this column will be used as the index. It and `index` can only set one
     #[serde(rename = "index_column")]
     pub index_column: Option<String>,
@@ -110,27 +110,36 @@ impl ElasticSearchOpenSearchConfig {
         let url =
             Url::parse(&self.url).map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
         if connector.eq(ES_SINK) {
-            let transport = elasticsearch::http::transport::TransportBuilder::new(
+            let mut transport_builder = elasticsearch::http::transport::TransportBuilder::new(
                 elasticsearch::http::transport::SingleNodeConnectionPool::new(url),
-            )
-            .auth(elasticsearch::auth::Credentials::Basic(
-                self.username.clone(),
-                self.password.clone(),
-            ))
-            .build()
-            .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
+            );
+            if let Some(username) = &self.username
+                && let Some(password) = &self.password
+            {
+                transport_builder = transport_builder.auth(
+                    elasticsearch::auth::Credentials::Basic(username.clone(), password.clone()),
+                );
+            }
+            let transport = transport_builder
+                .build()
+                .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
             let client = elasticsearch::Elasticsearch::new(transport);
             Ok(ElasticSearchOpenSearchClient::ElasticSearch(client))
         } else if connector.eq(OPENSEARCH_SINK) {
-            let transport = opensearch::http::transport::TransportBuilder::new(
+            let mut transport_builder = opensearch::http::transport::TransportBuilder::new(
                 opensearch::http::transport::SingleNodeConnectionPool::new(url),
-            )
-            .auth(opensearch::auth::Credentials::Basic(
-                self.username.clone(),
-                self.password.clone(),
-            ))
-            .build()
-            .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
+            );
+            if let Some(username) = &self.username
+                && let Some(password) = &self.password
+            {
+                transport_builder = transport_builder.auth(opensearch::auth::Credentials::Basic(
+                    username.clone(),
+                    password.clone(),
+                ));
+            }
+            let transport = transport_builder
+                .build()
+                .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
             let client = opensearch::OpenSearch::new(transport);
             Ok(ElasticSearchOpenSearchClient::OpenSearch(client))
         } else {

From b8d8b3fbf207aeb1d8f30c3876855dcd51e93619 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Sat, 25 Jan 2025 12:02:56 +0800
Subject: [PATCH 2/3] fix bug

---
 .../elasticsearch_opensearch_config.rs                     | 5 +++++
 src/connector/with_options_sink.yaml                       | 7 ++++---
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs
index 01e1395867320..01c547ffdc476 100644
--- a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs
+++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs
@@ -78,9 +78,14 @@ pub struct ElasticSearchOpenSearchConfig {
     #[serde(default = "default_concurrent_requests")]
     pub concurrent_requests: usize,
 
+    #[serde(default = "default_type")]
     pub r#type: String,
 }
 
+fn default_type() -> String {
+    "upsert".to_owned()
+}
+
 fn default_retry_on_conflict() -> i32 {
     3
 }
diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml
index 819e597fec448..ead783cec997e 100644
--- a/src/connector/with_options_sink.yaml
+++ b/src/connector/with_options_sink.yaml
@@ -306,11 +306,11 @@ ElasticSearchOpenSearchConfig:
   - name: username
     field_type: String
     comments: The username of elasticsearch or openserach
-    required: true
+    required: false
   - name: password
     field_type: String
     comments: The username of elasticsearch or openserach
-    required: true
+    required: false
   - name: index_column
     field_type: String
     comments: It is used for dynamic index, if it is be set, the value of this column will be used as the index. It and `index` can only set one
@@ -333,7 +333,8 @@ ElasticSearchOpenSearchConfig:
     required: true
   - name: r#type
     field_type: String
-    required: true
+    required: false
+    default: '"upsert" . to_owned ()'
 FsConfig:
   fields:
   - name: fs.path

From c7e2f4c290e39aa075c5ed22abb8dffa0b4848b8 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Mon, 27 Jan 2025 10:58:29 +0800
Subject: [PATCH 3/3] fix comm

---
 .../elasticsearch_opensearch_config.rs            | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs
index 01c547ffdc476..50e9e48650182 100644
--- a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs
+++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs
@@ -112,6 +112,19 @@ impl ElasticSearchOpenSearchConfig {
     }
 
     pub fn build_client(&self, connector: &str) -> Result<ElasticSearchOpenSearchClient> {
+        let check_username_password = || -> Result<()> {
+            if self.username.is_some() && self.password.is_none() {
+                return Err(SinkError::Config(anyhow!(
+                    "please set the password when the username is set."
+                )));
+            }
+            if self.username.is_none() && self.password.is_some() {
+                return Err(SinkError::Config(anyhow!(
+                    "please set the username when the password is set."
+                )));
+            }
+            Ok(())
+        };
         let url =
             Url::parse(&self.url).map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
         if connector.eq(ES_SINK) {
@@ -125,6 +138,7 @@ impl ElasticSearchOpenSearchConfig {
                     elasticsearch::auth::Credentials::Basic(username.clone(), password.clone()),
                 );
             }
+            check_username_password()?;
             let transport = transport_builder
                 .build()
                 .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
@@ -142,6 +156,7 @@ impl ElasticSearchOpenSearchConfig {
                     password.clone(),
                 ));
             }
+            check_username_password()?;
             let transport = transport_builder
                 .build()
                 .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;