Skip to content

Commit

Permalink
fix: components which do not specify an environment use the "environm…
Browse files Browse the repository at this point in the history
…ent" environment (#268)

Resolves https://github.ibm.com/st4sd/st4sd-runtime-core/issues/254

Signed-off-by: Vassilis Vassiladis <[email protected]>
  • Loading branch information
VassilisVassiliadis authored and GitHub Enterprise committed Aug 21, 2023
1 parent 7b73d38 commit 9ce7301
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 113 deletions.
206 changes: 107 additions & 99 deletions python/experiment/model/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -1139,18 +1139,39 @@ def variablesForNode(self, nodeName):

return self._concrete.get_component_variable_references(comp_id)

def environmentForNode(self, nodeName, expand=True, include_default=True, is_primitive=None):
# type: (str, bool, bool, Optional[bool]) -> Optional[Dict[str, str]]
"""Build environment for node.
A) If the environment is None (i.e. no environment is selected) then the environment contains the
active shell environment.
B) If the name is "none" then the environment contains {}
C) Otherwise we fetch the appropriate environment (which can be the default one).
def environmentForNode(
self,
nodeName: str,
expand : bool = True,
include_default : bool = True,
is_primitive: Optional[bool] = None,
remove_defaults_key: bool = True,
) -> Optional[Dict[str, str]]:
"""Returns the environment for a node
This method finds the name of the environment that this component uses and then invokes
environmentWithName() to build it. Next it uses string-interpolation to replace references
to %(component-variables)s with their values.
Finally, if the component is an "interpreter" then this method ensures that the interpreter has the
following env-vars (uses the active shell of the runtime as reference to resolve the environment variables):
- PATH
- PYTHONPATH
- PYTHONHOME
- LD_LIBRARY_PATH
In all cases we layer the constructed environment on top of the system environment variables generated by Flow.
Args:
nodeName: the absolute reference of the node (e.g. stage0.helloWorld)
expand: If true, expand the values of the environment variables using other variables
in the environment right before returning the dictionary and then use the env-vars of the active
shell environment to potentially expand more variables. References to env-vars which are neither in the
environment, nor the active shell environment are left as is.
include_default: use default component variables to string-interpolate the contents of the environment
variables
is_primitive: whether the graph is primitive (i.e. no replicas) or not.
remove_defaults_key: If True, remove the DEFAULTS special key from the environment after processing it
Returns: A dictionary containing the environment variables
"""
# VV: @tag:FlowIR:Component
# @tag:FlowIR:Environment
Expand All @@ -1173,7 +1194,11 @@ def environmentForNode(self, nodeName, expand=True, include_default=True, is_pri
)

# VV: Fetch the environment and then fill it in using global default and platform variables
env = self.environmentWithName(environment_name, expand=expand)
env = self.environmentWithName(
environment_name,
expand=expand,
remove_defaults_key=remove_defaults_key
)

global_variables = self._concrete.get_default_global_variables()
platform_vars = self._concrete.get_platform_global_variables()
Expand Down Expand Up @@ -1201,39 +1226,56 @@ def environmentForNode(self, nodeName, expand=True, include_default=True, is_pri

return env

def environmentWithName(self, environment_name, expand=True, strict_checks=True):
# type: (str, bool, bool) -> Dict[str, str]
def environmentWithName(
self,
environment_name: Optional[str],
expand: bool = True,
strict_checks: bool = True,
remove_defaults_key: bool = True,
) -> Dict[str, str]:
"""Build environment with a specific name.
A) If the environment is None (i.e. no environment is selected) then the environment contains the
active shell environment.
Notes:
A) If the environment is empty (None or '') (i.e. no environment is selected) then the environment contains
the default environment. The default environment is the environment called "environment". If that is unset
then the default environment defaults to the environment variables in the active shell of the runtime.
B) If the name is "none" then the environment contains {}
B) If the name is "none" then the environment contains {}
C) Otherwise we fetch the appropriate environment (which can be the default one).
In all cases we layer the environment on top of the system environment variables generated by the runtime.
In all cases we layer the constructed environment on top of the system environment variables generated by Flow.
If an environment defines a `DEFAULTS` key then that key is expected to have the format `VAR1:VAR2:VAR3...`.
Other options in the environment could reference the aforementioned vars using the $VAR and ${VAR} notation.
These references will be expanded to the values of the respective environment variables in the active shell
of the runtime.
If an environment defines a `DEFAULTS` key then that key is expected to have the format `VAR1:VAR2:VAR3...`.
Other options in the environment could reference the aforementioned vars using the $VAR and ${VAR} notation
and these options will be resolved using their matching keys in the default environment.
Any $VAR and ${VAR} references not matched by `DEFAULTS` keys will be resolved using the active shell
(workflow launch environment).
Any $VAR and ${VAR} references not matched by `DEFAULTS` keys will be resolved using the active shell
(workflow launch environment).
If a variable is defined in `DEFAULTS` but there is no value for it in the default environment then treat it
as if it was never in the `DEFAULTS` option in the first place i.e. just leave it as is.
If a variable is defined in `DEFAULTS` but there is no value for it in the default environment then treat it
as if it was never in the `DEFAULTS` option in the first place.
Args:
environment_name: The environment_name, special values are None, "none", and "environment".
The method converts the environment name to lowercase letters. Read the notes for more information.
expand: If True, expand the values of the environment variables using other variables
in the environment right before returning the dictionary and then use the env-vars of the active
shell environment to potentially expand more variables. References to env-vars which are neither in the
environment, nor the active shell environment are left as is.
strict_checks: when True, raises an error if the selected platform does not contain the environment
remove_defaults_key: If True, remove the DEFAULTS special key from the environment after processing it
"""
default_env = self.defaultEnvironment()
environment = (self._system_vars or {}).copy()

environment_name = environment_name or ''
# VV: The default environment is called "environment"
if not environment_name:
environment_name = 'environment'
environment_name = environment_name.lower()

# VV: A "none" environment is just the system variables
if environment_name == '':
environment.update(os.environ.copy())
elif environment_name == 'environment':
if environment_name in ['', 'environment']:
# VV: This is the default environment
environment.update(default_env)
elif environment_name == 'none':
Expand Down Expand Up @@ -1270,69 +1312,32 @@ def pretty_json(entry):

environment.update(flowir_env_vars)

if environment_name not in ['', None, 'environment']:
# VV: For named environments:
# Resolve variables using the default environment if
# a) their name is in the DEFAULTS key, and
# b) env_value contains a reference to them, and
# c) there is an entry for them in the default environment

# VV: These keys are manually specified; as such they are not guaranteed to exist in the actual
# Default environment
keys_from_default_env = environment.get('DEFAULTS', None)

if keys_from_default_env:
keys_from_default_env = keys_from_default_env.split(':')
keys_from_default_env = [name
if name.startswith('$') is False
else name[1:] for name in keys_from_default_env]
keys_from_default_env = [name for name in keys_from_default_env if name is not None and len(name)]

del environment['DEFAULTS']
pattern_short = re.compile(r'\$\w+')
pattern_long = re.compile(r'\${\w+}')

for key in keys_from_default_env:
if key not in default_env:
default_env[key] = os.environ.get(key, '')

if key in environment:
# VV: The environment already defines the same environment variable. The developer may be using
# `KEY: $KEY:some other value` to use the value of $KEY in the default context as a building
# part of the eventual value of $KEY (e.g. to prepend/append a directory to $PATH etc).
# Here, we just expand $KEY using the `default` environment context. Then, we record this new
# value in `default_env` so that if other env-vars in this environment rely on $KEY they
# actually get what we just computed.
environment[key] = experiment.model.frontends.flowir.replace_env_var(
environment[key], key, default_env[key])
default_env[key] = environment[key]

for env_var in environment:
env_value = environment[env_var]

referenced_variables = set(
[ev.group()[1:] for ev in pattern_short.finditer(env_value)] +
[ev.group()[2:-1] for ev in pattern_long.finditer(env_value)]
).intersection(set(keys_from_default_env))

# VV: search for `DEFAULTS` vars and replace any references to them ($VAR and ${VAR}) with
# their corresponding value from the default environment
for key in referenced_variables:
if key not in default_env:
msg = ("Environment %s references DEFAULTS variable %s which doesn't have a value "
"in the default environment. Will resolve it using active shell" % (
environment_name, key))
self.suppressed_warning(msg)
continue

env_value = experiment.model.frontends.flowir.replace_env_var(env_value, key, default_env[key])

environment[env_var] = env_value

# VV: if the environment does not define an env-var that is in `DEFAULTS` inject it here
for key in keys_from_default_env:
if key not in environment:
environment[key] = default_env[key] if default_env[key] is not None else ""
# VV: First add any env-vars from DEFAULTS for which there're no keys already in the environment
lbl_defaults = experiment.model.frontends.flowir.FlowIR.LabelEnvironmentDefaults

if lbl_defaults in environment:
default_env_vars = environment[lbl_defaults].split(':')
default_environment = {}
# VV: Expand env-vars in the environment which are also part of DEFAULTS
# e.g. PATH: my/custom/path:$PATH,
# if the env-var is not in the environment already, then just add it
# if expand is True the code will then auto-expand references to the DEFAULTS env-vars because
# they'll already be inside the environment
for def_env_var in default_env_vars:
if def_env_var not in os.environ:
continue
default_environment[def_env_var] = os.environ[def_env_var]

if def_env_var not in environment:
environment[def_env_var] = default_environment[def_env_var]
else:
environment[def_env_var] = experiment.model.frontends.flowir.expand_vars(
environment[def_env_var],
{def_env_var: os.environ[def_env_var]}
)

if remove_defaults_key:
environment.pop(lbl_defaults)

# VV: Finally replace any references to env variables of the environment with their values and then
# use the active-shell environment to expand any remaining environment variables (this final step
Expand All @@ -1346,14 +1351,17 @@ def pretty_json(entry):

return environment

def defaultEnvironment(self):
# type: () -> Dict[str, str]
def defaultEnvironment(self, fill_when_unset: bool = True) -> Dict[str, str]:
try:
config_env = self._concrete.get_environment('environment')
config_env = self._concrete.get_environment('environment', strict_checks=True)
except experiment.model.errors.FlowIREnvironmentUnknown:
log = logging.getLogger('Environment')
log.warning('No default environment defined, will assume its empty')
config_env = {}
if fill_when_unset:
log = logging.getLogger('Environment')
log.warning('No default environment defined, will assume it contains the environment variables '
'in os.environ')
config_env = copy.deepcopy(os.environ)
else:
raise

return config_env

Expand Down
13 changes: 8 additions & 5 deletions python/experiment/model/frontends/flowir.py
Original file line number Diff line number Diff line change
Expand Up @@ -1317,15 +1317,16 @@ class FlowIR(object):
'blueprint', 'platforms', 'virtual-environments',
'version', 'interface'
)
LabelGlobal, LabelDefault, LabelStages, LabelDoWhile, LabelWorkflow = (
'global', 'default', 'stages', 'DoWhile', 'Workflow'
LabelGlobal, LabelDefault, LabelStages, LabelDoWhile, LabelWorkflow, LabelEnvironmentDefaults = (
'global', 'default', 'stages', 'DoWhile', 'Workflow', 'DEFAULTS'
)

LabelKubernetesQosGuaranteed, LabelKubernetesQosBurstable, LabelKubernetesQosBestEffort = (
'guaranteed', 'burstable', 'besteffort'
)

SpecialEnvironments = ['SANDBOX', 'ENVIRONMENT']
EnvironmentsSpecial = ['environment', 'none']
EnvironmentsReserved = ['none']

SpecialFolders = ['input', 'data', 'bin', 'conf']

Expand Down Expand Up @@ -5472,8 +5473,10 @@ def get_environment(
try:
environment = platform_environments[name]
except KeyError:
# VV: It's valid for an environment to be global (i.e. non-platform specific)
if default_environment is None or strict_checks is True:
# VV: Raise an exception if this is the default platform, or the environment exists neither for this
# platform nor for the default platform, or the environment exists in the default platform, but not this one
# and strict_checks is True.
# i.e. It's valid for an environment to be inherited from the default platform only if strict_checks=False
if platform == FlowIR.LabelDefault or default_environment is None or strict_checks is True:
raise experiment.model.errors.FlowIREnvironmentUnknown(
name, platform, self._flowir
)
Expand Down
6 changes: 2 additions & 4 deletions python/experiment/model/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -2657,8 +2657,7 @@ def _concrete(self):
return self.configuration.get_flowir_concrete(return_copy=False)

@property
def configuration(self):
# type: () -> FlowIRExperimentConfiguration
def configuration(self) -> FlowIRExperimentConfiguration:
return self._flowir_configuration

def get_stage_description(self, stage_index):
Expand Down Expand Up @@ -3325,8 +3324,7 @@ def producerReferencesForNode(self, nodeName):
inputEdges = networkx.reverse(self.graph).edges(nodeName)
return [el[1] for el in inputEdges]

def environmentForNode(self, nodeName):
# type: (str) -> Dict[str, str]
def environmentForNode(self, nodeName: str) -> Dict[str, str]:
"""Build environment for node.
If a component uses the `none` environment we assume that the environment is just {}.
Expand Down
2 changes: 1 addition & 1 deletion python/experiment/service/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
try:
import tinydb
except ImportError:
logging.getLogger().warning("Unable to import tinydb module - tinydb interface not available")
logging.getLogger().debug("Unable to import tinydb module - tinydb interface not available")

# VV: Suppress warnings about contacting a https URL that uses self-verified TLS
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
Expand Down
4 changes: 2 additions & 2 deletions scripts/elaunch.py
Original file line number Diff line number Diff line change
Expand Up @@ -1092,15 +1092,15 @@ def arg_to_bool(name, val):


def build_parser() -> NoSystemExitOptparseOptionParser:
import pkg_resources
import importlib_metadata

# HACK: Daresbury system dependent
projectDir = os.path.split(os.path.expanduser("~"))[0]
haltfile = os.path.join(projectDir, 'shared/CHPCBackend/.halt_backend')
killfile = os.path.join(projectDir, 'shared/CHPCBackend/.kill_backend')

parser = NoSystemExitOptparseOptionParser(
usage=usage, version=pkg_resources.get_distribution("st4sd-runtime-core").version, description=__doc__)
usage=usage, version=importlib_metadata.version(distribution_name="st4sd-runtime-core"), description=__doc__)

launchOptions = optparse.OptionGroup(parser, "Launch Options")
parser.add_option_group(launchOptions)
Expand Down
4 changes: 2 additions & 2 deletions scripts/ewrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import experiment.runtime.output
import experiment.model.frontends.flowir
import experiment.model.storage
import pkg_resources
import importlib_metadata
import yaml

usage = "usage: %prog [options] [package]"
Expand Down Expand Up @@ -192,7 +192,7 @@ def main():

log.info(f"Generating {options.outputPath}/elaunch.yaml")

elaunch_version = pkg_resources.get_distribution("st4sd-runtime-core").version
elaunch_version = importlib_metadata.version(distribution_name="st4sd-runtime-core")
actual_variables = concrete.get_workflow_variables()
# VV: 'global' userMetadata variables end up becoming `platform-stage` variables (this is the second most
# high priority scope right after variables defined by components). As a result, we need to backpatch them
Expand Down
Loading

0 comments on commit 9ce7301

Please sign in to comment.