Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sprint/refactor resubmit #1139

Draft
wants to merge 107 commits into
base: release
Choose a base branch
from
Draft
Changes from 1 commit
Commits
Show all changes
107 commits
Select commit Hold shift + click to select a range
80d3b6a
Added some temporary comments in workflow.py.
nwieters Sep 4, 2023
b63e9a9
Started to add docstrings to workflow.py
nwieters Sep 6, 2023
f9c0659
Added a docstring to esm_plugin_mamager
nwieters Sep 7, 2023
0c4a962
Some docstring changes in workflow.py
nwieters Sep 7, 2023
f0bd385
Added comments to esm_plugin_manager
nwieters Oct 6, 2023
8c4b753
Start to refactor workflow.
nwieters Oct 6, 2023
e329e14
Started to refactor workflow.py
nwieters Oct 6, 2023
d7f4565
Placed the default definition of workflow phases in esm_software.../d…
nwieters Oct 16, 2023
a2fc1a4
Update src/esm_plugin_manager/esm_plugin_manager.py
nwieters Oct 17, 2023
fef151d
Update src/esm_plugin_manager/esm_plugin_manager.py
nwieters Oct 17, 2023
c733a00
Update src/esm_runscripts/workflow.py
nwieters Oct 17, 2023
506c0a7
Correcte syntax of docstrings.
nwieters Oct 17, 2023
03b6e24
Renamed default_workflow_phases entry in defaults.yaml
nwieters Oct 17, 2023
b3b5fb1
Renamed workflow method, added property decorator and added test.
nwieters Oct 18, 2023
6260278
Added a temporary workflow to awicm3.
nwieters Oct 18, 2023
de0e598
Added method to return a list of an attribute for all phases.
nwieters Oct 20, 2023
11fa96b
Convert functions into methods, moved class variables to be instance …
nwieters Oct 23, 2023
b8d1db0
Further developments until order_cluster.
nwieters Oct 27, 2023
2cd0da6
Further changes for workflow manager.
nwieters Nov 3, 2023
98f4a7f
Merge branch 'release' into sprint/workflow_manager
nwieters Nov 9, 2023
273175f
Added prepend_newrun, skip_cluster, fixed next_submit entries.
nwieters Nov 10, 2023
3c72864
Reactivated function display_nicely.
nwieters Nov 13, 2023
547edb0
Added some flake8 style optimization.
nwieters Nov 14, 2023
465d8d3
Adapted the output for inspect (-i) workflow.
nwieters Nov 16, 2023
6aee395
Fix multiple phases in one cluster, fix batch_or_shell to be set by e…
nwieters Nov 16, 2023
138b3c2
Added some further comments and added cluster info in display_workflo…
nwieters Nov 17, 2023
fcf5a41
Add new workflow attribute in defaults.yaml.
nwieters Nov 17, 2023
5a3bfb8
Reactivated function complete_clusters.
nwieters Nov 17, 2023
e6bada5
Some changes for processing phases in awicm3.
nwieters Nov 17, 2023
ccafa8b
Update src/esm_runscripts/inspect.py
nwieters Nov 20, 2023
e107d99
Merged default and user phase class, made phase a subclass of dict.
nwieters Nov 20, 2023
ba204e6
Fixed some of the workflow tests.
nwieters Nov 20, 2023
6174fa3
Added review suggestions.
nwieters Nov 20, 2023
93b49b5
Changed initialization of workflow and phases.
nwieters Nov 22, 2023
afab16f
Changed initial config of default workflow phases.
nwieters Nov 22, 2023
ad0e620
Bugfix in error message.
nwieters Nov 22, 2023
91f1af6
Make next_run_triggered_by a keyword of phase (trigger_next_run) not …
nwieters Nov 22, 2023
aed2376
Bugfix if no phase trigger_next_run.
nwieters Nov 22, 2023
40c9190
Resolved function complete_cluster into other functions, fix next_sub…
nwieters Nov 23, 2023
1a8d121
Removed obsolete functions.
nwieters Nov 23, 2023
916a345
(Re)moved redundant code, renamed function for collecting phases.
nwieters Nov 24, 2023
c264f1a
Removed the possibility to set workflow keywords by user.
nwieters Nov 24, 2023
cf9d0e3
Worked on workflow tests, and other minor changes to workflow.
nwieters Nov 27, 2023
f86202d
Fixed a bug in order_phases_and_clusters
nwieters Nov 28, 2023
59590ed
Added workflow tests for test examples (in documentation).
nwieters Nov 28, 2023
ff7145b
Fixed test example_2, add preprocessing phase with adding newrun.
nwieters Nov 29, 2023
c784380
Added test for workflow example 5.
nwieters Nov 29, 2023
6aa7684
Added function to cluster phases after collecting them.
nwieters Nov 30, 2023
58e3729
Changes to pass all example tests.
nwieters Dec 1, 2023
ecf6bbb
Code optimizations
nwieters Dec 1, 2023
97d5d28
Removed unused function.
nwieters Dec 4, 2023
04d8ff0
Finished workflow tests.
nwieters Dec 4, 2023
16187d9
Changes in default workflow definition.
nwieters Dec 4, 2023
f26a115
Revive init_workflow funtion.
nwieters Dec 4, 2023
08d6976
Added my email to setup.py
nwieters Dec 4, 2023
7e1dd6b
Added comments and docstrings.
nwieters Dec 4, 2023
f57214d
Merge branch 'release' into sprint/workflow_manager
nwieters Dec 4, 2023
d1f5a21
Some more code improvements.
nwieters Dec 7, 2023
21fec58
Small changes in some comments.
nwieters Dec 8, 2023
67d08f8
Remove calc of nproc in workflow.py.
nwieters Dec 12, 2023
582caa1
Remove keyword run_on_queue (not used) and give error by missing run_…
nwieters Dec 12, 2023
698a523
Adapted log-file name and append mode.
nwieters Dec 13, 2023
bc6c082
Removed run_on_queue from default phases.
nwieters Dec 13, 2023
05a03ab
Removed run_on_queue.
nwieters Dec 13, 2023
8f75e6f
Added START and END statement to logfile of recipe steps.
nwieters Dec 13, 2023
4bfff6a
Added comments.
nwieters Dec 13, 2023
18fd44a
Added comments.
nwieters Dec 14, 2023
cdd7c20
Change comment
nwieters Dec 14, 2023
638a1ea
Adde jobid to logfiles.
nwieters Dec 14, 2023
ff0724c
Comment out unused functions.
nwieters Dec 15, 2023
e285372
Added -s argument to read current_date; some code syntax optimisations.
nwieters Jan 15, 2024
3bbaaae
Test, to remove maybe_resumbit.
nwieters Jan 15, 2024
09ac11b
Test to remove jobtype setting in workflow.py
nwieters Jan 23, 2024
ad15675
comment and clean esm_software/esm_runscripts config files
mandresm Feb 1, 2024
cb6e666
paetially refactoring of SimulationSetup.__init__ and some of the fun…
mandresm Feb 1, 2024
b509de7
restructure SimulationSetup.__init__: 1) the functions there do not c…
mandresm Feb 1, 2024
15fb97b
Merge branch 'release' into sprint/rearrange_prepexp
mandresm Feb 1, 2024
6f1e161
Merge branch 'release' into sprint/rearrange_prepexp
mandresm Feb 1, 2024
dfff3eb
fix a bug in init_interactive_info that was afecting esm_master
mandresm Feb 1, 2024
49c43e3
Added optional argument to _write_finalized_config.
nwieters Feb 2, 2024
65ad0e1
Merge branch 'sprint/rearrange_prepexp' of https://github.com/esm-too…
nwieters Feb 2, 2024
3456dd6
Add docstring
nwieters Feb 2, 2024
acfe195
add docstrings to all functions in config_initialization.py and to Si…
mandresm Feb 2, 2024
c96a53a
Merge branch 'sprint/rearrange_prepexp' of https://github.com/esm-too…
mandresm Feb 2, 2024
ddf643a
black formating of config_initialization.py and remove strings concat…
mandresm Feb 2, 2024
4257e41
Put the esm_runscripts call via subprocess in its own prepexp revipe.
nwieters Feb 2, 2024
651219c
Merge branch 'sprint/rearrange_prepexp' of https://github.com/esm-too…
nwieters Feb 2, 2024
6f6633a
Small changes.
nwieters Feb 2, 2024
bad0f35
Added docstrings and refactoring _copy_preliminary_files_from_experim…
nwieters Feb 12, 2024
b799dba
Rename step in prepexp recipe, extract function that runs esm_runscri…
nwieters Feb 13, 2024
ec2f8a7
Bugfix
nwieters Feb 14, 2024
5ed14cb
Small changes.
nwieters Feb 14, 2024
7b6ff5b
Merge branch 'sprint/rearrange_prepexp' into sprint/refactor_resubmit
nwieters Feb 15, 2024
50d25ac
isort src/esm_runscripts/prepexp.py
mandresm Feb 15, 2024
532792c
Add comment in prepexp recipe.
nwieters Feb 19, 2024
7dced13
Added review suggestions.
nwieters Feb 19, 2024
9910105
Changed condition for runscript update.
nwieters Feb 19, 2024
b86b071
Changed string concatenations to f-string format.
nwieters Feb 19, 2024
3dc080b
Applied flake8 recommendations.
nwieters Feb 19, 2024
ab82f95
Merge branch 'sprint/rearrange_prepexp' into sprint/refactor_resubmit
nwieters Feb 20, 2024
8208f29
Add flake8 recommondations.
nwieters Feb 20, 2024
e3f8e2b
Added workflow test.
nwieters Feb 20, 2024
6e3c43a
Reactivate call of maybe_resubmit.
nwieters Feb 22, 2024
8386d6f
Implement new function to write run file.
nwieters Feb 22, 2024
1742320
Start to refactor maybe_resubmit function.
nwieters Feb 22, 2024
0705bcd
Add workflow methon to set the run command of each phase.
nwieters Feb 22, 2024
458f0a0
Add default cluster.
nwieters Feb 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Implement new function to write run file.
nwieters committed Feb 22, 2024
commit 8386d6f2137e19173e545948d593769fe5ef4731
167 changes: 148 additions & 19 deletions src/esm_runscripts/batch_system.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import copy
import os
import textwrap
import sys
import pdb
import stat
import copy
import sys
import textwrap

import esm_environment

from esm_parser import find_variable, user_error, user_note
from . import helpers
from . import dataprocess
from . import prepare
from .slurm import Slurm

from . import dataprocess, helpers, prepare
from .pbs import Pbs
from .slurm import Slurm

known_batch_systems = ["slurm", "pbs"]
reserved_jobtypes = ["prepcompute", "compute", "prepare", "tidy", "inspect"]
@@ -263,7 +262,7 @@ def calculate_requirements(config, cluster=None):

if (
not cluster
or not cluster in config["general"]["workflow"]["subjob_clusters"]
or cluster not in config["general"]["workflow"]["subjob_clusters"]
):
print(f"Unknown or unset cluster: {cluster}.")
sys.exit(-1)
@@ -324,7 +323,7 @@ def get_extra(config):
)
elif isinstance(pre_run_commands, str):
extras.append(pre_run_commands)
elif pre_run_commands == None:
elif pre_run_commands is None:
continue
else:
user_error(
@@ -371,6 +370,33 @@ def append_done_statement(config, subjob):

@staticmethod
def get_run_commands(config, subjob, batch_or_shell): # here or in compute.py?
"""
Creates the command of the specific phase to be put in the *.run file.

This function is covering the following phase types:
- SimulationSetup: phases that are run as 'esm_runscripts' command
- batch: phases that are run via 'srun' command
- shell: phases that are run as shell scripts. The command is generated by
a function in the 'dataprocess' module.

Special case: phase 'compute':
- This phase is of type 'batch'

Todo: How about other phases of type batch? in dataprocess???

Parameters
----------
config: dict
subjob: str
Name of phase
batch_or_shell: str
Type of phase (SimulationSetup, batch, shell)

Returns
-------
commands: list
List of command and arguments of a phase depending of its type.
"""

commands = []
if subjob.startswith("compute"):
@@ -385,7 +411,7 @@ def get_run_commands(config, subjob, batch_or_shell): # here or in compute.py?
)
if config["general"].get("multi_srun"):
return self.bs.get_run_commands_multisrun(config, commands)
# for shell scrips
# for shell scripts
else:
for model in config:
if model == "computer":
@@ -397,9 +423,48 @@ def get_run_commands(config, subjob, batch_or_shell): # here or in compute.py?
+ f" 2>&1{config['computer'].get('write_execution_log', '')} &"
)
else:
subjob_tasks = dataprocess.subjob_tasks(config, subjob, batch_or_shell)
for task in subjob_tasks:
commands.append(task)
if batch_or_shell == "SimulationSetup":
# for phase type 'SimulationSetup' (e.g. prepcompute, tidy)
commands = []
commands.append("esm_runscripts")
# add runscript with absolute path
runscript = config["general"]["runscript_abspath"]
commands.append(runscript)
# add experiment id
commands.append(f"-e {config['general']['expid']}")
# add task
commands.append(f"-t {subjob}")
# add date
commands.append("-s " + config['general']['current_date'].format(
form=9, givenph=False, givenpm=False, givenps=False
))
# add
commands.append(f"-r {str(config['general']['run_number'])}")
# add verbose and no message_of_the day argument
commands.append("-v --no-motd")
# add last-jobtype argument
commands.append(f"--last-jobtype {subjob}")
# add --open-ran or use_venv argument
if "--open-run" in config["general"]["original_command"] or not config[
"general"
].get("use_venv"):
commands.append(" --open-run")
elif "--contained-run" in config["general"][
"original_command"
] or config["general"].get("use_venv"):
commands.append("--contained-run")
else:
print("ERROR -- Not sure if you were in a contained or open run!")
print(
"ERROR -- See write_simple_runscript for the code causing this."
)
sys.exit(1)
else:
# for all other phase types (batch, shell) except phase 'compute'
subjob_tasks = dataprocess.subjob_tasks(config, subjob, batch_or_shell)
# Why was this necessary? And not set commands directly?
for task in subjob_tasks:
commands.append(task)

return commands

@@ -430,8 +495,73 @@ def get_submit_command(config, batch_or_shell, runfilename):
return commands

@staticmethod
def write_simple_runscript(config, cluster, batch_or_shell="batch"):
def write_run_batch_script(config, cluster, batch_or_shell="batch"):

workflow = config["general"]["workflow"]["object"]
phases = workflow.phases

self = config["general"]["batch"]
runfilename = batch_system.get_run_filename(config, cluster)
if config["general"]["verbose"]:
print("jobtype: ", config["general"]["jobtype"])
print("writing run file for:", cluster)

with open(runfilename, "w") as runfile:
config = batch_system.calculate_requirements(config, "compute")
# TODO: remove it once it's not needed anymore (substituted by packjob)
if (
cluster in reserved_jobtypes
and config["computer"].get("taskset", False)
):
config = config["general"]["batch"].write_het_par_wrappers(config)
# Prepare launcher
config = config["general"]["batch"].prepare_launcher(config, "compute")
# Initiate the header
header = batch_system.get_batch_header(config, "compute")
for line in header:
runfile.write(line + "\n")
runfile.write("\n")
# environment for each phase of a cluster
environment = batch_system.get_environment(config, "compute")
batch_system.write_env(config, environment, runfilename)
for line in environment:
runfile.write(line + "\n")

# extra entries for each phase
extra = batch_system.get_extra(config)
for line in extra:
runfile.write(line + "\n")

for phase in ["compute", "tidy", "prepcompute"]:
# Add actual commands
commands = batch_system.get_run_commands(
config, phase, batch_or_shell
)
# commands = clusterconf.get("data_task_list", [])
runfile.write("\n")
runfile.write(self.append_start_statement(config, phase) + "\n")
runfile.write("\n")
runfile.write("cd " + config["general"]["thisrun_work_dir"] + "\n")

# if cluster in reserved_jobtypes:
config["general"]["batch"].add_pre_launcher_lines(
config, cluster, runfile
)

phase = workflow.get_workflow_phase_by_name(phase)
command = phase["run_command"]
runfile.write(f"{command} --run-from-batch-script\n")
runfile.write(workflow.append_done_statement(config, phase) + "\n")

runfile.write("\n")
runfile.write("wait\n")

breakpoint()
return config


@staticmethod
def write_simple_runscript(config, cluster, batch_or_shell="batch"):
# if no cluster is specified, work on the one we are in
# if not cluster:
# cluster = config["general"]["jobtype"]
@@ -833,7 +963,7 @@ def calc_launcher_flags(config, model, cluster):
cpus_per_proc = config[model].get("cpus_per_proc", omp_num_threads)
# Check for CPUs and OpenMP threads
if omp_num_threads > cpus_per_proc:
esm_parser.user_error(
user_error(
"OpenMP configuration",
(
"The number of OpenMP threads cannot be larger than the number"
@@ -845,7 +975,7 @@ def calc_launcher_flags(config, model, cluster):
elif "nproca" in config[model] and "nprocb" in config[model]:
# ``nproca``/``nprocb`` not compatible with ``omp_num_threads``
if omp_num_threads > 1:
esm_parser.user_note(
user_note(
"nproc",
"``nproca``/``nprocb`` not compatible with ``omp_num_threads``",
)
@@ -854,7 +984,7 @@ def calc_launcher_flags(config, model, cluster):
omp_num_threads = 1
else:

# kh 22.06.22 defensive (user_error/user_note could also be added here)
# kh 22.06.22 defensive (user_error/user_note could also be added here)
nproc = 0
cpus_per_proc = 0
# omp_num_threads = 0
@@ -885,7 +1015,6 @@ def calc_launcher_flags(config, model, cluster):
return launcher_flags



def submits_another_job(config, cluster):
clusterconf = config["general"]["workflow"]["subjob_clusters"][cluster]
if clusterconf.get("next_submit", []) == []: