From a66cc1945cc6c34b790a8013c6a0660a3f11bdd0 Mon Sep 17 00:00:00 2001 From: "kayee.wong" Date: Tue, 3 Sep 2024 20:36:28 +0000 Subject: [PATCH] Update GSL changes. --- env/HERA.env | 11 +-- workflow/rocoto/tasks_emc.py | 124 ++++++++++++++++++++++------------ workflow/rocoto/tasks_gsl.py | 126 +++++++++++++++++++++++------------ 3 files changed, 169 insertions(+), 92 deletions(-) diff --git a/env/HERA.env b/env/HERA.env index 6f45b77f30..66377b2ad4 100755 --- a/env/HERA.env +++ b/env/HERA.env @@ -5,7 +5,7 @@ if [[ $# -ne 1 ]]; then echo "Must specify an input argument to set runtime environment variables!" #JKH echo "argument can be any one of the following:" - echo "atmanlrun atmensanlrun aeroanlrun landanlrun" + echo "atmanlrun atmensanlrun aeroanlrun snowanl" echo "anal sfcanl fcst post metp" echo "eobs eupd ecen efcs epos" echo "postsnd awips gempak" @@ -96,15 +96,6 @@ elif [[ "${step}" = "aeroanlrun" ]]; then export NTHREADS_AEROANL=${NTHREADSmax} export APRUN_AEROANL="${APRUN} --cpus-per-task=${NTHREADS_AEROANL}" -#JKH -elif [[ "${step}" = "landanlrun" ]]; then - - nth_max=$((npe_node_max / npe_node_landanlrun)) - - export NTHREADS_LANDANL=${nth_landanlrun:-${nth_max}} - [[ ${NTHREADS_LANDANL} -gt ${nth_max} ]] && export NTHREADS_LANDANL=${nth_max} - export APRUN_LANDANL="${launcher} -n ${npe_landanlrun}" -#JKH elif [[ "${step}" = "atmanlfv3inc" ]]; then diff --git a/workflow/rocoto/tasks_emc.py b/workflow/rocoto/tasks_emc.py index 1c79de0c19..0c84eaba15 100644 --- a/workflow/rocoto/tasks_emc.py +++ b/workflow/rocoto/tasks_emc.py @@ -1,9 +1,11 @@ #!/usr/bin/env python3 +import copy import numpy as np from applications.applications import AppConfig import rocoto.rocoto as rocoto from wxflow import Template, TemplateConstants, to_timedelta +from typing import List __all__ = ['Tasks'] @@ -12,21 +14,21 @@ class Tasks: SERVICE_TASKS = ['arch', 'earc'] VALID_TASKS = ['aerosol_init', 'stage_ic', 'prep', 'anal', 'sfcanl', 'analcalc', 'analdiag', 'arch', "cleanup", - 'prepatmiodaobs', 'atmanlinit', 'atmanlrun', 'atmanlfinal', + 'prepatmiodaobs', 'atmanlinit', 'atmanlvar', 'atmanlfv3inc', 'atmanlfinal', 'prepoceanobs', - 'ocnanalprep', 'ocnanalbmat', 'ocnanalrun', 'ocnanalchkpt', 'ocnanalpost', 'ocnanalvrfy', + 'ocnanalprep', 'marinebmat', 'ocnanalrun', 'ocnanalecen', 'ocnanalchkpt', 'ocnanalpost', 'ocnanalvrfy', 'earc', 'ecen', 'echgres', 'ediag', 'efcs', 'eobs', 'eomg', 'epos', 'esfc', 'eupd', - 'atmensanlinit', 'atmensanlrun', 'atmensanlfinal', + 'atmensanlinit', 'atmensanlletkf', 'atmensanlfv3inc', 'atmensanlfinal', 'aeroanlinit', 'aeroanlrun', 'aeroanlfinal', - 'preplandobs', 'landanl', + 'prepsnowobs', 'snowanl', 'fcst', - 'atmanlupp', 'atmanlprod', 'atmupp', 'atmprod', 'goesupp', - 'ocnpost', + 'atmanlupp', 'atmanlprod', 'atmupp', 'goesupp', + 'atmos_prod', 'ocean_prod', 'ice_prod', 'verfozn', 'verfrad', 'vminmon', 'metp', 'tracker', 'genesis', 'genesis_fsu', - 'postsnd', 'awips_g2', 'awips_20km_1p0deg', 'fbwind', + 'postsnd', 'awips_20km_1p0deg', 'fbwind', 'gempak', 'gempakmeta', 'gempakmetancdc', 'gempakncdcupapgif', 'gempakpgrb2spec', 'npoess_pgrb2_0p5deg' 'waveawipsbulls', 'waveawipsgridded', 'wavegempak', 'waveinit', 'wavepostbndpnt', 'wavepostbndpntbll', 'wavepostpnt', 'wavepostsbs', 'waveprep', @@ -35,17 +37,26 @@ class Tasks: 'mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst', 'mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen', 'mos_wx_prdgen', 'mos_wx_ext_prdgen'] - def __init__(self, app_config: AppConfig, cdump: str) -> None: + def __init__(self, app_config: AppConfig, run: str) -> None: - self.app_config = app_config - self.cdump = cdump + self.app_config = copy.deepcopy(app_config) + self.run = run + # Re-source the configs with RUN specified + print(f"Source configs with RUN={run}") + self._configs = self.app_config.source_configs(run=run, log=False) + # Update the base config for the application + self._configs['base'] = self.app_config.update_base(self._configs['base']) # Save dict_configs and base in the internal state (never know where it may be needed) - self._configs = self.app_config.configs self._base = self._configs['base'] + self.HOMEgfs = self._base['HOMEgfs'] self.rotdir = self._base['ROTDIR'] self.pslot = self._base['PSLOT'] + if self.run == "enkfgfs": + self.nmem = int(self._base['NMEM_ENS_GFS']) + else: + self.nmem = int(self._base['NMEM_ENS']) self._base['cycle_interval'] = to_timedelta(f'{self._base["assim_freq"]}H') self.n_tiles = 6 # TODO - this needs to be elsewhere @@ -54,13 +65,13 @@ def __init__(self, app_config: AppConfig, cdump: str) -> None: 'HOMEgfs': self.HOMEgfs, 'EXPDIR': self._base.get('EXPDIR'), 'NET': self._base.get('NET'), - 'CDUMP': self.cdump, - 'RUN': self.cdump, + 'RUN': self.run, 'CDATE': '@Y@m@d@H', 'PDY': '@Y@m@d', 'cyc': '@H', 'COMROOT': self._base.get('COMROOT'), 'DATAROOT': self._base.get('DATAROOT')} + self.envars = self._set_envars(envar_dict) @staticmethod @@ -72,12 +83,6 @@ def _set_envars(envar_dict) -> list: return envars - @staticmethod - def _get_hybgroups(nens: int, nmem_per_group: int, start_index: int = 1): - ngrps = nens / nmem_per_group - groups = ' '.join([f'{x:02d}' for x in range(start_index, int(ngrps) + 1)]) - return groups - def _template_to_rocoto_cycstring(self, template: str, subs_dict: dict = {}) -> str: ''' Takes a string templated with ${ } and converts it into a string suitable @@ -87,8 +92,8 @@ def _template_to_rocoto_cycstring(self, template: str, subs_dict: dict = {}) -> Variables substitued by default: ${ROTDIR} -> '&ROTDIR;' - ${RUN} -> self.cdump - ${DUMP} -> self.cdump + ${RUN} -> self.run + ${DUMP} -> self.run ${MEMDIR} -> '' ${YMD} -> '@Y@m@d' ${HH} -> '@H' @@ -110,8 +115,8 @@ def _template_to_rocoto_cycstring(self, template: str, subs_dict: dict = {}) -> # Defaults rocoto_conversion_dict = { 'ROTDIR': '&ROTDIR;', - 'RUN': self.cdump, - 'DUMP': self.cdump, + 'RUN': self.run, + 'DUMP': self.run, 'MEMDIR': '', 'YMD': '@Y@m@d', 'HH': '@H' @@ -123,12 +128,49 @@ def _template_to_rocoto_cycstring(self, template: str, subs_dict: dict = {}) -> TemplateConstants.DOLLAR_CURLY_BRACE, rocoto_conversion_dict.get) + @staticmethod + def _get_forecast_hours(run, config, component='atmos') -> List[str]: + # Make a local copy of the config to avoid modifying the original + local_config = config.copy() + + # Ocean/Ice components do not have a HF output option like the atmosphere + if component in ['ocean', 'ice']: + local_config['FHMAX_HF_GFS'] = 0 + + if component in ['ocean']: + local_config['FHOUT_HF_GFS'] = config['FHOUT_OCN_GFS'] + local_config['FHOUT_GFS'] = config['FHOUT_OCN_GFS'] + local_config['FHOUT'] = config['FHOUT_OCN'] + + if component in ['ice']: + local_config['FHOUT_HF_GFS'] = config['FHOUT_ICE_GFS'] + local_config['FHOUT_GFS'] = config['FHOUT_ICE_GFS'] + local_config['FHOUT'] = config['FHOUT_ICE'] + + fhmin = local_config['FHMIN'] + + # Get a list of all forecast hours + fhrs = [] + if run in ['gdas']: + fhmax = local_config['FHMAX'] + fhout = local_config['FHOUT'] + fhrs = list(range(fhmin, fhmax + fhout, fhout)) + elif run in ['gfs', 'gefs']: + fhmax = local_config['FHMAX_GFS'] + fhout = local_config['FHOUT_GFS'] + fhmax_hf = local_config['FHMAX_HF_GFS'] + fhout_hf = local_config['FHOUT_HF_GFS'] + fhrs_hf = range(fhmin, fhmax_hf + fhout_hf, fhout_hf) + fhrs = list(fhrs_hf) + list(range(fhrs_hf[-1] + fhout, fhmax + fhout, fhout)) + + return fhrs + def get_resource(self, task_name): """ Given a task name (task_name) and its configuration (task_names), return a dictionary of resources (task_resource) used by the task. Task resource dictionary includes: - account, walltime, cores, nodes, ppn, threads, memory, queue, partition, native + account, walltime, ntasks, nodes, ppn, threads, memory, queue, partition, native """ scheduler = self.app_config.scheduler @@ -137,32 +179,28 @@ def get_resource(self, task_name): account = task_config['ACCOUNT'] - walltime = task_config[f'wtime_{task_name}'] - if self.cdump in ['gfs'] and f'wtime_{task_name}_gfs' in task_config.keys(): - walltime = task_config[f'wtime_{task_name}_gfs'] + walltime = task_config[f'walltime'] + ntasks = task_config[f'ntasks'] + ppn = task_config[f'tasks_per_node'] - cores = task_config[f'npe_{task_name}'] - if self.cdump in ['gfs'] and f'npe_{task_name}_gfs' in task_config.keys(): - cores = task_config[f'npe_{task_name}_gfs'] + nodes = int(np.ceil(float(ntasks) / float(ppn))) - ppn = task_config[f'npe_node_{task_name}'] - if self.cdump in ['gfs'] and f'npe_node_{task_name}_gfs' in task_config.keys(): - ppn = task_config[f'npe_node_{task_name}_gfs'] + threads = task_config[f'threads_per_task'] - nodes = int(np.ceil(float(cores) / float(ppn))) + # Memory is not required + memory = task_config.get(f'memory', None) - threads = task_config[f'nth_{task_name}'] - if self.cdump in ['gfs'] and f'nth_{task_name}_gfs' in task_config.keys(): - threads = task_config[f'nth_{task_name}_gfs'] - - memory = task_config.get(f'memory_{task_name}', None) if scheduler in ['pbspro']: if task_config.get('prepost', False): memory += ':prepost=true' native = None if scheduler in ['pbspro']: - native = '-l debug=true,place=vscatter' + # Set place=vscatter by default and debug=true if DEBUG_POSTSCRIPT="YES" + if self._base['DEBUG_POSTSCRIPT']: + native = '-l debug=true,place=vscatter' + else: + native = '-l place=vscatter' # Set either exclusive or shared - default on WCOSS2 is exclusive when not set if task_config.get('is_exclusive', False): native += ':exclhost' @@ -170,6 +208,10 @@ def get_resource(self, task_name): native += ':shared' elif scheduler in ['slurm']: native = '--export=NONE' + if task_config['RESERVATION'] != "": + native += '' if task_name in Tasks.SERVICE_TASKS else ' --reservation=' + task_config['RESERVATION'] + if task_config.get('CLUSTERS', "") not in ["", '@CLUSTERS@']: + native += ' --clusters=' + task_config['CLUSTERS'] queue = task_config['QUEUE_SERVICE'] if task_name in Tasks.SERVICE_TASKS else task_config['QUEUE'] @@ -181,7 +223,7 @@ def get_resource(self, task_name): task_resource = {'account': account, 'walltime': walltime, 'nodes': nodes, - 'cores': cores, + 'ntasks': ntasks, 'ppn': ppn, 'threads': threads, 'memory': memory, diff --git a/workflow/rocoto/tasks_gsl.py b/workflow/rocoto/tasks_gsl.py index 371721bbb9..18f1f8bf5d 100644 --- a/workflow/rocoto/tasks_gsl.py +++ b/workflow/rocoto/tasks_gsl.py @@ -1,9 +1,11 @@ #!/usr/bin/env python3 +import copy import numpy as np from applications.applications import AppConfig import rocoto.rocoto as rocoto from wxflow import Template, TemplateConstants, to_timedelta +from typing import List __all__ = ['Tasks'] @@ -12,21 +14,21 @@ class Tasks: SERVICE_TASKS = ['arch', 'earc'] VALID_TASKS = ['aerosol_init', 'stage_ic', 'prep', 'anal', 'sfcanl', 'analcalc', 'analdiag', 'arch', "cleanup", - 'prepatmiodaobs', 'atmanlinit', 'atmanlrun', 'atmanlfinal', + 'prepatmiodaobs', 'atmanlinit', 'atmanlvar', 'atmanlfv3inc', 'atmanlfinal', 'prepoceanobs', - 'ocnanalprep', 'ocnanalbmat', 'ocnanalrun', 'ocnanalchkpt', 'ocnanalpost', 'ocnanalvrfy', + 'ocnanalprep', 'marinebmat', 'ocnanalrun', 'ocnanalecen', 'ocnanalchkpt', 'ocnanalpost', 'ocnanalvrfy', 'earc', 'ecen', 'echgres', 'ediag', 'efcs', 'eobs', 'eomg', 'epos', 'esfc', 'eupd', - 'atmensanlinit', 'atmensanlrun', 'atmensanlfinal', + 'atmensanlinit', 'atmensanlletkf', 'atmensanlfv3inc', 'atmensanlfinal', 'aeroanlinit', 'aeroanlrun', 'aeroanlfinal', - 'preplandobs', 'landanl', + 'prepsnowobs', 'snowanl', 'fcst', - 'atmanlupp', 'atmanlprod', 'atmupp', 'atmprod', 'goesupp', - 'ocnpost', + 'atmanlupp', 'atmanlprod', 'atmupp', 'goesupp', + 'atmos_prod', 'ocean_prod', 'ice_prod', 'verfozn', 'verfrad', 'vminmon', 'metp', 'tracker', 'genesis', 'genesis_fsu', - 'postsnd', 'awips_g2', 'awips_20km_1p0deg', 'fbwind', + 'postsnd', 'awips_20km_1p0deg', 'fbwind', 'gempak', 'gempakmeta', 'gempakmetancdc', 'gempakncdcupapgif', 'gempakpgrb2spec', 'npoess_pgrb2_0p5deg' 'waveawipsbulls', 'waveawipsgridded', 'wavegempak', 'waveinit', 'wavepostbndpnt', 'wavepostbndpntbll', 'wavepostpnt', 'wavepostsbs', 'waveprep', @@ -35,17 +37,26 @@ class Tasks: 'mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst', 'mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen', 'mos_wx_prdgen', 'mos_wx_ext_prdgen'] - def __init__(self, app_config: AppConfig, cdump: str) -> None: + def __init__(self, app_config: AppConfig, run: str) -> None: - self.app_config = app_config - self.cdump = cdump + self.app_config = copy.deepcopy(app_config) + self.run = run + # Re-source the configs with RUN specified + print(f"Source configs with RUN={run}") + self._configs = self.app_config.source_configs(run=run, log=False) + # Update the base config for the application + self._configs['base'] = self.app_config.update_base(self._configs['base']) # Save dict_configs and base in the internal state (never know where it may be needed) - self._configs = self.app_config.configs self._base = self._configs['base'] + self.HOMEgfs = self._base['HOMEgfs'] self.rotdir = self._base['ROTDIR'] self.pslot = self._base['PSLOT'] + if self.run == "enkfgfs": + self.nmem = int(self._base['NMEM_ENS_GFS']) + else: + self.nmem = int(self._base['NMEM_ENS']) self._base['cycle_interval'] = to_timedelta(f'{self._base["assim_freq"]}H') self.n_tiles = 6 # TODO - this needs to be elsewhere @@ -53,15 +64,16 @@ def __init__(self, app_config: AppConfig, cdump: str) -> None: envar_dict = {'RUN_ENVIR': self._base.get('RUN_ENVIR', 'emc'), 'HOMEgfs': self.HOMEgfs, 'EXPDIR': self._base.get('EXPDIR'), +#JKH 'ROTDIR': self._base.get('ROTDIR'), 'NET': self._base.get('NET'), - 'CDUMP': self.cdump, - 'RUN': self.cdump, + 'RUN': self.run, 'CDATE': '@Y@m@d@H', 'PDY': '@Y@m@d', 'cyc': '@H', 'COMROOT': self._base.get('COMROOT'), 'DATAROOT': self._base.get('DATAROOT')} + self.envars = self._set_envars(envar_dict) @staticmethod @@ -73,12 +85,6 @@ def _set_envars(envar_dict) -> list: return envars - @staticmethod - def _get_hybgroups(nens: int, nmem_per_group: int, start_index: int = 1): - ngrps = nens / nmem_per_group - groups = ' '.join([f'{x:02d}' for x in range(start_index, int(ngrps) + 1)]) - return groups - def _template_to_rocoto_cycstring(self, template: str, subs_dict: dict = {}) -> str: ''' Takes a string templated with ${ } and converts it into a string suitable @@ -88,8 +94,8 @@ def _template_to_rocoto_cycstring(self, template: str, subs_dict: dict = {}) -> Variables substitued by default: ${ROTDIR} -> '&ROTDIR;' - ${RUN} -> self.cdump - ${DUMP} -> self.cdump + ${RUN} -> self.run + ${DUMP} -> self.run ${MEMDIR} -> '' ${YMD} -> '@Y@m@d' ${HH} -> '@H' @@ -111,8 +117,8 @@ def _template_to_rocoto_cycstring(self, template: str, subs_dict: dict = {}) -> # Defaults rocoto_conversion_dict = { 'ROTDIR': '&ROTDIR;', - 'RUN': self.cdump, - 'DUMP': self.cdump, + 'RUN': self.run, + 'DUMP': self.run, 'MEMDIR': '', 'YMD': '@Y@m@d', 'HH': '@H' @@ -124,12 +130,49 @@ def _template_to_rocoto_cycstring(self, template: str, subs_dict: dict = {}) -> TemplateConstants.DOLLAR_CURLY_BRACE, rocoto_conversion_dict.get) + @staticmethod + def _get_forecast_hours(run, config, component='atmos') -> List[str]: + # Make a local copy of the config to avoid modifying the original + local_config = config.copy() + + # Ocean/Ice components do not have a HF output option like the atmosphere + if component in ['ocean', 'ice']: + local_config['FHMAX_HF_GFS'] = 0 + + if component in ['ocean']: + local_config['FHOUT_HF_GFS'] = config['FHOUT_OCN_GFS'] + local_config['FHOUT_GFS'] = config['FHOUT_OCN_GFS'] + local_config['FHOUT'] = config['FHOUT_OCN'] + + if component in ['ice']: + local_config['FHOUT_HF_GFS'] = config['FHOUT_ICE_GFS'] + local_config['FHOUT_GFS'] = config['FHOUT_ICE_GFS'] + local_config['FHOUT'] = config['FHOUT_ICE'] + + fhmin = local_config['FHMIN'] + + # Get a list of all forecast hours + fhrs = [] + if run in ['gdas']: + fhmax = local_config['FHMAX'] + fhout = local_config['FHOUT'] + fhrs = list(range(fhmin, fhmax + fhout, fhout)) + elif run in ['gfs', 'gefs']: + fhmax = local_config['FHMAX_GFS'] + fhout = local_config['FHOUT_GFS'] + fhmax_hf = local_config['FHMAX_HF_GFS'] + fhout_hf = local_config['FHOUT_HF_GFS'] + fhrs_hf = range(fhmin, fhmax_hf + fhout_hf, fhout_hf) + fhrs = list(fhrs_hf) + list(range(fhrs_hf[-1] + fhout, fhmax + fhout, fhout)) + + return fhrs + def get_resource(self, task_name): """ Given a task name (task_name) and its configuration (task_names), return a dictionary of resources (task_resource) used by the task. Task resource dictionary includes: - account, walltime, cores, nodes, ppn, threads, memory, queue, partition, native + account, walltime, ntasks, nodes, ppn, threads, memory, queue, partition, native """ scheduler = self.app_config.scheduler @@ -138,39 +181,40 @@ def get_resource(self, task_name): account = task_config['ACCOUNT'] - walltime = task_config[f'wtime_{task_name}'] - if self.cdump in ['gfs'] and f'wtime_{task_name}_gfs' in task_config.keys(): - walltime = task_config[f'wtime_{task_name}_gfs'] + walltime = task_config[f'walltime'] + ntasks = task_config[f'ntasks'] + ppn = task_config[f'tasks_per_node'] - cores = task_config[f'npe_{task_name}'] - if self.cdump in ['gfs'] and f'npe_{task_name}_gfs' in task_config.keys(): - cores = task_config[f'npe_{task_name}_gfs'] + nodes = int(np.ceil(float(ntasks) / float(ppn))) - ppn = task_config[f'npe_node_{task_name}'] - if self.cdump in ['gfs'] and f'npe_node_{task_name}_gfs' in task_config.keys(): - ppn = task_config[f'npe_node_{task_name}_gfs'] + threads = task_config[f'threads_per_task'] - nodes = int(np.ceil(float(cores) / float(ppn))) + # Memory is not required + memory = task_config.get(f'memory', None) - threads = task_config[f'nth_{task_name}'] - if self.cdump in ['gfs'] and f'nth_{task_name}_gfs' in task_config.keys(): - threads = task_config[f'nth_{task_name}_gfs'] - - memory = task_config.get(f'memory_{task_name}', None) if scheduler in ['pbspro']: if task_config.get('prepost', False): memory += ':prepost=true' native = None if scheduler in ['pbspro']: - native = '-l debug=true,place=vscatter' + # Set place=vscatter by default and debug=true if DEBUG_POSTSCRIPT="YES" + if self._base['DEBUG_POSTSCRIPT']: + native = '-l debug=true,place=vscatter' + else: + native = '-l place=vscatter' # Set either exclusive or shared - default on WCOSS2 is exclusive when not set if task_config.get('is_exclusive', False): native += ':exclhost' else: native += ':shared' elif scheduler in ['slurm']: +#JKH native = '--export=NONE' native = '&NATIVE_STR;' + if task_config['RESERVATION'] != "": + native += '' if task_name in Tasks.SERVICE_TASKS else ' --reservation=' + task_config['RESERVATION'] + if task_config.get('CLUSTERS', "") not in ["", '@CLUSTERS@']: + native += ' --clusters=' + task_config['CLUSTERS'] queue = task_config['QUEUE_SERVICE'] if task_name in Tasks.SERVICE_TASKS else task_config['QUEUE'] @@ -182,7 +226,7 @@ def get_resource(self, task_name): task_resource = {'account': account, 'walltime': walltime, 'nodes': nodes, - 'cores': cores, + 'ntasks': ntasks, 'ppn': ppn, 'threads': threads, 'memory': memory,