diff --git a/src/lime/app/Future.hx b/src/lime/app/Future.hx index 780a243b08..d409dced8e 100644 --- a/src/lime/app/Future.hx +++ b/src/lime/app/Future.hx @@ -67,24 +67,36 @@ import lime.utils.Log; @:noCompletion private var __progressListeners:ArrayInt->Void>; /** - @param work Deprecated; use `Future.withEventualValue()` instead. - @param useThreads Deprecated; use `Future.withEventualValue()` instead. + @param work Optional: a function to compute this future's value. + @param useThreads Whether to run `work` on a background thread, where supported. + If false or if this isn't a system target, it will run immediately on the main thread. **/ - public function new(work:WorkFunctionT> = null, useThreads:Bool = false) + public function new(work:Void->T = null, useThreads:Bool = false) { if (work != null) { - var promise = new Promise(); - promise.future = this; - - #if (lime_threads && html5) + #if (lime_threads && !html5) if (useThreads) { - work.makePortable(); + var promise = new Promise(); + promise.future = this; + + FutureWork.run(work, promise); } + else #end - - FutureWork.run(dispatchWorkFunction, work, promise, useThreads ? MULTI_THREADED : SINGLE_THREADED, true); + { + try + { + value = work(); + isComplete = true; + } + catch (e:Dynamic) + { + error = e; + isError = true; + } + } } } @@ -189,6 +201,7 @@ import lime.utils.Log; **/ public function ready(waitTime:Int = -1):Future { + #if (lime_threads && !html5) if (isComplete || isError) { return this; @@ -196,34 +209,22 @@ import lime.utils.Log; else { var time = System.getTimer(); - var prevTime = time; var end = time + waitTime; - while (!isComplete && !isError && time <= end) + while (!isComplete && !isError && time <= end && FutureWork.activeJobs > 0) { - if (FutureWork.activeJobs < 1) - { - Log.error('Cannot block for a Future without a "work" function.'); - return this; - } + #if sys + Sys.sleep(0.01); + #end - if (FutureWork.singleThreadPool != null && FutureWork.singleThreadPool.activeJobs > 0) - { - @:privateAccess FutureWork.singleThreadPool.__update(time - prevTime); - } - else - { - #if sys - Sys.sleep(0.01); - #end - } - - prevTime = time; time = System.getTimer(); } return this; } + #else + return this; + #end } /** @@ -305,41 +306,9 @@ import lime.utils.Log; future.value = value; return future; } - - /** - Creates a `Future` instance which will asynchronously compute a value. - - Once `work()` returns a non-null value, the `Future` will finish with that value. - If `work()` throws an error, the `Future` will finish with that error instead. - @param work A function that computes a value of type `T`. - @param state An argument to pass to `work()`. As this may be used on another thread, the - main thread must not access or modify `state` until the `Future` finishes. - @param mode Whether to use real threads as opposed to green threads. Green threads rely - on cooperative multitasking, meaning `work()` must return periodically to allow other code - enough time to run. In these cases, `work()` should return null to signal that it isn't finished. - @return A new `Future` instance. - @see https://en.wikipedia.org/wiki/Cooperative_multitasking - **/ - public static function withEventualValue(work:WorkFunction Null>, state:State, mode:ThreadMode = #if html5 SINGLE_THREADED #else MULTI_THREADED #end):Future - { - var future = new Future(); - var promise = new Promise(); - promise.future = future; - - FutureWork.run(work, state, promise, mode); - - return future; - } - - /** - (For backwards compatibility.) Dispatches the given zero-argument function. - **/ - @:noCompletion private static function dispatchWorkFunction(work:WorkFunction T>):Null - { - return work.dispatch(); - } } +#if (lime_threads && !html5) /** The class that handles asynchronous `work` functions passed to `new Future()`. **/ @@ -349,75 +318,34 @@ import lime.utils.Log; #end @:dox(hide) class FutureWork { - @:allow(lime.app.Future) - private static var singleThreadPool:ThreadPool; - #if lime_threads - private static var multiThreadPool:ThreadPool; - // It isn't safe to pass a promise object to a web worker, but since it's - // `@:generic` we can't store it as `Promise`. Instead, we'll store - // the two methods we need. - private static var promises:Map Dynamic, error:Dynamic -> Dynamic}> = new Map(); - #end + private static var threadPool:ThreadPool; + private static var promises:Map Dynamic, error:Dynamic -> Dynamic}>; + public static var minThreads(default, set):Int = 0; public static var maxThreads(default, set):Int = 1; public static var activeJobs(get, never):Int; - private static function getPool(mode:ThreadMode):ThreadPool - { - #if lime_threads - if (mode == MULTI_THREADED) { - if(multiThreadPool == null) { - multiThreadPool = new ThreadPool(minThreads, maxThreads, MULTI_THREADED); - multiThreadPool.onComplete.add(multiThreadPool_onComplete); - multiThreadPool.onError.add(multiThreadPool_onError); - } - return multiThreadPool; - } - #end - if(singleThreadPool == null) { - singleThreadPool = new ThreadPool(minThreads, maxThreads, SINGLE_THREADED); - singleThreadPool.onComplete.add(singleThreadPool_onComplete); - singleThreadPool.onError.add(singleThreadPool_onError); - } - return singleThreadPool; - } - @:allow(lime.app.Future) - private static function run(work:WorkFunctionNull>, state:State, promise:Promise, mode:ThreadMode = MULTI_THREADED, legacyCode:Bool = false):Void + private static function run(work:Void->T, promise:Promise):Void { - var bundle = {work: work, state: state, promise: promise, legacyCode: legacyCode}; - - #if lime_threads - if (mode == MULTI_THREADED) - { - #if html5 - work.makePortable(); - #end + if(threadPool == null) { + threadPool = new ThreadPool(minThreads, maxThreads, MULTI_THREADED); + threadPool.onComplete.add(threadPool_onComplete); + threadPool.onError.add(threadPool_onError); - bundle.promise = null; + promises = new Map(); } - #end - - var jobID:Int = getPool(mode).run(threadPool_doWork, bundle); - #if lime_threads - if (mode == MULTI_THREADED) - { - promises[jobID] = {complete: promise.complete, error: promise.error}; - } - #end + var jobID:Int = threadPool.run(threadPool_doWork, work); + promises[jobID] = {complete: promise.complete, error: promise.error}; } // Event Handlers - private static function threadPool_doWork(bundle:{work:WorkFunctionDynamic>, state:State, legacyCode:Bool}, output:WorkOutput):Void + private static function threadPool_doWork(work:Void->Dynamic, output:WorkOutput):Void { try { - var result = bundle.work.dispatch(bundle.state); - if (result != null || bundle.legacyCode) - { - output.sendComplete(result); - } + output.sendComplete(work()); } catch (e:Dynamic) { @@ -425,76 +353,42 @@ import lime.utils.Log; } } - private static function singleThreadPool_onComplete(result:Dynamic):Void + private static function threadPool_onComplete(result:Dynamic):Void { - singleThreadPool.activeJob.state.promise.complete(result); - } - - private static function singleThreadPool_onError(error:Dynamic):Void - { - singleThreadPool.activeJob.state.promise.error(error); - } - - #if lime_threads - private static function multiThreadPool_onComplete(result:Dynamic):Void - { - var promise = promises[multiThreadPool.activeJob.id]; - promises.remove(multiThreadPool.activeJob.id); + var promise = promises[threadPool.activeJob.id]; + promises.remove(threadPool.activeJob.id); promise.complete(result); } - private static function multiThreadPool_onError(error:Dynamic):Void + private static function threadPool_onError(error:Dynamic):Void { - var promise = promises[multiThreadPool.activeJob.id]; - promises.remove(multiThreadPool.activeJob.id); + var promise = promises[threadPool.activeJob.id]; + promises.remove(threadPool.activeJob.id); promise.error(error); } - #end // Getters & Setters @:noCompletion private static inline function set_minThreads(value:Int):Int { - if (singleThreadPool != null) + if (threadPool != null) { - singleThreadPool.minThreads = value; + threadPool.minThreads = value; } - #if lime_threads - if (multiThreadPool != null) - { - multiThreadPool.minThreads = value; - } - #end return minThreads = value; } @:noCompletion private static inline function set_maxThreads(value:Int):Int { - if (singleThreadPool != null) + if (threadPool != null) { - singleThreadPool.maxThreads = value; + threadPool.maxThreads = value; } - #if lime_threads - if (multiThreadPool != null) - { - multiThreadPool.maxThreads = value; - } - #end return maxThreads = value; } - @:noCompletion private static function get_activeJobs():Int + @:noCompletion private static inline function get_activeJobs():Int { - var sum:Int = 0; - if (singleThreadPool != null) - { - sum += singleThreadPool.activeJobs; - } - #if lime_threads - if (multiThreadPool != null) - { - sum += multiThreadPool.activeJobs; - } - #end - return sum; + return threadPool != null ? threadPool.activeJobs : 0; } } +#end diff --git a/src/lime/graphics/Image.hx b/src/lime/graphics/Image.hx index c825d9d84d..97385cdc3a 100644 --- a/src/lime/graphics/Image.hx +++ b/src/lime/graphics/Image.hx @@ -1002,7 +1002,7 @@ class Image return promise.future; #else - return Future.withEventualValue(fromBytes, bytes, MULTI_THREADED); + return new Future(fromBytes.bind(bytes), true); #end } diff --git a/src/lime/media/AudioBuffer.hx b/src/lime/media/AudioBuffer.hx index ff067229ab..967c8fbbe9 100644 --- a/src/lime/media/AudioBuffer.hx +++ b/src/lime/media/AudioBuffer.hx @@ -335,7 +335,7 @@ class AudioBuffer return promise.future; #else - return Future.withEventualValue(fromFiles, paths, MULTI_THREADED); + return new Future(fromFiles.bind(paths), true); #end } diff --git a/src/lime/system/BackgroundWorker.hx b/src/lime/system/BackgroundWorker.hx index 5be17e9eae..ba1b7340a0 100644 --- a/src/lime/system/BackgroundWorker.hx +++ b/src/lime/system/BackgroundWorker.hx @@ -1,4 +1,177 @@ package lime.system; -@:deprecated("Replace references to lime.system.BackgroundWorker with lime.system.ThreadPool. As the API is identical, no other changes are necessary.") -typedef BackgroundWorker = ThreadPool; +import lime.app.Application; +import lime.app.Event; +#if sys +#if haxe4 +import sys.thread.Deque; +import sys.thread.Thread; +#elseif cpp +import cpp.vm.Deque; +import cpp.vm.Thread; +#elseif neko +import neko.vm.Deque; +import neko.vm.Thread; +#end +#end +#if !lime_debug +@:fileXml('tags="haxe,release"') +@:noDebug +#end + +/** + A background worker executes a single function on a background thread, + allowing it to avoid blocking the main thread. However, only system targets + have thread support, meaning the function will block on any other target. + @see `ThreadPool` for improved thread safety, HTML5 threads, and more. +**/ +class BackgroundWorker +{ + private static var MESSAGE_COMPLETE = "__COMPLETE__"; + private static var MESSAGE_ERROR = "__ERROR__"; + + public var canceled(default, null):Bool; + public var completed(default, null):Bool; + public var doWork = new EventVoid>(); + public var onComplete = new EventVoid>(); + public var onError = new EventVoid>(); + public var onProgress = new EventVoid>(); + + @:noCompletion private var __runMessage:Dynamic; + #if (cpp || neko) + @:noCompletion private var __messageQueue:Deque; + @:noCompletion private var __workerThread:Thread; + #end + + public function new() {} + + public function cancel():Void + { + canceled = true; + + #if (cpp || neko) + __workerThread = null; + #end + } + + public function run(message:Dynamic = null):Void + { + canceled = false; + completed = false; + __runMessage = message; + + #if (cpp || neko) + __messageQueue = new Deque(); + __workerThread = Thread.create(__doWork); + + // TODO: Better way to do this + + if (Application.current != null) + { + Application.current.onUpdate.add(__update); + } + #else + __doWork(); + #end + } + + public function sendComplete(message:Dynamic = null):Void + { + completed = true; + + #if (cpp || neko) + __messageQueue.add(MESSAGE_COMPLETE); + __messageQueue.add(message); + #else + if (!canceled) + { + canceled = true; + onComplete.dispatch(message); + } + #end + } + + public function sendError(message:Dynamic = null):Void + { + #if (cpp || neko) + __messageQueue.add(MESSAGE_ERROR); + __messageQueue.add(message); + #else + if (!canceled) + { + canceled = true; + onError.dispatch(message); + } + #end + } + + public function sendProgress(message:Dynamic = null):Void + { + #if (cpp || neko) + __messageQueue.add(message); + #else + if (!canceled) + { + onProgress.dispatch(message); + } + #end + } + + @:noCompletion private function __doWork():Void + { + doWork.dispatch(__runMessage); + + // #if (cpp || neko) + // + // __messageQueue.add (MESSAGE_COMPLETE); + // + // #else + // + // if (!canceled) { + // + // canceled = true; + // onComplete.dispatch (null); + // + // } + // + // #end + } + + @:noCompletion private function __update(deltaTime:Int):Void + { + #if (cpp || neko) + var message = __messageQueue.pop(false); + + if (message != null) + { + if (message == MESSAGE_ERROR) + { + Application.current.onUpdate.remove(__update); + + if (!canceled) + { + canceled = true; + onError.dispatch(__messageQueue.pop(false)); + } + } + else if (message == MESSAGE_COMPLETE) + { + Application.current.onUpdate.remove(__update); + + if (!canceled) + { + canceled = true; + onComplete.dispatch(__messageQueue.pop(false)); + } + } + else + { + if (!canceled) + { + onProgress.dispatch(message); + } + } + } + #end + } +} diff --git a/src/lime/system/ThreadPool.hx b/src/lime/system/ThreadPool.hx index c33390a008..1f1ef98dac 100644 --- a/src/lime/system/ThreadPool.hx +++ b/src/lime/system/ThreadPool.hx @@ -37,9 +37,7 @@ import lime._internal.backend.html5.HTML5Thread as Thread; `WorkOutput` object it receives. Calling `output.sendComplete()` will trigger an `onComplete` event on the main thread. - @see `lime.system.WorkOutput.WorkFunction` for important information about - `doWork`. - @see https://player03.com/openfl/threads-guide/ for a tutorial. + @see `lime.system.WorkOutput.WorkFunction` for important information about `doWork`. **/ #if !lime_debug @:fileXml('tags="haxe,release"')