Batching at kafka receiver #351

Open
srikanthccv opened this issue Jul 19, 2024 · 3 comments

Comments

@srikanthccv
Member

Increasing the receive size doesn't help reduce the number of inserts, because the kafkareceiver forwards each Kafka message to the next consumer individually. The ideal implementation would combine the spans/logs/metrics of individual messages into one big ResourceSpans/ResourceLogs/ResourceMetrics collection by appending them, and send that combined batch across the pipeline (see the sketch after the snippet below).

for {
	select {
	case message, ok := <-claim.Messages():
		if !ok {
			return nil
		}
		start := time.Now()
		c.logger.Debug("Kafka message claimed",
			zap.String("value", string(message.Value)),
			zap.Time("timestamp", message.Timestamp),
			zap.String("topic", message.Topic))
		if !c.messageMarking.After {
			session.MarkMessage(message, "")
		}
		ctx := c.obsrecv.StartTracesOp(session.Context())
		statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())}
		_ = stats.RecordWithTags(ctx, statsTags,
			statMessageCount.M(1),
			statMessageOffset.M(message.Offset),
			statMessageOffsetLag.M(claim.HighWaterMarkOffset()-message.Offset-1))
		traces, err := c.unmarshaler.Unmarshal(message.Value)
		if err != nil {
			c.logger.Error("failed to unmarshal message", zap.Error(err))
			if c.messageMarking.After && c.messageMarking.OnError {
				session.MarkMessage(message, "")
			}
			return err
		}
		spanCount := traces.SpanCount()
		err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
		// ...
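A minimal sketch of what batching inside ConsumeClaim could look like, reusing the same handler fields (c, session, claim) as the snippet above. batchSize, flushInterval, flush, and pending are hypothetical names, not existing kafkareceiver config; the idea is to splice each message's ResourceSpans into one accumulating ptrace.Traces via MoveAndAppendTo, and to mark offsets only after the combined batch is delivered:

// Sketch only: assumes go.opentelemetry.io/collector/pdata/ptrace and
// github.com/IBM/sarama are imported; stats and logging omitted for brevity.
batch := ptrace.NewTraces()
var pending []*sarama.ConsumerMessage // messages awaiting offset commit

flush := func(ctx context.Context) error {
	if batch.ResourceSpans().Len() == 0 {
		return nil
	}
	if err := c.nextConsumer.ConsumeTraces(ctx, batch); err != nil {
		return err
	}
	// Mark offsets only once the whole batch was accepted downstream.
	for _, m := range pending {
		session.MarkMessage(m, "")
	}
	pending = pending[:0]
	batch = ptrace.NewTraces()
	return nil
}

ticker := time.NewTicker(flushInterval) // hypothetical time-based flush knob
defer ticker.Stop()
for {
	select {
	case message, ok := <-claim.Messages():
		if !ok {
			return flush(session.Context())
		}
		traces, err := c.unmarshaler.Unmarshal(message.Value)
		if err != nil {
			return err // error handling simplified vs. messageMarking.OnError
		}
		// Move this message's ResourceSpans onto the accumulating batch.
		traces.ResourceSpans().MoveAndAppendTo(batch.ResourceSpans())
		pending = append(pending, message)
		if len(pending) >= batchSize { // hypothetical size threshold
			if err := flush(session.Context()); err != nil {
				return err
			}
		}
	case <-ticker.C:
		if err := flush(session.Context()); err != nil {
			return err
		}
	case <-session.Context().Done():
		// Unflushed messages were never marked, so the consumer group
		// will redeliver them after the rebalance.
		return nil
	}
}

Because unmarked messages are redelivered after a rebalance, dropping the in-memory batch on shutdown keeps the at-least-once guarantee.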

@srikanthccv
Member Author

There is work underway to move batching to the exporter, but it will take a while to become stable.

@grandwizard28
Collaborator

Just curious, why not use the batch processor?

@srikanthccv
Member Author

The batch processor queues the item and returns immediately. Kafka receiver then marks the message as consumed, despite it not yet being written to storage. This creates a risk of data loss if the collector crashes or the storage backend becomes unavailable for an extended period. We should only mark messages as consumed after receiving confirmation from ClickHouse that the data has been successfully written.
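A minimal sketch of that marking discipline, assuming messageMarking.After is enabled and no batch processor or exporter sending queue sits between the receiver and the ClickHouse exporter, so a nil error from ConsumeTraces means the write was actually acknowledged:

traces, err := c.unmarshaler.Unmarshal(message.Value)
if err != nil {
	return err
}
if err := c.nextConsumer.ConsumeTraces(session.Context(), traces); err != nil {
	// Don't mark the offset: the consumer group will redeliver the
	// message, giving at-least-once delivery into ClickHouse.
	return err
}
// Safe to mark only now: the synchronous pipeline has confirmed the write.
session.MarkMessage(message, "")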
