From 7bb761deff60e20a5a0dda364d9eff66d0629f54 Mon Sep 17 00:00:00 2001 From: canonical Date: Sun, 27 Aug 2023 19:31:24 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0TestPattern=E5=8D=95=E5=85=83?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E3=80=82=E5=88=A0=E9=99=A4=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E5=87=BD=E6=95=B0=E7=AD=BE=E5=90=8D=E4=B8=AD=E7=9A=84throws=20?= =?UTF-8?q?Exception=E5=A3=B0=E6=98=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io/nop/api/core/beans/IntRangeBean.java | 7 +- .../io/nop/api/core/beans/LongRangeBean.java | 7 +- nop-stream/nop-stream-cep/pom.xml | 6 + .../java/io/nop/stream/cep/NopCepConfigs.java | 40 ++++ .../SharedBufferCacheConfig.java | 10 + .../adaptors/PatternFlatSelectAdapter.java | 4 +- .../adaptors/PatternSelectAdapter.java | 4 +- .../PatternTimeoutFlatSelectAdapter.java | 6 +- .../adaptors/PatternTimeoutSelectAdapter.java | 4 +- .../main/java/io/nop/stream/cep/nfa/NFA.java | 31 ++- .../aftermatch/AfterMatchSkipStrategy.java | 3 +- .../cep/nfa/sharedbuffer/SharedBuffer.java | 10 +- .../sharedbuffer/SharedBufferAccessor.java | 10 +- .../RichCompositeIterativeCondition.java | 6 +- .../conditions/RichIterativeCondition.java | 6 +- .../test/java/io/nop/stream/cep/Event.java | 34 +++ .../io/nop/stream/cep/MockRuntimeContext.java | 6 + .../test/java/io/nop/stream/cep/SubEvent.java | 18 ++ .../java/io/nop/stream/cep/TestPattern.java | 91 ++++++++ .../functions/AbstractRichFunction.java | 4 +- .../core/common/functions/RichFunction.java | 4 +- .../core/common/state/KeyedStateStore.java | 88 ++++---- .../stream/core/common/state/MapState.java | 20 +- .../state/simple/SimpleKeyedStateStore.java | 198 ++++++++++++++++++ .../nop/stream/core/util/FunctionUtils.java | 4 +- .../_vfs/nop/schema/stream/pattern.xdef | 16 ++ .../java/io/nop/xlang/xdef/XDefConstants.java | 4 + .../xdef/domain/SimpleStdDomainHandlers.java | 48 +++++ .../xlang/xdef/domain/StdDomainRegistry.java | 3 + 29 files changed, 582 insertions(+), 110 deletions(-) create mode 100644 nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/NopCepConfigs.java create mode 100644 nop-stream/nop-stream-cep/src/test/java/io/nop/stream/cep/Event.java create mode 100644 nop-stream/nop-stream-cep/src/test/java/io/nop/stream/cep/MockRuntimeContext.java create mode 100644 nop-stream/nop-stream-cep/src/test/java/io/nop/stream/cep/SubEvent.java create mode 100644 nop-stream/nop-stream-cep/src/test/java/io/nop/stream/cep/TestPattern.java create mode 100644 nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/state/simple/SimpleKeyedStateStore.java create mode 100644 nop-xdefs/src/main/resources/_vfs/nop/schema/stream/pattern.xdef diff --git a/nop-api-core/src/main/java/io/nop/api/core/beans/IntRangeBean.java b/nop-api-core/src/main/java/io/nop/api/core/beans/IntRangeBean.java index 6db1712ea..00cd8a174 100644 --- a/nop-api-core/src/main/java/io/nop/api/core/beans/IntRangeBean.java +++ b/nop-api-core/src/main/java/io/nop/api/core/beans/IntRangeBean.java @@ -51,8 +51,11 @@ public static IntRangeBean parse(String str) { return null; int pos = str.indexOf(SEPARATOR); - if (pos < 0) - throw new NopException(ApiErrors.ERR_INVALID_OFFSET_LIMIT_STRING).param(ApiErrors.ARG_VALUE, str); + if (pos < 0) { + Integer start = ConvertHelper.stringToInt(str, + err -> new NopException(ApiErrors.ERR_INVALID_OFFSET_LIMIT_STRING).param(ApiErrors.ARG_VALUE, str)); + return of(start, 1); + } Integer start = ConvertHelper.stringToInt(str.substring(0, pos), err -> new NopException(ApiErrors.ERR_INVALID_OFFSET_LIMIT_STRING).param(ApiErrors.ARG_VALUE, str)); diff --git a/nop-api-core/src/main/java/io/nop/api/core/beans/LongRangeBean.java b/nop-api-core/src/main/java/io/nop/api/core/beans/LongRangeBean.java index 0e10b26e4..f5ec31122 100644 --- a/nop-api-core/src/main/java/io/nop/api/core/beans/LongRangeBean.java +++ b/nop-api-core/src/main/java/io/nop/api/core/beans/LongRangeBean.java @@ -51,8 +51,11 @@ public static LongRangeBean parse(String str) { return null; int pos = str.indexOf(SEPARATOR); - if (pos < 0) - throw new NopException(ApiErrors.ERR_INVALID_OFFSET_LIMIT_STRING).param(ApiErrors.ARG_VALUE, str); + if (pos < 0) { + Long start = ConvertHelper.stringToLong(str, + err -> new NopException(ApiErrors.ERR_INVALID_OFFSET_LIMIT_STRING).param(ApiErrors.ARG_VALUE, str)); + return of(start, 1); + } Long start = ConvertHelper.stringToLong(str.substring(0, pos), err -> new NopException(ApiErrors.ERR_INVALID_OFFSET_LIMIT_STRING).param(ApiErrors.ARG_VALUE, str)); diff --git a/nop-stream/nop-stream-cep/pom.xml b/nop-stream/nop-stream-cep/pom.xml index 7ed705e67..598c173ab 100644 --- a/nop-stream/nop-stream-cep/pom.xml +++ b/nop-stream/nop-stream-cep/pom.xml @@ -16,6 +16,12 @@ io.github.entropy-cloud nop-stream-core + + + org.junit.jupiter + junit-jupiter + test + \ No newline at end of file diff --git a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/NopCepConfigs.java b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/NopCepConfigs.java new file mode 100644 index 000000000..613240c4a --- /dev/null +++ b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/NopCepConfigs.java @@ -0,0 +1,40 @@ +package io.nop.stream.cep; + +import io.nop.api.core.annotations.core.Description; +import io.nop.api.core.config.IConfigReference; +import io.nop.api.core.util.SourceLocation; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; + +import static io.nop.api.core.config.AppConfig.varRef; + +public interface NopCepConfigs { + SourceLocation s_loc = SourceLocation.fromClass(NopCepConfigs.class); + + String COMMON_HINT = + "And it could accelerate the CEP operate process " + + "speed and limit the capacity of cache in pure memory. Note: It's only effective to " + + "limit usage of memory when 'state.backend' was set as 'rocksdb', which would " + + "transport the elements exceeded the number of the cache into the rocksdb state " + + "storage instead of memory state storage."; + + @Description("The Config option to set the maximum element number the " + + "eventsBufferCache of SharedBuffer could hold. " + + COMMON_HINT) + IConfigReference CEP_SHARED_BUFFER_EVENT_CACHE_SLOTS = + varRef(s_loc, "nop.cep.pipeline.sharedbuffer.cache.event-slots", Integer.class, 1024); + + @Description("The Config option to set the maximum element number the entryCache" + + " of SharedBuffer could hold. And it could accelerate the" + + " CEP operate process speed with state." + + COMMON_HINT) + IConfigReference CEP_SHARED_BUFFER_ENTRY_CACHE_SLOTS = + varRef(s_loc, "nop.cep.pipeline.sharedbuffer.cache.entry-slots", Integer.class, 1024); + + @Description("The interval to log the information of cache state statistics in " + + "CEP operator.") + IConfigReference CEP_CACHE_STATISTICS_INTERVAL = + varRef(s_loc, "nop.cep.pipeline.sharedbuffer.cache.statistics-interval", Duration.class, Duration.of(30, ChronoUnit.MINUTES)); + +} diff --git a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/configuration/SharedBufferCacheConfig.java b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/configuration/SharedBufferCacheConfig.java index 7bd570536..8296c153f 100644 --- a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/configuration/SharedBufferCacheConfig.java +++ b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/configuration/SharedBufferCacheConfig.java @@ -24,12 +24,22 @@ import java.io.Serializable; import java.time.Duration; +import static io.nop.stream.cep.NopCepConfigs.CEP_CACHE_STATISTICS_INTERVAL; +import static io.nop.stream.cep.NopCepConfigs.CEP_SHARED_BUFFER_ENTRY_CACHE_SLOTS; +import static io.nop.stream.cep.NopCepConfigs.CEP_SHARED_BUFFER_EVENT_CACHE_SLOTS; + @DataBean public final class SharedBufferCacheConfig implements Serializable { private final int eventsBufferCacheSlots; private final int entryCacheSlots; private final Duration cacheStatisticsInterval; + public SharedBufferCacheConfig() { + this(CEP_SHARED_BUFFER_EVENT_CACHE_SLOTS.get(), + CEP_SHARED_BUFFER_ENTRY_CACHE_SLOTS.get(), + CEP_CACHE_STATISTICS_INTERVAL.get()); + } + public int getEventsBufferCacheSlots() { return eventsBufferCacheSlots; } diff --git a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/functions/adaptors/PatternFlatSelectAdapter.java b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/functions/adaptors/PatternFlatSelectAdapter.java index 90c48df1c..d62cd15c1 100644 --- a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/functions/adaptors/PatternFlatSelectAdapter.java +++ b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/functions/adaptors/PatternFlatSelectAdapter.java @@ -42,13 +42,13 @@ public PatternFlatSelectAdapter(final PatternFlatSelectFunction flatSel } @Override - public void open(final Configuration parameters) throws Exception { + public void open(final Configuration parameters) { FunctionUtils.setFunctionRuntimeContext(flatSelectFunction, getRuntimeContext()); FunctionUtils.openFunction(flatSelectFunction, parameters); } @Override - public void close() throws Exception { + public void close() { FunctionUtils.closeFunction(flatSelectFunction); } diff --git a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/functions/adaptors/PatternSelectAdapter.java b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/functions/adaptors/PatternSelectAdapter.java index 5fa7a69c8..c459cff50 100644 --- a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/functions/adaptors/PatternSelectAdapter.java +++ b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/functions/adaptors/PatternSelectAdapter.java @@ -42,13 +42,13 @@ public PatternSelectAdapter(final PatternSelectFunction selectFunction) } @Override - public void open(final Configuration parameters) throws Exception { + public void open(final Configuration parameters) { FunctionUtils.setFunctionRuntimeContext(selectFunction, getRuntimeContext()); FunctionUtils.openFunction(selectFunction, parameters); } @Override - public void close() throws Exception { + public void close() { FunctionUtils.closeFunction(selectFunction); } diff --git a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java index 44359ee77..dc20b65aa 100644 --- a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java +++ b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java @@ -54,7 +54,7 @@ public PatternTimeoutFlatSelectAdapter( } @Override - public void open(Configuration parameters) throws Exception { + public void open(Configuration parameters) { super.open(parameters); FunctionUtils.setFunctionRuntimeContext(flatTimeoutFunction, getRuntimeContext()); FunctionUtils.openFunction(flatTimeoutFunction, parameters); @@ -65,9 +65,9 @@ public void open(Configuration parameters) throws Exception { } @Override - public void close() throws Exception { + public void close() { super.close(); - //FunctionUtils.closeFunction(flatTimeoutFunction); + FunctionUtils.closeFunction(flatTimeoutFunction); } @Override diff --git a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/functions/adaptors/PatternTimeoutSelectAdapter.java b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/functions/adaptors/PatternTimeoutSelectAdapter.java index c4bf314ba..aeb810399 100644 --- a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/functions/adaptors/PatternTimeoutSelectAdapter.java +++ b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/functions/adaptors/PatternTimeoutSelectAdapter.java @@ -53,14 +53,14 @@ public PatternTimeoutSelectAdapter( } @Override - public void open(Configuration parameters) throws Exception { + public void open(Configuration parameters) { super.open(parameters); FunctionUtils.setFunctionRuntimeContext(timeoutFunction, getRuntimeContext()); FunctionUtils.openFunction(timeoutFunction, parameters); } @Override - public void close() throws Exception { + public void close() { super.close(); FunctionUtils.closeFunction(timeoutFunction); } diff --git a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/nfa/NFA.java b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/nfa/NFA.java index b34f3e11d..8951e3342 100644 --- a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/nfa/NFA.java +++ b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/nfa/NFA.java @@ -31,9 +31,9 @@ import io.nop.stream.cep.pattern.Pattern; import io.nop.stream.cep.pattern.conditions.IterativeCondition; import io.nop.stream.cep.time.TimerService; +import io.nop.stream.core.common.functions.RuntimeContext; import io.nop.stream.core.configuration.Configuration; import io.nop.stream.core.exceptions.StreamRuntimeException; -import io.nop.stream.core.common.functions.RuntimeContext; import io.nop.stream.core.util.FunctionUtils; import java.util.ArrayList; @@ -186,7 +186,7 @@ private boolean isFinalState(ComputationState state) { * @param cepRuntimeContext runtime context of the enclosing operator * @param conf The configuration containing the parameters attached to the contract. */ - public void open(RuntimeContext cepRuntimeContext, Configuration conf) throws Exception { + public void open(RuntimeContext cepRuntimeContext, Configuration conf) { for (State state : getStates()) { for (StateTransition transition : state.getStateTransitions()) { IterativeCondition condition = transition.getCondition(); @@ -199,7 +199,7 @@ public void open(RuntimeContext cepRuntimeContext, Configuration conf) throws Ex /** * Tear-down method for the NFA. */ - public void close() throws Exception { + public void close() { for (State state : getStates()) { for (StateTransition transition : state.getStateTransitions()) { IterativeCondition condition = transition.getCondition(); @@ -235,8 +235,7 @@ public Collection>> process( final T event, final long timestamp, final AfterMatchSkipStrategy afterMatchSkipStrategy, - final TimerService timerService) - throws Exception { + final TimerService timerService) { try (EventWrapper eventWrapper = new EventWrapper(event, timestamp, sharedBufferAccessor)) { return doProcess( sharedBufferAccessor, @@ -264,8 +263,7 @@ public Collection>> process( final SharedBufferAccessor sharedBufferAccessor, final NFAState nfaState, final long timestamp, - final AfterMatchSkipStrategy afterMatchSkipStrategy) - throws Exception { + final AfterMatchSkipStrategy afterMatchSkipStrategy) { final List>> result = new ArrayList<>(); final Collection>, Long>> timeoutResult = new ArrayList<>(); @@ -352,8 +350,7 @@ private Collection>> doProcess( final NFAState nfaState, final EventWrapper event, final AfterMatchSkipStrategy afterMatchSkipStrategy, - final TimerService timerService) - throws Exception { + final TimerService timerService) { final PriorityQueue newPartialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR); @@ -433,8 +430,7 @@ private void processMatchesAccordingToSkipStrategy( AfterMatchSkipStrategy afterMatchSkipStrategy, PriorityQueue potentialMatches, PriorityQueue partialMatches, - List>> result) - throws Exception { + List>> result) { nfaState.getCompletedMatches().addAll(potentialMatches); @@ -548,7 +544,7 @@ private class EventWrapper implements AutoCloseable { this.sharedBufferAccessor = sharedBufferAccessor; } - EventId getEventId() throws Exception { + EventId getEventId() { if (eventId == null) { this.eventId = sharedBufferAccessor.registerEvent(event, timestamp); } @@ -565,7 +561,7 @@ public long getTimestamp() { } @Override - public void close() throws Exception { + public void close() { if (eventId != null) { sharedBufferAccessor.releaseEvent(eventId); } @@ -615,8 +611,7 @@ private Collection computeNextStates( final SharedBufferAccessor sharedBufferAccessor, final ComputationState computationState, final EventWrapper event, - final TimerService timerService) - throws Exception { + final TimerService timerService) { final ConditionContext context = new ConditionContext( @@ -757,8 +752,7 @@ private void addComputationState( DeweyNumber version, long startTimestamp, long previousTimestamp, - EventId startEventId) - throws Exception { + EventId startEventId) { ComputationState computationState = ComputationState.createState( currentState.getName(), @@ -859,8 +853,7 @@ private boolean checkFilterCondition( */ private Map> extractCurrentMatches( final SharedBufferAccessor sharedBufferAccessor, - final ComputationState computationState) - throws Exception { + final ComputationState computationState) { if (computationState.getPreviousBufferEntry() == null) { return new HashMap<>(); } diff --git a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/nfa/aftermatch/AfterMatchSkipStrategy.java b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/nfa/aftermatch/AfterMatchSkipStrategy.java index 245e408ae..8c172fe06 100644 --- a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/nfa/aftermatch/AfterMatchSkipStrategy.java +++ b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/nfa/aftermatch/AfterMatchSkipStrategy.java @@ -101,8 +101,7 @@ public static NoSkipStrategy noSkip() { public void prune( Collection matchesToPrune, Collection>> matchedResult, - SharedBufferAccessor sharedBufferAccessor) - throws Exception { + SharedBufferAccessor sharedBufferAccessor) { if (!isSkipStrategy()) { return; } diff --git a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/nfa/sharedbuffer/SharedBuffer.java b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/nfa/sharedbuffer/SharedBuffer.java index 8c9737c0b..bba01492a 100644 --- a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/nfa/sharedbuffer/SharedBuffer.java +++ b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/nfa/sharedbuffer/SharedBuffer.java @@ -255,7 +255,7 @@ public SharedBufferAccessor getAccessor() { return new SharedBufferAccessor<>(this); } - void advanceTime(long timestamp) throws Exception { + void advanceTime(long timestamp) { Iterator iterator = eventsCount.keys().iterator(); while (iterator.hasNext()) { Long next = iterator.next(); @@ -265,7 +265,7 @@ void advanceTime(long timestamp) throws Exception { } } - EventId registerEvent(V value, long timestamp) throws Exception { + EventId registerEvent(V value, long timestamp) { Integer id = eventsCount.get(timestamp); if (id == null) { id = 0; @@ -319,7 +319,7 @@ void upsertEntry(NodeId nodeId, Lockable entry) { * * @param eventId id of the event */ - void removeEvent(EventId eventId) throws Exception { + void removeEvent(EventId eventId) { this.eventsBufferCache.invalidate(eventId); this.eventsBuffer.remove(eventId); } @@ -329,7 +329,7 @@ void removeEvent(EventId eventId) throws Exception { * * @param nodeId id of the event */ - void removeEntry(NodeId nodeId) throws Exception { + void removeEntry(NodeId nodeId) { this.entryCache.invalidate(nodeId); this.entries.remove(nodeId); } @@ -385,7 +385,7 @@ Lockable getEvent(EventId eventId) { * * @throws Exception Thrown if the system cannot access the state. */ - void flushCache() throws Exception { + void flushCache() { if (!entryCache.asMap().isEmpty()) { entries.putAll(entryCache.asMap()); entryCache.invalidateAll(); diff --git a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/nfa/sharedbuffer/SharedBufferAccessor.java b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/nfa/sharedbuffer/SharedBufferAccessor.java index a88cdf218..9cda628be 100644 --- a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/nfa/sharedbuffer/SharedBufferAccessor.java +++ b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/nfa/sharedbuffer/SharedBufferAccessor.java @@ -58,7 +58,7 @@ public class SharedBufferAccessor implements AutoCloseable { * @param timestamp watermark, no earlier events will arrive * @throws Exception Thrown if the system cannot access the state. */ - public void advanceTime(long timestamp) throws Exception { + public void advanceTime(long timestamp) { sharedBuffer.advanceTime(timestamp); } @@ -74,7 +74,7 @@ public void advanceTime(long timestamp) throws Exception { * @return unique id of that event that should be used when putting entries to the buffer. * @throws Exception Thrown if the system cannot access the state. */ - public EventId registerEvent(V value, long timestamp) throws Exception { + public EventId registerEvent(V value, long timestamp) { return sharedBuffer.registerEvent(value, timestamp); } @@ -252,7 +252,7 @@ public void lockNode(final NodeId node, final DeweyNumber version) { * @param version dewey number of the (potential) edge that locked the given node * @throws Exception Thrown if the system cannot access the state. */ - public void releaseNode(final NodeId node, final DeweyNumber version) throws Exception { + public void releaseNode(final NodeId node, final DeweyNumber version){ // the stack used to detect all nodes that needs to be released. Stack nodesToExamine = new Stack<>(); Stack versionsToExamine = new Stack<>(); @@ -314,7 +314,7 @@ private void lockEvent(EventId eventId) { * @param eventId id of the event * @throws Exception Thrown if the system cannot access the state. */ - public void releaseEvent(EventId eventId) throws Exception { + public void releaseEvent(EventId eventId) { Lockable eventWrapper = sharedBuffer.getEvent(eventId); if (eventWrapper != null) { if (eventWrapper.release()) { @@ -330,7 +330,7 @@ public void releaseEvent(EventId eventId) throws Exception { * * @throws Exception Thrown if the system cannot access the state. */ - public void close() throws Exception { + public void close() { sharedBuffer.flushCache(); } diff --git a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/pattern/conditions/RichCompositeIterativeCondition.java b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/pattern/conditions/RichCompositeIterativeCondition.java index 9b7e94b05..9a02be681 100644 --- a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/pattern/conditions/RichCompositeIterativeCondition.java +++ b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/pattern/conditions/RichCompositeIterativeCondition.java @@ -19,8 +19,8 @@ package io.nop.stream.cep.pattern.conditions; import com.google.common.base.Preconditions; -import io.nop.stream.core.configuration.Configuration; import io.nop.stream.core.common.functions.RuntimeContext; +import io.nop.stream.core.configuration.Configuration; import io.nop.stream.core.util.FunctionUtils; /** @@ -58,7 +58,7 @@ public void setRuntimeContext(RuntimeContext t) { } @Override - public void open(Configuration parameters) throws Exception { + public void open(Configuration parameters) { super.open(parameters); for (IterativeCondition nestedCondition : nestedConditions) { FunctionUtils.openFunction(nestedCondition, parameters); @@ -66,7 +66,7 @@ public void open(Configuration parameters) throws Exception { } @Override - public void close() throws Exception { + public void close() { super.close(); for (IterativeCondition nestedCondition : nestedConditions) { FunctionUtils.closeFunction(nestedCondition); diff --git a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/pattern/conditions/RichIterativeCondition.java b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/pattern/conditions/RichIterativeCondition.java index 62de1f884..13ae6b251 100644 --- a/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/pattern/conditions/RichIterativeCondition.java +++ b/nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/pattern/conditions/RichIterativeCondition.java @@ -19,10 +19,10 @@ package io.nop.stream.cep.pattern.conditions; import com.google.common.base.Preconditions; -import io.nop.stream.core.configuration.Configuration; import io.nop.stream.core.common.functions.IterationRuntimeContext; import io.nop.stream.core.common.functions.RichFunction; import io.nop.stream.core.common.functions.RuntimeContext; +import io.nop.stream.core.configuration.Configuration; /** * Rich variant of the {@link IterativeCondition}. As a {@link RichFunction}, it gives access to the @@ -67,10 +67,10 @@ public IterationRuntimeContext getIterationRuntimeContext() { // -------------------------------------------------------------------------------------------- @Override - public void open(Configuration parameters) throws Exception { + public void open(Configuration parameters) { } @Override - public void close() throws Exception { + public void close() { } } diff --git a/nop-stream/nop-stream-cep/src/test/java/io/nop/stream/cep/Event.java b/nop-stream/nop-stream-cep/src/test/java/io/nop/stream/cep/Event.java new file mode 100644 index 000000000..657f4385d --- /dev/null +++ b/nop-stream/nop-stream-cep/src/test/java/io/nop/stream/cep/Event.java @@ -0,0 +1,34 @@ +package io.nop.stream.cep; + +public class Event { + private int id; + private String name; + + public Event(int id, String name) { + this.id = id; + this.name = name; + } + + public String toString() { + return "Event[id=" + id + ",name=" + name + "]"; + } + + public Event() { + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } +} \ No newline at end of file diff --git a/nop-stream/nop-stream-cep/src/test/java/io/nop/stream/cep/MockRuntimeContext.java b/nop-stream/nop-stream-cep/src/test/java/io/nop/stream/cep/MockRuntimeContext.java new file mode 100644 index 000000000..06c34f5c7 --- /dev/null +++ b/nop-stream/nop-stream-cep/src/test/java/io/nop/stream/cep/MockRuntimeContext.java @@ -0,0 +1,6 @@ +package io.nop.stream.cep; + +import io.nop.stream.core.common.functions.RuntimeContext; + +public class MockRuntimeContext implements RuntimeContext { +} diff --git a/nop-stream/nop-stream-cep/src/test/java/io/nop/stream/cep/SubEvent.java b/nop-stream/nop-stream-cep/src/test/java/io/nop/stream/cep/SubEvent.java new file mode 100644 index 000000000..46e292d37 --- /dev/null +++ b/nop-stream/nop-stream-cep/src/test/java/io/nop/stream/cep/SubEvent.java @@ -0,0 +1,18 @@ +package io.nop.stream.cep; + +public class SubEvent extends Event { + private double volume; + + public SubEvent(int id, String name, double volume) { + super(id, name); + this.volume = volume; + } + + public double getVolume() { + return volume; + } + + public void setVolume(double volume) { + this.volume = volume; + } +} diff --git a/nop-stream/nop-stream-cep/src/test/java/io/nop/stream/cep/TestPattern.java b/nop-stream/nop-stream-cep/src/test/java/io/nop/stream/cep/TestPattern.java new file mode 100644 index 000000000..ef9356e5b --- /dev/null +++ b/nop-stream/nop-stream-cep/src/test/java/io/nop/stream/cep/TestPattern.java @@ -0,0 +1,91 @@ +package io.nop.stream.cep; + +import io.nop.api.core.time.CoreMetrics; +import io.nop.commons.tuple.Tuple2; +import io.nop.stream.cep.configuration.SharedBufferCacheConfig; +import io.nop.stream.cep.nfa.NFA; +import io.nop.stream.cep.nfa.NFAState; +import io.nop.stream.cep.nfa.aftermatch.AfterMatchSkipStrategy; +import io.nop.stream.cep.nfa.compiler.NFACompiler; +import io.nop.stream.cep.nfa.sharedbuffer.SharedBuffer; +import io.nop.stream.cep.nfa.sharedbuffer.SharedBufferAccessor; +import io.nop.stream.cep.pattern.Pattern; +import io.nop.stream.cep.pattern.conditions.SimpleCondition; +import io.nop.stream.core.common.state.simple.SimpleKeyedStateStore; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class TestPattern { + @Test + public void testNFA() { + Pattern pattern = Pattern.begin("start") + .where(SimpleCondition.of(event -> event.getId() == 42)) + .next("middle") + .subtype(SubEvent.class) + .where(SimpleCondition.of(subEvent -> subEvent.getVolume() >= 10.0)) + .followedBy("end") + .where(SimpleCondition.of(event -> event.getName().equals("end"))); + + NFA nfa = NFACompiler.compileFactory(pattern, true).createNFA(); + nfa.open(new MockRuntimeContext(), null); + + NFAState nfaState = nfa.createInitialNFAState(); + + SharedBuffer partialMatches = new SharedBuffer<>(new SimpleKeyedStateStore(), null, new SharedBufferCacheConfig()); + + List events = getData(); + for (Event event : events) { + Collection>> matches = consumeEvent(nfa, partialMatches, nfaState, event, CoreMetrics.currentTimeMillis()); + System.out.println(matches); + + if (nfaState.isStateChanged()) { + nfaState.resetStateChanged(); + nfaState.resetNewStartPartialMatch(); + } + } + nfa.close(); + } + + Collection>> consumeEvent(NFA nfa, SharedBuffer partialMatches, + NFAState nfaState, Event event, long timestamp) { + try (SharedBufferAccessor sharedBufferAccessor = partialMatches.getAccessor()) { + Tuple2< + Collection>>, + Collection>, Long>>> + pendingMatchesAndTimeout = + nfa.advanceTime( + sharedBufferAccessor, + nfaState, + timestamp, + AfterMatchSkipStrategy.noSkip()); + + Collection>> pendingMatches = pendingMatchesAndTimeout.f0; + + Collection>> matchedPatterns = + nfa.process( + sharedBufferAccessor, + nfaState, + event, + timestamp, + AfterMatchSkipStrategy.noSkip(), + null); + matchedPatterns.addAll(pendingMatches); + return matchedPatterns; + } + } + + private List getData() { + List events = new ArrayList<>(); + events.add(new Event(1, "a1")); + events.add(new Event(32, "a32")); + events.add(new Event(42, "a42")); + events.add(new SubEvent(44, "a44", 12)); + events.add(new Event(46, "end")); + events.add(new Event(47, "other")); + return events; + } +} diff --git a/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/functions/AbstractRichFunction.java b/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/functions/AbstractRichFunction.java index c08c0e853..a02726823 100644 --- a/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/functions/AbstractRichFunction.java +++ b/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/functions/AbstractRichFunction.java @@ -68,10 +68,10 @@ public IterationRuntimeContext getIterationRuntimeContext() { // -------------------------------------------------------------------------------------------- @Override - public void open(Configuration parameters) throws Exception { + public void open(Configuration parameters) { } @Override - public void close() throws Exception { + public void close() { } } diff --git a/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/functions/RichFunction.java b/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/functions/RichFunction.java index 2a3d0407f..605bf8c92 100644 --- a/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/functions/RichFunction.java +++ b/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/functions/RichFunction.java @@ -61,7 +61,7 @@ public interface RichFunction extends StreamFunction { * decide whether to retry the task execution. * @see org.apache.flink.configuration.Configuration */ - void open(Configuration parameters) throws Exception; + void open(Configuration parameters); /** * Tear-down method for the user code. It is called after the last call to the main working @@ -74,7 +74,7 @@ public interface RichFunction extends StreamFunction { * When the runtime catches an exception, it aborts the task and lets the fail-over logic * decide whether to retry the task execution. */ - void close() throws Exception; + void close() ; // ------------------------------------------------------------------------ // Runtime context diff --git a/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/state/KeyedStateStore.java b/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/state/KeyedStateStore.java index ee2874c21..3d66e8bf8 100644 --- a/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/state/KeyedStateStore.java +++ b/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/state/KeyedStateStore.java @@ -21,50 +21,50 @@ /** This interface contains methods for registering keyed state with a managed store. */ public interface KeyedStateStore { -// -// /** -// * Gets a handle to the system's key/value state. The key/value state is only accessible if the -// * function is executed on a KeyedStream. On each access, the state exposes the value for the -// * key of the element currently processed by the function. Each function may have multiple -// * partitioned states, addressed with different names. -// * -// *

Because the scope of each value is the key of the currently processed element, and the -// * elements are distributed by the Flink runtime, the system can transparently scale out and -// * redistribute the state and KeyedStream. -// * -// *

The following code example shows how to implement a continuous counter that counts how -// * many times elements of a certain key occur, and emits an updated count for that element on -// * each occurrence. -// * -// *

{@code
-//     * DataStream stream = ...;
-//     * KeyedStream keyedStream = stream.keyBy("id");
-//     *
-//     * keyedStream.map(new RichMapFunction>() {
-//     *
-//     *     private ValueState count;
-//     *
-//     *     public void open(Configuration cfg) {
-//     *         state = getRuntimeContext().getState(
-//     *                 new ValueStateDescriptor("count", LongSerializer.INSTANCE, 0L));
-//     *     }
-//     *
-//     *     public Tuple2 map(MyType value) {
-//     *         long count = state.value() + 1;
-//     *         state.update(value);
-//     *         return new Tuple2<>(value, count);
-//     *     }
-//     * });
-//     * }
-// * -// * @param stateProperties The descriptor defining the properties of the stats. -// * @param The type of value stored in the state. -// * @return The partitioned state object. -// * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the -// * function (function is not part of a KeyedStream). -// */ -// ValueState getState(ValueStateDescriptor stateProperties); -// + + /** + * Gets a handle to the system's key/value state. The key/value state is only accessible if the + * function is executed on a KeyedStream. On each access, the state exposes the value for the + * key of the element currently processed by the function. Each function may have multiple + * partitioned states, addressed with different names. + * + *

Because the scope of each value is the key of the currently processed element, and the + * elements are distributed by the Flink runtime, the system can transparently scale out and + * redistribute the state and KeyedStream. + * + *

The following code example shows how to implement a continuous counter that counts how + * many times elements of a certain key occur, and emits an updated count for that element on + * each occurrence. + * + *

{@code
+     * DataStream stream = ...;
+     * KeyedStream keyedStream = stream.keyBy("id");
+     *
+     * keyedStream.map(new RichMapFunction>() {
+     *
+     *     private ValueState count;
+     *
+     *     public void open(Configuration cfg) {
+     *         state = getRuntimeContext().getState(
+     *                 new ValueStateDescriptor("count", LongSerializer.INSTANCE, 0L));
+     *     }
+     *
+     *     public Tuple2 map(MyType value) {
+     *         long count = state.value() + 1;
+     *         state.update(value);
+     *         return new Tuple2<>(value, count);
+     *     }
+     * });
+     * }
+ * + * @param stateProperties The descriptor defining the properties of the stats. + * @param The type of value stored in the state. + * @return The partitioned state object. + * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the + * function (function is not part of a KeyedStream). + */ + ValueState getState(ValueStateDescriptor stateProperties); + // /** // * Gets a handle to the system's key/value list state. This state is similar to the state // * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that holds diff --git a/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/state/MapState.java b/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/state/MapState.java index fd31171ae..6f87a99e6 100644 --- a/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/state/MapState.java +++ b/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/state/MapState.java @@ -45,7 +45,7 @@ public interface MapState extends State { * @return The value of the mapping with the given key * @throws Exception Thrown if the system cannot access the state. */ - UV get(UK key) throws Exception; + UV get(UK key) ; /** * Associates a new value with the given key. @@ -54,7 +54,7 @@ public interface MapState extends State { * @param value The new value of the mapping * @throws Exception Thrown if the system cannot access the state. */ - void put(UK key, UV value) throws Exception; + void put(UK key, UV value) ; /** * Copies all of the mappings from the given map into the state. @@ -62,7 +62,7 @@ public interface MapState extends State { * @param map The mappings to be stored in this state * @throws Exception Thrown if the system cannot access the state. */ - void putAll(Map map) throws Exception; + void putAll(Map map) ; /** * Deletes the mapping of the given key. @@ -70,7 +70,7 @@ public interface MapState extends State { * @param key The key of the mapping * @throws Exception Thrown if the system cannot access the state. */ - void remove(UK key) throws Exception; + void remove(UK key); /** * Returns whether there exists the given mapping. @@ -79,7 +79,7 @@ public interface MapState extends State { * @return True if there exists a mapping whose key equals to the given key * @throws Exception Thrown if the system cannot access the state. */ - boolean contains(UK key) throws Exception; + boolean contains(UK key); /** * Returns all the mappings in the state. @@ -87,7 +87,7 @@ public interface MapState extends State { * @return An iterable view of all the key-value pairs in the state. * @throws Exception Thrown if the system cannot access the state. */ - Iterable> entries() throws Exception; + Iterable> entries() ; /** * Returns all the keys in the state. @@ -95,7 +95,7 @@ public interface MapState extends State { * @return An iterable view of all the keys in the state. * @throws Exception Thrown if the system cannot access the state. */ - Iterable keys() throws Exception; + Iterable keys(); /** * Returns all the values in the state. @@ -103,7 +103,7 @@ public interface MapState extends State { * @return An iterable view of all the values in the state. * @throws Exception Thrown if the system cannot access the state. */ - Iterable values() throws Exception; + Iterable values() ; /** * Iterates over all the mappings in the state. @@ -111,7 +111,7 @@ public interface MapState extends State { * @return An iterator over all the mappings in the state * @throws Exception Thrown if the system cannot access the state. */ - Iterator> iterator() throws Exception; + Iterator> iterator() ; /** * Returns true if this state contains no key-value mappings, otherwise false. @@ -119,5 +119,5 @@ public interface MapState extends State { * @return True if this state contains no key-value mappings, otherwise false. * @throws Exception Thrown if the system cannot access the state. */ - boolean isEmpty() throws Exception; + boolean isEmpty() ; } diff --git a/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/state/simple/SimpleKeyedStateStore.java b/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/state/simple/SimpleKeyedStateStore.java new file mode 100644 index 000000000..667775811 --- /dev/null +++ b/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/state/simple/SimpleKeyedStateStore.java @@ -0,0 +1,198 @@ +package io.nop.stream.core.common.state.simple; + +import io.nop.stream.core.common.state.KeyedStateStore; +import io.nop.stream.core.common.state.MapState; +import io.nop.stream.core.common.state.MapStateDescriptor; +import io.nop.stream.core.common.state.ValueState; +import io.nop.stream.core.common.state.ValueStateDescriptor; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +public class SimpleKeyedStateStore implements KeyedStateStore { + + private long stateWrites = 0; + private long stateReads = 0; + + @Override + public ValueState getState(ValueStateDescriptor stateProperties) { + return new ValueState() { + + private T value; + + @Override + public T value() throws IOException { + stateReads++; + return value; + } + + @Override + public void update(T value) throws IOException { + stateWrites++; + this.value = value; + } + + @Override + public void clear() { + this.value = null; + } + }; + } +// +// @Override +// public ListState getListState(ListStateDescriptor stateProperties) { +// throw new UnsupportedOperationException(); +// } +// +// @Override +// public ReducingState getReducingState(ReducingStateDescriptor stateProperties) { +// throw new UnsupportedOperationException(); +// } +// +// @Override +// public AggregatingState getAggregatingState( +// AggregatingStateDescriptor stateProperties) { +// throw new UnsupportedOperationException(); +// } + + protected Map newMap() { + return new HashMap<>(); + } + + @Override + public MapState getMapState(MapStateDescriptor stateProperties) { + return new MapState<>() { + + private Map values; + + private Map getOrSetMap() { + if (values == null) { + this.values = newMap(); + } + return values; + } + + @Override + public UV get(UK key) { + stateReads++; + if (values == null) { + return null; + } + + return values.get(key); + } + + @Override + public void put(UK key, UV value) { + stateWrites++; + getOrSetMap().put(key, value); + } + + @Override + public void putAll(Map map) { + stateWrites++; + getOrSetMap().putAll(map); + } + + @Override + public void remove(UK key) { + if (values == null) { + return; + } + + stateWrites++; + values.remove(key); + } + + @Override + public boolean contains(UK key) { + if (values == null) { + return false; + } + + stateReads++; + return values.containsKey(key); + } + + @Override + public Iterable> entries() { + if (values == null) { + return Collections.emptyList(); + } + + return () -> new CountingIterator<>(values.entrySet().iterator()); + } + + @Override + public Iterable keys() { + if (values == null) { + return Collections.emptyList(); + } + + return () -> new CountingIterator<>(values.keySet().iterator()); + } + + @Override + public Iterable values() { + if (values == null) { + return Collections.emptyList(); + } + + return () -> new CountingIterator<>(values.values().iterator()); + } + + @Override + public Iterator> iterator() { + if (values == null) { + return Collections.emptyIterator(); + } + + return new CountingIterator<>(values.entrySet().iterator()); + } + + @Override + public boolean isEmpty() { + if (values == null) { + return true; + } + + return values.isEmpty(); + } + + @Override + public void clear() { + stateWrites++; + this.values = null; + } + }; + } + + private class CountingIterator implements Iterator { + + private final Iterator iterator; + + CountingIterator(Iterator iterator) { + this.iterator = iterator; + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + stateReads++; + return iterator.next(); + } + + @Override + public void remove() { + stateWrites++; + iterator.remove(); + } + } +} \ No newline at end of file diff --git a/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/util/FunctionUtils.java b/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/util/FunctionUtils.java index 17962fd09..f41aa59a3 100644 --- a/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/util/FunctionUtils.java +++ b/nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/util/FunctionUtils.java @@ -30,14 +30,14 @@ @Internal public final class FunctionUtils { - public static void openFunction(StreamFunction function, Configuration parameters) throws Exception { + public static void openFunction(StreamFunction function, Configuration parameters){ if (function instanceof RichFunction) { RichFunction richFunction = (RichFunction) function; richFunction.open(parameters); } } - public static void closeFunction(StreamFunction function) throws Exception { + public static void closeFunction(StreamFunction function) { if (function instanceof RichFunction) { RichFunction richFunction = (RichFunction) function; richFunction.close(); diff --git a/nop-xdefs/src/main/resources/_vfs/nop/schema/stream/pattern.xdef b/nop-xdefs/src/main/resources/_vfs/nop/schema/stream/pattern.xdef new file mode 100644 index 000000000..b660592c9 --- /dev/null +++ b/nop-xdefs/src/main/resources/_vfs/nop/schema/stream/pattern.xdef @@ -0,0 +1,16 @@ + + + + + + + \ No newline at end of file diff --git a/nop-xlang/src/main/java/io/nop/xlang/xdef/XDefConstants.java b/nop-xlang/src/main/java/io/nop/xlang/xdef/XDefConstants.java index b3c36a38a..632b75106 100644 --- a/nop-xlang/src/main/java/io/nop/xlang/xdef/XDefConstants.java +++ b/nop-xlang/src/main/java/io/nop/xlang/xdef/XDefConstants.java @@ -73,6 +73,10 @@ public interface XDefConstants { String STD_DOMAIN_CELL_POS = "cell-pos"; String STD_DOMAIN_CELL_RANGE = "cell-range"; + String STD_DOMAIN_INT_RANGE = "int-range"; + + String STD_DOMAIN_LONG_RANGE = "long-range"; + String STD_DOMAIN_XJSON = "xjson"; String STD_DOMAIN_XJSON_LIST = "xjson-list"; diff --git a/nop-xlang/src/main/java/io/nop/xlang/xdef/domain/SimpleStdDomainHandlers.java b/nop-xlang/src/main/java/io/nop/xlang/xdef/domain/SimpleStdDomainHandlers.java index 1675d511c..47b812b1b 100644 --- a/nop-xlang/src/main/java/io/nop/xlang/xdef/domain/SimpleStdDomainHandlers.java +++ b/nop-xlang/src/main/java/io/nop/xlang/xdef/domain/SimpleStdDomainHandlers.java @@ -8,6 +8,8 @@ package io.nop.xlang.xdef.domain; import io.nop.api.core.beans.FieldSelectionBean; +import io.nop.api.core.beans.IntRangeBean; +import io.nop.api.core.beans.LongRangeBean; import io.nop.api.core.beans.query.OrderFieldBean; import io.nop.api.core.convert.ConvertHelper; import io.nop.api.core.exceptions.NopException; @@ -348,6 +350,52 @@ protected boolean isValid(String text) { } } + public static class IntRangeType extends SimpleStdDomainHandler { + @Override + public String getName() { + return XDefConstants.STD_DOMAIN_INT_RANGE; + } + + @Override + public IGenericType getGenericType(boolean mandatory, IStdDomainOptions options) { + return ReflectionManager.instance().buildRawType(IntRangeBean.class); + } + + @Override + public boolean isFixedType() { + return true; + } + + @Override + public Object parseProp(IStdDomainOptions options, SourceLocation loc, String propName, Object value, + XLangCompileTool cp) { + return IntRangeBean.parse(value.toString()); + } + } + + public static class LongRangeType extends SimpleStdDomainHandler { + @Override + public String getName() { + return XDefConstants.STD_DOMAIN_INT_RANGE; + } + + @Override + public IGenericType getGenericType(boolean mandatory, IStdDomainOptions options) { + return ReflectionManager.instance().buildRawType(LongRangeBean.class); + } + + @Override + public boolean isFixedType() { + return true; + } + + @Override + public Object parseProp(IStdDomainOptions options, SourceLocation loc, String propName, Object value, + XLangCompileTool cp) { + return LongRangeBean.parse(value.toString()); + } + } + public static abstract class AbstractStringSetType extends SimpleStdDomainHandler { @Override public IGenericType getGenericType(boolean mandatory, IStdDomainOptions options) { diff --git a/nop-xlang/src/main/java/io/nop/xlang/xdef/domain/StdDomainRegistry.java b/nop-xlang/src/main/java/io/nop/xlang/xdef/domain/StdDomainRegistry.java index 617c8cb3f..f9fb9e0b3 100644 --- a/nop-xlang/src/main/java/io/nop/xlang/xdef/domain/StdDomainRegistry.java +++ b/nop-xlang/src/main/java/io/nop/xlang/xdef/domain/StdDomainRegistry.java @@ -112,6 +112,9 @@ private void registerDefaults() { registerStdDomainHandler(new SimpleStdDomainHandlers.BooleanOrNumberType()); registerStdDomainHandler(new SimpleStdDomainHandlers.BooleanOrStringType()); + registerStdDomainHandler(new SimpleStdDomainHandlers.IntRangeType()); + registerStdDomainHandler(new SimpleStdDomainHandlers.LongRangeType()); + for (int i = 0; i < StdDataType.DURATION.ordinal(); i++) { StdDataType type = StdDataType.values()[i]; registerStdDomainHandler(ConverterStdDomainHandler.stdTypeHandler(type));