Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Small fixups
Browse files Browse the repository at this point in the history
  • Loading branch information
nh13 committed Jul 3, 2017
1 parent 09a048f commit d9d7caa
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 32 deletions.
2 changes: 0 additions & 2 deletions core/src/main/scala/dagr/core/execsystem/TaskStatus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,4 @@ case object TaskStatus extends IntEnum[TaskStatus] {
case object FailedOnComplete extends Failed { val description: String = "Failed during the onComplete callback"; val value: Int = 6 }
case object SucceededExecution extends Succeeded { val description: String = "has succeeded"; val value: Int = 7 }
case object ManuallySucceeded extends Succeeded { val description: String = "has succeeded (manually)"; val value: Int = 8 }

val TaskStatuses = Seq(Unknown, Started, Stopped, FailedGetTasks, FailedScheduling, FailedExecution, FailedOnComplete, SucceededExecution, ManuallySucceeded)
}
49 changes: 19 additions & 30 deletions core/src/main/scala/dagr/core/execsystem2/GraphExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ trait GraphExecutor[T<:Task] extends Executor {
}

object GraphExecutor {
/** Greates a default graph executor given a task executor */
/** Creates a default graph executor given a task executor */
def apply[T<:Task](taskExecutor: TaskExecutor[T])(implicit ex: ExecutionContext): GraphExecutor[T] =
new GraphExecutorImpl(taskExecutor=taskExecutor, dependencyGraph=DependencyGraph())
}
Expand Down Expand Up @@ -102,8 +102,7 @@ private class GraphExecutorImpl[T<:Task](protected val taskExecutor: TaskExecuto

/** Start the execution of this task and all tasks that depend on it. Returns the number of tasks that were not
* executed. A given task should only be attempted once. */
def _execute(rootTask: Task): Int = {

protected def _execute(rootTask: Task): Int = {
// Catch failure if the initial registration fails.
val rootFuture: Future[Task] = failFutureWithTaskStatus(rootTask) {
Future {
Expand All @@ -128,7 +127,7 @@ private class GraphExecutorImpl[T<:Task](protected val taskExecutor: TaskExecuto
}

// Return the number of tasks known not to succeed
this._tasks.map(_.taskInfo).count { info => !info.status.isInstanceOf[Succeeded] }
this._tasks.map(_.taskInfo).count { info => !info.status.success }
}

/** Process a given task, its sub-tree, and tasks that only depend on this task.
Expand Down Expand Up @@ -171,6 +170,7 @@ private class GraphExecutorImpl[T<:Task](protected val taskExecutor: TaskExecuto
buildTask(parent) flatMap {
case x :: Nil if x == parent => // one task and it returned itself, so execute it
requireNoDependencies(parent)
// FIXME: make sure the first "case's" task is of type T
if (this.taskCaches.isEmpty || this.taskCaches.exists(_.execute(parent))) {
executeWithTaskExecutor(parent)
}
Expand All @@ -184,7 +184,7 @@ private class GraphExecutorImpl[T<:Task](protected val taskExecutor: TaskExecuto
}
}

/** Build the task and catch any exceptions during the call to [[Task.getTasks()]]. */
/** Build the task and catch any exceptions during the call to [[Task.make()]]. */
private def buildTask(task: Task): Future[Seq[Task]] = failWithFailedToBuild(task) {
requireNoDependencies(task)
val tasks = task.make().toList
Expand All @@ -204,36 +204,25 @@ private class GraphExecutorImpl[T<:Task](protected val taskExecutor: TaskExecuto
// on the child task), to the graph. The parent task can complete once they have all been added to the
// dependency graph.
val childFutures: Future[Seq[Task]] = Future {
// check that children of the parent haven't already been added to the graph. Not currently allowed. A child
// can only have one parent!
childTasks.foreach { child =>
require(!this.dependencyGraph.contains(child), s"child '${child.name}' of parent '${parent.name}' already in the graph")
}

// Register them all at once, since locking is expensive, and we may end up creating many, many futures
Future {
lockIt {
val result = childTasks map { child =>
(registerTask(child).contains(true), child)
}
// check for cyclical dependencies since a new sub-tree has been added
this.dependencyGraph.exceptIfCyclicalDependency(parent)
// NB: returns true if the task has no dependencies, false otherwise
result
lockIt {
// check that children of the parent haven't already been added to the graph. Not currently allowed. A child
// can only have one parent!
childTasks.foreach { child =>
require(!this.dependencyGraph.contains(child), s"child '${child.name}' of parent '${parent.name}' already in the graph")
}
}
} flatMap { future: Future[Seq[(Boolean, Task)]] =>
// Process each child task that has no dependencies, otherwise just return a success for it. In the latter
// case, it will be processed once all its dependencies have been met (executed). Note this will happen only
// since presumably children with dependencies depend on children without dependencies, and so will be handled
// in their processTask call.
future flatMap { things: Seq[(Boolean, Task)] =>
val futures = things.map { case (hasNoDependencies: Boolean, child: Task) =>
if (hasNoDependencies) processTask(child) else Future.successful(child)
val result = childTasks flatMap { child =>
// NB: if the child has dependencies, it will get executed when the task with last unmet dependency is removed
// from the dependency graph
if (registerTask(child).contains(true)) Some(child) else None
}
val result = Future.sequence(futures)
// check for cyclical dependencies since a new sub-tree has been added
this.dependencyGraph.exceptIfCyclicalDependency(parent)
// NB: returns true if the task has no dependencies, false otherwise
result
}
} flatMap { children: Seq[Task] =>
Future.sequence(children.map(processTask))
}

childFutures map { _ => // is of type Seq[Task]
Expand Down

0 comments on commit d9d7caa

Please sign in to comment.