diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e29e1f8..f71ab77 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -148,3 +148,33 @@ jobs: RUSTFLAGS: "-Z sanitizer=address" RUSTDOCFLAGS: "-Z sanitizer=address" run: cargo test --features netcdf-sys/static,netcdf/derive --target x86_64-unknown-linux-gnu --workspace --exclude netcdf-derive + + mpi: + name: mpi-runner + runs-on: ubuntu-latest + env: + NETCDF_DIR: /usr/lib/x86_64-linux-gnu/netcdf/mpi/ + steps: + - name: Checkout repository + uses: actions/checkout@v4 + with: {submodules: false} + + - name: Install netcdf + run: sudo apt-get update && sudo apt-get install libnetcdf-mpi-dev libhdf5-openmpi-dev + + - name: Install rust + uses: dtolnay/rust-toolchain@stable + with: + toolchain: "nightly" + + - name: Build + run: cargo build --verbose --workspace --exclude netcdf-src --features netcdf/mpi,derive + + - name: Test + run: cargo test --verbose --workspace --exclude netcdf-src --features netcdf/mpi,derive + + - name: Run example + run: cargo run --verbose --package netcdf-examples --features mpi + + - name: Run example in parallel + run: mpirun -np 10 --oversubscribe -- target/debug/netcdf-examples diff --git a/Cargo.toml b/Cargo.toml index 461fb9c..6594082 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "netcdf-sys", "netcdf-src", "netcdf-derive", + "netcdf-examples", ] default-members = ["netcdf", "netcdf-sys"] resolver = "2" @@ -14,3 +15,4 @@ netcdf-sys = { path = "netcdf-sys", version = "0.6.2" } netcdf-src = { path = "netcdf-src", version = "0.3.6" } netcdf-derive = { path = "netcdf-derive", version = "0.1.0" } hdf5-sys = { version = "0.8.0" } +mpi-sys = { version = "0.2.1" } diff --git a/netcdf-examples/Cargo.toml b/netcdf-examples/Cargo.toml new file mode 100644 index 0000000..e1b23ca --- /dev/null +++ b/netcdf-examples/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "netcdf-examples" +version = "0.1.0" +edition = "2021" +publish = false + +[features] +mpi = ["dep:mpi", "netcdf/mpi", "dep:mpi-sys"] + +[dependencies] +netcdf = { workspace = true } +mpi = { version = "0.7.0", optional = true } +mpi-sys = { workspace = true, optional = true } +ndarray = "0.15.6" diff --git a/netcdf-examples/src/main.rs b/netcdf-examples/src/main.rs new file mode 100644 index 0000000..6e0998b --- /dev/null +++ b/netcdf-examples/src/main.rs @@ -0,0 +1,10 @@ +#[cfg(feature = "mpi")] +mod parallel; + +fn main() { + #[cfg(feature = "mpi")] + parallel::main().unwrap(); + + #[cfg(not(feature = "mpi"))] + println!("MPI support is not included, will not run this example"); +} diff --git a/netcdf-examples/src/parallel.rs b/netcdf-examples/src/parallel.rs new file mode 100644 index 0000000..48c414d --- /dev/null +++ b/netcdf-examples/src/parallel.rs @@ -0,0 +1,69 @@ +use mpi::traits::{AsRaw, Communicator}; + +fn target_function(rank: i32, t: usize) -> i32 { + 100 * (t as i32) + rank +} + +fn mpi_null_info() -> mpi_sys::MPI_Info { + let mut info = std::ptr::null_mut(); + let e = unsafe { mpi_sys::MPI_Info_create(&mut info) }; + assert_eq!(e, mpi_sys::MPI_SUCCESS.try_into().unwrap()); + + info +} + +fn create( + path: &str, + communicator: impl Communicator + AsRaw, +) -> Result<(), Box> { + let info = mpi_null_info(); + let mut file = + netcdf::create_par_with(path, communicator.as_raw(), info, netcdf::Options::NETCDF4)?; + + let size = communicator.size() as usize; + let rank = communicator.rank(); + + file.add_dimension("x", size)?; + file.add_unlimited_dimension("t")?; + let var = file.add_variable::("output", &["t", "x"])?; + var.access_collective()?; + + file.enddef()?; + + let mut var = file.variable_mut("output").unwrap(); + + let values = ndarray::Array1::from_shape_fn(10, |t| target_function(rank, t)); + var.put((.., rank as usize), values.view())?; + + Ok(()) +} + +fn read( + path: &str, + communicator: impl Communicator + AsRaw, +) -> Result<(), Box> { + let info = mpi_null_info(); + + let file = netcdf::open_par_with(path, communicator.as_raw(), info, netcdf::Options::empty())?; + + let rank = communicator.rank(); + let var = file.variable("output").unwrap(); + var.access_collective()?; + let values = var.get::((.., rank as usize))?; + + for (t, &v) in values.iter().enumerate() { + assert_eq!(v, target_function(rank, t)); + } + Ok(()) +} + +pub fn main() -> Result<(), Box> { + let universe = mpi::initialize().unwrap(); + let path = "par.nc"; + + create(path, universe.world())?; + + read(path, universe.world())?; + + Ok(()) +} diff --git a/netcdf-src/Cargo.toml b/netcdf-src/Cargo.toml index 672aac4..8641f66 100644 --- a/netcdf-src/Cargo.toml +++ b/netcdf-src/Cargo.toml @@ -29,6 +29,7 @@ exclude = [ [features] dap = ["dep:link-cplusplus"] +mpi = [] [dependencies] hdf5-sys = { workspace = true, features = ["hl", "deprecated", "zlib"] } diff --git a/netcdf-src/build.rs b/netcdf-src/build.rs index d979a1c..c53c331 100644 --- a/netcdf-src/build.rs +++ b/netcdf-src/build.rs @@ -83,6 +83,10 @@ fn main() { netcdf_config.define("ENABLE_BYTERANGE", "ON"); } + if feature!("MPI").is_ok() { + panic!("MPI feature was requested but the static build of netcdf does not support this"); + } + let netcdf = netcdf_config.build(); println!("cargo:lib=netcdf"); diff --git a/netcdf-sys/Cargo.toml b/netcdf-sys/Cargo.toml index 30bf0f4..0750fc3 100644 --- a/netcdf-sys/Cargo.toml +++ b/netcdf-sys/Cargo.toml @@ -24,6 +24,7 @@ libz-sys = { version = "1.0.25" } curl-sys = { version = "0.4.51", optional = true } hdf5-sys = { workspace = true } netcdf-src = { workspace = true, optional = true } +mpi-sys = { workspace = true, optional = true } [dev-dependencies] @@ -32,6 +33,7 @@ default = [] memio = [] static = ["libz-sys/static", "hdf5-sys/static", "hdf5-sys/hl", "hdf5-sys/deprecated", "hdf5-sys/zlib", "dep:netcdf-src", "curl-sys?/static-curl", "curl-sys?/static-ssl"] dap = ["dep:curl-sys", "netcdf-src?/dap"] +mpi = ["dep:mpi-sys", "netcdf-src?/mpi"] [build-dependencies] semver = "1.0.9" diff --git a/netcdf-sys/build.rs b/netcdf-sys/build.rs index 35a1e36..5dc8343 100644 --- a/netcdf-sys/build.rs +++ b/netcdf-sys/build.rs @@ -146,6 +146,14 @@ impl NcMetaHeader { "MEMIO requested but not found in this installation of netCDF" ); } + if self.has_parallel { + println!("cargo:rustc-cfg=feature=\"has-par\""); + } else { + assert!( + feature!("MPI").is_err(), + "MPI requested but not found in this installation of netCDF" + ); + } } } diff --git a/netcdf-sys/src/lib.rs b/netcdf-sys/src/lib.rs index cdbf08d..b6d383f 100644 --- a/netcdf-sys/src/lib.rs +++ b/netcdf-sys/src/lib.rs @@ -30,6 +30,9 @@ mod filter; #[cfg(feature = "4.8.0")] pub use filter::*; +#[cfg(feature = "mpi")] +pub mod par; + use std::sync::Mutex; /// Global netCDF lock for using all functions in the netCDF library diff --git a/netcdf-sys/src/par.rs b/netcdf-sys/src/par.rs new file mode 100644 index 0000000..93ab8d1 --- /dev/null +++ b/netcdf-sys/src/par.rs @@ -0,0 +1,25 @@ +#![cfg(feature = "mpi")] +use std::ffi::{c_char, c_int}; + +use mpi_sys::{MPI_Comm, MPI_Info}; + +pub const NC_INDEPENDENT: c_int = 0; +pub const NC_COLLECTIVE: c_int = 1; + +extern "C" { + pub fn nc_create_par( + path: *const c_char, + cmode: c_int, + comm: MPI_Comm, + info: MPI_Info, + ncidp: *mut c_int, + ) -> c_int; + pub fn nc_open_par( + path: *const c_char, + mode: c_int, + comm: MPI_Comm, + info: MPI_Info, + ncidp: *mut c_int, + ) -> c_int; + pub fn nc_var_par_access(ncid: c_int, varid: c_int, par_access: c_int) -> c_int; +} diff --git a/netcdf/Cargo.toml b/netcdf/Cargo.toml index 906f5c0..62b0021 100644 --- a/netcdf/Cargo.toml +++ b/netcdf/Cargo.toml @@ -19,6 +19,8 @@ build = "build.rs" default = ["ndarray"] static = ["netcdf-sys/static"] derive = ["dep:netcdf-derive"] +mpi = ["dep:mpi-sys", "netcdf-sys/mpi"] +ndarray = ["dep:ndarray"] [dependencies] ndarray = { version = "0.15", optional = true } @@ -26,6 +28,7 @@ netcdf-sys = { workspace = true } netcdf-derive = { workspace = true, optional = true } bitflags = "2.4.2" libc = "0.2.155" +mpi-sys = { workspace = true, optional = true } [dev-dependencies] clap = { version = "4.5.1", features = ["derive"] } diff --git a/netcdf/src/file.rs b/netcdf/src/file.rs index 6c720b5..ef0e287 100644 --- a/netcdf/src/file.rs +++ b/netcdf/src/file.rs @@ -86,6 +86,30 @@ impl RawFile { Ok(File(Self { ncid })) } + /// Open a `netCDF` file in read only mode in parallel mode. + #[cfg(feature = "mpi")] + pub(crate) fn open_par_with( + path: &path::Path, + communicator: mpi_sys::MPI_Comm, + info: mpi_sys::MPI_Info, + options: Options, + ) -> error::Result { + let f = get_ffi_from_path(path); + let mut ncid: nc_type = 0; + unsafe { + error::checked(with_lock(|| { + netcdf_sys::par::nc_open_par( + f.as_ptr().cast(), + options.bits(), + communicator, + info, + &mut ncid, + ) + }))?; + } + Ok(File(Self { ncid })) + } + /// Open a `netCDF` file in append mode (read/write). pub(crate) fn append_with(path: &path::Path, options: Options) -> error::Result { let file = Self::open_with(path, options | Options::WRITE)?; @@ -105,6 +129,31 @@ impl RawFile { Ok(FileMut(File(Self { ncid }))) } + /// Create a new `netCDF` file in parallel mode + #[cfg(feature = "mpi")] + pub(crate) fn create_par_with( + path: &path::Path, + communicator: mpi_sys::MPI_Comm, + info: mpi_sys::MPI_Info, + options: Options, + ) -> error::Result { + let f = get_ffi_from_path(path); + let mut ncid: nc_type = -1; + unsafe { + error::checked(with_lock(|| { + netcdf_sys::par::nc_create_par( + f.as_ptr().cast(), + options.bits(), + communicator, + info, + &mut ncid, + ) + }))?; + } + + Ok(FileMut(File(Self { ncid }))) + } + #[cfg(feature = "has-mmap")] pub(crate) fn open_from_memory<'buffer>( name: Option<&str>, @@ -225,6 +274,14 @@ impl File { .unwrap() .map(Result::unwrap) } + /// Get the length of a dimension + pub fn dimension_len(&self, name: &str) -> Option { + let (ncid, name) = + super::group::try_get_parent_ncid_and_stem(self.ncid(), name).unwrap()?; + super::dimension::dimension_from_name(ncid, name) + .unwrap() + .map(|x| x.len()) + } /// Get a group /// @@ -440,6 +497,16 @@ impl FileMut { let Self(File(file)) = self; file.close() } + + /// Open the file for new definitions + pub fn redef(&mut self) -> error::Result<()> { + error::checked(with_lock(|| unsafe { netcdf_sys::nc_redef(self.ncid()) })) + } + + /// Close the file for new definitions + pub fn enddef(&mut self) -> error::Result<()> { + error::checked(with_lock(|| unsafe { netcdf_sys::nc_enddef(self.ncid()) })) + } } #[cfg(feature = "has-mmap")] diff --git a/netcdf/src/lib.rs b/netcdf/src/lib.rs index b653d98..639f44b 100644 --- a/netcdf/src/lib.rs +++ b/netcdf/src/lib.rs @@ -131,6 +131,8 @@ pub(crate) mod error; pub(crate) mod extent; pub(crate) mod file; pub(crate) mod group; +#[cfg(feature = "mpi")] +pub(crate) mod par; pub(crate) mod putget; #[cfg(feature = "4.9.2")] pub mod rc; @@ -170,6 +172,20 @@ where RawFile::create_with(name.as_ref(), options) } +/// Open a `netCDF` file in create and parallel mode with the given options +#[cfg(feature = "mpi")] +pub fn create_par_with

( + name: P, + communicator: mpi_sys::MPI_Comm, + info: mpi_sys::MPI_Info, + options: Options, +) -> error::Result +where + P: AsRef, +{ + RawFile::create_par_with(name.as_ref(), communicator, info, options) +} + /// Open a `netCDF` file in append mode pub fn append

(name: P) -> error::Result where @@ -194,6 +210,20 @@ where open_with(name, Options::default()) } +/// Open in parallel mode +#[cfg(feature = "mpi")] +pub fn open_par_with

( + name: P, + communicator: mpi_sys::MPI_Comm, + info: mpi_sys::MPI_Info, + options: Options, +) -> error::Result +where + P: AsRef, +{ + RawFile::open_par_with(name.as_ref(), communicator, info, options) +} + /// Open a `netCDF` file in read mode with the given options pub fn open_with

(name: P, options: Options) -> error::Result where diff --git a/netcdf/src/par.rs b/netcdf/src/par.rs new file mode 100644 index 0000000..d99586d --- /dev/null +++ b/netcdf/src/par.rs @@ -0,0 +1,19 @@ +use crate::{ + error::{checked, Result}, + utils::with_lock, +}; + +use netcdf_sys::nc_type; + +#[derive(Copy, Clone)] +#[repr(i32)] +pub(crate) enum AccessMode { + Independent = netcdf_sys::par::NC_INDEPENDENT, + Collective = netcdf_sys::par::NC_COLLECTIVE, +} + +pub(crate) fn set_access_mode(ncid: nc_type, varid: nc_type, mode: AccessMode) -> Result<()> { + checked(with_lock(|| unsafe { + netcdf_sys::par::nc_var_par_access(ncid, varid, mode as i32 as std::ffi::c_int) + })) +} diff --git a/netcdf/src/variable.rs b/netcdf/src/variable.rs index 0f38beb..1386f68 100644 --- a/netcdf/src/variable.rs +++ b/netcdf/src/variable.rs @@ -191,6 +191,34 @@ impl<'g> Variable<'g> { _ => Err(NC_EVARMETA.into()), } } + + #[cfg(feature = "mpi")] + fn access_mode(&self, mode: crate::par::AccessMode) -> error::Result<()> { + error::checked(utils::with_lock(|| unsafe { + netcdf_sys::par::nc_var_par_access( + self.ncid, + self.varid, + mode as i32 as std::ffi::c_int, + ) + })) + } + + /// Access the variable in independent mode + /// for parallell reading using MPI. + /// File must have been opened using `open_par` + /// + /// This is the default access mode + #[cfg(feature = "mpi")] + pub fn access_independent(&self) -> error::Result<()> { + self.access_mode(crate::par::AccessMode::Independent) + } + /// Access the variable in collective mode + /// for parallell reading using MPI. + /// File must have been opened using `open_par` + #[cfg(feature = "mpi")] + pub fn access_collective(&self) -> error::Result<()> { + self.access_mode(crate::par::AccessMode::Collective) + } } impl<'g> VariableMut<'g> { /// Sets compression on the variable. Must be set before filling in data.