Commit 1f3a9d3: Add a ClusterManager using one-sided communication

barche committed Feb 24, 2018 (parent commit 4054396)

Showing 8 changed files with 473 additions and 21 deletions.
9 changes: 7 additions & 2 deletions README.md
@@ -160,14 +160,14 @@ Fields `j2mpi` and `mpi2j` of `MPIManager` are associative collections mapping j

This launches a total of 5 processes: MPI rank 0 is Julia pid 1, MPI rank 1 is Julia pid 2, and so on.

The program must call `MPI.start(TCP_TRANSPORT_ALL)` with argument `TCP_TRANSPORT_ALL`.
The program must call `MPI.start_main_loop(TCP_TRANSPORT_ALL)` with argument `TCP_TRANSPORT_ALL`.
On MPI rank 0, it returns a `manager` which can be used with `@mpi_do`.
On the other processes (i.e., the workers) the function does not return.
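
For illustration, a minimal sketch of this pattern (the script name and process count are placeholders, and the `MPI.stop_main_loop` call is assumed to be available as in the bundled examples); `MPI_TRANSPORT_ALL` works the same way with that constant passed instead:

```julia
# Run with: mpirun -np 5 julia example.jl
using MPI

# Workers enter the manager's event loop here and never return;
# only MPI rank 0 (Julia pid 1) continues past this call.
manager = MPI.start_main_loop(TCP_TRANSPORT_ALL)

# Execute code on all MPI ranks through the manager.
@mpi_do manager println("Hello from MPI rank $(MPI.Comm_rank(MPI.COMM_WORLD))")

# Shut down the workers and finalize MPI.
MPI.stop_main_loop(manager)
```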


### MPIManager
### (MPI transport - all processes execute MPI code)
`MPI.start` must be called with option `MPI_TRANSPORT_ALL` to use MPI as transport.
`MPI.start_main_loop` must be called with option `MPI_TRANSPORT_ALL` to use MPI as transport.
`mpirun -np 5 julia 06-cman-transport.jl MPI` will run the example using MPI as transport.

## Julia MPI-only interface
@@ -183,6 +183,11 @@ juliacomm = MPI.COMM_WORLD
ccomm = MPI.CComm(juliacomm)
```

### MPIWindowIOManager
This manager is started using the `MPI_WINDOW_IO` or `MPI_WINDOW_NOWAIT` transports. It uses asynchronous IO
based on MPI windows. With `MPI_WINDOW_NOWAIT`, the cluster manager is only used for code preceded by the
`@cluster` macro; all other code runs as regular MPI code. See `test_windowcman.jl` and `test_windowcman_nowait.jl`
for examples.
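
For illustration, a hedged sketch of the `MPI_WINDOW_NOWAIT` flow (the script name, worker expression, and the exact return value of `MPI.start_main_loop` in this mode are assumptions based on the docstrings; see the test files above for the authoritative usage):

```julia
# Run with: mpirun -np 4 julia window_nowait_example.jl
using MPI

# Every rank calls this. With MPI_WINDOW_NOWAIT the workers return from the call
# and keep executing the script as ordinary MPI ranks until they reach @cluster.
manager = MPI.start_main_loop(MPI_WINDOW_NOWAIT)

rank = MPI.Comm_rank(MPI.COMM_WORLD)   # regular SPMD code, runs on every rank

@cluster begin
    # Only Julia pid 1 executes this block; the other ranks temporarily act as
    # cluster workers and serve the pmap tasks.
    println(pmap(x -> x^2, 1:8))
end

MPI.stop_main_loop(manager)
```

With `MPI_WINDOW_IO` the workers instead stay in the cluster manager's event loop for the whole run, as with the other `*_TRANSPORT_ALL` modes.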

### Currently wrapped MPI functions
Convention: `MPI_Fun => MPI.Fun`

1 change: 1 addition & 0 deletions src/MPI.jl
@@ -18,6 +18,7 @@ include(depfile)
include("mpi-base.jl")
include("cman.jl")
include("window-io.jl")
include("window-cman.jl")

const mpitype_dict = Dict{DataType, Cint}()
const mpitype_dict_inverse = Dict{Cint, DataType}()
45 changes: 28 additions & 17 deletions src/cman.jl
@@ -1,28 +1,34 @@
import Base: launch, manage, kill, procs, connect
export MPIManager, launch, manage, kill, procs, connect, mpiprocs, @mpi_do
export TransportMode, MPI_ON_WORKERS, TCP_TRANSPORT_ALL, MPI_TRANSPORT_ALL
export TransportMode, MPI_ON_WORKERS, TCP_TRANSPORT_ALL, MPI_TRANSPORT_ALL, MPI_WINDOW_IO, MPI_WINDOW_NOWAIT



################################################################################
# MPI Cluster Manager
# Note: The cluster manager object lives only in the manager process,
# except for MPI_TRANSPORT_ALL
# There are three different transport modes:
# MPI_ON_WORKERS: Use MPI between the workers only, not for the manager. This
# allows interactive use from a Julia shell, using the familiar `addprocs`
# interface.
# MPI_TRANSPORT_ALL: Use MPI on all processes; there is no separate manager
# process. This corresponds to the "usual" way in which MPI is used in a
# headless mode, e.g. submitted as a script to a queueing system.
# TCP_TRANSPORT_ALL: Same as MPI_TRANSPORT_ALL, but Julia uses TCP for its
# communication between processes. MPI can still be used by the user.
@enum TransportMode MPI_ON_WORKERS MPI_TRANSPORT_ALL TCP_TRANSPORT_ALL
"""
MPI Cluster Manager
Note: The cluster manager object lives only in the manager process,
except for MPI_TRANSPORT_ALL and MPI_WINDOW_IO
There are five different transport modes:
MPI_ON_WORKERS: Use MPI between the workers only, not for the manager. This
allows interactive use from a Julia shell, using the familiar `addprocs`
interface.
MPI_TRANSPORT_ALL: Use MPI on all processes; there is no separate manager
process. This corresponds to the "usual" way in which MPI is used in a
headless mode, e.g. submitted as a script to a queueing system.
TCP_TRANSPORT_ALL: Same as MPI_TRANSPORT_ALL, but Julia uses TCP for its
communication between processes. MPI can still be used by the user.
MPI_WINDOW_IO: Uses the MPI shared memory model with passive communication on all processes.
The program must be started with mpirun or equivalent.
MPI_WINDOW_NOWAIT: Sets up a cluster manager, but only uses it for code enclosed in the @cluster
macro. All other code runs as regular MPI code (single program, multiple data).
"""
@enum TransportMode MPI_ON_WORKERS MPI_TRANSPORT_ALL TCP_TRANSPORT_ALL MPI_WINDOW_IO MPI_WINDOW_NOWAIT
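
As a hedged illustration of the interactive `MPI_ON_WORKERS` usage described above (this block is editorial and not part of the committed file; the worker count is arbitrary):

```julia
using MPI

# From an interactive Julia session: launch 4 MPI worker processes via mpirun.
manager = MPIManager(np=4)
addprocs(manager)

# MPI code runs on the workers only; the manager process is not an MPI rank.
@mpi_do manager println("rank $(MPI.Comm_rank(MPI.COMM_WORLD)) of $(MPI.Comm_size(MPI.COMM_WORLD))")
```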

mutable struct MPIManager <: ClusterManager
np::Int # number of worker processes (excluding the manager process)
@@ -313,8 +319,9 @@ end
################################################################################
# Alternative startup model: All Julia processes are started via an external
# mpirun, and the user does not call addprocs.

# Enter the MPI cluster manager's main loop (does not return on the workers)
"""
Enter the MPI cluster manager's main loop (does not return on the workers)
"""
function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;
comm::MPI.Comm=MPI.COMM_WORLD)
!MPI.Initialized() && MPI.Init()
@@ -379,6 +386,10 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;
MPI.Finalize()
exit()
end
elseif mode == MPI_WINDOW_IO
start_window_worker(comm, true)
elseif mode == MPI_WINDOW_NOWAIT
start_window_worker(comm, false)
else
error("Unknown mode $mode")
end
185 changes: 185 additions & 0 deletions src/window-cman.jl
@@ -0,0 +1,185 @@
import Base: launch, kill, manage, connect
export MPIWindowIOManager, launch, kill, manage, connect, @cluster

"""
Stores the buffers needed for communication, one instance per rank. The worker main loop stops when the stop condition is triggered.
"""
mutable struct MPIWindowIOManager <: ClusterManager
comm::MPI.Comm                        # Communicator spanning all ranks in the cluster
connection_windows::Vector{WindowIO}  # One window per rank, carrying the cluster protocol messages
stdio_windows::Vector{WindowIO}       # One window per rank, forwarding worker output to the master
workers_wait::Bool                    # true: workers stay in the event loop; false: only active inside @cluster

function MPIWindowIOManager(comm::MPI.Comm, workers_wait::Bool)
nb_procs = MPI.Comm_size(comm)
connection_windows = Vector{WindowIO}(nb_procs)
stdio_windows = Vector{WindowIO}(nb_procs)

for i in 1:nb_procs
connection_windows[i] = WindowIO(comm)
stdio_windows[i] = WindowIO(comm)
end

# Make sure all windows are created before continuing
MPI.Barrier(comm)

return new(comm, connection_windows, stdio_windows, workers_wait)
end
end

# Closes all local MPI Windows in a manager. Must be called collectively on all ranks
function closeall(manager::MPIWindowIOManager)
for w in manager.connection_windows
close(w)
end
for w in manager.stdio_windows
close(w)
end
end

function launch(mgr::MPIWindowIOManager, params::Dict,
instances::Array, cond::Condition)
try
nprocs = MPI.Comm_size(mgr.comm)
for cnt in 1:(nprocs-1)
push!(instances, WorkerConfig())
end
notify(cond)
catch e
println("Error in MPI launch $e")
rethrow(e)
end
end

function kill(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig)
@spawnat pid notify(_stop_requested)
Distributed.set_worker_state(Distributed.Worker(pid), Distributed.W_TERMINATED)
end

function manage(mgr::MPIWindowIOManager, id::Integer, config::WorkerConfig, op::Symbol) end

function connect(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig)
myrank = MPI.Comm_rank(mgr.comm)
if myrank == 0
proc_stdio = mgr.stdio_windows[pid]
@schedule while !eof(proc_stdio)
try
println("\tFrom worker $(pid):\t$(readline(proc_stdio))")
catch e
end
end
end
return (mgr.connection_windows[pid], WindowWriter(mgr.connection_windows[myrank+1], pid-1))
end

function redirect_to_mpi(s::WindowWriter)
(rd, wr) = redirect_stdout()
@schedule while !eof(rd) && isopen(s.winio)
av = readline(rd)
if isopen(s.winio)
println(s,av)
flush(s)
end
end
end

function checkworkers()
for w in workers()
if w != (@fetchfrom w myid())
error("worker $w is not waiting")
end
end
end

function notify_workers()
for w in workers()
@spawnat(w, notify(_stop_requested))
end
end

function wait_for_events()
global _stop_requested
wait(_stop_requested)
end

"""
Initialize the current process as a Julia parallel worker. Must be called on all ranks.
If comm is not supplied, MPI is initialized and MPI_COMM_WORLD is used.
"""
function start_window_worker(comm::Comm, workers_wait)
rank = MPI.Comm_rank(comm)
N = MPI.Comm_size(comm)

manager = MPIWindowIOManager(comm, workers_wait)
cookie = string(comm)
if length(cookie) > Base.Distributed.HDR_COOKIE_LEN
cookie = cookie[1:Base.Distributed.HDR_COOKIE_LEN]
end

try
if rank == 0
Base.cluster_cookie(cookie)
MPI.Barrier(comm)
addprocs(manager)
@assert nprocs() == N
@assert nworkers() == (N == 1 ? 1 : N-1)

if !workers_wait
checkworkers()
notify_workers()
end
else
init_worker(cookie, manager)
MPI.Barrier(comm)
redirect_to_mpi(WindowWriter(manager.stdio_windows[rank+1], 0))
for i in vcat([1], (rank+2):N)
# Receiving end of connections to all higher workers and master
Base.process_messages(manager.connection_windows[i], WindowWriter(manager.connection_windows[rank+1], i-1))
end

global _stop_requested = Condition()
wait_for_events()
end
catch e
Base.display_error(STDERR,"exception $e on rank $rank",backtrace())
end

if workers_wait && rank != 0
closeall(manager)
MPI.Finalize()
exit(0)
end

return manager
end

"""
Stop the manager. This closes all windows and calls MPI.Finalize on all workers
"""
function stop_main_loop(manager::MPIWindowIOManager)
if myid() != 1
wait_for_events()
else
checkworkers()
if nprocs() > 1
rmprocs(workers())
end
end
closeall(manager)
MPI.Finalize()
end

"""
Runs the given expression using the Julia parallel cluster. Useful when running with MPI_WINDOW_NOWAIT,
since this will temporarily activate the worker event loops to listen for messages.
"""
macro cluster(expr)
quote
if myid() != 1
wait_for_events()
else
$(esc(expr))
notify_workers()
end
end
end
6 changes: 4 additions & 2 deletions src/window-io.jl
@@ -97,16 +97,16 @@ function has_data_available(w::WindowIO)
end

# Check if we need to grow the buffer
MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win)
MPI.Win_sync(w.header_cwin) # CWin version doesn't allocate
if w.header.needed_length > w.header.length
MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win)
MPI.Win_detach(w.win, w.buffer)
resize!(w.buffer, w.header.needed_length)
MPI.Win_attach(w.win, w.buffer)
w.header.address = MPI.Get_address(w.buffer)
w.header.length = w.header.needed_length
MPI.Win_unlock(w.myrank, w.header_win)
end
MPI.Win_unlock(w.myrank, w.header_win)

return w.header.count > w.ptr
end
@@ -128,7 +128,9 @@ end
function wait_nb_available(w, nb)
nb_found = wait_nb_available(w)
while nb_found < nb && w.is_open
MPI.Win_lock(MPI.LOCK_SHARED, w.myrank, 0, w.header_win)
MPI.Win_sync(w.header_cwin) # sync every loop, to make sure we get updates
MPI.Win_unlock(w.myrank, w.header_win)
nb_found = wait_nb_available(w)
end
return nb_found
31 changes: 31 additions & 0 deletions test/test_reduce.jl
@@ -26,4 +26,35 @@ sum_mesg = MPI.Reduce(mesg, MPI.SUM, root, comm)
sum_mesg = rank == root ? sum_mesg : size*mesg
@test isapprox(norm(sum_mesg-size*mesg), 0.0)

# For comparison with the clustermanager version
const ARRSIZE = 1024^2*100
@test ARRSIZE % size == 0
const my_arr = fill(1*(rank+1),ARRSIZE ÷ size)

function mpi_sum(arr)::Int
mysum = 0
for x in arr
mysum += x
end
totalsum = MPI.Reduce(mysum, +, 0, comm)
return rank == 0 ? totalsum[1] : 0
end

const sumresult = mpi_sum(my_arr)
const expected = sum((ARRSIZE ÷ size) * (1:size))
if rank == 0
@test sumresult == expected
end
if rank == 0
println("Timings for MPI reduce:")
@time expected == mpi_sum(my_arr)
@time expected == mpi_sum(my_arr)
@time expected == mpi_sum(my_arr)
else
mpi_sum(my_arr)
mpi_sum(my_arr)
mpi_sum(my_arr)
end


MPI.Finalize()