Skip to content

Commit

Permalink
Implement Promise.completeAsync().
Browse files Browse the repository at this point in the history
Compared to `new Future()`, this offers:

- The option to run on the main thread.
- The option to send progress events.
- The ability to interrupt the job by calling `promise.error()`.
  • Loading branch information
player-03 committed Dec 2, 2024
1 parent 4e39bf5 commit ab1cc93
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 19 deletions.
37 changes: 30 additions & 7 deletions src/lime/app/Future.hx
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ import lime.utils.Log;
var promise = new Promise<T>();
promise.future = this;

FutureWork.run(work, promise);
FutureWork.runSimpleJob(work, promise);
}
else
#end
Expand Down Expand Up @@ -308,7 +308,6 @@ import lime.utils.Log;
}
}

#if (lime_threads && !html5)
/**
The class that handles asynchronous `work` functions passed to `new Future()`.
**/
Expand All @@ -319,26 +318,42 @@ import lime.utils.Log;
@:dox(hide) class FutureWork
{
private static var threadPool:ThreadPool;
private static var promises:Map<Int, {complete:Dynamic->Dynamic, error:Dynamic->Dynamic}>;
private static var promises:Map<Int, {complete:Dynamic->Dynamic, error:Dynamic->Dynamic, progress:Int->Int->Dynamic}>;

public static var minThreads(default, set):Int = 0;
public static var maxThreads(default, set):Int = 1;
public static var activeJobs(get, never):Int;

@:allow(lime.app.Promise)
private static inline function cancelJob(id:Int):Void
{
threadPool.cancelJob(id);
}

#if (lime_threads && !html5)
@:allow(lime.app.Future)
private static function run<T>(work:Void->T, promise:Promise<T>):Void
private static function runSimpleJob<T>(work:Void->T, promise:Promise<T>):Void
{
run(threadPool_doWork, promise, work, MULTI_THREADED);
}
#end

@:allow(lime.app.Promise)
private static function run<T>(work:WorkFunction<State->WorkOutput->Void>, promise:Promise<T>, state:State, mode:ThreadMode):Int
{
if (threadPool == null)
{
threadPool = new ThreadPool(minThreads, maxThreads, MULTI_THREADED);
threadPool.onComplete.add(threadPool_onComplete);
threadPool.onError.add(threadPool_onError);
threadPool.onProgress.add(threadPool_onProgress);

promises = new Map();
}

var jobID:Int = threadPool.run(threadPool_doWork, work);
promises[jobID] = {complete: promise.complete, error: promise.error};
var jobID:Int = threadPool.run(work, state, mode);
promises[jobID] = {complete: promise.complete, error: promise.error, progress: promise.progress};
return jobID;
}

// Event Handlers
Expand Down Expand Up @@ -368,6 +383,15 @@ import lime.utils.Log;
promise.error(error);
}

private static function threadPool_onProgress(progress:{progress:Int, total:Int}):Void
{
// ThreadPool doesn't enforce types, so check manually
if (Type.typeof(progress) == TObject && Type.typeof(progress.progress) == TInt && Type.typeof(progress.total) == TInt)
{
promises[threadPool.activeJob.id].progress(progress.progress, progress.total);
}
}

// Getters & Setters
@:noCompletion private static inline function set_minThreads(value:Int):Int
{
Expand All @@ -392,4 +416,3 @@ import lime.utils.Log;
return threadPool != null ? threadPool.activeJobs : 0;
}
}
#end
100 changes: 88 additions & 12 deletions src/lime/app/Promise.hx
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package lime.app;

import lime.app.Future;
import lime.system.ThreadPool;
import lime.system.WorkOutput;

/**
`Promise` is an implementation of Futures and Promises, with the exception that
in addition to "success" and "failure" states (represented as "complete" and "error"),
Expand All @@ -10,18 +14,20 @@ package lime.app;
for recipients of it's `Future` object. For example:
```haxe
function examplePromise ():Future<String> {
var promise = new Promise<String> ();
function examplePromise():Future<String>
{
var promise = new Promise<String>();
var progress = 0, total = 10;
var timer = new Timer (100);
timer.run = function () {
var timer = new Timer(100);
timer.run = function()
{
promise.progress (progress, total);
progress++;
if (progress == total) {
if (progress == total)
{
promise.complete ("Done!");
timer.stop ();
Expand All @@ -31,12 +37,11 @@ package lime.app;
};
return promise.future;
}
var future = examplePromise ();
future.onComplete (function (message) { trace (message); });
future.onProgress (function (loaded, total) { trace ("Progress: " + loaded + ", " + total); });
var future = examplePromise();
future.onComplete(function(message) { trace(message); });
future.onProgress(function(loaded, total) { trace("Progress: " + loaded + ", " + total); });
```
**/
#if !lime_debug
Expand Down Expand Up @@ -69,6 +74,8 @@ class Promise<T>
**/
public var isError(get, null):Bool;

private var jobID:Int = -1;

#if commonjs
private static function __init__()
{
Expand Down Expand Up @@ -96,11 +103,23 @@ class Promise<T>
**/
public function complete(data:T):Promise<T>
{
if (!ThreadPool.isMainThread())
{
haxe.MainLoop.runInMainThread(complete.bind(data));
return this;
}

if (!future.isError)
{
future.isComplete = true;
future.value = data;

if (jobID != -1)
{
FutureWork.cancelJob(jobID);
jobID = -1;
}

if (future.__completeListeners != null)
{
for (listener in future.__completeListeners)
Expand All @@ -115,6 +134,45 @@ class Promise<T>
return this;
}

/**
Runs the given function asynchronously, and resolves this `Promise` with
the complete, error, and/or progress events sent by that function.
Sample usage:
```haxe
function examplePromise():Future<String>
{
var promise = new Promise<String>();
promise.completeAsync(function(state:State, output:WorkOutput):Void
{
output.sendProgress({progress:state.progress, total:10});
state.progress++;
if (state.progress == 10)
{
output.sendComplete("Done!");
}
},
{progress: 0}, MULTI_THREADED);
return promise.future;
}
var future = examplePromise();
future.onComplete(function(message) { trace(message); });
future.onProgress(function(loaded, total) { trace("Progress: " + loaded + ", " + total); });
```
@param doWork A function to perform work asynchronously. For best results,
see the guidelines in the `ThreadPool` class overview.
@param state The value to pass to `doWork`.
@param mode Which mode to run the job in: `SINGLE_THREADED` or `MULTI_THREADED`.
**/
public function completeAsync(doWork:WorkFunction<State->WorkOutput->Void>, ?state:State, ?mode:ThreadMode = MULTI_THREADED):Void
{
jobID = FutureWork.run(doWork, this, state, mode);
}

/**
Resolves this `Promise` with the complete, error and/or progress state
of another `Future`
Expand All @@ -137,11 +195,23 @@ class Promise<T>
**/
public function error(msg:Dynamic):Promise<T>
{
if (!ThreadPool.isMainThread())
{
haxe.MainLoop.runInMainThread(error.bind(msg));
return this;
}

if (!future.isComplete)
{
future.isError = true;
future.error = msg;

if (jobID != -1)
{
FutureWork.cancelJob(jobID);
jobID = -1;
}

if (future.__errorListeners != null)
{
for (listener in future.__errorListeners)
Expand All @@ -164,6 +234,12 @@ class Promise<T>
**/
public function progress(progress:Int, total:Int):Promise<T>
{
if (!ThreadPool.isMainThread())
{
haxe.MainLoop.runInMainThread(this.progress.bind(progress, total));
return this;
}

if (!future.isError && !future.isComplete)
{
if (future.__progressListeners != null)
Expand All @@ -179,12 +255,12 @@ class Promise<T>
}

// Get & Set Methods
@:noCompletion private function get_isComplete():Bool
@:noCompletion private inline function get_isComplete():Bool
{
return future.isComplete;
}

@:noCompletion private function get_isError():Bool
@:noCompletion private inline function get_isError():Bool
{
return future.isError;
}
Expand Down
4 changes: 4 additions & 0 deletions src/lime/system/WorkOutput.hx
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,10 @@ class JobData
private inline function new(doWork:WorkFunction<State->WorkOutput->Void>, state:State, ?id:Int)
{
this.id = id != null ? id : nextID++;
if (this.id == -1)
{
throw "All job IDs have been used!";
}
this.doWork = doWork;
this.state = state;
}
Expand Down

0 comments on commit ab1cc93

Please sign in to comment.