Skip to content

Commit

Permalink
fix: decode performance and error when multiple lines
Browse files Browse the repository at this point in the history
  • Loading branch information
lisp-the-great committed Apr 16, 2022
1 parent 61db3aa commit c8ba6bd
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions tools/kafka-producer-performance/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,11 @@ func (g *FileMessageGenerator) Generate(topic string, partition, messageLoad int
records := make([][]byte, 0, 64)
for r.Scan() {
if b := r.Bytes(); len(b) != 0 {
records = append(records, r.Bytes())
text, err := g.DecoderFunc(b)
if err != nil {
printErrorAndExit(69, "Failed to decode message data: %s", string(text))
}
records = append(records, text)
}
}
if err = r.Err(); err != nil {
Expand All @@ -306,15 +310,10 @@ func (g *FileMessageGenerator) Generate(topic string, partition, messageLoad int
log.Printf("FileMessageGenerator is generating %d messages from %d records\n", messageLoad, len(records))
go func() {
for i := 0; i < messageLoad; i++ {
text := records[i%len(records)]
payload, err := g.DecoderFunc(text)
if err != nil {
printErrorAndExit(69, "Failed to decode message data: %s", string(text))
}
messages <- &sarama.ProducerMessage{
Topic: topic,
Partition: int32(partition),
Value: sarama.ByteEncoder(payload),
Value: sarama.ByteEncoder(records[i%len(records)]),
}
}
close(messages)
Expand Down

0 comments on commit c8ba6bd

Please sign in to comment.