Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to list messages with large size #1097

Merged
merged 4 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/pkg/api/connect/service/console/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func (api *Service) ListMessages(ctx context.Context, req *connect.Request[v1alp
FilterInterpreterCode: interpreterCode,
Troubleshoot: req.Msg.GetTroubleshoot(),
IncludeRawPayload: req.Msg.GetIncludeOriginalRawPayload(),
IgnoreMaxSizeLimit: req.Msg.GetIgnoreMaxSizeLimit(),
KeyDeserializer: fromProtoEncoding(req.Msg.GetKeyDeserializer()),
ValueDeserializer: fromProtoEncoding(req.Msg.GetValueDeserializer()),
}
Expand Down
344 changes: 343 additions & 1 deletion backend/pkg/api/handle_topic_messages_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@ package api

import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"math/rand"
"net/http"
"os"
"path/filepath"
"sort"
"testing"
"time"

Expand All @@ -37,7 +41,7 @@ import (
things "github.com/redpanda-data/console/backend/pkg/testutil/testdata/proto/gen/things/v1"
)

// this is a complex test, no reson to refactor it
// this is a complex test, no reason to refactor it
func (s *APIIntegrationTestSuite) TestListMessages() {
t := s.T()

Expand All @@ -57,10 +61,34 @@ func (s *APIIntegrationTestSuite) TestListMessages() {
testutil.CreateTestData(t, context.Background(), s.kafkaClient, s.kafkaAdminClient,
topicName)

topicNameBig := testutil.TopicNameForTest("list_messages_big_0")
testutil.CreateTestData(t, context.Background(), s.kafkaClient, s.kafkaAdminClient,
topicNameBig)

// Clean up both test topics when the test finishes.
// Bug fix: the original deleted topicName twice and never deleted
// topicNameBig, leaking the "big messages" topic after the run.
defer func() {
	s.kafkaAdminClient.DeleteTopics(context.Background(), topicName)
	s.kafkaAdminClient.DeleteTopics(context.Background(), topicNameBig)
}()

// produce too big of a message
order := testutil.Order{ID: randomString(21000)}
serializedOrder, err := json.Marshal(order)
require.NoError(err)

r := &kgo.Record{
Key: []byte("too_big_0"),
Value: serializedOrder,
Topic: topicNameBig,
Headers: []kgo.RecordHeader{
{
Key: "revision",
Value: []byte("0"),
},
},
}
results := s.kafkaClient.ProduceSync(ctx, r)
require.NoError(results.FirstErr())

t.Run("simple happy path", func(t *testing.T) {
stream, err := client.ListMessages(ctx, connect.NewRequest(&v1pb.ListMessagesRequest{
Topic: topicName,
Expand Down Expand Up @@ -262,6 +290,310 @@ func (s *APIIntegrationTestSuite) TestListMessages() {
assert.True(seenZeroOffset)
assert.GreaterOrEqual(progressCount, 0)
})

t.Run("with key filter", func(t *testing.T) {
filterCode := base64.StdEncoding.EncodeToString([]byte(
`return key.endsWith('2') || key.endsWith('3')`,
))

stream, err := client.ListMessages(ctx, connect.NewRequest(&v1pb.ListMessagesRequest{
Topic: topicName,
StartOffset: -2,
PartitionId: -1,
MaxResults: 100,
FilterInterpreterCode: filterCode,
}))
require.NoError(err)

keys := make([]string, 0, 4)
phaseCount := 0
doneCount := 0
progressCount := 0
errorCount := 0

for stream.Receive() {
msg := stream.Msg()
switch cm := msg.GetControlMessage().(type) {
case *v1pb.ListMessagesResponse_Data:
assert.NotEmpty(cm.Data.Timestamp)
assert.NotEmpty(cm.Data.Compression)
assert.NotEmpty(cm.Data.Headers)

for _, h := range cm.Data.Headers {
h := h
assert.NotEmpty(h)
assert.NotEmpty(h.Key)
assert.NotEmpty(h.Value)
}

key := string(cm.Data.GetKey().GetNormalizedPayload())
keys = append(keys, key)

assert.NotEmpty(cm.Data.GetKey())
assert.NotEmpty(cm.Data.GetKey().GetNormalizedPayload())
assert.Empty(cm.Data.GetKey().GetOriginalPayload())
assert.NotEmpty(cm.Data.GetKey().GetPayloadSize())
assert.Equal(v1pb.PayloadEncoding_PAYLOAD_ENCODING_TEXT, cm.Data.GetKey().GetEncoding())
assert.False(cm.Data.GetKey().GetIsPayloadTooLarge())
assert.Empty(cm.Data.GetKey().GetTroubleshootReport())

assert.NotEmpty(cm.Data.GetValue())
assert.NotEmpty(cm.Data.GetValue().GetNormalizedPayload())
assert.Empty(cm.Data.GetValue().GetOriginalPayload())
assert.NotEmpty(cm.Data.GetValue().GetPayloadSize())
assert.Equal(v1pb.PayloadEncoding_PAYLOAD_ENCODING_JSON, cm.Data.GetValue().GetEncoding())
assert.False(cm.Data.GetValue().GetIsPayloadTooLarge())
assert.Empty(cm.Data.GetValue().GetTroubleshootReport())

assert.Equal(fmt.Sprintf(`{"ID":%q}`, key), string(cm.Data.GetValue().GetNormalizedPayload()))
case *v1pb.ListMessagesResponse_Done:
doneCount++

assert.NotEmpty(cm.Done.GetBytesConsumed())
assert.NotEmpty(cm.Done.GetMessagesConsumed())
assert.NotEmpty(cm.Done.GetElapsedMs())
assert.False(cm.Done.GetIsCancelled())
case *v1pb.ListMessagesResponse_Phase:
if phaseCount == 0 {
assert.Equal("Get Partitions", cm.Phase.GetPhase())
} else if phaseCount == 1 {
assert.Equal("Get Watermarks and calculate consuming requests", cm.Phase.GetPhase())
} else if phaseCount == 2 {
assert.Equal("Consuming messages", cm.Phase.GetPhase())
} else {
assert.Fail("Unknown phase.")
}

phaseCount++
case *v1pb.ListMessagesResponse_Progress:
progressCount++

assert.NotEmpty(cm.Progress)
assert.NotEmpty(cm.Progress.GetBytesConsumed())
assert.NotEmpty(cm.Progress.GetMessagesConsumed())
case *v1pb.ListMessagesResponse_Error:
errorCount++
}
}

assert.Nil(stream.Err())
assert.Nil(stream.Close())

sort.Strings(keys)

assert.Equal([]string{"12", "13", "2", "3"}, keys)
assert.Equal(3, phaseCount)
assert.Equal(0, errorCount)
assert.Equal(1, doneCount)
assert.GreaterOrEqual(progressCount, 0)
})

t.Run("too big", func(t *testing.T) {
stream, err := client.ListMessages(ctx, connect.NewRequest(&v1pb.ListMessagesRequest{
Topic: topicNameBig,
StartOffset: -1,
PartitionId: -1,
MaxResults: 5,
}))
require.NoError(err)

keys := make([]string, 0, 5)
phaseCount := 0
doneCount := 0
progressCount := 0
errorCount := 0
seenZeroOffset := false
for stream.Receive() {
msg := stream.Msg()
switch cm := msg.GetControlMessage().(type) {
case *v1pb.ListMessagesResponse_Data:
if seenZeroOffset {
assert.NotEmpty(cm.Data.Offset)
}

if cm.Data.Offset == 0 {
seenZeroOffset = true
}

assert.NotEmpty(cm.Data.Timestamp)
assert.NotEmpty(cm.Data.Compression)
assert.NotEmpty(cm.Data.Headers)

for _, h := range cm.Data.Headers {
h := h
assert.NotEmpty(h)
assert.NotEmpty(h.Key)
assert.NotEmpty(h.Value)
}

key := string(cm.Data.GetKey().GetNormalizedPayload())
keys = append(keys, key)

assert.NotEmpty(cm.Data.GetKey())
assert.NotEmpty(cm.Data.GetKey().GetNormalizedPayload())
assert.Empty(cm.Data.GetKey().GetOriginalPayload())
assert.NotEmpty(cm.Data.GetKey().GetPayloadSize())
assert.Equal(v1pb.PayloadEncoding_PAYLOAD_ENCODING_TEXT, cm.Data.GetKey().GetEncoding())
assert.False(cm.Data.GetKey().GetIsPayloadTooLarge())
assert.Empty(cm.Data.GetKey().GetTroubleshootReport())

assert.NotEmpty(cm.Data.GetValue())
assert.Empty(cm.Data.GetValue().GetOriginalPayload())
assert.NotEmpty(cm.Data.GetValue().GetPayloadSize())
assert.Equal(v1pb.PayloadEncoding_PAYLOAD_ENCODING_JSON, cm.Data.GetValue().GetEncoding())
assert.Empty(cm.Data.GetValue().GetTroubleshootReport())

if key == "too_big_0" {
assert.True(cm.Data.GetValue().GetIsPayloadTooLarge())
assert.Empty(cm.Data.GetValue().GetNormalizedPayload())
} else {
assert.False(cm.Data.GetValue().GetIsPayloadTooLarge())
assert.NotEmpty(cm.Data.GetValue().GetNormalizedPayload())
}

case *v1pb.ListMessagesResponse_Done:
doneCount++

assert.NotEmpty(cm.Done.GetBytesConsumed())
assert.NotEmpty(cm.Done.GetMessagesConsumed())
assert.NotEmpty(cm.Done.GetElapsedMs())
assert.False(cm.Done.GetIsCancelled())
case *v1pb.ListMessagesResponse_Phase:
if phaseCount == 0 {
assert.Equal("Get Partitions", cm.Phase.GetPhase())
} else if phaseCount == 1 {
assert.Equal("Get Watermarks and calculate consuming requests", cm.Phase.GetPhase())
} else if phaseCount == 2 {
assert.Equal("Consuming messages", cm.Phase.GetPhase())
} else {
assert.Fail("Unknown phase.")
}

phaseCount++
case *v1pb.ListMessagesResponse_Progress:
progressCount++

assert.NotEmpty(cm.Progress)
assert.NotEmpty(cm.Progress.GetBytesConsumed())
assert.NotEmpty(cm.Progress.GetMessagesConsumed())
case *v1pb.ListMessagesResponse_Error:
errorCount++
}
}

assert.Nil(stream.Err())
assert.Nil(stream.Close())
assert.Equal([]string{"16", "17", "18", "19", "too_big_0"}, keys)
assert.Equal(3, phaseCount)
assert.Equal(0, errorCount)
assert.Equal(1, doneCount)
assert.False(seenZeroOffset)
assert.GreaterOrEqual(progressCount, 0)
})

t.Run("too big with ignore", func(t *testing.T) {
stream, err := client.ListMessages(ctx, connect.NewRequest(&v1pb.ListMessagesRequest{
Topic: topicNameBig,
StartOffset: -1,
PartitionId: -1,
MaxResults: 5,
IgnoreMaxSizeLimit: true,
}))
require.NoError(err)

keys := make([]string, 0, 5)
phaseCount := 0
doneCount := 0
progressCount := 0
errorCount := 0
seenZeroOffset := false
for stream.Receive() {
msg := stream.Msg()
switch cm := msg.GetControlMessage().(type) {
case *v1pb.ListMessagesResponse_Data:
if seenZeroOffset {
assert.NotEmpty(cm.Data.Offset)
}

if cm.Data.Offset == 0 {
seenZeroOffset = true
}

assert.NotEmpty(cm.Data.Timestamp)
assert.NotEmpty(cm.Data.Compression)
assert.NotEmpty(cm.Data.Headers)

for _, h := range cm.Data.Headers {
h := h
assert.NotEmpty(h)
assert.NotEmpty(h.Key)
assert.NotEmpty(h.Value)
}

key := string(cm.Data.GetKey().GetNormalizedPayload())
keys = append(keys, key)

assert.NotEmpty(cm.Data.GetKey())
assert.NotEmpty(cm.Data.GetKey().GetNormalizedPayload())
assert.Empty(cm.Data.GetKey().GetOriginalPayload())
assert.NotEmpty(cm.Data.GetKey().GetPayloadSize())
assert.Equal(v1pb.PayloadEncoding_PAYLOAD_ENCODING_TEXT, cm.Data.GetKey().GetEncoding())
assert.False(cm.Data.GetKey().GetIsPayloadTooLarge())
assert.Empty(cm.Data.GetKey().GetTroubleshootReport())

assert.NotEmpty(cm.Data.GetValue())
assert.Empty(cm.Data.GetValue().GetOriginalPayload())
assert.NotEmpty(cm.Data.GetValue().GetPayloadSize())
assert.Equal(v1pb.PayloadEncoding_PAYLOAD_ENCODING_JSON, cm.Data.GetValue().GetEncoding())
assert.Empty(cm.Data.GetValue().GetTroubleshootReport())
assert.False(cm.Data.GetValue().GetIsPayloadTooLarge())
assert.NotEmpty(cm.Data.GetValue().GetNormalizedPayload())

if key == "too_big_0" {
// {"ID":"..."}
// large string of 21000 chars + 9 extra chars
assert.Len(cm.Data.GetValue().GetNormalizedPayload(), 21009)
}

case *v1pb.ListMessagesResponse_Done:
doneCount++

assert.NotEmpty(cm.Done.GetBytesConsumed())
assert.NotEmpty(cm.Done.GetMessagesConsumed())
assert.NotEmpty(cm.Done.GetElapsedMs())
assert.False(cm.Done.GetIsCancelled())
case *v1pb.ListMessagesResponse_Phase:
if phaseCount == 0 {
assert.Equal("Get Partitions", cm.Phase.GetPhase())
} else if phaseCount == 1 {
assert.Equal("Get Watermarks and calculate consuming requests", cm.Phase.GetPhase())
} else if phaseCount == 2 {
assert.Equal("Consuming messages", cm.Phase.GetPhase())
} else {
assert.Fail("Unknown phase.")
}

phaseCount++
case *v1pb.ListMessagesResponse_Progress:
progressCount++

assert.NotEmpty(cm.Progress)
assert.NotEmpty(cm.Progress.GetBytesConsumed())
assert.NotEmpty(cm.Progress.GetMessagesConsumed())
case *v1pb.ListMessagesResponse_Error:
errorCount++
}
}

assert.Nil(stream.Err())
assert.Nil(stream.Close())
assert.Equal([]string{"16", "17", "18", "19", "too_big_0"}, keys)
assert.Equal(3, phaseCount)
assert.Equal(0, errorCount)
assert.Equal(1, doneCount)
assert.False(seenZeroOffset)
assert.GreaterOrEqual(progressCount, 0)
})
}

func (s *APIIntegrationTestSuite) TestPublishMessages() {
Expand Down Expand Up @@ -590,3 +922,13 @@ func (s *APIIntegrationTestSuite) TestPublishMessages() {
assert.Equal(timestamppb.New(objTime), obj2.CreatedAt)
})
}

// letterBytes is the alphabet randomString draws from: ASCII letters only,
// so generated payloads are always printable and JSON-safe.
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

// randomString returns a pseudo-random string of exactly n characters,
// each picked uniformly and independently from letterBytes. It is used to
// build oversized test payloads; it is not seeded and not suitable for
// anything security-sensitive.
func randomString(n int) string {
	out := make([]byte, n)
	for i := 0; i < n; i++ {
		out[i] = letterBytes[rand.Intn(len(letterBytes))]
	}
	return string(out)
}
Loading
Loading