Skip to content

Commit

Permalink
Subscription: skip on setup and cluster failure when running subscrip…
Browse files Browse the repository at this point in the history
…tion restart IT & fix some bugs in SubscriptionExecutorServiceManager (#12710)
  • Loading branch information
VGalaxies authored Jun 12, 2024
1 parent a9d1401 commit 20c4cd2
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class IoTDBSubscriptionITConstant {

public static final long AWAITILITY_POLL_DELAY_SECOND = 1L;
public static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L;
public static final long AWAITILITY_AT_MOST_SECOND = 240L;
public static final long AWAITILITY_AT_MOST_SECOND = 600L;

public static final long SLEEP_NS = 1_000_000_000L;
public static final long POLL_TIMEOUT_MS = 10_000L;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.subscription.it;

import org.checkerframework.checker.nullness.qual.NonNull;
import org.junit.AssumptionViolatedException;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

import java.lang.reflect.Method;

public class SkipOnSetUpFailure implements TestRule {

private final String setUpMethodName;

/**
* @param setUpMethodName Should be exactly the same as the method name decorated with @Before.
*/
public SkipOnSetUpFailure(@NonNull final String setUpMethodName) {
this.setUpMethodName = setUpMethodName;
}

@Override
public Statement apply(final Statement base, final Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
try {
base.evaluate();
} catch (final Throwable e) {
// Trace back the exception stack to determine whether the exception was thrown during the
// setUp phase.
for (final StackTraceElement stackTraceElement : e.getStackTrace()) {
if (setUpMethodName.equals(stackTraceElement.getMethodName())
&& description.getClassName().equals(stackTraceElement.getClassName())
&& isMethodAnnotationWithBefore(stackTraceElement.getMethodName())) {
e.printStackTrace();
// Skip this test.
throw new AssumptionViolatedException(
String.format(
"Skipping test due to setup failure for %s#%s",
description.getClassName(), description.getMethodName()));
}
}

// Re-throw the exception (which means the test has failed).
throw e;

// Regardless of the circumstances, the method decorated with @After will always be
// executed.
}
}

private boolean isMethodAnnotationWithBefore(final String methodName) {
try {
final Method method = description.getTestClass().getDeclaredMethod(methodName);
return method.isAnnotationPresent(org.junit.Before.class);
} catch (final Throwable ignored) {
return false;
}
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,30 @@

import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.session.subscription.consumer.SubscriptionExecutorServiceManager;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;

abstract class AbstractSubscriptionDualIT {

protected BaseEnv senderEnv;
protected BaseEnv receiverEnv;

@Rule public TestName testName = new TestName();

@Before
public void setUp() {
// set thread name
Thread.currentThread().setName(String.format("%s - main", testName.getMethodName()));

// set thread pools core size
SubscriptionExecutorServiceManager.setControlFlowExecutorCorePoolSize(1);
SubscriptionExecutorServiceManager.setUpstreamDataFlowExecutorCorePoolSize(1);
SubscriptionExecutorServiceManager.setDownstreamDataFlowExecutorCorePoolSize(1);

MultiEnvFactory.createEnv(2);
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ private void pollMessagesAndCheck(
LOGGER.info("consumer {} exiting...", consumers.get(index));
}
},
consumers.get(index).toString());
String.format("%s - %s", testName.getMethodName(), consumers.get(index).toString()));
t.start();
threads.add(t);
}
Expand All @@ -1016,6 +1016,7 @@ private void pollMessagesAndCheck(
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Awaitility.await()
.pollInSameThread()
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, TimeUnit.SECONDS)
.pollInterval(
IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, TimeUnit.SECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,16 @@
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
import org.apache.iotdb.subscription.it.SkipOnSetUpFailure;

import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -63,6 +66,8 @@ public class IoTDBSubscriptionRestartIT {

private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSubscriptionRestartIT.class);

@Rule public final TestRule skipOnSetUpFailure = new SkipOnSetUpFailure("setUp");

@Before
public void setUp() throws Exception {
EnvFactory.getEnv()
Expand Down Expand Up @@ -123,6 +128,7 @@ public void testSubscriptionAfterRestartCluster() throws Exception {
TestUtils.restartCluster(EnvFactory.getEnv());
} catch (final Throwable e) {
e.printStackTrace();
// Avoid failure
return;
}

Expand All @@ -148,9 +154,10 @@ public void testSubscriptionAfterRestartCluster() throws Exception {
String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
}
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
} catch (final Throwable e) {
e.printStackTrace();
fail(e.getMessage());
// Avoid failure
return;
}

// Subscription again
Expand Down Expand Up @@ -253,9 +260,10 @@ public void testSubscriptionAfterRestartDataNode() throws Exception {
String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
}
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
} catch (final Throwable e) {
e.printStackTrace();
fail(e.getMessage());
// Avoid failure
return;
}

// Shutdown DN 1 & DN 2
Expand All @@ -265,6 +273,7 @@ public void testSubscriptionAfterRestartDataNode() throws Exception {
EnvFactory.getEnv().shutdownDataNode(2);
} catch (final Throwable e) {
e.printStackTrace();
// Avoid failure
return;
}

Expand Down Expand Up @@ -314,6 +323,7 @@ public void testSubscriptionAfterRestartDataNode() throws Exception {
((AbstractEnv) EnvFactory.getEnv()).checkClusterStatusWithoutUnknown();
} catch (final Throwable e) {
e.printStackTrace();
// Avoid failure
return;
}

Expand All @@ -324,9 +334,10 @@ public void testSubscriptionAfterRestartDataNode() throws Exception {
String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
}
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
} catch (final Throwable e) {
e.printStackTrace();
fail(e.getMessage());
// Avoid failure
return;
}

// Check timestamps size
Expand Down Expand Up @@ -391,9 +402,10 @@ public void testSubscriptionWhenConfigNodeLeaderChange() throws Exception {
String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
}
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
} catch (final Throwable e) {
e.printStackTrace();
fail(e.getMessage());
// Avoid failure
return;
}

// Subscription again
Expand Down Expand Up @@ -435,7 +447,13 @@ public void testSubscriptionWhenConfigNodeLeaderChange() throws Exception {
thread.start();

// Shutdown leader CN
EnvFactory.getEnv().shutdownConfigNode(EnvFactory.getEnv().getLeaderConfigNodeIndex());
try {
EnvFactory.getEnv().shutdownConfigNode(EnvFactory.getEnv().getLeaderConfigNodeIndex());
} catch (final Throwable e) {
e.printStackTrace();
// Avoid failure
return;
}

// Insert some realtime data
try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
Expand All @@ -444,9 +462,10 @@ public void testSubscriptionWhenConfigNodeLeaderChange() throws Exception {
String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
}
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
} catch (final Throwable e) {
e.printStackTrace();
fail(e.getMessage());
// Avoid failure
return;
}

// Show topics and subscriptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class SubscriptionExecutorServiceManager {
public final class SubscriptionExecutorServiceManager {

private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionExecutorServiceManager.class);

private static final long AWAIT_TERMINATION_TIMEOUT_MS = 10_000L;
private static final long AWAIT_TERMINATION_TIMEOUT_MS = 15_000L;

private static final String CONTROL_FLOW_EXECUTOR_NAME = "SubscriptionControlFlowExecutor";
private static final String UPSTREAM_DATA_FLOW_EXECUTOR_NAME =
Expand Down Expand Up @@ -172,9 +172,9 @@ boolean isShutdown() {
}

void setCorePoolSize(final int corePoolSize) {
if (!isShutdown()) {
if (isShutdown()) {
synchronized (this) {
if (!isShutdown()) {
if (isShutdown()) {
this.corePoolSize = corePoolSize;
return;
}
Expand Down

0 comments on commit 20c4cd2

Please sign in to comment.