Skip to content

Commit

Permalink
Add a wait(::[Abstract]WorkerPool) (#106)
Browse files Browse the repository at this point in the history
Original PR: JuliaLang/julia#48238

Co-authored-by: kleinschmidt <[email protected]>
  • Loading branch information
fatteneder and kleinschmidt authored Jul 25, 2024
1 parent 1cd2677 commit 6a0383b
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 1 deletion.
8 changes: 7 additions & 1 deletion src/workerpool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ An `AbstractWorkerPool` should implement:
- [`push!`](@ref) - add a new worker to the overall pool (available + busy)
- [`put!`](@ref) - put back a worker to the available pool
- [`take!`](@ref) - take a worker from the available pool (to be used for remote function execution)
- [`wait`](@ref) - block until a worker is available
- [`length`](@ref) - number of workers available in the overall pool
- [`isready`](@ref) - return false if a `take!` on the pool would block, else true
Expand Down Expand Up @@ -120,6 +121,11 @@ function wp_local_take!(pool::AbstractWorkerPool)
return worker
end

function wp_local_wait(pool::AbstractWorkerPool)
wait(pool.channel)
return nothing
end

function remotecall_pool(rc_f, f, pool::AbstractWorkerPool, args...; kwargs...)
worker = take!(pool)
try
Expand All @@ -133,7 +139,7 @@ end
# NOTE: remotecall_fetch does it automatically, but this will be more efficient as
# it avoids the overhead associated with a local remotecall.

for (func, rt) = ((:length, Int), (:isready, Bool), (:workers, Vector{Int}), (:nworkers, Int), (:take!, Int))
for (func, rt) = ((:length, Int), (:isready, Bool), (:workers, Vector{Int}), (:nworkers, Int), (:take!, Int), (:wait, Nothing))
func_local = Symbol(string("wp_local_", func))
@eval begin
function ($func)(pool::WorkerPool)
Expand Down
21 changes: 21 additions & 0 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,27 @@ wp = WorkerPool(workers())
wp = WorkerPool(2:3)
@test sort(unique(pmap(_->myid(), wp, 1:100))) == [2,3]

# wait on worker pool
wp = WorkerPool(2:2)
w = take!(wp)

# local call to _wait
@test !isready(wp)
t = @async wait(wp)
@test !istaskdone(t)
put!(wp, w)
status = timedwait(() -> istaskdone(t), 10)
@test status == :ok

# remote call to _wait
take!(wp)
@test !isready(wp)
f = @spawnat w wait(wp)
@test !isready(f)
put!(wp, w)
status = timedwait(() -> isready(f), 10)
@test status == :ok

# CachingPool tests
wp = CachingPool(workers())
@test [1:100...] == pmap(x->x, wp, 1:100)
Expand Down

0 comments on commit 6a0383b

Please sign in to comment.