Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use parallel processing to speed up obs processing #733

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 38 additions & 10 deletions ush/ioda/bufr2ioda/run_bufr2ioda.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#!/usr/bin/env python3
import argparse
import glob
import multiprocessing as mp
import os
import shutil
from itertools import repeat
from pathlib import Path
from gen_bufr2ioda_json import gen_bufr_json
from gen_bufr2ioda_yaml import gen_bufr_yaml
Expand All @@ -12,6 +14,24 @@
# Initialize root logger (module-level, so the functions in this file share one logger)
logger = Logger('run_bufr2ioda.py', level='INFO', colored_log=True)

# Get parallel processing info.
# Prefer the number of CPUs this process is actually allowed to run on:
# mp.cpu_count() reports every CPU in the machine, which oversubscribes the
# worker pools when the job is restricted to a subset of cores (batch
# scheduler allocation, cgroup, taskset, ...).
try:
    num_cores = len(os.sched_getaffinity(0))
except AttributeError:
    # os.sched_getaffinity is not available on every platform;
    # fall back to the total CPU count there.
    num_cores = mp.cpu_count()


def mp_bufr_py(script, infile):
    """Run one converter script with ``-c <infile>`` as its arguments.

    Kept as a module-level function so it can be handed to a
    multiprocessing pool (one call per script/input pair).

    Parameters
    ----------
    script
        Path of the converter script passed to ``Executable``.
    infile
        Value passed after the ``-c`` flag (the callers in this file pass
        the generated JSON file for the observation type).
    """
    converter = Executable(script)
    # Arguments are appended in order: the flag first, then its value.
    for arg in ('-c', infile):
        converter.add_default_arg(arg)
    logger.info(f"Executing {converter}")
    converter()


def mp_bufr_yaml(bufr2iodaexe, yamlfile):
    """Run the bufr2ioda executable with a single YAML file as its argument.

    Kept as a module-level function so it can be handed to a
    multiprocessing pool (one call per YAML file).

    Parameters
    ----------
    bufr2iodaexe
        Path of the executable passed to ``Executable``.
    yamlfile
        YAML file given to the executable as its only argument.
    """
    runner = Executable(bufr2iodaexe)
    runner.add_default_arg(yamlfile)
    logger.info(f"Executing {runner}")
    runner()


@logit(logger)
def bufr2ioda(current_cycle, RUN, DMPDIR, config_template_dir, COM_OBS):
Expand Down Expand Up @@ -44,7 +64,8 @@ def bufr2ioda(current_cycle, RUN, DMPDIR, config_template_dir, COM_OBS):
BUFR_py_files = [os.path.basename(f) for f in BUFR_py_files]
BUFR_py = [f.replace('bufr2ioda_', '').replace('.py', '') for f in BUFR_py_files]

# NOTE or TODO - how to parallelize these loops????
json_files = []
scripts = []
for obtype in BUFR_py:
logger.info(f"Convert {obtype}...")
json_output_file = os.path.join(COM_OBS, f"{obtype}_{datetime_to_YMDH(current_cycle)}.json")
Expand All @@ -54,21 +75,25 @@ def bufr2ioda(current_cycle, RUN, DMPDIR, config_template_dir, COM_OBS):

# Use the converter script for the ob type
bufr2iodapy = USH_IODA + '/bufr2ioda_' + obtype + ".py"
cmd = Executable(bufr2iodapy)
cmd.add_default_arg('-c')
cmd.add_default_arg(json_output_file)
logger.info(f"Executing {cmd}")
cmd()

# append the values to the lists
json_files.append(json_output_file)
scripts.append(bufr2iodapy)

# Check if the converter was successful
# if os.path.exists(json_output_file):
# rm_p(json_output_file)

# run all python scripts in parallel
with mp.Pool(num_cores) as pool:
pool.starmap(mp_bufr_py, zip(scripts, json_files))

# Specify observation types to be processed by the bufr2ioda executable
BUFR_yaml_files = glob.glob(os.path.join(config_template_dir, '*.yaml'))
BUFR_yaml_files = [os.path.basename(f) for f in BUFR_yaml_files]
BUFR_yaml = [f.replace('bufr2ioda_', '').replace('.yaml', '') for f in BUFR_yaml_files]

yaml_files = []
for obtype in BUFR_yaml:
logger.info(f"Convert {obtype}...")
yaml_output_file = os.path.join(COM_OBS, f"{obtype}_{datetime_to_YMDH(current_cycle)}.yaml")
Expand All @@ -78,15 +103,18 @@ def bufr2ioda(current_cycle, RUN, DMPDIR, config_template_dir, COM_OBS):

# use the bufr2ioda executable for the ob type
bufr2iodaexe = BIN_GDAS + '/bufr2ioda.x'
cmd = Executable(bufr2iodaexe)
cmd.add_default_arg(yaml_output_file)
logger.info(f"Executing {cmd}")
cmd()

# append the values to the lists
yaml_files.append(yaml_output_file)

# Check if the converter was successful
# if os.path.exists(yaml_output_file):
# rm_p(yaml_output_file)

# run all bufr2ioda yamls in parallel
with mp.Pool(num_cores) as pool:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: the easiest way to do this was to split the work into two groups, the Python scripts and the YAML + executable runs, each with its own pool. If the two groups are roughly equal in size, that is probably okay, but we may want to combine them all into a single pool.

pool.starmap(mp_bufr_yaml, zip(repeat(bufr2iodaexe), yaml_files))


if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Convert bufr dump file to ioda format')
Expand Down
Loading