Skip to content

Commit

Permalink
增加TestPattern单元测试。删除接口函数签名中的throws Exception声明
Browse files Browse the repository at this point in the history
  • Loading branch information
entropy-cloud committed Aug 27, 2023
1 parent fa62aed commit 7bb761d
Show file tree
Hide file tree
Showing 29 changed files with 582 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,11 @@ public static IntRangeBean parse(String str) {
return null;

int pos = str.indexOf(SEPARATOR);
if (pos < 0)
throw new NopException(ApiErrors.ERR_INVALID_OFFSET_LIMIT_STRING).param(ApiErrors.ARG_VALUE, str);
if (pos < 0) {
Integer start = ConvertHelper.stringToInt(str,
err -> new NopException(ApiErrors.ERR_INVALID_OFFSET_LIMIT_STRING).param(ApiErrors.ARG_VALUE, str));
return of(start, 1);
}

Integer start = ConvertHelper.stringToInt(str.substring(0, pos),
err -> new NopException(ApiErrors.ERR_INVALID_OFFSET_LIMIT_STRING).param(ApiErrors.ARG_VALUE, str));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,11 @@ public static LongRangeBean parse(String str) {
return null;

int pos = str.indexOf(SEPARATOR);
if (pos < 0)
throw new NopException(ApiErrors.ERR_INVALID_OFFSET_LIMIT_STRING).param(ApiErrors.ARG_VALUE, str);
if (pos < 0) {
Long start = ConvertHelper.stringToLong(str,
err -> new NopException(ApiErrors.ERR_INVALID_OFFSET_LIMIT_STRING).param(ApiErrors.ARG_VALUE, str));
return of(start, 1);
}

Long start = ConvertHelper.stringToLong(str.substring(0, pos),
err -> new NopException(ApiErrors.ERR_INVALID_OFFSET_LIMIT_STRING).param(ApiErrors.ARG_VALUE, str));
Expand Down
6 changes: 6 additions & 0 deletions nop-stream/nop-stream-cep/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
<groupId>io.github.entropy-cloud</groupId>
<artifactId>nop-stream-core</artifactId>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.nop.stream.cep;

import io.nop.api.core.annotations.core.Description;
import io.nop.api.core.config.IConfigReference;
import io.nop.api.core.util.SourceLocation;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

import static io.nop.api.core.config.AppConfig.varRef;

public interface NopCepConfigs {
SourceLocation s_loc = SourceLocation.fromClass(NopCepConfigs.class);

String COMMON_HINT =
"And it could accelerate the CEP operate process "
+ "speed and limit the capacity of cache in pure memory. Note: It's only effective to "
+ "limit usage of memory when 'state.backend' was set as 'rocksdb', which would "
+ "transport the elements exceeded the number of the cache into the rocksdb state "
+ "storage instead of memory state storage.";

@Description("The Config option to set the maximum element number the "
+ "eventsBufferCache of SharedBuffer could hold. "
+ COMMON_HINT)
IConfigReference<Integer> CEP_SHARED_BUFFER_EVENT_CACHE_SLOTS =
varRef(s_loc, "nop.cep.pipeline.sharedbuffer.cache.event-slots", Integer.class, 1024);

@Description("The Config option to set the maximum element number the entryCache"
+ " of SharedBuffer could hold. And it could accelerate the"
+ " CEP operate process speed with state."
+ COMMON_HINT)
IConfigReference<Integer> CEP_SHARED_BUFFER_ENTRY_CACHE_SLOTS =
varRef(s_loc, "nop.cep.pipeline.sharedbuffer.cache.entry-slots", Integer.class, 1024);

@Description("The interval to log the information of cache state statistics in "
+ "CEP operator.")
IConfigReference<Duration> CEP_CACHE_STATISTICS_INTERVAL =
varRef(s_loc, "nop.cep.pipeline.sharedbuffer.cache.statistics-interval", Duration.class, Duration.of(30, ChronoUnit.MINUTES));

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,22 @@
import java.io.Serializable;
import java.time.Duration;

import static io.nop.stream.cep.NopCepConfigs.CEP_CACHE_STATISTICS_INTERVAL;
import static io.nop.stream.cep.NopCepConfigs.CEP_SHARED_BUFFER_ENTRY_CACHE_SLOTS;
import static io.nop.stream.cep.NopCepConfigs.CEP_SHARED_BUFFER_EVENT_CACHE_SLOTS;

@DataBean
public final class SharedBufferCacheConfig implements Serializable {
private final int eventsBufferCacheSlots;
private final int entryCacheSlots;
private final Duration cacheStatisticsInterval;

public SharedBufferCacheConfig() {
this(CEP_SHARED_BUFFER_EVENT_CACHE_SLOTS.get(),
CEP_SHARED_BUFFER_ENTRY_CACHE_SLOTS.get(),
CEP_CACHE_STATISTICS_INTERVAL.get());
}

public int getEventsBufferCacheSlots() {
return eventsBufferCacheSlots;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ public PatternFlatSelectAdapter(final PatternFlatSelectFunction<IN, OUT> flatSel
}

@Override
public void open(final Configuration parameters) throws Exception {
public void open(final Configuration parameters) {
FunctionUtils.setFunctionRuntimeContext(flatSelectFunction, getRuntimeContext());
FunctionUtils.openFunction(flatSelectFunction, parameters);
}

@Override
public void close() throws Exception {
public void close() {
FunctionUtils.closeFunction(flatSelectFunction);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ public PatternSelectAdapter(final PatternSelectFunction<IN, OUT> selectFunction)
}

@Override
public void open(final Configuration parameters) throws Exception {
public void open(final Configuration parameters) {
FunctionUtils.setFunctionRuntimeContext(selectFunction, getRuntimeContext());
FunctionUtils.openFunction(selectFunction, parameters);
}

@Override
public void close() throws Exception {
public void close() {
FunctionUtils.closeFunction(selectFunction);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public PatternTimeoutFlatSelectAdapter(
}

@Override
public void open(Configuration parameters) throws Exception {
public void open(Configuration parameters) {
super.open(parameters);
FunctionUtils.setFunctionRuntimeContext(flatTimeoutFunction, getRuntimeContext());
FunctionUtils.openFunction(flatTimeoutFunction, parameters);
Expand All @@ -65,9 +65,9 @@ public void open(Configuration parameters) throws Exception {
}

@Override
public void close() throws Exception {
public void close() {
super.close();
//FunctionUtils.closeFunction(flatTimeoutFunction);
FunctionUtils.closeFunction(flatTimeoutFunction);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ public PatternTimeoutSelectAdapter(
}

@Override
public void open(Configuration parameters) throws Exception {
public void open(Configuration parameters) {
super.open(parameters);
FunctionUtils.setFunctionRuntimeContext(timeoutFunction, getRuntimeContext());
FunctionUtils.openFunction(timeoutFunction, parameters);
}

@Override
public void close() throws Exception {
public void close() {
super.close();
FunctionUtils.closeFunction(timeoutFunction);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
import io.nop.stream.cep.pattern.Pattern;
import io.nop.stream.cep.pattern.conditions.IterativeCondition;
import io.nop.stream.cep.time.TimerService;
import io.nop.stream.core.common.functions.RuntimeContext;
import io.nop.stream.core.configuration.Configuration;
import io.nop.stream.core.exceptions.StreamRuntimeException;
import io.nop.stream.core.common.functions.RuntimeContext;
import io.nop.stream.core.util.FunctionUtils;

import java.util.ArrayList;
Expand Down Expand Up @@ -186,7 +186,7 @@ private boolean isFinalState(ComputationState state) {
* @param cepRuntimeContext runtime context of the enclosing operator
* @param conf The configuration containing the parameters attached to the contract.
*/
public void open(RuntimeContext cepRuntimeContext, Configuration conf) throws Exception {
public void open(RuntimeContext cepRuntimeContext, Configuration conf) {
for (State<T> state : getStates()) {
for (StateTransition<T> transition : state.getStateTransitions()) {
IterativeCondition condition = transition.getCondition();
Expand All @@ -199,7 +199,7 @@ public void open(RuntimeContext cepRuntimeContext, Configuration conf) throws Ex
/**
* Tear-down method for the NFA.
*/
public void close() throws Exception {
public void close() {
for (State<T> state : getStates()) {
for (StateTransition<T> transition : state.getStateTransitions()) {
IterativeCondition condition = transition.getCondition();
Expand Down Expand Up @@ -235,8 +235,7 @@ public Collection<Map<String, List<T>>> process(
final T event,
final long timestamp,
final AfterMatchSkipStrategy afterMatchSkipStrategy,
final TimerService timerService)
throws Exception {
final TimerService timerService) {
try (EventWrapper eventWrapper = new EventWrapper(event, timestamp, sharedBufferAccessor)) {
return doProcess(
sharedBufferAccessor,
Expand Down Expand Up @@ -264,8 +263,7 @@ public Collection<Map<String, List<T>>> process(
final SharedBufferAccessor<T> sharedBufferAccessor,
final NFAState nfaState,
final long timestamp,
final AfterMatchSkipStrategy afterMatchSkipStrategy)
throws Exception {
final AfterMatchSkipStrategy afterMatchSkipStrategy) {

final List<Map<String, List<T>>> result = new ArrayList<>();
final Collection<Tuple2<Map<String, List<T>>, Long>> timeoutResult = new ArrayList<>();
Expand Down Expand Up @@ -352,8 +350,7 @@ private Collection<Map<String, List<T>>> doProcess(
final NFAState nfaState,
final EventWrapper event,
final AfterMatchSkipStrategy afterMatchSkipStrategy,
final TimerService timerService)
throws Exception {
final TimerService timerService) {

final PriorityQueue<ComputationState> newPartialMatches =
new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
Expand Down Expand Up @@ -433,8 +430,7 @@ private void processMatchesAccordingToSkipStrategy(
AfterMatchSkipStrategy afterMatchSkipStrategy,
PriorityQueue<ComputationState> potentialMatches,
PriorityQueue<ComputationState> partialMatches,
List<Map<String, List<T>>> result)
throws Exception {
List<Map<String, List<T>>> result) {

nfaState.getCompletedMatches().addAll(potentialMatches);

Expand Down Expand Up @@ -548,7 +544,7 @@ private class EventWrapper implements AutoCloseable {
this.sharedBufferAccessor = sharedBufferAccessor;
}

EventId getEventId() throws Exception {
EventId getEventId() {
if (eventId == null) {
this.eventId = sharedBufferAccessor.registerEvent(event, timestamp);
}
Expand All @@ -565,7 +561,7 @@ public long getTimestamp() {
}

@Override
public void close() throws Exception {
public void close() {
if (eventId != null) {
sharedBufferAccessor.releaseEvent(eventId);
}
Expand Down Expand Up @@ -615,8 +611,7 @@ private Collection<ComputationState> computeNextStates(
final SharedBufferAccessor<T> sharedBufferAccessor,
final ComputationState computationState,
final EventWrapper event,
final TimerService timerService)
throws Exception {
final TimerService timerService) {

final ConditionContext context =
new ConditionContext(
Expand Down Expand Up @@ -757,8 +752,7 @@ private void addComputationState(
DeweyNumber version,
long startTimestamp,
long previousTimestamp,
EventId startEventId)
throws Exception {
EventId startEventId) {
ComputationState computationState =
ComputationState.createState(
currentState.getName(),
Expand Down Expand Up @@ -859,8 +853,7 @@ private boolean checkFilterCondition(
*/
private Map<String, List<EventId>> extractCurrentMatches(
final SharedBufferAccessor<T> sharedBufferAccessor,
final ComputationState computationState)
throws Exception {
final ComputationState computationState) {
if (computationState.getPreviousBufferEntry() == null) {
return new HashMap<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ public static NoSkipStrategy noSkip() {
public void prune(
Collection<ComputationState> matchesToPrune,
Collection<Map<String, List<EventId>>> matchedResult,
SharedBufferAccessor<?> sharedBufferAccessor)
throws Exception {
SharedBufferAccessor<?> sharedBufferAccessor) {
if (!isSkipStrategy()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public SharedBufferAccessor<V> getAccessor() {
return new SharedBufferAccessor<>(this);
}

void advanceTime(long timestamp) throws Exception {
void advanceTime(long timestamp) {
Iterator<Long> iterator = eventsCount.keys().iterator();
while (iterator.hasNext()) {
Long next = iterator.next();
Expand All @@ -265,7 +265,7 @@ void advanceTime(long timestamp) throws Exception {
}
}

EventId registerEvent(V value, long timestamp) throws Exception {
EventId registerEvent(V value, long timestamp) {
Integer id = eventsCount.get(timestamp);
if (id == null) {
id = 0;
Expand Down Expand Up @@ -319,7 +319,7 @@ void upsertEntry(NodeId nodeId, Lockable<SharedBufferNode> entry) {
*
* @param eventId id of the event
*/
void removeEvent(EventId eventId) throws Exception {
void removeEvent(EventId eventId) {
this.eventsBufferCache.invalidate(eventId);
this.eventsBuffer.remove(eventId);
}
Expand All @@ -329,7 +329,7 @@ void removeEvent(EventId eventId) throws Exception {
*
* @param nodeId id of the event
*/
void removeEntry(NodeId nodeId) throws Exception {
void removeEntry(NodeId nodeId) {
this.entryCache.invalidate(nodeId);
this.entries.remove(nodeId);
}
Expand Down Expand Up @@ -385,7 +385,7 @@ Lockable<V> getEvent(EventId eventId) {
*
* @throws Exception Thrown if the system cannot access the state.
*/
void flushCache() throws Exception {
void flushCache() {
if (!entryCache.asMap().isEmpty()) {
entries.putAll(entryCache.asMap());
entryCache.invalidateAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class SharedBufferAccessor<V> implements AutoCloseable {
* @param timestamp watermark, no earlier events will arrive
* @throws Exception Thrown if the system cannot access the state.
*/
public void advanceTime(long timestamp) throws Exception {
public void advanceTime(long timestamp) {
sharedBuffer.advanceTime(timestamp);
}

Expand All @@ -74,7 +74,7 @@ public void advanceTime(long timestamp) throws Exception {
* @return unique id of that event that should be used when putting entries to the buffer.
* @throws Exception Thrown if the system cannot access the state.
*/
public EventId registerEvent(V value, long timestamp) throws Exception {
public EventId registerEvent(V value, long timestamp) {
return sharedBuffer.registerEvent(value, timestamp);
}

Expand Down Expand Up @@ -252,7 +252,7 @@ public void lockNode(final NodeId node, final DeweyNumber version) {
* @param version dewey number of the (potential) edge that locked the given node
* @throws Exception Thrown if the system cannot access the state.
*/
public void releaseNode(final NodeId node, final DeweyNumber version) throws Exception {
public void releaseNode(final NodeId node, final DeweyNumber version){
// the stack used to detect all nodes that needs to be released.
Stack<NodeId> nodesToExamine = new Stack<>();
Stack<DeweyNumber> versionsToExamine = new Stack<>();
Expand Down Expand Up @@ -314,7 +314,7 @@ private void lockEvent(EventId eventId) {
* @param eventId id of the event
* @throws Exception Thrown if the system cannot access the state.
*/
public void releaseEvent(EventId eventId) throws Exception {
public void releaseEvent(EventId eventId) {
Lockable<V> eventWrapper = sharedBuffer.getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
Expand All @@ -330,7 +330,7 @@ public void releaseEvent(EventId eventId) throws Exception {
*
* @throws Exception Thrown if the system cannot access the state.
*/
public void close() throws Exception {
public void close() {
sharedBuffer.flushCache();
}

Expand Down
Loading

0 comments on commit 7bb761d

Please sign in to comment.