Skip to content

Commit

Permalink
feat(kafka): tenant topics (#15977)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Jan 30, 2025
1 parent f163e44 commit c258419
Show file tree
Hide file tree
Showing 6 changed files with 449 additions and 16 deletions.
22 changes: 22 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -2368,6 +2368,28 @@ otlp_config:
# Enable writes to Ingesters during Push requests. Defaults to true.
# CLI flag: -distributor.ingester-writes-enabled
[ingester_writes_enabled: <boolean> | default = true]
tenant_topic:
# Enable the tenant topic tee, which writes logs to Kafka topics based on
# tenant IDs instead of using multitenant topics/partitions.
# CLI flag: -distributor.tenant-topic-tee.enabled
[enabled: <boolean> | default = false]
# Prefix to prepend to tenant IDs to form the final Kafka topic name
# CLI flag: -distributor.tenant-topic-tee.topic-prefix
[topic_prefix: <string> | default = "loki.tenant"]
# Maximum number of bytes that can be buffered before producing to Kafka
# CLI flag: -distributor.tenant-topic-tee.max-buffered-bytes
[max_buffered_bytes: <int> | default = 100MiB]
# Maximum size of a single Kafka record in bytes
# CLI flag: -distributor.tenant-topic-tee.max-record-size-bytes
[max_record_size_bytes: <int> | default = 15MiB249KiB]
# Topic strategy to use. Valid values are 'simple' or 'automatic'
# CLI flag: -distributor.tenant-topic-tee.strategy
[strategy: <string> | default = "simple"]
```

### etcd
Expand Down
17 changes: 17 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ type Config struct {
KafkaEnabled bool `yaml:"kafka_writes_enabled"`
IngesterEnabled bool `yaml:"ingester_writes_enabled"`
KafkaConfig kafka.Config `yaml:"-"`

// TODO: cleanup config
TenantTopic TenantTopicConfig `yaml:"tenant_topic" category:"experimental"`
}

// RegisterFlags registers distributor-related flags.
Expand All @@ -107,6 +110,7 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
cfg.DistributorRing.RegisterFlags(fs)
cfg.RateStore.RegisterFlagsWithPrefix("distributor.rate-store", fs)
cfg.WriteFailuresLogging.RegisterFlagsWithPrefix("distributor.write-failures-logging", fs)
cfg.TenantTopic.RegisterFlags(fs)
fs.IntVar(&cfg.PushWorkerCount, "distributor.push-worker-count", 256, "Number of workers to push batches to ingesters.")
fs.BoolVar(&cfg.KafkaEnabled, "distributor.kafka-writes-enabled", false, "Enable writes to Kafka during Push requests.")
fs.BoolVar(&cfg.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests. Defaults to true.")
Expand All @@ -116,6 +120,9 @@ func (cfg *Config) Validate() error {
if !cfg.KafkaEnabled && !cfg.IngesterEnabled {
return fmt.Errorf("at least one of kafka and ingestor writes must be enabled")
}
if err := cfg.TenantTopic.Validate(); err != nil {
return errors.Wrap(err, "validating tenant topic config")
}
return nil
}

Expand Down Expand Up @@ -246,6 +253,16 @@ func New(
}
kafkaWriter = kafka_client.NewProducer(kafkaClient, cfg.KafkaConfig.ProducerMaxBufferedBytes,
prometheus.WrapRegistererWithPrefix("loki_", registerer))

// TODO: cleanup/make independent of whether we write kafka as primary?
if cfg.TenantTopic.Enabled {
w, err := NewTenantTopicWriter(cfg.TenantTopic, kafkaClient, overrides, registerer, logger)
if err != nil {
return nil, fmt.Errorf("failed to start tenant topic tee: %w", err)
}

tee = WrapTee(tee, w)
}
}

d := &Distributor{
Expand Down
7 changes: 0 additions & 7 deletions pkg/distributor/ratestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,6 @@ func (s *rateStore) aggregateByShard(ctx context.Context, streamRates map[string
return rates
}

func max(a, b int64) int64 {
if a > b {
return a
}
return b
}

func (s *rateStore) getRates(ctx context.Context, clients []ingesterClient) map[string]map[uint64]*logproto.StreamRate {
if s.debug {
if sp := opentracing.SpanFromContext(ctx); sp != nil {
Expand Down
Loading

0 comments on commit c258419

Please sign in to comment.