Skip to content

Commit

Permalink
Merge: 3e7f2a3 f39212e
Browse files Browse the repository at this point in the history
Merge: 3e7f2a3 f39212e

Merge: 3e7f2a3 f39212e

Merge: 3e7f2a3 f39212e

Merge branch 'master' of gitli.corp.linkedin.com:ds-platform/gobblin

Signed-off-by: Yinan Li <[email protected]>

Conflicts:
product-spec.json
runtime/src/main/java/com/linkedin/uif/runtime/Task.java
runtime/src/main/java/com/linkedin/uif/runtime/local/LocalTaskStateTracker2.java

RB=380666
R=etl-infra-reviewers
A=cbotev
  • Loading branch information
liyinan926 committed Nov 5, 2014
2 parents 1cc44ae + 6710be2 commit 14e0436
Show file tree
Hide file tree
Showing 25 changed files with 363 additions and 301 deletions.
2 changes: 1 addition & 1 deletion acl/project.acl
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
owners: [ stakiar, veramach, kgoodhop, lqiao, jyu, ynli, hcai, mitu ]
owners: [ stakiar, veramach, kgoodhop, lqiao, jyu, ynli, hcai, mitu, cbotev]
whitelist: [ tester ]
paths: ['.*']
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ public class ConfigurationKeys {
public static final String DEFAULT_SOURCE_QUERYBASED_IS_METADATA_COLUMN_CHECK_ENABLED = "true";
public static final String DEFAULT_COLUMN_NAME_CASE = "NOCHANGE";
public static final int DEFAULT_SOURCE_QUERYBASED_JDBC_RESULTSET_FETCH_SIZE = 1000;

public static final String FILEBASED_REPORT_STATUS_ON_COUNT="filebased.report.status.on.count";
public static final int DEFAULT_FILEBASED_REPORT_STATUS_ON_COUNT = 10000;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ Object convertField(JsonElement value) {
for(String pattern: patterns) {
DateTimeFormatter dtf = DateTimeFormat.forPattern(pattern).withZone(this.timeZone);
try {
formattedDate = dtf.parseDateTime(value.getAsString()).getMillis();
formattedDate = dtf.parseDateTime(value.getAsString()).withZone(DateTimeZone.forID("UTC")).getMillis();
if(Boolean.valueOf(this.state.getProp(ConfigurationKeys.CONVERTER_IS_EPOCH_TIME_IN_SECONDS))) {
formattedDate = (Long)formattedDate / 1000;
}
Expand Down
13 changes: 7 additions & 6 deletions product-spec.json
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
{
"name": "gobblin",
"group": "[email protected]@",
"version": "0.0.81",
"version": "0.0.99",
"scmUrl": "[email protected]:ds-platform/gobblin.git",
"owners": [
"kgoodhop",
"lqiao",
"stakiar",
"ynli"
"ynli",
"mitu",
"hcai",
"cbotev"
],
"comment": "",
"description": "Gobblin - Universal Ingestion Framework",
Expand Down Expand Up @@ -62,16 +65,14 @@
"jodaTime": "joda-time:joda-time:1.6",
"metricsCore": "com.codahale.metrics:metrics-core:3.0.2",
"commonsIO": "commons-io:commons-io:2.4",
"salesforceWsc": "com.force.api:force-wsc:29.0.0",
"salesforcePartner": "com.force.api:force-partner-api:29.0.0",
"influxdb": "org.influxdb:influxdb-java:1.2",
"jsch": "com.jcraft:jsch:0.1.46",
"mysqlDriver": "org.mysql:mysql-connector-java:5.1.14",
"commonsVfs": "org.apache.commons:commons-vfs2:2.0"
},
"product": {},
"trunkDev": {
"autoRevert": true,
"containerInfrastructure": true
"containerInfrastructure": false
},
"extSCM": {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -78,6 +80,50 @@ public AbstractJobLauncher(Properties properties) throws Exception {
populateSourceWrapperMap();
}

/**
* Run the given list of {@link WorkUnit}s of the given job.
*
* @param jobId job ID
* @param workUnits given list of {@link WorkUnit}s to run
* @param stateTracker a {@link TaskStateTracker} for task state tracking
* @param taskExecutor a {@link TaskExecutor} for task execution
* @param countDownLatch a {@link java.util.concurrent.CountDownLatch} waited on for job completion
* @return a list of {@link Task}s from the {@link WorkUnit}s
* @throws InterruptedException
*/
public static List<Task> runWorkUnits(String jobId, List<WorkUnit> workUnits,
TaskStateTracker stateTracker,
TaskExecutor taskExecutor,
CountDownLatch countDownLatch)
throws InterruptedException {

List<Task> tasks = Lists.newArrayList();
for (WorkUnit workUnit : workUnits) {
String taskId = workUnit.getProp(ConfigurationKeys.TASK_ID_KEY);
WorkUnitState workUnitState = new WorkUnitState(workUnit);
workUnitState.setId(taskId);
workUnitState.setProp(ConfigurationKeys.JOB_ID_KEY, jobId);
workUnitState.setProp(ConfigurationKeys.TASK_ID_KEY, taskId);

// Create a new task from the work unit and submit the task to run
Task task = new Task(new TaskContext(workUnitState), stateTracker, Optional.of(countDownLatch));
stateTracker.registerNewTask(task);
tasks.add(task);
LOG.info(String.format("Submitting task %s to run", taskId));
taskExecutor.submit(task);
}

LOG.info(String.format("Waiting for submitted tasks of job %s to complete...", jobId));
while (countDownLatch.getCount() > 0) {
LOG.info(String.format("%d out of %d tasks of job %s are running",
countDownLatch.getCount(), workUnits.size(), jobId));
countDownLatch.await(1, TimeUnit.MINUTES);
}
LOG.info(String.format("All tasks of job %s have completed", jobId));

return tasks;
}

@Override
@SuppressWarnings("unchecked")
public void launchJob(Properties jobProps, JobListener jobListener) throws JobException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public void jobCompleted(JobState jobState) {
boolean emailNotificationEnabled = Boolean.valueOf(jobState.getProp(
ConfigurationKeys.EMAIL_NOTIFICATION_ENABLED_KEY, Boolean.toString(false)));
if (!emailNotificationEnabled) {
LOGGER.info("Email notification is not enabled for job " + jobState.getJobName());
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ public class RunOnceJobListener implements JobListener {

@Override
public void jobCompleted(JobState jobState) {
if (!jobState.contains(ConfigurationKeys.JOB_CONFIG_FILE_PATH_KEY)) {
LOG.error("Job configuration file path not found in job state of job " + jobState.getJobId());
return;
}

String jobConfigFile = jobState.getProp(ConfigurationKeys.JOB_CONFIG_FILE_PATH_KEY);
// Rename the config file so we won't run this job when the worker is bounced
try {
Files.move(new File(jobConfigFile), new File(jobConfigFile + ".done"));
} catch (IOException ioe) {
LOG.error("Failed to rename job configuration file for job " +
jobState.getJobName(), ioe);
LOG.error("Failed to rename job configuration file for job " + jobState.getJobName(), ioe);
}
}
}
22 changes: 18 additions & 4 deletions runtime/src/main/java/com/linkedin/uif/runtime/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -44,8 +45,9 @@ public class Task implements Runnable {
private final String jobId;
private final String taskId;
private final TaskContext taskContext;
private final TaskStateTracker taskStateTracker;
private final TaskState taskState;
private final TaskStateTracker taskStateTracker;
private final Optional<CountDownLatch> countDownLatch;

private final List<Optional<Fork>> forks = Lists.newArrayList();

Expand All @@ -59,13 +61,16 @@ public class Task implements Runnable {
* to construct and run a {@link Task}
* @param taskStateTracker a {@link TaskStateTracker} for tracking task state
*/
public Task(TaskContext context, TaskStateTracker taskStateTracker) {
@SuppressWarnings("unchecked")
public Task(TaskContext context, TaskStateTracker taskStateTracker,
Optional<CountDownLatch> countDownLatch) {

this.taskContext = context;
// Task manager is used to register failed tasks
this.taskStateTracker = taskStateTracker;
this.taskState = context.getTaskState();
this.jobId = this.taskState.getJobId();
this.taskId = this.taskState.getTaskId();
this.taskStateTracker = taskStateTracker;
this.countDownLatch = countDownLatch;
}

@Override
Expand Down Expand Up @@ -269,6 +274,15 @@ public int getRetryCount() {
return this.retryCount;
}

/**
* Mark the completion of this {@link Task}.
*/
public void markTaskCompletion() {
if (this.countDownLatch.isPresent()) {
this.countDownLatch.get().countDown();
}
}

@Override
public String toString() {
return this.taskId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.AbstractIdleService;

Expand Down Expand Up @@ -114,7 +116,8 @@ public void run() {
}

// Create a task based off the work unit
Task task = new Task(new TaskContext(workUnitState), this.taskStateTracker);
Task task = new Task(new TaskContext(workUnitState), this.taskStateTracker,
Optional.<CountDownLatch>absent());
// And then execute the task
this.taskExecutor.execute(task);
} catch (InterruptedException ie) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,15 @@

import com.linkedin.uif.configuration.ConfigurationKeys;
import com.linkedin.uif.configuration.WorkUnitState;
import com.linkedin.uif.metrics.JobMetrics;
import com.linkedin.uif.runtime.AbstractJobLauncher;
import com.linkedin.uif.runtime.FileBasedJobLock;
import com.linkedin.uif.runtime.JobException;
import com.linkedin.uif.runtime.JobLauncher;
import com.linkedin.uif.runtime.JobLock;
import com.linkedin.uif.runtime.JobState;
import com.linkedin.uif.runtime.Task;
import com.linkedin.uif.runtime.TaskExecutor;
import com.linkedin.uif.runtime.TaskState;
import com.linkedin.uif.runtime.TaskStateTracker;
import com.linkedin.uif.runtime.WorkUnitManager;
import com.linkedin.uif.source.workunit.MultiWorkUnit;
import com.linkedin.uif.source.workunit.WorkUnit;

Expand All @@ -42,30 +40,24 @@ public class LocalJobLauncher extends AbstractJobLauncher {

private static final Logger LOG = LoggerFactory.getLogger(LocalJobLauncher.class);

private final WorkUnitManager workUnitManager;
private final TaskExecutor taskExecutor;
private final TaskStateTracker taskStateTracker;
// Service manager to manage dependent services
private final ServiceManager serviceManager;

private volatile JobState jobState;
private volatile CountDownLatch countDownLatch;
private volatile boolean isCancelled = false;

public LocalJobLauncher(Properties properties) throws Exception {
super(properties);

TaskExecutor taskExecutor = new TaskExecutor(properties);
TaskStateTracker taskStateTracker = new LocalTaskStateTracker2(properties, taskExecutor);
((LocalTaskStateTracker2) taskStateTracker).setJobLauncher(this);
this.workUnitManager = new WorkUnitManager(taskExecutor, taskStateTracker);

this.taskExecutor = new TaskExecutor(properties);
this.taskStateTracker = new LocalTaskStateTracker2(properties, this.taskExecutor);
this.serviceManager = new ServiceManager(Lists.newArrayList(
// The order matters due to dependencies between services
taskExecutor,
taskStateTracker,
this.workUnitManager
this.taskExecutor,
this.taskStateTracker
));
// Start all dependent services
this.serviceManager.startAsync().awaitHealthy(5, TimeUnit.SECONDS);
}

@Override
Expand All @@ -89,7 +81,8 @@ public void cancelJob(Properties jobProps) throws JobException {
protected void runJob(String jobName, Properties jobProps, JobState jobState,
List<WorkUnit> workUnits) throws Exception {

this.jobState = jobState;
// Start all dependent services
this.serviceManager.startAsync().awaitHealthy(5, TimeUnit.SECONDS);

// Figure out the actual work units to run by flattening MultiWorkUnits
List<WorkUnit> workUnitsToRun = Lists.newArrayList();
Expand All @@ -101,28 +94,29 @@ protected void runJob(String jobName, Properties jobProps, JobState jobState,
}
}

String jobId = jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY);

// Add all work units to run
for (WorkUnit workUnit : workUnitsToRun) {
String taskId = workUnit.getProp(ConfigurationKeys.TASK_ID_KEY);
WorkUnitState workUnitState = new WorkUnitState(workUnit);
workUnitState.setId(taskId);
workUnitState.setProp(ConfigurationKeys.JOB_ID_KEY, jobId);
workUnitState.setProp(ConfigurationKeys.TASK_ID_KEY, taskId);
this.workUnitManager.addWorkUnit(workUnitState);
if (workUnitsToRun.isEmpty()) {
LOG.warn("No work units to run");
return;
}

String jobId = jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY);
this.countDownLatch = new CountDownLatch(workUnitsToRun.size());
LOG.info(String.format("Waiting for job %s to complete...", jobId));
// Wait for all tasks to complete
this.countDownLatch.await();
List<Task> tasks = AbstractJobLauncher.runWorkUnits(
jobId, workUnitsToRun, this.taskStateTracker, this.taskExecutor, this.countDownLatch);

// Set job state appropriately
if (isCancelled) {
jobState.setState(JobState.RunningState.CANCELLED);
} else if (this.jobState.getState() == JobState.RunningState.RUNNING) {
this.jobState.setState(JobState.RunningState.SUCCESSFUL);
} else if (jobState.getState() == JobState.RunningState.RUNNING) {
jobState.setState(JobState.RunningState.SUCCESSFUL);
}

// Collect task states and set job state to FAILED if any task failed
for (Task task : tasks) {
jobState.addTaskState(task.getTaskState());
if (task.getTaskState().getWorkingState() == WorkUnitState.WorkingState.FAILED) {
jobState.setState(JobState.RunningState.FAILED);
}
}

// Stop all dependent services
Expand All @@ -138,25 +132,4 @@ protected JobLock getJobLock(String jobName, Properties jobProps) throws IOExcep
jobProps.getProperty(ConfigurationKeys.JOB_LOCK_DIR_KEY),
jobName);
}

/**
* Callback method when a task is completed.
*
* @param taskState {@link TaskState}
*/
public synchronized void onTaskCompletion(TaskState taskState) {
if (JobMetrics.isEnabled(this.properties)) {
// Remove all task-level metrics after the task is done
taskState.removeMetrics();
}

LOG.info(String.format("Task %s completed with state %s", taskState.getTaskId(),
taskState.getWorkingState().name()));
if (taskState.getWorkingState() == WorkUnitState.WorkingState.FAILED) {
// The job is considered being failed if any task failed
this.jobState.setState(JobState.RunningState.FAILED);
}
this.jobState.addTaskState(taskState);
this.countDownLatch.countDown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ public void scheduleJob(Properties jobProps, JobListener jobListener) throws Job
.build();

try {
if (this.scheduler.checkExists(job.getKey())) {
throw new JobException(String.format("Job %s has already been scheduled", jobName));
}

// Schedule the Quartz job with a trigger built from the job configuration
this.scheduler.scheduleJob(job, getTrigger(job.getKey(), jobProps));
} catch (SchedulerException se) {
Expand Down Expand Up @@ -544,8 +548,6 @@ public void onFileCreate(File file) {
// Then load job configuration properties from the new job configuration file
loadJobConfig(jobProps, file);

jobProps.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_PATH_KEY,
file.getAbsolutePath());
// Schedule the new job
try {
boolean runOnce = Boolean.valueOf(
Expand Down Expand Up @@ -596,6 +598,7 @@ public void onFileChange(File file) {
private void loadJobConfig(Properties jobProps, File file) {
try {
jobProps.load(new FileReader(file));
jobProps.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_PATH_KEY, file.getAbsolutePath());
} catch (Exception e) {
LOG.error("Failed to load job configuration from file " + file.getAbsolutePath(), e);
}
Expand Down
Loading

0 comments on commit 14e0436

Please sign in to comment.