Skip to content

Commit

Permalink
feat(pulsar sink): support tls options (#22148)
Browse files Browse the repository at this point in the history
* add tls_options to pulsar sink

* add integration test

* add tls options to source

* add changelog for tls_options

* rename changelog

* move unused import to test

* add cue for tls options

* fix typos

* rename fields

* fix format

* update cue

* rename to verify_certificate

* fix format

* rename tls_options to tls

* fix changelog

* fix format
  • Loading branch information
pomacanthidae authored Jan 22, 2025
1 parent 604a51b commit b4aaaa8
Show file tree
Hide file tree
Showing 19 changed files with 469 additions and 32 deletions.
3 changes: 3 additions & 0 deletions changelog.d/pulsar_sink_source_support_tls_options.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `pulsar` source and sink now support configuration of TLS options via the `tls` configuration field.

authors: pomacanthidae
14 changes: 12 additions & 2 deletions scripts/integration/pulsar/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@ version: '3'
services:
pulsar:
image: docker.io/apachepulsar/pulsar:${CONFIG_VERSION}
command: bin/pulsar standalone
command: sh -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone"
ports:
- 6650:6650
- 6650:6650
- 6651:6651
environment:
- PULSAR_PREFIX_brokerServicePortTls=6651
- PULSAR_PREFIX_tlsKeyFilePath=/etc/pulsar/certs/pulsar.key.pem
- PULSAR_PREFIX_tlsCertificateFilePath=/etc/pulsar/certs/pulsar.cert.pem
- PULSAR_PREFIX_tlsTrustCertsFilePath=/etc/pulsar/certs/ca-chain.cert.pem
volumes:
- ../../../tests/data/ca/intermediate_server/private/pulsar.key.pem:/etc/pulsar/certs/pulsar.key.pem:ro
- ../../../tests/data/ca/intermediate_server/certs/pulsar.cert.pem:/etc/pulsar/certs/pulsar.cert.pem:ro
- ../../../tests/data/ca/intermediate_server/certs/ca-chain.cert.pem:/etc/pulsar/certs/ca-chain.cert.pem:ro
2 changes: 1 addition & 1 deletion scripts/integration/pulsar/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ features:
test_filter: '::pulsar::integration_tests::'

env:
PULSAR_ADDRESS: pulsar://pulsar:6650
PULSAR_HOST: pulsar

matrix:
version: [latest]
Expand Down
46 changes: 38 additions & 8 deletions src/sinks/pulsar/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
pulsar::sink::{healthcheck, PulsarSink},
},
};
use futures_util::FutureExt;
use futures_util::{FutureExt, TryFutureExt};
use pulsar::{
authentication::oauth2::{OAuth2Authentication, OAuth2Params},
compression,
Expand All @@ -14,7 +14,7 @@ use pulsar::{
TokioExecutor,
};
use pulsar::{error::AuthenticationError, OperationRetryOptions};
use snafu::ResultExt;
use std::path::Path;
use std::time::Duration;
use vector_lib::codecs::{encoding::SerializerConfig, TextSerializerConfig};
use vector_lib::config::DataType;
Expand Down Expand Up @@ -82,6 +82,10 @@ pub struct PulsarSinkConfig {
#[configurable(derived)]
#[serde(default)]
pub connection_retry_options: Option<CustomConnectionRetryOptions>,

#[configurable(derived)]
#[serde(default)]
pub(crate) tls: Option<PulsarTlsOptions>,
}

/// Event batching behavior.
Expand Down Expand Up @@ -206,6 +210,25 @@ pub struct CustomConnectionRetryOptions {
pub keep_alive_secs: Option<u64>,
}

// TLS settings for the Pulsar sink's client connection. When `tls` is set on
// `PulsarSinkConfig`, `create_pulsar_client` feeds these fields into the Pulsar
// client builder (certificate chain file, insecure-connection flag, hostname
// verification flag).
#[configurable_component]
#[configurable(description = "TLS options configuration for the Pulsar client.")]
#[derive(Clone, Debug)]
pub struct PulsarTlsOptions {
/// File path containing a list of PEM encoded certificates.
#[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))]
pub ca_file: String,

/// Enables certificate verification.
///
/// Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates.
///
/// Set to `true` if not specified.
pub verify_certificate: Option<bool>,

/// Whether hostname verification is enabled when `verify_certificate` is `false`.
///
/// Set to `true` if not specified.
pub verify_hostname: Option<bool>,
}

impl Default for PulsarSinkConfig {
fn default() -> Self {
Self {
Expand All @@ -221,12 +244,13 @@ impl Default for PulsarSinkConfig {
auth: None,
acknowledgements: Default::default(),
connection_retry_options: None,
tls: None,
}
}
}

impl PulsarSinkConfig {
pub(crate) async fn create_pulsar_client(&self) -> Result<Pulsar<TokioExecutor>, PulsarError> {
pub(crate) async fn create_pulsar_client(&self) -> crate::Result<Pulsar<TokioExecutor>> {
let mut builder = Pulsar::builder(&self.endpoint, TokioExecutor);
if let Some(auth) = &self.auth {
builder = match (
Expand All @@ -246,10 +270,10 @@ impl PulsarSinkConfig {
scope: oauth2.scope.clone(),
}),
),
_ => return Err(PulsarError::Authentication(AuthenticationError::Custom(
_ => return Err(Box::new(PulsarError::Authentication(AuthenticationError::Custom(
"Invalid auth config: can only specify name and token or oauth2 configuration"
.to_string(),
))),
))))?,
};
}

Expand Down Expand Up @@ -292,7 +316,14 @@ impl PulsarSinkConfig {
let operation_retry_opts = OperationRetryOptions::default();
builder = builder.with_operation_retry_options(operation_retry_opts);

builder.build().await
if let Some(options) = &self.tls {
builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?;
builder =
builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
builder = builder
.with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
}
builder.build().map_err(|e| e.into()).await
}

pub(crate) fn build_producer_options(&self) -> ProducerOptions {
Expand Down Expand Up @@ -354,10 +385,9 @@ impl SinkConfig for PulsarSinkConfig {
let client = self
.create_pulsar_client()
.await
.context(super::sink::CreatePulsarSinkSnafu)?;
.map_err(|e| super::sink::BuildError::CreatePulsarSink { source: e })?;

let sink = PulsarSink::new(client, self.clone())?;

let hc = healthcheck(self.clone()).boxed();

Ok((VectorSink::from_event_streamsink(sink), hc))
Expand Down
29 changes: 25 additions & 4 deletions src/sinks/pulsar/integration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::sinks::pulsar::{config::PulsarSinkConfig, sink::PulsarSink};
use crate::sinks::pulsar::{config::PulsarSinkConfig, config::PulsarTlsOptions, sink::PulsarSink};
use futures::StreamExt;
use pulsar::SubType;

Expand All @@ -9,10 +9,15 @@ use crate::test_util::{
components::{assert_sink_compliance, SINK_TAGS},
random_lines_with_stream, random_string, trace_init,
};
use crate::tls::TEST_PEM_INTERMEDIATE_CA_PATH;
use bytes::Bytes;

fn pulsar_address() -> String {
std::env::var("PULSAR_ADDRESS").unwrap_or_else(|_| "pulsar://127.0.0.1:6650".into())
/// Returns the Pulsar broker host used by the integration tests.
///
/// Reads the `PULSAR_HOST` environment variable and falls back to the
/// loopback address when the variable cannot be read.
fn pulsar_host() -> String {
    match std::env::var("PULSAR_HOST") {
        Ok(host) => host,
        Err(_) => String::from("127.0.0.1"),
    }
}

/// Builds a broker URL of the form `<scheme>://<host>:<port>`, where the host
/// comes from `pulsar_host()`.
fn pulsar_address(scheme: &str, port: u16) -> String {
    let host = pulsar_host();
    format!("{scheme}://{host}:{port}")
}

async fn pulsar_happy_reuse(mut cnf: PulsarSinkConfig) {
Expand Down Expand Up @@ -80,7 +85,23 @@ async fn pulsar_happy_reuse(mut cnf: PulsarSinkConfig) {
#[tokio::test]
async fn pulsar_happy() {
let cnf = PulsarSinkConfig {
endpoint: pulsar_address(),
endpoint: pulsar_address("pulsar", 6650),
// overridden by test
..Default::default()
};

pulsar_happy_reuse(cnf).await
}

#[tokio::test]
async fn pulsar_happy_tls() {
let cnf = PulsarSinkConfig {
endpoint: pulsar_address("pulsar+ssl", 6651),
tls: Some(PulsarTlsOptions {
ca_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(),
verify_certificate: None,
verify_hostname: None,
}),
// overridden by test
..Default::default()
};
Expand Down
6 changes: 4 additions & 2 deletions src/sinks/pulsar/sink.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_trait::async_trait;
use bytes::Bytes;
use pulsar::{Error as PulsarError, Pulsar, TokioExecutor};
use pulsar::{Pulsar, TokioExecutor};
use serde::Serialize;
use snafu::Snafu;
use std::collections::HashMap;
Expand All @@ -16,7 +16,9 @@ use crate::sinks::prelude::*;
#[snafu(visibility(pub(crate)))]
pub(crate) enum BuildError {
#[snafu(display("creating pulsar producer failed: {}", source))]
CreatePulsarSink { source: PulsarError },
CreatePulsarSink {
source: Box<dyn std::error::Error + Send + Sync>,
},
}

pub(crate) struct PulsarSink {
Expand Down
113 changes: 100 additions & 13 deletions src/sources/pulsar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use pulsar::{
message::proto::MessageIdData,
Authentication, Consumer, Pulsar, SubType, TokioExecutor,
};
use std::path::Path;
use tokio_util::codec::FramedRead;

use vector_lib::{
Expand Down Expand Up @@ -100,6 +101,10 @@ pub struct PulsarSourceConfig {
#[configurable(metadata(docs::hidden))]
#[serde(default)]
log_namespace: Option<bool>,

#[configurable(derived)]
#[serde(default)]
tls: Option<TlsOptions>,
}

/// Authentication configuration.
Expand Down Expand Up @@ -172,6 +177,25 @@ struct DeadLetterQueuePolicy {
pub dead_letter_topic: String,
}

// TLS settings for the Pulsar source's client connection. When `tls` is set on
// `PulsarSourceConfig`, these fields are fed into the Pulsar client builder
// (certificate chain file, insecure-connection flag, hostname verification flag).
#[configurable_component]
#[configurable(description = "TLS options configuration for the Pulsar client.")]
#[derive(Clone, Debug)]
pub struct TlsOptions {
/// File path containing a list of PEM encoded certificates.
#[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))]
pub ca_file: String,

/// Enables certificate verification.
///
/// Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates.
///
/// Set to `true` if not specified.
pub verify_certificate: Option<bool>,

/// Whether hostname verification is enabled when `verify_certificate` is `false`.
///
/// Set to `true` if not specified.
pub verify_hostname: Option<bool>,
}

#[derive(Debug)]
struct FinalizerEntry {
topic: String,
Expand Down Expand Up @@ -263,10 +287,17 @@ impl PulsarSourceConfig {
),
};
}
if let Some(options) = &self.tls {
builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?;
builder =
builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
builder = builder
.with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
}

let pulsar = builder.build().await?;

let mut consumer_builder = pulsar
let mut consumer_builder: pulsar::ConsumerBuilder<TokioExecutor> = pulsar
.consumer()
.with_topics(&self.topics)
.with_subscription_type(SubType::Shared)
Expand Down Expand Up @@ -523,37 +554,85 @@ mod integration_tests {
use crate::config::log_schema;
use crate::test_util::components::{assert_source_compliance, SOURCE_TAGS};
use crate::test_util::{collect_n, random_string, trace_init};
use crate::tls::TEST_PEM_INTERMEDIATE_CA_PATH;

fn pulsar_address() -> String {
std::env::var("PULSAR_ADDRESS").unwrap_or_else(|_| "pulsar://127.0.0.1:6650".into())
/// Returns the Pulsar broker host used by the integration tests.
///
/// Reads the `PULSAR_HOST` environment variable and falls back to the
/// loopback address when the variable cannot be read.
fn pulsar_host() -> String {
    match std::env::var("PULSAR_HOST") {
        Ok(host) => host,
        Err(_) => String::from("127.0.0.1"),
    }
}

/// Builds a broker URL of the form `<scheme>://<host>:<port>`, where the host
/// comes from `pulsar_host()`.
fn pulsar_address(scheme: &str, port: u16) -> String {
    let host = pulsar_host();
    format!("{scheme}://{host}:{port}")
}
#[tokio::test]
async fn consumes_event_with_acknowledgements() {
pulsar_send_receive(true, LogNamespace::Legacy).await;
pulsar_send_receive(
&pulsar_address("pulsar", 6650),
true,
LogNamespace::Legacy,
None,
)
.await;
}

#[tokio::test]
async fn consumes_event_with_acknowledgements_vector_namespace() {
pulsar_send_receive(true, LogNamespace::Vector).await;
pulsar_send_receive(
&pulsar_address("pulsar", 6650),
true,
LogNamespace::Vector,
None,
)
.await;
}

#[tokio::test]
async fn consumes_event_without_acknowledgements() {
pulsar_send_receive(false, LogNamespace::Legacy).await;
pulsar_send_receive(
&pulsar_address("pulsar", 6650),
false,
LogNamespace::Legacy,
None,
)
.await;
}

#[tokio::test]
async fn consumes_event_without_acknowledgements_vector_namespace() {
pulsar_send_receive(false, LogNamespace::Vector).await;
pulsar_send_receive(
&pulsar_address("pulsar", 6650),
false,
LogNamespace::Vector,
None,
)
.await;
}

async fn pulsar_send_receive(acknowledgements: bool, log_namespace: LogNamespace) {
// Round-trips an event through the TLS listener (`pulsar+ssl` on port 6651),
// without acknowledgements and in the Vector log namespace. Both verification
// flags are left unset so the configured defaults apply.
#[tokio::test]
async fn consumes_event_with_tls() {
    let endpoint = pulsar_address("pulsar+ssl", 6651);
    let tls_options = TlsOptions {
        ca_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(),
        verify_certificate: None,
        verify_hostname: None,
    };

    pulsar_send_receive(&endpoint, false, LogNamespace::Vector, Some(tls_options)).await;
}

async fn pulsar_send_receive(
endpoint: &str,
acknowledgements: bool,
log_namespace: LogNamespace,
tls: Option<TlsOptions>,
) {
trace_init();

let topic = format!("test-{}", random_string(10));
let cnf = PulsarSourceConfig {
endpoint: pulsar_address(),
endpoint: endpoint.into(),
topics: vec![topic.clone()],
consumer_name: None,
subscription_name: None,
Expand All @@ -565,12 +644,20 @@ mod integration_tests {
decoding: DeserializerConfig::Bytes,
acknowledgements: acknowledgements.into(),
log_namespace: None,
tls: tls.clone(),
};
let mut builder = Pulsar::<TokioExecutor>::builder(&cnf.endpoint, TokioExecutor);
if let Some(options) = &tls {
builder = builder
.with_certificate_chain_file(Path::new(&options.ca_file))
.unwrap();
builder =
builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
builder = builder
.with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
}

let pulsar = Pulsar::<TokioExecutor>::builder(&cnf.endpoint, TokioExecutor)
.build()
.await
.unwrap();
let pulsar = builder.build().await.unwrap();

let consumer = cnf.create_consumer().await.unwrap();
let decoder = DecodingConfig::new(
Expand Down
Loading

0 comments on commit b4aaaa8

Please sign in to comment.