Skip to content

Commit

Permalink
Update GSL changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
kayeekayee committed Sep 3, 2024
1 parent ece5a3e commit a66cc19
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 92 deletions.
11 changes: 1 addition & 10 deletions env/HERA.env
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ if [[ $# -ne 1 ]]; then
echo "Must specify an input argument to set runtime environment variables!"
#JKH
echo "argument can be any one of the following:"
echo "atmanlrun atmensanlrun aeroanlrun landanlrun"
echo "atmanlrun atmensanlrun aeroanlrun snowanl"
echo "anal sfcanl fcst post metp"
echo "eobs eupd ecen efcs epos"
echo "postsnd awips gempak"
Expand Down Expand Up @@ -96,15 +96,6 @@ elif [[ "${step}" = "aeroanlrun" ]]; then
export NTHREADS_AEROANL=${NTHREADSmax}

export APRUN_AEROANL="${APRUN} --cpus-per-task=${NTHREADS_AEROANL}"
#JKH
elif [[ "${step}" = "landanlrun" ]]; then

nth_max=$((npe_node_max / npe_node_landanlrun))

export NTHREADS_LANDANL=${nth_landanlrun:-${nth_max}}
[[ ${NTHREADS_LANDANL} -gt ${nth_max} ]] && export NTHREADS_LANDANL=${nth_max}
export APRUN_LANDANL="${launcher} -n ${npe_landanlrun}"
#JKH

elif [[ "${step}" = "atmanlfv3inc" ]]; then

Expand Down
124 changes: 83 additions & 41 deletions workflow/rocoto/tasks_emc.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#!/usr/bin/env python3

import copy
import numpy as np
from applications.applications import AppConfig
import rocoto.rocoto as rocoto
from wxflow import Template, TemplateConstants, to_timedelta
from typing import List

__all__ = ['Tasks']

Expand All @@ -12,21 +14,21 @@ class Tasks:
SERVICE_TASKS = ['arch', 'earc']
VALID_TASKS = ['aerosol_init', 'stage_ic',
'prep', 'anal', 'sfcanl', 'analcalc', 'analdiag', 'arch', "cleanup",
'prepatmiodaobs', 'atmanlinit', 'atmanlrun', 'atmanlfinal',
'prepatmiodaobs', 'atmanlinit', 'atmanlvar', 'atmanlfv3inc', 'atmanlfinal',
'prepoceanobs',
'ocnanalprep', 'ocnanalbmat', 'ocnanalrun', 'ocnanalchkpt', 'ocnanalpost', 'ocnanalvrfy',
'ocnanalprep', 'marinebmat', 'ocnanalrun', 'ocnanalecen', 'ocnanalchkpt', 'ocnanalpost', 'ocnanalvrfy',
'earc', 'ecen', 'echgres', 'ediag', 'efcs',
'eobs', 'eomg', 'epos', 'esfc', 'eupd',
'atmensanlinit', 'atmensanlrun', 'atmensanlfinal',
'atmensanlinit', 'atmensanlletkf', 'atmensanlfv3inc', 'atmensanlfinal',
'aeroanlinit', 'aeroanlrun', 'aeroanlfinal',
'preplandobs', 'landanl',
'prepsnowobs', 'snowanl',
'fcst',
'atmanlupp', 'atmanlprod', 'atmupp', 'atmprod', 'goesupp',
'ocnpost',
'atmanlupp', 'atmanlprod', 'atmupp', 'goesupp',
'atmos_prod', 'ocean_prod', 'ice_prod',
'verfozn', 'verfrad', 'vminmon',
'metp',
'tracker', 'genesis', 'genesis_fsu',
'postsnd', 'awips_g2', 'awips_20km_1p0deg', 'fbwind',
'postsnd', 'awips_20km_1p0deg', 'fbwind',
'gempak', 'gempakmeta', 'gempakmetancdc', 'gempakncdcupapgif', 'gempakpgrb2spec', 'npoess_pgrb2_0p5deg'
'waveawipsbulls', 'waveawipsgridded', 'wavegempak', 'waveinit',
'wavepostbndpnt', 'wavepostbndpntbll', 'wavepostpnt', 'wavepostsbs', 'waveprep',
Expand All @@ -35,17 +37,26 @@ class Tasks:
'mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst',
'mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen', 'mos_wx_prdgen', 'mos_wx_ext_prdgen']

def __init__(self, app_config: AppConfig, cdump: str) -> None:
def __init__(self, app_config: AppConfig, run: str) -> None:

self.app_config = app_config
self.cdump = cdump
self.app_config = copy.deepcopy(app_config)
self.run = run
# Re-source the configs with RUN specified
print(f"Source configs with RUN={run}")
self._configs = self.app_config.source_configs(run=run, log=False)

# Update the base config for the application
self._configs['base'] = self.app_config.update_base(self._configs['base'])
# Save dict_configs and base in the internal state (never know where it may be needed)
self._configs = self.app_config.configs
self._base = self._configs['base']

self.HOMEgfs = self._base['HOMEgfs']
self.rotdir = self._base['ROTDIR']
self.pslot = self._base['PSLOT']
if self.run == "enkfgfs":
self.nmem = int(self._base['NMEM_ENS_GFS'])
else:
self.nmem = int(self._base['NMEM_ENS'])
self._base['cycle_interval'] = to_timedelta(f'{self._base["assim_freq"]}H')

self.n_tiles = 6 # TODO - this needs to be elsewhere
Expand All @@ -54,13 +65,13 @@ def __init__(self, app_config: AppConfig, cdump: str) -> None:
'HOMEgfs': self.HOMEgfs,
'EXPDIR': self._base.get('EXPDIR'),
'NET': self._base.get('NET'),
'CDUMP': self.cdump,
'RUN': self.cdump,
'RUN': self.run,
'CDATE': '<cyclestr>@Y@m@d@H</cyclestr>',
'PDY': '<cyclestr>@Y@m@d</cyclestr>',
'cyc': '<cyclestr>@H</cyclestr>',
'COMROOT': self._base.get('COMROOT'),
'DATAROOT': self._base.get('DATAROOT')}

self.envars = self._set_envars(envar_dict)

@staticmethod
Expand All @@ -72,12 +83,6 @@ def _set_envars(envar_dict) -> list:

return envars

@staticmethod
def _get_hybgroups(nens: int, nmem_per_group: int, start_index: int = 1):
ngrps = nens / nmem_per_group
groups = ' '.join([f'{x:02d}' for x in range(start_index, int(ngrps) + 1)])
return groups

def _template_to_rocoto_cycstring(self, template: str, subs_dict: dict = {}) -> str:
'''
Takes a string templated with ${ } and converts it into a string suitable
Expand All @@ -87,8 +92,8 @@ def _template_to_rocoto_cycstring(self, template: str, subs_dict: dict = {}) ->
Variables substitued by default:
${ROTDIR} -> '&ROTDIR;'
${RUN} -> self.cdump
${DUMP} -> self.cdump
${RUN} -> self.run
${DUMP} -> self.run
${MEMDIR} -> ''
${YMD} -> '@Y@m@d'
${HH} -> '@H'
Expand All @@ -110,8 +115,8 @@ def _template_to_rocoto_cycstring(self, template: str, subs_dict: dict = {}) ->
# Defaults
rocoto_conversion_dict = {
'ROTDIR': '&ROTDIR;',
'RUN': self.cdump,
'DUMP': self.cdump,
'RUN': self.run,
'DUMP': self.run,
'MEMDIR': '',
'YMD': '@Y@m@d',
'HH': '@H'
Expand All @@ -123,12 +128,49 @@ def _template_to_rocoto_cycstring(self, template: str, subs_dict: dict = {}) ->
TemplateConstants.DOLLAR_CURLY_BRACE,
rocoto_conversion_dict.get)

@staticmethod
def _get_forecast_hours(run, config, component='atmos') -> List[str]:
# Make a local copy of the config to avoid modifying the original
local_config = config.copy()

# Ocean/Ice components do not have a HF output option like the atmosphere
if component in ['ocean', 'ice']:
local_config['FHMAX_HF_GFS'] = 0

if component in ['ocean']:
local_config['FHOUT_HF_GFS'] = config['FHOUT_OCN_GFS']
local_config['FHOUT_GFS'] = config['FHOUT_OCN_GFS']
local_config['FHOUT'] = config['FHOUT_OCN']

if component in ['ice']:
local_config['FHOUT_HF_GFS'] = config['FHOUT_ICE_GFS']
local_config['FHOUT_GFS'] = config['FHOUT_ICE_GFS']
local_config['FHOUT'] = config['FHOUT_ICE']

fhmin = local_config['FHMIN']

# Get a list of all forecast hours
fhrs = []
if run in ['gdas']:
fhmax = local_config['FHMAX']
fhout = local_config['FHOUT']
fhrs = list(range(fhmin, fhmax + fhout, fhout))
elif run in ['gfs', 'gefs']:
fhmax = local_config['FHMAX_GFS']
fhout = local_config['FHOUT_GFS']
fhmax_hf = local_config['FHMAX_HF_GFS']
fhout_hf = local_config['FHOUT_HF_GFS']
fhrs_hf = range(fhmin, fhmax_hf + fhout_hf, fhout_hf)
fhrs = list(fhrs_hf) + list(range(fhrs_hf[-1] + fhout, fhmax + fhout, fhout))

return fhrs

def get_resource(self, task_name):
"""
Given a task name (task_name) and its configuration (task_names),
return a dictionary of resources (task_resource) used by the task.
Task resource dictionary includes:
account, walltime, cores, nodes, ppn, threads, memory, queue, partition, native
account, walltime, ntasks, nodes, ppn, threads, memory, queue, partition, native
"""

scheduler = self.app_config.scheduler
Expand All @@ -137,39 +179,39 @@ def get_resource(self, task_name):

account = task_config['ACCOUNT']

walltime = task_config[f'wtime_{task_name}']
if self.cdump in ['gfs'] and f'wtime_{task_name}_gfs' in task_config.keys():
walltime = task_config[f'wtime_{task_name}_gfs']
walltime = task_config[f'walltime']
ntasks = task_config[f'ntasks']
ppn = task_config[f'tasks_per_node']

cores = task_config[f'npe_{task_name}']
if self.cdump in ['gfs'] and f'npe_{task_name}_gfs' in task_config.keys():
cores = task_config[f'npe_{task_name}_gfs']
nodes = int(np.ceil(float(ntasks) / float(ppn)))

ppn = task_config[f'npe_node_{task_name}']
if self.cdump in ['gfs'] and f'npe_node_{task_name}_gfs' in task_config.keys():
ppn = task_config[f'npe_node_{task_name}_gfs']
threads = task_config[f'threads_per_task']

nodes = int(np.ceil(float(cores) / float(ppn)))
# Memory is not required
memory = task_config.get(f'memory', None)

threads = task_config[f'nth_{task_name}']
if self.cdump in ['gfs'] and f'nth_{task_name}_gfs' in task_config.keys():
threads = task_config[f'nth_{task_name}_gfs']

memory = task_config.get(f'memory_{task_name}', None)
if scheduler in ['pbspro']:
if task_config.get('prepost', False):
memory += ':prepost=true'

native = None
if scheduler in ['pbspro']:
native = '-l debug=true,place=vscatter'
# Set place=vscatter by default and debug=true if DEBUG_POSTSCRIPT="YES"
if self._base['DEBUG_POSTSCRIPT']:
native = '-l debug=true,place=vscatter'
else:
native = '-l place=vscatter'
# Set either exclusive or shared - default on WCOSS2 is exclusive when not set
if task_config.get('is_exclusive', False):
native += ':exclhost'
else:
native += ':shared'
elif scheduler in ['slurm']:
native = '--export=NONE'
if task_config['RESERVATION'] != "":
native += '' if task_name in Tasks.SERVICE_TASKS else ' --reservation=' + task_config['RESERVATION']
if task_config.get('CLUSTERS', "") not in ["", '@CLUSTERS@']:
native += ' --clusters=' + task_config['CLUSTERS']

queue = task_config['QUEUE_SERVICE'] if task_name in Tasks.SERVICE_TASKS else task_config['QUEUE']

Expand All @@ -181,7 +223,7 @@ def get_resource(self, task_name):
task_resource = {'account': account,
'walltime': walltime,
'nodes': nodes,
'cores': cores,
'ntasks': ntasks,
'ppn': ppn,
'threads': threads,
'memory': memory,
Expand Down
Loading

0 comments on commit a66cc19

Please sign in to comment.