Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backend: list message concurrent stream send fix #1076

Merged
merged 5 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions backend/pkg/api/connect/service/console/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,6 @@ func NewService(logger *zap.Logger,

// ListMessages consumes a Kafka topic and streams the Kafka records back.
func (api *Service) ListMessages(ctx context.Context, req *connect.Request[v1alpha.ListMessagesRequest], stream *connect.ServerStream[v1alpha.ListMessagesResponse]) error {
timeout := 35 * time.Second
if req.Msg.GetFilterInterpreterCode() != "" || req.Msg.GetStartOffset() == console.StartOffsetNewest {
// Push-down filters and StartOffset = Newest may be long-running streams.
// There's already a client-side provided timeout which we usually trust.
// But additionally we want to ensure it never takes much longer than that.
timeout = 31 * time.Minute
}

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

lmq := httptypes.ListMessagesRequest{
TopicName: req.Msg.GetTopic(),
StartOffset: req.Msg.GetStartOffset(),
Expand Down Expand Up @@ -122,6 +111,17 @@ func (api *Service) ListMessages(ctx context.Context, req *connect.Request[v1alp

api.authHooks.PrintListMessagesAuditLog(ctx, req, &listReq)

timeout := 35 * time.Second
if req.Msg.GetFilterInterpreterCode() != "" || req.Msg.GetStartOffset() == console.StartOffsetNewest {
// Push-down filters and StartOffset = Newest may be long-running streams.
// There's already a client-side provided timeout which we usually trust.
// But additionally we want to ensure it never takes much longer than that.
timeout = 31 * time.Minute
}

ctx, cancel := context.WithTimeoutCause(ctx, timeout, errors.New("list fetch timeout"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good to add the reason there 👍

defer cancel()

progress := &streamProgressReporter{
ctx: ctx,
logger: api.logger,
Expand Down
58 changes: 46 additions & 12 deletions backend/pkg/api/connect/service/console/stream_progress_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package console

import (
"context"
"sync"
"sync/atomic"
"time"

Expand All @@ -31,6 +32,8 @@ type streamProgressReporter struct {

messagesConsumed atomic.Int64
bytesConsumed atomic.Int64

writeMutex sync.Mutex
}

func (p *streamProgressReporter) Start() {
Expand All @@ -44,41 +47,53 @@ func (p *streamProgressReporter) Start() {
topic it may take some time until there are messages. This goroutine is in charge of keeping the user up to
// date about the progress Kowl made streaming the topic
go func() {
ticker := time.NewTicker(1 * time.Second)

for {
select {
case <-p.ctx.Done():
ticker.Stop()
return
default:
case <-ticker.C:
p.reportProgress()
}
time.Sleep(1 * time.Second)
}
}()
}

func (p *streamProgressReporter) reportProgress() {
p.writeMutex.Lock()
defer p.writeMutex.Unlock()

msg := &v1alpha.ListMessagesResponse_ProgressMessage{
MessagesConsumed: p.messagesConsumed.Load(),
BytesConsumed: p.bytesConsumed.Load(),
}

p.stream.Send(&v1alpha.ListMessagesResponse{
if err := p.stream.Send(&v1alpha.ListMessagesResponse{
ControlMessage: &v1alpha.ListMessagesResponse_Progress{
Progress: msg,
},
})
}); err != nil {
p.logger.Error("send error in stream reportProgress", zap.Error(err))
}
}

func (p *streamProgressReporter) OnPhase(name string) {
p.writeMutex.Lock()
defer p.writeMutex.Unlock()

msg := &v1alpha.ListMessagesResponse_PhaseMessage{
Phase: name,
}

p.stream.Send(&v1alpha.ListMessagesResponse{
if err := p.stream.Send(&v1alpha.ListMessagesResponse{
ControlMessage: &v1alpha.ListMessagesResponse_Phase{
Phase: msg,
},
})
}); err != nil {
p.logger.Error("send error in stream OnPhase", zap.Error(err))
}
}

func (p *streamProgressReporter) OnMessageConsumed(size int64) {
Expand All @@ -87,6 +102,13 @@ func (p *streamProgressReporter) OnMessageConsumed(size int64) {
}

func (p *streamProgressReporter) OnMessage(message *kafka.TopicMessage) {
if message == nil {
return
}
Comment on lines +105 to +107
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering why this is necessary? When would we send nil here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it was just a guard I added as I was debugging. we really shouldn't but might be a good idea.


p.writeMutex.Lock()
defer p.writeMutex.Unlock()

headers := make([]*v1alpha.KafkaRecordHeader, 0, len(message.Headers))

for _, mh := range message.Headers {
Expand Down Expand Up @@ -164,36 +186,48 @@ func (p *streamProgressReporter) OnMessage(message *kafka.TopicMessage) {
})
}

p.stream.Send(&v1alpha.ListMessagesResponse{
if err := p.stream.Send(&v1alpha.ListMessagesResponse{
ControlMessage: &v1alpha.ListMessagesResponse_Data{
Data: data,
},
})
}); err != nil {
p.logger.Error("send error in stream OnMessage", zap.Error(err))
}
}

func (p *streamProgressReporter) OnComplete(elapsedMs int64, isCancelled bool) {
p.writeMutex.Lock()
defer p.writeMutex.Unlock()

msg := &v1alpha.ListMessagesResponse_StreamCompletedMessage{
ElapsedMs: elapsedMs,
IsCancelled: isCancelled,
MessagesConsumed: p.messagesConsumed.Load(),
BytesConsumed: p.bytesConsumed.Load(),
}

p.stream.Send(&v1alpha.ListMessagesResponse{
if err := p.stream.Send(&v1alpha.ListMessagesResponse{
ControlMessage: &v1alpha.ListMessagesResponse_Done{
Done: msg,
},
})
}); err != nil {
p.logger.Error("send error in stream OnComplete", zap.Error(err))
}
}

func (p *streamProgressReporter) OnError(message string) {
p.writeMutex.Lock()
defer p.writeMutex.Unlock()

msg := &v1alpha.ListMessagesResponse_ErrorMessage{
Message: message,
}

p.stream.Send(&v1alpha.ListMessagesResponse{
if err := p.stream.Send(&v1alpha.ListMessagesResponse{
ControlMessage: &v1alpha.ListMessagesResponse_Error{
Error: msg,
},
})
}); err != nil {
p.logger.Error("send error in stream OnError", zap.Error(err))
}
}
4 changes: 2 additions & 2 deletions backend/pkg/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ func (s *Service) FetchMessages(ctx context.Context, progress IListMessagesProgr
// 2. Create consumer workers
jobs := make(chan *kgo.Record, 100)
resultsCh := make(chan *TopicMessage, 100)
workerCtx, cancel := context.WithCancel(ctx)
defer cancel()
workerCtx, cancel := context.WithCancelCause(ctx)
defer cancel(errors.New("worker cancel"))

wg := sync.WaitGroup{}

Expand Down
21 changes: 14 additions & 7 deletions frontend/src/state/backendApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@ const addBearerTokenInterceptor: ConnectRpcInterceptor = (next) => async (req: U
};


const transport = createConnectTransport({
baseUrl: appConfig.grpcBase,
interceptors: [addBearerTokenInterceptor]
});

const consoleClient = createPromiseClient(ConsoleService, transport);

let messageSearchAbortController: AbortController | null = null;

//
Expand Down Expand Up @@ -372,14 +379,9 @@ const apiStore = {

// do it
const abortController = messageSearchAbortController = new AbortController();
const transport = createConnectTransport({
baseUrl: appConfig.grpcBase,
interceptors: [addBearerTokenInterceptor]
});

const client = createPromiseClient(ConsoleService, transport);

const req = new ListMessagesRequest();

req.topic = searchRequest.topicName;
req.startOffset = BigInt(searchRequest.startOffset);
req.startTimestamp = BigInt(searchRequest.startTimestamp);
Expand All @@ -398,8 +400,12 @@ const apiStore = {
timeoutMs = 30 * minuteMs;
}

abortController.signal.addEventListener('abort', () => {
console.log('ABORTED');
})
bojand marked this conversation as resolved.
Show resolved Hide resolved

try {
for await (const res of await client.listMessages(req, { signal: abortController.signal, timeoutMs })) {
for await (const res of await consoleClient.listMessages(req, { signal: abortController.signal, timeoutMs })) {
if (abortController.signal.aborted)
break;

Expand All @@ -415,6 +421,7 @@ const apiStore = {
this.messagesTotalConsumed = Number(res.controlMessage.value.messagesConsumed);
break;
case 'done':
console.log('done phase')
bojand marked this conversation as resolved.
Show resolved Hide resolved
this.messagesElapsedMs = Number(res.controlMessage.value.elapsedMs);
this.messagesBytesConsumed = Number(res.controlMessage.value.bytesConsumed);
// this.MessageSearchCancelled = msg.isCancelled;
Expand Down
Loading