From 2027a98b1ec28ec24be3b7aac346d11bb3f3ccfe Mon Sep 17 00:00:00 2001 From: zhengziyi0117 <91662408+zhengziyi0117@users.noreply.github.com> Date: Wed, 30 Oct 2024 17:22:33 +0800 Subject: [PATCH] Support async profiler feature (#720) --- .../command/AsyncProfilerTaskCommand.java | 133 ++++++++++++ .../command/CommandDeserializer.java | 3 + apm-protocol/apm-network/src/main/proto | 2 +- apm-sniffer/apm-agent-core/pom.xml | 4 + .../AsyncProfilerDataSender.java | 194 ++++++++++++++++++ .../core/asyncprofiler/AsyncProfilerTask.java | 146 +++++++++++++ .../AsyncProfilerTaskChannelService.java | 131 ++++++++++++ .../AsyncProfilerTaskExecutionService.java | 160 +++++++++++++++ .../core/commands/CommandExecutorService.java | 5 + .../AsyncProfilerCommandExecutor.java | 44 ++++ .../apm/agent/core/conf/Config.java | 27 +++ ...skywalking.apm.agent.core.boot.BootService | 3 + .../agent/core/boot/ServiceManagerTest.java | 4 +- apm-sniffer/config/agent.config | 6 + dist-material/LICENSE | 1 + pom.xml | 6 + 16 files changed, 866 insertions(+), 3 deletions(-) create mode 100644 apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/AsyncProfilerTaskCommand.java create mode 100644 apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java create mode 100644 apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java create mode 100644 apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java create mode 100644 apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java create mode 100644 apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/AsyncProfilerCommandExecutor.java diff --git a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/AsyncProfilerTaskCommand.java b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/AsyncProfilerTaskCommand.java new file mode 100644 index 0000000000..41d6043dee --- /dev/null +++ b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/AsyncProfilerTaskCommand.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.network.trace.component.command; + +import org.apache.skywalking.apm.network.common.v3.Command; +import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair; + +import java.util.List; +import java.util.Objects; + +public class AsyncProfilerTaskCommand extends BaseCommand implements Serializable, Deserializable { + public static final Deserializable DESERIALIZER = new AsyncProfilerTaskCommand("", "", 0, null, "", 0); + public static final String NAME = "AsyncProfilerTaskQuery"; + + /** + * async-profiler taskId + */ + private final String taskId; + /** + * run profiling for duration (second) + */ + private final int duration; + /** + * async profiler extended parameters. Here is a table of optional parameters. + * + *

lock[=DURATION] - profile contended locks overflowing the DURATION ns bucket (default: 10us)

+ *

alloc[=BYTES] - profile allocations with BYTES interval

+ *

interval=N - sampling interval in ns (default: 10'000'000, i.e. 10 ms)

+ *

jstackdepth=N - maximum Java stack depth (default: 2048)

+ *

chunksize=N - approximate size of JFR chunk in bytes (default: 100 MB)

+ *

chunktime=N - duration of JFR chunk in seconds (default: 1 hour)

+ * details @see async-profiler argument + */ + private final String execArgs; + /** + * task create time + */ + private final long createTime; + + public AsyncProfilerTaskCommand(String serialNumber, String taskId, int duration, + List events, String execArgs, long createTime) { + super(NAME, serialNumber); + this.taskId = taskId; + this.duration = duration; + this.createTime = createTime; + String comma = ","; + StringBuilder sb = new StringBuilder(); + if (Objects.nonNull(events) && !events.isEmpty()) { + sb.append("event=") + .append(String.join(comma, events)) + .append(comma); + } + if (execArgs != null && !execArgs.isEmpty()) { + sb.append(execArgs); + } + this.execArgs = sb.toString(); + } + + public AsyncProfilerTaskCommand(String serialNumber, String taskId, int duration, + String execArgs, long createTime) { + super(NAME, serialNumber); + this.taskId = taskId; + this.duration = duration; + this.execArgs = execArgs; + this.createTime = createTime; + } + + @Override + public AsyncProfilerTaskCommand deserialize(Command command) { + final List argsList = command.getArgsList(); + String taskId = null; + int duration = 0; + String execArgs = null; + long createTime = 0; + String serialNumber = null; + for (final KeyStringValuePair pair : argsList) { + if ("SerialNumber".equals(pair.getKey())) { + serialNumber = pair.getValue(); + } else if ("TaskId".equals(pair.getKey())) { + taskId = pair.getValue(); + } else if ("Duration".equals(pair.getKey())) { + duration = Integer.parseInt(pair.getValue()); + } else if ("ExecArgs".equals(pair.getKey())) { + execArgs = pair.getValue(); + } else if ("CreateTime".equals(pair.getKey())) { + createTime = Long.parseLong(pair.getValue()); + } + } + return new AsyncProfilerTaskCommand(serialNumber, taskId, duration, execArgs, createTime); + } + + @Override + public Command.Builder serialize() { + final Command.Builder builder = commandBuilder(); + builder.addArgs(KeyStringValuePair.newBuilder().setKey("TaskId").setValue(taskId)) + .addArgs(KeyStringValuePair.newBuilder().setKey("Duration").setValue(String.valueOf(duration))) + .addArgs(KeyStringValuePair.newBuilder().setKey("ExecArgs").setValue(execArgs)) + .addArgs(KeyStringValuePair.newBuilder().setKey("CreateTime").setValue(String.valueOf(createTime))); + return builder; + } + + public String getTaskId() { + return taskId; + } + + public int getDuration() { + return duration; + } + + public String getExecArgs() { + return execArgs; + } + + public long getCreateTime() { + return createTime; + } +} diff --git a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/CommandDeserializer.java b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/CommandDeserializer.java index ff8680bcb3..4fd737ff98 100644 --- a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/CommandDeserializer.java +++ b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/CommandDeserializer.java @@ -27,7 +27,10 @@ public static BaseCommand deserialize(final Command command) { return ProfileTaskCommand.DESERIALIZER.deserialize(command); } else if (ConfigurationDiscoveryCommand.NAME.equals(commandName)) { return ConfigurationDiscoveryCommand.DESERIALIZER.deserialize(command); + } else if (AsyncProfilerTaskCommand.NAME.equals(commandName)) { + return AsyncProfilerTaskCommand.DESERIALIZER.deserialize(command); } + throw new UnsupportedCommandException(command); } diff --git a/apm-protocol/apm-network/src/main/proto b/apm-protocol/apm-network/src/main/proto index d4da569991..bd1f91f7e1 160000 --- a/apm-protocol/apm-network/src/main/proto +++ b/apm-protocol/apm-network/src/main/proto @@ -1 +1 @@ -Subproject commit d4da5699915ee52288f8ff1c954decf6363485bc +Subproject commit bd1f91f7e1cb4de9d9b5ccb71f36ce6b1c7c97f5 diff --git a/apm-sniffer/apm-agent-core/pom.xml b/apm-sniffer/apm-agent-core/pom.xml index 8ba309156f..fcca095f25 100644 --- a/apm-sniffer/apm-agent-core/pom.xml +++ b/apm-sniffer/apm-agent-core/pom.xml @@ -143,6 +143,10 @@ jmh-generator-annprocess test + + tools.profiler + async-profiler + diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java new file mode 100644 index 0000000000..5e09fab5b0 --- /dev/null +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.agent.core.asyncprofiler; + +import com.google.protobuf.ByteString; +import io.grpc.Channel; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; +import io.grpc.stub.StreamObserver; +import org.apache.skywalking.apm.agent.core.boot.BootService; +import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor; +import org.apache.skywalking.apm.agent.core.boot.ServiceManager; +import org.apache.skywalking.apm.agent.core.conf.Config; +import org.apache.skywalking.apm.agent.core.logging.api.ILog; +import org.apache.skywalking.apm.agent.core.logging.api.LogManager; +import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener; +import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager; +import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus; +import org.apache.skywalking.apm.agent.core.remote.GRPCStreamServiceStatus; +import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerCollectionResponse; +import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerData; +import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerMetaData; +import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerTaskGrpc; +import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilingStatus; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import static org.apache.skywalking.apm.agent.core.conf.Config.AsyncProfiler.DATA_CHUNK_SIZE; +import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT; + +@DefaultImplementor +public class AsyncProfilerDataSender implements BootService, GRPCChannelListener { + private static final ILog LOGGER = LogManager.getLogger(AsyncProfilerDataSender.class); + + private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT; + + private volatile AsyncProfilerTaskGrpc.AsyncProfilerTaskStub asyncProfilerTaskStub; + + @Override + public void prepare() throws Throwable { + ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this); + } + + @Override + public void boot() throws Throwable { + + } + + @Override + public void onComplete() throws Throwable { + + } + + @Override + public void shutdown() throws Throwable { + + } + + @Override + public void statusChanged(GRPCChannelStatus status) { + if (GRPCChannelStatus.CONNECTED.equals(status)) { + Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel(); + asyncProfilerTaskStub = AsyncProfilerTaskGrpc.newStub(channel); + } else { + asyncProfilerTaskStub = null; + } + this.status = status; + } + + public void sendData(AsyncProfilerTask task, FileChannel channel) throws IOException, InterruptedException { + if (status != GRPCChannelStatus.CONNECTED || Objects.isNull(channel) || !channel.isOpen()) { + return; + } + + int size = Math.toIntExact(channel.size()); + final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); + StreamObserver dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter( + GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS + ).collect(new ClientResponseObserver() { + ClientCallStreamObserver requestStream; + + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + this.requestStream = requestStream; + } + + @Override + public void onNext(AsyncProfilerCollectionResponse value) { + if (AsyncProfilingStatus.TERMINATED_BY_OVERSIZE.equals(value.getType())) { + LOGGER.warn("JFR is too large to be received by the oap server"); + } else { + ByteBuffer buf = ByteBuffer.allocateDirect(DATA_CHUNK_SIZE); + try { + while (channel.read(buf) > 0) { + buf.flip(); + AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder() + .setContent(ByteString.copyFrom(buf)) + .build(); + requestStream.onNext(asyncProfilerData); + buf.clear(); + } + } catch (IOException e) { + LOGGER.error("Failed to read JFR file and failed to upload to oap", e); + } + } + + requestStream.onCompleted(); + } + + @Override + public void onError(Throwable t) { + status.finished(); + LOGGER.error(t, "Send async profiler task data to collector fail with a grpc internal exception."); + ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t); + } + + @Override + public void onCompleted() { + status.finished(); + } + }); + AsyncProfilerMetaData metaData = AsyncProfilerMetaData.newBuilder() + .setService(Config.Agent.SERVICE_NAME) + .setServiceInstance(Config.Agent.INSTANCE_NAME) + .setType(AsyncProfilingStatus.PROFILING_SUCCESS) + .setContentSize(size) + .setTaskId(task.getTaskId()) + .build(); + AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder().setMetaData(metaData).build(); + dataStreamObserver.onNext(asyncProfilerData); + + status.wait4Finish(); + } + + public void sendError(AsyncProfilerTask task, String errorMessage) { + if (status != GRPCChannelStatus.CONNECTED) { + return; + } + final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); + StreamObserver dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter( + GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS + ).collect(new StreamObserver() { + @Override + public void onNext(AsyncProfilerCollectionResponse value) { + } + + @Override + public void onError(Throwable t) { + status.finished(); + LOGGER.error(t, "Send async profiler task execute error fail with a grpc internal exception."); + ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t); + } + + @Override + public void onCompleted() { + status.finished(); + } + }); + AsyncProfilerMetaData metaData = AsyncProfilerMetaData.newBuilder() + .setService(Config.Agent.SERVICE_NAME) + .setServiceInstance(Config.Agent.INSTANCE_NAME) + .setTaskId(task.getTaskId()) + .setType(AsyncProfilingStatus.EXECUTION_TASK_ERROR) + .setContentSize(-1) + .build(); + AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder() + .setMetaData(metaData) + .setErrorMessage(errorMessage) + .build(); + dataStreamObserver.onNext(asyncProfilerData); + dataStreamObserver.onCompleted(); + status.wait4Finish(); + } +} diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java new file mode 100644 index 0000000000..95948762b2 --- /dev/null +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.agent.core.asyncprofiler; + +import one.profiler.AsyncProfiler; +import org.apache.skywalking.apm.agent.core.conf.Config; +import org.apache.skywalking.apm.agent.core.logging.api.ILog; +import org.apache.skywalking.apm.agent.core.logging.api.LogManager; +import org.apache.skywalking.apm.util.StringUtil; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class AsyncProfilerTask { + private static final ILog LOGGER = LogManager.getLogger(AsyncProfilerTask.class); + private static final String COMMA = ","; + /** + * task id + */ + private String taskId; + /** + * async profiler optional extended parameters + * + * @see org.apache.skywalking.apm.network.trace.component.command.AsyncProfilerTaskCommand + */ + private String execArgs; + /** + * run profiling for duration (second) + */ + private int duration; + /** + * The time when oap server created this task + */ + private long createTime; + /** + * tempFile generated by async-profiler execution + */ + private Path tempFile; + + private static String execute(AsyncProfiler asyncProfiler, String args) + throws IllegalArgumentException, IOException { + LOGGER.info("async profiler execute args:{}", args); + return asyncProfiler.execute(args); + } + + /** + * start async profiler + */ + public String start(AsyncProfiler asyncProfiler) throws IOException { + tempFile = getProfilerFilePath(); + StringBuilder startArgs = new StringBuilder(); + startArgs.append("start").append(COMMA); + if (StringUtil.isNotEmpty(execArgs)) { + startArgs.append(execArgs).append(COMMA); + } + startArgs.append("file=").append(tempFile.toString()); + + return execute(asyncProfiler, startArgs.toString()); + } + + /** + * stop async profiler and get file + */ + public File stop(AsyncProfiler asyncProfiler) throws IOException { + LOGGER.info("async profiler process stop and dump file"); + String stopArgs = "stop" + COMMA + "file=" + tempFile.toAbsolutePath(); + execute(asyncProfiler, stopArgs); + return tempFile.toFile(); + } + + /** + * if outputPath is configured, the JFR file will be generated at outputPath, + * otherwise createTemp will be used to create the file + */ + public Path getProfilerFilePath() throws IOException { + if (StringUtil.isNotEmpty(Config.AsyncProfiler.OUTPUT_PATH)) { + Path tempFilePath = Paths.get(Config.AsyncProfiler.OUTPUT_PATH, taskId + getFileExtension()); + return Files.createFile(tempFilePath.toAbsolutePath()); + } else { + return Files.createTempFile(taskId, getFileExtension()); + } + } + + private String getFileExtension() { + return ".jfr"; + } + + public void setExecArgs(String execArgs) { + this.execArgs = execArgs; + } + + public void setDuration(int duration) { + this.duration = duration; + } + + public void setTempFile(Path tempFile) { + this.tempFile = tempFile; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + public void setCreateTime(long createTime) { + this.createTime = createTime; + } + + public String getExecArgs() { + return execArgs; + } + + public int getDuration() { + return duration; + } + + public Path getTempFile() { + return tempFile; + } + + public String getTaskId() { + return taskId; + } + + public long getCreateTime() { + return createTime; + } +} diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java new file mode 100644 index 0000000000..7a2b26a46a --- /dev/null +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.agent.core.asyncprofiler; + +import io.grpc.Channel; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import org.apache.skywalking.apm.agent.core.boot.BootService; +import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor; +import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory; +import org.apache.skywalking.apm.agent.core.boot.ServiceManager; +import org.apache.skywalking.apm.agent.core.commands.CommandService; +import org.apache.skywalking.apm.agent.core.conf.Config; +import org.apache.skywalking.apm.agent.core.logging.api.ILog; +import org.apache.skywalking.apm.agent.core.logging.api.LogManager; +import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener; +import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager; +import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus; +import org.apache.skywalking.apm.network.common.v3.Commands; +import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerTaskCommandQuery; +import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerTaskGrpc; +import org.apache.skywalking.apm.util.RunnableWithExceptionProtection; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT; + +@DefaultImplementor +public class AsyncProfilerTaskChannelService implements BootService, Runnable, GRPCChannelListener { + private static final ILog LOGGER = LogManager.getLogger(AsyncProfilerTaskChannelService.class); + + // channel status + private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT; + private volatile AsyncProfilerTaskGrpc.AsyncProfilerTaskBlockingStub asyncProfilerTaskBlockingStub; + + // query task schedule + private volatile ScheduledFuture getTaskFuture; + + @Override + public void run() { + if (status == GRPCChannelStatus.CONNECTED) { + try { + // test start command and 10s after put stop command + long lastCommandCreateTime = ServiceManager.INSTANCE + .findService(AsyncProfilerTaskExecutionService.class).getLastCommandCreateTime(); + + AsyncProfilerTaskCommandQuery query = AsyncProfilerTaskCommandQuery.newBuilder() + .setServiceInstance(Config.Agent.INSTANCE_NAME) + .setService(Config.Agent.SERVICE_NAME) + .setLastCommandTime(lastCommandCreateTime) + .build(); + Commands commands = asyncProfilerTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS) + .getAsyncProfilerTaskCommands(query); + ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands); + } catch (Throwable t) { + if (!(t instanceof StatusRuntimeException)) { + LOGGER.error(t, "fail to query async-profiler task from backend"); + return; + } + final StatusRuntimeException statusRuntimeException = (StatusRuntimeException) t; + if (Status.Code.UNIMPLEMENTED.equals(statusRuntimeException.getStatus().getCode())) { + LOGGER.warn("Backend doesn't support async-profiler, async-profiler will be disabled"); + if (getTaskFuture != null) { + getTaskFuture.cancel(true); + } + } + } + } + } + + @Override + public void statusChanged(GRPCChannelStatus status) { + if (GRPCChannelStatus.CONNECTED.equals(status)) { + Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel(); + asyncProfilerTaskBlockingStub = AsyncProfilerTaskGrpc.newBlockingStub(channel); + } else { + asyncProfilerTaskBlockingStub = null; + } + this.status = status; + } + + @Override + public void prepare() throws Throwable { + ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this); + } + + @Override + public void boot() throws Throwable { + + if (Config.AsyncProfiler.ACTIVE) { + getTaskFuture = Executors.newSingleThreadScheduledExecutor( + new DefaultNamedThreadFactory("AsyncProfilerGetTaskService") + ).scheduleWithFixedDelay( + new RunnableWithExceptionProtection( + this, + t -> LOGGER.error("Query async profiler task list failure.", t) + ), 0, Config.Collector.GET_PROFILE_TASK_INTERVAL, TimeUnit.SECONDS + ); + } + } + + @Override + public void onComplete() throws Throwable { + + } + + @Override + public void shutdown() throws Throwable { + if (getTaskFuture != null) { + getTaskFuture.cancel(true); + } + } +} diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java new file mode 100644 index 0000000000..550923c1b0 --- /dev/null +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.agent.core.asyncprofiler; + +import one.profiler.AsyncProfiler; +import org.apache.skywalking.apm.agent.core.boot.BootService; +import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor; +import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory; +import org.apache.skywalking.apm.agent.core.boot.ServiceManager; +import org.apache.skywalking.apm.agent.core.logging.api.ILog; +import org.apache.skywalking.apm.agent.core.logging.api.LogManager; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@DefaultImplementor +public class AsyncProfilerTaskExecutionService implements BootService { + + private static final ILog LOGGER = LogManager.getLogger(AsyncProfilerTaskChannelService.class); + + private AsyncProfiler asyncProfilerInstance; + + private static final String SUCCESS_RESULT = "Profiling started\n"; + + // profile executor thread pool, only running one thread + private volatile ScheduledExecutorService asyncProfilerExecutor; + + // last command create time, use to next query task list + private volatile long lastCommandCreateTime = -1; + + // task schedule future + private volatile ScheduledFuture scheduledFuture; + + public void processAsyncProfilerTask(AsyncProfilerTask task) { + if (task.getCreateTime() <= lastCommandCreateTime) { + LOGGER.warn("get repeat task because createTime is less than lastCommandCreateTime"); + return; + } + lastCommandCreateTime = task.getCreateTime(); + LOGGER.info("add async profiler task: {}", task.getTaskId()); + // add task to list + getAsyncProfilerExecutor().execute(() -> { + try { + if (Objects.nonNull(scheduledFuture) && !scheduledFuture.isDone()) { + LOGGER.info("AsyncProfilerTask already running"); + return; + } + + String result = task.start(getAsyncProfiler()); + if (!SUCCESS_RESULT.equals(result)) { + stopWhenError(task, result); + return; + } + scheduledFuture = getAsyncProfilerExecutor().schedule( + () -> stopWhenSuccess(task), task.getDuration(), TimeUnit.SECONDS + ); + } catch (IOException e) { + LOGGER.error("AsyncProfilerTask executor error:" + e.getMessage(), e); + } + }); + } + + private void stopWhenError(AsyncProfilerTask task, String errorMessage) { + LOGGER.error("AsyncProfilerTask fails to start: " + errorMessage); + AsyncProfilerDataSender dataSender = ServiceManager.INSTANCE.findService(AsyncProfilerDataSender.class); + dataSender.sendError(task, errorMessage); + } + + private void stopWhenSuccess(AsyncProfilerTask task) { + + try { + File dumpFile = task.stop(getAsyncProfiler()); + // stop task + try (FileInputStream fileInputStream = new FileInputStream(dumpFile)) { + // upload file + FileChannel channel = fileInputStream.getChannel(); + + AsyncProfilerDataSender dataSender = ServiceManager.INSTANCE.findService(AsyncProfilerDataSender.class); + dataSender.sendData(task, channel); + } + + if (!dumpFile.delete()) { + LOGGER.warn("Fail to delete the dump file of async profiler."); + } + } catch (Exception e) { + LOGGER.error("stop async profiler task error", e); + return; + } + } + + public long getLastCommandCreateTime() { + return lastCommandCreateTime; + } + + @Override + public void prepare() throws Throwable { + + } + + @Override + public void boot() throws Throwable { + + } + + @Override + public void onComplete() throws Throwable { + + } + + @Override + public void shutdown() throws Throwable { + getAsyncProfilerExecutor().shutdown(); + if (Objects.nonNull(scheduledFuture)) { + scheduledFuture.cancel(true); + scheduledFuture = null; + } + } + + private AsyncProfiler getAsyncProfiler() { + if (asyncProfilerInstance == null) { + asyncProfilerInstance = AsyncProfiler.getInstance(); + } + return asyncProfilerInstance; + } + + private ScheduledExecutorService getAsyncProfilerExecutor() { + if (asyncProfilerExecutor == null) { + synchronized (this) { + if (asyncProfilerExecutor == null) { + asyncProfilerExecutor = Executors.newSingleThreadScheduledExecutor( + new DefaultNamedThreadFactory("ASYNC-PROFILING-TASK")); + } + } + } + return asyncProfilerExecutor; + } +} diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/CommandExecutorService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/CommandExecutorService.java index 819b0b9ff1..c619051c9b 100755 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/CommandExecutorService.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/CommandExecutorService.java @@ -21,9 +21,11 @@ import java.util.Map; import org.apache.skywalking.apm.agent.core.boot.BootService; import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor; +import org.apache.skywalking.apm.agent.core.commands.executor.AsyncProfilerCommandExecutor; import org.apache.skywalking.apm.agent.core.commands.executor.ConfigurationDiscoveryCommandExecutor; import org.apache.skywalking.apm.agent.core.commands.executor.NoopCommandExecutor; import org.apache.skywalking.apm.agent.core.commands.executor.ProfileTaskCommandExecutor; +import org.apache.skywalking.apm.network.trace.component.command.AsyncProfilerTaskCommand; import org.apache.skywalking.apm.network.trace.component.command.BaseCommand; import org.apache.skywalking.apm.network.trace.component.command.ConfigurationDiscoveryCommand; import org.apache.skywalking.apm.network.trace.component.command.ProfileTaskCommand; @@ -48,6 +50,9 @@ public void prepare() throws Throwable { //Get ConfigurationDiscoveryCommand executor. commandExecutorMap.put(ConfigurationDiscoveryCommand.NAME, new ConfigurationDiscoveryCommandExecutor()); + + // AsyncProfiler task executor + commandExecutorMap.put(AsyncProfilerTaskCommand.NAME, new AsyncProfilerCommandExecutor()); } @Override diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/AsyncProfilerCommandExecutor.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/AsyncProfilerCommandExecutor.java new file mode 100644 index 0000000000..530b655f84 --- /dev/null +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/AsyncProfilerCommandExecutor.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.agent.core.commands.executor; + +import org.apache.skywalking.apm.agent.core.asyncprofiler.AsyncProfilerTask; +import org.apache.skywalking.apm.agent.core.asyncprofiler.AsyncProfilerTaskExecutionService; +import org.apache.skywalking.apm.agent.core.boot.ServiceManager; +import org.apache.skywalking.apm.agent.core.commands.CommandExecutionException; +import org.apache.skywalking.apm.agent.core.commands.CommandExecutor; +import org.apache.skywalking.apm.agent.core.conf.Config; +import org.apache.skywalking.apm.network.trace.component.command.AsyncProfilerTaskCommand; +import org.apache.skywalking.apm.network.trace.component.command.BaseCommand; + +public class AsyncProfilerCommandExecutor implements CommandExecutor { + @Override + public void execute(BaseCommand command) throws CommandExecutionException { + AsyncProfilerTaskCommand asyncProfilerTaskCommand = (AsyncProfilerTaskCommand) command; + + AsyncProfilerTask asyncProfilerTask = new AsyncProfilerTask(); + asyncProfilerTask.setTaskId(asyncProfilerTaskCommand.getTaskId()); + int duration = Math.min(Config.AsyncProfiler.MAX_DURATION, asyncProfilerTaskCommand.getDuration()); + asyncProfilerTask.setDuration(duration); + asyncProfilerTask.setExecArgs(asyncProfilerTaskCommand.getExecArgs()); + asyncProfilerTask.setCreateTime(asyncProfilerTaskCommand.getCreateTime()); + ServiceManager.INSTANCE.findService(AsyncProfilerTaskExecutionService.class) + .processAsyncProfilerTask(asyncProfilerTask); + } +} diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java index be7d54a4e4..fe2b7448e1 100755 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java @@ -252,6 +252,33 @@ public static class Profile { public static int SNAPSHOT_TRANSPORT_BUFFER_SIZE = 500; } + public static class AsyncProfiler { + /** + * If true, async profiler will be enabled when user creates a new async profiler task. + * If false, it will be disabled. + * The default value is true. + */ + public static boolean ACTIVE = true; + + /** + * Max execution time(second) for the Async Profiler. The task will be stopped even if a longer time is specified. + * default 10min. + */ + public static int MAX_DURATION = 600; + + /** + * Path for the JFR outputs from the Async Profiler. + * If the parameter is not empty, the file will be created in the specified directory, + * otherwise the Files.createTemp method will be used to create the file. + */ + public static String OUTPUT_PATH = ""; + + /** + * The size of the chunk when uploading jfr + */ + public static final int DATA_CHUNK_SIZE = 1024 * 1024; + } + public static class Meter { /** * If true, skywalking agent will enable sending meters. Otherwise disable meter report. diff --git a/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService b/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService index cfda93521c..f75d28cd78 100644 --- a/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService +++ b/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService @@ -36,3 +36,6 @@ org.apache.skywalking.apm.agent.core.remote.LogReportServiceClient org.apache.skywalking.apm.agent.core.conf.dynamic.ConfigurationDiscoveryService org.apache.skywalking.apm.agent.core.remote.EventReportServiceClient org.apache.skywalking.apm.agent.core.ServiceInstanceGenerator +org.apache.skywalking.apm.agent.core.asyncprofiler.AsyncProfilerTaskExecutionService +org.apache.skywalking.apm.agent.core.asyncprofiler.AsyncProfilerTaskChannelService +org.apache.skywalking.apm.agent.core.asyncprofiler.AsyncProfilerDataSender \ No newline at end of file diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java index 48af242a43..e177920cfe 100644 --- a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java +++ b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java @@ -59,7 +59,7 @@ public static void afterClass() { public void testServiceDependencies() throws Exception { HashMap registryService = getFieldValue(ServiceManager.INSTANCE, "bootedServices"); - assertThat(registryService.size(), is(20)); + assertThat(registryService.size(), is(23)); assertTraceSegmentServiceClient(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class)); assertContextManager(ServiceManager.INSTANCE.findService(ContextManager.class)); @@ -109,7 +109,7 @@ private void assertGRPCChannelManager(GRPCChannelManager service) throws Excepti assertNotNull(service); List listeners = getFieldValue(service, "listeners"); - assertEquals(listeners.size(), 10); + assertEquals(listeners.size(), 12); } private void assertSamplingService(SamplingService service) { diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config index 06f5717de0..0ceb658627 100755 --- a/apm-sniffer/config/agent.config +++ b/apm-sniffer/config/agent.config @@ -164,6 +164,12 @@ profile.duration=${SW_AGENT_PROFILE_DURATION:10} profile.dump_max_stack_depth=${SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH:500} # Snapshot transport to backend buffer size profile.snapshot_transport_buffer_size=${SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE:4500} +# If true, async profiler will be enabled when user creates a new async profiler task. If false, it will be disabled. The default value is true. +asyncprofiler.active=${SW_AGENT_ASYNC_PROFILER_ACTIVE:true} +# Max execution time(second) for the Async Profiler. The task will be stopped even if a longer time is specified. default 10min. +asyncprofiler.max_duration=${SW_AGENT_ASYNC_PROFILER_MAX_DURATION:600} +# Path for the JFR outputs from the Async Profiler. If the parameter is not empty, the file will be created in the specified directory, otherwise the Files.createTemp method will be used to create the file. +asyncprofiler.output_path=${SW_AGENT_ASYNC_PROFILER_OUTPUT_PATH:} # If true, the agent collects and reports metrics to the backend. meter.active=${SW_METER_ACTIVE:true} # Report meters interval. The unit is second diff --git a/dist-material/LICENSE b/dist-material/LICENSE index a5c6f781cf..f36ccd2177 100755 --- a/dist-material/LICENSE +++ b/dist-material/LICENSE @@ -222,6 +222,7 @@ The text of each license is the standard Apache 2.0 license. Google: jsr305 3.0.2: http://central.maven.org/maven2/com/google/code/findbugs/jsr305/3.0.0/jsr305-3.0.0.pom , Apache 2.0 Google: guava 32.0.1: https://github.com/google/guava , Apache 2.0 netty 4.1.100: https://github.com/netty/netty/blob/4.1/LICENSE.txt, Apache 2.0 + async-profiler 3.0: https://github.com/async-profiler/async-profiler/blob/v3.0/LICENSE, Apache 2.0 ======================================================================== BSD licenses diff --git a/pom.xml b/pom.xml index adac36458d..1ca1ec9c2a 100755 --- a/pom.xml +++ b/pom.xml @@ -97,6 +97,7 @@ 2.0.48.Final 1.3.2 3.1 + 3.0 6.0.53 @@ -260,6 +261,11 @@ ${jmh.version} test + + tools.profiler + async-profiler + ${async-profiler.version} +