Skip to content

Commit

Permalink
refactor sub IT
Browse files Browse the repository at this point in the history
  • Loading branch information
VGalaxies committed Jun 12, 2024
1 parent ad33eb4 commit 564de62
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 257 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.subscription.it;

import org.apache.iotdb.session.subscription.consumer.SubscriptionExecutorServiceManager;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.rules.TestRule;

public abstract class AbstractSubscriptionIT {

@Rule public TestName testName = new TestName();

@Rule public final TestRule skipOnSetUpFailure = new SkipOnSetUpFailure("setUp");

@Before
public void setUp() {
// set thread name
Thread.currentThread().setName(String.format("%s - main", testName.getMethodName()));

// set thread pools core size
SubscriptionExecutorServiceManager.setControlFlowExecutorCorePoolSize(1);
SubscriptionExecutorServiceManager.setUpstreamDataFlowExecutorCorePoolSize(1);
SubscriptionExecutorServiceManager.setDownstreamDataFlowExecutorCorePoolSize(1);
}

@After
public void tearDown() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,24 @@

package org.apache.iotdb.subscription.it;

import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;

import java.util.concurrent.TimeUnit;

public class IoTDBSubscriptionITConstant {

public static final long AWAITILITY_POLL_DELAY_SECOND = 1L;
public static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L;
public static final long AWAITILITY_AT_MOST_SECOND = 600L;
private static final long AWAITILITY_POLL_DELAY_SECOND = 1L;
private static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L;
private static final long AWAITILITY_AT_MOST_SECOND = 600L;

public static final ConditionFactory AWAIT =
Awaitility.await()
.pollInSameThread()
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, TimeUnit.SECONDS)
.pollInterval(
IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, TimeUnit.SECONDS)
.atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, TimeUnit.SECONDS);

public static final long SLEEP_NS = 1_000_000_000L;
public static final long POLL_TIMEOUT_MS = 10_000L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.iotdb.subscription.it.local;
package org.apache.iotdb.subscription.it.cluster;

import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
Expand All @@ -36,40 +36,37 @@
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
import org.apache.iotdb.subscription.it.AbstractSubscriptionIT;
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
import org.apache.iotdb.subscription.it.SkipOnSetUpFailure;

import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
import static org.junit.Assert.fail;

@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
public class IoTDBSubscriptionRestartIT {
public class IoTDBSubscriptionRestartIT extends AbstractSubscriptionIT {

private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSubscriptionRestartIT.class);

@Rule public final TestRule skipOnSetUpFailure = new SkipOnSetUpFailure("setUp");

@Before
public void setUp() throws Exception {
public void setUp() {
super.setUp();

EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
Expand All @@ -83,7 +80,9 @@ public void setUp() throws Exception {
}

@After
public void tearDown() throws Exception {
public void tearDown() {
super.tearDown();

EnvFactory.getEnv().cleanClusterEnvironment();
}

Expand Down Expand Up @@ -201,12 +200,7 @@ public void testSubscriptionAfterRestartCluster() throws Exception {
// Check timestamps size
try {
// Keep retrying if there are execution failures
Awaitility.await()
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, TimeUnit.SECONDS)
.pollInterval(
IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, TimeUnit.SECONDS)
.atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, TimeUnit.SECONDS)
.untilAsserted(() -> Assert.assertEquals(100, timestamps.size()));
AWAIT.untilAsserted(() -> Assert.assertEquals(100, timestamps.size()));
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
Expand Down Expand Up @@ -343,12 +337,7 @@ public void testSubscriptionAfterRestartDataNode() throws Exception {
// Check timestamps size
try {
// Keep retrying if there are execution failures
Awaitility.await()
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, TimeUnit.SECONDS)
.pollInterval(
IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, TimeUnit.SECONDS)
.atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, TimeUnit.SECONDS)
.untilAsserted(() -> Assert.assertEquals(200, timestamps.size()));
AWAIT.untilAsserted(() -> Assert.assertEquals(200, timestamps.size()));
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
Expand Down Expand Up @@ -486,12 +475,7 @@ public void testSubscriptionWhenConfigNodeLeaderChange() throws Exception {
// Check timestamps size
try {
// Keep retrying if there are execution failures
Awaitility.await()
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, TimeUnit.SECONDS)
.pollInterval(
IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, TimeUnit.SECONDS)
.atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, TimeUnit.SECONDS)
.untilAsserted(() -> Assert.assertEquals(200, timestamps.size()));
AWAIT.untilAsserted(() -> Assert.assertEquals(200, timestamps.size()));
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,19 @@

import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.session.subscription.consumer.SubscriptionExecutorServiceManager;
import org.apache.iotdb.subscription.it.AbstractSubscriptionIT;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;

abstract class AbstractSubscriptionDualIT {
abstract class AbstractSubscriptionDualIT extends AbstractSubscriptionIT {

protected BaseEnv senderEnv;
protected BaseEnv receiverEnv;

@Rule public TestName testName = new TestName();

@Before
public void setUp() {
// set thread name
Thread.currentThread().setName(String.format("%s - main", testName.getMethodName()));

// set thread pools core size
SubscriptionExecutorServiceManager.setControlFlowExecutorCorePoolSize(1);
SubscriptionExecutorServiceManager.setUpstreamDataFlowExecutorCorePoolSize(1);
SubscriptionExecutorServiceManager.setDownstreamDataFlowExecutorCorePoolSize(1);
super.setUp();

MultiEnvFactory.createEnv(2);
senderEnv = MultiEnvFactory.getEnv(0);
Expand All @@ -55,7 +45,7 @@ public void setUp() {
receiverEnv.initClusterEnvironment();
}

void setUpConfig() {
protected void setUpConfig() {
// enable auto create schema
senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
Expand All @@ -66,7 +56,9 @@ void setUpConfig() {
}

@After
public final void tearDown() {
public void tearDown() {
super.tearDown();

senderEnv.cleanClusterEnvironment();
receiverEnv.cleanClusterEnvironment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.tsfile.read.expression.QueryExpression;
import org.apache.tsfile.read.query.dataset.QueryDataSet;
import org.apache.tsfile.utils.Pair;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -62,11 +61,11 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;

import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
import static org.junit.Assert.fail;

@RunWith(IoTDBTestRunner.class)
Expand Down Expand Up @@ -109,7 +108,7 @@ static final class SubscriptionInfo {
}

@Override
void setUpConfig() {
protected void setUpConfig() {
super.setUpConfig();

// Enable air gap receiver
Expand Down Expand Up @@ -1015,22 +1014,16 @@ private void pollMessagesAndCheck(
try (final Connection connection = receiverEnv.getConnection();
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Awaitility.await()
.pollInSameThread()
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, TimeUnit.SECONDS)
.pollInterval(
IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, TimeUnit.SECONDS)
.atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, TimeUnit.SECONDS)
.untilAsserted(
() -> {
if (receiverCrashed.get()) {
LOGGER.info("detect receiver crashed, skipping this test...");
return;
}
TestUtils.assertSingleResultSetEqual(
TestUtils.executeQueryWithRetry(statement, "select count(*) from root.**"),
expectedHeaderWithResult);
});
AWAIT.untilAsserted(
() -> {
if (receiverCrashed.get()) {
LOGGER.info("detect receiver crashed, skipping this test...");
return;
}
TestUtils.assertSingleResultSetEqual(
TestUtils.executeQueryWithRetry(statement, "select count(*) from root.**"),
expectedHeaderWithResult);
});
}
} catch (final Exception e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;

import org.apache.tsfile.write.record.Tablet;
import org.awaitility.Awaitility;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
Expand All @@ -43,10 +42,10 @@
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
import static org.junit.Assert.fail;

@RunWith(IoTDBTestRunner.class)
Expand All @@ -57,7 +56,7 @@ public class IoTDBSubscriptionTimePrecisionIT extends AbstractSubscriptionDualIT
LoggerFactory.getLogger(IoTDBSubscriptionTimePrecisionIT.class);

@Override
void setUpConfig() {
protected void setUpConfig() {
super.setUpConfig();

// Set timestamp precision to nanosecond
Expand Down Expand Up @@ -168,21 +167,16 @@ public void testTopicTimePrecision() throws Exception {
try (final Connection connection = receiverEnv.getConnection();
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Awaitility.await()
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, TimeUnit.SECONDS)
.pollInterval(
IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, TimeUnit.SECONDS)
.atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, TimeUnit.SECONDS)
.untilAsserted(
() ->
TestUtils.assertSingleResultSetEqual(
TestUtils.executeQueryWithRetry(statement, "select count(*) from root.**"),
new HashMap<String, String>() {
{
put("count(root.db.d1.s2)", "100");
put("count(root.db.d2.s1)", "100");
}
}));
AWAIT.untilAsserted(
() ->
TestUtils.assertSingleResultSetEqual(
TestUtils.executeQueryWithRetry(statement, "select count(*) from root.**"),
new HashMap<String, String>() {
{
put("count(root.db.d1.s2)", "100");
put("count(root.db.d2.s1)", "100");
}
}));
}
} catch (final Exception e) {
e.printStackTrace();
Expand Down
Loading

0 comments on commit 564de62

Please sign in to comment.