Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mantis runtime compile warn fix #468

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions mantis-runtime/src/main/java/io/mantisrx/runtime/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
public class Config<T> {

private Metadata metadata = new Metadata();
private SourceHolder<?> source;
private List<StageConfig<?, ?>> stages;
private SinkHolder<T> sink;
private final SourceHolder<?> source;
private final List<StageConfig<?, ?>> stages;
private final SinkHolder<T> sink;
private Lifecycle lifecycle = DefaultLifecycleFactory.getInstance();
private Map<String, ParameterDefinition<?>> parameterDefinitions = new HashMap<>();
private final Map<String, ParameterDefinition<?>> parameterDefinitions = new HashMap<>();

Config(Stages<?> stages, SinkHolder<T> observable) {
this.source = stages.getSource();
Expand All @@ -49,11 +49,11 @@ private void putParameterOnce(ParameterDefinition<?> definition) {

private void initParams() {
// add parameters from Source, Stage and Sink and ensure we don't have naming conflicts between params defined by Source, Stage and Sink
source.getSourceFunction().getParameters().forEach(p -> putParameterOnce(p));
source.getSourceFunction().getParameters().forEach(this::putParameterOnce);
for (StageConfig<?, ?> stage : stages) {
stage.getParameters().forEach(p -> putParameterOnce(p));
stage.getParameters().forEach(this::putParameterOnce);
}
sink.getSinkAction().getParameters().forEach(p -> putParameterOnce(p));
sink.getSinkAction().getParameters().forEach(this::putParameterOnce);
}

public Config<T> lifecycle(Lifecycle lifecycle) {
Expand All @@ -80,7 +80,7 @@ public Config<T> parameterDefinition(ParameterDefinition<?> definition) {
}

public Job<T> create() {
return new Job<T>(source, stages, sink, lifecycle,
metadata, parameterDefinitions);
return new Job<>(source, stages, sink, lifecycle,
metadata, parameterDefinitions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public MetricsRegistry getMetricsRegistry() {
/**
* Returns the Job Parameters associated with the current job
*
* @return
* @return Parameters
*/

public Parameters getParameters() {
Expand All @@ -103,7 +103,7 @@ public ServiceLocator getServiceLocator() {
/**
* Returns the JobId of the current job.
*
* @return
* @return String
*/
public String getJobId() {
return workerInfo.getJobId();
Expand All @@ -112,7 +112,7 @@ public String getJobId() {
/**
* Returns information related to the current worker
*
* @return
* @return WorkerInfo
*/
public WorkerInfo getWorkerInfo() {
return workerInfo;
Expand Down Expand Up @@ -159,11 +159,12 @@ public void completeAndExit() {
* Returns an Observable of WorkerMap for the current Job. The user can use this Observable
* to track the location information of all the workers of the current job.
*
* @return
* @return Observable
*/
public Observable<WorkerMap> getWorkerMapObservable() {
return this.workerMapObservable;
}

@Nullable
public ClassLoader getClassLoader() { return this.classLoader; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
*/
public class GroupToGroup<K1, T, K2, R> extends KeyValueStageConfig<T, K2, R> {

private GroupComputation<K1, T, K2, R> computation;
private long keyExpireTimeSeconds;
private final GroupComputation<K1, T, K2, R> computation;
private final long keyExpireTimeSeconds;

/**
* @deprecated As of release 0.603, use {@link #GroupToGroup(GroupComputation, Config, Codec)} instead
Expand Down Expand Up @@ -75,13 +75,13 @@ public static class Config<K1, T, K2, R> {
// input type for keyToKey is serial
// always assume a stateful calculation is being made
// do not allow config to override
private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL;
private final INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL;
private List<ParameterDefinition<?>> parameters = Collections.emptyList();

/**
* @param codec
*
* @return
* @return Config
*
* @deprecated As of release 0.603, use {@link #codec(Codec)} instead
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
*/
public class GroupToScalar<K, T, R> extends StageConfig<T, R> {

private GroupToScalarComputation<K, T, R> computation;
private long keyExpireTimeSeconds;
private final GroupToScalarComputation<K, T, R> computation;
private final long keyExpireTimeSeconds;

/**
* @deprecated As of release 0.603, use {@link #GroupToScalar(GroupToScalarComputation, Config, Codec)} instead
Expand Down Expand Up @@ -81,9 +81,9 @@ public static class Config<K, T, R> {
private List<ParameterDefinition<?>> parameters = Collections.emptyList();

/**
* @param codec
* @param codec is netty reactivex Codec
*
* @return
* @return Config
*
* @deprecated As of release 0.603, use {@link #codec(io.mantisrx.common.codec.Codec)} instead
*/
Expand All @@ -100,9 +100,9 @@ public Config<K, T, R> codec(Codec<R> codec) {
/**
* Not used. As we are not generating GroupedObservables
*
* @param seconds
* @param seconds is a long
*
* @return
* @return Config
*/
public Config<K, T, R> keyExpireTimeSeconds(long seconds) {
this.keyExpireTimeSeconds = seconds;
Expand Down
29 changes: 6 additions & 23 deletions mantis-runtime/src/main/java/io/mantisrx/runtime/Groups.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,7 @@ public static <K, T> Observable<GroupedObservable<K, T>> flatten(
Observable<GroupedObservable<K, T>> flattenedGroups = Observable.merge(groups);
return flattenedGroups
// // re-group by key
.groupBy(new Func1<GroupedObservable<K, T>, K>() {
@Override
public K call(GroupedObservable<K, T> group) {
return group.getKey();
}
})
.groupBy(GroupedObservable::getKey)

// flatten, with merged group
.flatMap(new Func1<GroupedObservable<K, GroupedObservable<K, T>>, Observable<GroupedObservable<K, T>>>() {
Expand All @@ -53,29 +48,17 @@ public Observable<GroupedObservable<K, T>> call(
/**
* Convert O O MantisGroup to O GroupedObservable
*
* @param groups
* @param <K> MantisGroup<K, T> groups keyValue
*
* @return
* @param <T> MantisGroup<K, T>> groups value
*
* @return Observable
*/

public static <K, T> Observable<GroupedObservable<K, T>> flattenMantisGroupsToGroupedObservables(
Observable<Observable<MantisGroup<K, T>>> groups) {
Observable<MantisGroup<K, T>> flattenedGroups = Observable.merge(groups);
return flattenedGroups.groupBy(new Func1<MantisGroup<K, T>, K>() {

@Override
public K call(MantisGroup<K, T> t) {
return t.getKeyValue();
}

}, new Func1<MantisGroup<K, T>, T>() {

@Override
public T call(MantisGroup<K, T> t) {
return t.getValue();
}

});
return flattenedGroups.groupBy(MantisGroup::getKeyValue, MantisGroup::getValue);


}
Expand Down
13 changes: 6 additions & 7 deletions mantis-runtime/src/main/java/io/mantisrx/runtime/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@

import io.mantisrx.runtime.lifecycle.Lifecycle;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


public class Job<T> {

private Metadata metadata;
private SourceHolder<?> source;
private List<StageConfig<?, ?>> stages;
private SinkHolder<T> sink;
private Lifecycle lifecycle;
private Map<String, ParameterDefinition<?>> parameterDefinitions = new HashMap<>();
private final Metadata metadata;
private final SourceHolder<?> source;
private final List<StageConfig<?, ?>> stages;
private final SinkHolder<T> sink;
private final Lifecycle lifecycle;
private final Map<String, ParameterDefinition<?>> parameterDefinitions;

Job(SourceHolder<?> source, List<StageConfig<?, ?>> stages, SinkHolder<T> sink,
Lifecycle lifecycle, Metadata metadata,
Expand Down
10 changes: 5 additions & 5 deletions mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
*/
public class KeyToKey<K1, T, K2, R> extends KeyValueStageConfig<T, K2, R> {

private KeyComputation<K1, T, K2, R> computation;
private long keyExpireTimeSeconds;
private final KeyComputation<K1, T, K2, R> computation;
private final long keyExpireTimeSeconds;


/**
Expand Down Expand Up @@ -77,13 +77,13 @@ public static class Config<K1, T, K2, R> {
// input type for keyToKey is serial
// always assume a stateful calculation is being made
// do not allow config to override
private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL;
private final INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL;
private List<ParameterDefinition<?>> parameters = Collections.emptyList();

/**
* @param codec
* @param codec is a netty reactivex codec
*
* @return
* @return Config
*
* @deprecated As of release 0.603, use {@link #codec(Codec)} instead
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@

public class KeyToScalar<K, T, R> extends StageConfig<T, R> {

private ToScalarComputation<K, T, R> computation;
private long keyExpireTimeSeconds;
private final ToScalarComputation<K, T, R> computation;
private final long keyExpireTimeSeconds;

/**
* @deprecated As of release 0.603, use {@link #KeyToScalar(ToScalarComputation, Config, Codec)} instead
Expand Down Expand Up @@ -66,13 +66,13 @@ public static class Config<K, T, R> {
// default input type is serial for
// 'stateful group calculation' use case
// do not allow config override
private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL;
private final INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL;
private List<ParameterDefinition<?>> parameters = Collections.emptyList();

/**
* @param codec
* @param codec is netty reactive x
*
* @return
* @return Config
*
* @deprecated As of release 0.603, use {@link #codec(Codec)} instead
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public <K, R> KeyedStages<K, R> stage(ToKeyComputation<T, K, R> computation,
* @param computation The computation that transforms a scalar to a group
* @param config stage config
*
* @return
* @return KeyedStages
*/
public <K, R> KeyedStages<K, R> stage(ToGroupComputation<T, K, R> computation,
ScalarToGroup.Config<T, K, R> config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@

public class ScalarToGroup<T, K, R> extends KeyValueStageConfig<T, K, R> {

private ToGroupComputation<T, K, R> computation;
private long keyExpireTimeSeconds;
private final ToGroupComputation<T, K, R> computation;
private final long keyExpireTimeSeconds;


/**
* @param computation
* @param config
* @param inputCodec
* @param computation is a ToGroupComputation
* @param config is a ScalartoGroup config
* @param inputCodec is codec of mantisx runtime codec
*
* @deprecated As of release 0.603, use {@link #ScalarToGroup(ToGroupComputation, Config, Codec)} instead
*/
Expand Down Expand Up @@ -80,9 +80,9 @@ public static class Config<T, K, R> {
private List<ParameterDefinition<?>> parameters = Collections.emptyList();

/**
* @param codec
* @param codec is Codec of netty reactivex
*
* @return
* @return Config
*
* @deprecated As of release 0.603, use {@link #codec(Codec)} instead
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@
*/
public class ScalarToKey<T, K, R> extends KeyValueStageConfig<T, K, R> {

private ToKeyComputation<T, K, R> computation;
private long keyExpireTimeSeconds;
private final ToKeyComputation<T, K, R> computation;
private final long keyExpireTimeSeconds;


/**
* @param computation
* @param config
* @param inputCodec
* @param computation is a ToGroupComputation
* @param config is a ScalartoGroup config
* @param inputCodec is codec of mantisx runtime codec
*
* @deprecated As of release 0.603, use {@link #ScalarToKey(ToKeyComputation, Config, Codec)} (ToGroupComputation, Config, Codec)} instead
*/
Expand Down Expand Up @@ -75,9 +75,9 @@ public static class Config<T, K, R> {
private List<ParameterDefinition<?>> parameters = Collections.emptyList();

/**
* @param codec
* @param codec is Codec of netty reactivex
*
* @return
* @return Config
*
* @deprecated As of release 0.603, use {@link #codec(Codec)} instead
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@

public class ScalarToScalar<T, R> extends StageConfig<T, R> {

private INPUT_STRATEGY inputStrategy;
private ScalarComputation<T, R> computation;
private final INPUT_STRATEGY inputStrategy;
private final ScalarComputation<T, R> computation;
private List<ParameterDefinition<?>> parameters;

/**
Expand Down Expand Up @@ -72,9 +72,9 @@ public static class Config<T, R> {
private List<ParameterDefinition<?>> parameters = Collections.emptyList();

/**
* @param codec
* @param codec is Codec of netty reactivex
*
* @return
* @return Config
*
* @deprecated As of release 0.603, use {@link #codec(Codec)} instead
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

public class ScalingPolicy {

private int numInstances;
private MachineDefinition machineDefinition;
private final int numInstances;
private final MachineDefinition machineDefinition;

ScalingPolicy(int numInstances,
MachineDefinition machineDefinition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
public class SinkHolder<T> {

private Metadata metadata;
private Sink<T> sinkAction;
private final Sink<T> sinkAction;

public SinkHolder(final SelfDocumentingSink<T> sinkAction) {
this.metadata = sinkAction.metadata();
Expand Down
Loading
Loading