diff --git a/ush/ioda/bufr2ioda/run_bufr2ioda.py b/ush/ioda/bufr2ioda/run_bufr2ioda.py index c5ab09517..2976cc241 100755 --- a/ush/ioda/bufr2ioda/run_bufr2ioda.py +++ b/ush/ioda/bufr2ioda/run_bufr2ioda.py @@ -1,8 +1,10 @@ #!/usr/bin/env python3 import argparse import glob +import multiprocessing as mp import os import shutil +from itertools import repeat from pathlib import Path from gen_bufr2ioda_json import gen_bufr_json from gen_bufr2ioda_yaml import gen_bufr_yaml @@ -12,6 +14,24 @@ # Initialize root logger logger = Logger('run_bufr2ioda.py', level='INFO', colored_log=True) +# get parallel processing info +num_cores = mp.cpu_count() + + +def mp_bufr_py(script, infile): + cmd = Executable(script) + cmd.add_default_arg('-c') + cmd.add_default_arg(infile) + logger.info(f"Executing {cmd}") + cmd() + + +def mp_bufr_yaml(bufr2iodaexe, yamlfile): + cmd = Executable(bufr2iodaexe) + cmd.add_default_arg(yamlfile) + logger.info(f"Executing {cmd}") + cmd() + @logit(logger) def bufr2ioda(current_cycle, RUN, DMPDIR, config_template_dir, COM_OBS): @@ -44,7 +64,8 @@ def bufr2ioda(current_cycle, RUN, DMPDIR, config_template_dir, COM_OBS): BUFR_py_files = [os.path.basename(f) for f in BUFR_py_files] BUFR_py = [f.replace('bufr2ioda_', '').replace('.py', '') for f in BUFR_py_files] - # NOTE or TODO - how to parallelize these loops???? + json_files = [] + scripts = [] for obtype in BUFR_py: logger.info(f"Convert {obtype}...") json_output_file = os.path.join(COM_OBS, f"{obtype}_{datetime_to_YMDH(current_cycle)}.json") @@ -54,21 +75,25 @@ def bufr2ioda(current_cycle, RUN, DMPDIR, config_template_dir, COM_OBS): # Use the converter script for the ob type bufr2iodapy = USH_IODA + '/bufr2ioda_' + obtype + ".py" - cmd = Executable(bufr2iodapy) - cmd.add_default_arg('-c') - cmd.add_default_arg(json_output_file) - logger.info(f"Executing {cmd}") - cmd() + + # append the values to the lists + json_files.append(json_output_file) + scripts.append(bufr2iodapy) # Check if the converter was successful # if os.path.exists(json_output_file): # rm_p(json_output_file) + # run all python scripts in parallel + with mp.Pool(num_cores) as pool: + pool.starmap(mp_bufr_py, zip(scripts, json_files)) + # Specify observation types to be processed by the bufr2ioda executable BUFR_yaml_files = glob.glob(os.path.join(config_template_dir, '*.yaml')) BUFR_yaml_files = [os.path.basename(f) for f in BUFR_yaml_files] BUFR_yaml = [f.replace('bufr2ioda_', '').replace('.yaml', '') for f in BUFR_yaml_files] + yaml_files = [] for obtype in BUFR_yaml: logger.info(f"Convert {obtype}...") yaml_output_file = os.path.join(COM_OBS, f"{obtype}_{datetime_to_YMDH(current_cycle)}.yaml") @@ -78,15 +103,18 @@ def bufr2ioda(current_cycle, RUN, DMPDIR, config_template_dir, COM_OBS): # use the bufr2ioda executable for the ob type bufr2iodaexe = BIN_GDAS + '/bufr2ioda.x' - cmd = Executable(bufr2iodaexe) - cmd.add_default_arg(yaml_output_file) - logger.info(f"Executing {cmd}") - cmd() + + # append the values to the lists + yaml_files.append(yaml_output_file) # Check if the converter was successful # if os.path.exists(yaml_output_file): # rm_p(yaml_output_file) + # run all bufr2ioda yamls in parallel + with mp.Pool(num_cores) as pool: + pool.starmap(mp_bufr_yaml, zip(repeat(bufr2iodaexe), yaml_files)) + if __name__ == "__main__": parser = argparse.ArgumentParser(description='Convert bufr dump file to ioda format')