Skip to content

Commit

Permalink
Reduce duplicate code in FutureWork.
Browse files Browse the repository at this point in the history
  • Loading branch information
player-03 committed Aug 12, 2024
1 parent 76ccda8 commit dba3339
Showing 1 changed file with 56 additions and 108 deletions.
164 changes: 56 additions & 108 deletions src/lime/app/Future.hx
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ import lime.utils.Log;
}
#end

FutureWork.run(dispatchWorkFunction, doWork, promise, useThreads ? MULTI_THREADED : SINGLE_THREADED);
FutureWork.forMode(useThreads ? MULTI_THREADED : SINGLE_THREADED)
.run(dispatchWorkFunction, doWork, promise);
}
}

Expand Down Expand Up @@ -201,15 +202,15 @@ import lime.utils.Log;

while (!isComplete && !isError && time <= end)
{
if (FutureWork.activeJobs < 1)
if (FutureWork.totalActiveJobs < 1)
{
Log.error('Cannot block for a Future without a "doWork" function.');
return this;
}

if (FutureWork.singleThreadPool != null && FutureWork.singleThreadPool.activeJobs > 0)
if (FutureWork.singleThread.activeJobs > 0)
{
@:privateAccess FutureWork.singleThreadPool.__update(time - prevTime);
@:privateAccess FutureWork.singleThread.threadPool.__update(time - prevTime);
}
else
{
Expand Down Expand Up @@ -336,7 +337,7 @@ import lime.utils.Log;
var promise = new Promise<T>();
promise.future = future;

FutureWork.run(doWork, state, promise, mode);
FutureWork.forMode(mode).run(doWork, state, promise);

return future;
}
Expand Down Expand Up @@ -387,150 +388,97 @@ enum FutureStatus<T>
#end
@:dox(hide) class FutureWork
{
@:allow(lime.app.Future)
private static var singleThreadPool:ThreadPool;
private static var promisesSingle:Map<Int, {complete:Dynamic -> Dynamic, error:Dynamic -> Dynamic, progress:Int -> Int -> Dynamic}> = new Map();
public static var singleThread(default, null):FutureWork = new FutureWork(SINGLE_THREADED);
#if lime_threads
private static var multiThreadPool:ThreadPool;
// It isn't safe to pass a promise object to a web worker, but since it's
// `@:generic` we can't store it as `Promise<Dynamic>`. Instead, we'll store
// the methods we need.
private static var promisesMulti:Map<Int, {complete:Dynamic -> Dynamic, error:Dynamic -> Dynamic, progress:Int -> Int -> Dynamic}> = new Map();
public static var multiThread(default, null):FutureWork = new FutureWork(MULTI_THREADED);
#end
public static var minThreads(default, set):Int = 0;
public static var maxThreads(default, set):Int = 1;
public static var activeJobs(get, never):Int;

private static function getPool(mode:ThreadMode):ThreadPool
public static var totalActiveJobs(get, never):Int;
private static inline function get_totalActiveJobs():Int
{
#if lime_threads
if (mode == MULTI_THREADED) {
if(multiThreadPool == null) {
multiThreadPool = new ThreadPool(minThreads, maxThreads, MULTI_THREADED);
multiThreadPool.onComplete.add(multiThreadPool_onComplete);
multiThreadPool.onError.add(multiThreadPool_onError);
multiThreadPool.onProgress.add(multiThreadPool_onProgress);
}
return multiThreadPool;
}
#end
if(singleThreadPool == null) {
singleThreadPool = new ThreadPool(minThreads, maxThreads, SINGLE_THREADED);
singleThreadPool.onComplete.add(singleThreadPool_onComplete);
singleThreadPool.onError.add(singleThreadPool_onError);
singleThreadPool.onProgress.add(singleThreadPool_onProgress);
}
return singleThreadPool;
return singleThread.activeJobs #if lime_threads + multiThread.activeJobs #end;
}

@:allow(lime.app.Future)
private static function run<T>(doWork:WorkFunction<State->WorkOutput->Void>, state:State, promise:Promise<T>, mode:ThreadMode = MULTI_THREADED):Void
private static function forMode(mode:ThreadMode):FutureWork
{
var jobID:Int = getPool(mode).run(doWork, state);

#if lime_threads
if (mode == MULTI_THREADED)
{
promisesMulti[jobID] = {complete: promise.complete, error: promise.error, progress: promise.progress};
if (mode == MULTI_THREADED) {
return multiThread;
}
else
#end
{
promisesSingle[jobID] = {complete: promise.complete, error: promise.error, progress: promise.progress};
}
return singleThread;
}

// Event Handlers
private static function singleThreadPool_onComplete(result:Dynamic):Void
{
var promise = promisesSingle[singleThreadPool.activeJob.id];
promisesSingle.remove(singleThreadPool.activeJob.id);
promise.complete(result);
}
private var threadPool:ThreadPool;

// Because `Promise` is `@:generic`, we can't always store it as `Promise<Dynamic>`.
// Instead, we'll store the specific methods we need.
private var promises:Map<Int, {complete:Dynamic -> Dynamic, error:Dynamic -> Dynamic, progress:Int -> Int -> Dynamic}> = new Map();

public var minThreads(get, set):Int;
public var maxThreads(get, set):Int;
public var activeJobs(get, never):Int;

private static function singleThreadPool_onError(error:Dynamic):Void
private function new(mode:ThreadMode)
{
var promise = promisesSingle[singleThreadPool.activeJob.id];
promisesSingle.remove(singleThreadPool.activeJob.id);
promise.error(error);
threadPool = new ThreadPool(mode);
threadPool.onComplete.add(threadPool_onComplete);
threadPool.onError.add(threadPool_onError);
threadPool.onProgress.add(threadPool_onProgress);
}

private static function singleThreadPool_onProgress(progress:{progress:Int, total:Int}):Void
@:allow(lime.app.Future)
private function run<T>(doWork:WorkFunction<State->WorkOutput->Void>, state:State, promise:Promise<T>):Void
{
if (Type.typeof(progress) == TObject && Type.typeof(progress.progress) == TInt && Type.typeof(progress.total) == TInt)
{
promisesSingle[singleThreadPool.activeJob.id].progress(progress.progress, progress.total);
}
var jobID:Int = threadPool.run(doWork, state);
promises[jobID] = {complete: promise.complete, error: promise.error, progress: promise.progress};
}

#if lime_threads
private static function multiThreadPool_onComplete(result:Dynamic):Void
// Event Handlers
private function threadPool_onComplete(result:Dynamic):Void
{
var promise = promisesMulti[multiThreadPool.activeJob.id];
promisesMulti.remove(multiThreadPool.activeJob.id);
var promise = promises[threadPool.activeJob.id];
promises.remove(threadPool.activeJob.id);
promise.complete(result);
}

private static function multiThreadPool_onError(error:Dynamic):Void
private function threadPool_onError(error:Dynamic):Void
{
var promise = promisesMulti[multiThreadPool.activeJob.id];
promisesMulti.remove(multiThreadPool.activeJob.id);
var promise = promises[threadPool.activeJob.id];
promises.remove(threadPool.activeJob.id);
promise.error(error);
}

private static function multiThreadPool_onProgress(progress:{progress:Int, total:Int}):Void
private function threadPool_onProgress(progress:Dynamic):Void
{
if (Type.typeof(progress) == TObject && Type.typeof(progress.progress) == TInt && Type.typeof(progress.total) == TInt)
{
promisesMulti[multiThreadPool.activeJob.id].progress(progress.progress, progress.total);
promises[threadPool.activeJob.id].progress(progress.progress, progress.total);
}
}
#end

// Getters & Setters
@:noCompletion private static inline function set_minThreads(value:Int):Int
private inline function get_minThreads():Int
{
if (singleThreadPool != null)
{
singleThreadPool.minThreads = value;
}
#if lime_threads
if (multiThreadPool != null)
{
multiThreadPool.minThreads = value;
}
#end
return minThreads = value;
return threadPool.minThreads;
}
private inline function set_minThreads(value:Int):Int
{
return threadPool.minThreads = value;
}

@:noCompletion private static inline function set_maxThreads(value:Int):Int
private inline function get_maxThreads():Int
{
if (singleThreadPool != null)
{
singleThreadPool.maxThreads = value;
}
#if lime_threads
if (multiThreadPool != null)
{
multiThreadPool.maxThreads = value;
}
#end
return maxThreads = value;
return threadPool.maxThreads;
}
private inline function set_maxThreads(value:Int):Int
{
return threadPool.maxThreads = value;
}

@:noCompletion private static function get_activeJobs():Int
private inline function get_activeJobs():Int
{
var sum:Int = 0;
if (singleThreadPool != null)
{
sum += singleThreadPool.activeJobs;
}
#if lime_threads
if (multiThreadPool != null)
{
sum += multiThreadPool.activeJobs;
}
#end
return sum;
return threadPool.activeJobs;
}
}

0 comments on commit dba3339

Please sign in to comment.