From 5b2bceb29aac723842fde13c56bfb6097c9d316d Mon Sep 17 00:00:00 2001 From: pomacanthidae Date: Thu, 9 Jan 2025 18:15:43 +0900 Subject: [PATCH 01/16] add tls_options to pulsar sink --- src/sinks/pulsar/config.rs | 48 +++++++++++++++++++++++++++++++------- src/sinks/pulsar/sink.rs | 6 +++-- 2 files changed, 44 insertions(+), 10 deletions(-) diff --git a/src/sinks/pulsar/config.rs b/src/sinks/pulsar/config.rs index 93b6e81fd4880..f0860b7e747f5 100644 --- a/src/sinks/pulsar/config.rs +++ b/src/sinks/pulsar/config.rs @@ -5,7 +5,7 @@ use crate::{ pulsar::sink::{healthcheck, PulsarSink}, }, }; -use futures_util::FutureExt; +use futures_util::{FutureExt, TryFutureExt}; use pulsar::{ authentication::oauth2::{OAuth2Authentication, OAuth2Params}, compression, @@ -14,7 +14,7 @@ use pulsar::{ TokioExecutor, }; use pulsar::{error::AuthenticationError, OperationRetryOptions}; -use snafu::ResultExt; +use std::path::Path; use std::time::Duration; use vector_lib::codecs::{encoding::SerializerConfig, TextSerializerConfig}; use vector_lib::config::DataType; @@ -82,6 +82,10 @@ pub struct PulsarSinkConfig { #[configurable(derived)] #[serde(default)] pub connection_retry_options: Option, + + #[configurable(derived)] + #[serde(default)] + pub(crate) tls_options: Option, } /// Event batching behavior. @@ -206,6 +210,25 @@ pub struct CustomConnectionRetryOptions { pub keep_alive_secs: Option, } +#[configurable_component] +#[configurable(description = "TLS options configuration for the Pulsar client.")] +#[derive(Clone, Debug)] +pub struct PulsarTlsOptions { + /// File path containing a list of PEM encoded certificates + #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))] + pub certificate_chain_file: String, + + /// Allow insecure TLS connection if set to true + /// + /// Set to false if not specified. + pub allow_insecure_connection: Option, + + /// Whether hostname verification is enabled when insecure TLS connection is allowed + /// + /// Set to true if not specified. + pub tls_hostname_verification_enabled: Option, +} + impl Default for PulsarSinkConfig { fn default() -> Self { Self { @@ -221,12 +244,13 @@ impl Default for PulsarSinkConfig { auth: None, acknowledgements: Default::default(), connection_retry_options: None, + tls_options: None, } } } impl PulsarSinkConfig { - pub(crate) async fn create_pulsar_client(&self) -> Result, PulsarError> { + pub(crate) async fn create_pulsar_client(&self) -> crate::Result> { let mut builder = Pulsar::builder(&self.endpoint, TokioExecutor); if let Some(auth) = &self.auth { builder = match ( @@ -246,10 +270,10 @@ impl PulsarSinkConfig { scope: oauth2.scope.clone(), }), ), - _ => return Err(PulsarError::Authentication(AuthenticationError::Custom( + _ => return Err(Box::new(PulsarError::Authentication(AuthenticationError::Custom( "Invalid auth config: can only specify name and token or oauth2 configuration" .to_string(), - ))), + ))))?, }; } @@ -292,7 +316,16 @@ impl PulsarSinkConfig { let operation_retry_opts = OperationRetryOptions::default(); builder = builder.with_operation_retry_options(operation_retry_opts); - builder.build().await + if let Some(options) = &self.tls_options { + builder = + builder.with_certificate_chain_file(Path::new(&options.certificate_chain_file))?; + builder = builder + .with_allow_insecure_connection(options.allow_insecure_connection.unwrap_or(false)); + builder = builder.with_tls_hostname_verification_enabled( + options.tls_hostname_verification_enabled.unwrap_or(true), + ); + } + builder.build().map_err(|e| e.into()).await } pub(crate) fn build_producer_options(&self) -> ProducerOptions { @@ -354,10 +387,9 @@ impl SinkConfig for PulsarSinkConfig { let client = self .create_pulsar_client() .await - .context(super::sink::CreatePulsarSinkSnafu)?; + .map_err(|e| super::sink::BuildError::CreatePulsarSink { source: e })?; let sink = PulsarSink::new(client, self.clone())?; - let hc = healthcheck(self.clone()).boxed(); Ok((VectorSink::from_event_streamsink(sink), hc)) diff --git a/src/sinks/pulsar/sink.rs b/src/sinks/pulsar/sink.rs index 479d4455b97e0..85847d8b14f73 100644 --- a/src/sinks/pulsar/sink.rs +++ b/src/sinks/pulsar/sink.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; use bytes::Bytes; -use pulsar::{Error as PulsarError, Pulsar, TokioExecutor}; +use pulsar::{Pulsar, TokioExecutor}; use serde::Serialize; use snafu::Snafu; use std::collections::HashMap; @@ -16,7 +16,9 @@ use crate::sinks::prelude::*; #[snafu(visibility(pub(crate)))] pub(crate) enum BuildError { #[snafu(display("creating pulsar producer failed: {}", source))] - CreatePulsarSink { source: PulsarError }, + CreatePulsarSink { + source: Box, + }, } pub(crate) struct PulsarSink { From 782c1ec893fae1fea13903869d55fb4c699a891d Mon Sep 17 00:00:00 2001 From: pomacanthidae Date: Thu, 9 Jan 2025 18:17:04 +0900 Subject: [PATCH 02/16] add integration test --- scripts/integration/pulsar/compose.yaml | 14 ++- src/sinks/pulsar/integration_tests.rs | 29 +++++- tests/data/Makefile | 18 ++++ .../certs/pulsar-chain.cert.pem | 98 +++++++++++++++++++ .../intermediate_server/certs/pulsar.cert.pem | 32 ++++++ .../ca/intermediate_server/csr/pulsar.csr.pem | 17 ++++ tests/data/ca/intermediate_server/index.txt | 1 + .../data/ca/intermediate_server/index.txt.old | 1 + .../ca/intermediate_server/newcerts/1008.pem | 32 ++++++ .../private/pulsar.key.pem | 27 +++++ tests/data/ca/intermediate_server/serial | 2 +- tests/data/ca/intermediate_server/serial.old | 2 +- 12 files changed, 265 insertions(+), 8 deletions(-) create mode 100644 tests/data/ca/intermediate_server/certs/pulsar-chain.cert.pem create mode 100644 tests/data/ca/intermediate_server/certs/pulsar.cert.pem create mode 100644 tests/data/ca/intermediate_server/csr/pulsar.csr.pem create mode 100644 tests/data/ca/intermediate_server/newcerts/1008.pem create mode 100644 tests/data/ca/intermediate_server/private/pulsar.key.pem diff --git a/scripts/integration/pulsar/compose.yaml b/scripts/integration/pulsar/compose.yaml index b73d35909be9d..0e963cd2e0bd1 100644 --- a/scripts/integration/pulsar/compose.yaml +++ b/scripts/integration/pulsar/compose.yaml @@ -3,6 +3,16 @@ version: '3' services: pulsar: image: docker.io/apachepulsar/pulsar:${CONFIG_VERSION} - command: bin/pulsar standalone + command: sh -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone" ports: - - 6650:6650 + - 6650:6650 + - 6651:6651 + environment: + - PULSAR_PREFIX_brokerServicePortTls=6651 + - PULSAR_PREFIX_tlsKeyFilePath=/etc/pulsar/certs/pulsar.key.pem + - PULSAR_PREFIX_tlsCertificateFilePath=/etc/pulsar/certs/pulsar.cert.pem + - PULSAR_PREFIX_tlsTrustCertsFilePath=/etc/pulsar/certs/ca-chain.cert.pem + volumes: + - ../../../tests/data/ca/intermediate_server/private/pulsar.key.pem:/etc/pulsar/certs/pulsar.key.pem:ro + - ../../../tests/data//ca/intermediate_server/certs/pulsar.cert.pem:/etc/pulsar/certs/pulsar.cert.pem:ro + - ../../../tests/data/ca/intermediate_server/certs/ca-chain.cert.pem:/etc/pulsar/certs/ca-chain.cert.pem:ro diff --git a/src/sinks/pulsar/integration_tests.rs b/src/sinks/pulsar/integration_tests.rs index 30a3d51a23646..02a11e20ddded 100644 --- a/src/sinks/pulsar/integration_tests.rs +++ b/src/sinks/pulsar/integration_tests.rs @@ -1,4 +1,4 @@ -use crate::sinks::pulsar::{config::PulsarSinkConfig, sink::PulsarSink}; +use crate::sinks::pulsar::{config::PulsarSinkConfig, config::PulsarTlsOptions, sink::PulsarSink}; use futures::StreamExt; use pulsar::SubType; @@ -9,10 +9,15 @@ use crate::test_util::{ components::{assert_sink_compliance, SINK_TAGS}, random_lines_with_stream, random_string, trace_init, }; +use crate::tls::TEST_PEM_INTERMEDIATE_CA_PATH; use bytes::Bytes; -fn pulsar_address() -> String { - std::env::var("PULSAR_ADDRESS").unwrap_or_else(|_| "pulsar://127.0.0.1:6650".into()) +fn pulsar_host() -> String { + std::env::var("PULSAR_HOST").unwrap_or_else(|_| "127.0.0.1".into()) +} + +fn pulsar_address(scheme: &str, port: u16) -> String { + format!("{}://{}:{}", scheme, pulsar_host(), port) } async fn pulsar_happy_reuse(mut cnf: PulsarSinkConfig) { @@ -80,7 +85,23 @@ async fn pulsar_happy_reuse(mut cnf: PulsarSinkConfig) { #[tokio::test] async fn pulsar_happy() { let cnf = PulsarSinkConfig { - endpoint: pulsar_address(), + endpoint: pulsar_address("pulsar", 6650), + // overriden by test + ..Default::default() + }; + + pulsar_happy_reuse(cnf).await +} + +#[tokio::test] +async fn pulsar_happy_tls() { + let cnf = PulsarSinkConfig { + endpoint: pulsar_address("pulsar+ssl", 6651), + tls_options: Some(PulsarTlsOptions { + certificate_chain_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(), + allow_insecure_connection: None, + tls_hostname_verification_enabled: None, + }), // overriden by test ..Default::default() }; diff --git a/tests/data/Makefile b/tests/data/Makefile index e22c324885b16..f3550bf8bf100 100644 --- a/tests/data/Makefile +++ b/tests/data/Makefile @@ -175,6 +175,24 @@ ca/intermediate_server/private/kafka.p12: ca/intermediate_server/private/kafka.k -in ca/intermediate_server/certs/kafka.cert.pem \ -inkey ca/intermediate_server/private/kafka.key.pem +ca/intermediate_server/private/pulsar.key.pem: + openssl genrsa -out ca/intermediate_server/private/pulsar.key.pem 2048 + +ca/intermediate_server/csr/pulsar.csr.pem: ca/intermediate_server/private/pulsar.key.pem + openssl req -config ca/intermediate_server/openssl.cnf \ + -key ca/intermediate_server/private/pulsar.key.pem \ + -subj '/CN=pulsar/OU=Vector/O=Datadog/ST=New York/L=New York/C=US' \ + -new -sha256 -out ca/intermediate_server/csr/pulsar.csr.pem + +ca/intermediate_server/certs/pulsar.cert.pem: ca/intermediate_server/csr/pulsar.csr.pem + openssl ca -batch -config ca/intermediate_server/openssl.cnf \ + -extensions server_cert -days 3650 -notext -md sha256 \ + -in ca/intermediate_server/csr/pulsar.csr.pem \ + -out ca/intermediate_server/certs/pulsar.cert.pem + +ca/intermediate_server/certs/pulsar-chain.cert.pem: ca/intermediate_server/certs/ca-chain.cert.pem ca/intermediate_server/certs/pulsar.cert.pem + cat ca/intermediate_server/certs/pulsar.cert.pem ca/intermediate_server/certs/ca-chain.cert.pem > ca/intermediate_server/certs/pulsar-chain.cert.pem + ca/intermediate_client/private/localhost.p12: ca/intermediate_client/private/localhost.key.pem ca/intermediate_client/certs/localhost.cert.pem ca/intermediate_client/certs/ca-chain.cert.pem openssl pkcs12 -chain -export -password pass:NOPASS -CAfile ca/intermediate_client/certs/ca-chain.cert.pem\ -out ca/intermediate_client/private/localhost.p12 \ diff --git a/tests/data/ca/intermediate_server/certs/pulsar-chain.cert.pem b/tests/data/ca/intermediate_server/certs/pulsar-chain.cert.pem new file mode 100644 index 0000000000000..4c2d3e8ced718 --- /dev/null +++ b/tests/data/ca/intermediate_server/certs/pulsar-chain.cert.pem @@ -0,0 +1,98 @@ +-----BEGIN CERTIFICATE----- +MIIFgzCCA2ugAwIBAgICEAgwDQYJKoZIhvcNAQELBQAwazELMAkGA1UEBhMCVVMx +ETAPBgNVBAgMCE5ldyBZb3JrMRAwDgYDVQQKDAdEYXRhZG9nMQ8wDQYDVQQLDAZW +ZWN0b3IxJjAkBgNVBAMMHVZlY3RvciBJbnRlcm1lZGlhdGUgU2VydmVyIENBMB4X +DTI0MTIzMDA1MzE1OVoXDTM0MTIyODA1MzE1OVowZzELMAkGA1UEBhMCVVMxETAP +BgNVBAgMCE5ldyBZb3JrMREwDwYDVQQHDAhOZXcgWW9yazEQMA4GA1UECgwHRGF0 +YWRvZzEPMA0GA1UECwwGVmVjdG9yMQ8wDQYDVQQDDAZwdWxzYXIwggEiMA0GCSqG +SIb3DQEBAQUAA4IBDwAwggEKAoIBAQChjtFd/pz494HLGdR5yMAgBYNKTlItM1Eb +1XAMWuTNAYhOITkBoAE2v7Dg9Z9gQLr6/HK99z2dfAKBFvaoz4aWcquk8TGOGjfY +TJamJuGhzfPYv4g5zJq15B+RIwd7/WkH1PzAk94xclBsnDob5nQ70SdZuKwF4R60 +8n6fegPSSMxG0PIgmwJnX1Gwz5RBuomZZn9onQvZTdJqyHpPSQYBHQXgtNGwPS2d +3FAPBxlGu+OajwDfkcLPUFz3tPf4t2iSitCqtQu/aGH450syx/hmwtZq63aReEkG +DOLXBq5MrGqDAzKIltbWoBYWgx6pkk56XcWv83AlxqvauHBmCN+9AgMBAAGjggEz +MIIBLzAJBgNVHRMEAjAAMBEGCWCGSAGG+EIBAQQEAwIGQDAzBglghkgBhvhCAQ0E +JhYkT3BlblNTTCBHZW5lcmF0ZWQgU2VydmVyIENlcnRpZmljYXRlMB0GA1UdDgQW +BBSP3J69fVPZ+HDbxHtOcgbm16ZK3jCBlQYDVR0jBIGNMIGKgBQ8PTovzNWCA32Z +xFjx4ds2760NQaFupGwwajESMBAGA1UEAwwJVmVjdG9yIENBMQ8wDQYDVQQLDAZW +ZWN0b3IxEDAOBgNVBAoMB0RhdGFkb2cxETAPBgNVBAgMCE5ldyBZb3JrMREwDwYD +VQQHDAhOZXcgWW9yazELMAkGA1UEBhMCVVOCAhAAMA4GA1UdDwEB/wQEAwIFoDAT +BgNVHSUEDDAKBggrBgEFBQcDATANBgkqhkiG9w0BAQsFAAOCAgEAaED846BaRdU7 +GGNJcAk5Xzq6vjO5gG+0xMzfiG/oQEIizNDJ1PXAN343QUDHR7q+ISRY7w+nr3Ar +pHSSdn3iPzznfVWWhtPnMhDxcvZPlwCR+TXwbkhNMQyK/K+pnRBUjZzWqIT2h0fG +vr6b6TsVc445ED/fb2W3HKEwOTxeiLFRAsvS/Z5pbKYGP5qTNQgkGD6WBx9vQvrT +1zQvZNo9vur41dYwPdcRqkvc/lB9I69zGSG78xzAc48ZRKbrhZ/zigD63FBFzcaL +FeCsAIg2fzp3dw6SrUhIzsnDxML79kd1cTyZv9IeRWx0Af6DDRVLZahC1fh+26cH +a9yt4002e9bbardqmyoX71Jc6ybmDhSik32PYYkE53c/qEwCuGy8/jm1UW5QixNw +ntsBw4iuf8Q3MTf46l9DmBzfxMOK0CX/aDoPCnj9BJ3UT+eviyvwvYzXn0hJyfLZ +taC9HLDJV2Xx3ggLO8wBGXlCqvwDH+EzNaSFIRGgY1gacjcYMWBPgddUgJ/mgsH4 +28J3Q8QS5jjXUaDtTuDRkWhF5j801YOpgCXA4jVFvUmw+aqbkRArWz/iXFxTe2UP +Q1F7GIUhalRtO2Fti7Tb+KfUB5ybs2S7NQ9HUi2SQBCnTZKEFGW6RahHMC9mn3E9 +Na6qmTaFImoYHs7zrfGEFcEowaQlMcI= +-----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +MIIFtzCCA5+gAwIBAgICEAAwDQYJKoZIhvcNAQELBQAwajESMBAGA1UEAwwJVmVj +dG9yIENBMQ8wDQYDVQQLDAZWZWN0b3IxEDAOBgNVBAoMB0RhdGFkb2cxETAPBgNV +BAgMCE5ldyBZb3JrMREwDwYDVQQHDAhOZXcgWW9yazELMAkGA1UEBhMCVVMwHhcN +MjIwNjA3MjIyNzUzWhcNMzIwNjA0MjIyNzUzWjBrMQswCQYDVQQGEwJVUzERMA8G +A1UECAwITmV3IFlvcmsxEDAOBgNVBAoMB0RhdGFkb2cxDzANBgNVBAsMBlZlY3Rv +cjEmMCQGA1UEAwwdVmVjdG9yIEludGVybWVkaWF0ZSBTZXJ2ZXIgQ0EwggIiMA0G +CSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQCy/mB0/ZwfgKrSZPQIFaGPtRA9xL2N +o2SsHndZ8k2XOCV225Hb2fzNH+o2WGNSjwmGjLP/uXb47KH0cHCAyFGzSjp++8/O +zoZaFiO0P5El02hQxmoabO3Cqu/N62EFsLfpSM828JM6YOn9p+WXUDn1+YPNoOOE +H142p4/RjFnXNHkzR3geXU4Pfi3KXDrMi8vK42lDqXPLPs6rhreBAfQ2dsYyqhz6 +tg6FzZuXxxzEYyYtNgGh+zTji99WCBMLbCmRcDurRjdTDO7m4O3PrwbGUy0xdLeb +HJiNGvUDCPH4bfwLiNqwVIZY38RBCAqbCnrqRhDaZIfAUev4mq3Kqh6KUeO/U7Vx +/5J5rL5ApREKOfWPATHMprBuEU2rs3N+MPBA04HoiFlu311urCxVEA1qsZCTkoCg +GHuDIVSU4E4hT4co95/J0to4zWgPlfPg1+cXyU8lAIMe7JdCGkG9cDe7Umw/GSbt +ZdoCMQZ6WyyiW2Hw+7sFD3V3VzYa5YA/rjKZRduPmGWKrs+mAa5J5pM2M22rrjbd +EpfTHWLS9s6cPN3/jxpCxn6Hv/KhIYRAcIterugag1+clvS1ajVjxBRavOxPBsf+ +hYdh7S5NTZnT98gjkc3yOuGQm7BPtXau+IYZRlWcB0dJ4/E2P69hmWQezSo9VVWh +5/K1RkbPvqTGZQIDAQABo2YwZDAdBgNVHQ4EFgQUPD06L8zVggN9mcRY8eHbNu+t +DUEwHwYDVR0jBBgwFoAURTWK6ARqnZkz8rktUc5PrtasIh8wEgYDVR0TAQH/BAgw +BgEB/wIBADAOBgNVHQ8BAf8EBAMCAYYwDQYJKoZIhvcNAQELBQADggIBAGqaGBuL +2J6Na8RHx/GmSeuZFiVcWhmd/I9bVpeMSYHSZujA2nay6OGaUYs0Lq/G5OKgsuT9 +AIHnsn7VUE1zqoDfXac/K8sXlOig8if7rTb+06jgymaP1YSELg3R+pBsdkZnXVil +izh/9FvzoyV+QQlIhojqCIybVFgxa1XFHq4QCPhDfwkg+tp9RctfwNmWgsJ63H19 +RmxN+H2xIrySvObwXnB4j6D4wvgu468QXQMEuSsnLcIQFg6Zteqe8fixbqTiOTBf +Dk1k+EpB9VMEkIPvMdfa48vseXdBEe6Ma9zGuJC76q4q1ZapVLTvOUP5Y24khlgd +cj5tfP7o7yc6HqymfXAcD1lzP2JQhqaRxA4I18Nrd+aHi+G1EM2c3cicuD3n6Iw9 +9oqdCwmMfS25fv5cyA5B6hRusIZ9wRopTi7at+JHl0GIt/FelaTYI7kRmAqgakQe +oEKLpXcH8lRJW802DmXm7ka4eQzwxa7Ngyf8O+JOFtGO0+EshuLJovxiPl6IyLyG +NJ/dHq3ad+46YVManbHdyjHxgT5PSvJFkq0Yluvf44NIyP5QRTCAvfH76bu7hXgS +QoQj5t5ILn6meQRTR79r2iwpQTanPLTEdoZvmrE4TeUBev9BA5KpiPPA3i3ZF/oV +0EYorXCNri7M/jylGW7AuWvNUyaVR6xgxAn6 +-----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +MIIFujCCA6KgAwIBAgIJAKhPL9BkNaFGMA0GCSqGSIb3DQEBCwUAMGoxEjAQBgNV +BAMMCVZlY3RvciBDQTEPMA0GA1UECwwGVmVjdG9yMRAwDgYDVQQKDAdEYXRhZG9n +MREwDwYDVQQIDAhOZXcgWW9yazERMA8GA1UEBwwITmV3IFlvcmsxCzAJBgNVBAYT +AlVTMB4XDTIyMDYwNzIyMjc1MloXDTQyMDYwMjIyMjc1MlowajESMBAGA1UEAwwJ +VmVjdG9yIENBMQ8wDQYDVQQLDAZWZWN0b3IxEDAOBgNVBAoMB0RhdGFkb2cxETAP +BgNVBAgMCE5ldyBZb3JrMREwDwYDVQQHDAhOZXcgWW9yazELMAkGA1UEBhMCVVMw +ggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQC9c1T+NXTNmqiiV36NSEJt +7mo0cyv8Byk2ZGdC85vHBm45QDY5USoh0vgonzPpWgSMggPn1WbR0f1y+LBwXdlM ++ZyZh2RVVeUrSjJ88lLHVn4DfywpdDkwQaFj1VmOsj2I9rMMrgc5x5n1Hj7lwZ+t +uPVSAGmgKp4iFfzLph9r/rjP1TUAnVUComfTUVS+Gd7zoGPOc14cMJXG6g2P2aAU +P6dg5uQlTxRmagnlx7bwm3lRwv6LMtnAdnjwBDBxr933nucAnk21GgE92GejiO3Z +OwlzIdzBI23lPcWi5pq+vCTgAArNq24W1Ha+7Jn5QewNTGKFyyYAJetZAwCUR8QS +Ip++2GE2pNhaGqcV5u1Tbwl02eD6p2qRqjfgLxmb+aC6xfl0n9kiFGPZppjCqDEW +sw+gX66nf+qxZVRWpJon2kWcFvhTnLqoa3T3+9+KIeamz2lW6wxMnki/Co2EA1Wa +mmedaUUcRPCgMx9aCktRkMyH6bEY8/vfJ07juxUsszOc46T00Scmn6Vkuo9Uc3Kf +2Q2N6Wo4jtyAiMO4gAwq5kzzpBAhNgRfLHOb83r2gAUj2Y4Vln/UUR/KR8ZbJi4i +r1BjX16Lz3yblJXXb1lp4uZynlbHNaAevXyGlRqHddM2ykKtAX/vgJcZRGSvms11 +uce/cqzrzx60AhpLRma5CwIDAQABo2MwYTAdBgNVHQ4EFgQURTWK6ARqnZkz8rkt +Uc5PrtasIh8wHwYDVR0jBBgwFoAURTWK6ARqnZkz8rktUc5PrtasIh8wDwYDVR0T +AQH/BAUwAwEB/zAOBgNVHQ8BAf8EBAMCAYYwDQYJKoZIhvcNAQELBQADggIBAEf5 +TR3hq/DtSAmsYotu1lAWz/OlTpG+7AdqSOHB878X4ETN3xaQ+KWvSwvf0K70ZDTV +tFOTh/r43cpzPifPKd1P+2ctnQEzrBtAacvyETLq1ABRK9VJOtfJ6Xk5KZXPhKdY +t353PQgBgW8YzQ2adq2B7FtgIlX7f1DIndjcMZBbolETR6xt9QwB/UnPI7Mwt01T ++bCBhr1fWAbZ4YAMlQ0xRam4qUOTjxgfmePrmSrv4HO7cXHMsRMLiXk+BLcx959/ +K/B6xzpzn6366Eqnqlo/uDiMpo5ud2I/Snz5PduB6oLztPMEf/8RmkG5tpHXYdWr +tM64WqNGO+ikluIrrtYvtyZS4DfsLAMfMYZcxX/Uw56gHo0i2c8I6+6JvGWdvOJ0 +FjrsKeIQoRlV77z025kI4V9jKi3XNMEsAIH+W7KNSut0X80yX7SugvQGoe0GDkXu +0fy8hMC3uTN2LEycYFRRfoIeKPLi6OZFK0PdS2E15d8PEU3n3W4eBCPgMtmiOKLY +d8QNBC8XLAuBoK9R8luCJpOJWUcFXjLpjcDab4V2hKTuAs+GQyDh/Xx4wF1yHX0r +zIkyN0EkOD/SvD8X4uFaM4mdsAh+ucn4ryUV7i5PgvDM9z4InHAMAee1ebBl0U+h ++NzMWF5c5OwxD5o6/Wh1HopmzJiVNT2v9u0kHT/f +-----END CERTIFICATE----- diff --git a/tests/data/ca/intermediate_server/certs/pulsar.cert.pem b/tests/data/ca/intermediate_server/certs/pulsar.cert.pem new file mode 100644 index 0000000000000..2a83a2e04258e --- /dev/null +++ b/tests/data/ca/intermediate_server/certs/pulsar.cert.pem @@ -0,0 +1,32 @@ +-----BEGIN CERTIFICATE----- +MIIFgzCCA2ugAwIBAgICEAgwDQYJKoZIhvcNAQELBQAwazELMAkGA1UEBhMCVVMx +ETAPBgNVBAgMCE5ldyBZb3JrMRAwDgYDVQQKDAdEYXRhZG9nMQ8wDQYDVQQLDAZW +ZWN0b3IxJjAkBgNVBAMMHVZlY3RvciBJbnRlcm1lZGlhdGUgU2VydmVyIENBMB4X +DTI0MTIzMDA1MzE1OVoXDTM0MTIyODA1MzE1OVowZzELMAkGA1UEBhMCVVMxETAP +BgNVBAgMCE5ldyBZb3JrMREwDwYDVQQHDAhOZXcgWW9yazEQMA4GA1UECgwHRGF0 +YWRvZzEPMA0GA1UECwwGVmVjdG9yMQ8wDQYDVQQDDAZwdWxzYXIwggEiMA0GCSqG +SIb3DQEBAQUAA4IBDwAwggEKAoIBAQChjtFd/pz494HLGdR5yMAgBYNKTlItM1Eb +1XAMWuTNAYhOITkBoAE2v7Dg9Z9gQLr6/HK99z2dfAKBFvaoz4aWcquk8TGOGjfY +TJamJuGhzfPYv4g5zJq15B+RIwd7/WkH1PzAk94xclBsnDob5nQ70SdZuKwF4R60 +8n6fegPSSMxG0PIgmwJnX1Gwz5RBuomZZn9onQvZTdJqyHpPSQYBHQXgtNGwPS2d +3FAPBxlGu+OajwDfkcLPUFz3tPf4t2iSitCqtQu/aGH450syx/hmwtZq63aReEkG +DOLXBq5MrGqDAzKIltbWoBYWgx6pkk56XcWv83AlxqvauHBmCN+9AgMBAAGjggEz +MIIBLzAJBgNVHRMEAjAAMBEGCWCGSAGG+EIBAQQEAwIGQDAzBglghkgBhvhCAQ0E +JhYkT3BlblNTTCBHZW5lcmF0ZWQgU2VydmVyIENlcnRpZmljYXRlMB0GA1UdDgQW +BBSP3J69fVPZ+HDbxHtOcgbm16ZK3jCBlQYDVR0jBIGNMIGKgBQ8PTovzNWCA32Z +xFjx4ds2760NQaFupGwwajESMBAGA1UEAwwJVmVjdG9yIENBMQ8wDQYDVQQLDAZW +ZWN0b3IxEDAOBgNVBAoMB0RhdGFkb2cxETAPBgNVBAgMCE5ldyBZb3JrMREwDwYD +VQQHDAhOZXcgWW9yazELMAkGA1UEBhMCVVOCAhAAMA4GA1UdDwEB/wQEAwIFoDAT +BgNVHSUEDDAKBggrBgEFBQcDATANBgkqhkiG9w0BAQsFAAOCAgEAaED846BaRdU7 +GGNJcAk5Xzq6vjO5gG+0xMzfiG/oQEIizNDJ1PXAN343QUDHR7q+ISRY7w+nr3Ar +pHSSdn3iPzznfVWWhtPnMhDxcvZPlwCR+TXwbkhNMQyK/K+pnRBUjZzWqIT2h0fG +vr6b6TsVc445ED/fb2W3HKEwOTxeiLFRAsvS/Z5pbKYGP5qTNQgkGD6WBx9vQvrT +1zQvZNo9vur41dYwPdcRqkvc/lB9I69zGSG78xzAc48ZRKbrhZ/zigD63FBFzcaL +FeCsAIg2fzp3dw6SrUhIzsnDxML79kd1cTyZv9IeRWx0Af6DDRVLZahC1fh+26cH +a9yt4002e9bbardqmyoX71Jc6ybmDhSik32PYYkE53c/qEwCuGy8/jm1UW5QixNw +ntsBw4iuf8Q3MTf46l9DmBzfxMOK0CX/aDoPCnj9BJ3UT+eviyvwvYzXn0hJyfLZ +taC9HLDJV2Xx3ggLO8wBGXlCqvwDH+EzNaSFIRGgY1gacjcYMWBPgddUgJ/mgsH4 +28J3Q8QS5jjXUaDtTuDRkWhF5j801YOpgCXA4jVFvUmw+aqbkRArWz/iXFxTe2UP +Q1F7GIUhalRtO2Fti7Tb+KfUB5ybs2S7NQ9HUi2SQBCnTZKEFGW6RahHMC9mn3E9 +Na6qmTaFImoYHs7zrfGEFcEowaQlMcI= +-----END CERTIFICATE----- diff --git a/tests/data/ca/intermediate_server/csr/pulsar.csr.pem b/tests/data/ca/intermediate_server/csr/pulsar.csr.pem new file mode 100644 index 0000000000000..42c5e55ba3b61 --- /dev/null +++ b/tests/data/ca/intermediate_server/csr/pulsar.csr.pem @@ -0,0 +1,17 @@ +-----BEGIN CERTIFICATE REQUEST----- +MIICrDCCAZQCAQAwZzEPMA0GA1UEAwwGcHVsc2FyMQ8wDQYDVQQLDAZWZWN0b3Ix +EDAOBgNVBAoMB0RhdGFkb2cxETAPBgNVBAgMCE5ldyBZb3JrMREwDwYDVQQHDAhO +ZXcgWW9yazELMAkGA1UEBhMCVVMwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK +AoIBAQChjtFd/pz494HLGdR5yMAgBYNKTlItM1Eb1XAMWuTNAYhOITkBoAE2v7Dg +9Z9gQLr6/HK99z2dfAKBFvaoz4aWcquk8TGOGjfYTJamJuGhzfPYv4g5zJq15B+R +Iwd7/WkH1PzAk94xclBsnDob5nQ70SdZuKwF4R608n6fegPSSMxG0PIgmwJnX1Gw +z5RBuomZZn9onQvZTdJqyHpPSQYBHQXgtNGwPS2d3FAPBxlGu+OajwDfkcLPUFz3 +tPf4t2iSitCqtQu/aGH450syx/hmwtZq63aReEkGDOLXBq5MrGqDAzKIltbWoBYW +gx6pkk56XcWv83AlxqvauHBmCN+9AgMBAAGgADANBgkqhkiG9w0BAQsFAAOCAQEA +Vz+KO9XWHosTrmB3ZkYdKZt2ktItXLkWqFMggWDotdKdTNj1OaLfxX4Dmq4aO+Xw +hOlYKh40PEi0+3BqWhjysyssZTs/fQWmXtY+qyBF3fG+8wPwnhUvAczcSKVJqIIW +nKoixmHm1yrOGCGEraMldS3cfn1H8KBzWZKqY5oX0TIo323C/W54BcepJUjsFM+W +Fe5mO/rjeg/XCzbs7mFDNlv958BJ6but3Q6jUEjf7FhhVRyaNwIuzfDB0qbmEdQU +6djIqwEGs5MpAymLAvO+mIhIiYU3CxhSGLp0o85FYrKchk4joMD6U1Y18gEjrCye ++EOtC2qh34fI/6vZYYQAjw== +-----END CERTIFICATE REQUEST----- diff --git a/tests/data/ca/intermediate_server/index.txt b/tests/data/ca/intermediate_server/index.txt index ab4efe2c2f6bd..0402ad0946130 100644 --- a/tests/data/ca/intermediate_server/index.txt +++ b/tests/data/ca/intermediate_server/index.txt @@ -5,3 +5,4 @@ V 320613195026Z 1004 unknown /C=US/ST=New York/L=New York/O=Datadog/OU=Vector/C V 320613195253Z 1005 unknown /C=US/ST=New York/L=New York/O=Datadog/OU=Vector/CN=kafka V 320731200837Z 1006 unknown /C=US/ST=New York/L=New York/O=Datadog/OU=Vector/CN=dufs-https V 330412000039Z 1007 unknown /C=US/ST=New York/L=New York/O=Datadog/OU=Vector/CN=rabbitmq +V 341228053159Z 1008 unknown /C=US/ST=New York/L=New York/O=Datadog/OU=Vector/CN=pulsar diff --git a/tests/data/ca/intermediate_server/index.txt.old b/tests/data/ca/intermediate_server/index.txt.old index c76d7b6e20f0d..ab4efe2c2f6bd 100644 --- a/tests/data/ca/intermediate_server/index.txt.old +++ b/tests/data/ca/intermediate_server/index.txt.old @@ -4,3 +4,4 @@ V 320613194901Z 1003 unknown /C=US/ST=New York/L=New York/O=Datadog/OU=Vector/C V 320613195026Z 1004 unknown /C=US/ST=New York/L=New York/O=Datadog/OU=Vector/CN=postgres V 320613195253Z 1005 unknown /C=US/ST=New York/L=New York/O=Datadog/OU=Vector/CN=kafka V 320731200837Z 1006 unknown /C=US/ST=New York/L=New York/O=Datadog/OU=Vector/CN=dufs-https +V 330412000039Z 1007 unknown /C=US/ST=New York/L=New York/O=Datadog/OU=Vector/CN=rabbitmq diff --git a/tests/data/ca/intermediate_server/newcerts/1008.pem b/tests/data/ca/intermediate_server/newcerts/1008.pem new file mode 100644 index 0000000000000..2a83a2e04258e --- /dev/null +++ b/tests/data/ca/intermediate_server/newcerts/1008.pem @@ -0,0 +1,32 @@ +-----BEGIN CERTIFICATE----- +MIIFgzCCA2ugAwIBAgICEAgwDQYJKoZIhvcNAQELBQAwazELMAkGA1UEBhMCVVMx +ETAPBgNVBAgMCE5ldyBZb3JrMRAwDgYDVQQKDAdEYXRhZG9nMQ8wDQYDVQQLDAZW +ZWN0b3IxJjAkBgNVBAMMHVZlY3RvciBJbnRlcm1lZGlhdGUgU2VydmVyIENBMB4X +DTI0MTIzMDA1MzE1OVoXDTM0MTIyODA1MzE1OVowZzELMAkGA1UEBhMCVVMxETAP +BgNVBAgMCE5ldyBZb3JrMREwDwYDVQQHDAhOZXcgWW9yazEQMA4GA1UECgwHRGF0 +YWRvZzEPMA0GA1UECwwGVmVjdG9yMQ8wDQYDVQQDDAZwdWxzYXIwggEiMA0GCSqG +SIb3DQEBAQUAA4IBDwAwggEKAoIBAQChjtFd/pz494HLGdR5yMAgBYNKTlItM1Eb +1XAMWuTNAYhOITkBoAE2v7Dg9Z9gQLr6/HK99z2dfAKBFvaoz4aWcquk8TGOGjfY +TJamJuGhzfPYv4g5zJq15B+RIwd7/WkH1PzAk94xclBsnDob5nQ70SdZuKwF4R60 +8n6fegPSSMxG0PIgmwJnX1Gwz5RBuomZZn9onQvZTdJqyHpPSQYBHQXgtNGwPS2d +3FAPBxlGu+OajwDfkcLPUFz3tPf4t2iSitCqtQu/aGH450syx/hmwtZq63aReEkG +DOLXBq5MrGqDAzKIltbWoBYWgx6pkk56XcWv83AlxqvauHBmCN+9AgMBAAGjggEz +MIIBLzAJBgNVHRMEAjAAMBEGCWCGSAGG+EIBAQQEAwIGQDAzBglghkgBhvhCAQ0E +JhYkT3BlblNTTCBHZW5lcmF0ZWQgU2VydmVyIENlcnRpZmljYXRlMB0GA1UdDgQW +BBSP3J69fVPZ+HDbxHtOcgbm16ZK3jCBlQYDVR0jBIGNMIGKgBQ8PTovzNWCA32Z +xFjx4ds2760NQaFupGwwajESMBAGA1UEAwwJVmVjdG9yIENBMQ8wDQYDVQQLDAZW +ZWN0b3IxEDAOBgNVBAoMB0RhdGFkb2cxETAPBgNVBAgMCE5ldyBZb3JrMREwDwYD +VQQHDAhOZXcgWW9yazELMAkGA1UEBhMCVVOCAhAAMA4GA1UdDwEB/wQEAwIFoDAT +BgNVHSUEDDAKBggrBgEFBQcDATANBgkqhkiG9w0BAQsFAAOCAgEAaED846BaRdU7 +GGNJcAk5Xzq6vjO5gG+0xMzfiG/oQEIizNDJ1PXAN343QUDHR7q+ISRY7w+nr3Ar +pHSSdn3iPzznfVWWhtPnMhDxcvZPlwCR+TXwbkhNMQyK/K+pnRBUjZzWqIT2h0fG +vr6b6TsVc445ED/fb2W3HKEwOTxeiLFRAsvS/Z5pbKYGP5qTNQgkGD6WBx9vQvrT +1zQvZNo9vur41dYwPdcRqkvc/lB9I69zGSG78xzAc48ZRKbrhZ/zigD63FBFzcaL +FeCsAIg2fzp3dw6SrUhIzsnDxML79kd1cTyZv9IeRWx0Af6DDRVLZahC1fh+26cH +a9yt4002e9bbardqmyoX71Jc6ybmDhSik32PYYkE53c/qEwCuGy8/jm1UW5QixNw +ntsBw4iuf8Q3MTf46l9DmBzfxMOK0CX/aDoPCnj9BJ3UT+eviyvwvYzXn0hJyfLZ +taC9HLDJV2Xx3ggLO8wBGXlCqvwDH+EzNaSFIRGgY1gacjcYMWBPgddUgJ/mgsH4 +28J3Q8QS5jjXUaDtTuDRkWhF5j801YOpgCXA4jVFvUmw+aqbkRArWz/iXFxTe2UP +Q1F7GIUhalRtO2Fti7Tb+KfUB5ybs2S7NQ9HUi2SQBCnTZKEFGW6RahHMC9mn3E9 +Na6qmTaFImoYHs7zrfGEFcEowaQlMcI= +-----END CERTIFICATE----- diff --git a/tests/data/ca/intermediate_server/private/pulsar.key.pem b/tests/data/ca/intermediate_server/private/pulsar.key.pem new file mode 100644 index 0000000000000..455e171c03f93 --- /dev/null +++ b/tests/data/ca/intermediate_server/private/pulsar.key.pem @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEowIBAAKCAQEAoY7RXf6c+PeByxnUecjAIAWDSk5SLTNRG9VwDFrkzQGITiE5 +AaABNr+w4PWfYEC6+vxyvfc9nXwCgRb2qM+GlnKrpPExjho32EyWpibhoc3z2L+I +OcyateQfkSMHe/1pB9T8wJPeMXJQbJw6G+Z0O9EnWbisBeEetPJ+n3oD0kjMRtDy +IJsCZ19RsM+UQbqJmWZ/aJ0L2U3Sash6T0kGAR0F4LTRsD0tndxQDwcZRrvjmo8A +35HCz1Bc97T3+LdokorQqrULv2hh+OdLMsf4ZsLWaut2kXhJBgzi1wauTKxqgwMy +iJbW1qAWFoMeqZJOel3Fr/NwJcar2rhwZgjfvQIDAQABAoIBAQCRDSJLayPSoJ5H +FPzhDu34khqLp4A+lvl2rQ55+U2+Bmc0Br4hCGCuoDMV91HN4LBAUlJO3uHSOJoi +/tQdQp4LDA+x+t/MPxvutuz3ZqdkV/5cl7KeemZZcuUosTOyDhaz8nWwDdMLwCAX +M7k0fZfUcPgMpTLF4JZqTeKEGs+9hzOsXMYLEsJX1oo3/ITb88DA2Uh0zFBZeGaG +wgb8tuXLpbwJY8KQTP9Y24UPEWW5hYNvSzBTKqGmfhQ4o/zDFU688ZYraQ+w3qxG +/dQjKiwlZPwyLq6LPhb160lZ/5inWaOH36vKbqLuToZ9m4VHSAfVTWFjhzQkYULB +cwmDPbcdAoGBAM7UcSNDZz0YtwluAml2r/yIjZ/cyI/4WnSJm7RVoIwZVvubDslk +sseWs0k7TmxQH/4npgV7nuZ2GtuOOtJ97UQ/ejfvBFIDA4EkEdVZNeFokE9Ab9Lt +iE9Nym9Ep5u5Y58ibVHTLpaBxap6i6tkEehWy4Fk4tfBLyJ8PL7JLyoDAoGBAMf3 +Ja9ovBFhkgT6s+COEVbkfLg07hL5stUqps61TfzqdcQ/M6KVfvC4cGr0vcRRehC8 +IlbK+3nW1j1DhUlsxXYYTYGLPWaf6IwkV7GiC6tDOr+OA795/ms/QbF47toXH2Ft +NvW3veBBSTz0viSsJpvLdth5cNfT/FeHVBo++oM/AoGAQ/QQZ+GRXcVs/bAjIrtX +/sRuo4NKceLK9VbwzUMEyILazTeYmBp5kpG0ve66SWPZ3wrvxQVko5tSH6iPMvEk +rzOb2Byzcq6CzD8pjMsOpMxR/XfSRZ69FjEcvYn093jlfjc1a5jSyahBZU117g2m +xIsfuZeH4BMchVwEKTq4QDkCgYApKRRWwOZFZaIa6EfyZwvmqO2LNUn0GfXXrwna +7rL45oILPT8xrjgM8MojGfGd4W+Q1kjzOKD++VvsDGP5MyyKHIKHsdOXtj616h0q +8UIZpKMJHwBif4gBJ2osT8pKlgvdkA+KEKJC8O1UYMRq5AymcQErmgPCSV5d3ftP +07rZHQKBgET20wFb0gN3kM7RLqz8xgPuu0ZVIoPr9+lZTqRMe0U2eI/WKFijiQ2L ++xiYsJkhsMY1nQc4uXiuUl2aGwbQRIIspP8H9qKjK7kzfadERCUu1I1qvHWwTwkp +1GNzsoKNMDf9le9DuEOMfP+TePOhPGf5GtEKeHZmiIMr/YhnDPUc +-----END RSA PRIVATE KEY----- diff --git a/tests/data/ca/intermediate_server/serial b/tests/data/ca/intermediate_server/serial index 617ba1c154075..6cb3869343bf8 100644 --- a/tests/data/ca/intermediate_server/serial +++ b/tests/data/ca/intermediate_server/serial @@ -1 +1 @@ -1008 +1009 diff --git a/tests/data/ca/intermediate_server/serial.old b/tests/data/ca/intermediate_server/serial.old index fb35a14c02716..617ba1c154075 100644 --- a/tests/data/ca/intermediate_server/serial.old +++ b/tests/data/ca/intermediate_server/serial.old @@ -1 +1 @@ -1007 +1008 From 42b7875b188df6fcd2e6ceb11001517340513417 Mon Sep 17 00:00:00 2001 From: pomacanthidae Date: Thu, 9 Jan 2025 18:17:19 +0900 Subject: [PATCH 03/16] add tls options to source --- scripts/integration/pulsar/test.yaml | 2 +- src/sources/pulsar.rs | 116 ++++++++++++++++++++++++--- 2 files changed, 104 insertions(+), 14 deletions(-) diff --git a/scripts/integration/pulsar/test.yaml b/scripts/integration/pulsar/test.yaml index 824f0e0f290d4..b629a582d0a62 100644 --- a/scripts/integration/pulsar/test.yaml +++ b/scripts/integration/pulsar/test.yaml @@ -4,7 +4,7 @@ features: test_filter: '::pulsar::integration_tests::' env: - PULSAR_ADDRESS: pulsar://pulsar:6650 + PULSAR_HOST: pulsar matrix: version: [latest] diff --git a/src/sources/pulsar.rs b/src/sources/pulsar.rs index ad0922ec93375..6470dff1b705e 100644 --- a/src/sources/pulsar.rs +++ b/src/sources/pulsar.rs @@ -10,6 +10,7 @@ use pulsar::{ message::proto::MessageIdData, Authentication, Consumer, Pulsar, SubType, TokioExecutor, }; +use std::path::Path; use tokio_util::codec::FramedRead; use vector_lib::{ @@ -32,6 +33,7 @@ use vector_lib::{ }; use vrl::{owned_value_path, path, value::Kind}; +use crate::tls::TEST_PEM_INTERMEDIATE_CA_PATH; use crate::{ codecs::{Decoder, DecodingConfig}, config::{SourceConfig, SourceContext}, @@ -100,6 +102,10 @@ pub struct PulsarSourceConfig { #[configurable(metadata(docs::hidden))] #[serde(default)] log_namespace: Option, + + #[configurable(derived)] + #[serde(default)] + tls_options: Option, } /// Authentication configuration. @@ -172,6 +178,25 @@ struct DeadLetterQueuePolicy { pub dead_letter_topic: String, } +#[configurable_component] +#[configurable(description = "TLS options configuration for the Pulsar client.")] +#[derive(Clone, Debug)] +pub struct TlsOptions { + /// File path containing a list of PEM encoded certificates + #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))] + pub certificate_chain_file: String, + + /// Allow insecure TLS connection if set to true + /// + /// Set to false if not specified. + pub allow_insecure_connection: Option, + + /// Whether hostname verification is enabled when insecure TLS connection is allowed + /// + /// Set to true if not specified. + pub tls_hostname_verification_enabled: Option, +} + #[derive(Debug)] struct FinalizerEntry { topic: String, @@ -263,10 +288,19 @@ impl PulsarSourceConfig { ), }; } + if let Some(options) = &self.tls_options { + builder = + builder.with_certificate_chain_file(Path::new(&options.certificate_chain_file))?; + builder = builder + .with_allow_insecure_connection(options.allow_insecure_connection.unwrap_or(false)); + builder = builder.with_tls_hostname_verification_enabled( + options.tls_hostname_verification_enabled.unwrap_or(true), + ); + } let pulsar = builder.build().await?; - let mut consumer_builder = pulsar + let mut consumer_builder: pulsar::ConsumerBuilder = pulsar .consumer() .with_topics(&self.topics) .with_subscription_type(SubType::Shared) @@ -524,36 +558,83 @@ mod integration_tests { use crate::test_util::components::{assert_source_compliance, SOURCE_TAGS}; use crate::test_util::{collect_n, random_string, trace_init}; - fn pulsar_address() -> String { - std::env::var("PULSAR_ADDRESS").unwrap_or_else(|_| "pulsar://127.0.0.1:6650".into()) + fn pulsar_host() -> String { + std::env::var("PULSAR_HOST").unwrap_or_else(|_| "127.0.0.1".into()) } + fn pulsar_address(scheme: &str, port: u16) -> String { + format!("{}://{}:{}", scheme, pulsar_host(), port) + } #[tokio::test] async fn consumes_event_with_acknowledgements() { - pulsar_send_receive(true, LogNamespace::Legacy).await; + pulsar_send_receive( + &pulsar_address("pulsar", 6650), + true, + LogNamespace::Legacy, + None, + ) + .await; } #[tokio::test] async fn consumes_event_with_acknowledgements_vector_namespace() { - pulsar_send_receive(true, LogNamespace::Vector).await; + pulsar_send_receive( + &pulsar_address("pulsar", 6650), + true, + LogNamespace::Vector, + None, + ) + .await; } #[tokio::test] async fn consumes_event_without_acknowledgements() { - pulsar_send_receive(false, LogNamespace::Legacy).await; + pulsar_send_receive( + &pulsar_address("pulsar", 6650), + false, + LogNamespace::Legacy, + None, + ) + .await; } #[tokio::test] async fn consumes_event_without_acknowledgements_vector_namespace() { - pulsar_send_receive(false, LogNamespace::Vector).await; + pulsar_send_receive( + &pulsar_address("pulsar", 6650), + false, + LogNamespace::Vector, + None, + ) + .await; } - async fn pulsar_send_receive(acknowledgements: bool, log_namespace: LogNamespace) { + #[tokio::test] + async fn consumes_event_with_tls() { + pulsar_send_receive( + &pulsar_address("pulsar+ssl", 6651), + false, + LogNamespace::Vector, + Some(TlsOptions { + certificate_chain_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(), + allow_insecure_connection: None, + tls_hostname_verification_enabled: None, + }), + ) + .await; + } + + async fn pulsar_send_receive( + endpoint: &str, + acknowledgements: bool, + log_namespace: LogNamespace, + tls_options: Option, + ) { trace_init(); let topic = format!("test-{}", random_string(10)); let cnf = PulsarSourceConfig { - endpoint: pulsar_address(), + endpoint: endpoint.into(), topics: vec![topic.clone()], consumer_name: None, subscription_name: None, @@ -565,12 +646,21 @@ mod integration_tests { decoding: DeserializerConfig::Bytes, acknowledgements: acknowledgements.into(), log_namespace: None, + tls_options: tls_options.clone(), }; + let mut builder = Pulsar::::builder(&cnf.endpoint, TokioExecutor); + if let Some(options) = &tls_options { + builder = builder + .with_certificate_chain_file(Path::new(&options.certificate_chain_file)) + .unwrap(); + builder = builder + .with_allow_insecure_connection(options.allow_insecure_connection.unwrap_or(false)); + builder = builder.with_tls_hostname_verification_enabled( + options.tls_hostname_verification_enabled.unwrap_or(true), + ); + } - let pulsar = Pulsar::::builder(&cnf.endpoint, TokioExecutor) - .build() - .await - .unwrap(); + let pulsar = builder.build().await.unwrap(); let consumer = cnf.create_consumer().await.unwrap(); let decoder = DecodingConfig::new( From 7daae8e52d5b7971d253515e7c12fb4c90323087 Mon Sep 17 00:00:00 2001 From: pomacanthidae Date: Thu, 9 Jan 2025 19:13:13 +0900 Subject: [PATCH 04/16] add changelog for tls_options --- changelog.d/pulsar_sink_source_support_tls_optoins | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog.d/pulsar_sink_source_support_tls_optoins diff --git a/changelog.d/pulsar_sink_source_support_tls_optoins b/changelog.d/pulsar_sink_source_support_tls_optoins new file mode 100644 index 0000000000000..4049971378cf1 --- /dev/null +++ b/changelog.d/pulsar_sink_source_support_tls_optoins @@ -0,0 +1,3 @@ +Tls options to set custom certifcate chain are now available for `pulsar` sink and source. + +authers:pomacanthidae \ No newline at end of file From 12f973bd56ac00bf90281abb405b5ad3ed32f158 Mon Sep 17 00:00:00 2001 From: pomacanthidae Date: Thu, 9 Jan 2025 19:23:59 +0900 Subject: [PATCH 05/16] rename changelog --- ..._optoins => pulsar_sink_source_support_tls_optoins.feature.md} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changelog.d/{pulsar_sink_source_support_tls_optoins => pulsar_sink_source_support_tls_optoins.feature.md} (100%) diff --git a/changelog.d/pulsar_sink_source_support_tls_optoins b/changelog.d/pulsar_sink_source_support_tls_optoins.feature.md similarity index 100% rename from changelog.d/pulsar_sink_source_support_tls_optoins rename to changelog.d/pulsar_sink_source_support_tls_optoins.feature.md From 22340edb52f0a51fcf6246fa4e7cd48203ceac99 Mon Sep 17 00:00:00 2001 From: pomacanthidae Date: Thu, 9 Jan 2025 19:35:00 +0900 Subject: [PATCH 06/16] move unused import to test --- src/sources/pulsar.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sources/pulsar.rs b/src/sources/pulsar.rs index 6470dff1b705e..3087de8d6d8d1 100644 --- a/src/sources/pulsar.rs +++ b/src/sources/pulsar.rs @@ -33,7 +33,6 @@ use vector_lib::{ }; use vrl::{owned_value_path, path, value::Kind}; -use crate::tls::TEST_PEM_INTERMEDIATE_CA_PATH; use crate::{ codecs::{Decoder, DecodingConfig}, config::{SourceConfig, SourceContext}, @@ -557,6 +556,7 @@ mod integration_tests { use crate::config::log_schema; use crate::test_util::components::{assert_source_compliance, SOURCE_TAGS}; use crate::test_util::{collect_n, random_string, trace_init}; + use crate::tls::TEST_PEM_INTERMEDIATE_CA_PATH; fn pulsar_host() -> String { std::env::var("PULSAR_HOST").unwrap_or_else(|_| "127.0.0.1".into()) From 3b9afb3de2d7d3362e78f5fc2af5707330f8f892 Mon Sep 17 00:00:00 2001 From: pomacanthidae Date: Thu, 9 Jan 2025 19:47:48 +0900 Subject: [PATCH 07/16] add cue for tls options --- .../components/sinks/base/pulsar.cue | 29 +++++++++++++++++++ .../components/sources/base/pulsar.cue | 29 +++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/website/cue/reference/components/sinks/base/pulsar.cue b/website/cue/reference/components/sinks/base/pulsar.cue index b1cfed8a1e5a5..8676514e73992 100644 --- a/website/cue/reference/components/sinks/base/pulsar.cue +++ b/website/cue/reference/components/sinks/base/pulsar.cue @@ -539,6 +539,35 @@ base: components: sinks: pulsar: configuration: { required: false type: string: {} } + tls_options: { + description: "TLS options configuration for the Pulsar client." + required: false + type: object: options: { + allow_insecure_connection: { + description: """ + Allow insecure TLS connection if set to true + + Set to false if not specified. + """ + required: false + type: bool: {} + } + certificate_chain_file: { + description: "File path containing a list of PEM encoded certificates" + required: true + type: string: examples: ["/etc/certs/chain.pem"] + } + tls_hostname_verification_enabled: { + description: """ + Whether hostname verification is enabled when insecure TLS connection is allowed + + Set to true if not specified. + """ + required: false + type: bool: {} + } + } + } topic: { description: "The Pulsar topic name to write events to." required: true diff --git a/website/cue/reference/components/sources/base/pulsar.cue b/website/cue/reference/components/sources/base/pulsar.cue index 14452ffe67932..d566ee8b9c5c2 100644 --- a/website/cue/reference/components/sources/base/pulsar.cue +++ b/website/cue/reference/components/sources/base/pulsar.cue @@ -541,6 +541,35 @@ base: components: sources: pulsar: configuration: { required: false type: string: examples: ["subscription_name"] } + tls_options: { + description: "TLS options configuration for the Pulsar client." + required: false + type: object: options: { + allow_insecure_connection: { + description: """ + Allow insecure TLS connection if set to true + + Set to false if not specified. + """ + required: false + type: bool: {} + } + certificate_chain_file: { + description: "File path containing a list of PEM encoded certificates" + required: true + type: string: examples: ["/etc/certs/chain.pem"] + } + tls_hostname_verification_enabled: { + description: """ + Whether hostname verification is enabled when insecure TLS connection is allowed + + Set to true if not specified. + """ + required: false + type: bool: {} + } + } + } topics: { description: "The Pulsar topic names to read events from." required: true From 292bc4a89b07451b7caeb784214ce07f6bdac6ed Mon Sep 17 00:00:00 2001 From: pomacanthidae Date: Thu, 9 Jan 2025 20:20:22 +0900 Subject: [PATCH 08/16] fix typos --- changelog.d/pulsar_sink_source_support_tls_options.feature.md | 3 +++ changelog.d/pulsar_sink_source_support_tls_optoins.feature.md | 3 --- 2 files changed, 3 insertions(+), 3 deletions(-) create mode 100644 changelog.d/pulsar_sink_source_support_tls_options.feature.md delete mode 100644 changelog.d/pulsar_sink_source_support_tls_optoins.feature.md diff --git a/changelog.d/pulsar_sink_source_support_tls_options.feature.md b/changelog.d/pulsar_sink_source_support_tls_options.feature.md new file mode 100644 index 0000000000000..0761859b6b5b9 --- /dev/null +++ b/changelog.d/pulsar_sink_source_support_tls_options.feature.md @@ -0,0 +1,3 @@ +Tls options to set custom certificate chain are now available for `pulsar` sink and source. + +authors: pomacanthidae \ No newline at end of file diff --git a/changelog.d/pulsar_sink_source_support_tls_optoins.feature.md b/changelog.d/pulsar_sink_source_support_tls_optoins.feature.md deleted file mode 100644 index 4049971378cf1..0000000000000 --- a/changelog.d/pulsar_sink_source_support_tls_optoins.feature.md +++ /dev/null @@ -1,3 +0,0 @@ -Tls options to set custom certifcate chain are now available for `pulsar` sink and source. - -authers:pomacanthidae \ No newline at end of file From 66821d880245535f0611176b68991c9cff809533 Mon Sep 17 00:00:00 2001 From: pomacanthidae Date: Mon, 13 Jan 2025 11:13:00 +0900 Subject: [PATCH 09/16] rename fields --- src/sinks/pulsar/config.rs | 12 +++++------- src/sinks/pulsar/integration_tests.rs | 4 ++-- src/sources/pulsar.rs | 23 ++++++++++------------- 3 files changed, 17 insertions(+), 22 deletions(-) diff --git a/src/sinks/pulsar/config.rs b/src/sinks/pulsar/config.rs index f0860b7e747f5..0e58ada47d181 100644 --- a/src/sinks/pulsar/config.rs +++ b/src/sinks/pulsar/config.rs @@ -216,7 +216,7 @@ pub struct CustomConnectionRetryOptions { pub struct PulsarTlsOptions { /// File path containing a list of PEM encoded certificates #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))] - pub certificate_chain_file: String, + pub ca_file: String, /// Allow insecure TLS connection if set to true /// @@ -226,7 +226,7 @@ pub struct PulsarTlsOptions { /// Whether hostname verification is enabled when insecure TLS connection is allowed /// /// Set to true if not specified. - pub tls_hostname_verification_enabled: Option, + pub verify_hostname: Option, } impl Default for PulsarSinkConfig { @@ -317,13 +317,11 @@ impl PulsarSinkConfig { builder = builder.with_operation_retry_options(operation_retry_opts); if let Some(options) = &self.tls_options { - builder = - builder.with_certificate_chain_file(Path::new(&options.certificate_chain_file))?; + builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?; builder = builder .with_allow_insecure_connection(options.allow_insecure_connection.unwrap_or(false)); - builder = builder.with_tls_hostname_verification_enabled( - options.tls_hostname_verification_enabled.unwrap_or(true), - ); + builder = builder + .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true)); } builder.build().map_err(|e| e.into()).await } diff --git a/src/sinks/pulsar/integration_tests.rs b/src/sinks/pulsar/integration_tests.rs index 02a11e20ddded..930183bffc84c 100644 --- a/src/sinks/pulsar/integration_tests.rs +++ b/src/sinks/pulsar/integration_tests.rs @@ -98,9 +98,9 @@ async fn pulsar_happy_tls() { let cnf = PulsarSinkConfig { endpoint: pulsar_address("pulsar+ssl", 6651), tls_options: Some(PulsarTlsOptions { - certificate_chain_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(), + ca_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(), allow_insecure_connection: None, - tls_hostname_verification_enabled: None, + verify_hostname: None, }), // overriden by test ..Default::default() diff --git a/src/sources/pulsar.rs b/src/sources/pulsar.rs index 3087de8d6d8d1..14c0158527d70 100644 --- a/src/sources/pulsar.rs +++ b/src/sources/pulsar.rs @@ -183,7 +183,7 @@ struct DeadLetterQueuePolicy { pub struct TlsOptions { /// File path containing a list of PEM encoded certificates #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))] - pub certificate_chain_file: String, + pub ca_file: String, /// Allow insecure TLS connection if set to true /// @@ -193,7 +193,7 @@ pub struct TlsOptions { /// Whether hostname verification is enabled when insecure TLS connection is allowed /// /// Set to true if not specified. - pub tls_hostname_verification_enabled: Option, + pub verify_hostname: Option, } #[derive(Debug)] @@ -288,13 +288,11 @@ impl PulsarSourceConfig { }; } if let Some(options) = &self.tls_options { - builder = - builder.with_certificate_chain_file(Path::new(&options.certificate_chain_file))?; + builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?; builder = builder .with_allow_insecure_connection(options.allow_insecure_connection.unwrap_or(false)); - builder = builder.with_tls_hostname_verification_enabled( - options.tls_hostname_verification_enabled.unwrap_or(true), - ); + builder = builder + .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true)); } let pulsar = builder.build().await?; @@ -616,9 +614,9 @@ mod integration_tests { false, LogNamespace::Vector, Some(TlsOptions { - certificate_chain_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(), + ca_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(), allow_insecure_connection: None, - tls_hostname_verification_enabled: None, + verify_hostname: None, }), ) .await; @@ -651,13 +649,12 @@ mod integration_tests { let mut builder = Pulsar::::builder(&cnf.endpoint, TokioExecutor); if let Some(options) = &tls_options { builder = builder - .with_certificate_chain_file(Path::new(&options.certificate_chain_file)) + .with_certificate_chain_file(Path::new(&options.ca_file)) .unwrap(); builder = builder .with_allow_insecure_connection(options.allow_insecure_connection.unwrap_or(false)); - builder = builder.with_tls_hostname_verification_enabled( - options.tls_hostname_verification_enabled.unwrap_or(true), - ); + builder = builder + .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true)); } let pulsar = builder.build().await.unwrap(); From e0b7868e445891bb09c82fb73314fc778ed0ed2b Mon Sep 17 00:00:00 2001 From: pomacanthidae Date: Mon, 13 Jan 2025 11:13:08 +0900 Subject: [PATCH 10/16] fix format --- changelog.d/pulsar_sink_source_support_tls_options.feature.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/pulsar_sink_source_support_tls_options.feature.md b/changelog.d/pulsar_sink_source_support_tls_options.feature.md index 0761859b6b5b9..66aeecabb74b3 100644 --- a/changelog.d/pulsar_sink_source_support_tls_options.feature.md +++ b/changelog.d/pulsar_sink_source_support_tls_options.feature.md @@ -1,3 +1,3 @@ Tls options to set custom certificate chain are now available for `pulsar` sink and source. -authors: pomacanthidae \ No newline at end of file +authors: pomacanthidae From 028a18366683030688e379800c7fdcc986368a4d Mon Sep 17 00:00:00 2001 From: pomacanthidae Date: Tue, 14 Jan 2025 09:07:50 +0900 Subject: [PATCH 11/16] update cue --- website/cue/reference/components/sinks/base/pulsar.cue | 4 ++-- website/cue/reference/components/sources/base/pulsar.cue | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/website/cue/reference/components/sinks/base/pulsar.cue b/website/cue/reference/components/sinks/base/pulsar.cue index 8676514e73992..30665ed4507e4 100644 --- a/website/cue/reference/components/sinks/base/pulsar.cue +++ b/website/cue/reference/components/sinks/base/pulsar.cue @@ -552,12 +552,12 @@ base: components: sinks: pulsar: configuration: { required: false type: bool: {} } - certificate_chain_file: { + ca_file: { description: "File path containing a list of PEM encoded certificates" required: true type: string: examples: ["/etc/certs/chain.pem"] } - tls_hostname_verification_enabled: { + verify_hostname: { description: """ Whether hostname verification is enabled when insecure TLS connection is allowed diff --git a/website/cue/reference/components/sources/base/pulsar.cue b/website/cue/reference/components/sources/base/pulsar.cue index d566ee8b9c5c2..7d92c49fede62 100644 --- a/website/cue/reference/components/sources/base/pulsar.cue +++ b/website/cue/reference/components/sources/base/pulsar.cue @@ -554,12 +554,12 @@ base: components: sources: pulsar: configuration: { required: false type: bool: {} } - certificate_chain_file: { + ca_file: { description: "File path containing a list of PEM encoded certificates" required: true type: string: examples: ["/etc/certs/chain.pem"] } - tls_hostname_verification_enabled: { + verify_hostname: { description: """ Whether hostname verification is enabled when insecure TLS connection is allowed From ed01093b8506a1dec3502446412208d395187ce0 Mon Sep 17 00:00:00 2001 From: pomacanthidae Date: Fri, 17 Jan 2025 23:16:57 +0900 Subject: [PATCH 12/16] rename to verify_certificate --- src/sinks/pulsar/config.rs | 14 +++++++------- src/sinks/pulsar/integration_tests.rs | 2 +- src/sources/pulsar.rs | 18 +++++++++--------- .../reference/components/sinks/base/pulsar.cue | 18 +++++++++--------- .../components/sources/base/pulsar.cue | 18 +++++++++--------- 5 files changed, 35 insertions(+), 35 deletions(-) diff --git a/src/sinks/pulsar/config.rs b/src/sinks/pulsar/config.rs index 0e58ada47d181..4118e0fc76305 100644 --- a/src/sinks/pulsar/config.rs +++ b/src/sinks/pulsar/config.rs @@ -214,16 +214,16 @@ pub struct CustomConnectionRetryOptions { #[configurable(description = "TLS options configuration for the Pulsar client.")] #[derive(Clone, Debug)] pub struct PulsarTlsOptions { - /// File path containing a list of PEM encoded certificates + /// File path containing a list of PEM encoded certificates. #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))] pub ca_file: String, - /// Allow insecure TLS connection if set to true + /// Enables certificate verification. /// - /// Set to false if not specified. - pub allow_insecure_connection: Option, + /// Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates. + pub verify_certificate: Option, - /// Whether hostname verification is enabled when insecure TLS connection is allowed + /// Whether hostname verification is enabled when verify_certificate is false. /// /// Set to true if not specified. pub verify_hostname: Option, @@ -318,8 +318,8 @@ impl PulsarSinkConfig { if let Some(options) = &self.tls_options { builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?; - builder = builder - .with_allow_insecure_connection(options.allow_insecure_connection.unwrap_or(false)); + builder = + builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true)); builder = builder .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true)); } diff --git a/src/sinks/pulsar/integration_tests.rs b/src/sinks/pulsar/integration_tests.rs index 930183bffc84c..3b3fb58fd32b8 100644 --- a/src/sinks/pulsar/integration_tests.rs +++ b/src/sinks/pulsar/integration_tests.rs @@ -99,7 +99,7 @@ async fn pulsar_happy_tls() { endpoint: pulsar_address("pulsar+ssl", 6651), tls_options: Some(PulsarTlsOptions { ca_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(), - allow_insecure_connection: None, + verify_certificate: None, verify_hostname: None, }), // overriden by test diff --git a/src/sources/pulsar.rs b/src/sources/pulsar.rs index 14c0158527d70..7a0a875e21ade 100644 --- a/src/sources/pulsar.rs +++ b/src/sources/pulsar.rs @@ -185,12 +185,12 @@ pub struct TlsOptions { #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))] pub ca_file: String, - /// Allow insecure TLS connection if set to true + /// Enables certificate verification. /// - /// Set to false if not specified. - pub allow_insecure_connection: Option, + /// Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates. + pub verify_certificate: Option, - /// Whether hostname verification is enabled when insecure TLS connection is allowed + /// Whether hostname verification is enabled when verify_certificate is false /// /// Set to true if not specified. pub verify_hostname: Option, @@ -289,8 +289,8 @@ impl PulsarSourceConfig { } if let Some(options) = &self.tls_options { builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?; - builder = builder - .with_allow_insecure_connection(options.allow_insecure_connection.unwrap_or(false)); + builder = + builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true)); builder = builder .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true)); } @@ -615,7 +615,7 @@ mod integration_tests { LogNamespace::Vector, Some(TlsOptions { ca_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(), - allow_insecure_connection: None, + verify_certificate: None, verify_hostname: None, }), ) @@ -651,8 +651,8 @@ mod integration_tests { builder = builder .with_certificate_chain_file(Path::new(&options.ca_file)) .unwrap(); - builder = builder - .with_allow_insecure_connection(options.allow_insecure_connection.unwrap_or(false)); + builder = + builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true)); builder = builder .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true)); } diff --git a/website/cue/reference/components/sinks/base/pulsar.cue b/website/cue/reference/components/sinks/base/pulsar.cue index 30665ed4507e4..15ecfc22de039 100644 --- a/website/cue/reference/components/sinks/base/pulsar.cue +++ b/website/cue/reference/components/sinks/base/pulsar.cue @@ -543,23 +543,23 @@ base: components: sinks: pulsar: configuration: { description: "TLS options configuration for the Pulsar client." required: false type: object: options: { - allow_insecure_connection: { + ca_file: { + description: "File path containing a list of PEM encoded certificates." + required: true + type: string: examples: ["/etc/certs/chain.pem"] + } + verify_certificate: { description: """ - Allow insecure TLS connection if set to true + Enables certificate verification. - Set to false if not specified. + Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates. """ required: false type: bool: {} } - ca_file: { - description: "File path containing a list of PEM encoded certificates" - required: true - type: string: examples: ["/etc/certs/chain.pem"] - } verify_hostname: { description: """ - Whether hostname verification is enabled when insecure TLS connection is allowed + Whether hostname verification is enabled when verify_certificate is false. Set to true if not specified. """ diff --git a/website/cue/reference/components/sources/base/pulsar.cue b/website/cue/reference/components/sources/base/pulsar.cue index 7d92c49fede62..4837835c305b2 100644 --- a/website/cue/reference/components/sources/base/pulsar.cue +++ b/website/cue/reference/components/sources/base/pulsar.cue @@ -545,23 +545,23 @@ base: components: sources: pulsar: configuration: { description: "TLS options configuration for the Pulsar client." required: false type: object: options: { - allow_insecure_connection: { + ca_file: { + description: "File path containing a list of PEM encoded certificates" + required: true + type: string: examples: ["/etc/certs/chain.pem"] + } + verify_certificate: { description: """ - Allow insecure TLS connection if set to true + Enables certificate verification. - Set to false if not specified. + Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates. """ required: false type: bool: {} } - ca_file: { - description: "File path containing a list of PEM encoded certificates" - required: true - type: string: examples: ["/etc/certs/chain.pem"] - } verify_hostname: { description: """ - Whether hostname verification is enabled when insecure TLS connection is allowed + Whether hostname verification is enabled when verify_certificate is false Set to true if not specified. """ From 20c7032ca23be193c2922135ab21750cb83a8e82 Mon Sep 17 00:00:00 2001 From: pomacanthidae Date: Sat, 18 Jan 2025 08:24:19 +0900 Subject: [PATCH 13/16] fix format --- src/sinks/pulsar/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sinks/pulsar/config.rs b/src/sinks/pulsar/config.rs index 4118e0fc76305..171ebe0207327 100644 --- a/src/sinks/pulsar/config.rs +++ b/src/sinks/pulsar/config.rs @@ -218,7 +218,7 @@ pub struct PulsarTlsOptions { #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))] pub ca_file: String, - /// Enables certificate verification. + /// Enables certificate verification. /// /// Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates. pub verify_certificate: Option, From 59f063490af1522516515056008c34245ac9b078 Mon Sep 17 00:00:00 2001 From: pomacanthidae Date: Wed, 22 Jan 2025 07:43:31 +0900 Subject: [PATCH 14/16] rename tls_options to tls --- src/sinks/pulsar/config.rs | 6 +++--- src/sinks/pulsar/integration_tests.rs | 2 +- src/sources/pulsar.rs | 10 +++++----- website/cue/reference/components/sinks/base/pulsar.cue | 2 +- .../cue/reference/components/sources/base/pulsar.cue | 2 +- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/sinks/pulsar/config.rs b/src/sinks/pulsar/config.rs index 171ebe0207327..d565b73ae4a24 100644 --- a/src/sinks/pulsar/config.rs +++ b/src/sinks/pulsar/config.rs @@ -85,7 +85,7 @@ pub struct PulsarSinkConfig { #[configurable(derived)] #[serde(default)] - pub(crate) tls_options: Option, + pub(crate) tls: Option, } /// Event batching behavior. @@ -244,7 +244,7 @@ impl Default for PulsarSinkConfig { auth: None, acknowledgements: Default::default(), connection_retry_options: None, - tls_options: None, + tls: None, } } } @@ -316,7 +316,7 @@ impl PulsarSinkConfig { let operation_retry_opts = OperationRetryOptions::default(); builder = builder.with_operation_retry_options(operation_retry_opts); - if let Some(options) = &self.tls_options { + if let Some(options) = &self.tls { builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?; builder = builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true)); diff --git a/src/sinks/pulsar/integration_tests.rs b/src/sinks/pulsar/integration_tests.rs index 3b3fb58fd32b8..3b96ec05690d5 100644 --- a/src/sinks/pulsar/integration_tests.rs +++ b/src/sinks/pulsar/integration_tests.rs @@ -97,7 +97,7 @@ async fn pulsar_happy() { async fn pulsar_happy_tls() { let cnf = PulsarSinkConfig { endpoint: pulsar_address("pulsar+ssl", 6651), - tls_options: Some(PulsarTlsOptions { + tls: Some(PulsarTlsOptions { ca_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(), verify_certificate: None, verify_hostname: None, diff --git a/src/sources/pulsar.rs b/src/sources/pulsar.rs index 7a0a875e21ade..ba0480f89d173 100644 --- a/src/sources/pulsar.rs +++ b/src/sources/pulsar.rs @@ -104,7 +104,7 @@ pub struct PulsarSourceConfig { #[configurable(derived)] #[serde(default)] - tls_options: Option, + tls: Option, } /// Authentication configuration. @@ -287,7 +287,7 @@ impl PulsarSourceConfig { ), }; } - if let Some(options) = &self.tls_options { + if let Some(options) = &self.tls { builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?; builder = builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true)); @@ -626,7 +626,7 @@ mod integration_tests { endpoint: &str, acknowledgements: bool, log_namespace: LogNamespace, - tls_options: Option, + tls: Option, ) { trace_init(); @@ -644,10 +644,10 @@ mod integration_tests { decoding: DeserializerConfig::Bytes, acknowledgements: acknowledgements.into(), log_namespace: None, - tls_options: tls_options.clone(), + tls: tls.clone(), }; let mut builder = Pulsar::::builder(&cnf.endpoint, TokioExecutor); - if let Some(options) = &tls_options { + if let Some(options) = &tls { builder = builder .with_certificate_chain_file(Path::new(&options.ca_file)) .unwrap(); diff --git a/website/cue/reference/components/sinks/base/pulsar.cue b/website/cue/reference/components/sinks/base/pulsar.cue index 15ecfc22de039..ebf79edd5cd96 100644 --- a/website/cue/reference/components/sinks/base/pulsar.cue +++ b/website/cue/reference/components/sinks/base/pulsar.cue @@ -539,7 +539,7 @@ base: components: sinks: pulsar: configuration: { required: false type: string: {} } - tls_options: { + tls: { description: "TLS options configuration for the Pulsar client." required: false type: object: options: { diff --git a/website/cue/reference/components/sources/base/pulsar.cue b/website/cue/reference/components/sources/base/pulsar.cue index 4837835c305b2..881d4d7f2944d 100644 --- a/website/cue/reference/components/sources/base/pulsar.cue +++ b/website/cue/reference/components/sources/base/pulsar.cue @@ -541,7 +541,7 @@ base: components: sources: pulsar: configuration: { required: false type: string: examples: ["subscription_name"] } - tls_options: { + tls: { description: "TLS options configuration for the Pulsar client." required: false type: object: options: { From 85de10c5985602d1a4cf8408b7adaf355a28db8b Mon Sep 17 00:00:00 2001 From: pomacanthidae Date: Wed, 22 Jan 2025 07:43:43 +0900 Subject: [PATCH 15/16] fix changelog --- changelog.d/pulsar_sink_source_support_tls_options.feature.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/pulsar_sink_source_support_tls_options.feature.md b/changelog.d/pulsar_sink_source_support_tls_options.feature.md index 66aeecabb74b3..1a326ece8d05d 100644 --- a/changelog.d/pulsar_sink_source_support_tls_options.feature.md +++ b/changelog.d/pulsar_sink_source_support_tls_options.feature.md @@ -1,3 +1,3 @@ -Tls options to set custom certificate chain are now available for `pulsar` sink and source. +The `pulsar` source and sink now support configuration of TLS options via the `tls` configuration field. authors: pomacanthidae From 1bfae12781e8d30c162fb2f66848ae4f13cd7789 Mon Sep 17 00:00:00 2001 From: pomacanthidae Date: Wed, 22 Jan 2025 08:05:40 +0900 Subject: [PATCH 16/16] fix format --- src/sources/pulsar.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sources/pulsar.rs b/src/sources/pulsar.rs index ba0480f89d173..349279667c91e 100644 --- a/src/sources/pulsar.rs +++ b/src/sources/pulsar.rs @@ -185,7 +185,7 @@ pub struct TlsOptions { #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))] pub ca_file: String, - /// Enables certificate verification. + /// Enables certificate verification. /// /// Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates. pub verify_certificate: Option,