Skip to content

Commit

Permalink
fixed data race in TestReadersWritersStress
Browse files Browse the repository at this point in the history
  • Loading branch information
rekby committed Dec 12, 2023
1 parent ed15d04 commit 0030c51
Showing 1 changed file with 32 additions and 8 deletions.
40 changes: 32 additions & 8 deletions tests/integration/topic_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func stressTestInATopic(
topicWriters, topicReaders int,
) error {
maxMessagesInBatch := 5
var createdMessagesCount xatomic.Int64
var readedMessagesCount xatomic.Int64
var writeStatusWriterSeqno sync.Map // map[writerId string] int64 - lastseqno
var readStatusWriterMaxSeqNo sync.Map // map[writerId string] int64 - lastseqno

var stopWrite xatomic.Bool

Expand All @@ -107,25 +107,29 @@ func stressTestInATopic(
writer, err := db.Topic().StartWriter(topicPath,
topicoptions.WithProducerID(producerID),
topicoptions.WithSyncWrite(true),
topicoptions.WithWriterSetAutoSeqNo(false),
)
if err != nil {
return xerrors.WithStackTrace(err)
}

seqNo := int64(0)
for !stopWrite.Load() {
messageCount := rand.Intn(maxMessagesInBatch) + 1 //nolint:gosec
var messages []topicwriter.Message
for i := 0; i < messageCount; i++ {
newMessageContent := createdMessagesCount.Add(1)
seqNo++
message := topicwriter.Message{
Data: strings.NewReader(strconv.FormatInt(newMessageContent, 10)),
SeqNo: seqNo,
Data: strings.NewReader(strconv.FormatInt(seqNo, 10) + "-content"),
}
messages = append(messages, message)
}
err = writer.Write(ctx, messages...)
if err != nil {
return err
}
writeStatusWriterSeqno.Store(producerID, seqNo)
}
return nil
}
Expand Down Expand Up @@ -157,7 +161,17 @@ func stressTestInATopic(
return err
}

readedMessagesCount.Add(1)
// store max readed seqno for every producer id
for {
val, _ := readStatusWriterMaxSeqNo.LoadOrStore(mess.ProducerID, int64(0))
oldSeq := val.(int64)
if mess.SeqNo <= oldSeq {
break
}
if readStatusWriterMaxSeqNo.CompareAndSwap(mess.ProducerID, val, mess.SeqNo) {

Check failure on line 171 in tests/integration/topic_stress_test.go

View workflow job for this annotation

GitHub Actions / integration (1.17.x, 23.1)

readStatusWriterMaxSeqNo.CompareAndSwap undefined (type sync.Map has no field or method CompareAndSwap)

Check failure on line 171 in tests/integration/topic_stress_test.go

View workflow job for this annotation

GitHub Actions / integration (1.17.x, 22.5)

readStatusWriterMaxSeqNo.CompareAndSwap undefined (type sync.Map has no field or method CompareAndSwap)

Check failure on line 171 in tests/integration/topic_stress_test.go

View workflow job for this annotation

GitHub Actions / integration (1.17.x, 23.2)

readStatusWriterMaxSeqNo.CompareAndSwap undefined (type sync.Map has no field or method CompareAndSwap)

Check failure on line 171 in tests/integration/topic_stress_test.go

View workflow job for this annotation

GitHub Actions / integration (1.17.x, 23.3)

readStatusWriterMaxSeqNo.CompareAndSwap undefined (type sync.Map has no field or method CompareAndSwap)
break
}
}
err = reader.Commit(ctx, mess)
if err != nil {
return err
Expand Down Expand Up @@ -200,9 +214,19 @@ func stressTestInATopic(
}

xtest.SpinWaitProgressWithTimeout(t, time.Minute, func() (progressValue interface{}, finished bool) {
createdMessages := createdMessagesCount.Load()
readedMessages := readedMessagesCount.Load()
return readedMessages, readedMessages == createdMessages
needReadMessages := int64(0)
writeStatusWriterSeqno.Range(func(key, value any) bool {

Check failure on line 218 in tests/integration/topic_stress_test.go

View workflow job for this annotation

GitHub Actions / integration (1.17.x, 23.1)

undefined: any

Check failure on line 218 in tests/integration/topic_stress_test.go

View workflow job for this annotation

GitHub Actions / integration (1.17.x, 22.5)

undefined: any

Check failure on line 218 in tests/integration/topic_stress_test.go

View workflow job for this annotation

GitHub Actions / integration (1.17.x, 23.2)

undefined: any

Check failure on line 218 in tests/integration/topic_stress_test.go

View workflow job for this annotation

GitHub Actions / integration (1.17.x, 23.3)

undefined: any
writtenSeqno := value.(int64)

readed, ok := readStatusWriterMaxSeqNo.Load(key)
if !ok {
readed = int64(0)
}
readedSeqNo := readed.(int64)
needReadMessages += writtenSeqno - readedSeqNo
return true
})
return needReadMessages, needReadMessages == 0
})

stopReader()
Expand Down

0 comments on commit 0030c51

Please sign in to comment.