Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partition by intersection #50

Merged
merged 17 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,15 @@
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.AlgorithmParameters;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.StreamsTopology;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.config.ConfigParameters;
import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmIntersectionKey;
import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmIntersectionIdKey;
import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmRsuIdKey;
import us.dot.its.jpo.conflictmonitor.monitor.models.map.MapIndex;
import us.dot.its.jpo.conflictmonitor.monitor.topologies.ConfigTopology;
import us.dot.its.jpo.conflictmonitor.monitor.topologies.IntersectionEventTopology;
import us.dot.its.jpo.conflictmonitor.monitor.utils.BsmUtils;
import us.dot.its.jpo.geojsonconverter.partitioner.RsuIntersectionKey;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.LineString;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap;
import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat;
import us.dot.its.jpo.ode.model.OdeBsmData;

Expand Down Expand Up @@ -104,7 +108,8 @@ public List<Object> parameterObjects() {
"connectors",
"spatial-indexes",
"spat-window-store",
"bsm-window-store"
"bsm-window-store",
"map-store"
);
return getJsonResponse(linkMap);
} catch (Exception ex) {
Expand Down Expand Up @@ -256,6 +261,21 @@ private void addLinks(Map<String, String> map, String... paths) {
return getJsonResponse(allItems);
}

@GetMapping(value = "/map-store")
public @ResponseBody ResponseEntity<String> mapStore() {
var mapStore = intersectionEventTopology.getMapStore();
var mapMap = new TreeMap<String, ProcessedMap<LineString>>();
try (var mapIterator = mapStore.all()) {
while (mapIterator.hasNext()) {
var kvp = mapIterator.next();
RsuIntersectionKey key = kvp.key;
ProcessedMap<LineString> map = kvp.value;
mapMap.put(key.toString(), map);
}
}
return getJsonResponse(mapMap);
}

@GetMapping(value = "/spat-window-store")
public @ResponseBody ResponseEntity<String> spatWindowStore() {
var spatWindowStore = intersectionEventTopology.getSpatWindowStore();
Expand Down Expand Up @@ -299,13 +319,13 @@ private void addLinks(Map<String, String> map, String... paths) {
try (var bsmIterator = bsmWindowStore.all()) {
while (bsmIterator.hasNext()) {
var kvp = bsmIterator.next();
Windowed<BsmIntersectionKey> key = kvp.key;
Windowed<BsmIntersectionIdKey> key = kvp.key;
Instant startTime = key.window().startTime();
Instant endTime = key.window().endTime();
BsmIntersectionKey theKey= key.key();
BsmIntersectionIdKey theKey= key.key();
OdeBsmData value = kvp.value;
// Integer intersectionId = value.();
String vehicleId = IntersectionEventTopology.getBsmID(value);
String vehicleId = BsmUtils.getVehicleId(value);
TreeMap<String, TreeMap<String, OdeBsmData>> bsms = null;
if (intersectionMap.containsKey(vehicleId)) {
bsms = intersectionMap.get(vehicleId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public class BsmEventParameters {
updateType = READ_ONLY)
String outputTopic;

@ConfigData(key = "bsm.event.bsmIntersectionOutputTopic",
description = "The Kafka topic to write BSMs partitioned by intersection to",
updateType = READ_ONLY)
String bsmIntersectionOutputTopic;

@ConfigData(key = "bsm.event.stateStoreName",
description = "The name of the Timestamped KeyValue Store for BSMs",
updateType = READ_ONLY)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package us.dot.its.jpo.conflictmonitor.monitor.algorithms.intersection_event;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;

import us.dot.its.jpo.conflictmonitor.monitor.algorithms.StreamsTopology;
import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmIntersectionKey;
import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmIntersectionIdKey;
import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmRsuIdKey;
import us.dot.its.jpo.geojsonconverter.partitioner.RsuIntersectionKey;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.LineString;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap;
Expand All @@ -15,7 +15,7 @@
public interface IntersectionEventStreamsAlgorithm
extends IntersectionEventAlgorithm, StreamsTopology {

ReadOnlyWindowStore<BsmIntersectionKey, OdeBsmData> getBsmWindowStore();
ReadOnlyWindowStore<BsmIntersectionIdKey, OdeBsmData> getBsmWindowStore();
ReadOnlyWindowStore<RsuIntersectionKey, ProcessedSpat> getSpatWindowStore();
ReadOnlyKeyValueStore<RsuIntersectionKey, ProcessedMap<LineString>> getMapStore();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;

import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmIntersectionKey;
import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmIntersectionIdKey;
import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmRsuIdKey;
import us.dot.its.jpo.geojsonconverter.partitioner.RsuIntersectionKey;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.LineString;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap;
Expand All @@ -15,7 +16,7 @@
public interface MessageIngestStreamsAlgorithm
extends MessageIngestAlgorithm {

ReadOnlyWindowStore<BsmIntersectionKey, OdeBsmData> getBsmWindowStore(KafkaStreams streams);
ReadOnlyWindowStore<BsmIntersectionIdKey, OdeBsmData> getBsmWindowStore(KafkaStreams streams);
ReadOnlyWindowStore<RsuIntersectionKey, ProcessedSpat> getSpatWindowStore(KafkaStreams streams);
ReadOnlyKeyValueStore<RsuIntersectionKey, ProcessedMap<LineString>> getMapStore(KafkaStreams streams);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,15 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.EqualsAndHashCode;
import lombok.Generated;
import lombok.Getter;
import lombok.Setter;
import lombok.*;
import us.dot.its.jpo.geojsonconverter.DateJsonMapper;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.LineString;
import us.dot.its.jpo.ode.model.OdeBsmData;

@Getter
@Setter
@EqualsAndHashCode
@ToString
@Generated
public class BsmEvent {
private OdeBsmData startingBsm;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package us.dot.its.jpo.conflictmonitor.monitor.models.bsm;

import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import us.dot.its.jpo.conflictmonitor.monitor.models.map.IntersectionRegion;
import us.dot.its.jpo.geojsonconverter.partitioner.RsuIntersectionKey;

@Getter
@Setter
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class BsmIntersectionIdKey extends RsuIntersectionKey {

String bsmId;

public BsmIntersectionIdKey() {}

public BsmIntersectionIdKey(String bsmId, String rsuId, int intersectionId) {
super(rsuId, intersectionId);
this.bsmId = bsmId;
}

public BsmIntersectionIdKey(String bsmId, String rsuId, int intersectionId, int region) {
super(rsuId, intersectionId, region);
this.bsmId = bsmId;
}

@JsonIgnore
public IntersectionRegion getIntersectionRegion() {
return new IntersectionRegion(getIntersectionId(), getRegion());
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package us.dot.its.jpo.conflictmonitor.monitor.models.bsm;

import java.util.Objects;

import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand All @@ -14,16 +12,16 @@
@EqualsAndHashCode
@Getter
@Setter
public class BsmIntersectionKey implements RsuIdKey {
public class BsmRsuIdKey implements RsuIdKey {

private static final Logger logger = LoggerFactory.getLogger(BsmIntersectionKey.class);
private static final Logger logger = LoggerFactory.getLogger(BsmRsuIdKey.class);

private String rsuId;
private String bsmId;

public BsmIntersectionKey() {}
public BsmRsuIdKey() {}

public BsmIntersectionKey(String rsuId, String bsmId){
public BsmRsuIdKey(String rsuId, String bsmId){
this.rsuId = rsuId;
this.bsmId = bsmId;
}
Expand Down
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Conflict Monitor has been using -1 to indicate an unknown intersectionId and Region. Do we want to be using -1 instead of null?

Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ public class IntersectionRegion {
public IntersectionRegion() {}

public IntersectionRegion(Integer intersectionId, Integer region) {
this.intersectionId = intersectionId;
this.region = region;
if (intersectionId != null && intersectionId.intValue() >= 0) {
this.intersectionId = intersectionId;
}
if (region != null && region.intValue() >= 0) {
this.region = region;
}
}

public IntersectionRegion(ProcessedMap map) {
Expand Down
Loading