diff --git a/.github/workflows/cluster-it-1c1d.yml b/.github/workflows/cluster-it-1c1d.yml
index 46f591f9ce67..782b5fd90ac9 100644
--- a/.github/workflows/cluster-it-1c1d.yml
+++ b/.github/workflows/cluster-it-1c1d.yml
@@ -27,7 +27,7 @@ concurrency:
env:
MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
- GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+ DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
jobs:
Simple:
diff --git a/.github/workflows/cluster-it-1c3d.yml b/.github/workflows/cluster-it-1c3d.yml
index e84feca528d8..2c62101c9dc3 100644
--- a/.github/workflows/cluster-it-1c3d.yml
+++ b/.github/workflows/cluster-it-1c3d.yml
@@ -27,7 +27,7 @@ concurrency:
env:
MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
- GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+ DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
jobs:
Simple:
diff --git a/.github/workflows/pipe-it-2cluster.yml b/.github/workflows/pipe-it-2cluster.yml
index 5b62958d527d..55d4f621cebc 100644
--- a/.github/workflows/pipe-it-2cluster.yml
+++ b/.github/workflows/pipe-it-2cluster.yml
@@ -25,7 +25,7 @@ concurrency:
env:
MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
- GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+ DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
jobs:
auto-create-schema:
diff --git a/.github/workflows/sonar-codecov.yml b/.github/workflows/sonar-codecov.yml
index 46fa342af15b..dab6cf493cdc 100644
--- a/.github/workflows/sonar-codecov.yml
+++ b/.github/workflows/sonar-codecov.yml
@@ -29,7 +29,7 @@ concurrency:
env:
MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
PR_NUMBER: ${{ github.event.number }}
- GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+ DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
jobs:
codecov:
diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml
index 3c6ada72f53c..3763dc6dc9bb 100644
--- a/.github/workflows/unit-test.yml
+++ b/.github/workflows/unit-test.yml
@@ -30,7 +30,7 @@ concurrency:
env:
MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
- GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+ DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
jobs:
unit-test:
diff --git a/.gitignore b/.gitignore
index 25f3070aae38..111f5922f1a3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -122,3 +122,4 @@ iotdb-core/tsfile/src/main/antlr4/org/apache/tsfile/parser/gen/
# Develocity
.mvn/.gradle-enterprise/
+.mvn/.develocity/
diff --git a/.mvn/gradle-enterprise.xml b/.mvn/develocity.xml
similarity index 69%
rename from .mvn/gradle-enterprise.xml
rename to .mvn/develocity.xml
index cf1a9a0a9e88..b505d1a36664 100644
--- a/.mvn/gradle-enterprise.xml
+++ b/.mvn/develocity.xml
@@ -19,20 +19,15 @@
under the License.
-->
-
+
https://ge.apache.org
- false
-
- true
- true
- true
-
#{isFalse(env['GITHUB_ACTIONS'])}
- ALWAYS
- true
+
+
+
#{{'0.0.0.0'}}
@@ -45,4 +40,4 @@
false
-
+
diff --git a/.mvn/extensions.xml b/.mvn/extensions.xml
index c5b001d44862..f3f1983375a6 100644
--- a/.mvn/extensions.xml
+++ b/.mvn/extensions.xml
@@ -22,12 +22,12 @@
com.gradle
- gradle-enterprise-maven-extension
- 1.19.2
+ develocity-maven-extension
+ 1.21.3
com.gradle
common-custom-user-data-maven-extension
- 1.12.4
+ 2.0
diff --git a/code-coverage/pom.xml b/code-coverage/pom.xml
index 19fec8ef59ec..aa1bd56ff3f6 100644
--- a/code-coverage/pom.xml
+++ b/code-coverage/pom.xml
@@ -24,7 +24,7 @@
org.apache.iotdb
iotdb-parent
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
iotdb-code-coverage
pom
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 7ad8a8083d7b..1f112d7fc284 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -24,7 +24,7 @@
org.apache.iotdb
iotdb-parent
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
iotdb-distribution
pom
@@ -33,25 +33,25 @@
org.apache.iotdb
iotdb-server
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
zip
org.apache.iotdb
iotdb-cli
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
zip
org.apache.iotdb
iotdb-confignode
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
zip
org.apache.iotdb
library-udf
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
diff --git a/example/client-cpp-example/pom.xml b/example/client-cpp-example/pom.xml
index 4e95a297d7b7..29229f707813 100644
--- a/example/client-cpp-example/pom.xml
+++ b/example/client-cpp-example/pom.xml
@@ -24,7 +24,7 @@
org.apache.iotdb
iotdb-examples
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
client-cpp-example
IoTDB: Example: CPP Client
diff --git a/example/jdbc/pom.xml b/example/jdbc/pom.xml
index 1d2152b998af..2eb25e0633e6 100644
--- a/example/jdbc/pom.xml
+++ b/example/jdbc/pom.xml
@@ -24,7 +24,7 @@
org.apache.iotdb
iotdb-examples
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
jdbc-example
IoTDB: Example: JDBC
diff --git a/example/jdbc/src/main/java/org/apache/iotdb/JDBCCharsetExample.java b/example/jdbc/src/main/java/org/apache/iotdb/JDBCCharsetExample.java
index 79ae881e3017..3ecb9c056462 100644
--- a/example/jdbc/src/main/java/org/apache/iotdb/JDBCCharsetExample.java
+++ b/example/jdbc/src/main/java/org/apache/iotdb/JDBCCharsetExample.java
@@ -45,7 +45,7 @@ public static void main(String[] args) throws Exception {
final IoTDBStatement statement = (IoTDBStatement) connection.createStatement()) {
final String insertSQLWithGB18030 =
- "insert into root.测试(timestamp, 维语, 彝语, 繁体, 蒙文, 简体, 标点符号, 藏语) values(1, 'ئۇيغۇر تىلى', 'ꆈꌠꉙ', \"繁體\", 'ᠮᠣᠩᠭᠣᠯ ᠬᠡᠯᠡ', '简体', '——?!', \"བོད་སྐད།\");";
+ "insert into root.测试(timestamp, 彝语, 繁体, 蒙文, 简体, 标点符号, 藏语) values(1, 'ꆈꌠꉙ', \"繁體\", 'ᠮᠣᠩᠭᠣᠯ ᠬᠡᠯᠡ', '简体', '——?!', \"བོད་སྐད།\");";
final byte[] insertSQLWithGB18030Bytes = insertSQLWithGB18030.getBytes("GB18030");
statement.execute(insertSQLWithGB18030Bytes);
} catch (IoTDBSQLException e) {
diff --git a/example/mqtt-customize/pom.xml b/example/mqtt-customize/pom.xml
index fe01d1640f59..b67be1f44131 100644
--- a/example/mqtt-customize/pom.xml
+++ b/example/mqtt-customize/pom.xml
@@ -24,7 +24,7 @@
org.apache.iotdb
iotdb-examples
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
customize-mqtt-example
IoTDB: Example: Customized MQTT
diff --git a/example/mqtt/pom.xml b/example/mqtt/pom.xml
index 7e101fb4c428..62619735c0fc 100644
--- a/example/mqtt/pom.xml
+++ b/example/mqtt/pom.xml
@@ -24,7 +24,7 @@
org.apache.iotdb
iotdb-examples
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
mqtt-example
IoTDB: Example: MQTT
diff --git a/example/pipe-count-point-processor/pom.xml b/example/pipe-count-point-processor/pom.xml
index c487ea15a771..9b486cd08bd5 100644
--- a/example/pipe-count-point-processor/pom.xml
+++ b/example/pipe-count-point-processor/pom.xml
@@ -24,7 +24,7 @@
org.apache.iotdb
iotdb-examples
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
pipe-count-point-processor-example
IoTDB: Example: Pipe: Count Point Processor
diff --git a/example/pipe-opc-ua-sink/pom.xml b/example/pipe-opc-ua-sink/pom.xml
index afa5b6171ced..37107b08de5e 100644
--- a/example/pipe-opc-ua-sink/pom.xml
+++ b/example/pipe-opc-ua-sink/pom.xml
@@ -23,7 +23,7 @@
org.apache.iotdb
iotdb-examples
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
4.0.0
pipe-opc-ua-sink-example
diff --git a/example/pom.xml b/example/pom.xml
index 3a4f1a22d019..548f7004d0d4 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -24,7 +24,7 @@
org.apache.iotdb
iotdb-parent
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
iotdb-examples
pom
diff --git a/example/rest-java-example/pom.xml b/example/rest-java-example/pom.xml
index 9d1a354faac4..970fe626dd9e 100644
--- a/example/rest-java-example/pom.xml
+++ b/example/rest-java-example/pom.xml
@@ -24,7 +24,7 @@
org.apache.iotdb
iotdb-examples
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
rest-java-example
IoTDB: Example: Java Rest
diff --git a/example/schema/pom.xml b/example/schema/pom.xml
index 1ac5536b4f6d..07c308576796 100644
--- a/example/schema/pom.xml
+++ b/example/schema/pom.xml
@@ -24,7 +24,7 @@
iotdb-examples
org.apache.iotdb
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
schema-example
IoTDB: Example: Schema
diff --git a/example/session/pom.xml b/example/session/pom.xml
index ac5d936f9977..818cebd1ccc0 100644
--- a/example/session/pom.xml
+++ b/example/session/pom.xml
@@ -24,7 +24,7 @@
org.apache.iotdb
iotdb-examples
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
client-example
IoTDB: Example: Session Client
diff --git a/example/trigger/pom.xml b/example/trigger/pom.xml
index bf4937557836..1970972306f4 100644
--- a/example/trigger/pom.xml
+++ b/example/trigger/pom.xml
@@ -24,7 +24,7 @@
org.apache.iotdb
iotdb-examples
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
trigger-example
IoTDB: Example: Trigger
diff --git a/example/udf/pom.xml b/example/udf/pom.xml
index 080b41807da5..db0cd0088459 100644
--- a/example/udf/pom.xml
+++ b/example/udf/pom.xml
@@ -24,7 +24,7 @@
org.apache.iotdb
iotdb-examples
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
udf-example
IoTDB: Example: UDF
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index e2dd5c6425bd..68806474fc1f 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -24,7 +24,7 @@
org.apache.iotdb
iotdb-parent
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
integration-test
IoTDB: Integration-Test
@@ -72,47 +72,47 @@
org.apache.iotdb
iotdb-server
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.apache.iotdb
iotdb-session
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.apache.iotdb
iotdb-jdbc
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.apache.iotdb
trigger-api
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.apache.iotdb
isession
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.apache.iotdb
service-rpc
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.apache.iotdb
iotdb-thrift-confignode
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.apache.iotdb
node-commons
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.apache.iotdb
iotdb-thrift-commons
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.apache.tsfile
@@ -122,7 +122,7 @@
org.apache.iotdb
udf-api
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.apache.iotdb
@@ -132,7 +132,7 @@
org.apache.iotdb
iotdb-consensus
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.slf4j
@@ -161,17 +161,17 @@
org.apache.iotdb
iotdb-confignode
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.apache.iotdb
iotdb-thrift
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.apache.iotdb
iotdb-cli
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
commons-codec
@@ -201,7 +201,7 @@
org.apache.iotdb
iotdb-server
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
test-jar
test
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRepairDataIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRepairDataIT.java
index fc84978dc906..25d2935b9447 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRepairDataIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRepairDataIT.java
@@ -70,8 +70,7 @@ public void testRepairData() {
statement.execute("CREATE DATABASE root.tesgsg");
statement.execute("CREATE TIMESERIES root.testsg.d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN");
File tsfile = generateUnsortedFile();
- statement.execute(
- String.format("load \"%s\" verify=false", tsfile.getParentFile().getAbsolutePath()));
+ statement.execute(String.format("load \"%s\"", tsfile.getParentFile().getAbsolutePath()));
Assert.assertFalse(validate(statement));
statement.execute("START REPAIR DATA");
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderByForDebugIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderByForDebugIT.java
new file mode 100644
index 000000000000..c6f502bd4b54
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderByForDebugIT.java
@@ -0,0 +1,1406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.orderBy;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.bouncycastle.util.Arrays;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBOrderByForDebugIT {
+
+ // the data can be viewed in
+ // https://docs.google.com/spreadsheets/d/1OWA1bKraArCwWVnuTjuhJ5yLG0PFLdD78gD6FjquepI/edit#gid=0
+ private static final String[] sql =
+ new String[] {
+ "CREATE DATABASE root.sg",
+ "CREATE TIMESERIES root.sg.d.num WITH DATATYPE=INT32, ENCODING=RLE",
+ "CREATE TIMESERIES root.sg.d.bigNum WITH DATATYPE=INT64, ENCODING=RLE",
+ "CREATE TIMESERIES root.sg.d.floatNum WITH DATATYPE=DOUBLE, ENCODING=RLE, 'MAX_POINT_NUMBER'='5'",
+ "CREATE TIMESERIES root.sg.d.str WITH DATATYPE=TEXT, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.sg.d.bool WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+ "insert into root.sg.d(timestamp,num,bigNum,floatNum,str,bool) values(0,3,2947483648,231.2121,\"coconut\",FALSE)",
+ "insert into root.sg.d(timestamp,num,bigNum,floatNum,str,bool) values(20,2,2147483648,434.12,\"pineapple\",TRUE)",
+ "insert into root.sg.d(timestamp,num,bigNum,floatNum,str,bool) values(40,1,2247483648,12.123,\"apricot\",TRUE)",
+ "insert into root.sg.d(timestamp,num,bigNum,floatNum,str,bool) values(80,9,2147483646,43.12,\"apple\",FALSE)",
+ "insert into root.sg.d(timestamp,num,bigNum,floatNum,str,bool) values(100,8,2147483964,4654.231,\"papaya\",TRUE)",
+ "insert into root.sg.d(timestamp,num,bigNum,floatNum,str,bool) values(31536000000,6,2147483650,1231.21,\"banana\",TRUE)",
+ "insert into root.sg.d(timestamp,num,bigNum,floatNum,str,bool) values(31536000100,10,3147483648,231.55,\"pumelo\",FALSE)",
+ "insert into root.sg.d(timestamp,num,bigNum,floatNum,str,bool) values(31536000500,4,2147493648,213.1,\"peach\",FALSE)",
+ "insert into root.sg.d(timestamp,num,bigNum,floatNum,str,bool) values(31536001000,5,2149783648,56.32,\"orange\",FALSE)",
+ "insert into root.sg.d(timestamp,num,bigNum,floatNum,str,bool) values(31536010000,7,2147983648,213.112,\"lemon\",TRUE)",
+ "insert into root.sg.d(timestamp,num,bigNum,floatNum,str,bool) values(31536100000,11,2147468648,54.121,\"pitaya\",FALSE)",
+ "insert into root.sg.d(timestamp,num,bigNum,floatNum,str,bool) values(41536000000,12,2146483648,45.231,\"strawberry\",FALSE)",
+ "insert into root.sg.d(timestamp,num,bigNum,floatNum,str,bool) values(41536000020,14,2907483648,231.34,\"cherry\",FALSE)",
+ "insert into root.sg.d(timestamp,num,bigNum,floatNum,str,bool) values(41536900000,13,2107483648,54.12,\"lychee\",TRUE)",
+ "insert into root.sg.d(timestamp,num,bigNum,floatNum,str,bool) values(51536000000,15,3147483648,235.213,\"watermelon\",TRUE)",
+ // Newly added 'flush' command compared to IoTDBOrderByIT
+ "flush"
+ };
+
+ private static final String[] sql2 =
+ new String[] {
+ "CREATE TIMESERIES root.sg.d2.num WITH DATATYPE=INT32, ENCODING=RLE",
+ "CREATE TIMESERIES root.sg.d2.bigNum WITH DATATYPE=INT64, ENCODING=RLE",
+ "CREATE TIMESERIES root.sg.d2.floatNum WITH DATATYPE=DOUBLE, ENCODING=RLE, 'MAX_POINT_NUMBER'='5'",
+ "CREATE TIMESERIES root.sg.d2.str WITH DATATYPE=TEXT, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.sg.d2.bool WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+ "insert into root.sg.d2(timestamp,num,bigNum,floatNum,str,bool) values(0,3,2947483648,231.2121,\"coconut\",FALSE)",
+ "insert into root.sg.d2(timestamp,num,bigNum,floatNum,str,bool) values(20,2,2147483648,434.12,\"pineapple\",TRUE)",
+ "insert into root.sg.d2(timestamp,num,bigNum,floatNum,str,bool) values(40,1,2247483648,12.123,\"apricot\",TRUE)",
+ "insert into root.sg.d2(timestamp,num,bigNum,floatNum,str,bool) values(80,9,2147483646,43.12,\"apple\",FALSE)",
+ "insert into root.sg.d2(timestamp,num,bigNum,floatNum,str,bool) values(100,8,2147483964,4654.231,\"papaya\",TRUE)",
+ "insert into root.sg.d2(timestamp,num,bigNum,floatNum,str,bool) values(31536000000,6,2147483650,1231.21,\"banana\",TRUE)",
+ "insert into root.sg.d2(timestamp,num,bigNum,floatNum,str,bool) values(31536000100,10,3147483648,231.55,\"pumelo\",FALSE)",
+ "insert into root.sg.d2(timestamp,num,bigNum,floatNum,str,bool) values(31536000500,4,2147493648,213.1,\"peach\",FALSE)",
+ "insert into root.sg.d2(timestamp,num,bigNum,floatNum,str,bool) values(31536001000,5,2149783648,56.32,\"orange\",FALSE)",
+ "insert into root.sg.d2(timestamp,num,bigNum,floatNum,str,bool) values(31536010000,7,2147983648,213.112,\"lemon\",TRUE)",
+ "insert into root.sg.d2(timestamp,num,bigNum,floatNum,str,bool) values(31536100000,11,2147468648,54.121,\"pitaya\",FALSE)",
+ "insert into root.sg.d2(timestamp,num,bigNum,floatNum,str,bool) values(41536000000,12,2146483648,45.231,\"strawberry\",FALSE)",
+ "insert into root.sg.d2(timestamp,num,bigNum,floatNum,str,bool) values(41536000020,14,2907483648,231.34,\"cherry\",FALSE)",
+ "insert into root.sg.d2(timestamp,num,bigNum,floatNum,str,bool) values(41536900000,13,2107483648,54.12,\"lychee\",TRUE)",
+ "insert into root.sg.d2(timestamp,num,bigNum,floatNum,str,bool) values(51536000000,15,3147483648,235.213,\"watermelon\",TRUE)",
+ // Newly added 'flush' command compared to IoTDBOrderByIT
+ "flush"
+ };
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().getConfig().getDataNodeCommonConfig().setSortBufferSize(1024 * 1024L);
+ EnvFactory.getEnv().initClusterEnvironment();
+ insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ protected static void insertData() {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ for (String sql : sql) {
+ statement.execute(sql);
+ }
+ for (String sql : sql2) {
+ statement.execute(sql);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ // ordinal data
+ String[][] res =
+ new String[][] {
+ {"0", "3", "2947483648", "231.2121", "coconut", "false"},
+ {"20", "2", "2147483648", "434.12", "pineapple", "true"},
+ {"40", "1", "2247483648", "12.123", "apricot", "true"},
+ {"80", "9", "2147483646", "43.12", "apple", "false"},
+ {"100", "8", "2147483964", "4654.231", "papaya", "true"},
+ {"31536000000", "6", "2147483650", "1231.21", "banana", "true"},
+ {"31536000100", "10", "3147483648", "231.55", "pumelo", "false"},
+ {"31536000500", "4", "2147493648", "213.1", "peach", "false"},
+ {"31536001000", "5", "2149783648", "56.32", "orange", "false"},
+ {"31536010000", "7", "2147983648", "213.112", "lemon", "true"},
+ {"31536100000", "11", "2147468648", "54.121", "pitaya", "false"},
+ {"41536000000", "12", "2146483648", "45.231", "strawberry", "false"},
+ {"41536000020", "14", "2907483648", "231.34", "cherry", "false"},
+ {"41536900000", "13", "2107483648", "54.12", "lychee", "true"},
+ {"51536000000", "15", "3147483648", "235.213", "watermelon", "true"},
+ };
+
+ private void checkHeader(ResultSetMetaData resultSetMetaData, String[] title)
+ throws SQLException {
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ assertEquals(title[i - 1], resultSetMetaData.getColumnName(i));
+ }
+ }
+
+ private void testNormalOrderBy(String sql, int[] ans) {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ checkHeader(
+ metaData,
+ new String[] {
+ "Time",
+ "root.sg.d.num",
+ "root.sg.d.bigNum",
+ "root.sg.d.floatNum",
+ "root.sg.d.str",
+ "root.sg.d.bool"
+ });
+ int i = 0;
+ while (resultSet.next()) {
+
+ String actualTime = resultSet.getString(1);
+ String actualNum = resultSet.getString(2);
+ String actualBigNum = resultSet.getString(3);
+ double actualFloatNum = resultSet.getDouble(4);
+ String actualStr = resultSet.getString(5);
+ String actualBool = resultSet.getString(6);
+
+ assertEquals(res[ans[i]][0], actualTime);
+ assertEquals(res[ans[i]][1], actualNum);
+ assertEquals(res[ans[i]][2], actualBigNum);
+ assertEquals(Double.parseDouble(res[ans[i]][3]), actualFloatNum, 0.0001);
+ assertEquals(res[ans[i]][4], actualStr);
+ assertEquals(res[ans[i]][5], actualBool);
+
+ i++;
+ }
+ assertEquals(i, ans.length);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ // 1. One-level order by test
+ @Test
+ public void orderByTest1() {
+ String sql = "select num,bigNum,floatNum,str,bool from root.sg.d order by num";
+ int[] ans = {2, 1, 0, 7, 8, 5, 9, 4, 3, 6, 10, 11, 13, 12, 14};
+ testNormalOrderBy(sql, ans);
+ }
+
+ @Test
+ public void orderByTest2() {
+ String sql = "select num,bigNum,floatNum,str,bool from root.sg.d order by bigNum,time";
+ int[] ans = {13, 11, 10, 3, 1, 5, 4, 7, 9, 8, 2, 12, 0, 6, 14};
+ testNormalOrderBy(sql, ans);
+ }
+
+ @Test
+ public void orderByTest3() {
+ String sql = "select num,bigNum,floatNum,str,bool from root.sg.d order by floatNum";
+ int[] ans = {2, 3, 11, 13, 10, 8, 7, 9, 0, 12, 6, 14, 1, 5, 4};
+ testNormalOrderBy(sql, ans);
+ }
+
+ @Test
+ public void orderByTest4() {
+ String sql = "select num,bigNum,floatNum,str,bool from root.sg.d order by str";
+ int[] ans = {3, 2, 5, 12, 0, 9, 13, 8, 4, 7, 1, 10, 6, 11, 14};
+ testNormalOrderBy(sql, ans);
+ }
+
+ @Test
+ public void orderByTest5() {
+ String sql = "select num,bigNum,floatNum,str,bool from root.sg.d order by num desc";
+ int[] ans = {2, 1, 0, 7, 8, 5, 9, 4, 3, 6, 10, 11, 13, 12, 14};
+ testNormalOrderBy(sql, Arrays.reverse(ans));
+ }
+
+ @Test
+ public void orderByTest6() {
+ String sql =
+ "select num,bigNum,floatNum,str,bool from root.sg.d order by bigNum desc, time asc";
+ int[] ans = {6, 14, 0, 12, 2, 8, 9, 7, 4, 5, 1, 3, 10, 11, 13};
+ testNormalOrderBy(sql, ans);
+ }
+
+ @Test
+ public void orderByTest7() {
+ String sql = "select num,bigNum,floatNum,str,bool from root.sg.d order by floatNum desc";
+ int[] ans = {2, 3, 11, 13, 10, 8, 7, 9, 0, 12, 6, 14, 1, 5, 4};
+ testNormalOrderBy(sql, Arrays.reverse(ans));
+ }
+
+ @Test
+ public void orderByTest8() {
+ String sql = "select num,bigNum,floatNum,str,bool from root.sg.d order by str desc";
+ int[] ans = {3, 2, 5, 12, 0, 9, 13, 8, 4, 7, 1, 10, 6, 11, 14};
+ testNormalOrderBy(sql, Arrays.reverse(ans));
+ }
+
+ @Test
+ public void orderByTest15() {
+ String sql = "select num+bigNum,floatNum from root.sg.d order by str";
+ int[] ans = {3, 2, 5, 12, 0, 9, 13, 8, 4, 7, 1, 10, 6, 11, 14};
+
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ checkHeader(
+ metaData,
+ new String[] {"Time", "root.sg.d.num + root.sg.d.bigNum", "root.sg.d.floatNum"});
+ int i = 0;
+ while (resultSet.next()) {
+
+ String actualTime = resultSet.getString(1);
+ double actualNum = resultSet.getDouble(2);
+ double actualFloat = resultSet.getDouble(3);
+
+ assertEquals(res[ans[i]][0], actualTime);
+ assertEquals(
+ Long.parseLong(res[ans[i]][1]) + Long.parseLong(res[ans[i]][2]), actualNum, 0.0001);
+ assertEquals(Double.parseDouble(res[ans[i]][3]), actualFloat, 0.0001);
+
+ i++;
+ }
+ assertEquals(i, ans.length);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ // 2. Multi-level order by test
+ @Test
+ public void orderByTest9() {
+ String sql = "select num,bigNum,floatNum,str,bool from root.sg.d order by bool asc, str asc";
+ int[] ans = {3, 12, 0, 8, 7, 10, 6, 11, 2, 5, 9, 13, 4, 1, 14};
+ testNormalOrderBy(sql, ans);
+ }
+
+ @Test
+ public void orderByTest10() {
+ String sql = "select num,bigNum,floatNum,str,bool from root.sg.d order by bool asc, num desc";
+ int[] ans = {12, 11, 10, 6, 3, 8, 7, 0, 14, 13, 4, 9, 5, 1, 2};
+ testNormalOrderBy(sql, ans);
+ }
+
+ @Test
+ public void orderByTest11() {
+ String sql =
+ "select num,bigNum,floatNum,str,bool from root.sg.d order by bigNum desc, floatNum desc";
+ int[] ans = {14, 6, 0, 12, 2, 8, 9, 7, 4, 5, 1, 3, 10, 11, 13};
+ testNormalOrderBy(sql, ans);
+ }
+
+ @Test
+ public void orderByTest12() {
+ String sql =
+ "select num,bigNum,floatNum,str,bool from root.sg.d order by str desc, floatNum desc";
+ int[] ans = {3, 2, 5, 12, 0, 9, 13, 8, 4, 7, 1, 10, 6, 11, 14};
+ testNormalOrderBy(sql, Arrays.reverse(ans));
+ }
+
+ @Test
+ public void orderByTest13() {
+ String sql =
+ "select num,bigNum,floatNum,str,bool from root.sg.d order by num+floatNum desc, floatNum desc";
+ int[] ans = {4, 5, 1, 14, 12, 6, 0, 9, 7, 13, 10, 8, 11, 3, 2};
+ testNormalOrderBy(sql, ans);
+ }
+
+ @Test
+ public void orderByTest14() {
+ String sql = "select num+bigNum from root.sg.d order by num+floatNum desc, floatNum desc";
+ int[] ans = {4, 5, 1, 14, 12, 6, 0, 9, 7, 13, 10, 8, 11, 3, 2};
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ checkHeader(metaData, new String[] {"Time", "root.sg.d.num + root.sg.d.bigNum"});
+ int i = 0;
+ while (resultSet.next()) {
+
+ String actualTime = resultSet.getString(1);
+ double actualNum = resultSet.getDouble(2);
+
+ assertEquals(res[ans[i]][0], actualTime);
+ assertEquals(
+ Long.parseLong(res[ans[i]][1]) + Long.parseLong(res[ans[i]][2]), actualNum, 0.001);
+
+ i++;
+ }
+ assertEquals(i, ans.length);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void orderByTest16() {
+ String sql = "select num+floatNum from root.sg.d order by floatNum+num desc, floatNum desc";
+ int[] ans = {4, 5, 1, 14, 12, 6, 0, 9, 7, 13, 10, 8, 11, 3, 2};
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ checkHeader(metaData, new String[] {"Time", "root.sg.d.num + root.sg.d.floatNum"});
+ int i = 0;
+ while (resultSet.next()) {
+
+ String actualTime = resultSet.getString(1);
+ double actualNum = resultSet.getDouble(2);
+
+ assertEquals(res[ans[i]][0], actualTime);
+ assertEquals(
+ Long.parseLong(res[ans[i]][1]) + Double.parseDouble(res[ans[i]][3]),
+ actualNum,
+ 0.001);
+
+ i++;
+ }
+ assertEquals(i, ans.length);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void orderByTest17() {
+ String sql = "select num,bigNum,floatNum,str,bool from root.sg.d order by str desc, str asc";
+ int[] ans = {3, 2, 5, 12, 0, 9, 13, 8, 4, 7, 1, 10, 6, 11, 14};
+ testNormalOrderBy(sql, Arrays.reverse(ans));
+ }
+
+ @Test
+ public void orderByTest18() {
+ String sql = "select num,bigNum,floatNum,str,bool from root.sg.d order by str, str";
+ int[] ans = {3, 2, 5, 12, 0, 9, 13, 8, 4, 7, 1, 10, 6, 11, 14};
+ testNormalOrderBy(sql, ans);
+ }
+
+ // limit cannot be pushed down in ORDER BY
+ @Test
+ public void orderByTest19() {
+ String sql = "select num from root.sg.d order by num limit 5";
+ int[] ans = {2, 1, 0, 7, 8};
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ checkHeader(metaData, new String[] {"Time", "root.sg.d.num"});
+ int i = 0;
+ while (resultSet.next()) {
+ String actualTime = resultSet.getString(1);
+ String actualNum = resultSet.getString(2);
+ assertEquals(res[ans[i]][0], actualTime);
+ assertEquals(res[ans[i]][1], actualNum);
+ i++;
+ }
+ assertEquals(i, ans.length);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ // 3. aggregation query
+ @Test
+ public void orderByInAggregationTest() {
+ String sql = "select avg(num) from root.sg.d group by session(10000ms) order by avg(num) desc";
+ double[][] ans = new double[][] {{15.0}, {13.0}, {13.0}, {11.0}, {6.4}, {4.6}};
+ long[] times =
+ new long[] {51536000000L, 41536000000L, 41536900000L, 31536100000L, 31536000000L, 0L};
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ int i = 0;
+ while (resultSet.next()) {
+ long actualTime = resultSet.getLong(1);
+ double actualAvg = resultSet.getDouble(2);
+ assertEquals(times[i], actualTime);
+ assertEquals(ans[i][0], actualAvg, 0.0001);
+ i++;
+ }
+ assertEquals(i, ans.length);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void orderByInAggregationTest2() {
+ String sql =
+ "select avg(num) from root.sg.d group by session(10000ms) order by max_value(floatNum)";
+ double[][] ans =
+ new double[][] {
+ {13.0, 54.12},
+ {11.0, 54.121},
+ {13.0, 231.34},
+ {15.0, 235.213},
+ {6.4, 1231.21},
+ {4.6, 4654.231}
+ };
+ long[] times =
+ new long[] {41536900000L, 31536100000L, 41536000000L, 51536000000L, 31536000000L, 0L};
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ int i = 0;
+ while (resultSet.next()) {
+ long actualTime = resultSet.getLong(1);
+ double actualAvg = resultSet.getDouble(2);
+ assertEquals(times[i], actualTime);
+ assertEquals(ans[i][0], actualAvg, 0.0001);
+ i++;
+ }
+ assertEquals(i, ans.length);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void orderByInAggregationTest3() {
+ String sql =
+ "select avg(num) from root.sg.d group by session(10000ms) order by avg(num) desc,max_value(floatNum)";
+ double[] ans = new double[] {15.0, 13.0, 13.0, 11.0, 6.4, 4.6};
+ long[] times =
+ new long[] {51536000000L, 41536900000L, 41536000000L, 31536100000L, 31536000000L, 0L};
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ int i = 0;
+ while (resultSet.next()) {
+ long actualTime = resultSet.getLong(1);
+ double actualAvg = resultSet.getDouble(2);
+ assertEquals(times[i], actualTime);
+ assertEquals(ans[i], actualAvg, 0.0001);
+ i++;
+ }
+ assertEquals(i, ans.length);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void orderByInAggregationTest4() {
+ String sql =
+ "select avg(num)+avg(floatNum) from root.sg.d group by session(10000ms) order by avg(num)+avg(floatNum)";
+ double[][] ans =
+ new double[][] {{1079.56122}, {395.4584}, {65.121}, {151.2855}, {67.12}, {250.213}};
+ long[] times =
+ new long[] {0L, 31536000000L, 31536100000L, 41536000000L, 41536900000L, 51536000000L};
+ int[] order = new int[] {2, 4, 3, 5, 1, 0};
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ int i = 0;
+ while (resultSet.next()) {
+ long actualTime = resultSet.getLong(1);
+ double actualAvg = resultSet.getDouble(2);
+ assertEquals(times[order[i]], actualTime);
+ assertEquals(ans[order[i]][0], actualAvg, 0.0001);
+ i++;
+ }
+ assertEquals(i, ans.length);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void orderByInAggregationTest5() {
+ String sql =
+ "select min_value(bigNum) from root.sg.d group by session(10000ms) order by avg(num)+avg(floatNum)";
+ long[] ans =
+ new long[] {2147483646L, 2147483650L, 2147468648L, 2146483648L, 2107483648L, 3147483648L};
+ long[] times =
+ new long[] {0L, 31536000000L, 31536100000L, 41536000000L, 41536900000L, 51536000000L};
+ int[] order = new int[] {2, 4, 3, 5, 1, 0};
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ int i = 0;
+ while (resultSet.next()) {
+ long actualTime = resultSet.getLong(1);
+ long actualMinValue = resultSet.getLong(2);
+ assertEquals(times[order[i]], actualTime);
+ assertEquals(ans[order[i]], actualMinValue, 0.0001);
+ i++;
+ }
+ assertEquals(i, ans.length);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void orderByInAggregationTest6() {
+ String sql =
+ "select min_value(num)+min_value(bigNum) from root.sg.d group by session(10000ms) order by avg(num)+avg(floatNum)";
+ long[] ans =
+ new long[] {2147483647L, 2147483654L, 2147468659L, 2146483660L, 2107483661L, 3147483663L};
+ long[] times =
+ new long[] {0L, 31536000000L, 31536100000L, 41536000000L, 41536900000L, 51536000000L};
+ int[] order = new int[] {2, 4, 3, 5, 1, 0};
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ int i = 0;
+ while (resultSet.next()) {
+ long actualTime = resultSet.getLong(1);
+ double actualMinValue = resultSet.getDouble(2);
+ assertEquals(times[order[i]], actualTime);
+ assertEquals(ans[order[i]], actualMinValue, 0.0001);
+ i++;
+ }
+ assertEquals(i, ans.length);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void orderByInAggregationTest7() {
+ String sql =
+ "select avg(num)+min_value(floatNum) from root.sg.d group by session(10000ms) order by max_value(floatNum)";
+ double[][] ans =
+ new double[][] {
+ {13.0, 54.12, 54.12},
+ {11.0, 54.121, 54.121},
+ {13.0, 231.34, 45.231},
+ {15.0, 235.213, 235.213},
+ {6.4, 1231.21, 56.32},
+ {4.6, 4654.231, 12.123}
+ };
+ long[] times =
+ new long[] {41536900000L, 31536100000L, 41536000000L, 51536000000L, 31536000000L, 0L};
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ int i = 0;
+ while (resultSet.next()) {
+ long actualTime = resultSet.getLong(1);
+ double actualAvg = resultSet.getDouble(2);
+ assertEquals(times[i], actualTime);
+ assertEquals(ans[i][0] + ans[i][2], actualAvg, 0.0001);
+ i++;
+ }
+ assertEquals(i, ans.length);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void orderByInAggregationTest8() {
+ String sql =
+ "select avg(num)+avg(floatNum) from root.sg.d group by session(10000ms) order by avg(floatNum)+avg(num)";
+ double[][] ans =
+ new double[][] {{1079.56122}, {395.4584}, {65.121}, {151.2855}, {67.12}, {250.213}};
+ long[] times =
+ new long[] {0L, 31536000000L, 31536100000L, 41536000000L, 41536900000L, 51536000000L};
+ int[] order = new int[] {2, 4, 3, 5, 1, 0};
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ int i = 0;
+ while (resultSet.next()) {
+ long actualTime = resultSet.getLong(1);
+ double actualAvg = resultSet.getDouble(2);
+ assertEquals(times[order[i]], actualTime);
+ assertEquals(ans[order[i]][0], actualAvg, 0.0001);
+ i++;
+ }
+ assertEquals(i, ans.length);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ // 4. raw data query with align by device
+ private void testNormalOrderByAlignByDevice(String sql, int[] ans) {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ checkHeader(
+ metaData, new String[] {"Time", "Device", "num", "bigNum", "floatNum", "str", "bool"});
+ int i = 0;
+ int total = 0;
+ String device = "root.sg.d";
+ while (resultSet.next()) {
+
+ String actualTime = resultSet.getString(1);
+ String actualDevice = resultSet.getString(2);
+ String actualNum = resultSet.getString(3);
+ String actualBigNum = resultSet.getString(4);
+ double actualFloatNum = resultSet.getDouble(5);
+ String actualStr = resultSet.getString(6);
+ String actualBool = resultSet.getString(7);
+
+ assertEquals(device, actualDevice);
+ assertEquals(res[ans[i]][0], actualTime);
+ assertEquals(res[ans[i]][1], actualNum);
+ assertEquals(res[ans[i]][2], actualBigNum);
+ assertEquals(Double.parseDouble(res[ans[i]][3]), actualFloatNum, 0.0001);
+ assertEquals(res[ans[i]][4], actualStr);
+ assertEquals(res[ans[i]][5], actualBool);
+
+ if (device.equals("root.sg.d")) {
+ device = "root.sg.d2";
+ } else {
+ device = "root.sg.d";
+ i++;
+ }
+ total++;
+ }
+ assertEquals(i, ans.length);
+ assertEquals(total, ans.length * 2);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void alignByDeviceOrderByTest1() {
+ String sql =
+ "select num+bigNum from root.** order by num+floatNum desc, floatNum desc align by device";
+ int[] ans = {4, 5, 1, 14, 12, 6, 0, 9, 7, 13, 10, 8, 11, 3, 2};
+ String device = "root.sg.d";
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ int i = 0;
+ while (resultSet.next()) {
+ String actualTime = resultSet.getString(1);
+ String actualDevice = resultSet.getString(2);
+ double actualNum = resultSet.getDouble(3);
+
+ assertEquals(device, actualDevice);
+ assertEquals(res[ans[i]][0], actualTime);
+ assertEquals(
+ Long.parseLong(res[ans[i]][1]) + Long.parseLong(res[ans[i]][2]), actualNum, 0.0001);
+ if (device.equals("root.sg.d")) {
+ device = "root.sg.d2";
+ } else {
+ device = "root.sg.d";
+ i++;
+ }
+ }
+ assertEquals(i, ans.length);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void alignByDeviceOrderByTest2() {
+ String sql = "select num,bigNum,floatNum,str,bool from root.** order by num align by device";
+ int[] ans = {2, 1, 0, 7, 8, 5, 9, 4, 3, 6, 10, 11, 13, 12, 14};
+ testNormalOrderByAlignByDevice(sql, ans);
+ }
+
+ @Test
+ public void alignByDeviceOrderByTest3() {
+ String sql =
+ "select num,bigNum,floatNum,str,bool from root.** order by floatNum align by device";
+ int[] ans = {2, 3, 11, 13, 10, 8, 7, 9, 0, 12, 6, 14, 1, 5, 4};
+ testNormalOrderByAlignByDevice(sql, ans);
+ }
+
+ @Test
+ public void alignByDeviceOrderByTest4() {
+ String sql = "select num,bigNum,floatNum,str,bool from root.** order by str align by device";
+ int[] ans = {3, 2, 5, 12, 0, 9, 13, 8, 4, 7, 1, 10, 6, 11, 14};
+ testNormalOrderByAlignByDevice(sql, ans);
+ }
+
+ @Test
+ public void alignByDeviceOrderByTest5() {
+ String sql =
+ "select num,bigNum,floatNum,str,bool from root.** order by num desc align by device";
+ int[] ans = {2, 1, 0, 7, 8, 5, 9, 4, 3, 6, 10, 11, 13, 12, 14};
+ testNormalOrderByAlignByDevice(sql, Arrays.reverse(ans));
+ }
+
+ @Test
+ public void alignByDeviceOrderByTest6() {
+ String sql =
+ "select num,bigNum,floatNum,str,bool from root.** order by str desc align by device";
+ int[] ans = {3, 2, 5, 12, 0, 9, 13, 8, 4, 7, 1, 10, 6, 11, 14};
+ testNormalOrderByAlignByDevice(sql, Arrays.reverse(ans));
+ }
+
+ @Test
+ public void alignByDeviceOrderByTest7() {
+ String sql =
+ "select num,bigNum,floatNum,str,bool from root.** order by bool asc, num desc align by device";
+ int[] ans = {12, 11, 10, 6, 3, 8, 7, 0, 14, 13, 4, 9, 5, 1, 2};
+ testNormalOrderByAlignByDevice(sql, ans);
+ }
+
+ @Test
+ public void alignByDeviceOrderByTest8() {
+ String sql =
+ "select num,bigNum,floatNum,str,bool from root.** order by bigNum desc, floatNum desc align by device";
+ int[] ans = {14, 6, 0, 12, 2, 8, 9, 7, 4, 5, 1, 3, 10, 11, 13};
+ testNormalOrderByAlignByDevice(sql, ans);
+ }
+
+ @Test
+ public void alignByDeviceOrderByTest9() {
+ String sql =
+ "select num,bigNum,floatNum,str,bool from root.** order by str desc, floatNum desc align by device";
+ int[] ans = {3, 2, 5, 12, 0, 9, 13, 8, 4, 7, 1, 10, 6, 11, 14};
+ testNormalOrderByAlignByDevice(sql, Arrays.reverse(ans));
+ }
+
+ private void testNormalOrderByMixAlignBy(String sql, int[] ans) {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ checkHeader(
+ metaData, new String[] {"Time", "Device", "num", "bigNum", "floatNum", "str", "bool"});
+ int i = 0;
+ int total = 0;
+ String device = "root.sg.d";
+ while (resultSet.next()) {
+
+ String actualTime = resultSet.getString(1);
+ String actualDevice = resultSet.getString(2);
+ String actualNum = resultSet.getString(3);
+ String actualBigNum = resultSet.getString(4);
+ double actualFloatNum = resultSet.getDouble(5);
+ String actualStr = resultSet.getString(6);
+ String actualBool = resultSet.getString(7);
+
+ assertEquals(device, actualDevice);
+ assertEquals(res[ans[i]][0], actualTime);
+ assertEquals(res[ans[i]][1], actualNum);
+ assertEquals(res[ans[i]][2], actualBigNum);
+ assertEquals(Double.parseDouble(res[ans[i]][3]), actualFloatNum, 0.0001);
+ assertEquals(res[ans[i]][4], actualStr);
+ assertEquals(res[ans[i]][5], actualBool);
+
+ if (device.equals("root.sg.d2")) {
+ i++;
+ device = "root.sg.d";
+ } else {
+ device = "root.sg.d2";
+ }
+
+ total++;
+ }
+ assertEquals(total, ans.length * 2);
+ assertEquals(i, ans.length);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ private void testDeviceViewOrderByMixAlignBy(String sql, int[] ans) {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ checkHeader(
+ metaData, new String[] {"Time", "Device", "num", "bigNum", "floatNum", "str", "bool"});
+ int i = 0;
+ int total = 0;
+ String device = "root.sg.d2";
+ while (resultSet.next()) {
+
+ String actualTime = resultSet.getString(1);
+ String actualDevice = resultSet.getString(2);
+ String actualNum = resultSet.getString(3);
+ String actualBigNum = resultSet.getString(4);
+ double actualFloatNum = resultSet.getDouble(5);
+ String actualStr = resultSet.getString(6);
+ String actualBool = resultSet.getString(7);
+
+ assertEquals(device, actualDevice);
+ assertEquals(res[ans[i]][0], actualTime);
+ assertEquals(res[ans[i]][1], actualNum);
+ assertEquals(res[ans[i]][2], actualBigNum);
+ assertEquals(Double.parseDouble(res[ans[i]][3]), actualFloatNum, 0.0001);
+ assertEquals(res[ans[i]][4], actualStr);
+ assertEquals(res[ans[i]][5], actualBool);
+
+ i++;
+ total++;
+ if (total == ans.length) {
+ i = 0;
+ if (device.equals("root.sg.d2")) {
+ device = "root.sg.d";
+ } else {
+ device = "root.sg.d2";
+ }
+ }
+ }
+ assertEquals(total, ans.length * 2);
+ assertEquals(i, ans.length);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ private void orderByBigNumAlignByDevice(String sql, int[] ans) {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ checkHeader(
+ metaData, new String[] {"Time", "Device", "num", "bigNum", "floatNum", "str", "bool"});
+ int i = 0;
+ int total = 0;
+ String device = "root.sg.d";
+ while (resultSet.next()) {
+
+ String actualTime = resultSet.getString(1);
+ String actualDevice = resultSet.getString(2);
+ String actualNum = resultSet.getString(3);
+ String actualBigNum = resultSet.getString(4);
+ double actualFloatNum = resultSet.getDouble(5);
+ String actualStr = resultSet.getString(6);
+ String actualBool = resultSet.getString(7);
+
+ if (total < 4) {
+ i = total % 2;
+ if (total < 2) {
+ device = "root.sg.d2";
+ } else {
+ device = "root.sg.d";
+ }
+ }
+
+ assertEquals(device, actualDevice);
+ assertEquals(res[ans[i]][0], actualTime);
+ assertEquals(res[ans[i]][1], actualNum);
+ assertEquals(res[ans[i]][2], actualBigNum);
+ assertEquals(Double.parseDouble(res[ans[i]][3]), actualFloatNum, 0.0001);
+ assertEquals(res[ans[i]][4], actualStr);
+ assertEquals(res[ans[i]][5], actualBool);
+
+ if (device.equals("root.sg.d2")) {
+ device = "root.sg.d";
+ } else {
+ i++;
+ device = "root.sg.d2";
+ }
+
+ total++;
+ }
+ assertEquals(i, ans.length);
+ assertEquals(total, ans.length * 2);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void alignByDeviceOrderByTest12() {
+ String sql =
+ "select num,bigNum,floatNum,str,bool from root.** order by bigNum desc, device desc, time asc align by device";
+ int[] ans = {6, 14, 0, 12, 2, 8, 9, 7, 4, 5, 1, 3, 10, 11, 13};
+ orderByBigNumAlignByDevice(sql, ans);
+ }
+
+ @Test
+ public void alignByDeviceOrderByTest13() {
+ String sql =
+ "select num,bigNum,floatNum,str,bool from root.** order by bigNum desc, time desc, device asc align by device";
+ int[] ans = {14, 6, 0, 12, 2, 8, 9, 7, 4, 5, 1, 3, 10, 11, 13};
+ testNormalOrderByMixAlignBy(sql, ans);
+ }
+
+ @Test
+ public void alignByDeviceOrderByTest14() {
+ int[] ans = {14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
+ String sql =
+ "select num,bigNum,floatNum,str,bool from root.** order by time desc, bigNum desc, device asc align by device";
+ testNormalOrderByMixAlignBy(sql, ans);
+ }
+
+ @Test
+ public void alignByDeviceOrderByTest15() {
+ int[] ans = {6, 14, 0, 12, 2, 8, 9, 7, 4, 5, 1, 3, 10, 11, 13};
+ String sql =
+ "select num,bigNum,floatNum,str,bool from root.** order by device desc, bigNum desc, time asc align by device";
+ testDeviceViewOrderByMixAlignBy(sql, ans);
+ }
+
+ @Test
+ public void alignByDeviceOrderByTest16() {
+ String sql =
+ "select num,bigNum,floatNum,str,bool from root.** order by device desc, time asc, bigNum desc align by device";
+ int[] ans = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14};
+ testDeviceViewOrderByMixAlignBy(sql, ans);
+ }
+
+ @Test
+ public void alignByDeviceOrderByTest17() {
+ String sql =
+ "select num,bigNum,floatNum,str,bool from root.** order by bigNum desc, device desc, num asc, time asc align by device";
+ int[] ans = {6, 14, 0, 12, 2, 8, 9, 7, 4, 5, 1, 3, 10, 11, 13};
+ orderByBigNumAlignByDevice(sql, ans);
+ }
+
+ // 5. aggregation query align by device
+ @Test
+ public void orderByInAggregationAlignByDeviceTest() {
+ String sql =
+ "select avg(num) from root.** group by session(10000ms) order by avg(num) align by device";
+
+ double[] ans = {4.6, 4.6, 6.4, 6.4, 11.0, 11.0, 13.0, 13.0, 13.0, 13.0, 15.0, 15.0};
+ long[] times =
+ new long[] {
+ 0L,
+ 0L,
+ 31536000000L,
+ 31536000000L,
+ 31536100000L,
+ 31536100000L,
+ 41536000000L,
+ 41536900000L,
+ 41536000000L,
+ 41536900000L,
+ 51536000000L,
+ 51536000000L
+ };
+ String[] device =
+ new String[] {
+ "root.sg.d",
+ "root.sg.d2",
+ "root.sg.d",
+ "root.sg.d2",
+ "root.sg.d",
+ "root.sg.d2",
+ "root.sg.d",
+ "root.sg.d",
+ "root.sg.d2",
+ "root.sg.d2",
+ "root.sg.d",
+ "root.sg.d2"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ int i = 0;
+ while (resultSet.next()) {
+ long actualTime = resultSet.getLong(1);
+ String actualDevice = resultSet.getString(2);
+ double actualAvg = resultSet.getDouble(3);
+
+ assertEquals(device[i], actualDevice);
+ assertEquals(times[i], actualTime);
+ assertEquals(ans[i], actualAvg, 0.0001);
+ i++;
+ }
+ assertEquals(i, ans.length);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void orderByInAggregationAlignByDeviceTest2() {
+ String sql = "select avg(num) from root.** order by avg(num) align by device";
+ String value = "8";
+ checkSingleDouble(sql, value, true);
+ }
+
+ private void checkSingleDouble(String sql, Object value, boolean deviceAsc) {
+ String device = "root.sg.d";
+ if (!deviceAsc) device = "root.sg.d2";
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ int i = 0;
+ while (resultSet.next()) {
+ String deviceName = resultSet.getString(1);
+ double actualVal = resultSet.getDouble(2);
+ assertEquals(deviceName, device);
+ assertEquals(Double.parseDouble(value.toString()), actualVal, 1);
+ if (device.equals("root.sg.d")) device = "root.sg.d2";
+ else device = "root.sg.d";
+ i++;
+ }
+ assertEquals(i, 2);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void orderByInAggregationAlignByDeviceTest3() {
+ String sql =
+ "select avg(num)+avg(bigNum) from root.** order by max_value(floatNum) align by device";
+ long value = 2388936669L + 8;
+ checkSingleDouble(sql, value, true);
+ }
+
+ @Test
+ public void orderByInAggregationAlignByDeviceTest4() {
+
+ String sql =
+ "select avg(num)+avg(bigNum) from root.** order by max_value(floatNum)+min_value(num) align by device";
+ long value = 2388936669L + 8;
+ checkSingleDouble(sql, value, true);
+ }
+
+ @Test
+ public void orderByInAggregationAlignByDeviceTest5() {
+ String sql =
+ "select avg(num) from root.** order by max_value(floatNum)+avg(num) align by device";
+ String value = "8";
+ checkSingleDouble(sql, value, true);
+ }
+
+ @Test
+ public void orderByInAggregationAlignByDeviceTest6() {
+ String sql =
+ "select avg(num) from root.** order by max_value(floatNum)+avg(num), device asc, time desc align by device";
+ String value = "8";
+ checkSingleDouble(sql, value, true);
+ }
+
+ @Test
+ public void orderByInAggregationAlignByDeviceTest7() {
+ String sql =
+ "select avg(num) from root.** order by max_value(floatNum)+avg(num), time asc, device desc align by device";
+ String value = "8";
+ checkSingleDouble(sql, value, false);
+ }
+
+ @Test
+ public void orderByInAggregationAlignByDeviceTest8() {
+ String sql =
+ "select avg(num) from root.** order by time asc, max_value(floatNum)+avg(num), device desc align by device";
+ String value = "8";
+ checkSingleDouble(sql, value, false);
+ }
+
+ @Test
+ public void orderByInAggregationAlignByDeviceTest9() {
+ String sql =
+ "select avg(num) from root.** order by device asc, max_value(floatNum)+avg(num), time desc align by device";
+ String value = "8";
+ checkSingleDouble(sql, value, true);
+ }
+
+ @Test
+ public void orderByInAggregationAlignByDeviceTest10() {
+ String sql =
+ "select avg(num) from root.** order by max_value(floatNum) desc,time asc, avg(num) asc, device desc align by device";
+ String value = "8";
+ checkSingleDouble(sql, value, false);
+ }
+
+ @Test
+ public void orderByInAggregationAlignByDeviceTest11() {
+ String sql =
+ "select avg(num) from root.** order by max_value(floatNum) desc,device asc, avg(num) asc, time desc align by device";
+ String value = "8";
+ checkSingleDouble(sql, value, true);
+ }
+
+ @Test
+ public void orderByInAggregationAlignByDeviceTest12() {
+ String sql =
+ "select avg(num+floatNum) from root.** order by time,avg(num+floatNum) align by device";
+ String value = "537.34154";
+ checkSingleDouble(sql, value, true);
+ }
+
+ @Test
+ public void orderByInAggregationAlignByDeviceTest13() {
+ String sql = "select avg(num) from root.** order by time,avg(num+floatNum) align by device";
+ String value = "8";
+ checkSingleDouble(sql, value, true);
+ }
+
+ @Test
+ public void orderByInAggregationAlignByDeviceTest14() {
+ String sql = "select avg(num+floatNum) from root.** order by time,avg(num) align by device";
+ String value = "537.34154";
+ checkSingleDouble(sql, value, true);
+ }
+
+ String[][] UDFRes =
+ new String[][] {
+ {"0", "3", "0", "0"},
+ {"20", "2", "0", "0"},
+ {"40", "1", "0", "0"},
+ {"80", "9", "0", "0"},
+ {"100", "8", "0", "0"},
+ {"31536000000", "6", "0", "0"},
+ {"31536000100", "10", "0", "0"},
+ {"31536000500", "4", "0", "0"},
+ {"31536001000", "5", "0", "0"},
+ {"31536010000", "7", "0", "0"},
+ {"31536100000", "11", "0", "0"},
+ {"41536000000", "12", "2146483648", "0"},
+ {"41536000020", "14", "0", "14"},
+ {"41536900000", "13", "2107483648", "0"},
+ {"51536000000", "15", "0", "15"},
+ };
+
+ // UDF Test
+ private void orderByUDFTest(String sql, int[] ans) {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ int i = 0;
+ while (resultSet.next()) {
+ String time = resultSet.getString(1);
+ String num = resultSet.getString(2);
+ String topK = resultSet.getString(3);
+ String bottomK = resultSet.getString(4);
+
+ assertEquals(time, UDFRes[ans[i]][0]);
+ assertEquals(num, UDFRes[ans[i]][1]);
+ if (Objects.equals(UDFRes[ans[i]][3], "0")) {
+ assertNull(topK);
+ } else {
+ assertEquals(topK, UDFRes[ans[i]][3]);
+ }
+
+ if (Objects.equals(UDFRes[ans[i]][2], "0")) {
+ assertNull(bottomK);
+ } else {
+ assertEquals(bottomK, UDFRes[ans[i]][2]);
+ }
+
+ i++;
+ }
+ assertEquals(i, 15);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void orderByUDFTest1() {
+ String sql =
+ "select num, top_k(num, 'k'='2'), bottom_k(bigNum, 'k'='2') from root.sg.d order by top_k(num, 'k'='2') nulls first, bottom_k(bigNum, 'k'='2') nulls first";
+ int[] ans = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 13, 11, 12, 14};
+ orderByUDFTest(sql, ans);
+ }
+
+ @Test
+ public void orderByUDFTest2() {
+ String sql =
+ "select num, top_k(num, 'k'='2'), bottom_k(bigNum, 'k'='2') from root.sg.d order by top_k(num, 'k'='2'), bottom_k(bigNum, 'k'='2')";
+ int[] ans = {12, 14, 13, 11, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+ orderByUDFTest(sql, ans);
+ }
+
+ private void errorTest(String sql, String error) {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.executeQuery(sql);
+ } catch (Exception e) {
+ assertEquals(error, e.getMessage());
+ }
+ }
+
+ @Test
+ public void errorTest1() {
+ errorTest(
+ "select num from root.sg.d order by avg(bigNum)",
+ "701: Raw data and aggregation hybrid query is not supported.");
+ }
+
+ @Test
+ public void errorTest2() {
+ errorTest(
+ "select avg(num) from root.sg.d order by bigNum",
+ "701: Raw data and aggregation hybrid query is not supported.");
+ }
+
+ @Test
+ public void errorTest3() {
+ errorTest(
+ "select bigNum,floatNum from root.sg.d order by s1",
+ "701: root.sg.d.s1 in order by clause doesn't exist.");
+ }
+
+ @Test
+ public void errorTest4() {
+ errorTest(
+ "select bigNum,floatNum from root.** order by bigNum",
+ "701: root.**.bigNum in order by clause shouldn't refer to more than one timeseries.");
+ }
+
+ @Test
+ public void errorTest5() {
+ errorTest(
+ "select bigNum,floatNum from root.** order by s1 align by device",
+ "701: s1 in order by clause doesn't exist.");
+ }
+
+ @Test
+ public void errorTest6() {
+ errorTest(
+ "select bigNum,floatNum from root.** order by root.sg.d.bigNum align by device",
+ "701: ALIGN BY DEVICE: the suffix paths can only be measurement or one-level wildcard");
+ }
+
+ @Test
+ public void errorTest7() {
+ errorTest(
+ "select last bigNum,floatNum from root.** order by root.sg.d.bigNum",
+ "701: root.sg.d.bigNum in order by clause doesn't exist in the result of last query.");
+ }
+
+ // last query
+ public void testLastQueryOrderBy(String sql, String[][] ans) {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ int i = 0;
+ while (resultSet.next()) {
+ String time = resultSet.getString(1);
+ String num = resultSet.getString(2);
+ String value = resultSet.getString(3);
+ String dataType = resultSet.getString(4);
+
+ assertEquals(time, ans[0][i]);
+ assertEquals(num, ans[1][i]);
+ assertEquals(value, ans[2][i]);
+ assertEquals(dataType, ans[3][i]);
+
+ i++;
+ }
+ assertEquals(i, 4);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void lastQueryOrderBy() {
+ String[][] ans =
+ new String[][] {
+ {"51536000000", "51536000000", "51536000000", "51536000000"},
+ {"root.sg.d.num", "root.sg.d2.num", "root.sg.d.bigNum", "root.sg.d2.bigNum"},
+ {"15", "15", "3147483648", "3147483648"},
+ {"INT32", "INT32", "INT64", "INT64"}
+ };
+ String sql = "select last bigNum,num from root.** order by value, timeseries";
+ testLastQueryOrderBy(sql, ans);
+ }
+
+ @Test
+ public void lastQueryOrderBy2() {
+ String[][] ans =
+ new String[][] {
+ {"51536000000", "51536000000", "51536000000", "51536000000"},
+ {"root.sg.d2.num", "root.sg.d2.bigNum", "root.sg.d.num", "root.sg.d.bigNum"},
+ {"15", "3147483648", "15", "3147483648"},
+ {"INT32", "INT64", "INT32", "INT64"}
+ };
+ String sql = "select last bigNum,num from root.** order by timeseries desc";
+ testLastQueryOrderBy(sql, ans);
+ }
+
+ @Test
+ public void lastQueryOrderBy3() {
+ String[][] ans =
+ new String[][] {
+ {"51536000000", "51536000000", "51536000000", "51536000000"},
+ {"root.sg.d2.num", "root.sg.d2.bigNum", "root.sg.d.num", "root.sg.d.bigNum"},
+ {"15", "3147483648", "15", "3147483648"},
+ {"INT32", "INT64", "INT32", "INT64"}
+ };
+ String sql = "select last bigNum,num from root.** order by timeseries desc, value asc";
+ testLastQueryOrderBy(sql, ans);
+ }
+
+ @Test
+ public void lastQueryOrderBy4() {
+ String[][] ans =
+ new String[][] {
+ {"51536000000", "51536000000", "51536000000", "51536000000"},
+ {"root.sg.d2.num", "root.sg.d.num", "root.sg.d2.bigNum", "root.sg.d.bigNum"},
+ {"15", "15", "3147483648", "3147483648"},
+ {"INT32", "INT32", "INT64", "INT64"}
+ };
+ String sql = "select last bigNum,num from root.** order by value, timeseries desc";
+ testLastQueryOrderBy(sql, ans);
+ }
+
+ @Test
+ public void lastQueryOrderBy5() {
+ String[][] ans =
+ new String[][] {
+ {"51536000000", "51536000000", "51536000000", "51536000000"},
+ {"root.sg.d2.num", "root.sg.d.num", "root.sg.d2.bigNum", "root.sg.d.bigNum"},
+ {"15", "15", "3147483648", "3147483648"},
+ {"INT32", "INT32", "INT64", "INT64"}
+ };
+ String sql = "select last bigNum,num from root.** order by datatype, timeseries desc";
+ testLastQueryOrderBy(sql, ans);
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/regionscan/IoTDBActiveRegionScanIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/regionscan/IoTDBActiveRegionScanIT.java
index bfbba1b0f6b9..c608bf3e24c0 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/regionscan/IoTDBActiveRegionScanIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/regionscan/IoTDBActiveRegionScanIT.java
@@ -285,6 +285,28 @@ public void showActiveDeviceTest10() {
basicShowActiveDeviceTest(sql, SHOW_DEVICES_COLUMN_NAMES, retArray);
}
+ @Test
+ public void showActiveDeviceEmptyTest() {
+ String sql = "show devices root.empty where time < 50";
+ String[] retArray = new String[] {};
+ basicShowActiveDeviceTest(sql, SHOW_DEVICES_COLUMN_NAMES, retArray);
+
+ sql = "count devices root.empty where time < 50";
+ long value = 0;
+ basicCountActiveDeviceTest(sql, COUNT_DEVICES_COLUMN_NAMES, value);
+ }
+
+ @Test
+ public void showActiveTimeseriesEmptyTest() {
+ String sql = "show timeseries root.empty where time < 50";
+ String[] retArray = new String[] {};
+ basicShowActiveDeviceTest(sql, SHOW_TIMESERIES_COLUMN_NAMES, retArray);
+
+ sql = "count timeseries root.empty where time < 50";
+ long value = 0;
+ basicCountActiveDeviceTest(sql, COUNT_TIMESERIES_COLUMN_NAMES, value);
+ }
+
@Test
public void showActiveTimeseriesTest() {
String sql = "show timeseries where time = 4";
@@ -461,7 +483,6 @@ public static void basicCountActiveDeviceTest(String sql, String columnName, lon
try (ResultSet resultSet = statement.executeQuery(sql)) {
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
- Map map = new HashMap<>();
assertEquals(1, resultSetMetaData.getColumnCount());
assertEquals(columnName, resultSetMetaData.getColumnName(1));
int cnt = 0;
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java
index fe9832422c0b..9bb22ea599a4 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java
@@ -38,6 +38,7 @@
import java.util.HashSet;
import java.util.Set;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
/**
@@ -264,4 +265,34 @@ public void testQueryDataFromTimeSeriesWithoutData() {
}
Assert.assertEquals(0, cnt);
}
+
+ @Test
+ public void testIllegalInput() {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("create timeseries root.sg2.d.s1 with datatype=INT64");
+ assertThrows(
+ "Unsupported datatype: UNKNOWN",
+ SQLException.class,
+ () -> statement.execute("create timeseries root.sg2.d.s1 with datatype=UNKNOWN"));
+ assertThrows(
+ "Unsupported datatype: VECTOR",
+ SQLException.class,
+ () -> statement.execute("create timeseries root.sg2.d.s1 with datatype=VECTOR"));
+ assertThrows(
+ "Unsupported datatype: YES",
+ SQLException.class,
+ () -> statement.execute("create timeseries root.sg2.d.s1 with datatype=YES"));
+ assertThrows(
+ "Unsupported datatype: UNKNOWN",
+ SQLException.class,
+ () -> statement.execute("create device template t1 (s1 UNKNOWN, s2 boolean)"));
+ assertThrows(
+ "Unsupported datatype: VECTOR",
+ SQLException.class,
+ () -> statement.execute("create device template t1 (s1 VECTOR, s2 boolean)"));
+ } catch (SQLException ignored) {
+ fail();
+ }
+ }
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
index 0f8ca0df38a9..fc3f71df0aee 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
@@ -91,7 +91,7 @@ public class IoTDBSelectIntoIT {
SELECT_INTO_SQL_LIST.add("CREATE DATABASE root.sg_type");
for (int deviceId = 0; deviceId < 6; deviceId++) {
for (TSDataType dataType : TSDataType.values()) {
- if (!dataType.equals(TSDataType.VECTOR)) {
+ if (!dataType.equals(TSDataType.VECTOR) && !dataType.equals(TSDataType.UNKNOWN)) {
SELECT_INTO_SQL_LIST.add(
String.format(
"CREATE TIMESERIES root.sg_type.d_%d.s_%s %s",
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/udaf/IoTDBUDAFOrderByIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/udaf/IoTDBUDAFOrderByIT.java
index 5114ba42d5bc..46f0a35a249e 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/udaf/IoTDBUDAFOrderByIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/udaf/IoTDBUDAFOrderByIT.java
@@ -53,7 +53,7 @@ public class IoTDBUDAFOrderByIT {
"CREATE DATABASE root.sg",
"CREATE TIMESERIES root.sg.d.num WITH DATATYPE=INT32, ENCODING=RLE",
"CREATE TIMESERIES root.sg.d.bigNum WITH DATATYPE=INT64, ENCODING=RLE",
- "CREATE TIMESERIES root.sg.d.floatNum WITH DATATYPE=DOUBLE, ENCODING=RLE, 'MAX_POINT_NUMBER'='5'",
+ "CREATE TIMESERIES root.sg.d.floatNum WITH DATATYPE=DOUBLE, ENCODING=PLAIN",
"CREATE TIMESERIES root.sg.d.str WITH DATATYPE=TEXT, ENCODING=PLAIN",
"CREATE TIMESERIES root.sg.d.bool WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
"insert into root.sg.d(timestamp,num,bigNum,floatNum,str,bool) values(0,3,2947483648,231.2121,\"coconut\",FALSE)",
@@ -74,7 +74,7 @@ public class IoTDBUDAFOrderByIT {
"flush",
"CREATE TIMESERIES root.sg.d2.num WITH DATATYPE=INT32, ENCODING=RLE",
"CREATE TIMESERIES root.sg.d2.bigNum WITH DATATYPE=INT64, ENCODING=RLE",
- "CREATE TIMESERIES root.sg.d2.floatNum WITH DATATYPE=DOUBLE, ENCODING=RLE, 'MAX_POINT_NUMBER'='5'",
+ "CREATE TIMESERIES root.sg.d2.floatNum WITH DATATYPE=DOUBLE, ENCODING=PLAIN",
"CREATE TIMESERIES root.sg.d2.str WITH DATATYPE=TEXT, ENCODING=PLAIN",
"CREATE TIMESERIES root.sg.d2.bool WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
"insert into root.sg.d2(timestamp,num,bigNum,floatNum,str,bool) values(0,3,2947483648,231.2121,\"coconut\",FALSE)",
@@ -92,7 +92,6 @@ public class IoTDBUDAFOrderByIT {
"insert into root.sg.d2(timestamp,num,bigNum,floatNum,str,bool) values(41536000020,14,2907483648,231.34,\"cherry\",FALSE)",
"insert into root.sg.d2(timestamp,num,bigNum,floatNum,str,bool) values(41536900000,13,2107483648,54.12,\"lychee\",TRUE)",
"insert into root.sg.d2(timestamp,num,bigNum,floatNum,str,bool) values(51536000000,15,3147483648,235.213,\"watermelon\",TRUE)",
- "flush",
};
@BeforeClass
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
index 1c021b074215..c00814cab0d4 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
@@ -188,8 +188,13 @@ public void testDoubleLivingAutoConflict() throws Exception {
TestUtils.assertDataEventuallyOnEnv(
receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
- TestUtils.restartCluster(senderEnv);
- TestUtils.restartCluster(receiverEnv);
+ try {
+ TestUtils.restartCluster(senderEnv);
+ TestUtils.restartCluster(receiverEnv);
+ } catch (final Throwable e) {
+ e.printStackTrace();
+ return;
+ }
for (int i = 400; i < 500; ++i) {
if (!TestUtils.tryExecuteNonQueryWithRetry(
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
index ecd0b2d5775a..60c0ba37d4b2 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
@@ -34,6 +34,7 @@
import java.util.Map;
public class IoTDBPipeAutoDropIT extends AbstractPipeDualAutoIT {
+
@Test
public void testAutoDropInHistoricalTransfer() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
@@ -70,10 +71,69 @@ public void testAutoDropInHistoricalTransfer() throws Exception {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("1,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ senderEnv,
+ "show pipes",
+ "ID,CreationTime,State,PipeSource,PipeProcessor,PipeSink,ExceptionMessage,RemainingEventCount,EstimatedRemainingSeconds,",
+ Collections.emptySet());
+ }
+ }
+
+ @Test
+ public void testAutoDropInHistoricalTransferWithTimeRange() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv,
+ "insert into root.db.d1(time, s1) values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)")) {
+ return;
+ }
+
+ final Map extractorAttributes = new HashMap<>();
+ final Map processorAttributes = new HashMap<>();
+ final Map connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.mode", "query");
+ extractorAttributes.put("extractor.start-time", "1970-01-01T08:00:02+08:00");
+ extractorAttributes.put("extractor.end-time", "1970-01-01T08:00:04+08:00");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("3,"));
+
TestUtils.assertDataEventuallyOnEnv(
senderEnv,
"show pipes",
- "ID,CreationTime,State,PipeSource,PipeProcessor,PipeSink,ExceptionMessage,",
+ "ID,CreationTime,State,PipeSource,PipeProcessor,PipeSink,ExceptionMessage,RemainingEventCount,EstimatedRemainingSeconds,",
Collections.emptySet());
}
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
index aeab3f74463f..a017cf112d7a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
@@ -226,7 +226,7 @@ public void testPipeAfterDataRegionLeaderStop() throws Exception {
leaderIndex = i;
try {
senderEnv.shutdownDataNode(i);
- } catch (final Exception e) {
+ } catch (final Throwable e) {
e.printStackTrace();
return;
}
@@ -237,7 +237,7 @@ public void testPipeAfterDataRegionLeaderStop() throws Exception {
try {
senderEnv.startDataNode(i);
((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();
- } catch (final Exception e) {
+ } catch (final Throwable e) {
e.printStackTrace();
return;
}
@@ -261,8 +261,13 @@ public void testPipeAfterDataRegionLeaderStop() throws Exception {
Collections.singleton("2,"));
}
- TestUtils.restartCluster(senderEnv);
- TestUtils.restartCluster(receiverEnv);
+ try {
+ TestUtils.restartCluster(senderEnv);
+ TestUtils.restartCluster(receiverEnv);
+ } catch (final Throwable e) {
+ e.printStackTrace();
+ return;
+ }
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
@@ -338,7 +343,7 @@ public void testPipeAfterRegisterNewDataNode() throws Exception {
try {
senderEnv.registerNewDataNode(true);
- } catch (final Exception e) {
+ } catch (final Throwable e) {
e.printStackTrace();
return;
}
@@ -357,8 +362,13 @@ public void testPipeAfterRegisterNewDataNode() throws Exception {
Collections.singleton("2,"));
}
- TestUtils.restartCluster(senderEnv);
- TestUtils.restartCluster(receiverEnv);
+ try {
+ TestUtils.restartCluster(senderEnv);
+ TestUtils.restartCluster(receiverEnv);
+ } catch (final Throwable e) {
+ e.printStackTrace();
+ return;
+ }
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
@@ -439,7 +449,7 @@ public void testCreatePipeWhenRegisteringNewDataNode() throws Exception {
t.start();
try {
senderEnv.registerNewDataNode(true);
- } catch (final Exception e) {
+ } catch (final Throwable e) {
e.printStackTrace();
return;
}
@@ -500,7 +510,7 @@ public void testRegisteringNewDataNodeWhenTransferringData() throws Exception {
t.start();
try {
senderEnv.registerNewDataNode(true);
- } catch (final Exception e) {
+ } catch (final Throwable e) {
e.printStackTrace();
return;
}
@@ -518,7 +528,7 @@ public void testRegisteringNewDataNodeWhenTransferringData() throws Exception {
try {
senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 1);
senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size() - 1);
- } catch (final Exception e) {
+ } catch (final Throwable e) {
e.printStackTrace();
}
}
@@ -562,7 +572,7 @@ public void testRegisteringNewDataNodeAfterTransferringData() throws Exception {
try {
senderEnv.registerNewDataNode(true);
- } catch (final Exception e) {
+ } catch (final Throwable e) {
e.printStackTrace();
return;
}
@@ -580,7 +590,7 @@ public void testRegisteringNewDataNodeAfterTransferringData() throws Exception {
try {
senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 1);
senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size() - 1);
- } catch (final Exception e) {
+ } catch (final Throwable e) {
e.printStackTrace();
}
}
@@ -634,7 +644,7 @@ public void testNewDataNodeFailureParallelToTransferringData() throws Exception
senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 1);
senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size() - 1);
((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();
- } catch (final Exception e) {
+ } catch (final Throwable e) {
e.printStackTrace();
return;
}
@@ -695,7 +705,13 @@ public void testSenderRestartWhenTransferring() throws Exception {
return;
}
- TestUtils.restartCluster(senderEnv);
+ try {
+ TestUtils.restartCluster(senderEnv);
+ } catch (final Throwable e) {
+ e.printStackTrace();
+ return;
+ }
+
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select count(*) from root.**",
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCompressionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCompressionIT.java
index be4aa458d9fb..f7c0c63b842e 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCompressionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCompressionIT.java
@@ -22,6 +22,8 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.MultiEnvFactory;
@@ -36,11 +38,17 @@
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import static org.junit.Assert.fail;
+
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2AutoCreateSchema.class})
public class IoTDBPipeConnectorCompressionIT extends AbstractPipeDualAutoIT {
@@ -179,4 +187,122 @@ private void doTest(
Collections.singleton("8,"));
}
}
+
+ @Test
+ public void testZstdCompressorLevel() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s1) values (1, 1)",
+ "insert into root.db.d1(time, s2) values (1, 1)",
+ "insert into root.db.d1(time, s3) values (1, 1)",
+ "insert into root.db.d1(time, s4) values (1, 1)",
+ "insert into root.db.d1(time, s5) values (1, 1)",
+ "flush"))) {
+ return;
+ }
+
+ // Create 5 pipes with different zstd compression levels, p4 and p5 should fail.
+
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe p1"
+ + " with extractor ('extractor.pattern'='root.db.d1.s1')"
+ + " with connector ("
+ + "'connector.ip'='%s',"
+ + "'connector.port'='%s',"
+ + "'connector.compressor'='zstd, zstd',"
+ + "'connector.compressor.zstd.level'='3')",
+ receiverIp, receiverPort));
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe p2"
+ + " with extractor ('extractor.pattern'='root.db.d1.s2')"
+ + " with connector ("
+ + "'connector.ip'='%s',"
+ + "'connector.port'='%s',"
+ + "'connector.compressor'='zstd, zstd',"
+ + "'connector.compressor.zstd.level'='22')",
+ receiverIp, receiverPort));
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe p3"
+ + " with extractor ('extractor.pattern'='root.db.d1.s3')"
+ + " with connector ("
+ + "'connector.ip'='%s',"
+ + "'connector.port'='%s',"
+ + "'connector.compressor'='zstd, zstd',"
+ + "'connector.compressor.zstd.level'='-131072')",
+ receiverIp, receiverPort));
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe p4"
+ + " with extractor ('extractor.pattern'='root.db.d1.s4')"
+ + " with connector ("
+ + "'connector.ip'='%s',"
+ + "'connector.port'='%s',"
+ + "'connector.compressor'='zstd, zstd',"
+ + "'connector.compressor.zstd.level'='-131073')",
+ receiverIp, receiverPort));
+ fail();
+ } catch (SQLException e) {
+ // Make sure the error message in IoTDBConnector.java is returned
+ Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range"));
+ }
+
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe p5"
+ + " with extractor ('extractor.pattern'='root.db.d1.s5')"
+ + " with connector ("
+ + "'connector.ip'='%s',"
+ + "'connector.port'='%s',"
+ + "'connector.compressor'='zstd, zstd',"
+ + "'connector.compressor.zstd.level'='23')",
+ receiverIp, receiverPort));
+ fail();
+ } catch (SQLException e) {
+ // Make sure the error message in IoTDBConnector.java is returned
+ Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range"));
+ }
+
+ final List showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(3, showPipeResult.size());
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, "count timeseries", "count(timeseries),", Collections.singleton("3,"));
+ }
+ }
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeLifeCycleIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeLifeCycleIT.java
index 745376aa9733..87e00151db15 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeLifeCycleIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeLifeCycleIT.java
@@ -465,8 +465,13 @@ public void testLifeCycleWithClusterRestart() throws Exception {
receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
}
- TestUtils.restartCluster(senderEnv);
- TestUtils.restartCluster(receiverEnv);
+ try {
+ TestUtils.restartCluster(senderEnv);
+ TestUtils.restartCluster(receiverEnv);
+ } catch (final Throwable e) {
+ e.printStackTrace();
+ return;
+ }
try (final SyncConfigNodeIServiceClient ignored =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
@@ -529,7 +534,18 @@ public void testReceiverRestartWhenTransferring() throws Exception {
});
t.start();
- TestUtils.restartCluster(receiverEnv);
+ try {
+ TestUtils.restartCluster(receiverEnv);
+ } catch (final Throwable e) {
+ e.printStackTrace();
+ try {
+ t.interrupt();
+ t.join();
+ } catch (Throwable ignored) {
+ }
+ return;
+ }
+
t.join();
if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
return;
@@ -711,8 +727,13 @@ public void testDoubleLiving() throws Exception {
TestUtils.assertDataEventuallyOnEnv(
receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
- TestUtils.restartCluster(senderEnv);
- TestUtils.restartCluster(receiverEnv);
+ try {
+ TestUtils.restartCluster(senderEnv);
+ TestUtils.restartCluster(receiverEnv);
+ } catch (final Throwable e) {
+ e.printStackTrace();
+ return;
+ }
for (int i = 400; i < 500; ++i) {
if (!TestUtils.tryExecuteNonQueryWithRetry(
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaLeaderChangeIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaLeaderChangeIT.java
index fb70e8e37838..b05e695ad3bb 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaLeaderChangeIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaLeaderChangeIT.java
@@ -173,7 +173,7 @@ public void testSchemaRegionLeaderChange() throws Exception {
try {
index = senderEnv.getFirstLeaderSchemaRegionDataNodeIndex();
senderEnv.shutdownDataNode(index);
- } catch (final Exception e) {
+ } catch (final Throwable e) {
e.printStackTrace();
return;
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaRestartIT.java
index 8c19a5e2160c..76cd4a90e8b8 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaRestartIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaRestartIT.java
@@ -84,8 +84,13 @@ public void testAutoRestartSchemaTask() throws Exception {
}
}
- TestUtils.restartCluster(senderEnv);
- TestUtils.restartCluster(receiverEnv);
+ try {
+ TestUtils.restartCluster(senderEnv);
+ TestUtils.restartCluster(receiverEnv);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ return;
+ }
for (int i = 10; i < 20; ++i) {
if (!TestUtils.tryExecuteNonQueryWithRetry(
@@ -142,8 +147,13 @@ public void testAutoRestartConfigTask() throws Exception {
}
}
- TestUtils.restartCluster(senderEnv);
- TestUtils.restartCluster(receiverEnv);
+ try {
+ TestUtils.restartCluster(senderEnv);
+ TestUtils.restartCluster(receiverEnv);
+ } catch (final Throwable e) {
+ e.printStackTrace();
+ return;
+ }
for (int i = 10; i < 20; ++i) {
if (!TestUtils.tryExecuteNonQueryWithRetry(
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
index 1d0ef260ed48..9aa01ad4acfb 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
@@ -36,19 +36,20 @@ public void setUp() {
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
+ setUpConfig();
+
+ senderEnv.initClusterEnvironment();
+ receiverEnv.initClusterEnvironment();
+ }
+
+ void setUpConfig() {
// enable auto create schema
senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
- // for IoTDBSubscriptionConsumerGroupIT
- receiverEnv.getConfig().getCommonConfig().setPipeAirGapReceiverEnabled(true);
-
// 10 min, assert that the operations will not time out
senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
-
- senderEnv.initClusterEnvironment();
- receiverEnv.initClusterEnvironment();
}
@After
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
index 07d5b4ed791b..992d151520f9 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
@@ -108,6 +108,15 @@ static final class SubscriptionInfo {
}
}
+ @Override
+ void setUpConfig() {
+ super.setUpConfig();
+
+ // Enable air gap receiver
+ receiverEnv.getConfig().getCommonConfig().setPipeAirGapReceiverEnabled(true);
+ }
+
+ @Override
@Before
public void setUp() {
super.setUp();
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java
new file mode 100644
index 000000000000..2b1cc407b7f7
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.dual;
+
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2Subscription;
+import org.apache.iotdb.rpc.subscription.config.TopicConstant;
+import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
+
+import org.apache.tsfile.write.record.Tablet;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2Subscription.class})
+public class IoTDBSubscriptionTimePrecisionIT extends AbstractSubscriptionDualIT {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(IoTDBSubscriptionTimePrecisionIT.class);
+
+ @Override
+ void setUpConfig() {
+ super.setUpConfig();
+
+ // Set timestamp precision to nanosecond
+ senderEnv.getConfig().getCommonConfig().setTimestampPrecision("ns");
+ receiverEnv.getConfig().getCommonConfig().setTimestampPrecision("ns");
+ }
+
+ @Test
+ public void testTopicTimePrecision() throws Exception {
+ final String host = senderEnv.getIP();
+ final int port = Integer.parseInt(senderEnv.getPort());
+
+ // Insert some historical data on sender
+ final long currentTime1 = System.currentTimeMillis() * 1000_000L; // in nanosecond
+ try (final ISession session = senderEnv.getSessionConnection()) {
+ for (int i = 0; i < 100; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s2) values (%s, 1)", currentTime1 - i));
+ }
+ session.executeNonQueryStatement("flush");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Create topic on sender
+ final String topic1 = "topic1";
+ final String topic2 = "topic2";
+ try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
+ session.open();
+ {
+ final Properties config = new Properties();
+ config.put(TopicConstant.START_TIME_KEY, currentTime1 - 99);
+ config.put(
+ TopicConstant.END_TIME_KEY,
+ TopicConstant.NOW_TIME_VALUE); // now should be strictly larger than current time 1
+ session.createTopic(topic1, config);
+ }
+ {
+ final Properties config = new Properties();
+ config.put(
+ TopicConstant.START_TIME_KEY,
+ TopicConstant.NOW_TIME_VALUE); // now should be strictly smaller than current time 2
+ session.createTopic(topic2, config);
+ }
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Insert some historical data on sender again
+ final long currentTime2 = System.currentTimeMillis() * 1000_000L; // in nanosecond
+ try (final ISession session = senderEnv.getSessionConnection()) {
+ for (int i = 0; i < 100; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d2(time, s1) values (%s, 1)", currentTime2 + i));
+ }
+ session.executeNonQueryStatement("flush");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Subscribe on sender and insert on receiver
+ final AtomicBoolean isClosed = new AtomicBoolean(false);
+ final Thread thread =
+ new Thread(
+ () -> {
+ try (final SubscriptionPullConsumer consumer =
+ new SubscriptionPullConsumer.Builder()
+ .host(host)
+ .port(port)
+ .consumerId("c1")
+ .consumerGroupId("cg1")
+ .autoCommit(false)
+ .buildPullConsumer();
+ final ISession session = receiverEnv.getSessionConnection()) {
+ consumer.open();
+ consumer.subscribe(topic1, topic2);
+ while (!isClosed.get()) {
+ LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time
+ final List messages =
+ consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
+ for (final SubscriptionMessage message : messages) {
+ for (final Iterator it =
+ message.getSessionDataSetsHandler().tabletIterator();
+ it.hasNext(); ) {
+ final Tablet tablet = it.next();
+ session.insertTablet(tablet);
+ }
+ }
+ consumer.commitSync(messages);
+ }
+ // Auto unsubscribe topics
+ } catch (final Exception e) {
+ e.printStackTrace();
+ // Avoid failure
+ } finally {
+ LOGGER.info("consumer exiting...");
+ }
+ });
+ thread.start();
+
+ // Check data on receiver
+ try {
+ try (final Connection connection = receiverEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ // Keep retrying if there are execution failures
+ Awaitility.await()
+ .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, TimeUnit.SECONDS)
+ .pollInterval(
+ IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, TimeUnit.SECONDS)
+ .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, TimeUnit.SECONDS)
+ .untilAsserted(
+ () ->
+ TestUtils.assertSingleResultSetEqual(
+ TestUtils.executeQueryWithRetry(statement, "select count(*) from root.**"),
+ new HashMap() {
+ {
+ put("count(root.db.d1.s2)", "100");
+ put("count(root.db.d2.s1)", "100");
+ }
+ }));
+ }
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ isClosed.set(true);
+ thread.join();
+ }
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index 7091c93b4dbc..0cd6bc0d0a10 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -506,7 +506,7 @@ public void testTopicInvalidTimeRangeConfig() throws Exception {
session.open();
final Properties properties = new Properties();
properties.put(TopicConstant.START_TIME_KEY, "2024-01-32");
- properties.put(TopicConstant.END_TIME_KEY, "now");
+ properties.put(TopicConstant.END_TIME_KEY, TopicConstant.NOW_TIME_VALUE);
session.createTopic("topic1", properties);
fail();
} catch (final Exception ignored) {
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
index 46b697e44db6..13a5e8abc261 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -60,6 +60,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
@@ -181,7 +182,7 @@ public void testBasicSubscribeTsFile() throws Exception {
}
// Create topic
- final String topicName = "topic2";
+ final String topicName = "topic1";
final String host = EnvFactory.getEnv().getIP();
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
@@ -297,11 +298,12 @@ public void testBasicPullConsumerWithCommitAsync() throws Exception {
}
// Create topic
+ final String topicName = "topic1";
final String host = EnvFactory.getEnv().getIP();
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
session.open();
- session.createTopic("topic1");
+ session.createTopic(topicName);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -325,7 +327,7 @@ public void testBasicPullConsumerWithCommitAsync() throws Exception {
.autoCommit(false)
.buildPullConsumer()) {
consumer.open();
- consumer.subscribe("topic1");
+ consumer.subscribe(topicName);
while (!isClosed.get()) {
LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time
final List messages =
@@ -370,7 +372,7 @@ public void onFailure(final Throwable e) {
}
});
}
- consumer.unsubscribe("topic1");
+ consumer.unsubscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
// avoid fail
@@ -450,11 +452,12 @@ public void testBasicPushConsumer() {
}
// Create topic
+ final String topicName = "topic1";
final String host = EnvFactory.getEnv().getIP();
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
session.open();
- session.createTopic("topic1");
+ session.createTopic(topicName);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -480,7 +483,7 @@ public void testBasicPushConsumer() {
.buildPushConsumer()) {
consumer.open();
- consumer.subscribe("topic1");
+ consumer.subscribe(topicName);
// The push consumer should automatically poll 10 rows of data by 1 onReceive()
Awaitility.await()
@@ -537,11 +540,112 @@ public void testBasicPushConsumer() {
Assert.assertTrue(onReceiveCount.get() > lastOnReceiveCount.get());
});
- consumer.unsubscribe("topic1");
+ consumer.unsubscribe(topicName);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ @Test
+ public void testPollUnsubscribedTopics() throws Exception {
+ // Insert some historical data
+ try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+ for (int i = 0; i < 100; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
+ }
+ for (int i = 100; i < 200; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
+ }
+ session.executeNonQueryStatement("flush");
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
+
+ // Create topic
+ final String host = EnvFactory.getEnv().getIP();
+ final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+ try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
+ session.open();
+ {
+ final Properties properties = new Properties();
+ properties.put(TopicConstant.END_TIME_KEY, 99);
+ session.createTopic("topic1", properties);
+ }
+ {
+ final Properties properties = new Properties();
+ properties.put(TopicConstant.START_TIME_KEY, 100);
+ session.createTopic("topic2", properties);
+ }
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Subscription
+ final AtomicInteger rowCount = new AtomicInteger();
+ final AtomicLong timestampSum = new AtomicLong();
+ final AtomicBoolean isClosed = new AtomicBoolean(false);
+ final Thread thread =
+ new Thread(
+ () -> {
+ try (final SubscriptionPullConsumer consumer =
+ new SubscriptionPullConsumer.Builder()
+ .host(host)
+ .port(port)
+ .consumerId("c1")
+ .consumerGroupId("cg1")
+ .autoCommit(false)
+ .buildPullConsumer()) {
+ consumer.open();
+ consumer.subscribe("topic2"); // only subscribe topic2
+ while (!isClosed.get()) {
+ LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time
+ final List messages =
+ consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
+ for (final SubscriptionMessage message : messages) {
+ for (final SubscriptionSessionDataSet dataSet :
+ message.getSessionDataSetsHandler()) {
+ while (dataSet.hasNext()) {
+ timestampSum.getAndAdd(dataSet.next().getTimestamp());
+ rowCount.addAndGet(1);
+ }
+ }
+ }
+ consumer.commitSync(messages);
+ }
+ // automatically unsubscribe topics when closing
+ } catch (final Exception e) {
+ e.printStackTrace();
+ // Avoid failure
+ } finally {
+ LOGGER.info("consumer exiting...");
+ }
+ });
+ thread.start();
+
+ // Check row count
+ try {
+ // Keep retrying if there are execution failures
+ Awaitility.await()
+ .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, TimeUnit.SECONDS)
+ .pollInterval(
+ IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, TimeUnit.SECONDS)
+ .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ Assert.assertEquals(100, rowCount.get());
+ Assert.assertEquals((100 + 199) * 100 / 2, timestampSum.get());
+ });
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ isClosed.set(true);
+ thread.join();
+ }
}
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
index 231845921e13..2764fad56b83 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
@@ -119,7 +119,12 @@ public void testSubscriptionAfterRestartCluster() throws Exception {
}
// Restart cluster
- TestUtils.restartCluster(EnvFactory.getEnv());
+ try {
+ TestUtils.restartCluster(EnvFactory.getEnv());
+ } catch (final Throwable e) {
+ e.printStackTrace();
+ return;
+ }
// Show topics and subscriptions
try (final SyncConfigNodeIServiceClient client =
@@ -254,9 +259,14 @@ public void testSubscriptionAfterRestartDataNode() throws Exception {
}
// Shutdown DN 1 & DN 2
- Thread.sleep(10000); // wait some time
- EnvFactory.getEnv().shutdownDataNode(1);
- EnvFactory.getEnv().shutdownDataNode(2);
+ try {
+ Thread.sleep(10000); // wait some time
+ EnvFactory.getEnv().shutdownDataNode(1);
+ EnvFactory.getEnv().shutdownDataNode(2);
+ } catch (final Throwable e) {
+ e.printStackTrace();
+ return;
+ }
// Subscription again
final Map timestamps = new HashMap<>();
@@ -297,10 +307,15 @@ public void testSubscriptionAfterRestartDataNode() throws Exception {
thread.start();
// Start DN 1 & DN 2
- Thread.sleep(10000); // wait some time
- EnvFactory.getEnv().startDataNode(1);
- EnvFactory.getEnv().startDataNode(2);
- ((AbstractEnv) EnvFactory.getEnv()).checkClusterStatusWithoutUnknown();
+ try {
+ Thread.sleep(10000); // wait some time
+ EnvFactory.getEnv().startDataNode(1);
+ EnvFactory.getEnv().startDataNode(2);
+ ((AbstractEnv) EnvFactory.getEnv()).checkClusterStatusWithoutUnknown();
+ } catch (final Throwable e) {
+ e.printStackTrace();
+ return;
+ }
// Insert some realtime data
try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
diff --git a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportDataTestIT.java b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportDataTestIT.java
index ec53326f4274..f04f387fd381 100644
--- a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportDataTestIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportDataTestIT.java
@@ -89,7 +89,7 @@ protected void testOnWindows() throws IOException {
"root",
"-pw",
"root",
- "-td",
+ "-t",
"target",
"-q",
"select * from root.test.t2 where time > 1 and time < 1000000000000",
@@ -115,7 +115,7 @@ protected void testOnWindows() throws IOException {
"root",
"-pw",
"root",
- "-td",
+ "-t",
"target",
"-q",
"select * from root.test.t2 where time > 1 and time < 1000000000000",
@@ -141,7 +141,7 @@ protected void testOnWindows() throws IOException {
"root",
"-pw",
"root",
- "-td",
+ "-t",
"target",
"-type",
"sql",
@@ -170,7 +170,7 @@ protected void testOnUnix() throws IOException {
"root",
"-pw",
"root",
- "-td",
+ "-t",
"target",
"-q",
"select * from root.**");
@@ -193,7 +193,7 @@ protected void testOnUnix() throws IOException {
"root",
"-pw",
"root",
- "-td",
+ "-t",
"target",
"-q",
"select * from root.**");
@@ -216,7 +216,7 @@ protected void testOnUnix() throws IOException {
"root",
"-pw",
"root",
- "-td",
+ "-t",
"target",
"-type",
"sql",
diff --git a/integration-test/src/test/java/org/apache/iotdb/tools/it/ImportDataTestIT.java b/integration-test/src/test/java/org/apache/iotdb/tools/it/ImportDataTestIT.java
index 625d2b9481c1..391b726f6d73 100644
--- a/integration-test/src/test/java/org/apache/iotdb/tools/it/ImportDataTestIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/tools/it/ImportDataTestIT.java
@@ -25,8 +25,8 @@
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -46,7 +46,7 @@ public class ImportDataTestIT extends AbstractScript {
private static String libPath;
- @Before
+ @BeforeClass
public static void setUp() {
EnvFactory.getEnv().initClusterEnvironment();
ip = EnvFactory.getEnv().getIP();
@@ -55,7 +55,7 @@ public static void setUp() {
libPath = EnvFactory.getEnv().getLibPath();
}
- @After
+ @AfterClass
public static void tearDown() {
EnvFactory.getEnv().cleanClusterEnvironment();
}
@@ -74,7 +74,7 @@ public void test() throws IOException {
@Override
protected void testOnWindows() throws IOException {
final String[] output = {
- "The file name must end with \"csv\" or \"txt\" or \"sql\"!",
+ "The file name must end with \"csv\" or \"txt\"!",
};
ProcessBuilder builder =
new ProcessBuilder(
@@ -89,7 +89,7 @@ protected void testOnWindows() throws IOException {
"root",
"-pw",
"root",
- "-f",
+ "-s",
"./",
"&",
"exit",
@@ -101,7 +101,7 @@ protected void testOnWindows() throws IOException {
@Override
protected void testOnUnix() throws IOException {
final String[] output = {
- "The file name must end with \"csv\" or \"txt\" or \"sql\"!",
+ "The file name must end with \"csv\" or \"txt\"!",
};
ProcessBuilder builder =
new ProcessBuilder(
@@ -115,7 +115,7 @@ protected void testOnUnix() throws IOException {
"root",
"-pw",
"root",
- "-f",
+ "-s",
"./");
builder.environment().put("CLASSPATH", libPath);
testOutput(builder, output, 0);
diff --git a/iotdb-api/external-api/pom.xml b/iotdb-api/external-api/pom.xml
index 791019515461..860e1453757c 100644
--- a/iotdb-api/external-api/pom.xml
+++ b/iotdb-api/external-api/pom.xml
@@ -24,7 +24,7 @@
org.apache.iotdb
iotdb-api
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
external-api
IoTDB: API: External API
diff --git a/iotdb-api/pipe-api/pom.xml b/iotdb-api/pipe-api/pom.xml
index 10944b63e75c..c0e4338b3ef1 100644
--- a/iotdb-api/pipe-api/pom.xml
+++ b/iotdb-api/pipe-api/pom.xml
@@ -24,7 +24,7 @@
org.apache.iotdb
iotdb-api
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
pipe-api
IoTDB: API: Pipe API
diff --git a/iotdb-api/pom.xml b/iotdb-api/pom.xml
index 3744f1f5b1e4..f29b64894b6f 100644
--- a/iotdb-api/pom.xml
+++ b/iotdb-api/pom.xml
@@ -24,7 +24,7 @@
org.apache.iotdb
iotdb-parent
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
iotdb-api
pom
diff --git a/iotdb-api/trigger-api/pom.xml b/iotdb-api/trigger-api/pom.xml
index 88cb4ed0bab6..49b192aebe70 100644
--- a/iotdb-api/trigger-api/pom.xml
+++ b/iotdb-api/trigger-api/pom.xml
@@ -24,7 +24,7 @@
org.apache.iotdb
iotdb-api
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
trigger-api
IoTDB: API: Trigger API
diff --git a/iotdb-api/udf-api/pom.xml b/iotdb-api/udf-api/pom.xml
index d11ae4dbceb2..f6b5897c826b 100644
--- a/iotdb-api/udf-api/pom.xml
+++ b/iotdb-api/udf-api/pom.xml
@@ -24,7 +24,7 @@
org.apache.iotdb
iotdb-api
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
udf-api
IoTDB: API: UDF API
diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/utils/RowImpl.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/utils/RowImpl.java
index e0ca3a53fc65..d7d3a8848690 100644
--- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/utils/RowImpl.java
+++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/utils/RowImpl.java
@@ -45,36 +45,57 @@ public long getTime() {
@Override
public int getInt(int columnIndex) {
+ if (columnIndex >= size()) {
+ throw new IndexOutOfBoundsException("Index out of bound error!");
+ }
return (int) rowRecord[columnIndex];
}
@Override
public long getLong(int columnIndex) {
+ if (columnIndex >= size()) {
+ throw new IndexOutOfBoundsException("Index out of bound error!");
+ }
return (long) rowRecord[columnIndex];
}
@Override
public float getFloat(int columnIndex) {
+ if (columnIndex >= size()) {
+ throw new IndexOutOfBoundsException("Index out of bound error!");
+ }
return (float) rowRecord[columnIndex];
}
@Override
public double getDouble(int columnIndex) {
+ if (columnIndex >= size()) {
+ throw new IndexOutOfBoundsException("Index out of bound error!");
+ }
return (double) rowRecord[columnIndex];
}
@Override
public boolean getBoolean(int columnIndex) {
+ if (columnIndex >= size()) {
+ throw new IndexOutOfBoundsException("Index out of bound error!");
+ }
return (boolean) rowRecord[columnIndex];
}
@Override
public Binary getBinary(int columnIndex) {
+ if (columnIndex >= size()) {
+ throw new IndexOutOfBoundsException("Index out of bound error!");
+ }
return transformToUDFBinary((org.apache.tsfile.utils.Binary) rowRecord[columnIndex]);
}
@Override
public String getString(int columnIndex) {
+ if (columnIndex >= size()) {
+ throw new IndexOutOfBoundsException("Index out of bound error!");
+ }
return rowRecord[columnIndex].toString();
}
diff --git a/iotdb-client/cli/pom.xml b/iotdb-client/cli/pom.xml
index c8451aac10ed..cc62e9e8d8d5 100644
--- a/iotdb-client/cli/pom.xml
+++ b/iotdb-client/cli/pom.xml
@@ -24,7 +24,7 @@
org.apache.iotdb
iotdb-client
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
iotdb-cli
IoTDB: Client: CLI
@@ -37,37 +37,37 @@
org.apache.iotdb
iotdb-session
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.apache.iotdb
iotdb-jdbc
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.apache.iotdb
iotdb-antlr
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.apache.iotdb
node-commons
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.apache.iotdb
iotdb-server
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.apache.iotdb
isession
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.apache.iotdb
service-rpc
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.apache.tsfile
@@ -82,7 +82,7 @@
org.apache.iotdb
iotdb-thrift
- 1.3.2-SNAPSHOT
+ 1.3.3-SNAPSHOT
org.slf4j
diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractDataTool.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractDataTool.java
index 6ae1a1791dcc..1e689e903c1a 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractDataTool.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractDataTool.java
@@ -31,6 +31,7 @@
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.QuoteMode;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,17 +44,21 @@ public abstract class AbstractDataTool {
protected static final String HOST_ARGS = "h";
protected static final String HOST_NAME = "host";
+ protected static final String HOST_DEFAULT_VALUE = "127.0.0.1";
protected static final String HELP_ARGS = "help";
protected static final String PORT_ARGS = "p";
protected static final String PORT_NAME = "port";
+ protected static final String PORT_DEFAULT_VALUE = "6667";
protected static final String PW_ARGS = "pw";
protected static final String PW_NAME = "password";
+ protected static final String PW_DEFAULT_VALUE = "root";
protected static final String USERNAME_ARGS = "u";
protected static final String USERNAME_NAME = "username";
+ protected static final String USERNAME_DEFAULT_VALUE = "root";
protected static final String TIME_FORMAT_ARGS = "tf";
protected static final String TIME_FORMAT_NAME = "timeformat";
@@ -61,7 +66,7 @@ public abstract class AbstractDataTool {
protected static final String TIME_ZONE_ARGS = "tz";
protected static final String TIME_ZONE_NAME = "timeZone";
- protected static final String TIMEOUT_ARGS = "t";
+ protected static final String TIMEOUT_ARGS = "timeout";
protected static final String TIMEOUT_NAME = "timeout";
protected static final int MAX_HELP_CONSOLE_WIDTH = 92;
protected static final String[] TIME_FORMAT =
@@ -125,10 +130,14 @@ public abstract class AbstractDataTool {
protected AbstractDataTool() {}
- protected static String checkRequiredArg(String arg, String name, CommandLine commandLine)
+ protected static String checkRequiredArg(
+ String arg, String name, CommandLine commandLine, String defaultValue)
throws ArgsErrorException {
String str = commandLine.getOptionValue(arg);
if (str == null) {
+ if (StringUtils.isNotBlank(defaultValue)) {
+ return defaultValue;
+ }
String msg = String.format("Required values for option '%s' not provided", name);
LOGGER.info(msg);
LOGGER.info("Use -help for more information");
@@ -145,11 +154,10 @@ protected static void setTimeZone() throws IoTDBConnectionException, StatementEx
}
protected static void parseBasicParams(CommandLine commandLine) throws ArgsErrorException {
- host = checkRequiredArg(HOST_ARGS, HOST_NAME, commandLine);
- port = checkRequiredArg(PORT_ARGS, PORT_NAME, commandLine);
- username = checkRequiredArg(USERNAME_ARGS, USERNAME_NAME, commandLine);
-
- password = commandLine.getOptionValue(PW_ARGS);
+ host = checkRequiredArg(HOST_ARGS, HOST_NAME, commandLine, HOST_DEFAULT_VALUE);
+ port = checkRequiredArg(PORT_ARGS, PORT_NAME, commandLine, PORT_DEFAULT_VALUE);
+ username = checkRequiredArg(USERNAME_ARGS, USERNAME_NAME, commandLine, USERNAME_DEFAULT_VALUE);
+ password = commandLine.getOptionValue(PW_ARGS, PW_DEFAULT_VALUE);
}
protected static boolean checkTimeFormat() {
@@ -176,30 +184,27 @@ protected static Options createNewOptions() {
Option opHost =
Option.builder(HOST_ARGS)
.longOpt(HOST_NAME)
- .required()
.argName(HOST_NAME)
.hasArg()
- .desc("Host Name (required)")
+ .desc("Host Name (optional)")
.build();
options.addOption(opHost);
Option opPort =
Option.builder(PORT_ARGS)
.longOpt(PORT_NAME)
- .required()
.argName(PORT_NAME)
.hasArg()
- .desc("Port (required)")
+ .desc("Port (optional)")
.build();
options.addOption(opPort);
Option opUsername =
Option.builder(USERNAME_ARGS)
.longOpt(USERNAME_NAME)
- .required()
.argName(USERNAME_NAME)
.hasArg()
- .desc("Username (required)")
+ .desc("Username (optional)")
.build();
options.addOption(opUsername);
@@ -209,7 +214,7 @@ protected static Options createNewOptions() {
.optionalArg(true)
.argName(PW_NAME)
.hasArg()
- .desc("Password (required)")
+ .desc("Password (optional)")
.build();
options.addOption(opPassword);
return options;
diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportData.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportData.java
index bbf8750000ed..68f140535a55 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportData.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportData.java
@@ -67,14 +67,14 @@
*/
public class ExportData extends AbstractDataTool {
- private static final String TARGET_DIR_ARGS = "td";
+ private static final String TARGET_DIR_ARGS = "t";
private static final String TARGET_DIR_NAME = "targetDirectory";
- private static final String TARGET_FILE_ARGS = "f";
- private static final String TARGET_FILE_NAME = "targetFile";
+ private static final String TARGET_FILE_ARGS = "tfn";
+ private static final String TARGET_FILE_NAME = "targetFileName";
private static final String SQL_FILE_ARGS = "s";
- private static final String SQL_FILE_NAME = "sqlfile";
+ private static final String SQL_FILE_NAME = "sourceSqlFile";
private static final String DATA_TYPE_ARGS = "datatype";
private static final String DATA_TYPE_NAME = "datatype";
@@ -90,8 +90,8 @@ public class ExportData extends AbstractDataTool {
private static final String ALIGNED_ARGS = "aligned";
private static final String ALIGNED_NAME = "export aligned insert sql";
- private static final String LINES_PER_FILE_ARGS = "linesPerFile";
- private static final String LINES_PER_FILE_ARGS_NAME = "Lines Per File";
+ private static final String LINES_PER_FILE_ARGS = "lpf";
+ private static final String LINES_PER_FILE_ARGS_NAME = "linesPerFile";
private static final String TSFILEDB_CLI_PREFIX = "ExportData";
@@ -207,7 +207,7 @@ public static void main(String[] args) {
}
private static void parseSpecialParams(CommandLine commandLine) throws ArgsErrorException {
- targetDirectory = checkRequiredArg(TARGET_DIR_ARGS, TARGET_DIR_NAME, commandLine);
+ targetDirectory = checkRequiredArg(TARGET_DIR_ARGS, TARGET_DIR_NAME, commandLine, null);
targetFile = commandLine.getOptionValue(TARGET_FILE_ARGS);
needDataTypePrinted = Boolean.valueOf(commandLine.getOptionValue(DATA_TYPE_ARGS));
queryCommand = commandLine.getOptionValue(QUERY_COMMAND_ARGS);
diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportData.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportData.java
index 647bd2a497fc..fac86ac07fd0 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportData.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportData.java
@@ -75,8 +75,8 @@
public class ImportData extends AbstractDataTool {
- private static final String FILE_ARGS = "f";
- private static final String FILE_NAME = "file or folder";
+ private static final String FILE_ARGS = "s";
+ private static final String FILE_NAME = "sourceFileOrFolder";
private static final String FAILED_FILE_ARGS = "fd";
private static final String FAILED_FILE_NAME = "failed file directory";
@@ -98,8 +98,8 @@ public class ImportData extends AbstractDataTool {
private static final String TYPE_INFER_ARGS = "typeInfer";
private static final String TYPE_INFER_ARGS_NAME = "type infer";
- private static final String LINES_PER_FAILED_FILE_ARGS = "linesPerFailedFile";
- private static final String LINES_PER_FAILED_FILE_ARGS_NAME = "Lines Per FailedFile";
+ private static final String LINES_PER_FAILED_FILE_ARGS = "lpf";
+ private static final String LINES_PER_FAILED_FILE_ARGS_NAME = "linesPerFailedFile";
private static final String TSFILEDB_CLI_PREFIX = "ImportData";
diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportSchema.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportSchema.java
index acd55b3eb42b..610c9e1f7566 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportSchema.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportSchema.java
@@ -115,7 +115,6 @@ private static Options createOptions() {
Option opFailedFile =
Option.builder(FAILED_FILE_ARGS)
- .required(false)
.longOpt(FAILED_FILE_NAME)
.hasArg()
.argName(FAILED_FILE_ARGS_NAME)
@@ -124,13 +123,6 @@ private static Options createOptions() {
.build();
options.addOption(opFailedFile);
- Option opAligned =
- Option.builder(ALIGNED_ARGS)
- .longOpt(ALIGNED_ARGS)
- .desc("Whether import schema as aligned timeseries(optional)")
- .build();
- options.addOption(opAligned);
-
Option opBatchPointSize =
Option.builder(BATCH_POINT_SIZE_ARGS)
.longOpt(BATCH_POINT_SIZE_NAME)
@@ -341,35 +333,35 @@ private static void writeScheme(
hasStarted.set(true);
} else if (pointSize.get() >= batchPointSize) {
try {
- writeAndEmptyDataSet(
- paths,
- dataTypes,
- encodings,
- compressors,
- null,
- null,
- null,
- measurementAlias,
- 3);
- writeAndEmptyDataSet(
- pathsWithAlias,
- dataTypesWithAlias,
- encodingsWithAlias,
- compressorsWithAlias,
- null,
- null,
- null,
- null,
- 3);
- paths.clear();
- dataTypes.clear();
- encodings.clear();
- compressors.clear();
- measurementAlias.clear();
- pointSize.set(0);
+ if (CollectionUtils.isNotEmpty(paths)) {
+ writeAndEmptyDataSet(
+ paths, dataTypes, encodings, compressors, null, null, null, null, 3);
+ }
+ } catch (Exception e) {
+ paths.forEach(t -> failedRecords.add(Collections.singletonList(t)));
+ }
+ try {
+ if (CollectionUtils.isNotEmpty(pathsWithAlias)) {
+ writeAndEmptyDataSet(
+ pathsWithAlias,
+ dataTypesWithAlias,
+ encodingsWithAlias,
+ compressorsWithAlias,
+ null,
+ null,
+ null,
+ measurementAlias,
+ 3);
+ }
} catch (Exception e) {
- failedRecords.add((List