Skip to content

Commit

Permalink
use effect for denoting requirement for event-loop
Browse files Browse the repository at this point in the history
  • Loading branch information
TimWhiting committed Feb 11, 2024
1 parent a69df81 commit 8c2d1a2
Show file tree
Hide file tree
Showing 16 changed files with 186 additions and 165 deletions.
2 changes: 1 addition & 1 deletion kklib/src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void kk_free_fun(void* p, kk_block_t* b, kk_context_t* ctx) {

kk_string_t kk_get_host(kk_context_t* ctx) {
kk_unused(ctx);
kk_define_string_literal(static, host, 5, "libc", ctx);
kk_define_string_literal(static, host, 4, "libc", ctx);
return kk_string_dup(host,ctx);
}

Expand Down
77 changes: 38 additions & 39 deletions lib/std/async.kk
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ extern import
// This is common for almost all `:async` operations since `cancel` and `timeout` can
// cancel operations non-deterministically which raises the `Cancel` exception and cancels
// outstanding asynchronous requests.
pub alias asyncx = <async,io>

pub alias asyncx = <async,io,event-loop>

pub alias io-eventx = <io,event-loop>
// ----------------------------------------------------------------------------
// Promises
// ----------------------------------------------------------------------------
Expand All @@ -61,7 +61,7 @@ abstract struct promise<a>

abstract value type promise-state<a>
Resolved( value : a )
Awaiting( listeners : list<a -> io ()> )
Awaiting( listeners : list<a -> io-eventx ()> )

// Create a new promise.
pub fun promise() : async promise<a>
Expand All @@ -70,7 +70,7 @@ pub fun promise() : async promise<a>
// Await a promise; returns immediately if the promise was already resolved and otherwise
// waits asynchronously.
pub fun promise/await( p : promise<a> ) : asyncx a
fun setup(cb : _ -> io-noexn ())
fun setup(cb : _ -> io-event ())
val r = p.state
match (!r)
Awaiting(listeners) -> r := Awaiting(Cons(cb,listeners))
Expand Down Expand Up @@ -113,14 +113,14 @@ abstract value struct channel<a>(
abstract type channel-state<a>
Empty
Values( value : a, values : list<a> = [] )
Waiting( listener : a -> io-noexn (), listeners : list<a -> io-noexn ()> = [] )
Waiting( listener : a -> io-event (), listeners : list<a -> io-event ()> = [] )

fun from-values(values : list<a> ) : channel-state<a>
match values
Nil -> Empty
Cons(v,vs) -> Values(v,vs)

fun from-waiting(listeners : list<a -> io-noexn ()>) : channel-state<a>
fun from-waiting(listeners : list<a -> io-event ()>) : channel-state<a>
match listeners
Nil -> Empty
Cons(l,ls) -> Waiting(l,ls)
Expand All @@ -136,7 +136,7 @@ pub fun receive( ch : channel<a> ) : asyncx a
ch.receivex.untry

fun receivex( ch : channel<a>, cancelable : bool = True ) : <async,ndet> error<a>
fun setup( cb : (_,_) -> io-noexn () )
fun setup( cb : (_,_) -> io-event () )
fun cbr(x) cb(Ok(x),True)
val r = ch.state
match !r
Expand All @@ -161,7 +161,7 @@ pub fun try-receive( ch : channel<a> ) : <async,ndet> maybe<a>
Just(v)
_ -> Nothing

fun emit-io( ch : channel<a>, value : a ) : io-noexn ()
fun emit-io( ch : channel<a>, value : a ) : io-event ()
val r = ch.state
match !r
Empty -> r := Values(value, [])
Expand All @@ -179,7 +179,7 @@ fun trace-channel( msg : string, ch : channel<a> ) : <async,ndet> ()
async-io-noexn
trace-channel-io( msg, ch )

fun trace-channel-io( msg : string, ch : channel<a> ) : io-noexn ()
fun trace-channel-io( msg : string, ch : channel<a> ) : io-event ()
val msgx = msg ++ ": id=" ++ ch.chid.show
val r = ch.state
match !r
Expand All @@ -199,7 +199,7 @@ fun trace-anyx( s : string, x : a ) : async ()
// abstraction can reliably time out over any composition of asynchronous operations
// and is therefore quite expressive.

pub fun timeout( secs : duration, action : () -> <async,io|e> a ) : <async,io|e> maybe<a>
pub fun timeout( secs : duration, action : () -> <asyncx|e> a ) : <asyncx|e> maybe<a>
firstof { wait(secs); Nothing} { Just(action()) }

// Execute `a` and `b` interleaved. As soon as one of them finishes,
Expand All @@ -216,20 +216,20 @@ pub fun firstof( a : () -> <async,exn|e> a, b : () -> <async,exn|e> a ) : <asyn

// Wait (asynchronously) for `secs` seconds as a `:double`.
// Use `yield()` to yield to other asynchronous operations.
pub fun float/wait( secs : float64 ) : <async,io> ()
pub fun float/wait( secs : float64 ) : asyncx ()
wait(secs.duration)

// Wait (asynchronously) for optional `secs` seconds `:duration` (`= 0.seconds`).
// Use `yield()` to yield generally to other asynchronous operations.
pub fun wait( secs : duration = zero ) : <async,io> ()
pub fun wait( secs : duration = zero ) : asyncx ()
if secs <= zero then return yield()
val msecs = max(zero:int32,secs.milli-seconds.int32)
await fn(cb)
val tid = set-timeout( fn(){ cb(Ok(())) }, msecs )
Just( { clear-timeout(tid) } )

// Yield to other asynchronous operations. Same as `wait(0)`.
pub fun yield() : <async,io> ()
pub fun yield() : asyncx ()
await0 fn(cb)
set-timeout( cb, zero )
()
Expand All @@ -239,18 +239,18 @@ abstract struct timeout-id(
timer : any
)

fun set-timeout( cb : () -> io-noexn (), ms : int32 ) : io-noexn timeout-id
fun set-timeout( cb : () -> io-event (), ms : int32 ) : io-event timeout-id
Timeout-id(set-timeoutx(cb,max(ms,zero)))

extern set-timeoutx( cb : () -> io-noexn (), ms : int32 ) : io-noexn any
extern set-timeoutx( cb : () -> io-event (), ms : int32 ) : io-event any
cs "_Async.SetTimeout"
js "setTimeout"
c "kk_set_timeout"

fun clear-timeout( tid : timeout-id ) : io-noexn ()
fun clear-timeout( tid : timeout-id ) : io-event ()
clear-timeoutx(tid.timer)

extern clear-timeoutx( tid : any) : io-noexn ()
extern clear-timeoutx( tid : any) : io-event ()
cs "_Async.ClearTimeout"
js "clearTimeout"
c "kk_clear_timeout"
Expand Down Expand Up @@ -418,13 +418,13 @@ fun interleaved-div( xs : list<() -> <async,exn|e> a> ) : <async,ndet,div,strand
// ----------------------------------------------------------------------------

// Convenience function for awaiting a NodeJS style callback where the first argument is a possible exception.
pub fun await-exn0( setup : (cb : (null<exception>) -> io-noexn () ) -> io maybe<() -> io-noexn ()> ) : <async,io> ()
pub fun await-exn0( setup : (cb : (null<exception>) -> io-event () ) -> io-eventx maybe<() -> io-event ()> ) : asyncx ()
await fn(cb)
setup( fn(nexn) cb(nexn.unnull(())) )

// Convenience function for awaiting a NodeJS style callback where the first argument is a possible exception
// and the second argument the possible result value.
pub fun await-exn1( setup : (cb : (null<exception>,a) -> io-noexn () ) -> io maybe<() -> io-noexn ()> ) : <async,io> a
pub fun await-exn1( setup : (cb : (null<exception>,a) -> io-event () ) -> io-eventx maybe<() -> io-event ()> ) : asyncx a
await fn(cb)
setup( fn(nexn,x) cb(nexn.unnull(x)) )

Expand All @@ -434,13 +434,13 @@ fun unnull( nexn : null<exception>, x : a ) : error<a>
Just(exn) -> Error(exn)

// Convenience function for awaiting a zero argument callback.
pub fun await0( setup : (cb : () -> io-noexn () ) -> io () ) : <async,io> ()
pub fun await0( setup : (cb : () -> io-event () ) -> io-eventx () ) : asyncx ()
await fn(cb)
setup( fn() cb(Ok(())) )
Nothing

// Convenience function for awaiting a single argument callback.
pub fun await1( setup : (cb : (a) -> io-noexn () ) -> io () ) : <async,io> a
pub fun await1( setup : (cb : (a) -> io-event () ) -> io-eventx () ) : asyncx a
await fn(cb)
setup( fn(x) cb(Ok(x)) )
Nothing
Expand All @@ -450,7 +450,7 @@ pub fun await1( setup : (cb : (a) -> io-noexn () ) -> io () ) : <async,io> a
// value where the `cleanup` functions is invoked on cancellation to dispose of any resources (see the implementation of `wait`).
// The callback should be invoked exactly once -- when that happens `await` is resumed with the result using `untry`
// either raise an exception or return the plain result.
pub fun setup/await( setup : (cb : error<a> -> io-noexn () ) -> io maybe<() -> io-noexn ()> ) : <async,io> a
pub fun setup/await( setup : (cb : error<a> -> io-event () ) -> io-eventx maybe<() -> io-event ()> ) : asyncx a
await-exn(setup).untry


Expand All @@ -460,13 +460,13 @@ pub fun setup/await( setup : (cb : error<a> -> io-noexn () ) -> io maybe<() -> i
// Async effect
// ----------------------------------------------------------------------------
alias await-result<a> = error<a>
alias await-setup<a> = (cb : (error<a>,bool) -> io-noexn ()) -> io-noexn (maybe<() -> io-noexn ()>)
alias await-setup<a> = (cb : (error<a>,bool) -> io-event ()) -> io-event (maybe<() -> io-event ()>)

// Asynchronous operations have the `:async` effect.
pub effect async
ctl do-await( setup : await-setup<a>, scope : scope, cancelable : bool ) : error<a>
ctl no-await( setup : await-setup<a>, scope : scope, cancelable : bool, f : error<a> -> io-noexn () ) : ()
ctl async-iox( action : () -> io-noexn a ) : a
ctl no-await( setup : await-setup<a>, scope : scope, cancelable : bool, f : error<a> -> io-event () ) : ()
ctl async-iox( action : () -> io-event a ) : a
ctl cancel( scope : scope ) : ()


Expand All @@ -480,7 +480,7 @@ pub fun noscope/cancel() : async ()
// it takes either an exception or a result `a`. Usually `setup` returns `Nothing` but you can return a `Just(cleanup)`
// value where the `cleanup` functions is invoked on cancellation to dispose of any resources (see the implementation of `wait`).
// The callback should be invoked exactly once -- when that happens `await-exn` is resumed with the result.
pub fun await-exn( setup : (cb : (error<a>) -> io-noexn ()) -> io (maybe<() -> io-noexn ()>) ) : async error<a>
pub fun await-exn( setup : (cb : (error<a>) -> io-event ()) -> io-eventx (maybe<() -> io-event ()>) ) : async error<a>
do-await(fn(cb)
match (try{ setup(fn(res) cb(res,True) ) })
Ok(mcleanup) -> mcleanup
Expand All @@ -494,7 +494,7 @@ pub fun await-exn( setup : (cb : (error<a>) -> io-noexn ()) -> io (maybe<() -> i
// The callback `cb` will eventually emit the result into the given channel `ch` after applying the transformation `f` to the result.\
// Note: once you exit the `cancelable` scope where `await-to-channel` was called, the callback is invoked with a `Cancel` exception.
// The channel should always be awaited within the same `cancelable` scope as the `await-to-channel` invokation.
pub fun await-to-channel( setup : (cb : (error<a>,bool) -> io-noexn ()) -> io (maybe<() -> io-noexn ()>), ch : channel<b>, f : error<a> -> b ) : async channel<b>
pub fun await-to-channel( setup : (cb : (error<a>,bool) -> io-event ()) -> io-eventx (maybe<() -> io-event ()>), ch : channel<b>, f : error<a> -> b ) : async channel<b>
no-await(fn(cb)
match(try{setup(cb)})
Ok(mcleanup) -> mcleanup
Expand All @@ -506,11 +506,11 @@ pub fun await-to-channel( setup : (cb : (error<a>,bool) -> io-noexn ()) -> io (m
)
ch

fun async-io-noexn( f : () -> io-noexn a ) : <async,ndet> a
fun async-io-noexn( f : () -> io-event a ) : <async,ndet> a
async-iox(f)

// Perform an I/O operation at the outer level; exceptions are propagated back.
fun async-io( f : () -> io a ) : asyncx a
fun async-io( f : () -> io-eventx a ) : asyncx a
async-io-noexn( { try(f) } ).untry


Expand Down Expand Up @@ -545,20 +545,19 @@ pub fun cancelable( action : () -> <async|e> a ) : <async|e> a
// ----------------------------------------------------------------------------

pub fun @default-async(action)
with handle-uv
async/handle(action);
async/handle(action)

fun nodispose() : io-noexn ()
fun nodispose() : io-event ()
()

// The outer `:async` effect handler. This is automatically applied by the compiler
// around the `main` function if it has an `:async` effect.
pub fun async/handle(action : () -> <async,io-noexn> () ) : io-noexn ()
val callbacks : ref<global,list<(scope,() -> io-noexn ())>> = unsafe-total{ref([])}
fun handle-await( setup : await-setup<a>, scope : scope, f : error<a> -> io-noexn (), cancelable : bool) : io-noexn ()
pub fun async/handle(action : () -> <async,io-event> () ) : io-event ()
val callbacks : ref<global,list<(scope,() -> io-event ())>> = unsafe-total{ref([])}
fun handle-await( setup : await-setup<a>, scope : scope, f : error<a> -> io-event (), cancelable : bool) : io-event ()
val cscope = child-scope(unique(),scope)
val dispose = ref(nodispose)
fun cb( res : error<_>, is-done : bool ) : io-noexn ()
fun cb( res : error<_>, is-done : bool ) : io-event ()
if ((!callbacks).contains(cscope)) then
if is-done then
callbacks := (!callbacks).remove(cscope)
Expand All @@ -576,7 +575,7 @@ pub fun async/handle(action : () -> <async,io-noexn> () ) : io-noexn ()
// if setup fails, immediately resume with the exception
cb(Error(exn),True)

fun handle-cancel( scope : scope ) : io-noexn ()
fun handle-cancel( scope : scope ) : io-event ()
(!callbacks).foreach fn(entry)
val (cscope,cb) = entry
if (cscope.in-scope-of(scope)) then cb()
Expand All @@ -591,10 +590,10 @@ pub fun async/handle(action : () -> <async,io-noexn> () ) : io-noexn ()
fun async-iox( f )
f()

fun io-noexn( f : () -> io-noexn a ) : io a
fun io-noexn( f : () -> io-event a ) : io-eventx a
f()

fun io-noexn1( f : (a1) -> io-noexn a, x1 : a1 ) : io a
fun io-noexn1( f : (a1) -> io-event a, x1 : a1 ) : io-eventx a
f(x1)

// ----------------------------------------------------------------------------
Expand Down
6 changes: 6 additions & 0 deletions lib/std/core.kk
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ pub alias io-total = <ndet,console,net,fsys,ui,st<global>>
// The `:io-noexn` effect is used for functions that perform arbitrary I/O operations, but raise no exceptions
pub alias io-noexn = <div,io-total>

// The `:event-loop` effect signifies that a function requires an initialized event loop
pub type event-loop :: X

// The `:io-event` effect is used for functions that perform arbitrary I/O operations and raise no exceptions, but do require an initialized event loop
pub alias io-event = <div,io-total,event-loop>

// The `:io` effect is used for functions that perform arbitrary I/O operations.
pub alias io = <exn,io-noexn>

Expand Down
Loading

0 comments on commit 8c2d1a2

Please sign in to comment.