Skip to content

Commit

Permalink
improved session handling and introduced parameter handling for cyphe…
Browse files Browse the repository at this point in the history
…r query
  • Loading branch information
galpha committed Nov 4, 2024
1 parent 5a90bfd commit 06af065
Showing 1 changed file with 21 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Session;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Transaction;
import org.neo4j.driver.TransactionWork;

import java.util.HashMap;
import java.util.Map;


public class Neo4jSink extends RichSinkFunction<StreamTriple> {

Driver driver;
Session session;

// URI examples: "neo4j://localhost", "neo4j+s://xxx.databases.neo4j.io"
final String dbUri = "neo4j://localhost";
Expand All @@ -28,7 +28,7 @@ public void open(Configuration parameters) throws Exception {
try (Driver driver = GraphDatabase.driver(dbUri, AuthTokens.basic(dbUser, dbPassword))) {
driver.verifyConnectivity();
System.out.println("Connection established.");
this.driver = driver;
this.session = driver.session();
}
}

Expand All @@ -41,24 +41,28 @@ public void close() throws Exception {
public void invoke(StreamTriple streamTriple, Context context) throws Exception {
super.invoke(streamTriple, context);

try (Session session = driver.session(SessionConfig.builder().withDatabase("neo4j").build())) {

try {
String id = session.writeTransaction(tx -> writeTriple(tx, streamTriple));
System.out.printf("Triple %s added to database.", id);
} catch (Exception e) {
System.out.println(e.getMessage());
}
try {
writeTriple(streamTriple);
System.out.println("Triple added to database.");
} catch (Exception e) {
System.out.println(e.getMessage());
}

}

private String writeTriple(Transaction tx, StreamTriple streamTriple) {
private void writeTriple(StreamTriple streamTriple) {
String query = "MATCH (s1:Station), (s2:Station) WHERE elementId(s1) = $srcId AND elementId(s2) = $trgId " +
"WITH s1, s2 " +
"CREATE (s1)-[t:CDC_TRIP]->(s2) " +
"SET t.operation = $operation " +
"SET t.count = $count ";
"SET t.operation = $operation, " +
"t.count = $count ";

Map<String, Object> params = new HashMap<>();
params.put("srcId", streamTriple.getSource().getVertexId());
params.put("trgId", streamTriple.getTarget().getVertexId());
params.put("count", streamTriple.getProperties().get("count"));


return null;
this.session.run(query, params);
}
}

0 comments on commit 06af065

Please sign in to comment.