diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java index d2dc8ee49a5f..7d16110cd1f0 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java @@ -23,7 +23,7 @@ public class IoTDBSubscriptionITConstant { public static final long AWAITILITY_POLL_DELAY_SECOND = 1L; public static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L; - public static final long AWAITILITY_AT_MOST_SECOND = 240L; + public static final long AWAITILITY_AT_MOST_SECOND = 600L; public static final long SLEEP_NS = 1_000_000_000L; public static final long POLL_TIMEOUT_MS = 10_000L; diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/SkipOnSetUpFailure.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/SkipOnSetUpFailure.java new file mode 100644 index 000000000000..add7b7c1e2b8 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/SkipOnSetUpFailure.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.subscription.it; + +import org.checkerframework.checker.nullness.qual.NonNull; +import org.junit.AssumptionViolatedException; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +import java.lang.reflect.Method; + +public class SkipOnSetUpFailure implements TestRule { + + private final String setUpMethodName; + + /** + * @param setUpMethodName Should be exactly the same as the method name decorated with @Before. + */ + public SkipOnSetUpFailure(@NonNull final String setUpMethodName) { + this.setUpMethodName = setUpMethodName; + } + + @Override + public Statement apply(final Statement base, final Description description) { + return new Statement() { + @Override + public void evaluate() throws Throwable { + try { + base.evaluate(); + } catch (final Throwable e) { + // Trace back the exception stack to determine whether the exception was thrown during the + // setUp phase. + for (final StackTraceElement stackTraceElement : e.getStackTrace()) { + if (setUpMethodName.equals(stackTraceElement.getMethodName()) + && description.getClassName().equals(stackTraceElement.getClassName()) + && isMethodAnnotationWithBefore(stackTraceElement.getMethodName())) { + e.printStackTrace(); + // Skip this test. + throw new AssumptionViolatedException( + String.format( + "Skipping test due to setup failure for %s#%s", + description.getClassName(), description.getMethodName())); + } + } + + // Re-throw the exception (which means the test has failed). + throw e; + + // Regardless of the circumstances, the method decorated with @After will always be + // executed. + } + } + + private boolean isMethodAnnotationWithBefore(final String methodName) { + try { + final Method method = description.getTestClass().getDeclaredMethod(methodName); + return method.isAnnotationPresent(org.junit.Before.class); + } catch (final Throwable ignored) { + return false; + } + } + }; + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java index 9aa01ad4acfb..3d8eb45a7fdf 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java @@ -21,17 +21,30 @@ import org.apache.iotdb.it.env.MultiEnvFactory; import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.session.subscription.consumer.SubscriptionExecutorServiceManager; import org.junit.After; import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TestName; abstract class AbstractSubscriptionDualIT { protected BaseEnv senderEnv; protected BaseEnv receiverEnv; + @Rule public TestName testName = new TestName(); + @Before public void setUp() { + // set thread name + Thread.currentThread().setName(String.format("%s - main", testName.getMethodName())); + + // set thread pools core size + SubscriptionExecutorServiceManager.setControlFlowExecutorCorePoolSize(1); + SubscriptionExecutorServiceManager.setUpstreamDataFlowExecutorCorePoolSize(1); + SubscriptionExecutorServiceManager.setDownstreamDataFlowExecutorCorePoolSize(1); + MultiEnvFactory.createEnv(2); senderEnv = MultiEnvFactory.getEnv(0); receiverEnv = MultiEnvFactory.getEnv(1); diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java index 992d151520f9..db7b9e7ca3a5 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java @@ -1005,7 +1005,7 @@ private void pollMessagesAndCheck( LOGGER.info("consumer {} exiting...", consumers.get(index)); } }, - consumers.get(index).toString()); + String.format("%s - %s", testName.getMethodName(), consumers.get(index).toString())); t.start(); threads.add(t); } @@ -1016,6 +1016,7 @@ private void pollMessagesAndCheck( final Statement statement = connection.createStatement()) { // Keep retrying if there are execution failures Awaitility.await() + .pollInSameThread() .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, TimeUnit.SECONDS) .pollInterval( IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, TimeUnit.SECONDS) diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java index 2764fad56b83..d3680219578a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java @@ -37,13 +37,16 @@ import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet; import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; +import org.apache.iotdb.subscription.it.SkipOnSetUpFailure; import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestRule; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +66,8 @@ public class IoTDBSubscriptionRestartIT { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSubscriptionRestartIT.class); + @Rule public final TestRule skipOnSetUpFailure = new SkipOnSetUpFailure("setUp"); + @Before public void setUp() throws Exception { EnvFactory.getEnv() @@ -123,6 +128,7 @@ public void testSubscriptionAfterRestartCluster() throws Exception { TestUtils.restartCluster(EnvFactory.getEnv()); } catch (final Throwable e) { e.printStackTrace(); + // Avoid failure return; } @@ -148,9 +154,10 @@ public void testSubscriptionAfterRestartCluster() throws Exception { String.format("insert into root.db.d1(time, s1) values (%s, 1)", i)); } session.executeNonQueryStatement("flush"); - } catch (final Exception e) { + } catch (final Throwable e) { e.printStackTrace(); - fail(e.getMessage()); + // Avoid failure + return; } // Subscription again @@ -253,9 +260,10 @@ public void testSubscriptionAfterRestartDataNode() throws Exception { String.format("insert into root.db.d1(time, s1) values (%s, 1)", i)); } session.executeNonQueryStatement("flush"); - } catch (final Exception e) { + } catch (final Throwable e) { e.printStackTrace(); - fail(e.getMessage()); + // Avoid failure + return; } // Shutdown DN 1 & DN 2 @@ -265,6 +273,7 @@ public void testSubscriptionAfterRestartDataNode() throws Exception { EnvFactory.getEnv().shutdownDataNode(2); } catch (final Throwable e) { e.printStackTrace(); + // Avoid failure return; } @@ -314,6 +323,7 @@ public void testSubscriptionAfterRestartDataNode() throws Exception { ((AbstractEnv) EnvFactory.getEnv()).checkClusterStatusWithoutUnknown(); } catch (final Throwable e) { e.printStackTrace(); + // Avoid failure return; } @@ -324,9 +334,10 @@ public void testSubscriptionAfterRestartDataNode() throws Exception { String.format("insert into root.db.d1(time, s1) values (%s, 1)", i)); } session.executeNonQueryStatement("flush"); - } catch (final Exception e) { + } catch (final Throwable e) { e.printStackTrace(); - fail(e.getMessage()); + // Avoid failure + return; } // Check timestamps size @@ -391,9 +402,10 @@ public void testSubscriptionWhenConfigNodeLeaderChange() throws Exception { String.format("insert into root.db.d1(time, s1) values (%s, 1)", i)); } session.executeNonQueryStatement("flush"); - } catch (final Exception e) { + } catch (final Throwable e) { e.printStackTrace(); - fail(e.getMessage()); + // Avoid failure + return; } // Subscription again @@ -435,7 +447,13 @@ public void testSubscriptionWhenConfigNodeLeaderChange() throws Exception { thread.start(); // Shutdown leader CN - EnvFactory.getEnv().shutdownConfigNode(EnvFactory.getEnv().getLeaderConfigNodeIndex()); + try { + EnvFactory.getEnv().shutdownConfigNode(EnvFactory.getEnv().getLeaderConfigNodeIndex()); + } catch (final Throwable e) { + e.printStackTrace(); + // Avoid failure + return; + } // Insert some realtime data try (final ISession session = EnvFactory.getEnv().getSessionConnection()) { @@ -444,9 +462,10 @@ public void testSubscriptionWhenConfigNodeLeaderChange() throws Exception { String.format("insert into root.db.d1(time, s1) values (%s, 1)", i)); } session.executeNonQueryStatement("flush"); - } catch (final Exception e) { + } catch (final Throwable e) { e.printStackTrace(); - fail(e.getMessage()); + // Avoid failure + return; } // Show topics and subscriptions diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionExecutorServiceManager.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionExecutorServiceManager.java index 5a587ff96fba..6ce5946d4fcc 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionExecutorServiceManager.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionExecutorServiceManager.java @@ -29,12 +29,12 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -final class SubscriptionExecutorServiceManager { +public final class SubscriptionExecutorServiceManager { private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionExecutorServiceManager.class); - private static final long AWAIT_TERMINATION_TIMEOUT_MS = 10_000L; + private static final long AWAIT_TERMINATION_TIMEOUT_MS = 15_000L; private static final String CONTROL_FLOW_EXECUTOR_NAME = "SubscriptionControlFlowExecutor"; private static final String UPSTREAM_DATA_FLOW_EXECUTOR_NAME = @@ -172,9 +172,9 @@ boolean isShutdown() { } void setCorePoolSize(final int corePoolSize) { - if (!isShutdown()) { + if (isShutdown()) { synchronized (this) { - if (!isShutdown()) { + if (isShutdown()) { this.corePoolSize = corePoolSize; return; }