Mpi on candide #661

Open: wants to merge 23 commits into base: develop

23 commits
fb91f20
Tests to skip module (MCCD PSF) if output file exists
martinkilbinger Sep 5, 2023
ac061c0
Merge remote-tracking branch 'upstream/develop' into output_exists
martinkilbinger Sep 17, 2023
a93546e
n_smp for more jobs; N_EPOCH bugs; ngmix checking for existing output
martinkilbinger Sep 17, 2023
1ed596c
numbering scheme with re pattern: copied, not changed
martinkilbinger Sep 24, 2023
ba4aa19
numbering scheme with re pattern: copied, not changed
martinkilbinger Sep 24, 2023
776943c
removed galsim from job script message
martinkilbinger Sep 24, 2023
3216eb6
Fixed import typo
martinkilbinger Oct 16, 2023
eeadfbf
ngmix template
martinkilbinger Oct 18, 2023
ee90dc1
testing openmpi 5.0.0 on candide
martinkilbinger Oct 28, 2023
22c2f56
Testing MPI on candide; errors with process list
martinkilbinger Oct 31, 2023
27e9bbb
Updated MPI setting and candide job
martinkilbinger Oct 31, 2023
c380cd4
ngmix runner reset to develop
martinkilbinger Oct 31, 2023
13940f5
ngmix script reset to develop
martinkilbinger Oct 31, 2023
0bec318
mccd and pysap dependencies added back in to example
martinkilbinger Oct 31, 2023
691daf3
submit run added missing arg
martinkilbinger Oct 31, 2023
3712734
config mpi
martinkilbinger Oct 31, 2023
3fb5a25
mpi4py upgraded to 3.1.5
martinkilbinger Nov 6, 2023
6a76454
changed warning print output
martinkilbinger Nov 6, 2023
7ac5042
ngmix_runner checked out from develop
martinkilbinger Nov 6, 2023
8fba14f
ngmix config template MPI mode
martinkilbinger Nov 10, 2023
1a802cc
changes from output_exists, MPI not working
martinkilbinger Nov 13, 2023
ff1863c
testing again with adding datetime to output
martinkilbinger Nov 15, 2023
d3aeccb
removed pysap check for example runner
martinkilbinger Nov 18, 2023
4 changes: 1 addition & 3 deletions environment.yml
@@ -4,7 +4,7 @@ channels:
dependencies:
- python=3.9
- pip>=21.2.4
- numpy==1.21.6
- numpy==1.22
- astropy==5.0
- automake==1.16.2
- autoconf==2.69
@@ -17,11 +17,9 @@ dependencies:
- pandas==1.4.1
- pip:
- cs_util==0.0.5
- mccd==1.2.3
- modopt==1.6.0
- PyQt5==5.15.6
- pyqtgraph==0.12.4
- python-pysap==0.0.6
- reproject==0.8
- sip_tpv==1.1
- sf_tools==2.0.4
2 changes: 1 addition & 1 deletion example/cfis/config_tile_MiViSmVi.ini
@@ -47,7 +47,7 @@ OUTPUT_DIR = $SP_RUN/output
[JOB]

# Batch size of parallel processing (optional), default is 1, i.e. run all jobs in serial
SMP_BATCH_SIZE = 12
SMP_BATCH_SIZE = 1

# Timeout value (optional), default is None, i.e. no timeout limit applied
TIMEOUT = 96:00:00
7 changes: 4 additions & 3 deletions example/cfis/config_tile_Ng_template.ini
@@ -21,7 +21,7 @@ RUN_DATETIME = False
MODULE = ngmix_runner

# Parallel processing mode, SMP or MPI
MODE = SMP
MODE = MPI


## ShapePipe file handling options
@@ -44,10 +44,10 @@ OUTPUT_DIR = $SP_RUN/output
[JOB]

# Batch size of parallel processing (optional), default is 1, i.e. run all jobs in serial
SMP_BATCH_SIZE = 1
SMP_BATCH_SIZE = 8

# Timeout value (optional), default is None, i.e. no timeout limit applied
TIMEOUT = 96:00:00
TIMEOUT = 48:00:00


## Module options
@@ -62,6 +62,7 @@ FILE_PATTERN = sexcat, image_vignet, background_vignet, galaxy_psf, weight_vigne
FILE_EXT = .fits, .sqlite, .sqlite, .sqlite, .sqlite, .sqlite

# NUMBERING_SCHEME (optional) string with numbering pattern for input files
#NUMBERING_SCHEME = \d{0}-\d{3}-\d{2}9
NUMBERING_SCHEME = -000-000

# Multi-epoch mode: Path to file with single-exposure WCS header information
2 changes: 1 addition & 1 deletion example/config.ini
@@ -14,7 +14,7 @@
# MODULE (required) must be a valid module runner name (or a comma separated list of names)
MODULE = python_example_runner, serial_example_runner, execute_example_runner, python_example_runner, execute_example_runner
# MODE (optional) options are smp or mpi, default is smp
; MODE = mpi
MODE = mpi

## ShapePipe file handling options
[FILE]
Empty file removed example/output/.gitkeep
Empty file.
26 changes: 21 additions & 5 deletions example/pbs/candide_mpi.sh
@@ -21,20 +21,36 @@
#PBS -l nodes=2:ppn=2

# Full path to environment
export SPENV="$HOME/.conda/envs/shapepipe"
export SPENV="$HOME/.conda/envs/shapepipe_mpi"

# Full path to example config file and input data
export SPDIR="$HOME/shapepipe"

# Load modules
module load intelpython/3
module load openmpi/4.0.5
module load gcc/9.3.0
module load intelpython/3-2023.1.0
module load openmpi/5.0.0

# Activate conda environment
source activate $SPENV

# Run ShapePipe using full paths to executables
$SPENV/bin/mpiexec --map-by node $SPENV/bin/shapepipe_run -c $SPDIR/example/config_mpi.ini
# Other options to test
# -map-by

if [ -f "$PBS_NODEFILE" ]; then
NSLOTS=`cat $PBS_NODEFILE | wc -l`
echo "Using $NSLOTS CPUs from PBS_NODEFILE $PBS_NODEFILE"
else
NSLOTS=4
echo "Using $NSLOTS CPUs set by hand"
fi

# Creates #node output dirs
MPI_CMD=/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun
MPI_ARGS="-np $NSLOTS"

${MPI_CMD} ${MPI_ARGS} hostname
${MPI_CMD} ${MPI_ARGS} $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini

# Return exit code
exit 0
12 changes: 9 additions & 3 deletions example/pbs/config_mpi.ini
@@ -1,8 +1,14 @@
# ShapePipe Configuration File Example for MPI

## Default ShapePipe options
[DEFAULT]

# Add date and time to RUN_NAME, optional, default: False
RUN_DATETIME = True

## ShapePipe execution options
[EXECUTION]
MODULE = python_example, serial_example, execute_example
MODULE = python_example_runner, serial_example_runner, execute_example_runner
MODE = mpi

## ShapePipe file handling options
@@ -15,8 +21,8 @@ OUTPUT_DIR = $SPDIR/example/output
TIMEOUT = 00:01:35

## Module options
[PYTHON_EXAMPLE]
[PYTHON_EXAMPLE_RUNNER]
MESSAGE = The obtained value is:

[SERIAL_EXAMPLE]
[SERIAL_EXAMPLE_RUNNER]
ADD_INPUT_DIR = $SPDIR/example/data/numbers, $SPDIR/example/data/letters
2 changes: 1 addition & 1 deletion install_shapepipe
@@ -22,7 +22,7 @@ last_update="08/03/22"
# Conda package versions
fftw_ver="3.3.10"
libpng_ver="1.6.37"
mpi4py_ver="3.1.3"
mpi4py_ver="3.1.5"
openblas_ver="0.3.18"

# SExtractor Package
6 changes: 3 additions & 3 deletions scripts/sh/job_sp.bash
@@ -420,7 +420,7 @@ if [[ $do_job != 0 ]]; then
### Star detection, selection, PSF model. setools can exit with an error for CCD with insufficient stars,
### the script should continue
STOP=0
command_sp "shapepipe_run -c $SP_CONFIG/config_tile_Sx_exp_${psf}.ini" "Run shapepipe (tile detection, exp $psf)"
command_cfg_shapepipe "config_tile_Sx_exp_${psf}.ini" "Run shapepipe (tile detection, exp $psf)" $n_smp
STOP=1

fi
@@ -432,7 +432,7 @@ if [[ $do_job != 0 ]]; then
### PSF model letter: 'P' (psfex) or 'M' (mccd)
letter=${psf:0:1}
Letter=${letter^}
command_sp "shapepipe_run -c $SP_CONFIG/config_tile_${Letter}iViSmVi.ini" "Run shapepipe (tile PsfInterp=$Letter}: up to ngmix+galsim)"
command_cfg_shapepipe "config_tile_${Letter}iViSmVi.ini" "Run shapepipe (tile PsfInterp=$Letter}: up to ngmix+galsim)" $n_smp

fi

@@ -464,7 +464,7 @@ if [[ $do_job != 0 ]]; then
### Shapes, run $nsh_jobs parallel processes
VERBOSE=0
for k in $(seq 1 $nsh_jobs); do
command_sp "shapepipe_run -c $SP_CONFIG_MOD/config_tile_Ng${k}u.ini" "Run shapepipe (tile: ngmix+galsim $k)" &
command_sp "shapepipe_run -c $SP_CONFIG_MOD/config_tile_Ng${k}u.ini" "Run shapepipe (tile: ngmix $k)" &
done
wait
VERBOSE=1
2 changes: 1 addition & 1 deletion shapepipe/modules/make_cat_package/make_cat.py
@@ -16,7 +16,7 @@
from sqlitedict import SqliteDict

from shapepipe.pipeline import file_io
from shapepipe.utitities import galaxy
from shapepipe.utilities import galaxy


def prepare_final_cat_file(output_path, file_number_string):
@@ -395,7 +395,7 @@ def _interpolate_me(self):

all_id = np.copy(cat.get_data()['NUMBER'])
key_ne = 'N_EPOCH'
if key_ne not in cat.get_data():
if key_ne not in cat.get_data().dtype.names:
raise KeyError(
f'Key {key_ne} not found in input galaxy catalogue, needed for'
+ ' PSF interpolation to multi-epoch data; run previous module'
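The check in this hunk moves from testing the array itself to testing `dtype.names`. On a NumPy array, the `in` operator compares against element values rather than field names, so the column lookup has to go through the dtype. A minimal sketch of the idea, assuming `cat.get_data()` returns a NumPy structured array (inferred from the `.dtype.names` access, not confirmed here); the toy data and the `has_column` helper are hypothetical:

```python
import numpy as np

# Toy structured array standing in for a catalogue table (hypothetical data).
data = np.array(
    [(1, 3), (2, 1)],
    dtype=[("NUMBER", "i4"), ("N_EPOCH", "i4")],
)


def has_column(table, name):
    """Return True if the structured array has a field called `name`."""
    # dtype.names is a tuple of field names, so this is a plain tuple lookup.
    return name in table.dtype.names


found = has_column(data, "N_EPOCH")
missing = has_column(data, "FLUX")
```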
8 changes: 8 additions & 0 deletions shapepipe/modules/merge_sep_cats_package/merge_sep_cats.py
@@ -89,6 +89,14 @@ def process(self):
list_col_name = cat0.get_col_names()
cat0.close()

# Check that exactly 6 (1 primary + secondary HDUs) exist
if len(list_ext_name) != 6:
msg = f"Number of HDUs is {len(list_ext_name)}, expected 6"
#raise ValueError(msg)
print(f"warning: {msg}; setting to 6")
list_ext_name = list_ext_name[:6]
list_col_name = list_col_name[:6]

# Create empty dictionary
# data dimension = n_extension x n_column x n_obj
data = {}
1 change: 0 additions & 1 deletion shapepipe/modules/python_example_runner.py
@@ -22,7 +22,6 @@
'mccd',
'ngmix',
'pandas',
'pysap',
'scipy',
'sf_tools',
'sip_tpv',
28 changes: 18 additions & 10 deletions shapepipe/pipeline/file_handler.py
@@ -93,7 +93,7 @@ def run_dir(self):
@run_dir.setter
def run_dir(self, value):

self._run_dir = self.check_dir(value, check_exists=True)
self._run_dir = self.check_dir(value, check_exists=False)

@property
def _input_dir(self):
@@ -158,7 +158,8 @@ def check_dir(cls, dir_name, check_exists=False):

"""
if check_exists and os.path.isdir(dir_name):
raise OSError(f'Directory {dir_name} already exists.')
#raise OSError(f'Directory {dir_name} already exists.')
print(f'Warning: Directory {dir_name} already exists.')

return cls.strip_slash(dir_name)

@@ -188,8 +189,9 @@ def mkdir(cls, dir_name):
Directory name with full path

"""
cls.check_dir(dir_name, check_exists=True)
mkdir(dir_name)
cls.check_dir(dir_name, check_exists=False)
if not os.path.isdir(dir_name):
mkdir(dir_name)

@staticmethod
def setpath(path, name, ext=''):
@@ -386,9 +388,12 @@ def create_global_run_dirs(self):
self.run_dir,
)

self.mkdir(self.run_dir)
self.mkdir(self._log_dir)
self.mkdir(self._tmp_dir)
if not os.path.exists(self.run_dir):
self.mkdir(self.run_dir)
if not os.path.exists(self._log_dir):
self.mkdir(self._log_dir)
if not os.path.exists(self._tmp_dir):
self.mkdir(self._tmp_dir)

self._get_input_dir()
self._copy_config_to_log()
@@ -648,9 +653,12 @@ def _create_module_run_dirs(self, module, run_name):
)
)

self.mkdir(self._module_dict[module][run_name]['run_dir'])
self.mkdir(self._module_dict[module][run_name]['log_dir'])
self.mkdir(self._module_dict[module][run_name]['output_dir'])
if not os.path.exists(self._module_dict[module][run_name]['run_dir']):
self.mkdir(self._module_dict[module][run_name]['run_dir'])
if not os.path.exists(self._module_dict[module][run_name]['log_dir']):
self.mkdir(self._module_dict[module][run_name]['log_dir'])
if not os.path.exists(self._module_dict[module][run_name]['output_dir']):
self.mkdir(self._module_dict[module][run_name]['output_dir'])

# Set current output directory to module output directory
self.output_dir = self._module_dict[module][run_name]['output_dir']
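The hunks in this file guard each `mkdir` call with an existence check, presumably so that several MPI processes can set up the same run directories without failing (an assumption; the PR does not state the motivation). A check followed by a create still leaves a small window in which another process can create the directory first. The sketch below shows one way to make the creation itself tolerant of that race, using only the standard library; `ensure_dir` is a hypothetical helper, not part of ShapePipe, and threads stand in for MPI ranks:

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor


def ensure_dir(dir_name):
    """Create dir_name if needed; tolerate other workers creating it first."""
    # exist_ok=True turns "directory already exists" into a no-op, so no
    # separate existence check is needed before the call.
    os.makedirs(dir_name, exist_ok=True)
    return os.path.isdir(dir_name)


# Simulate several workers creating the same nested run directory at once.
base = tempfile.mkdtemp()
target = os.path.join(base, "run_sp", "logs")
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(ensure_dir, [target] * 8))
```

Every worker returns `True` regardless of which one actually created the directory.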
2 changes: 2 additions & 0 deletions shapepipe/pipeline/mpi_run.py
@@ -36,6 +36,7 @@ def submit_mpi_jobs(
timeout,
run_dirs,
module_runner,
module_config_sec,
worker_log,
verbose,
):
@@ -57,6 +58,7 @@ def submit_mpi_jobs(
w_log_name,
run_dirs,
config,
module_config_sec,
timeout,
module_runner
))
9 changes: 6 additions & 3 deletions shapepipe/run.py
@@ -405,15 +405,16 @@ def run_mpi(pipe, comm):
# Get file handler objects
run_dirs = jh.filehd.module_run_dirs
module_runner = jh.filehd.module_runners[module]
module_config_sec = jh.filehd.get_module_config_sec(module)
worker_log = jh.filehd.get_worker_log_name
# Define process list
process_list = jh.filehd.process_list
# Define job list
jobs = split_mpi_jobs(process_list, comm.size)
del process_list
else:
job_type = module_runner = worker_log = timeout = \
jobs = run_dirs = None
job_type = module_runner = module_config_sec = worker_log = \
timeout = jobs = run_dirs = None

# Broadcast job type to all nodes
job_type = comm.bcast(job_type, root=0)
@@ -424,6 +425,7 @@ def run_mpi(pipe, comm):
run_dirs = comm.bcast(run_dirs, root=0)

module_runner = comm.bcast(module_runner, root=0)
module_config_sec = comm.bcast(module_config_sec, root=0)
worker_log = comm.bcast(worker_log, root=0)
timeout = comm.bcast(timeout, root=0)
jobs = comm.scatter(jobs, root=0)
@@ -436,14 +438,15 @@ def run_mpi(pipe, comm):
timeout,
run_dirs,
module_runner,
module_config_sec,
worker_log,
verbose
),
root=0,
)

# Delete broadcast objects
del module_runner, worker_log, timeout, jobs
del module_runner, module_config_sec, worker_log, timeout, jobs

# Finish up parallel jobs
if master:
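In `run_mpi`, the master builds `jobs` with `split_mpi_jobs(process_list, comm.size)` and then hands one entry to each process via `comm.scatter`, which requires a list with exactly one element per process. The sketch below illustrates that kind of split in plain Python. It is not ShapePipe's `split_mpi_jobs` implementation: `split_jobs`, the round-robin strategy, and the sample process list are assumptions made for illustration.

```python
def split_jobs(jobs, n_workers):
    """Hypothetical helper: deal jobs round-robin into n_workers lists."""
    # Slicing with a stride gives worker i the jobs at positions
    # i, i + n_workers, i + 2 * n_workers, ...
    return [jobs[i::n_workers] for i in range(n_workers)]


# Five made-up file numbers shared among four workers.
process_list = ["-000-000", "-000-001", "-000-002", "-000-003", "-000-004"]
chunks = split_jobs(process_list, 4)
```

The result always has `n_workers` entries, some possibly empty, so it can be scattered even when there are fewer jobs than processes.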
2 changes: 1 addition & 1 deletion shapepipe/utilities/file_system.py
@@ -67,7 +67,7 @@ def mkdir(dir_name, check_created=True, exist_ok=True):
If directory not properly created

"""
os.makedirs(dir_name, exist_ok=exist_ok)
os.makedirs(dir_name, exist_ok=True)

if check_created and not check_dir(dir_name):
raise FileSystemError(