Skip to content

Commit

Permalink
Merge pull request #1814 from com-kalisita/neo4jwriter
Browse files Browse the repository at this point in the history
Feature :  Neo4jWriter for DataX
  • Loading branch information
penglin358 authored Jul 14, 2023
2 parents 6c7e0f8 + ec3b972 commit 5028b2c
Show file tree
Hide file tree
Showing 23 changed files with 1,739 additions and 48 deletions.
99 changes: 51 additions & 48 deletions README.md

Large diffs are not rendered by default.

193 changes: 193 additions & 0 deletions neo4jwriter/doc/neo4jwriter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
# DataX neo4jWriter 插件文档

## 功能简介

本目前市面上的neo4j 批量导入主要有Cypher Create,Load CSV,第三方或者官方提供的Batch Import。Load CSV支持节点10W级别一下,Batch Import 需要对数据库进行停机。要想实现不停机的数据写入,Cypher是最好的方式。

## 支持版本

支持Neo4j 4 和Neo4j 5,如果是Neo4j 3,需要自行将驱动降低至相对应的版本进行编译。

## 实现原理

将datax的数据转换成了neo4j驱动能识别的对象,利用 unwind 语法进行批量插入。

## 如何配置

### 配置项介绍

| 配置 | 说明 | 是否必须 | 默认值 | 示例 |
|:-------------------------------|--------------------| -------- | ------ | ---------------------------------------------------- |
| database | 数据库名字 || - | neo4j |
| uri | 数据库访问链接 || - | bolt://localhost:7687 |
| username | 访问用户名 || - | neo4j |
| password | 访问密码 || - | neo4j |
| bearerToken | 权限相关 || - | - |
| kerberosTicket | 权限相关 || - | - |
| cypher | 同步语句 || - | unwind $batch as row create(p) set p.name = row.name |
| batchDataVariableName | unwind 携带的数据变量名 | | | batch |
| properties | 定义neo4j中数据的属性名字和类型 || - | 见后续案例 |
| batchSize | 一批写入数据量 || 1000 | |
| maxTransactionRetryTimeSeconds | 事务运行最长时间 || 30秒 | 30 |
| maxConnectionTimeoutSeconds | 驱动最长链接时间 || 30秒 | 30 |
| retryTimes | 发生错误的重试次数 || 3次 | 3 |
| retrySleepMills | 重试失败后的等待时间 || 3秒 | 3 |

### 支持的数据类型
> 配置时均忽略大小写
```
BOOLEAN,
STRING,
LONG,
SHORT,
INTEGER,
DOUBLE,
FLOAT,
LOCAL_DATE,
LOCAL_TIME,
LOCAL_DATE_TIME,
LIST,
//map类型支持 . 属性表达式取值
MAP,
CHAR_ARRAY,
BYTE_ARRAY,
BOOLEAN_ARRAY,
STRING_ARRAY,
LONG_ARRAY,
INT_ARRAY,
SHORT_ARRAY,
DOUBLE_ARRAY,
FLOAT_ARRAY,
Object_ARRAY
```

### 写节点

这里提供了一个写节点包含很多类型属性的例子。你可以在我的测试方法中运行。

```json
"writer": {
"name": "neo4jWriter",
"parameter": {
"uri": "neo4j://localhost:7687",
"username": "neo4j",
"password": "Test@12343",
"database": "neo4j",
"cypher": "unwind $batch as row create(p:Person) set p.pbool = row.pbool,p.pstring = row.pstring,p.plong = row.plong,p.pshort = row.pshort,p.pdouble=row.pdouble,p.pstringarr=row.pstringarr,p.plocaldate=row.plocaldate",
"batchDataVariableName": "batch",
"batchSize": "33",
"properties": [
{
"name": "pbool",
"type": "BOOLEAN"
},
{
"name": "pstring",
"type": "STRING"
},
{
"name": "plong",
"type": "LONG"
},
{
"name": "pshort",
"type": "SHORT"
},
{
"name": "pdouble",
"type": "DOUBLE"
},
{
"name": "pstringarr",
"type": "STRING_ARRAY",
"split": ","
},
{
"name": "plocaldate",
"type": "LOCAL_DATE",
"dateFormat": "yyyy-MM-dd"
}
]
}
}
```

### 写关系

```json
"writer": {
"name": "neo4jWriter",
"parameter": {
"uri": "neo4j://localhost:7687",
"username": "neo4j",
"password": "Test@12343",
"database": "neo4j",
"cypher": "unwind $batch as row match(p1:Person) where p1.id = row.startNodeId match(p2:Person) where p2.id = row.endNodeId create (p1)-[:LINK]->(p2)",
"batchDataVariableName": "batch",
"batch_size": "33",
"properties": [
{
"name": "startNodeId",
"type": "STRING"
},
{
"name": "endNodeId",
"type": "STRING"
}
]
}
}
```

### 节点/关系类型动态写

> 需要使用AOPC函数拓展,如果你的数据库没有,请安装APOC函数拓展
```json
"writer": {
"name": "neo4jWriter",
"parameter": {
"uri": "bolt://localhost:7687",
"username": "yourUserName",
"password": "yourPassword",
"database": "yourDataBase",
"cypher": "unwind $batch as row CALL apoc.cypher.doIt( 'create (n:`' + row.Label + '`{id:$id})' ,{id: row.id} ) YIELD value RETURN 1 ",
"batchDataVariableName": "batch",
"batch_size": "1",
"properties": [
{
"name": "Label",
"type": "STRING"
},
{
"name": "id",
"type": "STRING"
}
]
}
}
```

## 注意事项

* properties定义的顺序需要与reader端顺序一一对应。
* 灵活使用map类型,可以免去很多数据加工的烦恼。在cypher中,可以根据 . 属性访问符号一直取值。比如 unwind $batch as row create (p) set p.name = row.prop.name,set p.age = row.prop.age,在这个例子中,prop是map类型,包含name和age两个属性。
* 如果提示事务超时,建议调大事务运行时间或者调小batchSize
* 如果用于更新场景,遇到死锁问题影响写入,建议二开源码加入死锁异常检测,并进行重试。

## 性能报告

**JVM参数**

16G G1垃圾收集器 8核心

**Neo4j数据库配置**

32核心,256G

**datax 配置**

* Channel 20 batchsize = 1000
* 任务平均流量:15.23MB/s
* 记录写入速度:44440 rec/s
* 读出记录总数:2222013
90 changes: 90 additions & 0 deletions neo4jwriter/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>neo4jwriter</artifactId>
<name>neo4jwriter</name>
<packaging>jar</packaging>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<neo4j-java-driver.version>4.4.9</neo4j-java-driver.version>
<junit4.version>4.13.2</junit4.version>
<test.container.version>1.17.6</test.container.version>
</properties>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
<version>${neo4j-java-driver.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${test.container.version}</version>
</dependency>
<!-- Testcontainers 1.x is tightly coupled with the JUnit 4.x rule API-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit4.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
35 changes: 35 additions & 0 deletions neo4jwriter/src/main/assembly/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/neo4jwriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>neo4jwriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/neo4jwriter</outputDirectory>
</fileSet>
</fileSets>

<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/neo4jwriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
Loading

0 comments on commit 5028b2c

Please sign in to comment.