Skip to content

Commit

Permalink
Merge pull request #87 from usdot-jpo-ode/broadcast-rate-zero
Browse files Browse the repository at this point in the history
Broadcast Rate Events for MAPs and SPATs that stop being broadcasted
  • Loading branch information
John-Wiens authored May 22, 2024
2 parents ec90c1c + aa7d59d commit 121c951
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
package us.dot.its.jpo.conflictmonitor.monitor.topologies.validation;

import java.time.Duration;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.internals.TimeWindow;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.BaseStreamsTopology;
Expand All @@ -17,8 +10,13 @@
import us.dot.its.jpo.geojsonconverter.partitioner.RsuIntersectionKey;
import us.dot.its.jpo.geojsonconverter.pojos.ProcessedValidationMessage;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Common code for {@link MapValidationTopology} and {@link SpatAssessmentsTopoloby}
* Common code for {@link MapValidationTopology} and {@link SpatValidationTopology}
*/
public abstract class BaseValidationTopology<TParams>
extends BaseStreamsTopology<TParams> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package us.dot.its.jpo.conflictmonitor.monitor.topologies.validation;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import us.dot.its.jpo.conflictmonitor.monitor.models.events.ProcessingTimePeriod;
import us.dot.its.jpo.conflictmonitor.monitor.models.events.broadcast_rate.BroadcastRateEvent;
import us.dot.its.jpo.geojsonconverter.partitioner.RsuIntersectionKey;

import java.time.Duration;

/**
* Base class for zero broadcast rate checker. Maintains a state store with the latest wall clock timestamp for each
* key, and emits events if the current time is more than the rolling window period later.
* @param <TItem> Type of input items, ProcessedMap or ProcessedSpat
* @param <TEvent> Type of output events
*/
public abstract class BaseZeroRateChecker<TItem, TEvent extends BroadcastRateEvent>
implements Processor<RsuIntersectionKey, TItem, RsuIntersectionKey, TEvent> {

protected abstract Logger getLogger();

protected abstract TEvent createEvent();

private KeyValueStore<RsuIntersectionKey, Long> store;
private final int maxAgeMillis;
private final int checkEveryMillis;
private final String inputTopicName;
private final String stateStoreName;

ProcessorContext<RsuIntersectionKey, TEvent> context;

public BaseZeroRateChecker(final int rollingPeriodSeconds, final int outputIntervalSeconds, final String inputTopicName, final String stateStoreName) {
this.maxAgeMillis = rollingPeriodSeconds * 1000;
this.checkEveryMillis = outputIntervalSeconds * 1000;
this.inputTopicName = inputTopicName;
this.stateStoreName = stateStoreName;
}

@Override
public void init(ProcessorContext<RsuIntersectionKey, TEvent> context) {
this.context = context;
this.store = context.getStateStore(stateStoreName);
context.schedule(Duration.ofMillis(checkEveryMillis),
PunctuationType.WALL_CLOCK_TIME,
this::punctuate);
}

@Override
public void process(Record<RsuIntersectionKey, TItem> record) {
store.put(record.key(), System.currentTimeMillis());
}

private void punctuate(long timestamp) {
// Check if any keys are older than the max age
try (var storeIterator = store.all()) {
while (storeIterator.hasNext()) {
KeyValue<RsuIntersectionKey, Long> item =storeIterator.next();
RsuIntersectionKey key = item.key;
Long lastTimestamp = item.value;
if (timestamp - lastTimestamp > maxAgeMillis) {
emitZeroEvent(key, timestamp);
}
}
}
}

private void emitZeroEvent(RsuIntersectionKey key, long timestamp) {
getLogger().info("emit zero rate event for key = {} at timestamp = {}", key, timestamp);
TEvent event = createEvent();
event.setSource(key.toString());
event.setIntersectionID(key.getIntersectionId());
event.setRoadRegulatorID(key.getRegion());
event.setTopicName(inputTopicName);
ProcessingTimePeriod timePeriod = new ProcessingTimePeriod();
timePeriod.setBeginTimestamp(timestamp - maxAgeMillis);
timePeriod.setEndTimestamp(timestamp);
event.setTimePeriod(timePeriod);
event.setNumberOfMessages(0);
context.forward(new Record<>(key, event, timestamp));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -19,7 +22,6 @@
import us.dot.its.jpo.conflictmonitor.monitor.models.events.minimum_data.MapMinimumDataEvent;
import us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes;
import us.dot.its.jpo.geojsonconverter.partitioner.IntersectionIdPartitioner;
import us.dot.its.jpo.geojsonconverter.partitioner.RsuIdPartitioner;
import us.dot.its.jpo.geojsonconverter.partitioner.RsuIntersectionKey;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.LineString;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap;
Expand All @@ -45,12 +47,20 @@ protected Logger getLogger() {
return logger;
}


private static final String LATEST_TIMESTAMP_STORE = "latest-timestamp-store";


public Topology buildTopology() {
var builder = new StreamsBuilder();

// Create state store for zero count
var zeroCountStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(LATEST_TIMESTAMP_STORE),
us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes.RsuIntersectionKey(),
Serdes.Long());

builder.addStateStore(zeroCountStoreBuilder);

KStream<RsuIntersectionKey, ProcessedMap<LineString>> processedMapStream = builder
.stream(parameters.getInputTopicName(),
Consumed.with(
Expand Down Expand Up @@ -90,30 +100,48 @@ public Topology buildTopology() {
new IntersectionIdPartitioner<RsuIntersectionKey, MapMinimumDataEvent>())
);





// Save the timestamp of the latest message for each key in a state store to be queried by the zero-check task
processedMapStream.process(() ->
new MapZeroRateChecker(
parameters.getRollingPeriodSeconds(),
parameters.getOutputIntervalSeconds(),
parameters.getInputTopicName(),
LATEST_TIMESTAMP_STORE
), LATEST_TIMESTAMP_STORE)
// Emit zero-rate events to the topic
.to(parameters.getBroadcastRateTopicName(),
Produced.with(
us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes.RsuIntersectionKey(),
JsonSerdes.MapBroadcastRateEvent(),
new IntersectionIdPartitioner<RsuIntersectionKey, MapBroadcastRateEvent>())
);


// Perform count for Broadcast Rate analysis
KStream<Windowed<RsuIntersectionKey>, Long> countStream =
processedMapStream
.mapValues((value) -> 1) // Map the value to the constant int 1 (key remains the same)
.groupByKey(
Grouped.with(us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes.RsuIntersectionKey(), Serdes.Integer())
)
.windowedBy(
// Hopping window
TimeWindows
.ofSizeAndGrace(Duration.ofSeconds(parameters.getRollingPeriodSeconds()), Duration.ofMillis(parameters.getGracePeriodMilliseconds()))
.advanceBy(Duration.ofSeconds(parameters.getOutputIntervalSeconds()))
)
.count(
Materialized.<RsuIntersectionKey, Long, WindowStore<Bytes, byte[]>>as("map-counts")
.withKeySerde(us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes.RsuIntersectionKey())
.withValueSerde(Serdes.Long())
)
.suppress(
Suppressed.untilWindowCloses(BufferConfig.unbounded())
)
.toStream();
.mapValues((value) -> 1) // Map the value to the constant int 1 (key remains the same)
.groupByKey(
Grouped.with(us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes.RsuIntersectionKey(), Serdes.Integer())
)
.windowedBy(
// Hopping window
TimeWindows
.ofSizeAndGrace(Duration.ofSeconds(parameters.getRollingPeriodSeconds()), Duration.ofMillis(parameters.getGracePeriodMilliseconds()))
.advanceBy(Duration.ofSeconds(parameters.getOutputIntervalSeconds()))
)
.count(
Materialized.<RsuIntersectionKey, Long, WindowStore<Bytes, byte[]>>as("map-counts")
.withKeySerde(us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes.RsuIntersectionKey())
.withValueSerde(Serdes.Long())
)
.suppress(
Suppressed.untilWindowCloses(BufferConfig.unbounded())
)
.toStream();


countStream = countStream.peek((windowedKey, value) -> {
Expand All @@ -138,7 +166,7 @@ public Topology buildTopology() {
MapBroadcastRateEvent event = new MapBroadcastRateEvent();
event.setSource(windowedKey.key().toString());
event.setIntersectionID(windowedKey.key().getIntersectionId());
event.setRoadRegulatorID(-1);
event.setRoadRegulatorID(windowedKey.key().getRegion());
event.setTopicName(parameters.getInputTopicName());
ProcessingTimePeriod timePeriod = new ProcessingTimePeriod();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package us.dot.its.jpo.conflictmonitor.monitor.topologies.validation;

import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import us.dot.its.jpo.conflictmonitor.monitor.models.events.broadcast_rate.MapBroadcastRateEvent;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.LineString;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap;


@Slf4j
public class MapZeroRateChecker
extends BaseZeroRateChecker<ProcessedMap<LineString>, MapBroadcastRateEvent> {

public MapZeroRateChecker(int rollingPeriodSeconds, int outputIntervalSeconds, String inputTopicName, String stateStoreName) {
super(rollingPeriodSeconds, outputIntervalSeconds, inputTopicName, stateStoreName);
}

@Override
protected Logger getLogger() {
return log;
}

@Override
protected MapBroadcastRateEvent createEvent() {
return new MapBroadcastRateEvent();
}




}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -18,7 +19,6 @@
import us.dot.its.jpo.conflictmonitor.monitor.models.events.minimum_data.SpatMinimumDataEvent;
import us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes;
import us.dot.its.jpo.geojsonconverter.partitioner.IntersectionIdPartitioner;
import us.dot.its.jpo.geojsonconverter.partitioner.RsuIdPartitioner;
import us.dot.its.jpo.geojsonconverter.partitioner.RsuIntersectionKey;
import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat;

Expand All @@ -42,11 +42,21 @@ protected Logger getLogger() {
return logger;
}

private static final String LATEST_TIMESTAMP_STORE = "latest-timestamp-store";


@Override
public Topology buildTopology() {
var builder = new StreamsBuilder();

// Create state store for zero count
var zeroCountStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(LATEST_TIMESTAMP_STORE),
us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes.RsuIntersectionKey(),
Serdes.Long());

builder.addStateStore(zeroCountStoreBuilder);

KStream<RsuIntersectionKey, ProcessedSpat> processedSpatStream = builder
.stream(parameters.getInputTopicName(),
Consumed.with(
Expand Down Expand Up @@ -78,6 +88,21 @@ public Topology buildTopology() {
JsonSerdes.SpatMinimumDataEvent(),
new IntersectionIdPartitioner<RsuIntersectionKey, SpatMinimumDataEvent>())
);

// Save the timestamp of the latest message for each key in a state store to be queried by the zero-check task
processedSpatStream.process(() ->
new SpatZeroRateChecker(
parameters.getRollingPeriodSeconds(),
parameters.getOutputIntervalSeconds(),
parameters.getInputTopicName(),
LATEST_TIMESTAMP_STORE
), LATEST_TIMESTAMP_STORE)
.to(parameters.getBroadcastRateTopicName(),
Produced.with(
us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes.RsuIntersectionKey(),
JsonSerdes.SpatBroadcastRateEvent(),
new IntersectionIdPartitioner<RsuIntersectionKey, SpatBroadcastRateEvent>()
));

// Perform count for Broadcast Rate analysis
KStream<Windowed<RsuIntersectionKey>, Long> countStream =
Expand Down Expand Up @@ -154,5 +179,6 @@ public Topology buildTopology() {
}




}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package us.dot.its.jpo.conflictmonitor.monitor.topologies.validation;

import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import us.dot.its.jpo.conflictmonitor.monitor.models.events.broadcast_rate.SpatBroadcastRateEvent;
import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat;

@Slf4j
public class SpatZeroRateChecker
extends BaseZeroRateChecker<ProcessedSpat, SpatBroadcastRateEvent> {

public SpatZeroRateChecker(int rollingPeriodSeconds, int outputIntervalSeconds, String inputTopicName, String stateStoreName) {
super(rollingPeriodSeconds, outputIntervalSeconds, inputTopicName, stateStoreName);
}

@Override
protected Logger getLogger() {
return log;
}

@Override
protected SpatBroadcastRateEvent createEvent() {
return new SpatBroadcastRateEvent();
}
}
1 change: 1 addition & 0 deletions jpo-conflictmonitor/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ map.validation:
debug: false



# BSM Repartition
repartition:
algorithm: defaultRepartitionAlgorithm
Expand Down

0 comments on commit 121c951

Please sign in to comment.