-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(pulsar sink): support tls options #22148
Changes from 8 commits
5b2bceb
782c1ec
42b7875
7daae8e
12f973b
22340ed
3b9afb3
292bc4a
66821d8
e0b7868
028a183
ed01093
20c7032
59f0634
85de10c
1bfae12
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
Tls options to set custom certificate chain are now available for `pulsar` sink and source. | ||
|
||
authors: pomacanthidae |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -5,7 +5,7 @@ use crate::{ | |||||
pulsar::sink::{healthcheck, PulsarSink}, | ||||||
}, | ||||||
}; | ||||||
use futures_util::FutureExt; | ||||||
use futures_util::{FutureExt, TryFutureExt}; | ||||||
use pulsar::{ | ||||||
authentication::oauth2::{OAuth2Authentication, OAuth2Params}, | ||||||
compression, | ||||||
|
@@ -14,7 +14,7 @@ use pulsar::{ | |||||
TokioExecutor, | ||||||
}; | ||||||
use pulsar::{error::AuthenticationError, OperationRetryOptions}; | ||||||
use snafu::ResultExt; | ||||||
use std::path::Path; | ||||||
use std::time::Duration; | ||||||
use vector_lib::codecs::{encoding::SerializerConfig, TextSerializerConfig}; | ||||||
use vector_lib::config::DataType; | ||||||
|
@@ -82,6 +82,10 @@ pub struct PulsarSinkConfig { | |||||
#[configurable(derived)] | ||||||
#[serde(default)] | ||||||
pub connection_retry_options: Option<CustomConnectionRetryOptions>, | ||||||
|
||||||
#[configurable(derived)] | ||||||
#[serde(default)] | ||||||
pub(crate) tls_options: Option<PulsarTlsOptions>, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Apologies, missed this one before. We should call this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed in 59f0634 |
||||||
} | ||||||
|
||||||
/// Event batching behavior. | ||||||
|
@@ -206,6 +210,25 @@ pub struct CustomConnectionRetryOptions { | |||||
pub keep_alive_secs: Option<u64>, | ||||||
} | ||||||
|
||||||
#[configurable_component] | ||||||
#[configurable(description = "TLS options configuration for the Pulsar client.")] | ||||||
#[derive(Clone, Debug)] | ||||||
pub struct PulsarTlsOptions { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be possible to reuse the existing config struct we use for TLS options:
Or does the pulsar client no support all of the same options? At the least, we should match the option names (e.g. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for your feedback. |
||||||
/// File path containing a list of PEM encoded certificates | ||||||
#[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))] | ||||||
pub certificate_chain_file: String, | ||||||
|
||||||
/// Allow insecure TLS connection if set to true | ||||||
/// | ||||||
/// Set to false if not specified. | ||||||
pub allow_insecure_connection: Option<bool>, | ||||||
|
||||||
/// Whether hostname verification is enabled when insecure TLS connection is allowed | ||||||
/// | ||||||
/// Set to true if not specified. | ||||||
pub tls_hostname_verification_enabled: Option<bool>, | ||||||
} | ||||||
|
||||||
impl Default for PulsarSinkConfig { | ||||||
fn default() -> Self { | ||||||
Self { | ||||||
|
@@ -221,12 +244,13 @@ impl Default for PulsarSinkConfig { | |||||
auth: None, | ||||||
acknowledgements: Default::default(), | ||||||
connection_retry_options: None, | ||||||
tls_options: None, | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
impl PulsarSinkConfig { | ||||||
pub(crate) async fn create_pulsar_client(&self) -> Result<Pulsar<TokioExecutor>, PulsarError> { | ||||||
pub(crate) async fn create_pulsar_client(&self) -> crate::Result<Pulsar<TokioExecutor>> { | ||||||
let mut builder = Pulsar::builder(&self.endpoint, TokioExecutor); | ||||||
if let Some(auth) = &self.auth { | ||||||
builder = match ( | ||||||
|
@@ -246,10 +270,10 @@ impl PulsarSinkConfig { | |||||
scope: oauth2.scope.clone(), | ||||||
}), | ||||||
), | ||||||
_ => return Err(PulsarError::Authentication(AuthenticationError::Custom( | ||||||
_ => return Err(Box::new(PulsarError::Authentication(AuthenticationError::Custom( | ||||||
"Invalid auth config: can only specify name and token or oauth2 configuration" | ||||||
.to_string(), | ||||||
))), | ||||||
))))?, | ||||||
}; | ||||||
} | ||||||
|
||||||
|
@@ -292,7 +316,16 @@ impl PulsarSinkConfig { | |||||
let operation_retry_opts = OperationRetryOptions::default(); | ||||||
builder = builder.with_operation_retry_options(operation_retry_opts); | ||||||
|
||||||
builder.build().await | ||||||
if let Some(options) = &self.tls_options { | ||||||
builder = | ||||||
builder.with_certificate_chain_file(Path::new(&options.certificate_chain_file))?; | ||||||
builder = builder | ||||||
.with_allow_insecure_connection(options.allow_insecure_connection.unwrap_or(false)); | ||||||
builder = builder.with_tls_hostname_verification_enabled( | ||||||
options.tls_hostname_verification_enabled.unwrap_or(true), | ||||||
); | ||||||
} | ||||||
builder.build().map_err(|e| e.into()).await | ||||||
} | ||||||
|
||||||
pub(crate) fn build_producer_options(&self) -> ProducerOptions { | ||||||
|
@@ -354,10 +387,9 @@ impl SinkConfig for PulsarSinkConfig { | |||||
let client = self | ||||||
.create_pulsar_client() | ||||||
.await | ||||||
.context(super::sink::CreatePulsarSinkSnafu)?; | ||||||
.map_err(|e| super::sink::BuildError::CreatePulsarSink { source: e })?; | ||||||
|
||||||
let sink = PulsarSink::new(client, self.clone())?; | ||||||
|
||||||
let hc = healthcheck(self.clone()).boxed(); | ||||||
|
||||||
Ok((VectorSink::from_event_streamsink(sink), hc)) | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cleaning this up a bit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in 85de10c