Merge pull request #85 from usdot-jpo-ode/dedup-processedmapwkt
Dedup processedmapwkt
John-Wiens authored May 24, 2024
2 parents 83d1afe + 42c6c18 commit 67bec78
Showing 6 changed files with 273 additions and 3 deletions.
@@ -17,6 +17,7 @@
import us.dot.its.jpo.deduplicator.DeduplicatorProperties;
import us.dot.its.jpo.deduplicator.deduplicator.topologies.MapDeduplicatorTopology;
import us.dot.its.jpo.deduplicator.deduplicator.topologies.ProcessedMapDeduplicatorTopology;
import us.dot.its.jpo.deduplicator.deduplicator.topologies.ProcessedMapWktDeduplicatorTopology;
import us.dot.its.jpo.deduplicator.deduplicator.topologies.TimDeduplicatorTopology;

@Controller
@@ -45,21 +46,28 @@ public DeduplicatorServiceController(final DeduplicatorProperties props,
        ProcessedMapDeduplicatorTopology processedMapDeduplicatorTopology = new ProcessedMapDeduplicatorTopology(
            "topic.ProcessedMap",
            "topic.DeduplicatedProcessedMap",
-           props.createStreamProperties("ProcessedMapdeduplicator")
+           props.createStreamProperties("ProcessedMapDeduplicator")
        );
        processedMapDeduplicatorTopology.start();

        ProcessedMapWktDeduplicatorTopology processedMapWktDeduplicatorTopology = new ProcessedMapWktDeduplicatorTopology(
            "topic.ProcessedMapWKT",
            "topic.DeduplicatedProcessedMapWKT",
            props.createStreamProperties("ProcessedMapWKTdeduplicator")
        );
        processedMapWktDeduplicatorTopology.start();

        MapDeduplicatorTopology mapDeduplicatorTopology = new MapDeduplicatorTopology(
            "topic.OdeMapJson",
            "topic.DeduplicatedOdeMapJson",
-           props.createStreamProperties("Mapdeduplicator")
+           props.createStreamProperties("MapDeduplicator")
        );
        mapDeduplicatorTopology.start();

        TimDeduplicatorTopology timDeduplicatorTopology = new TimDeduplicatorTopology(
            "topic.OdeTimJson",
            "topic.DeduplicatedOdeTimJson",
-           props.createStreamProperties("Timdeduplicator")
+           props.createStreamProperties("TimDeduplicator")
        );
        timDeduplicatorTopology.start();
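The name handed to createStreamProperties presumably seeds each topology's Kafka Streams application.id, which would make the casing fixes above more than cosmetic: the application id determines the consumer group and state-store names. A minimal sketch of such a helper, assuming a kafkaBrokers field on DeduplicatorProperties (hypothetical; the real method is not part of this diff):

    // Hypothetical sketch -- the actual DeduplicatorProperties method is not shown in this diff.
    public Properties createStreamProperties(String name) {
        Properties streamProps = new Properties();
        // A distinct application.id per topology keeps consumer groups and state stores separate.
        streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, name);
        streamProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers); // assumed field
        return streamProps;
    }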

@@ -0,0 +1,20 @@
package us.dot.its.jpo.deduplicator.deduplicator.models;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap;

@NoArgsConstructor
@Setter
@Getter
public class ProcessedMapWktPair {

    public ProcessedMap<String> message;
    public boolean shouldSend;

    public ProcessedMapWktPair(ProcessedMap<String> message, boolean shouldSend){
        this.message = message;
        this.shouldSend = shouldSend;
    }
}
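The shouldSend flag lets the aggregation step in the topology below mark a record as a duplicate without emitting it; the downstream flatMap drops any pair whose flag is false, so only first-seen or changed messages reach the output topic.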
@@ -4,6 +4,7 @@
import org.apache.kafka.common.serialization.Serdes;

import us.dot.its.jpo.deduplicator.deduplicator.models.ProcessedMapPair;
import us.dot.its.jpo.deduplicator.deduplicator.models.ProcessedMapWktPair;
import us.dot.its.jpo.deduplicator.deduplicator.models.OdeMapPair;
import us.dot.its.jpo.deduplicator.deduplicator.models.JsonPair;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.DeserializedRawMap;
@@ -18,6 +19,12 @@ public static Serde<ProcessedMapPair> ProcessedMapPair() {
            new JsonDeserializer<>(ProcessedMapPair.class));
    }

    public static Serde<ProcessedMapWktPair> ProcessedMapWktPair() {
        return Serdes.serdeFrom(
            new JsonSerializer<ProcessedMapWktPair>(),
            new JsonDeserializer<>(ProcessedMapWktPair.class));
    }

    public static Serde<OdeMapPair> OdeMapPair() {
        return Serdes.serdeFrom(
            new JsonSerializer<OdeMapPair>(),
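The new serde is a plain Jackson round trip, symmetric with the existing pair serdes. A hypothetical usage sketch (not part of this commit):

    Serde<ProcessedMapWktPair> serde = PairSerdes.ProcessedMapWktPair();
    ProcessedMapWktPair pair = new ProcessedMapWktPair(new ProcessedMap<String>(), true);
    byte[] bytes = serde.serializer().serialize("topic.ProcessedMapWKT", pair);
    ProcessedMapWktPair restored = serde.deserializer().deserialize("topic.ProcessedMapWKT", bytes);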
@@ -0,0 +1,134 @@
package us.dot.its.jpo.deduplicator.deduplicator.topologies;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.KafkaStreams.StateListener;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

import us.dot.its.jpo.deduplicator.deduplicator.models.ProcessedMapWktPair;
import us.dot.its.jpo.deduplicator.deduplicator.serialization.PairSerdes;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap;
import us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes;

import org.apache.kafka.streams.kstream.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Properties;

public class ProcessedMapWktDeduplicatorTopology {

    private static final Logger logger = LoggerFactory.getLogger(ProcessedMapWktDeduplicatorTopology.class);

    Topology topology;
    KafkaStreams streams;
    String inputTopic;
    String outputTopic;
    Properties streamsProperties;
    ObjectMapper objectMapper;

    public ProcessedMapWktDeduplicatorTopology(String inputTopic, String outputTopic, Properties streamsProperties){
        this.inputTopic = inputTopic;
        this.outputTopic = outputTopic;
        this.streamsProperties = streamsProperties;
        this.objectMapper = new ObjectMapper();
    }

    public void start() {
        if (streams != null && streams.state().isRunningOrRebalancing()) {
            throw new IllegalStateException("Start called while streams is already running.");
        }
        Topology topology = buildTopology();
        streams = new KafkaStreams(topology, streamsProperties);
        if (exceptionHandler != null) streams.setUncaughtExceptionHandler(exceptionHandler);
        if (stateListener != null) streams.setStateListener(stateListener);
        streams.start();
    }

    public Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, ProcessedMap<String>> inputStream = builder.stream(inputTopic,
            Consumed.with(Serdes.String(), JsonSerdes.ProcessedMapWKT()));

        KStream<String, ProcessedMap<String>> deduplicatedStream = inputStream
            .groupByKey(Grouped.with(Serdes.String(), JsonSerdes.ProcessedMapWKT()))
            .aggregate(() -> new ProcessedMapWktPair(new ProcessedMap<String>(), true),
                (key, newValue, aggregate) -> {

                    // Handle the first message, where the aggregate still holds the empty placeholder map.
                    if (aggregate.getMessage().getProperties() == null) {
                        return new ProcessedMapWktPair(newValue, true);
                    }

                    Instant newValueTime = newValue.getProperties().getTimeStamp().toInstant();
                    Instant oldValueTime = aggregate.getMessage().getProperties().getTimeStamp().toInstant();

                    // Always forward a message that is more than an hour newer than the retained one.
                    if (newValueTime.minus(Duration.ofHours(1)).isAfter(oldValueTime)) {
                        return new ProcessedMapWktPair(newValue, true);
                    } else {
                        ZonedDateTime newValueTimestamp = newValue.getProperties().getTimeStamp();
                        ZonedDateTime newValueOdeReceivedAt = newValue.getProperties().getOdeReceivedAt();

                        // Temporarily copy the retained timestamps onto the new message so the hash
                        // comparison ignores the always-changing timestamp fields.
                        newValue.getProperties().setTimeStamp(aggregate.getMessage().getProperties().getTimeStamp());
                        newValue.getProperties().setOdeReceivedAt(aggregate.getMessage().getProperties().getOdeReceivedAt());

                        int oldHash = aggregate.getMessage().getProperties().hashCode();
                        int newHash = newValue.getProperties().hashCode();

                        if (oldHash != newHash) {
                            // Content changed: restore the real timestamps and forward.
                            newValue.getProperties().setTimeStamp(newValueTimestamp);
                            newValue.getProperties().setOdeReceivedAt(newValueOdeReceivedAt);
                            return new ProcessedMapWktPair(newValue, true);
                        } else {
                            // Duplicate: keep the retained message and mark it as not-to-send.
                            return new ProcessedMapWktPair(aggregate.getMessage(), false);
                        }
                    }
                }, Materialized.with(Serdes.String(), PairSerdes.ProcessedMapWktPair()))
            .toStream()
            .flatMap((key, value) -> {
                ArrayList<KeyValue<String, ProcessedMap<String>>> outputList = new ArrayList<>();
                if (value != null && value.isShouldSend()) {
                    outputList.add(new KeyValue<>(key, value.getMessage()));
                }
                return outputList;
            });

        deduplicatedStream.to(outputTopic, Produced.with(Serdes.String(), JsonSerdes.ProcessedMapWKT()));

        return builder.build();
    }
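Stripped of the Streams plumbing, the deduplication decision above reduces to a small pure predicate. A sketch (not part of the commit), assuming the hashes are computed after the timestamp fields have been normalized as in the aggregate step:

    // Forward when the retained copy is stale (over an hour old) or when the
    // timestamp-normalized content hashes differ; otherwise suppress as a duplicate.
    static boolean shouldForward(Instant oldTime, int oldNormalizedHash,
                                 Instant newTime, int newNormalizedHash) {
        if (newTime.minus(Duration.ofHours(1)).isAfter(oldTime)) {
            return true; // hourly refresh even for unchanged content
        }
        return oldNormalizedHash != newNormalizedHash;
    }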

    public void stop() {
        logger.info("Stopping ProcessedMapWkt Deduplicator Topology.");
        if (streams != null) {
            streams.close();
            streams.cleanUp();
            streams = null;
        }
        logger.info("Stopped ProcessedMapWkt Deduplicator Topology.");
    }

    StateListener stateListener;

    public void registerStateListener(StateListener stateListener) {
        this.stateListener = stateListener;
    }

    StreamsUncaughtExceptionHandler exceptionHandler;

    public void registerUncaughtExceptionHandler(StreamsUncaughtExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }
}
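A hypothetical caller wiring the lifecycle hooks before start (the variable names are illustrative, not from this commit):

    ProcessedMapWktDeduplicatorTopology wktTopology = new ProcessedMapWktDeduplicatorTopology(
        "topic.ProcessedMapWKT", "topic.DeduplicatedProcessedMapWKT", streamProps);
    wktTopology.registerStateListener((newState, oldState) ->
        logger.info("Streams state changed from {} to {}", oldState, newState));
    wktTopology.registerUncaughtExceptionHandler(exception ->
        StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);
    wktTopology.start();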
7 changes: 7 additions & 0 deletions jpo-deduplicator/src/main/resources/application.yaml
@@ -26,6 +26,12 @@ kafka.topics:
  - name: topic.DeduplicatedProcessedMap
    cleanupPolicy: delete
    retentionMs: 300000
  - name: topic.ProcessedMapWKT
    cleanupPolicy: delete
    retentionMs: 300000
  - name: topic.DeduplicatedProcessedMapWKT
    cleanupPolicy: delete
    retentionMs: 300000
  - name: topic.OdeMapJson
    cleanupPolicy: delete
    retentionMs: 300000
@@ -38,4 +44,5 @@ kafka.topics:
  - name: topic.DeduplicatedOdeTimJson
    cleanupPolicy: delete
    retentionMs: 300000
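Both new WKT topics mirror the settings of the existing ProcessedMap topics: delete cleanup with retentionMs of 300000 (five minutes), so raw and deduplicated WKT records age out on the same schedule as the other message types.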


Large diffs are not rendered by default.
