From 4054396fbc504ae8949642a3753cea92c1a6626b Mon Sep 17 00:00:00 2001 From: Bart Janssens Date: Sat, 10 Feb 2018 23:42:23 +0100 Subject: [PATCH] Add an MPI-3 (window) based IO --- src/MPI.jl | 1 + src/window-io.jl | 294 ++++++++++++++++++++++++++++++++++++++++++ test/test_windowio.jl | 176 +++++++++++++++++++++++++ 3 files changed, 471 insertions(+) create mode 100644 src/window-io.jl create mode 100644 test/test_windowio.jl diff --git a/src/MPI.jl b/src/MPI.jl index 5beb73221..8c4cf6d66 100644 --- a/src/MPI.jl +++ b/src/MPI.jl @@ -17,6 +17,7 @@ include(depfile) include("mpi-base.jl") include("cman.jl") +include("window-io.jl") const mpitype_dict = Dict{DataType, Cint}() const mpitype_dict_inverse = Dict{Cint, DataType}() diff --git a/src/window-io.jl b/src/window-io.jl new file mode 100644 index 000000000..34aed0226 --- /dev/null +++ b/src/window-io.jl @@ -0,0 +1,294 @@ +export WindowIO, WindowWriter + +# Type of the number of available entries +const WinCountT = Cint + +mutable struct BufferHeader + count::WinCountT # Number of elements in the buffer + address::Cptrdiff_t + length::WinCountT # Current size of the buffer + needed_length::WinCountT # Size the buffer should have to handle all pending writes +end + +""" + + WindowIO(target::Integer, comm=MPI.COMM_WORLD, bufsize=1024^2) + +Expose an MPI RMA window using the IO interface. Must be constructed on all ranks in the communicator. +The target is the rank to which data is sent when calling write, comm is the communicator to use and +bufsize is the initial size of the buffer. A target may be written to by multiple ranks concurrently, +and the receive buffer will be grown as needed, but never shrinks. +Communication happens only when flush is called. +""" +mutable struct WindowIO <: IO + comm::MPI.Comm + myrank::Int + # Represents the received data. First elements contain a counter with the total number of entries in the buffer + buffer::Array{UInt8,1} + win::Win + header::BufferHeader + remote_header::BufferHeader + header_win::Win + header_cwin::CWin + is_open::Bool + # Current read position + ptr::WinCountT + data_available::Condition + read_requested::Condition + lock::ReentrantLock # Needed for Base + waiter + + function WindowIO(comm=MPI.COMM_WORLD, bufsize=1024^2) + buffer = Array{UInt8,1}(bufsize) + header_win = MPI.Win() + header = BufferHeader(0, MPI.Get_address(buffer), bufsize, bufsize) + remote_header = BufferHeader(0, MPI.Get_address(buffer), bufsize, bufsize) + header_arr = unsafe_wrap(Vector{UInt8}, Ptr{UInt8}(pointer_from_objref(header)), sizeof(BufferHeader)) + MPI.Win_create(header_arr, MPI.INFO_NULL, comm, header_win) + win = MPI.Win() + MPI.Win_create_dynamic(MPI.INFO_NULL, comm, win) + MPI.Win_attach(win, buffer) + + w = new(comm, + MPI.Comm_rank(comm), + buffer, + win, + header, + remote_header, + header_win, + CWin(header_win), + true, + 0, + Condition(), + Condition(), + ReentrantLock(), + nothing) + + w.waiter = Task(function() + wait(w.read_requested) + while w.is_open + while !has_data_available(w) && w.is_open + yield() + end + if w.is_open + notify(w.data_available) + wait(w.read_requested) + end + end + end) + + yield(w.waiter) + + return w + end +end + + +Base.nb_available(w::WindowIO)::WinCountT = w.header.count - w.ptr + +# Checks if data is available and grows the buffer if needed by the writing side +function has_data_available(w::WindowIO) + if !w.is_open + return false + end + + if w.header.count > w.ptr && w.header.needed_length == w.header.length # fast check without window sync + return true + end + + # Check if we need to grow the buffer + MPI.Win_sync(w.header_cwin) # CWin version doesn't allocate + if w.header.needed_length > w.header.length + MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win) + MPI.Win_detach(w.win, w.buffer) + resize!(w.buffer, w.header.needed_length) + MPI.Win_attach(w.win, w.buffer) + w.header.address = MPI.Get_address(w.buffer) + w.header.length = w.header.needed_length + MPI.Win_unlock(w.myrank, w.header_win) + end + + return w.header.count > w.ptr +end + +function Base.wait(w::WindowIO) + notify(w.read_requested) + wait(w.data_available) +end + +# Waits for data and returns the number of available bytes +function wait_nb_available(w) + if !has_data_available(w) + wait(w) + end + return nb_available(w) +end + +# wait until the specified number of bytes is available or the stream is closed +function wait_nb_available(w, nb) + nb_found = wait_nb_available(w) + while nb_found < nb && w.is_open + MPI.Win_sync(w.header_cwin) # sync every loop, to make sure we get updates + nb_found = wait_nb_available(w) + end + return nb_found +end + +mutable struct WindowWriter <: IO + winio::WindowIO + target::Int + # Writes are buffered to only lock and communicate upon flush + write_buffer::Vector{UInt8} + lock::ReentrantLock + nb_written::Int + + function WindowWriter(w::WindowIO, target::Integer) + return new(w, target, Vector{UInt8}(1024^2), ReentrantLock(), 0) + end +end + +@inline Base.isopen(w::WindowIO)::Bool = w.is_open +@inline Base.isopen(s::WindowWriter)::Bool = s.winio.is_open + +function Base.eof(w::WindowIO) + if !isopen(w) + return true + else + wait_nb_available(w) + end + return !isopen(w) +end + +Base.iswritable(::WindowIO) = false +Base.isreadable(::WindowIO) = true +Base.iswritable(::WindowWriter) = true +Base.isreadable(::WindowWriter) = false + +function Base.close(w::WindowIO) + w.is_open = false + notify(w.read_requested) + wait(w.waiter) # Wait for the data notification loop to finish + MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win) + w.header.count = 0 + w.ptr = 0 + MPI.Win_unlock(w.myrank, w.header_win) + MPI.Barrier(w.comm) + MPI.Win_free(w.win) + MPI.Win_free(w.header_win) +end +Base.close(s::WindowWriter) = nothing + +# Checks if all available data is read, and if so resets the counter with the number of written bytes to 0 +function complete_read(w::WindowIO) + if w.header.count != 0 && w.header.count == w.ptr + MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win) + if w.header.count != 0 && w.header.count == w.ptr # Check again after locking + w.header.count = 0 + w.ptr = 0 + end + MPI.Win_unlock(w.myrank, w.header_win) + end +end + +function Base.read(w::WindowIO, ::Type{UInt8}) + if wait_nb_available(w) < 1 + throw(EOFError()) + end + + w.ptr += 1 + result = w.buffer[w.ptr] + complete_read(w) + return result +end + +function Base.readbytes!(w::WindowIO, b::AbstractVector{UInt8}, nb=length(b); all::Bool=true) + nb_obtained = nb_available(w) + if all + nb_obtained = wait_nb_available(w,nb) + if nb_obtained < nb + throw(EOFError()) + end + resize!(b, nb) + end + nb_read = min(nb_obtained, nb) + if nb_read == 0 + return 0 + end + copy!(b, 1, w.buffer, w.ptr+1, nb_read) + w.ptr += nb_read + complete_read(w) + return nb_read +end + +Base.readavailable(w::WindowIO) = read!(w, Vector{UInt8}(nb_available(w))) + +@inline function Base.unsafe_read(w::WindowIO, p::Ptr{UInt8}, nb::UInt) + nb_obtained = wait_nb_available(w,nb) + nb_read = min(nb_obtained, nb) + unsafe_copy!(p, pointer(w.buffer, w.ptr+1), nb_read) + w.ptr += nb_read + complete_read(w) + if nb_read != nb + throw(EOFError()) + end + return +end + +function Base.read(w::WindowIO, nb::Integer; all::Bool=true) + buf = Vector{UInt8}(nb) + readbytes!(w, buf, nb, all=all) + return buf +end + +function ensureroom(w::WindowWriter) + if w.nb_written > length(w.write_buffer) + resize!(w.write_buffer, w.nb_written) + end +end + +function Base.write(w::WindowWriter, b::UInt8) + w.nb_written += 1 + ensureroom(w) + w.write_buffer[w.nb_written] = b + return sizeof(UInt8) +end +function Base.unsafe_write(w::WindowWriter, p::Ptr{UInt8}, nb::UInt) + offset = w.nb_written+1 + w.nb_written += nb + ensureroom(w) + copy!(w.write_buffer, offset, unsafe_wrap(Array{UInt8}, p, nb), 1, nb) + return nb +end + +Base.flush(::WindowIO) = error("WindowIO is read-only, did you mean to flush an associated WindowWriter?") + +function Base.flush(s::WindowWriter) + if !isopen(s) + throw(EOFError()) + end + nb_to_write = s.nb_written + free = 0 + header = s.winio.remote_header + header_win = s.winio.header_win + while free < nb_to_write + MPI.Win_lock(MPI.LOCK_EXCLUSIVE, s.target, 0, header_win) + MPI.Get(Ptr{UInt8}(pointer_from_objref(header)), sizeof(BufferHeader), s.target, 0, header_win) + MPI.Win_flush(s.target, header_win) + free = header.length - header.count + if free >= nb_to_write + MPI.Win_lock(MPI.LOCK_EXCLUSIVE, s.target, 0, s.winio.win) + MPI.Put(pointer(s.write_buffer), nb_to_write, s.target, header.address + header.count, s.winio.win) + MPI.Win_unlock(s.target, s.winio.win) + MPI.Put(Ref{WinCountT}(header.count + nb_to_write), s.target, header_win) + s.nb_written = 0 + else + # Request to grow buffer, if not done already + new_needed_length = max(header.needed_length, header.count + nb_to_write) + if (new_needed_length > header.needed_length) + header.needed_length = new_needed_length + MPI.Put(Ptr{UInt8}(pointer_from_objref(header)), sizeof(BufferHeader), s.target, 0, header_win) + end + end + MPI.Win_unlock(s.target, header_win) + end +end diff --git a/test/test_windowio.jl b/test/test_windowio.jl new file mode 100644 index 000000000..5700c3c7a --- /dev/null +++ b/test/test_windowio.jl @@ -0,0 +1,176 @@ +using Base.Test +using MPI + +MPI.Init() + +comm = MPI.COMM_WORLD + +const rank = MPI.Comm_rank(comm) +const N = MPI.Comm_size(comm) + +if N < 2 + error("This test needs at least 2 processes") + exit(1) +end + +const winio = WindowIO(comm) +const writer = WindowWriter(winio, 0) + +# directly test the different read functions +if rank == 1 + write(writer, UInt8(5)) + flush(writer) # Must flush to trigger communication +elseif rank == 0 + @test MPI.wait_nb_available(winio) == 1 + @test read(winio, UInt8) == UInt8(5) + @test nb_available(winio) == 0 +end + +MPI.Barrier(comm) + +if rank == 1 + write(writer, "hello") + flush(writer) # Must flush to trigger communication +elseif rank == 0 + @test MPI.wait_nb_available(winio) == 5 + arr = Vector{UInt8}(5) + readbytes!(winio, arr) + @test String(arr) == "hello" + @test nb_available(winio) == 0 +end + +MPI.Barrier(comm) + +if rank == 1 + write(writer, "hello") + flush(writer) # Must flush to trigger communication +elseif rank == 0 + @test MPI.wait_nb_available(winio) == 5 + arr = Vector{UInt8}(5) + unsafe_read(winio, pointer(arr), 2) + @test nb_available(winio) == 3 + unsafe_read(winio, pointer(arr,3), 3) + @test String(arr) == "hello" + @test nb_available(winio) == 0 +end + +MPI.Barrier(comm) + +if rank == 1 + write(writer, "hello") + flush(writer) # Must flush to trigger communication +elseif rank == 0 + arr = Vector{UInt8}(3) + readbytes!(winio, arr) # waits for data + @test nb_available(winio) == 2 + @test String(arr) == "hel" + fill!(arr, UInt8('a')) + readbytes!(winio, arr, all=false) # reads what's available + @test String(arr) == "loa" + @test nb_available(winio) == 0 +end + +MPI.Barrier(comm) + +if rank == 1 + write(writer, "hello") + flush(writer) # Must flush to trigger communication +elseif rank == 0 + result = "hello" + for i in 1:5 + @test read(winio, Char) == result[i] + @test nb_available(winio) == 5 - i + end +end + +MPI.Barrier(comm) + +if rank != 0 + write(writer, 1, 2.0, Cint(3)) + flush(writer) # Must flush to trigger communication +else + expected_nb_recv = sizeof(1) + sizeof(2.0) + sizeof(Cint(3)) + result = read(winio, (N-1)*expected_nb_recv) + @test nb_available(winio) == 0 + @test length(result) == (N-1)*expected_nb_recv +end + +MPI.Barrier(comm) + +# Test blocking read +if rank != 0 + write(writer, rank) + flush(writer) # Must flush to trigger communication +else + result = read(winio, (N-1)*sizeof(Int)) # Blocks until all required data is read + @test sort(reinterpret(Int,result)) == collect(1:N-1) + @test nb_available(winio) == 0 +end + +MPI.Barrier(comm) + +# Timing +if rank == 1 + const arr = ones(UInt8,100000) + println("write and flush timings:") + @time write(writer, arr) + @time flush(writer) + @time write(writer, arr) + @time flush(writer) + @time write(writer, arr) + @time flush(writer) + MPI.Barrier(comm) +elseif rank == 0 + const recarr = Vector{UInt8}(100000) + MPI.Barrier(comm) + + println("read timings:") + @time read!(winio, recarr) + @time read!(winio, recarr) + @time read!(winio, recarr) + @test recarr == ones(UInt8,100000) +else + MPI.Barrier(comm) +end + +MPI.Barrier(comm) + +# Test readline +message = "Hi from rank " +if rank != 0 + println(writer, message*string(rank)) + flush(writer) # Must flush to trigger communication +else + nb_received = 0 + while nb_received != N-1 + line = readline(winio) + @test startswith(line, message) + @test line[length(message)+1:end] ∈ string.(1:N) + nb_received += 1 + end +end + +MPI.Barrier(comm) +close(winio) +MPI.Barrier(comm) + +# Send more than the buffer can contain +BS = 24 +const winio2 = WindowIO(comm, BS+sizeof(MPI.WinCountT)) +const writer2 = WindowWriter(winio2, 0) +if rank != 0 + write(writer2, rank*ones(BS)) + flush(writer2) # Must flush to trigger communication +else + result = Vector{Float64}(BS*(N-1)) + read!(winio2, result) + header = winio2.header + @test nb_available(winio2) == 0 + @test header.count == 0 + @test header.length == header.needed_length + @test header.length > BS+sizeof(MPI.WinCountT) + @test sum(result) == sum((1:N-1)*BS) +end + +close(winio2) +MPI.Finalize()