From b28556721633fac9d1f2914847892d622da9e783 Mon Sep 17 00:00:00 2001
From: Bojan Djurkovic
Date: Mon, 5 Feb 2024 15:14:21 -0400
Subject: [PATCH 1/5] backend: in stream reporter use mutex to guard against
 concurrent writes to stream Send()

---
 .../api/connect/service/console/service.go  | 21 +++----
 .../console/stream_progress_reporter.go     | 58 +++++++++++++++----
 backend/pkg/kafka/consumer.go               |  4 +-
 frontend/src/state/backendApi.ts            | 28 +++++----
 4 files changed, 77 insertions(+), 34 deletions(-)

diff --git a/backend/pkg/api/connect/service/console/service.go b/backend/pkg/api/connect/service/console/service.go
index 0e26e42e3..d993fb9dd 100644
--- a/backend/pkg/api/connect/service/console/service.go
+++ b/backend/pkg/api/connect/service/console/service.go
@@ -52,16 +52,6 @@ func NewService(logger *zap.Logger,
 
 // ListMessages consumes a Kafka topic and streams the Kafka records back.
 func (api *Service) ListMessages(ctx context.Context, req *connect.Request[v1alpha.ListMessagesRequest], stream *connect.ServerStream[v1alpha.ListMessagesResponse]) error {
-	timeout := 35 * time.Second
-	if req.Msg.GetFilterInterpreterCode() != "" || req.Msg.GetStartOffset() == console.StartOffsetNewest {
-		// Push-down filters and StartOffset = Newest may be long-running streams.
-		// There's already a client-side provided timeout which we usually trust.
-		// But additionally we want to ensure it never takes much longer than that.
-		timeout = 31 * time.Minute
-	}
-
-	ctx, cancel := context.WithTimeout(ctx, timeout)
-	defer cancel()
-
 	lmq := httptypes.ListMessagesRequest{
 		TopicName:   req.Msg.GetTopic(),
 		StartOffset: req.Msg.GetStartOffset(),
@@ -122,6 +112,17 @@
 	api.authHooks.PrintListMessagesAuditLog(ctx, req, &listReq)
 
+	timeout := 35 * time.Second
+	if req.Msg.GetFilterInterpreterCode() != "" || req.Msg.GetStartOffset() == console.StartOffsetNewest {
+		// Push-down filters and StartOffset = Newest may be long-running streams.
+		// There's already a client-side provided timeout which we usually trust.
+		// But additionally we want to ensure it never takes much longer than that.
+		timeout = 31 * time.Minute
+	}
+
+	ctx, cancel := context.WithTimeoutCause(ctx, timeout, errors.New("list fetch timeout"))
+	defer cancel()
+
 	progress := &streamProgressReporter{
 		ctx:    ctx,
 		logger: api.logger,
diff --git a/backend/pkg/api/connect/service/console/stream_progress_reporter.go b/backend/pkg/api/connect/service/console/stream_progress_reporter.go
index 790b7e3c6..73b233327 100644
--- a/backend/pkg/api/connect/service/console/stream_progress_reporter.go
+++ b/backend/pkg/api/connect/service/console/stream_progress_reporter.go
@@ -11,6 +11,7 @@ package console
 
 import (
 	"context"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -31,6 +32,8 @@ type streamProgressReporter struct {
 
 	messagesConsumed atomic.Int64
 	bytesConsumed    atomic.Int64
+
+	writeMutex sync.Mutex
 }
 
 func (p *streamProgressReporter) Start() {
@@ -44,41 +47,53 @@ func (p *streamProgressReporter) Start() {
 	// topic it may take some time until there are messages. This go routine is in charge of keeping the user up to
 	// date about the progress Kowl made streaming the topic
 	go func() {
+		ticker := time.NewTicker(1 * time.Second)
+
 		for {
 			select {
 			case <-p.ctx.Done():
+				ticker.Stop()
 				return
-			default:
+			case <-ticker.C:
 				p.reportProgress()
 			}
-			time.Sleep(1 * time.Second)
 		}
 	}()
 }
 
 func (p *streamProgressReporter) reportProgress() {
+	p.writeMutex.Lock()
+	defer p.writeMutex.Unlock()
+
 	msg := &v1alpha.ListMessagesResponse_ProgressMessage{
 		MessagesConsumed: p.messagesConsumed.Load(),
 		BytesConsumed:    p.bytesConsumed.Load(),
 	}
 
-	p.stream.Send(&v1alpha.ListMessagesResponse{
+	if err := p.stream.Send(&v1alpha.ListMessagesResponse{
 		ControlMessage: &v1alpha.ListMessagesResponse_Progress{
 			Progress: msg,
 		},
-	})
+	}); err != nil {
+		p.logger.Error("send error in stream reportProgress", zap.Error(err))
+	}
 }
 
 func (p *streamProgressReporter) OnPhase(name string) {
+	p.writeMutex.Lock()
+	defer p.writeMutex.Unlock()
+
 	msg := &v1alpha.ListMessagesResponse_PhaseMessage{
 		Phase: name,
 	}
 
-	p.stream.Send(&v1alpha.ListMessagesResponse{
+	if err := p.stream.Send(&v1alpha.ListMessagesResponse{
 		ControlMessage: &v1alpha.ListMessagesResponse_Phase{
 			Phase: msg,
 		},
-	})
+	}); err != nil {
+		p.logger.Error("send error in stream OnPhase", zap.Error(err))
+	}
 }
 
 func (p *streamProgressReporter) OnMessageConsumed(size int64) {
@@ -87,6 +102,13 @@ func (p *streamProgressReporter) OnMessageConsumed(size int64) {
 }
 
 func (p *streamProgressReporter) OnMessage(message *kafka.TopicMessage) {
+	if message == nil {
+		return
+	}
+
+	p.writeMutex.Lock()
+	defer p.writeMutex.Unlock()
+
 	headers := make([]*v1alpha.KafkaRecordHeader, 0, len(message.Headers))
 
 	for _, mh := range message.Headers {
@@ -164,14 +186,19 @@ func (p *streamProgressReporter) OnMessage(message *kafka.TopicMessage) {
 		})
 	}
 
-	p.stream.Send(&v1alpha.ListMessagesResponse{
+	if err := p.stream.Send(&v1alpha.ListMessagesResponse{
 		ControlMessage: &v1alpha.ListMessagesResponse_Data{
 			Data: data,
 		},
-	})
+	}); err != nil {
+		p.logger.Error("send error in stream OnMessage", zap.Error(err))
+	}
 }
 
 func (p *streamProgressReporter) OnComplete(elapsedMs int64, isCancelled bool) {
+	p.writeMutex.Lock()
+	defer p.writeMutex.Unlock()
+
 	msg := &v1alpha.ListMessagesResponse_StreamCompletedMessage{
 		ElapsedMs:   elapsedMs,
 		IsCancelled: isCancelled,
@@ -179,21 +206,28 @@ func (p *streamProgressReporter) OnComplete(elapsedMs int64, isCancelled bool) {
 		MessagesConsumed: p.messagesConsumed.Load(),
 		BytesConsumed:    p.bytesConsumed.Load(),
 	}
 
-	p.stream.Send(&v1alpha.ListMessagesResponse{
+	if err := p.stream.Send(&v1alpha.ListMessagesResponse{
 		ControlMessage: &v1alpha.ListMessagesResponse_Done{
 			Done: msg,
 		},
-	})
+	}); err != nil {
+		p.logger.Error("send error in stream OnComplete", zap.Error(err))
+	}
 }
 
 func (p *streamProgressReporter) OnError(message string) {
+	p.writeMutex.Lock()
+	defer p.writeMutex.Unlock()
+
 	msg := &v1alpha.ListMessagesResponse_ErrorMessage{
 		Message: message,
 	}
 
-	p.stream.Send(&v1alpha.ListMessagesResponse{
+	if err := p.stream.Send(&v1alpha.ListMessagesResponse{
 		ControlMessage: &v1alpha.ListMessagesResponse_Error{
 			Error: msg,
 		},
-	})
+	}); err != nil {
+		p.logger.Error("send error in stream OnError", zap.Error(err))
+	}
 }
diff --git a/backend/pkg/kafka/consumer.go b/backend/pkg/kafka/consumer.go
index 9f1ea414e..8c2890c98 100644
--- a/backend/pkg/kafka/consumer.go
+++ b/backend/pkg/kafka/consumer.go
@@ -115,8 +115,8 @@ func (s *Service) FetchMessages(ctx context.Context, progress IListMessagesProgr
 	// 2. Create consumer workers
 	jobs := make(chan *kgo.Record, 100)
 	resultsCh := make(chan *TopicMessage, 100)
-	workerCtx, cancel := context.WithCancel(ctx)
-	defer cancel()
+	workerCtx, cancel := context.WithCancelCause(ctx)
+	defer cancel(errors.New("worker cancel"))
 
 	wg := sync.WaitGroup{}
diff --git a/frontend/src/state/backendApi.ts b/frontend/src/state/backendApi.ts
index 79a0cf9ca..d3c00ab60 100644
--- a/frontend/src/state/backendApi.ts
+++ b/frontend/src/state/backendApi.ts
@@ -245,6 +245,13 @@ const addBearerTokenInterceptor: ConnectRpcInterceptor = (next) => async (req: U
 };
 
+const transport = createConnectTransport({
+    baseUrl: appConfig.grpcBase,
+    interceptors: [addBearerTokenInterceptor]
+});
+
+const consoleClient = createPromiseClient(ConsoleService, transport);
+
 let messageSearchAbortController: AbortController | null = null;
 
 //
@@ -372,14 +379,9 @@ const apiStore = {
        // do it
        const abortController = messageSearchAbortController = new AbortController();
 
-        const transport = createConnectTransport({
-            baseUrl: appConfig.grpcBase,
-            interceptors: [addBearerTokenInterceptor]
-        });
-
-        const client = createPromiseClient(ConsoleService, transport);
        const req = new ListMessagesRequest();
+
        req.topic = searchRequest.topicName;
        req.startOffset = BigInt(searchRequest.startOffset);
        req.startTimestamp = BigInt(searchRequest.startTimestamp);
@@ -392,14 +394,19 @@ const apiStore = {
 
        // For StartOffset = Newest and any set push-down filter we need to bump the default timeout
        // from 30s to 30 minutes before ending the request gracefully.
-        let timeoutMs = 30 * 1000;
+        const timeoutMs = 30 * 1000;
        if (searchRequest.startOffset == PartitionOffsetOrigin.End || req.filterInterpreterCode != null) {
-            const minuteMs = 60 * 1000;
-            timeoutMs = 30 * minuteMs;
+            // const minuteMs = 60 * 1000;
+            // timeoutMs = 30 * minuteMs;
+            console.log('asdf')
        }
 
+        abortController.signal.addEventListener('abort', () => {
+            console.log('ABORTED');
+        })
+
        try {
-            for await (const res of await client.listMessages(req, { signal: abortController.signal, timeoutMs })) {
+            for await (const res of await consoleClient.listMessages(req, { signal: abortController.signal, timeoutMs })) {
                if (abortController.signal.aborted)
                    break;
@@ -415,6 +422,7 @@ const apiStore = {
                    this.messagesTotalConsumed = Number(res.controlMessage.value.messagesConsumed);
                    break;
                case 'done':
+                    console.log('done phase')
                    this.messagesElapsedMs = Number(res.controlMessage.value.elapsedMs);
                    this.messagesBytesConsumed = Number(res.controlMessage.value.bytesConsumed);
                    // this.MessageSearchCancelled = msg.isCancelled;

From 40a21dcc191c1c4d3546bd39e26fbbabe8a176e0 Mon Sep 17 00:00:00 2001
From: Bojan Djurkovic
Date: Mon, 5 Feb 2024 15:16:11 -0400
Subject: [PATCH 2/5] frontend: fix commented-out timeout code

---
 frontend/src/state/backendApi.ts | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/frontend/src/state/backendApi.ts b/frontend/src/state/backendApi.ts
index d3c00ab60..50955eb4b 100644
--- a/frontend/src/state/backendApi.ts
+++ b/frontend/src/state/backendApi.ts
@@ -394,11 +394,10 @@ const apiStore = {
 
        // For StartOffset = Newest and any set push-down filter we need to bump the default timeout
        // from 30s to 30 minutes before ending the request gracefully.
-        const timeoutMs = 30 * 1000;
+        let timeoutMs = 30 * 1000;
        if (searchRequest.startOffset == PartitionOffsetOrigin.End || req.filterInterpreterCode != null) {
-            // const minuteMs = 60 * 1000;
-            // timeoutMs = 30 * minuteMs;
-            console.log('asdf')
+            const minuteMs = 60 * 1000;
+            timeoutMs = 30 * minuteMs;
        }

From 2764393a4410649332aab26a2a945c33857ee419 Mon Sep 17 00:00:00 2001
From: Bojan Djurkovic
Date: Mon, 5 Feb 2024 15:23:25 -0400
Subject: [PATCH 3/5] backend: go format

---
 backend/pkg/api/connect/service/console/service.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/backend/pkg/api/connect/service/console/service.go b/backend/pkg/api/connect/service/console/service.go
index d993fb9dd..2a18627c5 100644
--- a/backend/pkg/api/connect/service/console/service.go
+++ b/backend/pkg/api/connect/service/console/service.go
@@ -52,7 +52,6 @@ func NewService(logger *zap.Logger,
 
 // ListMessages consumes a Kafka topic and streams the Kafka records back.
 func (api *Service) ListMessages(ctx context.Context, req *connect.Request[v1alpha.ListMessagesRequest], stream *connect.ServerStream[v1alpha.ListMessagesResponse]) error {
-
 	lmq := httptypes.ListMessagesRequest{
 		TopicName:   req.Msg.GetTopic(),
 		StartOffset: req.Msg.GetStartOffset(),

From ac6eb61da681b730766d0ccf16c8eb3a5913d77c Mon Sep 17 00:00:00 2001
From: Bojan Djurkovic
Date: Mon, 5 Feb 2024 15:46:35 -0400
Subject: [PATCH 4/5] frontend: remove debug logs

---
 frontend/src/state/backendApi.ts | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/frontend/src/state/backendApi.ts b/frontend/src/state/backendApi.ts
index 50955eb4b..9680d8481 100644
--- a/frontend/src/state/backendApi.ts
+++ b/frontend/src/state/backendApi.ts
@@ -400,10 +400,6 @@ const apiStore = {
            timeoutMs = 30 * minuteMs;
        }
 
-        abortController.signal.addEventListener('abort', () => {
-            console.log('ABORTED');
-        })
-
        try {
            for await (const res of await consoleClient.listMessages(req, { signal: abortController.signal, timeoutMs })) {
                if (abortController.signal.aborted)
@@ -417,7 +417,6 @@ const apiStore = {
                    this.messagesTotalConsumed = Number(res.controlMessage.value.messagesConsumed);
                    break;
                case 'done':
-                    console.log('done phase')
                    this.messagesElapsedMs = Number(res.controlMessage.value.elapsedMs);
                    this.messagesBytesConsumed = Number(res.controlMessage.value.bytesConsumed);
                    // this.MessageSearchCancelled = msg.isCancelled;

From 3803b873b69483cebbe5a5d68220c48a1ef83eab Mon Sep 17 00:00:00 2001
From: Bojan Djurkovic
Date: Mon, 5 Feb 2024 15:50:10 -0400
Subject: [PATCH 5/5] frontend: remove extra empty line after req

---
 frontend/src/state/backendApi.ts | 1 -
 1 file changed, 1 deletion(-)

diff --git a/frontend/src/state/backendApi.ts b/frontend/src/state/backendApi.ts
index 9680d8481..03db96d97 100644
--- a/frontend/src/state/backendApi.ts
+++ b/frontend/src/state/backendApi.ts
@@ -381,7 +381,6 @@ const apiStore = {
        const abortController = messageSearchAbortController = new AbortController();
 
        const req = new ListMessagesRequest();
-
        req.topic = searchRequest.topicName;
        req.startOffset = BigInt(searchRequest.startOffset);
        req.startTimestamp = BigInt(searchRequest.startTimestamp);
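
Reviewer note (not part of the applied patches): connect-go server streams do not allow concurrent Send() calls, and after this series the stream is written to from several goroutines at once -- the one-second progress ticker started in Start() and the consumer callbacks (OnPhase, OnMessage, OnComplete, OnError). The sketch below is a minimal, self-contained illustration of the locking pattern patch 1 introduces; the type names (response, rawStream, guardedStream) are stand-ins invented for the example, not code from this repository:

    package main

    import (
    	"fmt"
    	"sync"
    )

    // response stands in for the generated ListMessagesResponse message.
    type response struct{ payload string }

    // rawStream stands in for the server stream; only the
    // Send(msg) error shape matters for this sketch.
    type rawStream struct{}

    func (rawStream) Send(r *response) error {
    	fmt.Println("sent:", r.payload)
    	return nil
    }

    // guardedStream mirrors patch 1: one mutex serializes every write,
    // and send errors are logged rather than propagated to callers.
    type guardedStream struct {
    	mu     sync.Mutex
    	stream rawStream
    }

    func (g *guardedStream) send(r *response) {
    	g.mu.Lock()
    	defer g.mu.Unlock()
    	if err := g.stream.Send(r); err != nil {
    		fmt.Println("send error:", err)
    	}
    }

    func main() {
    	g := &guardedStream{}
    	var wg sync.WaitGroup
    	// Concurrent writers (progress ticker, consumer callbacks) all
    	// funnel through send(), so Send() is never invoked concurrently.
    	for i := 0; i < 5; i++ {
    		wg.Add(1)
    		go func(i int) {
    			defer wg.Done()
    			g.send(&response{payload: fmt.Sprintf("msg-%d", i)})
    		}(i)
    	}
    	wg.Wait()
    }

Taking the mutex around the whole Send() keeps the error-logging policy in one place, and, as in the patches, failures are logged and swallowed rather than returned. The switch from select-with-default plus time.Sleep to a time.Ticker with ticker.Stop() in the ctx.Done() branch likewise stops the progress goroutine promptly when the stream context ends, instead of up to a second later.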