feat: add SASL and TLS config for Kafka client #4536

Merged: 19 commits, Aug 12, 2024
Changes from 18 commits
1 change: 1 addition & 0 deletions .github/workflows/develop.yml
@@ -719,6 +719,7 @@ jobs:
GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000
GT_ETCD_ENDPOINTS: http://127.0.0.1:2379
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093
UNITTEST_LOG_DIR: "__unittest_logs"
- name: Codecov upload
uses: codecov/codecov-action@v4
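The new `GT_KAFKA_SASL_ENDPOINTS` variable points unit tests at a SASL-enabled broker on port 9093, alongside the existing plain listener on 9092. A minimal sketch of how a test helper might consume it; the variable name comes from the workflow change above, while the helper and its comma-separated format are assumptions, not part of this PR:

```rust
// Sketch only: read the CI endpoint list added in develop.yml.
// The comma-separated format and the helper itself are assumed.
fn sasl_broker_endpoints() -> Option<Vec<String>> {
    std::env::var("GT_KAFKA_SASL_ENDPOINTS")
        .ok()
        .map(|v| v.split(',').map(|e| e.trim().to_string()).collect())
}
```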
60 changes: 46 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion Cargo.toml
@@ -151,7 +151,10 @@ reqwest = { version = "0.12", default-features = false, features = [
"stream",
"multipart",
] }
rskafka = "0.5"
# SCRAM-SHA-512 requires https://github.com/dequbed/rsasl/pull/48, https://github.com/influxdata/rskafka/pull/247
rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "940c6030012c5b746fad819fb72e3325b26e39de", features = [
"transport-tls",
] }
rstest = "0.21"
rstest_reuse = "0.7"
rust_decimal = "1.33"
5 changes: 5 additions & 0 deletions config/config.md
@@ -67,6 +67,11 @@
| `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
| `wal.num_topics` | Integer | `64` | Number of topics to be created upon start.<br/>**It's only used when the provider is `kafka`**. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default)<br/>**It's only used when the provider is `kafka`**. |
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>**It's only used when the provider is `kafka`**. |
| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_topic_timeout` | String | `30s` | The timeout for topic creation; the operation is cancelled if it exceeds this duration.<br/>**It's only used when the provider is `kafka`**. |
| `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. |
| `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_init` | String | `500ms` | The initial backoff delay.<br/>**It's only used when the provider is `kafka`**. |
18 changes: 18 additions & 0 deletions config/datanode.example.toml
@@ -187,6 +187,24 @@ backoff_base = 2
## **It's only used when the provider is `kafka`**.
backoff_deadline = "5mins"

# The Kafka SASL configuration.
# **It's only used when the provider is `kafka`**.
# Available SASL mechanisms:
# - `PLAIN`
# - `SCRAM-SHA-256`
# - `SCRAM-SHA-512`
# [wal.sasl]
# type = "SCRAM-SHA-512"
# username = "user_kafka"
# password = "secret"

# The Kafka TLS configuration.
# **It's only used when the provider is `kafka`**.
# [wal.tls]
# server_ca_cert_path = "/path/to/server_cert"
# client_cert_path = "/path/to/client_cert"
# client_key_path = "/path/to/key"

# Example of using S3 as the storage.
# [storage]
# type = "S3"
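The commented-out `[wal.sasl]` and `[wal.tls]` sections above map onto optional fields of the WAL configuration. A rough sketch of how such sections could be deserialized with serde; only the TOML keys come from the example files in this PR, while the struct names and field optionality are illustrative assumptions, not the actual GreptimeDB types:

```rust
// Requires the `serde` (with `derive`) and `toml` crates.
use serde::Deserialize;

// Illustrative shapes only; the real types live in `common_wal` and may differ.
#[derive(Debug, Deserialize)]
struct WalSection {
    sasl: Option<SaslSection>,
    tls: Option<TlsSection>,
}

#[derive(Debug, Deserialize)]
struct SaslSection {
    #[serde(rename = "type")]
    mechanism: String, // "PLAIN", "SCRAM-SHA-256" or "SCRAM-SHA-512"
    username: String,
    password: String,
}

#[derive(Debug, Deserialize)]
struct TlsSection {
    server_ca_cert_path: Option<String>,
    client_cert_path: Option<String>,
    client_key_path: Option<String>,
}

fn main() {
    let text = r#"
        [sasl]
        type = "SCRAM-SHA-512"
        username = "user_kafka"
        password = "secret"

        [tls]
        server_ca_cert_path = "/path/to/server_cert"
    "#;
    let wal: WalSection = toml::from_str(text).unwrap();
    println!("{wal:?}");
}
```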
18 changes: 18 additions & 0 deletions config/metasrv.example.toml
@@ -124,6 +124,24 @@ backoff_base = 2
## Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
backoff_deadline = "5mins"

# The Kafka SASL configuration.
# **It's only used when the provider is `kafka`**.
# Available SASL mechanisms:
# - `PLAIN`
# - `SCRAM-SHA-256`
# - `SCRAM-SHA-512`
# [wal.sasl]
# type = "SCRAM-SHA-512"
# username = "user_kafka"
# password = "secret"

# The Kafka TLS configuration.
# **It's only used when the provider is `kafka`**.
# [wal.tls]
# server_ca_cert_path = "/path/to/server_cert"
# client_cert_path = "/path/to/client_cert"
# client_key_path = "/path/to/key"

## The logging options.
[logging]
## The directory to store the log files.
40 changes: 40 additions & 0 deletions config/standalone.example.toml
@@ -171,6 +171,28 @@ sync_period = "10s"
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]

## Number of topics to be created upon start.
## **It's only used when the provider is `kafka`**.
num_topics = 64

## Topic selector type.
## Available selector types:
## - `round_robin` (default)
## **It's only used when the provider is `kafka`**.
selector_type = "round_robin"

## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
## **It's only used when the provider is `kafka`**.
topic_name_prefix = "greptimedb_wal_topic"

## Expected number of replicas of each partition.
## **It's only used when the provider is `kafka`**.
replication_factor = 1

## The timeout for topic creation; the operation is cancelled if it exceeds this duration.
## **It's only used when the provider is `kafka`**.
create_topic_timeout = "30s"

## The max size of a single producer batch.
## Warning: Kafka has a default limit of 1MB per message in a topic.
## **It's only used when the provider is `kafka`**.
@@ -196,6 +218,24 @@ backoff_base = 2
## **It's only used when the provider is `kafka`**.
backoff_deadline = "5mins"

# The Kafka SASL configuration.
# **It's only used when the provider is `kafka`**.
# Available SASL mechanisms:
# - `PLAIN`
# - `SCRAM-SHA-256`
# - `SCRAM-SHA-512`
# [wal.sasl]
# type = "SCRAM-SHA-512"
# username = "user_kafka"
# password = "secret"

# The Kafka TLS configuration.
# **It's only used when the provider is `kafka`**.
# [wal.tls]
# server_ca_cert_path = "/path/to/server_cert"
# client_cert_path = "/path/to/client_cert"
# client_key_path = "/path/to/key"

## Metadata storage options.
[metadata_store]
## Kv file size in bytes.
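standalone.example.toml documents that a topic name is built by concatenating `topic_name_prefix` with a topic id. As a rough illustration of that naming scheme; the `_` separator and the helper are assumptions, not taken from this PR:

```rust
// Hypothetical helper illustrating the documented naming scheme:
// topic name = topic_name_prefix + topic id. The "_" separator is an assumption.
fn topic_name(prefix: &str, topic_id: u32) -> String {
    format!("{prefix}_{topic_id}")
}

// e.g. topic_name("greptimedb_wal_topic", 3) => "greptimedb_wal_topic_3"
```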
10 changes: 9 additions & 1 deletion src/common/meta/src/error.rs
@@ -499,6 +499,13 @@ pub enum Error {
error: rskafka::client::error::Error,
},

#[snafu(display("Failed to create TLS Config"))]
TlsConfig {
#[snafu(implicit)]
location: Location,
source: common_wal::error::Error,
},

#[snafu(display("Failed to resolve Kafka broker endpoint."))]
ResolveKafkaEndpoint { source: common_wal::error::Error },

@@ -714,7 +721,8 @@ impl ErrorExt for Error {
| AlterLogicalTablesInvalidArguments { .. }
| CreateLogicalTablesInvalidArguments { .. }
| MismatchPrefix { .. }
| DelimiterNotFound { .. } => StatusCode::InvalidArguments,
| DelimiterNotFound { .. }
| TlsConfig { .. } => StatusCode::InvalidArguments,

FlowNotFound { .. } => StatusCode::FlowNotFound,
FlowRouteNotFound { .. } => StatusCode::Unexpected,
7 changes: 5 additions & 2 deletions src/common/meta/src/wal_options_allocator.rs
@@ -123,7 +123,7 @@ pub fn prepare_wal_options(

#[cfg(test)]
mod tests {
use common_wal::config::kafka::common::KafkaTopicConfig;
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::test_util::run_test_with_kafka_wal;

@@ -166,7 +166,10 @@ mod tests {
..Default::default()
};
let config = MetasrvKafkaConfig {
broker_endpoints,
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
kafka_topic,
..Default::default()
};
@@ -30,7 +30,7 @@ use snafu::{ensure, ResultExt};
use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu,
CreateKafkaWalTopicSnafu, DecodeJsonSnafu, EncodeJsonSnafu, InvalidNumTopicsSnafu,
ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result,
ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result, TlsConfigSnafu,
};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
@@ -117,15 +117,22 @@ impl TopicManager {
base: self.config.backoff.base as f64,
deadline: self.config.backoff.deadline,
};
let broker_endpoints = common_wal::resolve_to_ipv4(&self.config.broker_endpoints)
.await
.context(ResolveKafkaEndpointSnafu)?;
let client = ClientBuilder::new(broker_endpoints)
.backoff_config(backoff_config)
let broker_endpoints =
common_wal::resolve_to_ipv4(&self.config.connection.broker_endpoints)
.await
.context(ResolveKafkaEndpointSnafu)?;
let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(backoff_config);
if let Some(sasl) = &self.config.connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};
if let Some(tls) = &self.config.connection.tls {
builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
};
let client = builder
.build()
.await
.with_context(|_| BuildKafkaClientSnafu {
broker_endpoints: self.config.broker_endpoints.clone(),
broker_endpoints: self.config.connection.broker_endpoints.clone(),
})?;

let control_client = client
@@ -242,7 +249,7 @@ impl TopicManager {

#[cfg(test)]
mod tests {
use common_wal::config::kafka::common::KafkaTopicConfig;
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::test_util::run_test_with_kafka_wal;

use super::*;
Expand Down Expand Up @@ -289,7 +296,10 @@ mod tests {
..Default::default()
};
let config = MetasrvKafkaConfig {
broker_endpoints,
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
kafka_topic,
..Default::default()
};
3 changes: 3 additions & 0 deletions src/common/wal/Cargo.toml
@@ -18,6 +18,9 @@ common-telemetry.workspace = true
futures-util.workspace = true
humantime-serde.workspace = true
rskafka.workspace = true
rustls = { version = "0.23", default-features = false, features = ["ring", "logging", "std", "tls12"] }
rustls-native-certs = "0.7"
rustls-pemfile = "2.1"
serde.workspace = true
serde_with.workspace = true
snafu.workspace = true
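The new `rustls`, `rustls-native-certs`, and `rustls-pemfile` dependencies back the `to_tls_config()` call used by the topic manager above. Below is a minimal sketch of what loading such a configuration could look like with those crates; it is an assumption about the approach, not the PR's actual implementation (the real helper is async, uses `common_wal` error types, and may differ in structure):

```rust
use std::{fs::File, io::BufReader, sync::Arc};

use rustls::{ClientConfig, RootCertStore};

// Sketch: build a rustls ClientConfig from the paths documented in `[wal.tls]`.
// Signature and error handling here are assumptions made for illustration.
fn build_tls_config(
    server_ca_cert_path: Option<&str>,
    client_cert_path: Option<&str>,
    client_key_path: Option<&str>,
) -> Result<Arc<ClientConfig>, Box<dyn std::error::Error>> {
    // Trust either the provided CA certificate or the system roots.
    let mut roots = RootCertStore::empty();
    match server_ca_cert_path {
        Some(path) => {
            let mut reader = BufReader::new(File::open(path)?);
            for cert in rustls_pemfile::certs(&mut reader) {
                roots.add(cert?)?;
            }
        }
        None => {
            for cert in rustls_native_certs::load_native_certs()? {
                roots.add(cert)?;
            }
        }
    }

    let builder = ClientConfig::builder().with_root_certificates(roots);

    // Enable mutual TLS only when both a client certificate and key are given.
    let config = match (client_cert_path, client_key_path) {
        (Some(cert_path), Some(key_path)) => {
            let mut cert_reader = BufReader::new(File::open(cert_path)?);
            let certs =
                rustls_pemfile::certs(&mut cert_reader).collect::<Result<Vec<_>, _>>()?;
            let mut key_reader = BufReader::new(File::open(key_path)?);
            let key = rustls_pemfile::private_key(&mut key_reader)?
                .ok_or("no private key found in client key file")?;
            builder.with_client_auth_cert(certs, key)?
        }
        _ => builder.with_no_client_auth(),
    };

    Ok(Arc::new(config))
}
```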