Reinstate product groups
Testing with full-sized GEFS found that the sheer number of tasks
overloads rocoto, resulting in `rocotorun` taking over 10 min to
complete or hanging entirely. To reduce the number of tasks, product
groups are reimplemented so that multiple forecast hours are processed
in a single task. However, the implementation differs somewhat from
the previous one.

The jobs where groups are enabled (atmos_products, oceanice_products,
and wavepostsbs) have a new variable, `MAX_TASKS`, that controls how
many groups to use. This setting is currently *per member*. The
forecast hours to be processed are then divided into this many groups
as evenly as possible without crossing forecast segment boundaries.
The walltime for those jobs is then multiplied by the number of
forecast hours in the largest group.
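
As a rough sketch of the grouping idea (names here are hypothetical;
the real helper, `get_grouped_fhr_dict`, also builds the metatask
variable dict and keeps groups from crossing forecast-segment
boundaries, which is omitted below):

    def group_fhrs(fhrs: list[int], max_tasks: int) -> list[list[int]]:
        """Split forecast hours into at most max_tasks near-equal groups."""
        ngroups = min(max_tasks, len(fhrs))
        base, extra = divmod(len(fhrs), ngroups)
        groups, start = [], 0
        for i in range(ngroups):
            size = base + (1 if i < extra else 0)  # first `extra` groups get one extra hour
            groups.append(fhrs[start:start + size])
            start += size
        return groups

    fhrs = list(range(0, 385, 3))                # e.g. 0-384 every 3 h
    groups = group_fhrs(fhrs, max_tasks=25)
    largest_group = max(len(g) for g in groups)  # scales the per-hour walltime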

A number of helper methods are added to Tasks to determine these
groups and make a standard metatask variable dict in a centralized
location. There is also a function to multiply the walltime, though it
may be better to relocate it to wxflow with the other time functions.
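
A minimal sketch of what a walltime multiplier like `Tasks.multiply_HMS`
might do, assuming an "HH:MM:SS" string scaled by an integer factor and
re-normalized:

    def multiply_hms(walltime: str, factor: int) -> str:
        """Multiply an HH:MM:SS walltime string by an integer factor."""
        hours, minutes, seconds = (int(x) for x in walltime.split(":"))
        total = (hours * 3600 + minutes * 60 + seconds) * factor
        return f"{total // 3600:02d}:{total % 3600 // 60:02d}:{total % 60:02d}"

    multiply_hms("00:15:00", 6)  # -> "01:30:00"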

As part of switching from a single value to a list, hours are no longer
passed by rocoto as zero-padded values. The lists are comma-delimited
(without spaces) and split apart in the job stub (`jobs/rocoto/*`), so
each j-job call is still a single forecast hour.
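
Illustratively (variable names are only for the example), each group's
value is an unpadded, comma-delimited string that the job stubs split
back into individual hours:

    group = [6, 9, 12, 15]
    fhr_list = ",".join(str(fhr) for fhr in group)  # "6,9,12,15"
    # In jobs/rocoto/*, the job stub recovers the hours with
    #   IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}"
    # and runs the j-job once per element.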

The offline post (upp) job is not broken into groups, since it really
isn't used outside the analysis anymore. Gempak jobs that run over
multiple forecast hours also aren't broken into groups yet.

Resolves #2999
WalterKolczynski-NOAA committed Jan 6, 2025
1 parent 060aec3 commit f983698
Showing 14 changed files with 331 additions and 86 deletions.
20 changes: 12 additions & 8 deletions jobs/rocoto/atmos_products.sh
@@ -15,13 +15,17 @@ if (( status != 0 )); then exit "${status}"; fi
export job="atmos_products"
export jobid="${job}.$$"

# Negatation needs to be before the base
fhr3_base="10#${FHR3}"
export FORECAST_HOUR=$(( ${fhr3_base/10#-/-10#} ))
# shellcheck disable=SC2153
IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}"

###############################################################
# Execute the JJOB
###############################################################
"${HOMEgfs}/jobs/JGLOBAL_ATMOS_PRODUCTS"
export FORECAST_HOUR
for FORECAST_HOUR in "${fhr_list[@]}"; do
###############################################################
# Execute the JJOB
###############################################################
"${HOMEgfs}/jobs/JGLOBAL_ATMOS_PRODUCTS"
status=$?
[[ ${status} -ne 0 ]] && exit "${status}"
done

exit $?
exit 0
18 changes: 12 additions & 6 deletions jobs/rocoto/oceanice_products.sh
@@ -15,11 +15,17 @@ if (( status != 0 )); then exit "${status}"; fi
export job="oceanice_products"
export jobid="${job}.$$"

export FORECAST_HOUR=$(( 10#${FHR3} ))
# shellcheck disable=SC2153
IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}"

###############################################################
# Execute the JJOB
###############################################################
"${HOMEgfs}/jobs/JGLOBAL_OCEANICE_PRODUCTS"
export FORECAST_HOUR
for FORECAST_HOUR in "${fhr_list[@]}"; do
###############################################################
# Execute the JJOB
###############################################################
"${HOMEgfs}/jobs/JGLOBAL_OCEANICE_PRODUCTS"
status=$?
[[ ${status} -ne 0 ]] && exit "${status}"
done

exit $?
exit 0
19 changes: 13 additions & 6 deletions jobs/rocoto/wavepostsbs.sh
@@ -5,17 +5,24 @@ source "${HOMEgfs}/ush/preamble.sh"
###############################################################
# Source FV3GFS workflow modules
#. ${HOMEgfs}/ush/load_fv3gfs_modules.sh
. ${HOMEgfs}/ush/load_ufswm_modules.sh
source "${HOMEgfs}/ush/load_ufswm_modules.sh"
status=$?
[[ ${status} -ne 0 ]] && exit ${status}
[[ ${status} -ne 0 ]] && exit "${status}"

export job="wavepostsbs"
export jobid="${job}.$$"

###############################################################
# Execute the JJOB
${HOMEgfs}/jobs/JGLOBAL_WAVE_POST_SBS
status=$?
[[ ${status} -ne 0 ]] && exit ${status}
# shellcheck disable=SC2153
IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}"

export FHR3
for FORECAST_HOUR in "${fhr_list[@]}"; do
FHR3=$(printf '%03d' "${FORECAST_HOUR}")
# Execute the JJOB
"${HOMEgfs}/jobs/JGLOBAL_WAVE_POST_SBS"
status=$?
[[ ${status} -ne 0 ]] && exit "${status}"
done

exit 0
4 changes: 2 additions & 2 deletions parm/config/gefs/config.atmos_products
@@ -8,8 +8,8 @@ echo "BEGIN: config.atmos_products"
# Get task specific resources
. "${EXPDIR}/config.resources" atmos_products

# No. of forecast hours to process in a single job
export NFHRS_PER_GROUP=3
# Maximum number of rocoto tasks per member
export MAX_TASKS=25

# Scripts used by this job
export INTERP_ATMOS_MASTERSH="${USHgfs}/interp_atmos_master.sh"
4 changes: 2 additions & 2 deletions parm/config/gefs/config.oceanice_products
@@ -9,7 +9,7 @@ source "${EXPDIR}/config.resources" oceanice_products

export OCEANICEPRODUCTS_CONFIG="${PARMgfs}/post/oceanice_products_gefs.yaml"

# No. of forecast hours to process in a single job
export NFHRS_PER_GROUP=3
# Maximum number of rocoto tasks per member
export MAX_TASKS=25

echo "END: config.oceanice_products"
5 changes: 4 additions & 1 deletion parm/config/gefs/config.resources
@@ -234,6 +234,7 @@ case ${step} in
;;

"atmos_products")
# Walltime is per forecast hour; will be multiplied by group size
export walltime="00:15:00"
export ntasks=24
export threads_per_task=1
@@ -250,6 +251,7 @@
;;

"oceanice_products")
# Walltime is per forecast hour; will be multiplied by group size
export walltime="00:15:00"
export ntasks=1
export tasks_per_node=1
@@ -258,7 +260,8 @@
;;

"wavepostsbs")
export walltime="03:00:00"
# Walltime is per forecast hour; will be multiplied by group size
export walltime="00:15:00"
export ntasks=1
export threads_per_task=1
export tasks_per_node=$(( max_tasks_per_node / threads_per_task ))
3 changes: 3 additions & 0 deletions parm/config/gefs/config.wavepostsbs
@@ -8,6 +8,9 @@ echo "BEGIN: config.wavepostsbs"
# Get task specific resources
source "${EXPDIR}/config.resources" wavepostsbs

# Maximum number of rocoto tasks per member
export MAX_TASKS=25

# Subgrid info for grib2 encoding
export WAV_SUBGRBSRC=""
export WAV_SUBGRB=""
4 changes: 2 additions & 2 deletions parm/config/gfs/config.atmos_products
@@ -8,8 +8,8 @@ echo "BEGIN: config.atmos_products"
# Get task specific resources
. "${EXPDIR}/config.resources" atmos_products

# No. of forecast hours to process in a single job
export NFHRS_PER_GROUP=3
## Maximum number of rocoto tasks per member
export MAX_TASKS=25

# Scripts used by this job
export INTERP_ATMOS_MASTERSH="${USHgfs}/interp_atmos_master.sh"
3 changes: 3 additions & 0 deletions parm/config/gfs/config.oceanice_products
@@ -7,6 +7,9 @@ echo "BEGIN: config.oceanice_products"
# Get task specific resources
source "${EXPDIR}/config.resources" oceanice_products

# Maximum number of rocoto tasks per member
export MAX_TASKS=25

export OCEANICEPRODUCTS_CONFIG="${PARMgfs}/post/oceanice_products.yaml"

# No. of forecast hours to process in a single job
7 changes: 5 additions & 2 deletions parm/config/gfs/config.resources
@@ -191,8 +191,9 @@ case ${step} in
;;

"wavepostsbs")
walltime_gdas="00:20:00"
walltime_gfs="03:00:00"
# Walltime is per forecast hour; will be multiplied by group size
walltime_gdas="00:15:00"
walltime_gfs="00:15:00"
ntasks=8
threads_per_task=1
tasks_per_node=$(( max_tasks_per_node / threads_per_task ))
@@ -911,6 +912,7 @@
;;

"oceanice_products")
# Walltime is per forecast hour; will be multiplied by group size
walltime="00:15:00"
ntasks=1
tasks_per_node=1
Expand Down Expand Up @@ -944,6 +946,7 @@ case ${step} in
;;

"atmos_products")
# Walltime is per forecast hour; will be multiplied by group size
walltime="00:15:00"
ntasks=24
threads_per_task=1
3 changes: 3 additions & 0 deletions parm/config/gfs/config.wavepostsbs
@@ -8,6 +8,9 @@ echo "BEGIN: config.wavepostsbs"
# Get task specific resources
source "${EXPDIR}/config.resources" wavepostsbs

# Maximum number of rocoto tasks per member
export MAX_TASKS=25

# Subgrid info for grib2 encoding
export WAV_SUBGRBSRC=""
export WAV_SUBGRB=""
72 changes: 40 additions & 32 deletions workflow/rocoto/gefs_tasks.py
@@ -190,39 +190,57 @@ def _atmosoceaniceprod(self, component: str):
fhout_ice_gfs = self._configs['base']['FHOUT_ICE_GFS']
products_dict = {'atmos': {'config': 'atmos_products',
'history_path_tmpl': 'COM_ATMOS_MASTER_TMPL',
'history_file_tmpl': f'{self.run}[email protected]#fhr#'},
'history_file_tmpl': f'{self.run}[email protected]#fhr3_last#'},
'ocean': {'config': 'oceanice_products',
'history_path_tmpl': 'COM_OCEAN_HISTORY_TMPL',
'history_file_tmpl': f'{self.run}.ocean.t@Hz.{fhout_ocn_gfs}hr_avg.f#fhr_next#.nc'},
'history_file_tmpl': f'{self.run}.ocean.t@Hz.{fhout_ocn_gfs}hr_avg.f#fhr3_next#.nc'},
'ice': {'config': 'oceanice_products',
'history_path_tmpl': 'COM_ICE_HISTORY_TMPL',
'history_file_tmpl': f'{self.run}.ice.t@Hz.{fhout_ice_gfs}hr_avg.f#fhr#.nc'}}
'history_file_tmpl': f'{self.run}.ice.t@Hz.{fhout_ice_gfs}hr_avg.f#fhr3_last#.nc'}}

component_dict = products_dict[component]
config = component_dict['config']
history_path_tmpl = component_dict['history_path_tmpl']
history_file_tmpl = component_dict['history_file_tmpl']

max_tasks = self._configs[config]['MAX_TASKS']
resources = self.get_resource(config)

fhrs = self._get_forecast_hours('gefs', self._configs[config], component)

# when replaying, atmos component does not have fhr 0, therefore remove 0 from fhrs
is_replay = self._configs[config]['REPLAY_ICS']
if is_replay and component in ['atmos'] and 0 in fhrs:
fhrs.remove(0)

# ocean/ice components do not have fhr 0 as they are averaged output
if component in ['ocean', 'ice'] and 0 in fhrs:
fhrs.remove(0)

fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks)

# Adjust walltime based on the largest group
largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')])
resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group)

history_path = self._template_to_rocoto_cycstring(self._base[history_path_tmpl], {'MEMDIR': 'mem#member#'})
deps = []
data = f'{history_path}/{history_file_tmpl}'
dep_dict = {'type': 'data', 'data': data, 'age': 120}
deps.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'metatask', 'name': 'gefs_fcst_mem#member#'}
dep_dict = {'type': 'task', 'name': 'gefs_fcst_mem#member#_#seg_dep#'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps, dep_condition='or')

postenvars = self.envars.copy()
postenvar_dict = {'ENSMEM': '#member#',
'MEMDIR': 'mem#member#',
'FHR3': '#fhr#',
'FHR_LIST': '#fhr_list#',
'COMPONENT': component}
for key, value in postenvar_dict.items():
postenvars.append(rocoto.create_envar(name=key, value=str(value)))

task_name = f'gefs_{component}_prod_mem#member#_f#fhr#'
task_name = f'gefs_{component}_prod_mem#member#_#fhr_label#'
task_dict = {'task_name': task_name,
'resources': resources,
'dependency': dependencies,
@@ -233,22 +251,6 @@ def _atmosoceaniceprod(self, component: str):
'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
'maxtries': '&MAXTRIES;'}

fhrs = self._get_forecast_hours('gefs', self._configs[config], component)

# when replaying, atmos component does not have fhr 0, therefore remove 0 from fhrs
is_replay = self._configs[config]['REPLAY_ICS']
if is_replay and component in ['atmos'] and 0 in fhrs:
fhrs.remove(0)

# ocean/ice components do not have fhr 0 as they are averaged output
if component in ['ocean', 'ice'] and 0 in fhrs:
fhrs.remove(0)

fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])}
if component in ['ocean']:
fhrs_next = fhrs[1:] + [fhrs[-1] + (fhrs[-1] - fhrs[-2])]
fhr_var_dict['fhr_next'] = ' '.join([f"{fhr:03d}" for fhr in fhrs_next])

fhr_metatask_dict = {'task_name': f'gefs_{component}_prod_#member#',
'task_dict': task_dict,
'var_dict': fhr_var_dict}
@@ -308,22 +310,35 @@ def atmos_ensstat(self):
return task

def wavepostsbs(self):

deps = []
dep_dict = {'type': 'metatask', 'name': f'gefs_fcst_mem#member#'}
dep_dict = {'type': 'task', 'name': f'gefs_fcst_mem#member#_#seg_dep#'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)

fhrs = self._get_forecast_hours('gefs', self._configs['wavepostsbs'], 'wave')
is_replay = self._configs['wavepostsbs']['REPLAY_ICS']
if is_replay:
fhrs = [fhr for fhr in fhrs if fhr not in [0, 1, 2]]

max_tasks = self._configs['wavepostsbs']['MAX_TASKS']
fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks)

wave_post_envars = self.envars.copy()
postenvar_dict = {'ENSMEM': '#member#',
'MEMDIR': 'mem#member#',
'FHR3': '#fhr#',
'FHR_LIST': '#fhr_list#',
}
for key, value in postenvar_dict.items():
wave_post_envars.append(rocoto.create_envar(name=key, value=str(value)))

resources = self.get_resource('wavepostsbs')

task_name = f'gefs_wave_post_grid_mem#member#_f#fhr#'
# Adjust walltime based on the largest group
largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')])
resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group)

task_name = f'gefs_wave_post_grid_mem#member#_#fhr_label#'
task_dict = {'task_name': task_name,
'resources': resources,
'dependency': dependencies,
@@ -335,13 +350,6 @@
'maxtries': '&MAXTRIES;'
}

fhrs = self._get_forecast_hours('gefs', self._configs['wavepostsbs'], 'wave')
is_replay = self._configs['wavepostsbs']['REPLAY_ICS']
if is_replay:
fhrs = [fhr for fhr in fhrs if fhr not in [0, 1, 2]]

fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])}

fhr_metatask_dict = {'task_name': f'gefs_wave_post_grid_#member#',
'task_dict': task_dict,
'var_dict': fhr_var_dict}