Skip to content

Commit

Permalink
Use partition-ring aware balancer
Browse files Browse the repository at this point in the history
  • Loading branch information
periklis committed Feb 7, 2025
1 parent d580b0b commit 988b42b
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 18 deletions.
31 changes: 19 additions & 12 deletions pkg/limits/ingest_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@ import (
"github.com/twmb/franz-go/plugin/kprom"

"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/client"
"github.com/grafana/loki/v3/pkg/kafka/partitionring/consumer"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/constants"
)

const (
// Ring
RingKey = "ingest-limits"
RingName = "ingest-limits"

// Kafka
consumerGroup = "ingest-limits"
)

type metrics struct {
Expand Down Expand Up @@ -84,8 +88,9 @@ type IngestLimits struct {

cfg Config
logger log.Logger
client *kgo.Client
client *consumer.Client

partitionRing ring.PartitionRingReader
lifecycler *ring.Lifecycler
lifecyclerWatcher *services.FailureWatcher

Expand All @@ -111,13 +116,14 @@ func (s *IngestLimits) TransferOut(_ context.Context) error {

// NewIngestLimits creates a new IngestLimits service. It initializes the metadata map and sets up a Kafka client
// The client is configured to consume stream metadata from a dedicated topic with the metadata suffix.
func NewIngestLimits(cfg Config, logger log.Logger, reg prometheus.Registerer) (*IngestLimits, error) {
func NewIngestLimits(cfg Config, partitionRing ring.PartitionRingReader, logger log.Logger, reg prometheus.Registerer) (*IngestLimits, error) {
var err error
s := &IngestLimits{
cfg: cfg,
logger: logger,
metadata: make(map[string]map[uint64]streamMetadata),
metrics: newMetrics(reg),
cfg: cfg,
logger: logger,
partitionRing: partitionRing,
metadata: make(map[string]map[uint64]streamMetadata),
metrics: newMetrics(reg),
}

// Initialize lifecycler
Expand All @@ -138,12 +144,13 @@ func NewIngestLimits(cfg Config, logger log.Logger, reg prometheus.Registerer) (
kCfg := cfg.KafkaConfig
kCfg.Topic = kafka.MetadataTopicFor(kCfg.Topic)

s.client, err = client.NewReaderClient(kCfg, metrics, logger,
kgo.ConsumerGroup("ingest-limits"),
kgo.ConsumeTopics(kCfg.Topic),
kgo.Balancers(kgo.RoundRobinBalancer()),
s.client, err = consumer.NewGroupClient(
kCfg,
partitionRing,
consumerGroup,
metrics,
logger,
kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().Add(-s.cfg.WindowSize).UnixMilli())),
kgo.DisableAutoCommit(),
kgo.OnPartitionsAssigned(s.onPartitionsAssigned),
kgo.OnPartitionsRevoked(s.onPartitionsRevoked),
kgo.OnPartitionsLost(s.onPartitionsLost),
Expand Down
24 changes: 19 additions & 5 deletions pkg/limits/ingest_limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
)

type mockPartitionRing struct {
ring.PartitionRingReader
}

func (m *mockPartitionRing) PartitionRing() *ring.PartitionRing {
return ring.NewPartitionRing(ring.PartitionRingDesc{
Partitions: map[int32]ring.PartitionDesc{
0: {Id: 0, Tokens: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}},
},
})
}

func TestIngestLimits_GetStreamUsage(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -140,8 +152,9 @@ func TestIngestLimits_GetStreamUsage(t *testing.T) {
ObservePeriod: 100 * time.Millisecond,
},
},
logger: log.NewNopLogger(),
metadata: tt.setupMetadata,
partitionRing: &mockPartitionRing{},
logger: log.NewNopLogger(),
metadata: tt.setupMetadata,
}

// Create request
Expand Down Expand Up @@ -193,8 +206,9 @@ func TestIngestLimits_GetStreamUsage_Concurrent(t *testing.T) {
ObservePeriod: 100 * time.Millisecond,
},
},
logger: log.NewNopLogger(),
metadata: metadata,
partitionRing: &mockPartitionRing{},
logger: log.NewNopLogger(),
metadata: metadata,
}

// Run concurrent requests
Expand Down Expand Up @@ -246,7 +260,7 @@ func TestNewIngestLimits(t *testing.T) {
},
}

s, err := NewIngestLimits(cfg, log.NewNopLogger(), prometheus.NewRegistry())
s, err := NewIngestLimits(cfg, &mockPartitionRing{}, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
require.NotNil(t, s)
require.NotNil(t, s.client)
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ func (t *Loki) setupModuleManager() error {
TenantConfigs: {RuntimeConfig},
Distributor: {Ring, Server, Overrides, TenantConfigs, PatternRingClient, PatternIngesterTee, Analytics, PartitionRing, IngestLimitsFrontendRing},
IngestLimitsRing: {RuntimeConfig, Server, MemberlistKV},
IngestLimits: {MemberlistKV, Server},
IngestLimits: {MemberlistKV, Server, PartitionRing},
IngestLimitsFrontend: {IngestLimitsRing, Overrides, Server, MemberlistKV},
IngestLimitsFrontendRing: {RuntimeConfig, Server, MemberlistKV},
Store: {Overrides, IndexGatewayRing},
Expand Down
1 change: 1 addition & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ func (t *Loki) initIngestLimits() (services.Service, error) {

ingestLimits, err := limits.NewIngestLimits(
t.Cfg.IngestLimits,
t.partitionRing,
util_log.Logger,
prometheus.DefaultRegisterer,
)
Expand Down

0 comments on commit 988b42b

Please sign in to comment.