Skip to content

Commit

Permalink
Refactor Reader.Read API (#49)
Browse files Browse the repository at this point in the history
The API was inconsistent with Writer.Write() and was error-prone.

The new API is consistent with Writer.Write(). Both Writer.Write()
and Reader.Read() now work with a Record that is an exporter field
in the Writer/Reader struct. This makes it obvious that the Record
is owned by the struct. Previously it was not clear if the
pointer to the record returned by Reader.Read() was owned by the
caller and could be modified and what it's lifetime was.
Now it is more intuitively clear that the caller does not own the
record and should not touch it and that the next Read() operation
is going to overwrite it.

Also added pkg.ReadOptions to support reading options. The struct
does not have any options yet, but options are coming in a future PR.
  • Loading branch information
tigrannajaryan authored Feb 28, 2025
1 parent 636488c commit c5c038a
Show file tree
Hide file tree
Showing 22 changed files with 128 additions and 133 deletions.
5 changes: 1 addition & 4 deletions benchmarks/benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,10 @@ func BenchmarkSTEFReaderRead(b *testing.B) {
}

for {
readRecord, err := reader.Read()
err := reader.Read(pkg.ReadOptions{})
if err == io.EOF {
break
}
if readRecord == nil {
panic("nil record")
}
if err != nil {
log.Fatal(err)
}
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/cmd/stats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/splunk/stef/go/otel/oteltef"
"github.com/splunk/stef/go/pkg"

"github.com/splunk/stef/benchmarks/cmd"
)
Expand Down Expand Up @@ -141,13 +142,14 @@ func CollectRecordStats(reader *oteltef.MetricsReader, fileStats *MetricsStats)
}

for {
record, err := reader.Read()
err := reader.Read(pkg.ReadOptions{})
if err != nil {
if err == io.EOF {
break
}
return err
}
record := &reader.Record

fileStats.DatapointCount++
fileStats.UniqueMetricNames[string(record.Metric().Name())] = true
Expand Down
6 changes: 3 additions & 3 deletions benchmarks/cmd/stefbench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ func readBench(inputFilePath string) {
recordCount := 0
start := time.Now()
for {
record, err := reader.Read()
record = record
err := reader.Read(pkg.ReadOptions{})
if err != nil {
if err == io.EOF {
break
Expand Down Expand Up @@ -108,13 +107,14 @@ func writeBench(inputFilePath string) {
recordCount := 0
start := time.Now()
for {
record, err := reader.Read()
err = reader.Read(pkg.ReadOptions{})
if err != nil {
if err == io.EOF {
break
}
log.Fatalf("Error reading from %s: %v", inputFilePath, err)
}
record := &reader.Record

if record.IsEnvelopeModified() {
writer.Record.Envelope().CopyFrom(record.Envelope())
Expand Down
22 changes: 1 addition & 21 deletions benchmarks/correctness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func TestTEFMultiPart(t *testing.T) {
"testdata/hostandcollector-otelmetrics.zst",
}

//stefEncoding := stef.STEFEncoding{}
tefEncoding := stef.STEFEncoding{}

for _, inputFile := range testInputOtlpFiles {
Expand All @@ -106,46 +105,27 @@ func TestTEFMultiPart(t *testing.T) {
parts, err := testutils.ReadMultipartOTLPFile(inputFile)
require.NoError(t, err)

//stefStream, err := stefEncoding.StartMultipart("")
//require.NoError(t, err)

tefStream, err := tefEncoding.StartMultipart("")
require.NoError(t, err)

for _, part := range parts {
//err = stefStream.AppendPart(part)
//require.NoError(t, err)
err = tefStream.AppendPart(part)
require.NoError(t, err)
}
//stefBytes, err := stefStream.FinishStream()
//require.NoError(t, err)

tefBytes, err := tefStream.FinishStream()
require.NoError(t, err)

//stefReader, err := metrics.NewReader(bytes.NewBuffer(stefBytes))
//require.NoError(t, err)

tefReader, err := oteltef.NewMetricsReader(bytes.NewBuffer(tefBytes))
require.NoError(t, err)

i := 0
for {
//stefRec, err := stefReader.Read()
//stefRec = stefRec
//if err == io.EOF {
// break
//}
//require.NoError(t, err)

tefRec, err := tefReader.Read()
err := tefReader.Read(pkg.ReadOptions{})
if err == io.EOF {
break
}
require.NoError(t, err, i)
require.NotNil(t, tefRec, i)
//oteltef.EqualRecord(t, stefRec, tefRec)
i++
}
},
Expand Down
5 changes: 1 addition & 4 deletions benchmarks/encodings/stef/stef.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,10 @@ func (d *STEFEncoding) Decode(b []byte) (any, error) {
}

for {
readRecord, err := r.Read()
err := r.Read(pkg.ReadOptions{})
if err == io.EOF {
break
}
if readRecord == nil {
panic("nil record")
}
if err != nil {
return nil, err
}
Expand Down
5 changes: 1 addition & 4 deletions benchmarks/encodings/stef/stefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,10 @@ func (d *STEFSEncoding) Decode(b []byte) (any, error) {
}

for {
readRecord, err := r.Read()
err := r.Read(pkg.ReadOptions{})
if err == io.EOF {
break
}
if readRecord == nil {
panic("nil record")
}
if err != nil {
return nil, err
}
Expand Down
5 changes: 1 addition & 4 deletions benchmarks/encodings/stef/stefu.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,10 @@ func (d *STEFUEncoding) Decode(b []byte) (any, error) {
}

for {
readRecord, err := r.Read()
err := r.Read(pkg.ReadOptions{})
if err == io.EOF {
break
}
if readRecord == nil {
panic("nil record")
}
if err != nil {
return nil, err
}
Expand Down
30 changes: 8 additions & 22 deletions benchmarks/readwrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestCopy(t *testing.T) {

recCount := 0
for {
readRec, err := tefReader.Read()
err := tefReader.Read(pkg.ReadOptions{})
if err == io.EOF {
break
}
Expand All @@ -50,7 +50,7 @@ func TestCopy(t *testing.T) {
_ = recCount
}

copyModified(&tefWriter.Record, readRec)
copyModified(&tefWriter.Record, &tefReader.Record)

err = tefWriter.Write()
require.NoError(t, err)
Expand Down Expand Up @@ -89,18 +89,15 @@ func BenchmarkReadSTEF(b *testing.B) {

recCount := 0
for {
readRec, err := tefSrc.Read()
err := tefSrc.Read(pkg.ReadOptions{})
if err == io.EOF {
break
}
if readRec == nil {
panic("nil record")
}
if err != nil {
panic(err)
}

copyModified(&tefWriter.Record, readRec)
copyModified(&tefWriter.Record, &tefSrc.Record)

err = tefWriter.Write()
if err != nil {
Expand All @@ -121,8 +118,7 @@ func BenchmarkReadSTEF(b *testing.B) {
}

for i := 0; i < recCount; i++ {
tefRec, err := reader.Read()
_ = tefRec
err := reader.Read(pkg.ReadOptions{})
if err == io.EOF {
break
}
Expand All @@ -148,8 +144,7 @@ func BenchmarkReadSTEFZ(b *testing.B) {

recCount = 0
for {
tefRec, err := reader.Read()
_ = tefRec
err := reader.Read(pkg.ReadOptions{})
if err == io.EOF {
break
}
Expand Down Expand Up @@ -182,24 +177,15 @@ func BenchmarkReadSTEFZWriteSTEF(b *testing.B) {

recCount = 0
for {
readRec, err := tefReader.Read()
if err == io.EOF {
break
}
err := tefReader.Read(pkg.ReadOptions{})
if err == io.EOF {
break
}
if err == io.EOF {
break
}
if readRec == nil {
panic("nil record")
}
if err != nil {
panic(err)
}

copyModified(&tefWriter.Record, readRec)
copyModified(&tefWriter.Record, &tefReader.Record)

err = tefWriter.Write()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/size_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,12 +327,12 @@ func TestSTEFVeryShortFrames(t *testing.T) {
require.NoError(t, err)

for {
readRecord, err := tefReader.Read()
err := tefReader.Read(pkg.ReadOptions{})
if err == io.EOF {
break
}

tefWriter.Record.CopyFrom(readRecord)
tefWriter.Record.CopyFrom(&tefReader.Record)

err = tefWriter.Write()
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions go/otel/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ func TestGrpcWriteRead(t *testing.T) {
require.NoError(t, err)

// Read and verify that received records match what was sent.
record, err := reader.Read()
err = reader.Read(pkg.ReadOptions{})
require.NoError(t, err)
require.EqualValues(t, "abc", record.Metric().Name())
require.EqualValues(t, "abc", reader.Record.Metric().Name())

// Send acknowledgment to the client.
err = ackFunc(reader.RecordCount())
Expand Down Expand Up @@ -181,9 +181,9 @@ func TestDictReset(t *testing.T) {

// Read and verify that received records match what was sent.
for i, metricName := range metricNames {
record, err := reader.Read()
err := reader.Read(pkg.ReadOptions{})
require.NoError(t, err)
assert.EqualValues(t, metricName, record.Metric().Name(), i)
assert.EqualValues(t, metricName, reader.Record.Metric().Name(), i)
}

// Send acknowledgment to the client.
Expand Down
39 changes: 16 additions & 23 deletions go/otel/manual_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,25 +72,21 @@ func TestWriterDictLimit(t *testing.T) {
reader, err := oteltef.NewMetricsReader(bytes.NewBuffer(buf.Bytes()))
require.NoError(t, err)

readRecord, err := reader.Read()
err = reader.Read(pkg.ReadOptions{})
require.NoError(t, err)
require.NotNil(t, readRecord)
assert.EqualValues(t, "cpu.utilization", readRecord.Metric().Name())
assert.EqualValues(t, "cpu.utilization", reader.Record.Metric().Name())

readRecord, err = reader.Read()
err = reader.Read(pkg.ReadOptions{})
require.NoError(t, err)
require.NotNil(t, readRecord)
assert.EqualValues(t, schemaUrl1, readRecord.Resource().SchemaURL())
assert.EqualValues(t, schemaUrl1, reader.Record.Resource().SchemaURL())

readRecord, err = reader.Read()
err = reader.Read(pkg.ReadOptions{})
require.NoError(t, err)
require.NotNil(t, readRecord)
assert.EqualValues(t, schemaUrl2, readRecord.Resource().SchemaURL())
assert.EqualValues(t, schemaUrl2, reader.Record.Resource().SchemaURL())

readRecord, err = reader.Read()
err = reader.Read(pkg.ReadOptions{})
require.NoError(t, err)
require.NotNil(t, readRecord)
assert.EqualValues(t, schemaUrl2, readRecord.Resource().SchemaURL())
assert.EqualValues(t, schemaUrl2, reader.Record.Resource().SchemaURL())
}

func TestWriterFrameLimit(t *testing.T) {
Expand Down Expand Up @@ -295,11 +291,10 @@ func TestAnyValue(t *testing.T) {
require.NoError(t, err)

for i := 0; i < len(writeAttrs); i++ {
readRecord, err := reader.Read()
err := reader.Read(pkg.ReadOptions{})
require.NoError(t, err, i)
require.NotNil(t, readRecord, i)

readAttr := tefToMap(readRecord.Attributes())
readAttr := tefToMap(reader.Record.Attributes())
require.EqualValues(t, writeAttrs[i], readAttr, i)
}
}
Expand All @@ -323,10 +318,10 @@ func writeReadRecord(t *testing.T, withSchema *schema.WireSchema) *oteltef.Metri
reader, err := oteltef.NewMetricsReader(bytes.NewBuffer(buf.Bytes()))
require.NoError(t, err)

readRecord, err := reader.Read()
err = reader.Read(pkg.ReadOptions{})
require.NoError(t, err)

return readRecord
return &reader.Record
}

func TestWriteOverrideSchema(t *testing.T) {
Expand Down Expand Up @@ -412,15 +407,13 @@ func TestLargeMultimap(t *testing.T) {
reader, err := oteltef.NewMetricsReader(bytes.NewBuffer(buf.Bytes()))
require.NoError(t, err)

readRecord, err := reader.Read()
err = reader.Read(pkg.ReadOptions{})
require.NoError(t, err)
require.NotNil(t, readRecord)

require.True(t, readRecord.Attributes().IsEqual(&attrs1Copy))
require.True(t, reader.Record.Attributes().IsEqual(&attrs1Copy))

readRecord, err = reader.Read()
err = reader.Read(pkg.ReadOptions{})
require.NoError(t, err)
require.NotNil(t, readRecord)

require.True(t, readRecord.Attributes().IsEqual(&attrs2Copy))
require.True(t, reader.Record.Attributes().IsEqual(&attrs2Copy))
}
Loading

0 comments on commit c5c038a

Please sign in to comment.