Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pulsar sink): support tls options #22148

Merged
merged 16 commits into from
Jan 22, 2025
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `pulsar` source and sink now support configuration of TLS options via the `tls` configuration field.

authors: pomacanthidae
14 changes: 12 additions & 2 deletions scripts/integration/pulsar/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@ version: '3'
services:
pulsar:
image: docker.io/apachepulsar/pulsar:${CONFIG_VERSION}
command: bin/pulsar standalone
command: sh -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone"
ports:
- 6650:6650
- 6650:6650
- 6651:6651
environment:
- PULSAR_PREFIX_brokerServicePortTls=6651
- PULSAR_PREFIX_tlsKeyFilePath=/etc/pulsar/certs/pulsar.key.pem
- PULSAR_PREFIX_tlsCertificateFilePath=/etc/pulsar/certs/pulsar.cert.pem
- PULSAR_PREFIX_tlsTrustCertsFilePath=/etc/pulsar/certs/ca-chain.cert.pem
volumes:
- ../../../tests/data/ca/intermediate_server/private/pulsar.key.pem:/etc/pulsar/certs/pulsar.key.pem:ro
- ../../../tests/data//ca/intermediate_server/certs/pulsar.cert.pem:/etc/pulsar/certs/pulsar.cert.pem:ro
- ../../../tests/data/ca/intermediate_server/certs/ca-chain.cert.pem:/etc/pulsar/certs/ca-chain.cert.pem:ro
2 changes: 1 addition & 1 deletion scripts/integration/pulsar/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ features:
test_filter: '::pulsar::integration_tests::'

env:
PULSAR_ADDRESS: pulsar://pulsar:6650
PULSAR_HOST: pulsar

matrix:
version: [latest]
Expand Down
46 changes: 38 additions & 8 deletions src/sinks/pulsar/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
pulsar::sink::{healthcheck, PulsarSink},
},
};
use futures_util::FutureExt;
use futures_util::{FutureExt, TryFutureExt};
use pulsar::{
authentication::oauth2::{OAuth2Authentication, OAuth2Params},
compression,
Expand All @@ -14,7 +14,7 @@ use pulsar::{
TokioExecutor,
};
use pulsar::{error::AuthenticationError, OperationRetryOptions};
use snafu::ResultExt;
use std::path::Path;
use std::time::Duration;
use vector_lib::codecs::{encoding::SerializerConfig, TextSerializerConfig};
use vector_lib::config::DataType;
Expand Down Expand Up @@ -82,6 +82,10 @@ pub struct PulsarSinkConfig {
#[configurable(derived)]
#[serde(default)]
pub connection_retry_options: Option<CustomConnectionRetryOptions>,

#[configurable(derived)]
#[serde(default)]
pub(crate) tls: Option<PulsarTlsOptions>,
}

/// Event batching behavior.
Expand Down Expand Up @@ -206,6 +210,25 @@ pub struct CustomConnectionRetryOptions {
pub keep_alive_secs: Option<u64>,
}

#[configurable_component]
#[configurable(description = "TLS options configuration for the Pulsar client.")]
#[derive(Clone, Debug)]
pub struct PulsarTlsOptions {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to reuse the existing config struct we use for TLS options:

pub struct TlsConfig {
?

Or does the pulsar client no support all of the same options?

At the least, we should match the option names (e.g. certificate_chain_file should be ca_file to match).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your feedback.
rust client for pulsar only supports some options https://github.com/streamnative/pulsar-rs/blob/master/src/connection_manager.rs#L73
Changed to use the same field names as TlsConfig 66821d8.

/// File path containing a list of PEM encoded certificates.
#[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))]
pub ca_file: String,

/// Enables certificate verification.
///
/// Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates.
pub verify_certificate: Option<bool>,

/// Whether hostname verification is enabled when verify_certificate is false.
///
/// Set to true if not specified.
pub verify_hostname: Option<bool>,
}

impl Default for PulsarSinkConfig {
fn default() -> Self {
Self {
Expand All @@ -221,12 +244,13 @@ impl Default for PulsarSinkConfig {
auth: None,
acknowledgements: Default::default(),
connection_retry_options: None,
tls: None,
}
}
}

impl PulsarSinkConfig {
pub(crate) async fn create_pulsar_client(&self) -> Result<Pulsar<TokioExecutor>, PulsarError> {
pub(crate) async fn create_pulsar_client(&self) -> crate::Result<Pulsar<TokioExecutor>> {
let mut builder = Pulsar::builder(&self.endpoint, TokioExecutor);
if let Some(auth) = &self.auth {
builder = match (
Expand All @@ -246,10 +270,10 @@ impl PulsarSinkConfig {
scope: oauth2.scope.clone(),
}),
),
_ => return Err(PulsarError::Authentication(AuthenticationError::Custom(
_ => return Err(Box::new(PulsarError::Authentication(AuthenticationError::Custom(
"Invalid auth config: can only specify name and token or oauth2 configuration"
.to_string(),
))),
))))?,
};
}

Expand Down Expand Up @@ -292,7 +316,14 @@ impl PulsarSinkConfig {
let operation_retry_opts = OperationRetryOptions::default();
builder = builder.with_operation_retry_options(operation_retry_opts);

builder.build().await
if let Some(options) = &self.tls {
builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?;
builder =
builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
builder = builder
.with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
}
builder.build().map_err(|e| e.into()).await
}

pub(crate) fn build_producer_options(&self) -> ProducerOptions {
Expand Down Expand Up @@ -354,10 +385,9 @@ impl SinkConfig for PulsarSinkConfig {
let client = self
.create_pulsar_client()
.await
.context(super::sink::CreatePulsarSinkSnafu)?;
.map_err(|e| super::sink::BuildError::CreatePulsarSink { source: e })?;

let sink = PulsarSink::new(client, self.clone())?;

let hc = healthcheck(self.clone()).boxed();

Ok((VectorSink::from_event_streamsink(sink), hc))
Expand Down
29 changes: 25 additions & 4 deletions src/sinks/pulsar/integration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::sinks::pulsar::{config::PulsarSinkConfig, sink::PulsarSink};
use crate::sinks::pulsar::{config::PulsarSinkConfig, config::PulsarTlsOptions, sink::PulsarSink};
use futures::StreamExt;
use pulsar::SubType;

Expand All @@ -9,10 +9,15 @@ use crate::test_util::{
components::{assert_sink_compliance, SINK_TAGS},
random_lines_with_stream, random_string, trace_init,
};
use crate::tls::TEST_PEM_INTERMEDIATE_CA_PATH;
use bytes::Bytes;

fn pulsar_address() -> String {
std::env::var("PULSAR_ADDRESS").unwrap_or_else(|_| "pulsar://127.0.0.1:6650".into())
fn pulsar_host() -> String {
std::env::var("PULSAR_HOST").unwrap_or_else(|_| "127.0.0.1".into())
}

fn pulsar_address(scheme: &str, port: u16) -> String {
format!("{}://{}:{}", scheme, pulsar_host(), port)
}

async fn pulsar_happy_reuse(mut cnf: PulsarSinkConfig) {
Expand Down Expand Up @@ -80,7 +85,23 @@ async fn pulsar_happy_reuse(mut cnf: PulsarSinkConfig) {
#[tokio::test]
async fn pulsar_happy() {
let cnf = PulsarSinkConfig {
endpoint: pulsar_address(),
endpoint: pulsar_address("pulsar", 6650),
// overriden by test
..Default::default()
};

pulsar_happy_reuse(cnf).await
}

#[tokio::test]
async fn pulsar_happy_tls() {
let cnf = PulsarSinkConfig {
endpoint: pulsar_address("pulsar+ssl", 6651),
tls: Some(PulsarTlsOptions {
ca_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(),
verify_certificate: None,
verify_hostname: None,
}),
// overriden by test
..Default::default()
};
Expand Down
6 changes: 4 additions & 2 deletions src/sinks/pulsar/sink.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_trait::async_trait;
use bytes::Bytes;
use pulsar::{Error as PulsarError, Pulsar, TokioExecutor};
use pulsar::{Pulsar, TokioExecutor};
use serde::Serialize;
use snafu::Snafu;
use std::collections::HashMap;
Expand All @@ -16,7 +16,9 @@ use crate::sinks::prelude::*;
#[snafu(visibility(pub(crate)))]
pub(crate) enum BuildError {
#[snafu(display("creating pulsar producer failed: {}", source))]
CreatePulsarSink { source: PulsarError },
CreatePulsarSink {
source: Box<dyn std::error::Error + Send + Sync>,
},
}

pub(crate) struct PulsarSink {
Expand Down
113 changes: 100 additions & 13 deletions src/sources/pulsar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use pulsar::{
message::proto::MessageIdData,
Authentication, Consumer, Pulsar, SubType, TokioExecutor,
};
use std::path::Path;
use tokio_util::codec::FramedRead;

use vector_lib::{
Expand Down Expand Up @@ -100,6 +101,10 @@ pub struct PulsarSourceConfig {
#[configurable(metadata(docs::hidden))]
#[serde(default)]
log_namespace: Option<bool>,

#[configurable(derived)]
#[serde(default)]
tls: Option<TlsOptions>,
}

/// Authentication configuration.
Expand Down Expand Up @@ -172,6 +177,25 @@ struct DeadLetterQueuePolicy {
pub dead_letter_topic: String,
}

#[configurable_component]
#[configurable(description = "TLS options configuration for the Pulsar client.")]
#[derive(Clone, Debug)]
pub struct TlsOptions {
/// File path containing a list of PEM encoded certificates
#[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))]
pub ca_file: String,

/// Enables certificate verification.
///
/// Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates.
pub verify_certificate: Option<bool>,

/// Whether hostname verification is enabled when verify_certificate is false
///
/// Set to true if not specified.
pub verify_hostname: Option<bool>,
}

#[derive(Debug)]
struct FinalizerEntry {
topic: String,
Expand Down Expand Up @@ -263,10 +287,17 @@ impl PulsarSourceConfig {
),
};
}
if let Some(options) = &self.tls {
builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?;
builder =
builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
builder = builder
.with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
}

let pulsar = builder.build().await?;

let mut consumer_builder = pulsar
let mut consumer_builder: pulsar::ConsumerBuilder<TokioExecutor> = pulsar
.consumer()
.with_topics(&self.topics)
.with_subscription_type(SubType::Shared)
Expand Down Expand Up @@ -523,37 +554,85 @@ mod integration_tests {
use crate::config::log_schema;
use crate::test_util::components::{assert_source_compliance, SOURCE_TAGS};
use crate::test_util::{collect_n, random_string, trace_init};
use crate::tls::TEST_PEM_INTERMEDIATE_CA_PATH;

fn pulsar_address() -> String {
std::env::var("PULSAR_ADDRESS").unwrap_or_else(|_| "pulsar://127.0.0.1:6650".into())
fn pulsar_host() -> String {
std::env::var("PULSAR_HOST").unwrap_or_else(|_| "127.0.0.1".into())
}

fn pulsar_address(scheme: &str, port: u16) -> String {
format!("{}://{}:{}", scheme, pulsar_host(), port)
}
#[tokio::test]
async fn consumes_event_with_acknowledgements() {
pulsar_send_receive(true, LogNamespace::Legacy).await;
pulsar_send_receive(
&pulsar_address("pulsar", 6650),
true,
LogNamespace::Legacy,
None,
)
.await;
}

#[tokio::test]
async fn consumes_event_with_acknowledgements_vector_namespace() {
pulsar_send_receive(true, LogNamespace::Vector).await;
pulsar_send_receive(
&pulsar_address("pulsar", 6650),
true,
LogNamespace::Vector,
None,
)
.await;
}

#[tokio::test]
async fn consumes_event_without_acknowledgements() {
pulsar_send_receive(false, LogNamespace::Legacy).await;
pulsar_send_receive(
&pulsar_address("pulsar", 6650),
false,
LogNamespace::Legacy,
None,
)
.await;
}

#[tokio::test]
async fn consumes_event_without_acknowledgements_vector_namespace() {
pulsar_send_receive(false, LogNamespace::Vector).await;
pulsar_send_receive(
&pulsar_address("pulsar", 6650),
false,
LogNamespace::Vector,
None,
)
.await;
}

async fn pulsar_send_receive(acknowledgements: bool, log_namespace: LogNamespace) {
#[tokio::test]
async fn consumes_event_with_tls() {
pulsar_send_receive(
&pulsar_address("pulsar+ssl", 6651),
false,
LogNamespace::Vector,
Some(TlsOptions {
ca_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(),
verify_certificate: None,
verify_hostname: None,
}),
)
.await;
}

async fn pulsar_send_receive(
endpoint: &str,
acknowledgements: bool,
log_namespace: LogNamespace,
tls: Option<TlsOptions>,
) {
trace_init();

let topic = format!("test-{}", random_string(10));
let cnf = PulsarSourceConfig {
endpoint: pulsar_address(),
endpoint: endpoint.into(),
topics: vec![topic.clone()],
consumer_name: None,
subscription_name: None,
Expand All @@ -565,12 +644,20 @@ mod integration_tests {
decoding: DeserializerConfig::Bytes,
acknowledgements: acknowledgements.into(),
log_namespace: None,
tls: tls.clone(),
};
let mut builder = Pulsar::<TokioExecutor>::builder(&cnf.endpoint, TokioExecutor);
if let Some(options) = &tls {
builder = builder
.with_certificate_chain_file(Path::new(&options.ca_file))
.unwrap();
builder =
builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
builder = builder
.with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
}

let pulsar = Pulsar::<TokioExecutor>::builder(&cnf.endpoint, TokioExecutor)
.build()
.await
.unwrap();
let pulsar = builder.build().await.unwrap();

let consumer = cnf.create_consumer().await.unwrap();
let decoder = DecodingConfig::new(
Expand Down
Loading
Loading