Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removes busy waiting thread in CLIJxPool by using ArrayBlockingQueue #3

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 5 additions & 48 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<groupId>net.haesleinhuepf</groupId>
<artifactId>clijx-faclon-heavy_</artifactId>
<version>0.30.1.14</version>
<version>0.30.1.14-SNAPSHOT</version>

<name>clijx-faclon-heavy_</name>
<description>Multi-GPU support for processing big data</description>
Expand Down Expand Up @@ -74,7 +74,7 @@
<package-name>net.haesleinhuepf</package-name>
<license.licenseName>bsd_3</license.licenseName>
<license.copyrightOwners>Robert Haase, MPI CBG</license.copyrightOwners>
<scijava.app.directory>C:/programs/fiji-win64/Fiji.app/</scijava.app.directory>
<!--scijava.app.directory>C:/programs/fiji-win64/Fiji.app/</scijava.app.directory-->
<!--<scijava.app.directory>/home/rhaase/programs/fiji/Fiji.app/</scijava.app.directory>-->
<enforcer.skip>true</enforcer.skip>

Expand Down Expand Up @@ -104,56 +104,13 @@
<artifactId>clijx_</artifactId>
</dependency>
<dependency>
<groupId>org.janelia.saalfeldlab</groupId>
<artifactId>n5-imglib2</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>org.janelia.saalfeldlab</groupId>
<artifactId>n5</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.janelia.saalfeldlab</groupId>
<artifactId>n5-aws-s3</artifactId>
<version>3.2.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.janelia.saalfeldlab</groupId>
<artifactId>n5-blosc</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.janelia.saalfeldlab</groupId>
<artifactId>n5-google-cloud</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.janelia.saalfeldlab</groupId>
<artifactId>n5-jpeg</artifactId>
<version>0.0.1-beta1</version>
</dependency>
<dependency>
<groupId>org.janelia.saalfeldlab</groupId>
<artifactId>n5-hdf5</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.janelia.saalfeldlab</groupId>
<artifactId>n5-zarr</artifactId>
<version>0.0.6</version>
</dependency>
<dependency>
<groupId>info.picocli</groupId>
<artifactId>picocli</artifactId>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>sc.fiji</groupId>
<artifactId>bigdataviewer-vistools</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
2 changes: 1 addition & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void accept(ClearCLBuffer input, ClearCLBuffer output) {
}
```

Last but not least, you need to onfigure tile-size and margin for overlapping tiles:
Last but not least, you need to configure tile-size and margin for overlapping tiles:
```
int margin = 20;
int tile_size = 256;
Expand Down
151 changes: 151 additions & 0 deletions src/main/java/net/haesleinhuepf/clijx/faclonheavy/CLIJxFilterOp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package net.haesleinhuepf.clijx.faclonheavy;

import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.function.Consumer;

import ij.process.FloatProcessor;
import net.haesleinhuepf.clij.clearcl.ClearCLBuffer;
import net.haesleinhuepf.clij.converters.implementations.ClearCLBufferToRandomAccessibleIntervalConverter;
import net.haesleinhuepf.clij.converters.implementations.RandomAccessibleIntervalToClearCLBufferConverter;
import net.haesleinhuepf.clijx.CLIJx;
import net.imglib2.RandomAccessible;
import net.imglib2.RandomAccessibleInterval;
import net.imglib2.img.array.ArrayImgs;
import net.imglib2.type.NativeType;
import net.imglib2.type.Type;
import net.imglib2.type.numeric.RealType;
import net.imglib2.type.numeric.real.FloatType;
import net.imglib2.util.Intervals;
import net.imglib2.view.Views;

/**
* A cell loader that fill {@link RandomAccessibleInterval} with data that is
* generated by CLIJx filters. It automatically selects an idle GPU to process the tile on.
*
* @author Stephan Saalfeld, Robert Haase
* @param <T> output type
* @param <S> source type
*/
public class CLIJxFilterOp<T extends RealType<T> & NativeType<T>, S extends RealType<S>> implements Consumer<RandomAccessibleInterval<T>> {

protected final RandomAccessible<S> source;
protected final long[] padding;
protected final CLIJxPool clijxPool;
protected final Class<? extends TileProcessor> klass;

public CLIJxFilterOp(
final RandomAccessible<S> source,
final CLIJxPool clijxPool,
final Class<? extends TileProcessor> klass,
final long... padding) {

this.source = source;
final int n = source.numDimensions();
if (n == padding.length)
this.padding = padding;
else
this.padding = Arrays.copyOf(padding, n);
this.clijxPool = clijxPool;
this.klass = klass;
}

@SuppressWarnings("unchecked")
@Override
public void accept(final RandomAccessibleInterval<T> cell) {
// get a CLIJx instance running on one particular GPU
CLIJx clijx = clijxPool.getIdleCLIJx();

// push the image to the GPU
final RandomAccessibleIntervalToClearCLBufferConverter rai2cl = new RandomAccessibleIntervalToClearCLBufferConverter();
rai2cl.setCLIJ(clijx.getCLIJ());
final ClearCLBuffer input = rai2cl.convert(Views.interval(source, Intervals.expand(cell, padding)));
// create memory on GPU for result image
final ClearCLBuffer output = clijx.create(input);

// process the image
TileProcessor filter;
try {
filter = klass.getDeclaredConstructor().newInstance();
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
filter.setCLIJx(clijx);
filter.accept(input, output);

// pull the result image back
final ClearCLBufferToRandomAccessibleIntervalConverter cl2rai = new ClearCLBufferToRandomAccessibleIntervalConverter();
cl2rai.setCLIJ(clijx.getCLIJ());
final RandomAccessibleInterval<T> result = cl2rai.convert(output);

// clean up
input.close();
output.close();
clijxPool.setCLIJxIdle(clijx);

// todo: this is an additional copy-step which might not be necessary
Util.copyReal(result, Views.translate(Views.zeroMin(cell), padding));
}


/**
* Some useful methods that do not fit elsewhere.
*
* @author Stephan Saalfeld
*/
public interface Util {

/**
* Copy the contents of a source {@link RandomAccessible} in an
* interval defined by and target {@link RandomAccessibleInterval}
* into that target {@link RandomAccessibleInterval}.
*
* @param <T>
* @param source
* @param target
*/
static <T extends Type<T>> void copy(
final RandomAccessible<? extends T> source,
final RandomAccessibleInterval<T> target) {

Views.flatIterable(Views.interval(Views.pair(source, target), target)).forEach(
pair -> pair.getB().set(pair.getA()));
}

/**
* Copy the contents of a source {@link RandomAccessible} in an
* interval defined by and target {@link RandomAccessibleInterval}
* into that target {@link RandomAccessibleInterval}.
*
* @param <T>
* @param source
* @param target
*/
static <T extends RealType<T>, S extends RealType<S>> void copyReal(
final RandomAccessible<? extends T> source,
final RandomAccessibleInterval<? extends S> target) {

Views.flatIterable(Views.interval(Views.pair(source, target), target)).forEach(
pair -> pair.getB().setReal(pair.getA().getRealDouble()));
}

/**
* Materialize the first 2D slice of a {@link RandomAccessibleInterval}
* of {@link FloatType} into a new ImageJ {@link FloatProcessor}.
*
* @param source
* @return
*/
static FloatProcessor materialize(final RandomAccessibleInterval<FloatType> source) {
final FloatProcessor target = new FloatProcessor((int) source.dimension(0), (int) source.dimension(1));
Util.copy(
Views.zeroMin(source),
ArrayImgs.floats(
(float[]) target.getPixels(),
target.getWidth(),
target.getHeight()));
return target;
}
}
}
79 changes: 32 additions & 47 deletions src/main/java/net/haesleinhuepf/clijx/faclonheavy/CLIJxPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,43 @@

import net.haesleinhuepf.clij.CLIJ;
import net.haesleinhuepf.clijx.CLIJx;
import org.jocl.CL;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

/**
* The CLIJxPool holds instances of CLIJx allowing to execute operations on multiple OpenCL devices / GPUs at a time.
*/
public class CLIJxPool {
CLIJx[] pool;
boolean[] idle;

final ArrayBlockingQueue<CLIJx> pool;
final int size;
final CLIJx[] allInstances; // For logging only

public CLIJxPool(int[] device_indices, int[] number_of_instances_per_clij) {
int sum = 0;
for (int v : number_of_instances_per_clij) {
sum = sum + v;
}
pool = new CLIJx[sum];
idle = new boolean[pool.length];
size = sum;
pool = new ArrayBlockingQueue<>(size, true);
allInstances = new CLIJx[size];

int count = 0;
for (int i = 0; i < device_indices.length; i++) {
for (int j = 0; j < number_of_instances_per_clij[i]; j++) {
pool[count] = new CLIJx(new CLIJ(device_indices[i]));
idle[count] = true;
count ++;
CLIJx clijx = new CLIJx(new CLIJ(device_indices[i]));
pool.add(clijx);
allInstances[count] = clijx;
count++;
}
}
}

public static CLIJxPool fromDeviceNames(String[] device_names, int[] number_of_instances_per_clij) {
ArrayList<Integer> index_list = new ArrayList();
ArrayList<Integer> instance_count_list = new ArrayList();
List<Integer> index_list = new ArrayList<>();
List<Integer> instance_count_list = new ArrayList<>();

int index = 0;
for (String name : CLIJ.getAvailableDeviceNames()) {
Expand Down Expand Up @@ -63,26 +66,19 @@ public static CLIJxPool fromDeviceNames(String[] device_names, int[] number_of_i

public static CLIJxPool fullPool() {
return CLIJxPool.fromDeviceNames(
new String[]{"UHD", "gfx9", "mx", "1070", "2060", "2070", "2080"},
new int[] { 1, 1, 1, 1, 2, 2, 4}
);
}

public static CLIJxPool powerPool() {
return CLIJxPool.fromDeviceNames(
new String[]{"1070", "2060", "2070", "2080"},
new int[] { 1, 2, 2, 4}
new String[]{"Iris", "UHD", "gfx9", "mx", "1070", "2060", "2070", "2080"},
new int[] { 1, 1, 1, 1, 1, 2, 2, 4}
);
}

public int size() {
return pool.length;
return size;
}

public String log() {
StringBuilder text = new StringBuilder();
for (int i = 0; i < pool.length; i ++ ) {
text.append(" * " + pool[i].getGPUName() + "[" + pool[i] + "]" + "\n");
for (int i = 0; i < size; i ++ ) {
text.append(" * ").append(allInstances[i].getGPUName()).append("[").append(allInstances[i]).append("]").append("\n");
}
return text.toString();
}
Expand All @@ -91,23 +87,12 @@ public String log() {
* Select a CLIJx instance that's idle at the moment, mark it as busy and return it.
* @return a clijx instance
*/
public synchronized CLIJx getIdleCLIJx() {
while (true) {
for (int i = 0; i < idle.length; i++) {
if (idle[i]) {
idle[i] = false;
return pool[i];
}
}

// if none is idle, wait a bit and continue checking again.
// Todo: This is a potential endless loop. Fix this.
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
return null;
}
public CLIJx getIdleCLIJx() {
// Catch the exception inside to avoid breaking change. Is it a good idea ?
try {
return pool.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

Expand All @@ -116,13 +101,13 @@ public synchronized CLIJx getIdleCLIJx() {
*
* @param clijx
*/
public void setCLIJxIdle(CLIJx clijx) {
public void setCLIJxIdle(CLIJx clijx, boolean clear) {
// clean up that instance before another thread can use it.
clijx.clear();
for (int i = 0; i < idle.length; i++) {
if (pool[i] == clijx) {
idle[i] = true;
}
}
if (clear) clijx.clear();
pool.add(clijx);
}

public void setCLIJxIdle(CLIJx clijx) {
setCLIJxIdle(clijx, true);
}
}
Loading