Skip to content

Commit

Permalink
opt: use chan to reduce mem cost for messages
Browse files Browse the repository at this point in the history
  • Loading branch information
lisp-the-great authored and dnwe committed Aug 18, 2023
1 parent 50245fb commit e2712a4
Showing 1 changed file with 51 additions and 33 deletions.
84 changes: 51 additions & 33 deletions tools/kafka-producer-performance/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,27 +244,38 @@ func parseVersion(version string) sarama.KafkaVersion {
}

type MessageGenerator interface {
Generate(topic string, partition, messageLoad int) []*sarama.ProducerMessage
Generate(topic string, partition, messageLoad int) <-chan *sarama.ProducerMessage
}

func makeMessageChan(messageLoad int) chan *sarama.ProducerMessage {
var size int = 65536
if messageLoad < 262144 {
size = messageLoad / 4
}
return make(chan *sarama.ProducerMessage, size)
}

type RandomMessageGenerator struct {
MessageSize int
}

func (g *RandomMessageGenerator) Generate(topic string, partition, messageLoad int) []*sarama.ProducerMessage {
messages := make([]*sarama.ProducerMessage, messageLoad)
for i := 0; i < messageLoad; i++ {
payload := make([]byte, g.MessageSize)
if _, err := rand.Read(payload); err != nil {
printErrorAndExit(69, "Failed to generate message payload: %s", err)
}
messages[i] = &sarama.ProducerMessage{
Topic: topic,
Partition: int32(partition),
Value: sarama.ByteEncoder(payload),
func (g *RandomMessageGenerator) Generate(topic string, partition, messageLoad int) <-chan *sarama.ProducerMessage {
messages := makeMessageChan(messageLoad)
go func() {
log.Printf("RandomMessageGenerator is generating %d messages\n", messageLoad)
for i := 0; i < messageLoad; i++ {
payload := make([]byte, g.MessageSize)
if _, err := rand.Read(payload); err != nil {
printErrorAndExit(69, "Failed to generate message payload: %s", err)
}
messages <- &sarama.ProducerMessage{
Topic: topic,
Partition: int32(partition),
Value: sarama.ByteEncoder(payload),
}
}
}
log.Printf("RandomMessageGenerator has generated %d messages\n", len(messages))
close(messages)
}()
return messages
}

Expand All @@ -273,8 +284,8 @@ type FileMessageGenerator struct {
DecoderFunc DecoderFunc
}

func (g *FileMessageGenerator) Generate(topic string, partition, messageLoad int) []*sarama.ProducerMessage {
messages := make([]*sarama.ProducerMessage, messageLoad)
func (g *FileMessageGenerator) Generate(topic string, partition, messageLoad int) <-chan *sarama.ProducerMessage {
messages := makeMessageChan(messageLoad)
in, err := os.Open(g.MessageFile)
if err != nil {
printErrorAndExit(69, "Failed to open message file: %v", err)
Expand All @@ -291,19 +302,23 @@ func (g *FileMessageGenerator) Generate(topic string, partition, messageLoad int
if err = r.Err(); err != nil {
printErrorAndExit(69, "Failed to scan message file: %v", err)
}
for i := 0; i < messageLoad; i++ {
text := records[i%len(records)]
payload, err := g.DecoderFunc(text)
if err != nil {
printErrorAndExit(69, "Failed to decode message data: %s", string(text))
}
messages[i] = &sarama.ProducerMessage{
Topic: topic,
Partition: int32(partition),
Value: sarama.ByteEncoder(payload),

log.Printf("FileMessageGenerator is generating %d messages from %d records\n", messageLoad, len(records))
go func() {
for i := 0; i < messageLoad; i++ {
text := records[i%len(records)]
payload, err := g.DecoderFunc(text)
if err != nil {
printErrorAndExit(69, "Failed to decode message data: %s", string(text))
}
messages <- &sarama.ProducerMessage{
Topic: topic,
Partition: int32(partition),
Value: sarama.ByteEncoder(payload),
}
}
}
log.Printf("FileMessageGenerator has generated %d messages from %d records\n", len(messages), len(records))
close(messages)
}()
return messages
}

Expand Down Expand Up @@ -413,6 +428,7 @@ func main() {
cancel()
<-done
}

func runAsyncProducer(topic string, partition, messageLoad int, messageGenerator MessageGenerator,
config *sarama.Config, brokers []string, throughput int) {
producer, err := sarama.NewAsyncProducer(brokers, config)
Expand Down Expand Up @@ -443,15 +459,17 @@ func runAsyncProducer(topic string, partition, messageLoad int, messageGenerator

if throughput > 0 {
ticker := time.NewTicker(time.Second)
for idx, message := range messages {
var idx int = 0
for message := range messages {
producer.Input() <- message
if (idx+1)%throughput == 0 {
<-ticker.C
}
idx++
}
ticker.Stop()
} else {
for _, message := range messages {
for message := range messages {
producer.Input() <- message
}
}
Expand All @@ -474,7 +492,7 @@ func runSyncProducer(topic string, partition, messageLoad, routines int, message
}
}()

messages := make([][]*sarama.ProducerMessage, routines)
messages := make([]<-chan *sarama.ProducerMessage, routines)
for i := 0; i < routines; i++ {
if i == routines-1 {
messages[i] = messageGenerator.Generate(topic, partition, messageLoad/routines+messageLoad%routines)
Expand All @@ -490,7 +508,7 @@ func runSyncProducer(topic string, partition, messageLoad, routines int, message
wg.Add(1)
go func() {
ticker := time.NewTicker(time.Second)
for _, message := range messages {
for message := range messages {
for i := 0; i < throughput; i++ {
_, _, err = producer.SendMessage(message)
if err != nil {
Expand All @@ -508,7 +526,7 @@ func runSyncProducer(topic string, partition, messageLoad, routines int, message
messages := messages
wg.Add(1)
go func() {
for _, message := range messages {
for message := range messages {
_, _, err = producer.SendMessage(message)
if err != nil {
printErrorAndExit(69, "Failed to send message: %s", err)
Expand Down

0 comments on commit e2712a4

Please sign in to comment.