From 2cef1f8f22a019853f1d8b0785c7b9335d2761ac Mon Sep 17 00:00:00 2001 From: Samuel Mimram Date: Mon, 4 Nov 2019 15:25:05 +0100 Subject: [PATCH] Cleanup thread functions. (#1019) * First stab. * Second stab. * Replace add_timeout by thread.run(.delay). * Fix tests. * Replace exec_at by thread.when. * Changelog entry. * Forgot the main file... * Move mutexify. * Rename mutexify to thread.mutexify. * Doc. * Doc. --- CHANGES.md | 3 + libs/deprecations.liq | 23 +++++++- libs/flows.liq | 8 +-- libs/hls.liq | 2 +- libs/pervasives.liq | 1 + libs/thread.liq | 36 ++++++++++++ libs/utils.liq | 39 +++---------- scripts/tests/115-1.liq | 4 +- scripts/tests/115-2.liq | 5 +- scripts/tests/BUG403.liq | 4 +- scripts/tests/LS268.liq | 7 ++- scripts/tests/LS354-1.liq | 4 +- scripts/tests/LS354-2.liq | 6 +- scripts/tests/LS460.liq | 4 +- scripts/tests/LS556.liq | 2 +- scripts/tests/Makefile | 3 +- scripts/tests/fixme/sequence-2.liq | 2 +- src/Makefile | 4 +- src/lang/builtins_thread.ml | 88 ++++++++++++++++++++++++++++++ src/lang/lang_builtins.ml | 65 ---------------------- src/tools/tutils.ml | 6 +- 21 files changed, 187 insertions(+), 129 deletions(-) create mode 100644 libs/thread.liq mode change 100644 => 100755 scripts/tests/115-1.liq mode change 100644 => 100755 scripts/tests/LS268.liq create mode 100644 src/lang/builtins_thread.ml diff --git a/CHANGES.md b/CHANGES.md index d0346d48e6..835cd8e5a3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -25,6 +25,9 @@ Changed: - Changed `request.queue` into a Liquidsoap implementation (#1013). - Removed `request.equeue`, such a feature could be re-implemented in Liquidsoap, see `request.queue`. +- Renamed `add_timeout` to `thread.run.recurrent`, added `thread.run` variant, + renamed `exec_at` to `thread.when` and renamed `mutexify` to `thread.mutexify` + (#1019). Fixed: diff --git a/libs/deprecations.liq b/libs/deprecations.liq index 287fedd013..b3459987fe 100644 --- a/libs/deprecations.liq +++ b/libs/deprecations.liq @@ -35,4 +35,25 @@ end def quote(s) = deprecated("quote", "string.quote") string.quote(s) -end \ No newline at end of file +end + +# Deprecated: this function has been replaced by thread.run.recurrent. +# @flag hidden +def add_timeout(~fast=true, delay, f) + deprecated("add_timeout", "thread.run.recurrent") + thread.run.recurrent(fast=fast, delay=delay, f) +end + +# Deprecated: this function has been replaced by thread.when. +# @flag hidden +def exec_at(~freq=1., ~pred, f) + deprecated("exec_at", "thread.when") + thread.when(every=freq, pred, f) +end + +# Deprecated: this function has been replaced by thread.mutexify. +# @flag hidden +def mutexify(f) + deprecated("mutexify", "thread.mutexify") + thread.mutexify(f) +end diff --git a/libs/flows.liq b/libs/flows.liq index e4c5d18ab0..6b1c3696af 100644 --- a/libs/flows.liq +++ b/libs/flows.liq @@ -81,7 +81,7 @@ def register_flow(~server="",~user="default",~password="default", ignore(request(cmd="ping radio",params=[])) ping_period end - add_timeout(fast=false,ping_period,ping) + thread.run.recurrent(fast=false, delay=ping_period, ping) # Register streams def register_stream(format_url) @@ -97,11 +97,7 @@ def register_flow(~server="",~user="default",~password="default", artist = m["artist"] title = m["title"] params = [("m_title",title),("m_artist",artist)] - def update_song() = - request(cmd="metadata",params=params) - (-1.) - end - add_timeout(fast=false,0.,update_song) + thread.run(fast=false, {request(cmd="metadata",params=params)}) end on_metadata(metadata,s) end diff --git a/libs/hls.liq b/libs/hls.liq index 45e2303b09..b1d89cb702 100644 --- a/libs/hls.liq +++ b/libs/hls.liq @@ -83,7 +83,7 @@ def input.hls(~id="",~reload=10.,uri) source = merge_tracks(source) - add_timeout(0., load_playlist) + thread.run.recurrent(load_playlist) source end diff --git a/libs/pervasives.liq b/libs/pervasives.liq index d13b397fc7..119fd87f25 100644 --- a/libs/pervasives.liq +++ b/libs/pervasives.liq @@ -1,6 +1,7 @@ %include "audio.liq" %include "ref.liq" %include "list.liq" +%include "thread.liq" %include "utils.liq" %include "string.liq" %include "metadata.liq" diff --git a/libs/thread.liq b/libs/thread.liq new file mode 100644 index 0000000000..62e0351ac2 --- /dev/null +++ b/libs/thread.liq @@ -0,0 +1,36 @@ +# TODO: we cannot easly define recursive functions with optional arguments for +# now, see #1018. +def rec thread.run.recurrent(~fast, ~delay, f) + def f () + delay = f () + if delay >= 0. then thread.run.recurrent(fast=fast, delay=delay, f) end + end + thread.run(fast=fast, delay=delay, f) +end + +# Run a recurrent function in a separate thread. +# @category Control +# @param ~fast Whether the thread is supposed to return quickly or not. Typically, blocking tasks (e.g. fetching data over the internet) should not be considered to be fast. When set to `false` its priority will be lowered below that of request resolutions and fast timeouts. This is only effective if you set a dedicated queue for fast tasks, see the "scheduler" settings for more details. +# @param ~delay Delay (in sec.) after which the thread should be lauched. +# @param f Function to execute. +def thread.run.recurrent(~fast=true, ~delay=0., f) + thread.run.recurrent(fast=fast, delay=delay, f) +end + +# Execute a given action when a predicate is true. +# @category System +# @param ~every How often (in sec.) to check for the predicate. +# @param ~once Execute the function only once. +# @param ~changed Execute the function only if the predicate was false when last checked. +# @param p Predicate indicating when to execute the function, typically a time interval such as `{10h-10h30}`. +# @param f Function to execute when the predicate is true. +def thread.when(~every=1., ~once=false, ~changed=true, p, f) + last = ref(false) + def check() + b = p() + if b and not (changed and !last) then f() end + last := b + if b and once then (-1.) else every end + end + thread.run.recurrent(delay=0., check) +end diff --git a/libs/utils.liq b/libs/utils.liq index d0e8fb2476..a5458d3b4e 100644 --- a/libs/utils.liq +++ b/libs/utils.liq @@ -363,24 +363,6 @@ def at(pred,s) switch([(pred,s)]) end -# Execute a given action when a predicate is true. This will be run in background. -# @category System -# @param ~every How often (in sec.) to check for the predicate. -# @param ~once Execute the function only once. -# @param ~changed Execute the function only if the predicate was false when last checked. -# @param p Predicate indicating when to execute the function, typically a time interval such as `{10h-10h30}`. -# @param f Function to execute when the predicate is true. -def exec_at(~every=1., ~once=false, ~changed=true, p, f) - last = ref(false) - def check() - b = p() - if b and ((not changed) or !last == false) then f() end - last := b - if b and once then (-1.) else every end - end - add_timeout(0., check) -end - # Assign a new clock to the given source (and to other time-dependent sources) # and return the source. It is a conveniency wrapper around clock.assign_new(), # allowing more concise scripts in some cases. @@ -395,28 +377,25 @@ end # Create a log of clock times for all the clocks initially present. The log is # in a simple format which you can directly use with gnuplot. # @category Liquidsoap -# @param ~interval Polling interval. # @param ~delay Delay before setting up the clock logger. This should \ # be used to ensure that the logger starts only after \ # the clocks are created. -# @param unlabeled Path of the log file. -def clock.log(~delay=0.,~interval=1.,logfile) +# @param ~every Polling interval. +# @param logfile Path of the log file. +def clock.log(~delay=0., ~every=1., logfile) # Get the current clocks clocks = list.map(fst,clock.status()) # Column headers - system("echo \\# #{string.concat(separator=' ',clocks)} > #{(logfile:string)}") + ignore(file.write(data="# #{string.concat(separator=' ', clocks)}", logfile)) def report() status = clock.status() status = list.map(fun (x) -> (fst(x),string_of(snd(x))), status) status = list.map(fun (c) -> status[c], clocks) - system("echo #{string.concat(separator=' ',status)} >> #{logfile}") - interval - end - if delay<=0. then - add_timeout(interval,report) - else - add_timeout(delay,{add_timeout(interval,report) (-1.)}) + ignore(file.write(append=true, data="#{string.concat(separator=' ', status)}", logfile)) + every end + delay = if delay <= 0. then 0. else delay end + thread.run.recurrent(delay=delay, report) end # Skip track when detecting a blank. @@ -636,7 +615,7 @@ def chop(~duration=float_getter(3.),~metadata=[],s) = insert(new_track=true, metadata) duration() end - add_timeout(duration(), f) + thread.run.recurrent(delay=duration(), f) s end diff --git a/scripts/tests/115-1.liq b/scripts/tests/115-1.liq old mode 100644 new mode 100755 index 77ecac59ad..ee537edf98 --- a/scripts/tests/115-1.liq +++ b/scripts/tests/115-1.liq @@ -1,3 +1,4 @@ +#!../../src/liquidsoap ../../libs/pervasives.liq %include "test.liq" s = add(normalize=true,[ @@ -8,8 +9,7 @@ s = add(normalize=true,[ output.dummy(s) -add_timeout(3.,{ +thread.run(delay=3.,{ test.pass() shutdown() - (-1.) }) diff --git a/scripts/tests/115-2.liq b/scripts/tests/115-2.liq index 7a09823eb7..a2ade4705c 100644 --- a/scripts/tests/115-2.liq +++ b/scripts/tests/115-2.liq @@ -4,12 +4,11 @@ sa = sine() sb_on = ref false sb = switch([({!sb_on},once(sine(220.,duration=2.)))]) -add_timeout(1.,{ sb_on := true ; (-1.) }) +thread.run(delay=1., { sb_on := true }) output.dummy(smooth_add(delay=0.9,p=0.3,normal=sa,special=sb)) -add_timeout(4.,{ +thread.run(delay=4.,{ test.pass() shutdown() - (-1.) }) diff --git a/scripts/tests/BUG403.liq b/scripts/tests/BUG403.liq index 7735043b4c..4e963a0630 100644 --- a/scripts/tests/BUG403.liq +++ b/scripts/tests/BUG403.liq @@ -14,10 +14,10 @@ r = ref false pred = { v=!r ; r:=false ; v } -add_timeout(2.,{ r := true ; (-1.) }) +thread.run(delay=2., { r := true }) mixer = fallback(id="mixer", track_sensitive=false, [at(pred, sine(duration=3.)), blank()]) output.dummy(mixer) -add_timeout(3.,{ test.pass() ; shutdown() ; (-1.) }) +thread.run(delay=3., { test.pass() ; shutdown()}) diff --git a/scripts/tests/LS268.liq b/scripts/tests/LS268.liq old mode 100644 new mode 100755 index fde5fdbe4b..8f533cc8f6 --- a/scripts/tests/LS268.liq +++ b/scripts/tests/LS268.liq @@ -1,3 +1,4 @@ +#!../../src/liquidsoap ../../libs/pervasives.liq # In LS-268 we realized that an incorrect assumption had # been made in code from LS-394, resulting in a crash in # case of source re-awakening. @@ -9,6 +10,6 @@ o = output.dummy(fallible=true,p) on_shutdown(test.pass) -add_timeout(2.,{ source.shutdown(o) ; (-1.) }) -add_timeout(3.,{ output.dummy(fallible=true,p) ; (-1.) }) -add_timeout(4.,{ shutdown() ; (-1.) }) +thread.run(delay=2., { source.shutdown(o) }) +thread.run(delay=3., { ignore(output.dummy(fallible=true,p)) }) +thread.run(delay=4., { shutdown() }) diff --git a/scripts/tests/LS354-1.liq b/scripts/tests/LS354-1.liq index 3a197842b6..0309247cef 100644 --- a/scripts/tests/LS354-1.liq +++ b/scripts/tests/LS354-1.liq @@ -9,5 +9,5 @@ d = source.dynamic({ if !r then [s] else [] end }) output.dummy(mksafe(d)) -add_timeout(2.,{r:=true;(-1.)}) -add_timeout(4.,{test.fail(); shutdown(); (-1.)}) +thread.run(delay=2., {r:=true}) +thread.run(delay=4., {test.fail(); shutdown()}) diff --git a/scripts/tests/LS354-2.liq b/scripts/tests/LS354-2.liq index 0d8c73ec14..c67de1d9bd 100644 --- a/scripts/tests/LS354-2.liq +++ b/scripts/tests/LS354-2.liq @@ -12,6 +12,6 @@ d = source.dynamic({ if !r==1 then [s1] output.dummy(mksafe(d)) -add_timeout(2.,{r:=1; (-1.)}) -add_timeout(3.,{r:=2; (-1.)}) -add_timeout(5.,{test.fail(); shutdown(); (-1.)}) +thread.run(delay=2., {r:=1}) +thread.run(delay=3., {r:=2}) +thread.run(delay=5., {test.fail(); shutdown()}) diff --git a/scripts/tests/LS460.liq b/scripts/tests/LS460.liq index 386ac04498..5593c57469 100644 --- a/scripts/tests/LS460.liq +++ b/scripts/tests/LS460.liq @@ -10,10 +10,10 @@ output.dummy(id="bar",mksafe(q)) output.dummy(id="foo",fallback([amplify(1.,q),blank(duration=1.)])) def at(t,s) - add_timeout(t,{ignore(server.execute(s));(-1.)}) + thread.run(delay=t,{ignore(server.execute(s))}) end at(3.,"foo.stop") at(4.,"bar.skip") at(5.,"foo.start") -add_timeout(6.,{test.pass(); shutdown(); (-1.)}) +thread.run(delay=6.,{test.pass(); shutdown()}) diff --git a/scripts/tests/LS556.liq b/scripts/tests/LS556.liq index 1c1392e6f5..d846ff02d9 100644 --- a/scripts/tests/LS556.liq +++ b/scripts/tests/LS556.liq @@ -5,4 +5,4 @@ mono = (noise(duration=0.5):source(1,0,0)) stereo = (sine(duration=0.3):source(2,0,0)) variable = (rotate([(mono:source(*+1,0,0)),(stereo:source(*+1,0,0))]):source(*+1,0,0)) output.dummy(audio_to_stereo(variable)) -add_timeout(3.,{ test.pass() ; shutdown() ; (-1.) }) +thread.run(delay=3., { test.pass() ; shutdown() }) diff --git a/scripts/tests/Makefile b/scripts/tests/Makefile index e0a4a6bccc..84af389d07 100644 --- a/scripts/tests/Makefile +++ b/scripts/tests/Makefile @@ -13,8 +13,7 @@ test: threesines $(TESTS) # Generate samples files in threesines directory threesines: threesines/a.wav threesines/b.wav threesines/c.wav -MKSINE= ../../src/liquidsoap --no-pervasives -q \ - ../../libs/list.liq ../../libs/utils.liq \ +MKSINE= ../../src/liquidsoap -q ../../libs/pervasives.liq \ "clock.assign_new(sync=false,[ \ output.file(%wav,\"$$fname\",fallible=true,on_stop=shutdown, \ once(sine(duration=2.,$$freq)))])" diff --git a/scripts/tests/fixme/sequence-2.liq b/scripts/tests/fixme/sequence-2.liq index 9a9541f72b..a6d38211de 100644 --- a/scripts/tests/fixme/sequence-2.liq +++ b/scripts/tests/fixme/sequence-2.liq @@ -18,7 +18,7 @@ # which is bit fragile for a test. flag = ref true -add_timeout(1.,{ flag:=false ; (-1.) }) +thread.run(delay=1., { flag:=false }) on = sine() off = fail() s = source.dynamic({ if !flag then [on] else [off] end }) diff --git a/src/Makefile b/src/Makefile index 513a414864..cf02ab7724 100644 --- a/src/Makefile +++ b/src/Makefile @@ -212,8 +212,8 @@ builtins = lang/lang_builtins.ml \ lang/builtins_time.ml lang/builtins_callbacks.ml \ lang/builtins_server.ml lang/builtins_math.ml \ lang/builtins_files.ml lang/builtins_resolvers.ml \ - lang/builtins_process.ml lang/builtins_source.ml \ - lang/builtins_interactive.ml \ + lang/builtins_thread.ml lang/builtins_process.ml \ + lang/builtins_source.ml lang/builtins_interactive.ml \ $(if $(W_SSL),lang/builtins_https.ml lang/builtins_harbor_ssl.ml) \ $(if $(W_OSX_SECURE_TRANSPORT),lang/builtins_https_secure_transport.ml lang/builtins_harbor_secure_transport.ml) \ $(if $(W_LO),lang/builtins_lo.ml) \ diff --git a/src/lang/builtins_thread.ml b/src/lang/builtins_thread.ml new file mode 100644 index 0000000000..9bb604a920 --- /dev/null +++ b/src/lang/builtins_thread.ml @@ -0,0 +1,88 @@ +(***************************************************************************** + + Liquidsoap, a programmable audio stream generator. + Copyright 2003-2019 Savonet team + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details, fully stated in the COPYING + file at the root of the liquidsoap distribution. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + + *****************************************************************************) + +open Lang_builtins + +let () = + add_builtin "thread.run" ~cat:Control + [ + "fast", Lang.bool_t, Some (Lang.bool true), + Some "Whether the thread is supposed to return quickly or not. Typically, \ + blocking tasks (e.g. fetching data over the internet) should not be \ + considered to be fast. When set to `false` its priority will be \ + lowered below that of request resolutions and fast timeouts. This is \ + only effective if you set a dedicated queue for fast tasks, see the \ + \"scheduler\" settings for more details." ; + "delay", Lang.float_t, Some (Lang.float 0.), + Some "Delay (in sec.) after which the thread should be lauched."; + "", Lang.fun_t [] Lang.unit_t, None, Some "Function to execute." + ] + Lang.unit_t + ~descr:"Run a function in a separate thread." + (fun p -> + let delay = Lang.to_float (List.assoc "delay" p) in + let f = List.assoc "" p in + let priority = + if Lang.to_bool (List.assoc "fast" p) then + Tutils.Maybe_blocking + else + Tutils.Blocking + in + let task = + { Duppy.Task. + priority = priority ; + events = [`Delay delay] ; + handler = fun _ -> Lang.to_unit (Lang.apply ~t:Lang.unit_t f []); [] + } + in + Duppy.Task.add Tutils.scheduler task; + Lang.unit) + +let () = + let t = Lang.univ_t 1 in + add_builtin "thread.mutexify" ~cat:Liq + ~descr:"Protect functions with a mutex in order to avoid concurrent \ + calls. It returns the original value when the argument is not a \ + function." + ["",t,None,None] t + (fun p -> + let m = Mutex.create () in + let v = List.assoc "" p in + match v.Lang.value with + | Lang.Fun (p,args,env,body) -> + let fn (args:Lang.full_env) t = Tutils.mutexify m (fun () -> + let args = List.map (fun (x,gv) -> x, Lazy.from_val gv) args in + let env = List.rev_append args env in + let v = {v with Lang.value = + Lang.Fun ([],[],env,body)} + in + Lang.apply ~t v []) () + in + { v with Lang.value = + Lang.FFI (p, args, fn) } + | Lang.FFI (p, args, fn) -> + let fn args t = Tutils.mutexify m (fun () -> + fn args t) () + in + { v with Lang.value = + Lang.FFI (p, args, fn) } + | _ -> v) diff --git a/src/lang/lang_builtins.ml b/src/lang/lang_builtins.ml index d3765673b4..55943d3a10 100644 --- a/src/lang/lang_builtins.ml +++ b/src/lang/lang_builtins.ml @@ -324,42 +324,6 @@ let () = (** Misc control/system functions. *) -let () = - add_builtin "add_timeout" ~cat:Control - [ "fast", Lang.bool_t, Some (Lang.bool true), - Some - "Set to `false` if the execution of the code can take long \ - in order to lower its priority below that of request resolutions and \ - fast timeouts. \ - This is only effective if you set a dedicated queue for fast tasks, \ - see the \"scheduler\" settings for more details." ; - "",Lang.float_t,None,None ; - "",Lang.fun_t [] Lang.float_t,None,None ] - Lang.unit_t - ~descr:"Call a function in N seconds. \ - If the result of the function is positive or null, the \ - task will be scheduled again after this amount of time (in seconds)." - (fun p -> - let d = Lang.to_float (Lang.assoc "" 1 p) in - let f = Lang.assoc "" 2 p in - let priority = - if Lang.to_bool (List.assoc "fast" p) then - Tutils.Maybe_blocking - else - Tutils.Blocking - in - let rec t d = - { Duppy.Task. - priority = priority ; - events = [`Delay d] ; - handler = - fun _ -> - let d = Lang.to_float (Lang.apply ~t:Lang.float_t f []) in - if d >= 0. then [t d] else [] } - in - Duppy.Task.add Tutils.scheduler (t d); - Lang.unit) - let () = let descr = "Execute a liquidsoap server command." in let cat = Liq in @@ -454,35 +418,6 @@ let () = Gc.full_major () ; Lang.unit) -let () = - let t = Lang.univ_t 1 in - add_builtin "mutexify" ~cat:Liq - ~descr:"Protect functions with a mutex to avoid concurrent calls, \ - return original value otherwise." - ["",t,None,None] t - (fun p -> - let m = Mutex.create () in - let v = List.assoc "" p in - match v.Lang.value with - | Lang.Fun (p,args,env,body) -> - let fn (args:Lang.full_env) t = Tutils.mutexify m (fun () -> - let args = List.map (fun (x,gv) -> x, Lazy.from_val gv) args in - let env = List.rev_append args env in - let v = {v with Lang.value = - Lang.Fun ([],[],env,body)} - in - Lang.apply ~t v []) () - in - { v with Lang.value = - Lang.FFI (p, args, fn) } - | Lang.FFI (p, args, fn) -> - let fn args t = Tutils.mutexify m (fun () -> - fn args t) () - in - { v with Lang.value = - Lang.FFI (p, args, fn) } - | _ -> v) - let () = add_builtin "system" ~cat:Sys ["",Lang.string_t,None,None] diff --git a/src/tools/tutils.ml b/src/tools/tutils.ml index 79ee1c2229..be94c55ca1 100644 --- a/src/tools/tutils.ml +++ b/src/tools/tutils.ml @@ -31,7 +31,7 @@ let conf_scheduler = "such as request resolution (audio file downloading and checking)." ; "Finally, \"slow\" tasks are those that are always taking a long time," ; "like last.fm submission, or user-defined tasks register via" ; - "`add_timeout()`." ; + "`thread.run`." ; "The scheduler consists in a number of queues that process incoming" ; "tasks. Some queues might only process some kinds of tasks so that" ; "they are more responsive." ; @@ -54,9 +54,9 @@ let fast_queues = ~comments:[ "Number of queues that are dedicated to fast tasks." ; "It might be useful to create some if your request resolutions," ; - "or some user defined tasks (cf. `add_timeout()`), are" ; + "or some user defined tasks (cf `thread.run`), are" ; "delayed too much because of slow tasks blocking the generic queues," ; - "such as last.fm submissions or slow `add_timeout` handlers." + "such as last.fm submissions or slow `thread.run` handlers." ] let non_blocking_queues = Dtools.Conf.int ~p:(conf_scheduler#plug "non_blocking_queues") ~d:2