diff --git a/environment.yml b/environment.yml
index 9279402ef..2d3317523 100644
--- a/environment.yml
+++ b/environment.yml
@@ -4,7 +4,7 @@ channels:
 dependencies:
   - python=3.9
   - pip>=21.2.4
-  - numpy==1.21.6
+  - numpy==1.22
   - astropy==5.0
   - automake==1.16.2
   - autoconf==2.69
@@ -17,11 +17,9 @@ dependencies:
   - pandas==1.4.1
   - pip:
     - cs_util==0.0.5
-    - mccd==1.2.3
     - modopt==1.6.0
     - PyQt5==5.15.6
     - pyqtgraph==0.12.4
-    - python-pysap==0.0.6
     - reproject==0.8
     - sip_tpv==1.1
     - sf_tools==2.0.4
diff --git a/example/cfis/config_tile_MiViSmVi.ini b/example/cfis/config_tile_MiViSmVi.ini
index 6198c324c..7dcbe6c50 100644
--- a/example/cfis/config_tile_MiViSmVi.ini
+++ b/example/cfis/config_tile_MiViSmVi.ini
@@ -47,7 +47,7 @@ OUTPUT_DIR = $SP_RUN/output
 [JOB]

 # Batch size of parallel processing (optional), default is 1, i.e. run all jobs in serial
-SMP_BATCH_SIZE = 12
+SMP_BATCH_SIZE = 1

 # Timeout value (optional), default is None, i.e. no timeout limit applied
 TIMEOUT = 96:00:00
diff --git a/example/cfis/config_tile_Ng_template.ini b/example/cfis/config_tile_Ng_template.ini
index f9d733d71..d6e20a677 100644
--- a/example/cfis/config_tile_Ng_template.ini
+++ b/example/cfis/config_tile_Ng_template.ini
@@ -21,7 +21,7 @@ RUN_DATETIME = False
 MODULE = ngmix_runner

 # Parallel processing mode, SMP or MPI
-MODE = SMP
+MODE = MPI


 ## ShapePipe file handling options
@@ -44,10 +44,10 @@ OUTPUT_DIR = $SP_RUN/output
 [JOB]

 # Batch size of parallel processing (optional), default is 1, i.e. run all jobs in serial
-SMP_BATCH_SIZE = 1
+SMP_BATCH_SIZE = 8

 # Timeout value (optional), default is None, i.e. no timeout limit applied
-TIMEOUT = 96:00:00
+TIMEOUT = 48:00:00


 ## Module options
@@ -62,6 +62,7 @@ FILE_PATTERN = sexcat, image_vignet, background_vignet, galaxy_psf, weight_vigne
 FILE_EXT = .fits, .sqlite, .sqlite, .sqlite, .sqlite, .sqlite

 # NUMBERING_SCHEME (optional) string with numbering pattern for input files
+#NUMBERING_SCHEME = \d{0}-\d{3}-\d{2}9
 NUMBERING_SCHEME = -000-000

 # Multi-epoch mode: Path to file with single-exposure WCS header information
diff --git a/example/config.ini b/example/config.ini
index 49541f944..51d1479b5 100644
--- a/example/config.ini
+++ b/example/config.ini
@@ -14,7 +14,7 @@
 # MODULE (required) must be a valid module runner name (or a comma separated list of names)
 MODULE = python_example_runner, serial_example_runner, execute_example_runner, python_example_runner, execute_example_runner
 # MODE (optional) options are smp or mpi, default is smp
-; MODE = mpi
+MODE = mpi

 ## ShapePipe file handling options
 [FILE]
diff --git a/example/output/.gitkeep b/example/output/.gitkeep
deleted file mode 100644
index e69de29bb..000000000
diff --git a/example/pbs/candide_mpi.sh b/example/pbs/candide_mpi.sh
index 0abbbb7f4..41f7a5a25 100644
--- a/example/pbs/candide_mpi.sh
+++ b/example/pbs/candide_mpi.sh
@@ -21,20 +21,36 @@
 #PBS -l nodes=2:ppn=2

 # Full path to environment
-export SPENV="$HOME/.conda/envs/shapepipe"
+export SPENV="$HOME/.conda/envs/shapepipe_mpi"

 # Full path to example config file and input data
 export SPDIR="$HOME/shapepipe"

 # Load modules
-module load intelpython/3
-module load openmpi/4.0.5
+module load gcc/9.3.0
+module load intelpython/3-2023.1.0
+module load openmpi/5.0.0

 # Activate conda environment
 source activate $SPENV

-# Run ShapePipe using full paths to executables
-$SPENV/bin/mpiexec --map-by node $SPENV/bin/shapepipe_run -c $SPDIR/example/config_mpi.ini
+# Other options to test
+# -map-by
+
+if [ -f "$PBS_NODEFILE" ]; then
+    NSLOTS=`cat $PBS_NODEFILE | wc -l`
+    echo "Using $NSLOTS CPUs from PBS_NODEFILE $PBS_NODEFILE"
+else
+    NSLOTS=4
+    echo "Using $NSLOTS CPUs set by hand"
+fi
+
+# Creates #node output dirs
+MPI_CMD=/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun
+MPI_ARGS="-np $NSLOTS"
+
+${MPI_CMD} ${MPI_ARGS} hostname
+${MPI_CMD} ${MPI_ARGS} $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini

 # Return exit code
 exit 0
diff --git a/example/pbs/config_mpi.ini b/example/pbs/config_mpi.ini
index bb2b8f95d..559fe1151 100644
--- a/example/pbs/config_mpi.ini
+++ b/example/pbs/config_mpi.ini
@@ -1,8 +1,14 @@
 # ShapePipe Configuration File Example for MPI

+## Default ShapePipe options
+[DEFAULT]
+
+# Add date and time to RUN_NAME, optional, default: False
+RUN_DATETIME = True
+
 ## ShapePipe execution options
 [EXECUTION]
-MODULE = python_example, serial_example, execute_example
+MODULE = python_example_runner, serial_example_runner, execute_example_runner
 MODE = mpi

 ## ShapePipe file handling options
@@ -15,8 +21,8 @@ OUTPUT_DIR = $SPDIR/example/output
 TIMEOUT = 00:01:35

 ## Module options
-[PYTHON_EXAMPLE]
+[PYTHON_EXAMPLE_RUNNER]
 MESSAGE = The obtained value is:

-[SERIAL_EXAMPLE]
+[SERIAL_EXAMPLE_RUNNER]
 ADD_INPUT_DIR = $SPDIR/example/data/numbers, $SPDIR/example/data/letters
diff --git a/install_shapepipe b/install_shapepipe
index c370d91d0..5af39c158 100755
--- a/install_shapepipe
+++ b/install_shapepipe
@@ -22,7 +22,7 @@ last_update="08/03/22"
 # Conda package versions
 fftw_ver="3.3.10"
 libpng_ver="1.6.37"
-mpi4py_ver="3.1.3"
+mpi4py_ver="3.1.5"
 openblas_ver="0.3.18"

 # SExtractor Package
diff --git a/scripts/sh/job_sp.bash b/scripts/sh/job_sp.bash
index 15cb1428a..89692703e 100755
--- a/scripts/sh/job_sp.bash
+++ b/scripts/sh/job_sp.bash
@@ -420,7 +420,7 @@ if [[ $do_job != 0 ]]; then
     ### Star detection, selection, PSF model. setools can exit with an error for CCD with insufficient stars,
     ### the script should continue
     STOP=0
-    command_sp "shapepipe_run -c $SP_CONFIG/config_tile_Sx_exp_${psf}.ini" "Run shapepipe (tile detection, exp $psf)"
+    command_cfg_shapepipe "config_tile_Sx_exp_${psf}.ini" "Run shapepipe (tile detection, exp $psf)" $n_smp
     STOP=1

 fi
@@ -432,7 +432,7 @@ if [[ $do_job != 0 ]]; then
     ### PSF model letter: 'P' (psfex) or 'M' (mccd)
     letter=${psf:0:1}
     Letter=${letter^}
-    command_sp "shapepipe_run -c $SP_CONFIG/config_tile_${Letter}iViSmVi.ini" "Run shapepipe (tile PsfInterp=$Letter}: up to ngmix+galsim)"
+    command_cfg_shapepipe "config_tile_${Letter}iViSmVi.ini" "Run shapepipe (tile PsfInterp=$Letter}: up to ngmix+galsim)" $n_smp

 fi
@@ -464,7 +464,7 @@ if [[ $do_job != 0 ]]; then
     ### Shapes, run $nsh_jobs parallel processes
     VERBOSE=0
     for k in $(seq 1 $nsh_jobs); do
-        command_sp "shapepipe_run -c $SP_CONFIG_MOD/config_tile_Ng${k}u.ini" "Run shapepipe (tile: ngmix+galsim $k)" &
+        command_sp "shapepipe_run -c $SP_CONFIG_MOD/config_tile_Ng${k}u.ini" "Run shapepipe (tile: ngmix $k)" &
     done
     wait
     VERBOSE=1
diff --git a/shapepipe/modules/make_cat_package/make_cat.py b/shapepipe/modules/make_cat_package/make_cat.py
index 6bbe26648..4e30e3552 100644
--- a/shapepipe/modules/make_cat_package/make_cat.py
+++ b/shapepipe/modules/make_cat_package/make_cat.py
@@ -16,7 +16,7 @@
 from sqlitedict import SqliteDict

 from shapepipe.pipeline import file_io
-from shapepipe.utitities import galaxy
+from shapepipe.utilities import galaxy


 def prepare_final_cat_file(output_path, file_number_string):
diff --git a/shapepipe/modules/mccd_package/mccd_interpolation_script.py b/shapepipe/modules/mccd_package/mccd_interpolation_script.py
index 40e644e8b..84ccb02a4 100644
--- a/shapepipe/modules/mccd_package/mccd_interpolation_script.py
+++ b/shapepipe/modules/mccd_package/mccd_interpolation_script.py
@@ -395,7 +395,7 @@ def _interpolate_me(self):
         all_id = np.copy(cat.get_data()['NUMBER'])

         key_ne = 'N_EPOCH'
-        if key_ne not in cat.get_data():
+        if key_ne not in cat.get_data().dtype.names:
             raise KeyError(
                 f'Key {key_ne} not found in input galaxy catalogue, needed for'
                 + ' PSF interpolation to multi-epoch data; run previous module'
diff --git a/shapepipe/modules/merge_sep_cats_package/merge_sep_cats.py b/shapepipe/modules/merge_sep_cats_package/merge_sep_cats.py
index 85f7e194f..83d95f567 100644
--- a/shapepipe/modules/merge_sep_cats_package/merge_sep_cats.py
+++ b/shapepipe/modules/merge_sep_cats_package/merge_sep_cats.py
@@ -89,6 +89,14 @@ def process(self):
         list_col_name = cat0.get_col_names()
         cat0.close()

+        # Check that exactly 6 (1 primary + secondary HDUs) exist
+        if len(list_ext_name) != 6:
+            msg = f"Number of HDUs is {len(list_ext_name)}, expected 6"
+            #raise ValueError(msg)
+            print(f"warning: {msg}; setting to 6")
+            list_ext_name = list_ext_name[:6]
+            list_col_name = list_col_name[:6]
+
         # Create empty dictionary
         # data dimension = n_extension x n_column x n_obj
         data = {}
diff --git a/shapepipe/modules/python_example_runner.py b/shapepipe/modules/python_example_runner.py
index d468c10e0..418122198 100644
--- a/shapepipe/modules/python_example_runner.py
+++ b/shapepipe/modules/python_example_runner.py
@@ -22,7 +22,6 @@
     'mccd',
     'ngmix',
     'pandas',
-    'pysap',
     'scipy',
     'sf_tools',
     'sip_tpv',
diff --git a/shapepipe/pipeline/file_handler.py b/shapepipe/pipeline/file_handler.py
index b7479f6f3..c2076571f 100644
--- a/shapepipe/pipeline/file_handler.py
+++ b/shapepipe/pipeline/file_handler.py
@@ -93,7 +93,7 @@ def run_dir(self):

     @run_dir.setter
     def run_dir(self, value):
-        self._run_dir = self.check_dir(value, check_exists=True)
+        self._run_dir = self.check_dir(value, check_exists=False)

     @property
     def _input_dir(self):
@@ -158,7 +158,8 @@ def check_dir(cls, dir_name, check_exists=False):

         """
         if check_exists and os.path.isdir(dir_name):
-            raise OSError(f'Directory {dir_name} already exists.')
+            #raise OSError(f'Directory {dir_name} already exists.')
+            print(f'Warning: Directory {dir_name} already exists.')

         return cls.strip_slash(dir_name)
@@ -188,8 +189,9 @@ def mkdir(cls, dir_name):
             Directory name with full path

         """
-        cls.check_dir(dir_name, check_exists=True)
-        mkdir(dir_name)
+        cls.check_dir(dir_name, check_exists=False)
+        if not os.path.isdir(dir_name):
+            mkdir(dir_name)

     @staticmethod
     def setpath(path, name, ext=''):
@@ -386,9 +388,12 @@ def create_global_run_dirs(self):
             self.run_dir,
         )

-        self.mkdir(self.run_dir)
-        self.mkdir(self._log_dir)
-        self.mkdir(self._tmp_dir)
+        if not os.path.exists(self.run_dir):
+            self.mkdir(self.run_dir)
+        if not os.path.exists(self._log_dir):
+            self.mkdir(self._log_dir)
+        if not os.path.exists(self._tmp_dir):
+            self.mkdir(self._tmp_dir)

         self._get_input_dir()
         self._copy_config_to_log()
@@ -648,9 +653,12 @@ def _create_module_run_dirs(self, module, run_name):
             )
         )

-        self.mkdir(self._module_dict[module][run_name]['run_dir'])
-        self.mkdir(self._module_dict[module][run_name]['log_dir'])
-        self.mkdir(self._module_dict[module][run_name]['output_dir'])
+        if not os.path.exists(self._module_dict[module][run_name]['run_dir']):
+            self.mkdir(self._module_dict[module][run_name]['run_dir'])
+        if not os.path.exists(self._module_dict[module][run_name]['log_dir']):
+            self.mkdir(self._module_dict[module][run_name]['log_dir'])
+        if not os.path.exists(self._module_dict[module][run_name]['output_dir']):
+            self.mkdir(self._module_dict[module][run_name]['output_dir'])

         # Set current output directory to module output directory
         self.output_dir = self._module_dict[module][run_name]['output_dir']
diff --git a/shapepipe/pipeline/mpi_run.py b/shapepipe/pipeline/mpi_run.py
index c92d943c8..5baf84227 100644
--- a/shapepipe/pipeline/mpi_run.py
+++ b/shapepipe/pipeline/mpi_run.py
@@ -36,6 +36,7 @@ def submit_mpi_jobs(
     timeout,
     run_dirs,
     module_runner,
+    module_config_sec,
     worker_log,
     verbose,
 ):
@@ -57,6 +58,7 @@
             w_log_name,
             run_dirs,
             config,
+            module_config_sec,
             timeout,
             module_runner
         ))
diff --git a/shapepipe/run.py b/shapepipe/run.py
index 2443eab50..7ae36aa10 100644
--- a/shapepipe/run.py
+++ b/shapepipe/run.py
@@ -405,6 +405,7 @@
         # Get file handler objects
         run_dirs = jh.filehd.module_run_dirs
         module_runner = jh.filehd.module_runners[module]
+        module_config_sec = jh.filehd.get_module_config_sec(module)
         worker_log = jh.filehd.get_worker_log_name
         # Define process list
         process_list = jh.filehd.process_list
@@ -412,8 +413,8 @@
         jobs = split_mpi_jobs(process_list, comm.size)
         del process_list
     else:
-        job_type = module_runner = worker_log = timeout = \
-            jobs = run_dirs = None
+        job_type = module_runner = module_config_sec = worker_log = \
+            timeout = jobs = run_dirs = None

     # Broadcast job type to all nodes
     job_type = comm.bcast(job_type, root=0)
@@ -424,6 +425,7 @@

         run_dirs = comm.bcast(run_dirs, root=0)
         module_runner = comm.bcast(module_runner, root=0)
+        module_config_sec = comm.bcast(module_config_sec, root=0)
         worker_log = comm.bcast(worker_log, root=0)
         timeout = comm.bcast(timeout, root=0)
         jobs = comm.scatter(jobs, root=0)
@@ -436,6 +438,7 @@
                 timeout,
                 run_dirs,
                 module_runner,
+                module_config_sec,
                 worker_log,
                 verbose
             ),
         )
@@ -443,7 +446,7 @@
         # Delete broadcast objects
-        del module_runner, worker_log, timeout, jobs
+        del module_runner, module_config_sec, worker_log, timeout, jobs

     # Finish up parallel jobs
     if master:
diff --git a/shapepipe/utilities/file_system.py b/shapepipe/utilities/file_system.py
index 52f5fd906..dd304807b 100644
--- a/shapepipe/utilities/file_system.py
+++ b/shapepipe/utilities/file_system.py
@@ -67,7 +67,7 @@ def mkdir(dir_name, check_created=True, exist_ok=True):
         If directory not properly created

     """
-    os.makedirs(dir_name, exist_ok=exist_ok)
+    os.makedirs(dir_name, exist_ok=True)

     if check_created and not check_dir(dir_name):
         raise FileSystemError(