From 590a382e7837b2d7c9294ac8ceb9c6b9bdcbb717 Mon Sep 17 00:00:00 2001 From: Daniel Brauner Date: Mon, 7 Oct 2024 14:53:43 +0200 Subject: [PATCH] replaced SupervisorJob with try catch --- .../idea/blaze/base/buildview/BazelService.kt | 74 +++++++++++-------- 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/base/src/com/google/idea/blaze/base/buildview/BazelService.kt b/base/src/com/google/idea/blaze/base/buildview/BazelService.kt index f12a2a4d29b..a39d216d25a 100644 --- a/base/src/com/google/idea/blaze/base/buildview/BazelService.kt +++ b/base/src/com/google/idea/blaze/base/buildview/BazelService.kt @@ -24,6 +24,7 @@ import com.intellij.openapi.util.Key import com.intellij.util.io.LimitedInputStream import com.intellij.util.ui.EDT import kotlinx.coroutines.* +import java.io.BufferedInputStream import java.io.FileInputStream import kotlin.io.path.pathString @@ -94,47 +95,56 @@ class BazelService(private val project: Project) : Disposable { return exitCode } - private fun CoroutineScope.parseEvents(ctx: BlazeContext, helper: BuildResultHelper): Job { - val handler = CoroutineExceptionHandler { _, e -> LOG.error("error in event parser", e) } - - return launch(handler + CoroutineName("EventParser") + SupervisorJob()) { - // wait for bazel to create the output file - while (!helper.outputFile.exists()) delay(10) - - FileInputStream(helper.outputFile).buffered().use { stream -> - // keep reading events while the coroutine is active, i.e. bazel is still running, - // or while the stream has data available (to ensure that all events are processed) - while (isActive || stream.available() > 0) { + private suspend fun parseEvent(ctx: BlazeContext, stream: BufferedInputStream) { + // make sure that there are at least four bytes already available + while (stream.available() < 4) delay(10) - // make sure that there are at least four bytes already available - while (stream.available() < 4) delay(10) + // protobuf messages are delimited by size (encoded as varint32), + // read size manually to ensure the entire message is already available + val size = CodedInputStream.readRawVarint32(stream.read(), stream) + while (stream.available() < size) delay(10) - // protobuf messages are delimited by size (encoded as varint32), - // read size manually to ensure the entire message is already available - val size = CodedInputStream.readRawVarint32(stream.read(), stream) - while (stream.available() < size) delay(10) + val eventStream = LimitedInputStream(stream, size) + val event = try { + BuildEvent.parseFrom(eventStream) + } catch (e: Exception) { + LOG.error("could not parse event", e) - val eventStream = LimitedInputStream(stream, size) - val event = try { - BuildEvent.parseFrom(eventStream) - } catch (e: Exception) { - LOG.error("could not parse event", e) + // if the message could not be parsed, make sure to skip it + if (eventStream.bytesRead < size) { + stream.skip(size.toLong() - eventStream.bytesRead) + } - // if the message could not be parsed, make sure to skip it - if (eventStream.bytesRead < size) { - stream.skip(size.toLong() - eventStream.bytesRead) - } + return + } - continue - } + if (event == null) { + delay(10) + } else { + BuildEventParser.parse(event)?.let(ctx::output) + } + } - if (event == null) { - delay(10) - } else { - BuildEventParser.parse(event)?.let(ctx::output) + private fun CoroutineScope.parseEvents(ctx: BlazeContext, helper: BuildResultHelper): Job { + return launch(CoroutineName("EventParser")) { + try { + // wait for bazel to create the output file + while (!helper.outputFile.exists()) delay(10) + + FileInputStream(helper.outputFile).buffered().use { stream -> + // keep reading events while the coroutine is active, i.e. bazel is still running, + // or while the stream has data available (to ensure that all events are processed) + while (isActive || stream.available() > 0) { + parseEvent(ctx, stream) } } } + catch (e: CancellationException) { + throw e + } + catch (e: Exception) { + LOG.error("error in event parser", e) + } } }