diff --git a/tests/integration/topic_stress_test.go b/tests/integration/topic_stress_test.go index 78ca25bf1..74211237b 100644 --- a/tests/integration/topic_stress_test.go +++ b/tests/integration/topic_stress_test.go @@ -86,8 +86,8 @@ func stressTestInATopic( topicWriters, topicReaders int, ) error { maxMessagesInBatch := 5 - var createdMessagesCount xatomic.Int64 - var readedMessagesCount xatomic.Int64 + var writeStatusWriterSeqno sync.Map // map[writerId string] int64 - lastseqno + var readStatusWriterMaxSeqNo sync.Map // map[writerId string] int64 - lastseqno var stopWrite xatomic.Bool @@ -107,18 +107,21 @@ func stressTestInATopic( writer, err := db.Topic().StartWriter(topicPath, topicoptions.WithProducerID(producerID), topicoptions.WithSyncWrite(true), + topicoptions.WithWriterSetAutoSeqNo(false), ) if err != nil { return xerrors.WithStackTrace(err) } + seqNo := int64(0) for !stopWrite.Load() { messageCount := rand.Intn(maxMessagesInBatch) + 1 //nolint:gosec var messages []topicwriter.Message for i := 0; i < messageCount; i++ { - newMessageContent := createdMessagesCount.Add(1) + seqNo++ message := topicwriter.Message{ - Data: strings.NewReader(strconv.FormatInt(newMessageContent, 10)), + SeqNo: seqNo, + Data: strings.NewReader(strconv.FormatInt(seqNo, 10) + "-content"), } messages = append(messages, message) } @@ -126,6 +129,7 @@ func stressTestInATopic( if err != nil { return err } + writeStatusWriterSeqno.Store(producerID, seqNo) } return nil } @@ -157,7 +161,17 @@ func stressTestInATopic( return err } - readedMessagesCount.Add(1) + // store max readed seqno for every producer id + for { + val, _ := readStatusWriterMaxSeqNo.LoadOrStore(mess.ProducerID, int64(0)) + oldSeq := val.(int64) + if mess.SeqNo <= oldSeq { + break + } + if readStatusWriterMaxSeqNo.CompareAndSwap(mess.ProducerID, val, mess.SeqNo) { + break + } + } err = reader.Commit(ctx, mess) if err != nil { return err @@ -200,9 +214,19 @@ func stressTestInATopic( } xtest.SpinWaitProgressWithTimeout(t, time.Minute, func() (progressValue interface{}, finished bool) { - createdMessages := createdMessagesCount.Load() - readedMessages := readedMessagesCount.Load() - return readedMessages, readedMessages == createdMessages + needReadMessages := int64(0) + writeStatusWriterSeqno.Range(func(key, value any) bool { + writtenSeqno := value.(int64) + + readed, ok := readStatusWriterMaxSeqNo.Load(key) + if !ok { + readed = int64(0) + } + readedSeqNo := readed.(int64) + needReadMessages += writtenSeqno - readedSeqNo + return true + }) + return needReadMessages, needReadMessages == 0 }) stopReader()