Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

实现IOTDB读写插件 #2167

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions datax-example/datax-example-iotdb/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-example</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>datax-example-iotdb</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>iotdbreader</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>iotdbwriter</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-example-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>streamreader</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>streamwriter</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>txtfilereader</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>txtfilewriter</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>mysqlreader</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>mysqlwriter</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.31</version>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package com.alibaba.datax.example.iotdb;

import org.apache.iotdb.isession.util.Version;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.Session;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.junit.Test;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.*;

public class TestCreateData {
private static Session session;
private static Random random = new Random();

@Test
public void createAndInsert()
throws IoTDBConnectionException, StatementExecutionException {
// 创建测试数据
// session init
session =
new Session.Builder()
// .host("192.168.150.100")
.host("172.20.31.61")
.port(6667)
.username("root")
.password("root")
.version(Version.V_0_13)
.build();

// open session, close RPCCompression
session.open(false);

// set session fetchSize
session.setFetchSize(10000);

// 创建测点并插入数据
String filePath = "src/test/resources/testData.txt";
String database = "root.cgn";
try {
session.createDatabase(database);
} catch (StatementExecutionException e) {
if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
throw e;
}
}
String device = "root.cgn.device";
createAndInsert2(filePath, device);
}

private static void createAndInsert2(String filePath, String device)
throws IoTDBConnectionException, StatementExecutionException {
// 读取文件(文件中无表头)
// 点的类型 点名 描述 量纲 量程下限 量程上限
// AX L2KRT008MA 测试性描述 % 0 1.00E+02
// AX L2ETY101MP 测试性描述 % 0 1.00E+02
List<List<String>> res = new ArrayList<>();
try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
String line;
while ((line = br.readLine()) != null) {
String[] words = line.split("\\s+");
List<String> wordList = new ArrayList<>(Arrays.asList(words));
res.add(wordList);
}
} catch (IOException e) {
e.printStackTrace();
}

// 准备传入的参数,构造时间序列
List<String> paths = new ArrayList<>();
List<String> measurements = new ArrayList<>();
List<Boolean> isDoubleList = new ArrayList<>();
List<TSDataType> tsDataTypes = new ArrayList<>();
List<TSEncoding> tsEncodings = new ArrayList<>();
List<CompressionType> compressionTypes = new ArrayList<>();
List<Map<String, String>> tagsList = new ArrayList<>();
List<Map<String, String>> attributesList = new ArrayList<>();
List<String> alias = new ArrayList<>();

for (int i = 0; i < res.size(); i++) {
measurements.add(res.get(i).get(1));
paths.add(device + "." + res.get(i).get(1));
boolean isDouble = "AX".equals(res.get(i).get(0));
isDoubleList.add(isDouble);
tsDataTypes.add(isDouble ? TSDataType.DOUBLE : TSDataType.BOOLEAN);
tsEncodings.add(isDouble ? TSEncoding.GORILLA : TSEncoding.RLE);
compressionTypes.add(CompressionType.SNAPPY);
Map<String, String> attributes = new HashMap<>();
attributes.put("描述", "测试性描述");
attributes.put("量纲", isDouble ? "%" : "");
attributes.put("量程下限", "0");
attributes.put("量程上限", isDouble ? "1.00E+02" : "2.00E+02");
attributesList.add(attributes);
}

// 先删除已有的时间序列
if (session.checkTimeseriesExists(device + ".**")) {
session.deleteTimeseries(device + ".**");
System.out.println("删除已有的时间序列完成==============");
}

// 创建测点时间序列
session.createMultiTimeseries(
paths, tsDataTypes, tsEncodings, compressionTypes, null, null, attributesList, null);

// 插入数据:每个测点里都写1万条数据,时间间隔1秒
List<List<String>> measurementsList = new ArrayList<>();
List<List<Object>> valuesList = new ArrayList<>();
List<Long> timestamps = new ArrayList<>();
List<List<TSDataType>> typesList = new ArrayList<>();

long startTime =
LocalDateTime.of(2024, 1, 1, 0, 0, 0, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
int count = 10000; // 每个测点插入数据条数

for (long time = startTime; count >= 0; time += 1000, count--) {
timestamps.add(time);
measurementsList.add(measurements); // 39个测点
typesList.add(tsDataTypes);

List<Object> randomValue = new ArrayList<>();
for (Boolean isDouble : isDoubleList) {
randomValue.add(isDouble ? random.nextDouble() * 100 : random.nextBoolean());
}
valuesList.add(randomValue); // 39个随机数

// 每1000次插入一批数据
if (count != 10000 && count % 1000 == 0) {
session.insertRecordsOfOneDevice(
device, timestamps, measurementsList, typesList, valuesList);
measurementsList.clear();
valuesList.clear();
typesList.clear();
timestamps.clear();
valuesList.clear();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.alibaba.datax.example.iotdb;

import com.alibaba.datax.example.ExampleContainer;
import com.alibaba.datax.example.util.PathUtil;
import org.junit.Test;

public class TestIoTDB {

@Test
public void testIoTDBReader2MySQLWriter() {
String path = "/iotdb2mysql.json";
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
ExampleContainer.start(jobPath);
}

@Test
public void testMySQLReader2IoTDBWriter() {
String path = "/mysql2iotdb.json";
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
ExampleContainer.start(jobPath);
}

@Test
public void testIoTDBReader2txtWriter() {
String path = "/iotdb2txt.json";
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
ExampleContainer.start(jobPath);
}

@Test
public void testIoTDBReader2StreamWriter() {
String path = "/iotdb2stream.json";
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
ExampleContainer.start(jobPath);
}

@Test
public void testMySQLReader2StreamWriter() {
String path = "/mysql2stream.json";
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
ExampleContainer.start(jobPath);
}

@Test
public void testMySQLReader2txtWriter() {
String path = "/mysql2txt.json";
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
ExampleContainer.start(jobPath);
}

@Test
public void testStreamReader2TxtWriter() {
String path = "/stream2txt.json";
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
ExampleContainer.start(jobPath);
}

@Test
public void testStreamReader2StreamWriter() {
String path = "/stream2stream.json";
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
ExampleContainer.start(jobPath);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "iotdbreader",
"parameter": {
"username": "root",
"password": "root",
"host": "172.20.31.61",
"port": 6667,
"fetchSize": 10000,
"version": "V_1_0",
"timeColumnPosition": 0,
"querySqls":[
],
"device": "root.cgn.device",
"measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
"where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "toy123",
"writeMode": "insert",
"#需要提前建表": "CREATE TABLE device (`time` DATETIME,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);",
"column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from device"
],
"connection": [
{
"table": [
"device"
],
"#": "下面的URL需要把中括号去掉,否则报错,mysqlwriter的bug,未修改",
"jdbcUrl": "jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
}
]
}
}
}
]
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "iotdbreader",
"parameter": {
"username": "root",
"password": "root",
"host": "192.168.150.100",
"port": 6667,
"fetchSize": 10000,
"version": "V_1_0",
"timeColumnPosition": 0,
"querySqls":[

],
"device": "root.cgn.device",
"measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
"where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00"
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print":true
}
}
}
]
}
}

Loading