From 26e59485ccd5298f243af9f91f3ff60c0eb80586 Mon Sep 17 00:00:00 2001 From: zhengziyi0117 <91662408+zhengziyi0117@users.noreply.github.com> Date: Thu, 31 Oct 2024 17:31:33 +0800 Subject: [PATCH] fix: jdk8 send jfr data error (#725) --- .../AsyncProfilerDataSender.java | 116 +++++++++--------- .../AsyncProfilerTaskExecutionService.java | 19 +-- 2 files changed, 65 insertions(+), 70 deletions(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java index 5e09fab5b0..46a25c4cc2 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java @@ -39,10 +39,10 @@ import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerTaskGrpc; import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilingStatus; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.Objects; +import java.nio.file.Files; import java.util.concurrent.TimeUnit; import static org.apache.skywalking.apm.agent.core.conf.Config.AsyncProfiler.DATA_CHUNK_SIZE; @@ -87,69 +87,71 @@ public void statusChanged(GRPCChannelStatus status) { this.status = status; } - public void sendData(AsyncProfilerTask task, FileChannel channel) throws IOException, InterruptedException { - if (status != GRPCChannelStatus.CONNECTED || Objects.isNull(channel) || !channel.isOpen()) { + public void sendData(AsyncProfilerTask task, File dumpFile) throws IOException, InterruptedException { + if (status != GRPCChannelStatus.CONNECTED) { return; } - int size = Math.toIntExact(channel.size()); - final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); - StreamObserver dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter( - GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS - ).collect(new ClientResponseObserver() { - ClientCallStreamObserver requestStream; - - @Override - public void beforeStart(ClientCallStreamObserver requestStream) { - this.requestStream = requestStream; - } + try (FileInputStream fileInputStream = new FileInputStream(dumpFile)) { + long fileSize = Files.size(dumpFile.toPath()); + int size = Math.toIntExact(fileSize); + final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); + StreamObserver dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter( + GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS + ).collect(new ClientResponseObserver() { + ClientCallStreamObserver requestStream; + + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + this.requestStream = requestStream; + } - @Override - public void onNext(AsyncProfilerCollectionResponse value) { - if (AsyncProfilingStatus.TERMINATED_BY_OVERSIZE.equals(value.getType())) { - LOGGER.warn("JFR is too large to be received by the oap server"); - } else { - ByteBuffer buf = ByteBuffer.allocateDirect(DATA_CHUNK_SIZE); - try { - while (channel.read(buf) > 0) { - buf.flip(); - AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder() - .setContent(ByteString.copyFrom(buf)) - .build(); - requestStream.onNext(asyncProfilerData); - buf.clear(); + @Override + public void onNext(AsyncProfilerCollectionResponse value) { + if (AsyncProfilingStatus.TERMINATED_BY_OVERSIZE.equals(value.getType())) { + LOGGER.warn("JFR is too large to be received by the oap server"); + } else { + byte[] buf = new byte[DATA_CHUNK_SIZE]; + try { + int bytesRead; + while ((bytesRead = fileInputStream.read(buf)) != -1) { + AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder() + .setContent(ByteString.copyFrom(buf, 0, bytesRead)) + .build(); + requestStream.onNext(asyncProfilerData); + } + } catch (IOException e) { + LOGGER.error("Failed to read JFR file and failed to upload to oap", e); } - } catch (IOException e) { - LOGGER.error("Failed to read JFR file and failed to upload to oap", e); } - } - - requestStream.onCompleted(); - } - @Override - public void onError(Throwable t) { - status.finished(); - LOGGER.error(t, "Send async profiler task data to collector fail with a grpc internal exception."); - ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t); - } + requestStream.onCompleted(); + } - @Override - public void onCompleted() { - status.finished(); - } - }); - AsyncProfilerMetaData metaData = AsyncProfilerMetaData.newBuilder() - .setService(Config.Agent.SERVICE_NAME) - .setServiceInstance(Config.Agent.INSTANCE_NAME) - .setType(AsyncProfilingStatus.PROFILING_SUCCESS) - .setContentSize(size) - .setTaskId(task.getTaskId()) - .build(); - AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder().setMetaData(metaData).build(); - dataStreamObserver.onNext(asyncProfilerData); + @Override + public void onError(Throwable t) { + status.finished(); + LOGGER.error(t, "Send async profiler task data to collector fail with a grpc internal exception."); + ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t); + } - status.wait4Finish(); + @Override + public void onCompleted() { + status.finished(); + } + }); + AsyncProfilerMetaData metaData = AsyncProfilerMetaData.newBuilder() + .setService(Config.Agent.SERVICE_NAME) + .setServiceInstance(Config.Agent.INSTANCE_NAME) + .setType(AsyncProfilingStatus.PROFILING_SUCCESS) + .setContentSize(size) + .setTaskId(task.getTaskId()) + .build(); + AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder().setMetaData(metaData).build(); + dataStreamObserver.onNext(asyncProfilerData); + + status.wait4Finish(); + } } public void sendError(AsyncProfilerTask task, String errorMessage) { diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java index 550923c1b0..b5d7a5b643 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java @@ -27,9 +27,7 @@ import org.apache.skywalking.apm.agent.core.logging.api.LogManager; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; -import java.nio.channels.FileChannel; import java.util.Objects; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -90,20 +88,15 @@ private void stopWhenError(AsyncProfilerTask task, String errorMessage) { } private void stopWhenSuccess(AsyncProfilerTask task) { - + // stop task and send data try { File dumpFile = task.stop(getAsyncProfiler()); - // stop task - try (FileInputStream fileInputStream = new FileInputStream(dumpFile)) { - // upload file - FileChannel channel = fileInputStream.getChannel(); - + if (dumpFile != null && dumpFile.exists()) { AsyncProfilerDataSender dataSender = ServiceManager.INSTANCE.findService(AsyncProfilerDataSender.class); - dataSender.sendData(task, channel); - } - - if (!dumpFile.delete()) { - LOGGER.warn("Fail to delete the dump file of async profiler."); + dataSender.sendData(task, dumpFile); + if (!dumpFile.delete()) { + LOGGER.warn("Fail to delete the dump file of async profiler."); + } } } catch (Exception e) { LOGGER.error("stop async profiler task error", e);