diff --git a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs index 24f79dd84030b..50e9e48650182 100644 --- a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs +++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs @@ -46,10 +46,10 @@ pub struct ElasticSearchOpenSearchConfig { pub delimiter: Option, /// The username of elasticsearch or openserach #[serde(rename = "username")] - pub username: String, + pub username: Option, /// The username of elasticsearch or openserach #[serde(rename = "password")] - pub password: String, + pub password: Option, /// It is used for dynamic index, if it is be set, the value of this column will be used as the index. It and `index` can only set one #[serde(rename = "index_column")] pub index_column: Option, @@ -78,9 +78,14 @@ pub struct ElasticSearchOpenSearchConfig { #[serde(default = "default_concurrent_requests")] pub concurrent_requests: usize, + #[serde(default = "default_type")] pub r#type: String, } +fn default_type() -> String { + "upsert".to_owned() +} + fn default_retry_on_conflict() -> i32 { 3 } @@ -107,30 +112,54 @@ impl ElasticSearchOpenSearchConfig { } pub fn build_client(&self, connector: &str) -> Result { + let check_username_password = || -> Result<()> { + if self.username.is_some() && self.password.is_none() { + return Err(SinkError::Config(anyhow!( + "please set the password when the username is set." + ))); + } + if self.username.is_none() && self.password.is_some() { + return Err(SinkError::Config(anyhow!( + "please set the username when the password is set." + ))); + } + Ok(()) + }; let url = Url::parse(&self.url).map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?; if connector.eq(ES_SINK) { - let transport = elasticsearch::http::transport::TransportBuilder::new( + let mut transport_builder = elasticsearch::http::transport::TransportBuilder::new( elasticsearch::http::transport::SingleNodeConnectionPool::new(url), - ) - .auth(elasticsearch::auth::Credentials::Basic( - self.username.clone(), - self.password.clone(), - )) - .build() - .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?; + ); + if let Some(username) = &self.username + && let Some(password) = &self.password + { + transport_builder = transport_builder.auth( + elasticsearch::auth::Credentials::Basic(username.clone(), password.clone()), + ); + } + check_username_password()?; + let transport = transport_builder + .build() + .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?; let client = elasticsearch::Elasticsearch::new(transport); Ok(ElasticSearchOpenSearchClient::ElasticSearch(client)) } else if connector.eq(OPENSEARCH_SINK) { - let transport = opensearch::http::transport::TransportBuilder::new( + let mut transport_builder = opensearch::http::transport::TransportBuilder::new( opensearch::http::transport::SingleNodeConnectionPool::new(url), - ) - .auth(opensearch::auth::Credentials::Basic( - self.username.clone(), - self.password.clone(), - )) - .build() - .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?; + ); + if let Some(username) = &self.username + && let Some(password) = &self.password + { + transport_builder = transport_builder.auth(opensearch::auth::Credentials::Basic( + username.clone(), + password.clone(), + )); + } + check_username_password()?; + let transport = transport_builder + .build() + .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?; let client = opensearch::OpenSearch::new(transport); Ok(ElasticSearchOpenSearchClient::OpenSearch(client)) } else { diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 819e597fec448..ead783cec997e 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -306,11 +306,11 @@ ElasticSearchOpenSearchConfig: - name: username field_type: String comments: The username of elasticsearch or openserach - required: true + required: false - name: password field_type: String comments: The username of elasticsearch or openserach - required: true + required: false - name: index_column field_type: String comments: It is used for dynamic index, if it is be set, the value of this column will be used as the index. It and `index` can only set one @@ -333,7 +333,8 @@ ElasticSearchOpenSearchConfig: required: true - name: r#type field_type: String - required: true + required: false + default: '"upsert" . to_owned ()' FsConfig: fields: - name: fs.path