Skip to content

Commit

Permalink
[APGAS] added finish expression construct
Browse files Browse the repository at this point in the history
  • Loading branch information
tardieu committed Oct 30, 2015
1 parent 5ef7060 commit d5600f5
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 2 deletions.
21 changes: 20 additions & 1 deletion apgas/src/apgas/Constructs.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.io.Serializable;
import java.util.List;
import java.util.concurrent.Callable;

import apgas.impl.SerializableRunnable;

Expand All @@ -31,7 +32,7 @@ private Constructs() {
* Runs {@code f} then waits for all tasks transitively spawned by {@code f}
* to complete.
* <p>
* If {@code f} or the transitively tasks spawned by {@code f} have uncaught
* If {@code f} or the tasks transitively spawned by {@code f} have uncaught
* exceptions then {@code finish(f)} then throws a {@link MultipleException}
* that collects these uncaught exceptions.
*
Expand All @@ -44,6 +45,24 @@ public static void finish(Job f) {
GlobalRuntime.getRuntime().finish(f);
}

/**
* Evaluates {@code f}, waits for all the tasks transitively spawned by
* {@code f}, and returns the result.
* <p>
* If {@code f} or the tasks transitively spawned by {@code f} have uncaught
* exceptions then {@code finish(F)} then throws a {@link MultipleException}
* that collects these uncaught exceptions.
*
* @param <T>
* the type of the result
* @param f
* the function to run
* @return the result of the evaluation
*/
public static <T> T finish(Callable<T> f) {
return GlobalRuntime.getRuntime().finish(f);
}

/**
* Submits a new local task to the global runtime with body {@code f} and
* returns immediately.
Expand Down
19 changes: 18 additions & 1 deletion apgas/src/apgas/GlobalRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.io.Serializable;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

Expand Down Expand Up @@ -98,7 +99,7 @@ public static GlobalRuntime getRuntime() {
* Runs {@code f} then waits for all tasks transitively spawned by {@code f}
* to complete.
* <p>
* If {@code f} or the transitively tasks spawned by {@code f} have uncaught
* If {@code f} or the tasks transitively spawned by {@code f} have uncaught
* exceptions then {@code finish(F)} then throws a {@link MultipleException}
* that collects these uncaught exceptions.
*
Expand All @@ -109,6 +110,22 @@ public static GlobalRuntime getRuntime() {
*/
protected abstract void finish(Job f);

/**
* Evaluates {@code f}, waits for all the tasks transitively spawned by
* {@code f}, and returns the result.
* <p>
* If {@code f} or the tasks transitively spawned by {@code f} have uncaught
* exceptions then {@code finish(F)} then throws a {@link MultipleException}
* that collects these uncaught exceptions.
*
* @param <T>
* the type of the result
* @param f
* the function to run
* @return the result of the evaluation
*/
protected abstract <T> T finish(Callable<T> f);

/**
* Submits a new local task to the global runtime with body {@code f} and
* returns immediately.
Expand Down
9 changes: 9 additions & 0 deletions apgas/src/apgas/impl/GlobalRuntimeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
Expand All @@ -34,6 +35,7 @@
import apgas.Place;
import apgas.SerializableCallable;
import apgas.SerializableJob;
import apgas.util.Cell;
import apgas.util.GlobalID;

import com.hazelcast.core.IMap;
Expand Down Expand Up @@ -366,6 +368,13 @@ public void finish(Job f) {
}
}

@Override
public <T> T finish(Callable<T> f) {
final Cell<T> cell = new Cell<T>();
finish(() -> cell.set(f.call()));
return cell.get();
}

@Override
public void async(Job f) {
final Worker worker = currentWorker();
Expand Down

0 comments on commit d5600f5

Please sign in to comment.