Skip to content

Commit

Permalink
Subscription: support topic loose range for path and time (#12760)
Browse files Browse the repository at this point in the history
  • Loading branch information
VGalaxies authored and OneSizeFitsQuorum committed Jun 27, 2024
1 parent 32f98c1 commit b96dd0e
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,9 @@ public static void assertDataEventuallyOnEnv(
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
await()
.pollInSameThread()
.pollDelay(1L, TimeUnit.SECONDS)
.pollInterval(1L, TimeUnit.SECONDS)
.atMost(timeoutSeconds, TimeUnit.SECONDS)
.untilAsserted(
() -> {
Expand All @@ -720,6 +723,36 @@ public static void assertDataEventuallyOnEnv(
}
}

public static void assertDataEventuallyOnEnv(
BaseEnv env, String sql, Map<String, String> expectedHeaderWithResult) {
assertDataEventuallyOnEnv(env, sql, expectedHeaderWithResult, 600);
}

public static void assertDataEventuallyOnEnv(
BaseEnv env, String sql, Map<String, String> expectedHeaderWithResult, long timeoutSeconds) {
try (Connection connection = env.getConnection();
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
await()
.pollInSameThread()
.pollDelay(1L, TimeUnit.SECONDS)
.pollInterval(1L, TimeUnit.SECONDS)
.atMost(timeoutSeconds, TimeUnit.SECONDS)
.untilAsserted(
() -> {
try {
TestUtils.assertSingleResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeaderWithResult);
} catch (Exception e) {
Assert.fail();
}
});
} catch (Exception e) {
e.printStackTrace();
fail();
}
}

public static void assertDataAlwaysOnEnv(
BaseEnv env, String sql, String expectedHeader, Set<String> expectedResSet) {
assertDataAlwaysOnEnv(env, sql, expectedHeader, expectedResSet, 10);
Expand All @@ -735,6 +768,9 @@ public static void assertDataAlwaysOnEnv(
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
await()
.pollInSameThread()
.pollDelay(1L, TimeUnit.SECONDS)
.pollInterval(1L, TimeUnit.SECONDS)
.atMost(consistentSeconds, TimeUnit.SECONDS)
.failFast(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1013,12 +1013,23 @@ public void testRealtimeLooseRange() throws Exception {
return;
}

TestUtils.tryExecuteNonQueriesWithRetry(
if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
"insert into root.db.d1 (time, at1)" + " values (5000, 1), (16000, 3)",
"insert into root.db.d1 (time, at1, at2)" + " values (5001, 1, 2), (6001, 3, 4)",
"flush"));
"flush"))) {
return;
}

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select count(at1) from root.db.d1 where time >= 2000 and time <= 10000",
new HashMap<String, String>() {
{
put("count(root.db.d1.at1)", "4");
}
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public class IoTDBSubscriptionITConstant {

private static final long AWAITILITY_POLL_DELAY_SECOND = 1L;
private static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L;
private static final long AWAITILITY_POLL_INTERVAL_SECOND = 1L;
private static final long AWAITILITY_AT_MOST_SECOND = 600L;

public static final ConditionFactory AWAIT =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,8 @@ public void testTopicInvalidTimeRangeConfig() throws Exception {
@Test
public void testTopicInvalidPathConfig() throws Exception {
// Test invalid path when using tsfile format
// NOTE: Delete this test after the restriction "on path/time range/processor when subscribing
// to tsfile" is removed.
final Properties config = new Properties();
config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
config.put(TopicConstant.PATH_KEY, "root.db.*.s");
Expand All @@ -542,6 +544,8 @@ public void testTopicInvalidPathConfig() throws Exception {
@Test
public void testTopicInvalidProcessorConfig() throws Exception {
// Test invalid processor when using tsfile format
// NOTE: Delete this test after the restriction "on path/time range/processor when subscribing
// to tsfile" is removed.
final Properties config = new Properties();
config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
config.put("processor", "tumbling-time-sampling-processor");
Expand Down Expand Up @@ -578,6 +582,7 @@ public void testTopicWithQueryMode() throws Exception {
e.printStackTrace();
fail(e.getMessage());
}
assertTopicCount(1);

// Subscription
final AtomicInteger rowCount = new AtomicInteger();
Expand Down Expand Up @@ -652,6 +657,136 @@ public void testTopicWithQueryMode() throws Exception {
}
}

@Test
public void testTopicWithLooseRange() throws Exception {
// Insert some historical data on sender
try (final ISession session = senderEnv.getSessionConnection()) {
session.executeNonQueryStatement(
"insert into root.db.d1 (time, at1, at2) values (1000, 1, 2), (2000, 3, 4)");
session.executeNonQueryStatement(
"insert into root.db1.d1 (time, at1, at2) values (1000, 1, 2), (2000, 3, 4)");
session.executeNonQueryStatement(
"insert into root.db.d1 (time, at1, at2) values (3000, 1, 2), (4000, 3, 4)");
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}

// Create topic
final String topicName = "topic12";
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
session.open();
final Properties config = new Properties();
config.put(TopicConstant.LOOSE_RANGE_KEY, TopicConstant.LOOSE_RANGE_TIME_AND_PATH_VALUE);
config.put(TopicConstant.PATH_KEY, "root.db.d1.at1");
config.put(TopicConstant.START_TIME_KEY, "1500");
config.put(TopicConstant.END_TIME_KEY, "2500");
session.createTopic(topicName, config);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
assertTopicCount(1);

final AtomicBoolean dataPrepared = new AtomicBoolean(false);
final AtomicBoolean topicSubscribed = new AtomicBoolean(false);
final AtomicBoolean result = new AtomicBoolean(false);
final List<Thread> threads = new ArrayList<>();

// Subscribe on sender
threads.add(
new Thread(
() -> {
try (final SubscriptionPullConsumer consumer =
new SubscriptionPullConsumer.Builder()
.host(host)
.port(port)
.consumerId("c1")
.consumerGroupId("cg1")
.autoCommit(false)
.buildPullConsumer();
final ISession session = receiverEnv.getSessionConnection()) {
consumer.open();
consumer.subscribe(topicName);
topicSubscribed.set(true);
while (!result.get()) {
LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time
if (dataPrepared.get()) {
final List<SubscriptionMessage> messages =
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
for (final SubscriptionMessage message : messages) {
for (final Iterator<Tablet> it =
message.getSessionDataSetsHandler().tabletIterator();
it.hasNext(); ) {
final Tablet tablet = it.next();
session.insertTablet(tablet);
}
}
consumer.commitSync(messages);
}
}
consumer.unsubscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
// Avoid failure
} finally {
LOGGER.info("consumer exiting...");
}
},
String.format("%s - consumer", testName.getMethodName())));

// Insert some realtime data on sender
threads.add(
new Thread(
() -> {
while (!topicSubscribed.get()) {
LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time
}
try (final ISession session = senderEnv.getSessionConnection()) {
session.executeNonQueryStatement(
"insert into root.db.d1 (time, at1, at2) values (1001, 1, 2), (2001, 3, 4)");
session.executeNonQueryStatement(
"insert into root.db1.d1 (time, at1, at2) values (1001, 1, 2), (2001, 3, 4)");
session.executeNonQueryStatement(
"insert into root.db.d1 (time, at1, at2) values (3001, 1, 2), (4001, 3, 4)");
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
dataPrepared.set(true);
},
String.format("%s - data inserter", testName.getMethodName())));

for (final Thread thread : threads) {
thread.start();
}

try (final Connection connection = receiverEnv.getConnection();
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
AWAIT.untilAsserted(
() ->
TestUtils.assertSingleResultSetEqual(
TestUtils.executeQueryWithRetry(
statement,
"select count(at1) from root.db.d1 where time >= 1500 and time <= 2500"),
new HashMap<String, String>() {
{
put("count(root.db.d1.at1)", "2");
}
}));
}

result.set(true);
for (final Thread thread : threads) {
thread.join();
}
}

private void testTopicInvalidRuntimeConfigTemplate(
final String topicName, final Properties config) throws Exception {
// Create topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class TopicConfig extends PipeParameters {

Expand All @@ -40,14 +43,6 @@ public TopicConfig(final Map<String, String> attributes) {
super(attributes);
}

private static final Map<String, String> LOOSE_RANGE_TIME_CONFIG =
new HashMap<String, String>() {
{
put("history.loose-range", "time");
put("realtime.loose-range", "time");
}
};

private static final Map<String, String> REALTIME_BATCH_MODE_CONFIG =
Collections.singletonMap("realtime.mode", "batch");
private static final Map<String, String> REALTIME_STREAM_MODE_CONFIG =
Expand All @@ -58,6 +53,15 @@ public TopicConfig(final Map<String, String> attributes) {
private static final Map<String, String> SUBSCRIBE_MODE_CONFIG =
Collections.singletonMap("mode", "subscribe");

private static final Set<String> LOOSE_RANGE_KEY_SET =
Collections.unmodifiableSet(
new HashSet<String>() {
{
add("history.loose-range");
add("realtime.loose-range");
}
});

/////////////////////////////// de/ser ///////////////////////////////

public void serialize(final DataOutputStream stream) throws IOException {
Expand Down Expand Up @@ -102,12 +106,6 @@ public Map<String, String> getAttributesWithTimeRange(final long creationTime) {
attributesWithTimeRange.put(TopicConstant.END_TIME_KEY, endTime);
}

// enable loose range when using tsfile format
if (TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equals(
attributes.getOrDefault(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_DEFAULT_VALUE))) {
attributesWithTimeRange.putAll(LOOSE_RANGE_TIME_CONFIG);
}

return attributesWithTimeRange;
}

Expand All @@ -125,6 +123,14 @@ public Map<String, String> getAttributesWithSourceMode() {
: SUBSCRIBE_MODE_CONFIG;
}

public Map<String, String> getAttributesWithSourceLooseRange() {
final String looseRangeValue =
attributes.getOrDefault(
TopicConstant.LOOSE_RANGE_KEY, TopicConstant.LOOSE_RANGE_DEFAULT_VALUE);
return LOOSE_RANGE_KEY_SET.stream()
.collect(Collectors.toMap(key -> key, key -> looseRangeValue));
}

public Map<String, String> getAttributesWithProcessorPrefix() {
final Map<String, String> attributesWithProcessorPrefix = new HashMap<>();
attributes.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ public class TopicConstant {
public static final String FORMAT_TS_FILE_HANDLER_VALUE = "TsFileHandler";
public static final String FORMAT_DEFAULT_VALUE = FORMAT_SESSION_DATA_SETS_HANDLER_VALUE;

public static final String LOOSE_RANGE_KEY = "loose-range";
public static final String LOOSE_RANGE_TIME_VALUE = "time";
public static final String LOOSE_RANGE_PATH_VALUE = "path";
public static final String LOOSE_RANGE_TIME_AND_PATH_VALUE = "time,path";
public static final String LOOSE_RANGE_DEFAULT_VALUE = "";

private TopicConstant() {
throw new IllegalStateException("Utility class");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ public Map<String, String> generateExtractorAttributes() {
extractorAttributes.putAll(config.getAttributesWithRealtimeMode());
// source mode
extractorAttributes.putAll(config.getAttributesWithSourceMode());
// loose range
extractorAttributes.putAll(config.getAttributesWithSourceLooseRange());
return extractorAttributes;
}

Expand Down

0 comments on commit b96dd0e

Please sign in to comment.