Commit 5a90bfd

Towards the Neo4j CDC integration

ChrizZz110 committed Nov 3, 2024
1 parent a04f837 commit 5a90bfd

Showing 8 changed files with 944 additions and 1 deletion.
32 changes: 32 additions & 0 deletions pom.xml
@@ -52,6 +52,8 @@ under the License.
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>2.0.11</slf4j.version>
<jackson.databind.version>2.14.2</jackson.databind.version>
<jackson.version>2.14.1</jackson.version>

<!-- Set this to `provided` for cluster execution or `compile` for local IDE execution -->
<flink.scope>compile</flink.scope>
@@ -122,6 +124,29 @@ under the License.
<!-- Set this to `provided` for cluster execution or `compile` for local IDE execution -->
<scope>${flink.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- Adding jackson dependencies. They must be in the default scope (compile). -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.databind.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- jackson needs jackson-datatype-jsr310 for Java 8 java.time.Instant support -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency>

<!-- Gradoop dependencies -->
<dependency>
@@ -220,6 +245,12 @@ under the License.
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
<version>4.4.18</version>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
@@ -274,6 +305,7 @@ under the License.
<goal>shade</goal>
</goals>
<configuration>
<outputFile>target/jar/${project.artifactId}-${project.version}-shaded.jar</outputFile>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
135 changes: 135 additions & 0 deletions src/main/java/edu/dbsleipzig/stream/grouping/application/CDCNeo4jApp.java
@@ -0,0 +1,135 @@
package edu.dbsleipzig.stream.grouping.application;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import edu.dbsleipzig.stream.grouping.application.functions.CDCEventToTripleMapper;
import edu.dbsleipzig.stream.grouping.application.functions.events.CDCEvent;
import edu.dbsleipzig.stream.grouping.application.functions.events.EventDeserializationSchema;
import edu.dbsleipzig.stream.grouping.impl.algorithm.GraphStreamGrouping;
import edu.dbsleipzig.stream.grouping.impl.algorithm.TableGroupingBase;
import edu.dbsleipzig.stream.grouping.impl.functions.aggregation.AvgProperty;
import edu.dbsleipzig.stream.grouping.impl.functions.aggregation.Count;
import edu.dbsleipzig.stream.grouping.impl.functions.aggregation.MaxProperty;
import edu.dbsleipzig.stream.grouping.impl.functions.aggregation.MinProperty;
import edu.dbsleipzig.stream.grouping.impl.functions.utils.WindowConfig;
import edu.dbsleipzig.stream.grouping.model.graph.StreamGraph;
import edu.dbsleipzig.stream.grouping.model.graph.StreamGraphConfig;
import edu.dbsleipzig.stream.grouping.model.graph.StreamTriple;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SocketClientSink;

import java.util.HashMap;


public class CDCNeo4jApp {

public static void main(String[] args) throws Exception {

String topic = "creates";

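// Build a Kafka source that consumes Neo4j CDC events from the topic, starting from the
// earliest offset and deserializing each record value into a CDCEvent.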
KafkaSource<CDCEvent> source =
KafkaSource.<CDCEvent>builder()
.setBootstrapServers("localhost:9092")
.setTopics(topic)
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(new EventDeserializationSchema()))
.build();

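// Local streaming environment; parallelism 1 keeps the console output in a single, ordered sequence.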
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

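// Consume the CDC events as a DataStream; the watermark strategy assumes monotonically
// increasing event timestamps.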
DataStream<CDCEvent> kafkaStringStream = env.fromSource(
source, WatermarkStrategy.forMonotonousTimestamps(), "KafkaSource");

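// Transform each CDC event into a StreamTriple (an edge together with its source and target vertex).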
DataStream<StreamTriple> graphStreamTriples = kafkaStringStream.flatMap(new CDCEventToTripleMapper());

StreamGraph streamGraph = StreamGraph.fromFlinkStream(graphStreamTriples, new StreamGraphConfig(env));

// Configure and build the grouping operator
GraphStreamGrouping groupingOperator = new TableGroupingBase.GroupingBuilder()
.setWindowSize(60, WindowConfig.TimeUnit.SECONDS)
.addVertexGroupingKey(":label")
.addVertexGroupingKey("id")
.addEdgeGroupingKey(":label")
.addEdgeGroupingKey("operation")
//.addVertexAggregateFunction(new Count())
.addEdgeAggregateFunction(new Count())
.addEdgeAggregateFunction(new MinProperty("duration", "minDuration"))
.addEdgeAggregateFunction(new MaxProperty("duration", "maxDuration"))
.addEdgeAggregateFunction(new AvgProperty("duration", "avgDuration"))
.build();

// Execute the grouping and overwrite the input stream with the grouping result
streamGraph = groupingOperator.execute(streamGraph);

// Print on console
streamGraph.print();

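// Serialize each grouped triple to a JSON string, flattening the vertex and edge
// properties into plain string maps.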
streamGraph.toTripleStream().map(new MapFunction<StreamTriple, String>() {
@Override
public String map(StreamTriple value) throws Exception {

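// Building a new ObjectMapper per record keeps this example simple; reusing a single
// instance (e.g. in a RichMapFunction) would be more efficient. The JavaTimeModule is
// registered for the java.time fields of the triple.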
ObjectMapper objectMapper = JsonMapper.builder().build().registerModule(new JavaTimeModule());

JsonNode object = objectMapper.valueToTree(value);

JsonNode srcVertex = object.get("source");
JsonNode trgVertex = object.get("target");

HashMap<String, String> sourcePropertyMap = new HashMap<>();
value.getSource().getVertexProperties().iterator().forEachRemaining(property -> {
sourcePropertyMap.put(property.getKey(), property.getValue().toString());
});

HashMap<String, String> targetPropertyMap = new HashMap<>();
value.getTarget().getVertexProperties().iterator().forEachRemaining(property -> {
targetPropertyMap.put(property.getKey(), property.getValue().toString());
});

((ObjectNode) srcVertex).set("vertexProperties", objectMapper.valueToTree(sourcePropertyMap));
((ObjectNode) trgVertex).set("vertexProperties", objectMapper.valueToTree(targetPropertyMap));

HashMap<String, String> edgePropertyMap = new HashMap<>();
value.getProperties().iterator().forEachRemaining(property -> {
edgePropertyMap.put(property.getKey(), property.getValue().toString());
});

((ObjectNode) object).set("properties", objectMapper.valueToTree(edgePropertyMap));

return object.toString();

}
});
// .print();

// Write to Socket
// SocketClientSink<String> socketSink = new SocketClientSink<>("localhost", 19093, new SimpleStringSchema());
//
// streamGraph
// .toTripleStream()
// .map(StreamTriple::toString)
// .returns(TypeInformation.of(String.class))
// .addSink(socketSink);

// Trigger the workflow execution
env.execute();
}

}
72 changes: 72 additions & 0 deletions src/main/java/edu/dbsleipzig/stream/grouping/application/functions/CDCEventToTripleMapper.java
@@ -0,0 +1,72 @@
package edu.dbsleipzig.stream.grouping.application.functions;

import edu.dbsleipzig.stream.grouping.application.functions.events.CDCEvent;
import edu.dbsleipzig.stream.grouping.model.graph.StreamTriple;
import edu.dbsleipzig.stream.grouping.model.graph.StreamVertex;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.impl.properties.Properties;

import java.sql.Timestamp;
import java.time.Instant;


public class CDCEventToTripleMapper implements FlatMapFunction<CDCEvent, StreamTriple> {

private static final long serialVersionUID = 1L;

@Override
public void flatMap(CDCEvent event, Collector<StreamTriple> out) throws Exception {

if (event == null) {
return;
}

// Parse the String to an Instant
Instant instant = Instant.parse(event.getMetadata().getTxStartTime().getTZDT());

StreamTriple triple = new StreamTriple();

triple.setId(event.getId());
triple.setLabel(event.getEvent().getType());
triple.setTimestamp(Timestamp.from(instant));

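// Copy the relationship properties from the CDC event's "after" state onto the triple.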
Properties properties = new Properties();
properties.set("operation", event.getEvent().getOperation());
properties.set("startTime", event.getEvent().getState().getAfter().getProperties().getStartTime().getTZDT());
properties.set("tripId",event.getEvent().getState().getAfter().getProperties().getTripId().getS());
properties.set("rideType",event.getEvent().getState().getAfter().getProperties().getRideType().getS());
properties.set("endTime", event.getEvent().getState().getAfter().getProperties().getEndTime().getTZDT());
properties.set("userType", event.getEvent().getState().getAfter().getProperties().getUserType().getS());
properties.set("duration", event.getEvent().getState().getAfter().getProperties().getDuration().getI64());
triple.setProperties(properties);

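// Build the source vertex from the start node of the CDC relationship event.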
StreamVertex src = new StreamVertex();
String srcId = event.getEvent().getStart().getElementId();
src.setVertexLabel(event.getEvent().getStart().getLabels()[0]);
src.setVertexId(srcId);
src.setEventTime(Timestamp.from(instant));
Properties srcProperties = new Properties();
srcProperties.set("id", srcId);
src.setVertexProperties(srcProperties);

triple.setSource(src);

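// Build the target vertex from the end node of the CDC relationship event.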
StreamVertex trg = new StreamVertex();
String trgId = event.getEvent().getEnd().getElementId();
trg.setVertexLabel(event.getEvent().getEnd().getLabels()[0]);
trg.setVertexId(trgId);
trg.setEventTime(Timestamp.from(instant));
Properties trgProperties = new Properties();
trgProperties.set("id", trgId);
trg.setVertexProperties(trgProperties);

triple.setTarget(trg);

out.collect(triple);
}
}
64 changes: 64 additions & 0 deletions src/main/java/edu/dbsleipzig/stream/grouping/application/functions/Neo4jSink.java
@@ -0,0 +1,64 @@
package edu.dbsleipzig.stream.grouping.application.functions;

import edu.dbsleipzig.stream.grouping.model.graph.StreamTriple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Session;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Transaction;

import java.util.HashMap;
import java.util.Map;


public class Neo4jSink extends RichSinkFunction<StreamTriple> {

// The driver is not serializable, so it is marked transient and created in open().
private transient Driver driver;

// URI examples: "neo4j://localhost", "neo4j+s://xxx.databases.neo4j.io"
final String dbUri = "neo4j://localhost";
final String dbUser = "neo4j";
final String dbPassword = "password";

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);

// Keep the driver open for the lifetime of the sink; wrapping it in
// try-with-resources here would close it before invoke() is ever called.
this.driver = GraphDatabase.driver(dbUri, AuthTokens.basic(dbUser, dbPassword));
this.driver.verifyConnectivity();
System.out.println("Connection established.");
}

@Override
public void close() throws Exception {
if (driver != null) {
driver.close();
}
super.close();
}

@Override
public void invoke(StreamTriple streamTriple, Context context) throws Exception {
try (Session session = driver.session(SessionConfig.builder().withDatabase("neo4j").build())) {
try {
String id = session.writeTransaction(tx -> writeTriple(tx, streamTriple));
System.out.printf("Triple %s added to database.%n", id);
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}

private String writeTriple(Transaction tx, StreamTriple streamTriple) {
// The property keys ("operation", "count") and the getVertexId()/getProperties()
// accessors are assumed to match what the grouping operator and the StreamTriple model provide.
String query = "MATCH (s1:Station), (s2:Station) WHERE elementId(s1) = $srcId AND elementId(s2) = $trgId " +
"WITH s1, s2 " +
"CREATE (s1)-[t:CDC_TRIP]->(s2) " +
"SET t.operation = $operation " +
"SET t.count = $count " +
"RETURN elementId(t) AS id";

Map<String, Object> parameters = new HashMap<>();
parameters.put("srcId", streamTriple.getSource().getVertexId());
parameters.put("trgId", streamTriple.getTarget().getVertexId());
parameters.put("operation", streamTriple.getProperties().get("operation").toString());
parameters.put("count", streamTriple.getProperties().get("count").toString());

return tx.run(query, parameters).single().get("id").asString();
}
}