Skip to content

Commit

Permalink
Moved the part of task flow within forked branches into a new class Fork
Browse files Browse the repository at this point in the history
Also added support for using converters, quality checkers, and task publishers in forks. This is the first step toward supporting separate config files for each forked branch and allowing a richer set of operations within a fork.

Signed-off-by: Yinan Li <[email protected]>
  • Loading branch information
liyinan926 committed Nov 5, 2014
1 parent 0bc4115 commit 1cc44ae
Show file tree
Hide file tree
Showing 31 changed files with 669 additions and 378 deletions.
2 changes: 2 additions & 0 deletions common-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ dependencies {
compile spec.external.slf4jLog4j
compile spec.external.jclOverSlf4j
compile spec.external.jodaTime
compile spec.external.jacksonCore
compile spec.external.jacksonMapper

testCompile spec.external.testng
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class ConfigurationKeys {
public static final String SOURCE_WRAPPER_CLASS_KEY = "source.wrapper.class";
public static final String CONVERTER_CLASSES_KEY = "converter.classes";
public static final String FORK_OPERATOR_CLASS_KEY = "fork.operator.class";
public static final String DEFAULT_FORK_OPERATOR_CLASS = "com.linkedin.uif.converter.IdentityForkOperator";
public static final String DEFAULT_FORK_OPERATOR_CLASS = "com.linkedin.uif.fork.IdentityForkOperator";
public static final String JOB_COMMIT_POLICY_KEY = "job.commit.policy";
public static final String DEFAULT_JOB_COMMIT_POLICY = "full";
public static final String WORK_UNIT_RETRY_POLICY_KEY = "workunit.retry.policy";
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.linkedin.uif.fork;

/**
 * A checked {@link java.lang.Exception} thrown when copying is not supported.
 *
 * @author ynli
 */
public class CopyNotSupportedException extends Exception {

  // Exception is Serializable, so declare the serial version explicitly
  // instead of relying on the compiler-derived default.
  private static final long serialVersionUID = 1L;

  /**
   * Create an exception carrying only a detail message.
   *
   * @param message description of why copying is not supported
   */
  public CopyNotSupportedException(String message) {
    super(message);
  }

  /**
   * Create an exception carrying a detail message and the underlying cause.
   *
   * @param message description of why copying is not supported
   * @param cause the failure that prevented the copy; retained so that its
   *              stack trace is not lost when it is wrapped
   */
  public CopyNotSupportedException(String message, Throwable cause) {
    super(message, cause);
  }
}
16 changes: 16 additions & 0 deletions common-api/src/main/java/com/linkedin/uif/fork/Copyable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.linkedin.uif.fork;

/**
 * Contract for classes that support making copies of their instances.
 *
 * @param <T> type of the copy returned by {@link #copy()}
 *
 * @author ynli
 */
public interface Copyable<T> {

  /**
   * Create a new copy of this instance.
   *
   * @return a new copy of this instance
   * @throws CopyNotSupportedException if copying is not supported
   */
  T copy() throws CopyNotSupportedException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.linkedin.uif.fork;

import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

/**
 * A {@link Copyable} wrapper around an
 * {@link org.apache.avro.generic.GenericRecord}.
 *
 * @author ynli
 */
public class CopyableGenericRecord implements Copyable<GenericRecord> {

  private final GenericRecord record;

  /**
   * Wrap the given record so that it can be copied.
   *
   * @param record the record to wrap
   */
  public CopyableGenericRecord(GenericRecord record) {
    this.record = record;
  }

  /**
   * Make a deep copy of the wrapped record.
   *
   * @return a deep copy of the wrapped record
   * @throws CopyNotSupportedException if the wrapped record is not a
   *         {@link org.apache.avro.generic.GenericData.Record}
   */
  @Override
  public GenericRecord copy() throws CopyNotSupportedException {
    if (this.record instanceof GenericData.Record) {
      GenericData.Record source = (GenericData.Record) this.record;
      // The second argument requests a deep copy of the source record
      return new GenericData.Record(source, true);
    }

    // Only GenericData.Record offers the copy constructor used above
    throw new CopyNotSupportedException(
        "The record to make copy is not an instance of " + GenericData.Record.class.getName());
  }
}
22 changes: 22 additions & 0 deletions common-api/src/main/java/com/linkedin/uif/fork/CopyableSchema.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.linkedin.uif.fork;

import org.apache.avro.Schema;

/**
 * A {@link Copyable} wrapper around an {@link org.apache.avro.Schema}.
 *
 * @author ynli
 */
public class CopyableSchema implements Copyable<Schema> {

  private final Schema schema;

  /**
   * Wrap the given schema so that it can be copied.
   *
   * @param schema the schema to wrap
   */
  public CopyableSchema(Schema schema) {
    this.schema = schema;
  }

  /**
   * Make a copy of the wrapped schema by parsing its string form again.
   *
   * @return a {@link org.apache.avro.Schema} parsed from the wrapped schema's string form
   */
  @Override
  public Schema copy() throws CopyNotSupportedException {
    Schema.Parser parser = new Schema.Parser();
    String schemaString = this.schema.toString();
    return parser.parse(schemaString);
  }
}
52 changes: 52 additions & 0 deletions common-api/src/main/java/com/linkedin/uif/fork/ForkOperator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.linkedin.uif.fork;

import java.io.Closeable;
import java.util.List;

import com.linkedin.uif.configuration.WorkUnitState;

/**
 * An interface for fork operators, which fork one input data stream into
 * multiple data streams (branches). Implementations plug in the fork logic
 * by deciding, per branch, whether the input schema and each input data
 * record should go to that branch.
 *
 * @author ynli
 *
 * @param <S> schema data type
 * @param <D> data record data type
 */
public interface ForkOperator<S, D> extends Closeable {

  /**
   * Initialize this {@link ForkOperator}.
   *
   * @param workUnitState {@link WorkUnitState} carrying the configuration
   * @throws Exception if initialization fails
   */
  void init(WorkUnitState workUnitState) throws Exception;

  /**
   * Get the number of branches after the fork.
   *
   * @param workUnitState {@link WorkUnitState} carrying the configuration
   * @return number of branches after the fork
   */
  int getBranches(WorkUnitState workUnitState);

  /**
   * Decide, for each branch, whether the input schema should go to it.
   *
   * @param workUnitState {@link WorkUnitState} carrying the configuration
   * @param input input schema
   * @return list of {@link java.lang.Boolean}s indicating if the schema
   *         should go to each branch
   */
  List<Boolean> forkSchema(WorkUnitState workUnitState, S input);

  /**
   * Decide, for each branch, whether the input data record should go to it.
   *
   * @param workUnitState {@link WorkUnitState} carrying the configuration
   * @param input input data record
   * @return list of {@link java.lang.Boolean}s indicating if the record
   *         should go to each branch
   */
  List<Boolean> forkDataRecord(WorkUnitState workUnitState, D input);
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package com.linkedin.uif.converter;
package com.linkedin.uif.fork;

import java.io.IOException;
import java.util.List;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;

import com.linkedin.uif.configuration.ConfigurationKeys;
Expand All @@ -17,11 +16,11 @@
* @author ynli
*/
@SuppressWarnings("unused")
public class IdentityForkOperator<S, D> implements ForkOperator<S, S, D, D> {
public class IdentityForkOperator<S, D> implements ForkOperator<S, D> {

// Reuse both lists to save the cost of allocating new lists
private final List<Optional<S>> schemas = Lists.newArrayList();
private final List<Optional<D>> records = Lists.newArrayList();
private final List<Boolean> schemas = Lists.newArrayList();
private final List<Boolean> records = Lists.newArrayList();

@Override
public void init(WorkUnitState workUnitState) {
Expand All @@ -34,26 +33,22 @@ public int getBranches(WorkUnitState workUnitState) {
}

@Override
public List<Optional<S>> forkSchema(WorkUnitState workUnitState, S input)
throws SchemaConversionException {
public List<Boolean> forkSchema(WorkUnitState workUnitState, S input) {

schemas.clear();
Optional<S> copy = Optional.of(input);
for (int i = 0; i < getBranches(workUnitState); i++) {
schemas.add(copy);
schemas.add(Boolean.TRUE);
}

return schemas;
}

@Override
public List<Optional<D>> forkDataRecord(WorkUnitState workUnitState, D input)
throws DataConversionException {
public List<Boolean> forkDataRecord(WorkUnitState workUnitState, D input) {

records.clear();
Optional<D> copy = Optional.of(input);
for (int i = 0; i < getBranches(workUnitState); i++) {
records.add(copy);
records.add(Boolean.TRUE);
}

return records;
Expand Down
6 changes: 0 additions & 6 deletions product-spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,6 @@
"mysqlDriver": "org.mysql:mysql-connector-java:5.1.14",
"commonsVfs": "org.apache.commons:commons-vfs2:2.0"
},
"product": {
"container": {
"containerImpl": "com.linkedin.container:container-impl:@spec.product.container.version@",
"version": "6.0.154"
}
},
"trunkDev": {
"autoRevert": true,
"containerInfrastructure": true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@ public class TaskPublisherBuilder
{
private final TaskLevelPolicyCheckResults results;
private final WorkUnitState workUnitState;

public TaskPublisherBuilder(WorkUnitState workUnitState, TaskLevelPolicyCheckResults results) {
private final int index;

public TaskPublisherBuilder(WorkUnitState workUnitState, TaskLevelPolicyCheckResults results, int index) {
this.results = results;
this.workUnitState = workUnitState;
this.index = index;
}

public static TaskPublisherBuilder newBuilder(WorkUnitState taskState, TaskLevelPolicyCheckResults results) {
return new TaskPublisherBuilder(taskState, results);
public static TaskPublisherBuilder newBuilder(WorkUnitState taskState,
TaskLevelPolicyCheckResults results,
int index) {

return new TaskPublisherBuilder(taskState, results, index);
}

public TaskPublisher build() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

public class TaskPublisherBuilderFactory
{
public TaskPublisherBuilder newTaskPublisherBuilder(WorkUnitState workUnitState, TaskLevelPolicyCheckResults results) {
return new TaskPublisherBuilder(workUnitState, results);
public TaskPublisherBuilder newTaskPublisherBuilder(WorkUnitState workUnitState,
TaskLevelPolicyCheckResults results,
int index) {

return TaskPublisherBuilder.newBuilder(workUnitState, results, index);
}
}
Loading

0 comments on commit 1cc44ae

Please sign in to comment.