-
Notifications
You must be signed in to change notification settings - Fork 204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow user specified concurrency for Scalar to Group and Group to Scalar #720
Allow user specified concurrency for Scalar to Group and Group to Scalar #720
Conversation
@@ -33,7 +33,12 @@ public abstract class KeyValueStageConfig<T, K, R> extends StageConfig<T, R> { | |||
private final Codec<K> keyCodec; | |||
|
|||
public KeyValueStageConfig(String description, Codec<?> inputKeyCodec, Codec<T> inputCodec, Codec<K> outputKeyCodec, Codec<R> outputCodec, INPUT_STRATEGY inputStrategy, List<ParameterDefinition<?>> params) { | |||
super(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params); | |||
super(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, DEFAULT_STAGE_CONCURRENCY); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: you could call the other constructor in this class instead of duplicate call to super + keyCodec assignment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call. Pushed a change.
@@ -116,6 +118,12 @@ public Config<T, K, R> concurrentInput() { | |||
return this; | |||
} | |||
|
|||
public Config<T, K, R> concurrentInput(final int concurrency) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this could cause issues, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went looking around and we do allocate the same key to the same thread every time. We have concurrency either way, this just lets the user specify it precisely, unless I'm misunderstanding the if/else that contains the linked line.
Context
The default concurrency for
ScalarToGroup
andGroupToScalar
stages is-1
and thus relies on the number of inner observables when executing the stage. There may be cases such as CPU bound stages where the user wants fine grained control over how many threads are executing the stage logic.Some discussion in #681 indicates a desire for this, but I agree with @calvin681 that changing defaults shouldn't be done lightly as users have come to expect certain threading behavior with the stages. I think this represents a compromise where unaware users can continue with the default and power users can begin to control threading.
Checklist
./gradlew build
compiles code correctly./gradlew test
passes all tests