Skip to content

Commit

Permalink
Cleanup thread functions. (savonet#1019)
Browse files Browse the repository at this point in the history
* First stab.

* Second stab.

* Replace add_timeout by thread.run(.delay).

* Fix tests.

* Replace exec_at by thread.when.

* Changelog entry.

* Forgot the main file...

* Move mutexify.

* Rename mutexify to thread.mutexify.

* Doc.

* Doc.
  • Loading branch information
smimram authored Nov 4, 2019
1 parent e171296 commit 2cef1f8
Show file tree
Hide file tree
Showing 21 changed files with 187 additions and 129 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ Changed:
- Changed `request.queue` into a Liquidsoap implementation (#1013).
- Removed `request.equeue`, such a feature could be re-implemented in
Liquidsoap, see `request.queue`.
- Renamed `add_timeout` to `thread.run.recurrent`, added `thread.run` variant,
renamed `exec_at` to `thread.when` and renamed `mutexify` to `thread.mutexify`
(#1019).

Fixed:

Expand Down
23 changes: 22 additions & 1 deletion libs/deprecations.liq
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,25 @@ end
def quote(s) =
deprecated("quote", "string.quote")
string.quote(s)
end
end

# Deprecated: this function has been replaced by thread.run.recurrent.
# @flag hidden
def add_timeout(~fast=true, delay, f)
deprecated("add_timeout", "thread.run.recurrent")
thread.run.recurrent(fast=fast, delay=delay, f)
end

# Deprecated: this function has been replaced by thread.when.
# @flag hidden
def exec_at(~freq=1., ~pred, f)
deprecated("exec_at", "thread.when")
thread.when(every=freq, pred, f)
end

# Deprecated: this function has been replaced by thread.mutexify.
# @flag hidden
def mutexify(f)
deprecated("mutexify", "thread.mutexify")
thread.mutexify(f)
end
8 changes: 2 additions & 6 deletions libs/flows.liq
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def register_flow(~server="",~user="default",~password="default",
ignore(request(cmd="ping radio",params=[]))
ping_period
end
add_timeout(fast=false,ping_period,ping)
thread.run.recurrent(fast=false, delay=ping_period, ping)

# Register streams
def register_stream(format_url)
Expand All @@ -97,11 +97,7 @@ def register_flow(~server="",~user="default",~password="default",
artist = m["artist"]
title = m["title"]
params = [("m_title",title),("m_artist",artist)]
def update_song() =
request(cmd="metadata",params=params)
(-1.)
end
add_timeout(fast=false,0.,update_song)
thread.run(fast=false, {request(cmd="metadata",params=params)})
end
on_metadata(metadata,s)
end
2 changes: 1 addition & 1 deletion libs/hls.liq
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def input.hls(~id="",~reload=10.,uri)

source = merge_tracks(source)

add_timeout(0., load_playlist)
thread.run.recurrent(load_playlist)

source
end
Expand Down
1 change: 1 addition & 0 deletions libs/pervasives.liq
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
%include "audio.liq"
%include "ref.liq"
%include "list.liq"
%include "thread.liq"
%include "utils.liq"
%include "string.liq"
%include "metadata.liq"
Expand Down
36 changes: 36 additions & 0 deletions libs/thread.liq
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# TODO: we cannot easly define recursive functions with optional arguments for
# now, see #1018.
def rec thread.run.recurrent(~fast, ~delay, f)
def f ()
delay = f ()
if delay >= 0. then thread.run.recurrent(fast=fast, delay=delay, f) end
end
thread.run(fast=fast, delay=delay, f)
end

# Run a recurrent function in a separate thread.
# @category Control
# @param ~fast Whether the thread is supposed to return quickly or not. Typically, blocking tasks (e.g. fetching data over the internet) should not be considered to be fast. When set to `false` its priority will be lowered below that of request resolutions and fast timeouts. This is only effective if you set a dedicated queue for fast tasks, see the "scheduler" settings for more details.
# @param ~delay Delay (in sec.) after which the thread should be lauched.
# @param f Function to execute.
def thread.run.recurrent(~fast=true, ~delay=0., f)
thread.run.recurrent(fast=fast, delay=delay, f)
end

# Execute a given action when a predicate is true.
# @category System
# @param ~every How often (in sec.) to check for the predicate.
# @param ~once Execute the function only once.
# @param ~changed Execute the function only if the predicate was false when last checked.
# @param p Predicate indicating when to execute the function, typically a time interval such as `{10h-10h30}`.
# @param f Function to execute when the predicate is true.
def thread.when(~every=1., ~once=false, ~changed=true, p, f)
last = ref(false)
def check()
b = p()
if b and not (changed and !last) then f() end
last := b
if b and once then (-1.) else every end
end
thread.run.recurrent(delay=0., check)
end
39 changes: 9 additions & 30 deletions libs/utils.liq
Original file line number Diff line number Diff line change
Expand Up @@ -363,24 +363,6 @@ def at(pred,s)
switch([(pred,s)])
end

# Execute a given action when a predicate is true. This will be run in background.
# @category System
# @param ~every How often (in sec.) to check for the predicate.
# @param ~once Execute the function only once.
# @param ~changed Execute the function only if the predicate was false when last checked.
# @param p Predicate indicating when to execute the function, typically a time interval such as `{10h-10h30}`.
# @param f Function to execute when the predicate is true.
def exec_at(~every=1., ~once=false, ~changed=true, p, f)
last = ref(false)
def check()
b = p()
if b and ((not changed) or !last == false) then f() end
last := b
if b and once then (-1.) else every end
end
add_timeout(0., check)
end

# Assign a new clock to the given source (and to other time-dependent sources)
# and return the source. It is a conveniency wrapper around clock.assign_new(),
# allowing more concise scripts in some cases.
Expand All @@ -395,28 +377,25 @@ end
# Create a log of clock times for all the clocks initially present. The log is
# in a simple format which you can directly use with gnuplot.
# @category Liquidsoap
# @param ~interval Polling interval.
# @param ~delay Delay before setting up the clock logger. This should \
# be used to ensure that the logger starts only after \
# the clocks are created.
# @param unlabeled Path of the log file.
def clock.log(~delay=0.,~interval=1.,logfile)
# @param ~every Polling interval.
# @param logfile Path of the log file.
def clock.log(~delay=0., ~every=1., logfile)
# Get the current clocks
clocks = list.map(fst,clock.status())
# Column headers
system("echo \\# #{string.concat(separator=' ',clocks)} > #{(logfile:string)}")
ignore(file.write(data="# #{string.concat(separator=' ', clocks)}", logfile))
def report()
status = clock.status()
status = list.map(fun (x) -> (fst(x),string_of(snd(x))), status)
status = list.map(fun (c) -> status[c], clocks)
system("echo #{string.concat(separator=' ',status)} >> #{logfile}")
interval
end
if delay<=0. then
add_timeout(interval,report)
else
add_timeout(delay,{add_timeout(interval,report) (-1.)})
ignore(file.write(append=true, data="#{string.concat(separator=' ', status)}", logfile))
every
end
delay = if delay <= 0. then 0. else delay end
thread.run.recurrent(delay=delay, report)
end

# Skip track when detecting a blank.
Expand Down Expand Up @@ -636,7 +615,7 @@ def chop(~duration=float_getter(3.),~metadata=[],s) =
insert(new_track=true, metadata)
duration()
end
add_timeout(duration(), f)
thread.run.recurrent(delay=duration(), f)
s
end

Expand Down
4 changes: 2 additions & 2 deletions scripts/tests/115-1.liq
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!../../src/liquidsoap ../../libs/pervasives.liq
%include "test.liq"

s = add(normalize=true,[
Expand All @@ -8,8 +9,7 @@ s = add(normalize=true,[

output.dummy(s)

add_timeout(3.,{
thread.run(delay=3.,{
test.pass()
shutdown()
(-1.)
})
5 changes: 2 additions & 3 deletions scripts/tests/115-2.liq
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ sa = sine()

sb_on = ref false
sb = switch([({!sb_on},once(sine(220.,duration=2.)))])
add_timeout(1.,{ sb_on := true ; (-1.) })
thread.run(delay=1., { sb_on := true })

output.dummy(smooth_add(delay=0.9,p=0.3,normal=sa,special=sb))

add_timeout(4.,{
thread.run(delay=4.,{
test.pass()
shutdown()
(-1.)
})
4 changes: 2 additions & 2 deletions scripts/tests/BUG403.liq
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@

r = ref false
pred = { v=!r ; r:=false ; v }
add_timeout(2.,{ r := true ; (-1.) })
thread.run(delay=2., { r := true })

mixer = fallback(id="mixer", track_sensitive=false,
[at(pred, sine(duration=3.)), blank()])

output.dummy(mixer)
add_timeout(3.,{ test.pass() ; shutdown() ; (-1.) })
thread.run(delay=3., { test.pass() ; shutdown()})
7 changes: 4 additions & 3 deletions scripts/tests/LS268.liq
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!../../src/liquidsoap ../../libs/pervasives.liq
# In LS-268 we realized that an incorrect assumption had
# been made in code from LS-394, resulting in a crash in
# case of source re-awakening.
Expand All @@ -9,6 +10,6 @@ o = output.dummy(fallible=true,p)

on_shutdown(test.pass)

add_timeout(2.,{ source.shutdown(o) ; (-1.) })
add_timeout(3.,{ output.dummy(fallible=true,p) ; (-1.) })
add_timeout(4.,{ shutdown() ; (-1.) })
thread.run(delay=2., { source.shutdown(o) })
thread.run(delay=3., { ignore(output.dummy(fallible=true,p)) })
thread.run(delay=4., { shutdown() })
4 changes: 2 additions & 2 deletions scripts/tests/LS354-1.liq
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ d = source.dynamic({ if !r then [s] else [] end })

output.dummy(mksafe(d))

add_timeout(2.,{r:=true;(-1.)})
add_timeout(4.,{test.fail(); shutdown(); (-1.)})
thread.run(delay=2., {r:=true})
thread.run(delay=4., {test.fail(); shutdown()})
6 changes: 3 additions & 3 deletions scripts/tests/LS354-2.liq
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ d = source.dynamic({ if !r==1 then [s1]

output.dummy(mksafe(d))

add_timeout(2.,{r:=1; (-1.)})
add_timeout(3.,{r:=2; (-1.)})
add_timeout(5.,{test.fail(); shutdown(); (-1.)})
thread.run(delay=2., {r:=1})
thread.run(delay=3., {r:=2})
thread.run(delay=5., {test.fail(); shutdown()})
4 changes: 2 additions & 2 deletions scripts/tests/LS460.liq
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ output.dummy(id="bar",mksafe(q))
output.dummy(id="foo",fallback([amplify(1.,q),blank(duration=1.)]))

def at(t,s)
add_timeout(t,{ignore(server.execute(s));(-1.)})
thread.run(delay=t,{ignore(server.execute(s))})
end

at(3.,"foo.stop")
at(4.,"bar.skip")
at(5.,"foo.start")
add_timeout(6.,{test.pass(); shutdown(); (-1.)})
thread.run(delay=6.,{test.pass(); shutdown()})
2 changes: 1 addition & 1 deletion scripts/tests/LS556.liq
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ mono = (noise(duration=0.5):source(1,0,0))
stereo = (sine(duration=0.3):source(2,0,0))
variable = (rotate([(mono:source(*+1,0,0)),(stereo:source(*+1,0,0))]):source(*+1,0,0))
output.dummy(audio_to_stereo(variable))
add_timeout(3.,{ test.pass() ; shutdown() ; (-1.) })
thread.run(delay=3., { test.pass() ; shutdown() })
3 changes: 1 addition & 2 deletions scripts/tests/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ test: threesines $(TESTS)
# Generate samples files in threesines directory

threesines: threesines/a.wav threesines/b.wav threesines/c.wav
MKSINE= ../../src/liquidsoap --no-pervasives -q \
../../libs/list.liq ../../libs/utils.liq \
MKSINE= ../../src/liquidsoap -q ../../libs/pervasives.liq \
"clock.assign_new(sync=false,[ \
output.file(%wav,\"$$fname\",fallible=true,on_stop=shutdown, \
once(sine(duration=2.,$$freq)))])"
Expand Down
2 changes: 1 addition & 1 deletion scripts/tests/fixme/sequence-2.liq
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
# which is bit fragile for a test.

flag = ref true
add_timeout(1.,{ flag:=false ; (-1.) })
thread.run(delay=1., { flag:=false })
on = sine()
off = fail()
s = source.dynamic({ if !flag then [on] else [off] end })
Expand Down
4 changes: 2 additions & 2 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ builtins = lang/lang_builtins.ml \
lang/builtins_time.ml lang/builtins_callbacks.ml \
lang/builtins_server.ml lang/builtins_math.ml \
lang/builtins_files.ml lang/builtins_resolvers.ml \
lang/builtins_process.ml lang/builtins_source.ml \
lang/builtins_interactive.ml \
lang/builtins_thread.ml lang/builtins_process.ml \
lang/builtins_source.ml lang/builtins_interactive.ml \
$(if $(W_SSL),lang/builtins_https.ml lang/builtins_harbor_ssl.ml) \
$(if $(W_OSX_SECURE_TRANSPORT),lang/builtins_https_secure_transport.ml lang/builtins_harbor_secure_transport.ml) \
$(if $(W_LO),lang/builtins_lo.ml) \
Expand Down
88 changes: 88 additions & 0 deletions src/lang/builtins_thread.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
(*****************************************************************************
Liquidsoap, a programmable audio stream generator.
Copyright 2003-2019 Savonet team
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details, fully stated in the COPYING
file at the root of the liquidsoap distribution.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*****************************************************************************)

open Lang_builtins

let () =
add_builtin "thread.run" ~cat:Control
[
"fast", Lang.bool_t, Some (Lang.bool true),
Some "Whether the thread is supposed to return quickly or not. Typically, \
blocking tasks (e.g. fetching data over the internet) should not be \
considered to be fast. When set to `false` its priority will be \
lowered below that of request resolutions and fast timeouts. This is \
only effective if you set a dedicated queue for fast tasks, see the \
\"scheduler\" settings for more details." ;
"delay", Lang.float_t, Some (Lang.float 0.),
Some "Delay (in sec.) after which the thread should be lauched.";
"", Lang.fun_t [] Lang.unit_t, None, Some "Function to execute."
]
Lang.unit_t
~descr:"Run a function in a separate thread."
(fun p ->
let delay = Lang.to_float (List.assoc "delay" p) in
let f = List.assoc "" p in
let priority =
if Lang.to_bool (List.assoc "fast" p) then
Tutils.Maybe_blocking
else
Tutils.Blocking
in
let task =
{ Duppy.Task.
priority = priority ;
events = [`Delay delay] ;
handler = fun _ -> Lang.to_unit (Lang.apply ~t:Lang.unit_t f []); []
}
in
Duppy.Task.add Tutils.scheduler task;
Lang.unit)

let () =
let t = Lang.univ_t 1 in
add_builtin "thread.mutexify" ~cat:Liq
~descr:"Protect functions with a mutex in order to avoid concurrent \
calls. It returns the original value when the argument is not a \
function."
["",t,None,None] t
(fun p ->
let m = Mutex.create () in
let v = List.assoc "" p in
match v.Lang.value with
| Lang.Fun (p,args,env,body) ->
let fn (args:Lang.full_env) t = Tutils.mutexify m (fun () ->
let args = List.map (fun (x,gv) -> x, Lazy.from_val gv) args in
let env = List.rev_append args env in
let v = {v with Lang.value =
Lang.Fun ([],[],env,body)}
in
Lang.apply ~t v []) ()
in
{ v with Lang.value =
Lang.FFI (p, args, fn) }
| Lang.FFI (p, args, fn) ->
let fn args t = Tutils.mutexify m (fun () ->
fn args t) ()
in
{ v with Lang.value =
Lang.FFI (p, args, fn) }
| _ -> v)
Loading

0 comments on commit 2cef1f8

Please sign in to comment.