diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java index fa5447bc5b07..1ae4918008df 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java @@ -38,13 +38,16 @@ import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; +import java.io.BufferedReader; import java.io.File; import java.io.IOException; +import java.io.InputStreamReader; import java.io.PrintWriter; import java.lang.management.ManagementFactory; import java.lang.management.MonitorInfo; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; +import java.lang.reflect.Field; import java.net.MalformedURLException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -675,4 +678,46 @@ private String getKillPoints() { public abstract String getSystemPropertiesPath(); protected abstract MppJVMConfig initVMConfig(); + + public void executeJstack() throws Exception { + final long pid = getPidOfProcess(this.instance); + if (pid == -1) { + logger.warn("Failed to get pid for {} before executing jstack", this.getId()); + return; + } + final String command = "jstack -l " + pid; + logger.info("Executing command {} for {}", command, this.getId()); + final Process process = Runtime.getRuntime().exec(command); + final StringBuilder output = new StringBuilder(); + try (final BufferedReader reader = + new BufferedReader(new InputStreamReader(process.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + output.append(line).append(System.lineSeparator()); + } + } + final int exitCode = process.waitFor(); + logger.info("Command {} exited with code {}", command, exitCode); + logger.info(output.toString()); + } + + /** + * @param process The process to obtain the native process ID from. + * @return The native process ID of the process, -1 if failure. + */ + private static synchronized long getPidOfProcess(final Process process) { + long pid = -1; + try { + if (process.getClass().getName().equals("java.lang.UNIXProcess") + || process.getClass().getName().equals("java.lang.ProcessImpl")) { + final Field f = process.getClass().getDeclaredField("pid"); + f.setAccessible(true); + pid = f.getLong(process); + f.setAccessible(false); + } + } catch (final Exception e) { + pid = -1; + } + return pid; + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java index 650f861f498d..787e855c18fd 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java @@ -1010,6 +1010,7 @@ private void pollMessagesAndCheck( } // Check data on receiver + final long[] currentTime = {System.currentTimeMillis()}; try { try (final Connection connection = receiverEnv.getConnection(); final Statement statement = connection.createStatement()) { @@ -1020,6 +1021,13 @@ private void pollMessagesAndCheck( LOGGER.info("detect receiver crashed, skipping this test..."); return; } + // potential stuck + if (System.currentTimeMillis() - currentTime[0] > 60_000L) { + currentTime[0] = System.currentTimeMillis(); + for (final DataNodeWrapper wrapper : senderEnv.getDataNodeWrapperList()) { + wrapper.executeJstack(); + } + } TestUtils.assertSingleResultSetEqual( TestUtils.executeQueryWithRetry(statement, "select count(*) from root.**"), expectedHeaderWithResult);