Skip to content

Commit

Permalink
Merge pull request #1830 from player-03/old_thread_classes
Browse files Browse the repository at this point in the history
Restore old `Future` and `BackgroundWorker` behavior.
  • Loading branch information
player-03 authored Aug 16, 2024
2 parents 8f631fe + 6873ae1 commit 31700a0
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 170 deletions.
220 changes: 57 additions & 163 deletions src/lime/app/Future.hx
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,36 @@ import lime.utils.Log;
@:noCompletion private var __progressListeners:Array<Int->Int->Void>;

/**
@param work Deprecated; use `Future.withEventualValue()` instead.
@param useThreads Deprecated; use `Future.withEventualValue()` instead.
@param work Optional: a function to compute this future's value.
@param useThreads Whether to run `work` on a background thread, where supported.
If false or if this isn't a system target, it will run immediately on the main thread.
**/
public function new(work:WorkFunction<Void->T> = null, useThreads:Bool = false)
public function new(work:Void->T = null, useThreads:Bool = false)
{
if (work != null)
{
var promise = new Promise<T>();
promise.future = this;

#if (lime_threads && html5)
#if (lime_threads && !html5)
if (useThreads)
{
work.makePortable();
var promise = new Promise<T>();
promise.future = this;

FutureWork.run(work, promise);
}
else
#end

FutureWork.run(dispatchWorkFunction, work, promise, useThreads ? MULTI_THREADED : SINGLE_THREADED, true);
{
try
{
value = work();
isComplete = true;
}
catch (e:Dynamic)
{
error = e;
isError = true;
}
}
}
}

Expand Down Expand Up @@ -189,41 +201,30 @@ import lime.utils.Log;
**/
public function ready(waitTime:Int = -1):Future<T>
{
#if (lime_threads && !html5)
if (isComplete || isError)
{
return this;
}
else
{
var time = System.getTimer();
var prevTime = time;
var end = time + waitTime;

while (!isComplete && !isError && time <= end)
while (!isComplete && !isError && time <= end && FutureWork.activeJobs > 0)
{
if (FutureWork.activeJobs < 1)
{
Log.error('Cannot block for a Future without a "work" function.');
return this;
}
#if sys
Sys.sleep(0.01);
#end

if (FutureWork.singleThreadPool != null && FutureWork.singleThreadPool.activeJobs > 0)
{
@:privateAccess FutureWork.singleThreadPool.__update(time - prevTime);
}
else
{
#if sys
Sys.sleep(0.01);
#end
}

prevTime = time;
time = System.getTimer();
}

return this;
}
#else
return this;
#end
}

/**
Expand Down Expand Up @@ -305,41 +306,9 @@ import lime.utils.Log;
future.value = value;
return future;
}

/**
Creates a `Future` instance which will asynchronously compute a value.
Once `work()` returns a non-null value, the `Future` will finish with that value.
If `work()` throws an error, the `Future` will finish with that error instead.
@param work A function that computes a value of type `T`.
@param state An argument to pass to `work()`. As this may be used on another thread, the
main thread must not access or modify `state` until the `Future` finishes.
@param mode Whether to use real threads as opposed to green threads. Green threads rely
on cooperative multitasking, meaning `work()` must return periodically to allow other code
enough time to run. In these cases, `work()` should return null to signal that it isn't finished.
@return A new `Future` instance.
@see https://en.wikipedia.org/wiki/Cooperative_multitasking
**/
public static function withEventualValue<T>(work:WorkFunction<State -> Null<T>>, state:State, mode:ThreadMode = #if html5 SINGLE_THREADED #else MULTI_THREADED #end):Future<T>
{
var future = new Future<T>();
var promise = new Promise<T>();
promise.future = future;

FutureWork.run(work, state, promise, mode);

return future;
}

/**
(For backwards compatibility.) Dispatches the given zero-argument function.
**/
@:noCompletion private static function dispatchWorkFunction<T>(work:WorkFunction<Void -> T>):Null<T>
{
return work.dispatch();
}
}

#if (lime_threads && !html5)
/**
The class that handles asynchronous `work` functions passed to `new Future()`.
**/
Expand All @@ -349,152 +318,77 @@ import lime.utils.Log;
#end
@:dox(hide) class FutureWork
{
@:allow(lime.app.Future)
private static var singleThreadPool:ThreadPool;
#if lime_threads
private static var multiThreadPool:ThreadPool;
// It isn't safe to pass a promise object to a web worker, but since it's
// `@:generic` we can't store it as `Promise<Dynamic>`. Instead, we'll store
// the two methods we need.
private static var promises:Map<Int, {complete:Dynamic -> Dynamic, error:Dynamic -> Dynamic}> = new Map();
#end
private static var threadPool:ThreadPool;
private static var promises:Map<Int, {complete:Dynamic -> Dynamic, error:Dynamic -> Dynamic}>;

public static var minThreads(default, set):Int = 0;
public static var maxThreads(default, set):Int = 1;
public static var activeJobs(get, never):Int;

private static function getPool(mode:ThreadMode):ThreadPool
{
#if lime_threads
if (mode == MULTI_THREADED) {
if(multiThreadPool == null) {
multiThreadPool = new ThreadPool(minThreads, maxThreads, MULTI_THREADED);
multiThreadPool.onComplete.add(multiThreadPool_onComplete);
multiThreadPool.onError.add(multiThreadPool_onError);
}
return multiThreadPool;
}
#end
if(singleThreadPool == null) {
singleThreadPool = new ThreadPool(minThreads, maxThreads, SINGLE_THREADED);
singleThreadPool.onComplete.add(singleThreadPool_onComplete);
singleThreadPool.onError.add(singleThreadPool_onError);
}
return singleThreadPool;
}

@:allow(lime.app.Future)
private static function run<T>(work:WorkFunction<State->Null<T>>, state:State, promise:Promise<T>, mode:ThreadMode = MULTI_THREADED, legacyCode:Bool = false):Void
private static function run<T>(work:Void->T, promise:Promise<T>):Void
{
var bundle = {work: work, state: state, promise: promise, legacyCode: legacyCode};

#if lime_threads
if (mode == MULTI_THREADED)
{
#if html5
work.makePortable();
#end
if(threadPool == null) {
threadPool = new ThreadPool(minThreads, maxThreads, MULTI_THREADED);
threadPool.onComplete.add(threadPool_onComplete);
threadPool.onError.add(threadPool_onError);

bundle.promise = null;
promises = new Map();
}
#end

var jobID:Int = getPool(mode).run(threadPool_doWork, bundle);

#if lime_threads
if (mode == MULTI_THREADED)
{
promises[jobID] = {complete: promise.complete, error: promise.error};
}
#end
var jobID:Int = threadPool.run(threadPool_doWork, work);
promises[jobID] = {complete: promise.complete, error: promise.error};
}

// Event Handlers
private static function threadPool_doWork(bundle:{work:WorkFunction<State->Dynamic>, state:State, legacyCode:Bool}, output:WorkOutput):Void
private static function threadPool_doWork(work:Void->Dynamic, output:WorkOutput):Void
{
try
{
var result = bundle.work.dispatch(bundle.state);
if (result != null || bundle.legacyCode)
{
output.sendComplete(result);
}
output.sendComplete(work());
}
catch (e:Dynamic)
{
output.sendError(e);
}
}

private static function singleThreadPool_onComplete(result:Dynamic):Void
private static function threadPool_onComplete(result:Dynamic):Void
{
singleThreadPool.activeJob.state.promise.complete(result);
}

private static function singleThreadPool_onError(error:Dynamic):Void
{
singleThreadPool.activeJob.state.promise.error(error);
}

#if lime_threads
private static function multiThreadPool_onComplete(result:Dynamic):Void
{
var promise = promises[multiThreadPool.activeJob.id];
promises.remove(multiThreadPool.activeJob.id);
var promise = promises[threadPool.activeJob.id];
promises.remove(threadPool.activeJob.id);
promise.complete(result);
}

private static function multiThreadPool_onError(error:Dynamic):Void
private static function threadPool_onError(error:Dynamic):Void
{
var promise = promises[multiThreadPool.activeJob.id];
promises.remove(multiThreadPool.activeJob.id);
var promise = promises[threadPool.activeJob.id];
promises.remove(threadPool.activeJob.id);
promise.error(error);
}
#end

// Getters & Setters
@:noCompletion private static inline function set_minThreads(value:Int):Int
{
if (singleThreadPool != null)
if (threadPool != null)
{
singleThreadPool.minThreads = value;
threadPool.minThreads = value;
}
#if lime_threads
if (multiThreadPool != null)
{
multiThreadPool.minThreads = value;
}
#end
return minThreads = value;
}

@:noCompletion private static inline function set_maxThreads(value:Int):Int
{
if (singleThreadPool != null)
if (threadPool != null)
{
singleThreadPool.maxThreads = value;
threadPool.maxThreads = value;
}
#if lime_threads
if (multiThreadPool != null)
{
multiThreadPool.maxThreads = value;
}
#end
return maxThreads = value;
}

@:noCompletion private static function get_activeJobs():Int
@:noCompletion private static inline function get_activeJobs():Int
{
var sum:Int = 0;
if (singleThreadPool != null)
{
sum += singleThreadPool.activeJobs;
}
#if lime_threads
if (multiThreadPool != null)
{
sum += multiThreadPool.activeJobs;
}
#end
return sum;
return threadPool != null ? threadPool.activeJobs : 0;
}
}
#end
2 changes: 1 addition & 1 deletion src/lime/graphics/Image.hx
Original file line number Diff line number Diff line change
Expand Up @@ -1002,7 +1002,7 @@ class Image

return promise.future;
#else
return Future.withEventualValue(fromBytes, bytes, MULTI_THREADED);
return new Future(fromBytes.bind(bytes), true);
#end
}

Expand Down
2 changes: 1 addition & 1 deletion src/lime/media/AudioBuffer.hx
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ class AudioBuffer

return promise.future;
#else
return Future.withEventualValue(fromFiles, paths, MULTI_THREADED);
return new Future(fromFiles.bind(paths), true);
#end
}

Expand Down
Loading

0 comments on commit 31700a0

Please sign in to comment.