Skip to content

Commit

Permalink
Allow user specified concurrency for Scalar to Group and Group to Scalar
Browse files Browse the repository at this point in the history
  • Loading branch information
crioux-stripe committed Oct 7, 2024
1 parent 65a7949 commit d9ea475
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class GroupToScalar<K, T, R> extends StageConfig<T, R> {

GroupToScalar(GroupToScalarComputation<K, T, R> computation,
Config<K, T, R> config, Codec<K> inputKeyCodec, Codec<T> inputCodec) {
super(config.description, inputKeyCodec, inputCodec, config.codec, config.inputStrategy, config.parameters);
super(config.description, inputKeyCodec, inputCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency);
this.computation = computation;
this.keyExpireTimeSeconds = config.keyExpireTimeSeconds;
}
Expand All @@ -78,6 +78,7 @@ public static class Config<K, T, R> {
// 'stateful group calculation' use case
// do not allow config override
private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL;
private int concurrency = DEFAULT_STAGE_CONCURRENCY;
private List<ParameterDefinition<?>> parameters = Collections.emptyList();

/**
Expand Down Expand Up @@ -119,6 +120,12 @@ public Config<K, T, R> concurrentInput() {
return this;
}

public Config<K, T, R> concurrentInput(final int concurrency) {
this.inputStrategy = INPUT_STRATEGY.CONCURRENT;
this.concurrency = concurrency;
return this;
}

public Codec<R> getCodec() {
return codec;
}
Expand All @@ -135,6 +142,8 @@ public INPUT_STRATEGY getInputStrategy() {
return inputStrategy;
}

public int getConcurrency() { return concurrency; }

public Config<K, T, R> withParameters(List<ParameterDefinition<?>> params) {
this.parameters = params;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ public abstract class KeyValueStageConfig<T, K, R> extends StageConfig<T, R> {
private final Codec<K> keyCodec;

public KeyValueStageConfig(String description, Codec<?> inputKeyCodec, Codec<T> inputCodec, Codec<K> outputKeyCodec, Codec<R> outputCodec, INPUT_STRATEGY inputStrategy, List<ParameterDefinition<?>> params) {
super(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params);
super(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, DEFAULT_STAGE_CONCURRENCY);
this.keyCodec = outputKeyCodec;
}

public KeyValueStageConfig(String description, Codec<?> inputKeyCodec, Codec<T> inputCodec, Codec<K> outputKeyCodec, Codec<R> outputCodec, INPUT_STRATEGY inputStrategy, List<ParameterDefinition<?>> params, int concurrency) {
super(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, concurrency);
this.keyCodec = outputKeyCodec;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class ScalarToGroup<T, K, R> extends KeyValueStageConfig<T, K, R> {

public ScalarToGroup(ToGroupComputation<T, K, R> computation,
Config<T, K, R> config, Codec<T> inputCodec) {
super(config.description, null, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters);
super(config.description, null, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency);
this.computation = computation;
this.keyExpireTimeSeconds = config.keyExpireTimeSeconds;

Expand All @@ -76,6 +76,7 @@ public static class Config<T, K, R> {
private String description;
// default input type is concurrent for 'grouping' use case
private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.CONCURRENT;
private int concurrency = DEFAULT_STAGE_CONCURRENCY;
private long keyExpireTimeSeconds = Long.MAX_VALUE; // never expire by default
private List<ParameterDefinition<?>> parameters = Collections.emptyList();

Expand Down Expand Up @@ -108,6 +109,7 @@ public Config<T, K, R> keyExpireTimeSeconds(long seconds) {

public Config<T, K, R> serialInput() {
this.inputStrategy = INPUT_STRATEGY.SERIAL;
this.concurrency = 1;
return this;
}

Expand All @@ -116,6 +118,12 @@ public Config<T, K, R> concurrentInput() {
return this;
}

public Config<T, K, R> concurrentInput(final int concurrency) {
this.inputStrategy = INPUT_STRATEGY.CONCURRENT;
this.concurrency = concurrency;
return this;
}

public Config<T, K, R> description(String description) {
this.description = description;
return this;
Expand All @@ -137,6 +145,8 @@ public INPUT_STRATEGY getInputStrategy() {
return inputStrategy;
}

public int getConcurrency() { return concurrency; }

public long getKeyExpireTimeSeconds() {
return keyExpireTimeSeconds;
}
Expand Down

0 comments on commit d9ea475

Please sign in to comment.