diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToScalar.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToScalar.java index a3e35d064..cfb4cf35a 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToScalar.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToScalar.java @@ -56,7 +56,7 @@ public class GroupToScalar extends StageConfig { GroupToScalar(GroupToScalarComputation computation, Config config, Codec inputKeyCodec, Codec inputCodec) { - super(config.description, inputKeyCodec, inputCodec, config.codec, config.inputStrategy, config.parameters); + super(config.description, inputKeyCodec, inputCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency); this.computation = computation; this.keyExpireTimeSeconds = config.keyExpireTimeSeconds; } @@ -78,6 +78,7 @@ public static class Config { // 'stateful group calculation' use case // do not allow config override private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL; + private int concurrency = DEFAULT_STAGE_CONCURRENCY; private List> parameters = Collections.emptyList(); /** @@ -119,6 +120,12 @@ public Config concurrentInput() { return this; } + public Config concurrentInput(final int concurrency) { + this.inputStrategy = INPUT_STRATEGY.CONCURRENT; + this.concurrency = concurrency; + return this; + } + public Codec getCodec() { return codec; } @@ -135,6 +142,8 @@ public INPUT_STRATEGY getInputStrategy() { return inputStrategy; } + public int getConcurrency() { return concurrency; } + public Config withParameters(List> params) { this.parameters = params; return this; diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyValueStageConfig.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyValueStageConfig.java index 5385cbe51..756639400 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyValueStageConfig.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyValueStageConfig.java @@ -33,7 +33,12 @@ public abstract class KeyValueStageConfig extends StageConfig { private final Codec keyCodec; public KeyValueStageConfig(String description, Codec inputKeyCodec, Codec inputCodec, Codec outputKeyCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params) { - super(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params); + super(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, DEFAULT_STAGE_CONCURRENCY); + this.keyCodec = outputKeyCodec; + } + + public KeyValueStageConfig(String description, Codec inputKeyCodec, Codec inputCodec, Codec outputKeyCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params, int concurrency) { + super(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, concurrency); this.keyCodec = outputKeyCodec; } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToGroup.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToGroup.java index 54f1e71fc..7fb2a3206 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToGroup.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToGroup.java @@ -54,7 +54,7 @@ public class ScalarToGroup extends KeyValueStageConfig { public ScalarToGroup(ToGroupComputation computation, Config config, Codec inputCodec) { - super(config.description, null, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters); + super(config.description, null, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency); this.computation = computation; this.keyExpireTimeSeconds = config.keyExpireTimeSeconds; @@ -76,6 +76,7 @@ public static class Config { private String description; // default input type is concurrent for 'grouping' use case private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.CONCURRENT; + private int concurrency = DEFAULT_STAGE_CONCURRENCY; private long keyExpireTimeSeconds = Long.MAX_VALUE; // never expire by default private List> parameters = Collections.emptyList(); @@ -108,6 +109,7 @@ public Config keyExpireTimeSeconds(long seconds) { public Config serialInput() { this.inputStrategy = INPUT_STRATEGY.SERIAL; + this.concurrency = 1; return this; } @@ -116,6 +118,12 @@ public Config concurrentInput() { return this; } + public Config concurrentInput(final int concurrency) { + this.inputStrategy = INPUT_STRATEGY.CONCURRENT; + this.concurrency = concurrency; + return this; + } + public Config description(String description) { this.description = description; return this; @@ -137,6 +145,8 @@ public INPUT_STRATEGY getInputStrategy() { return inputStrategy; } + public int getConcurrency() { return concurrency; } + public long getKeyExpireTimeSeconds() { return keyExpireTimeSeconds; }