diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 981d39dccca7..949a7f21fd2b 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -612,10 +612,10 @@ public void setTestMethodName(String testMethodName) { @Override public void dumpTestJVMSnapshot() { for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) { - configNodeWrapper.dumpJVMSnapshot(testMethodName); + configNodeWrapper.executeJstack(testMethodName); } for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) { - dataNodeWrapper.dumpJVMSnapshot(testMethodName); + dataNodeWrapper.executeJstack(testMethodName); } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java index fa5447bc5b07..942b6a9e0124 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java @@ -38,11 +38,14 @@ import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; +import java.io.BufferedReader; import java.io.File; import java.io.IOException; +import java.io.InputStreamReader; import java.io.PrintWriter; import java.lang.management.ManagementFactory; import java.lang.management.MonitorInfo; +import java.lang.management.RuntimeMXBean; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; import java.net.MalformedURLException; @@ -61,6 +64,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.stream.Stream; import static org.apache.iotdb.it.env.cluster.ClusterConstant.CLUSTER_CONFIGURATIONS; @@ -566,7 +570,6 @@ public String getNodePath() { return System.getProperty(USER_DIR) + File.separator + TARGET + File.separator + getId(); } - @Override public void dumpJVMSnapshot(String testCaseName) { JMXServiceURL url; try { @@ -675,4 +678,72 @@ private String getKillPoints() { public abstract String getSystemPropertiesPath(); protected abstract MppJVMConfig initVMConfig(); + + @Override + public void executeJstack() { + executeJstack(logger::info); + } + + @Override + public void executeJstack(final String testCaseName) { + final String fileName = + getLogDirPath() + File.separator + testCaseName + "_" + getId() + "-threads.jstack"; + try (final PrintWriter output = new PrintWriter(fileName)) { + executeJstack(output::println); + } catch (final IOException e) { + logger.warn("IOException occurred when executing Jstack for {}", this.getId(), e); + } + logger.info("Jstack execution output can be found at {}", fileName); + } + + private void executeJstack(final Consumer consumer) { + final long pid = this.getPid(); + if (pid == -1) { + logger.warn("Failed to get pid for {} before executing Jstack", this.getId()); + return; + } + final String command = "jstack -l " + pid; + logger.info("Executing command {} for {}", command, this.getId()); + try { + final Process process = Runtime.getRuntime().exec(command); + try (final BufferedReader reader = + new BufferedReader(new InputStreamReader(process.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + consumer.accept(line); + } + } + final int exitCode = process.waitFor(); + logger.info("Command {} exited with code {}", command, exitCode); + } catch (final IOException e) { + logger.warn("IOException occurred when executing Jstack for {}", this.getId(), e); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("InterruptedException occurred when executing Jstack for {}", this.getId(), e); + } + } + + /** + * @return The native process ID of the process, -1 if failure. + */ + @Override + public long getPid() { + final JMXServiceURL url; + try { + url = + new JMXServiceURL( + String.format("service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi", jmxPort)); + } catch (final MalformedURLException ignored) { + return -1; + } + try (final JMXConnector connector = JMXConnectorFactory.connect(url)) { + final MBeanServerConnection mbsc = connector.getMBeanServerConnection(); + final RuntimeMXBean rmbean = + ManagementFactory.newPlatformMXBeanProxy( + mbsc, ManagementFactory.RUNTIME_MXBEAN_NAME, RuntimeMXBean.class); + return Long.parseLong(rmbean.getName().split("@")[0]); + } catch (final Throwable ignored) { + return -1; + } + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseNodeWrapper.java index cd0eb86bde18..b0ce0d4a2214 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseNodeWrapper.java @@ -45,5 +45,19 @@ public interface BaseNodeWrapper { String getIpAndPortString(); - void dumpJVMSnapshot(String testCaseName); + /** + * Perform jstack on the process corresponding to the wrapper, and use logger to output the + * results. + */ + void executeJstack(); + + /** + * Perform jstack on the process corresponding to the wrapper, and output the results to a file in + * the log directory. + * + * @param testCaseName the name of test case + */ + void executeJstack(final String testCaseName); + + long getPid(); } diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java index 650f861f498d..e33257492500 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java @@ -1010,6 +1010,7 @@ private void pollMessagesAndCheck( } // Check data on receiver + final long[] currentTime = {System.currentTimeMillis()}; try { try (final Connection connection = receiverEnv.getConnection(); final Statement statement = connection.createStatement()) { @@ -1020,6 +1021,15 @@ private void pollMessagesAndCheck( LOGGER.info("detect receiver crashed, skipping this test..."); return; } + // potential stuck + if (System.currentTimeMillis() - currentTime[0] > 60_000L) { + for (final DataNodeWrapper wrapper : senderEnv.getDataNodeWrapperList()) { + wrapper.executeJstack(); + // wrapper.executeJstack(String.format("%s_%s", testName.getMethodName(), + // currentTime[0])); + } + currentTime[0] = System.currentTimeMillis(); + } TestUtils.assertSingleResultSetEqual( TestUtils.executeQueryWithRetry(statement, "select count(*) from root.**"), expectedHeaderWithResult);