Skip to content

Commit

Permalink
Merge pull request #289 from ICRAR/ryan_mpi_fixes
Browse files Browse the repository at this point in the history
MPI Fixes & Hyades Deployment
  • Loading branch information
myxie authored Nov 5, 2024
2 parents 7163226 + 7db0fe5 commit e7b5115
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 45 deletions.
98 changes: 90 additions & 8 deletions daliuge-engine/dlg/apps/mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,24 @@
# MA 02111-1307 USA
#
"""Module containing MPI application wrapping support"""

import json
import logging
import os
import signal
import subprocess
import sys

from dlg import utils, droputils
from dlg.apps.app_base import BarrierAppDROP
from ..exceptions import InvalidDropException
from dlg.named_port_utils import (
DropParser,
get_port_reader_function,
replace_named_ports,
)
from dlg.exceptions import InvalidDropException
from ..meta import (
dlg_enum_param,
)

logger = logging.getLogger(__name__)

Expand All @@ -38,15 +48,15 @@
# @par EAGLE_START
# @param category Mpi
# @param tag template
# @param num_of_procs 1/Integer/ComponentParameter/NoPort/ReadWrite//False/False/Number of processes used for this application
# @param command /String/ComponentParameter/NoPort/ReadWrite//False/False/The command to be executed
# @param args /String/ComponentParameter/NoPort/ReadWrite//False/False/Additional command line arguments to be added to the command line to be executed
# @param num_of_procs 1/Integer/ComponentParameter/NoPort/ReadWrite//False/False/Number of processes used for this application
# @param use_wrapper False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/If the command should be executed inside the existing MPI communicator set this to True
# @param input_redirection /String/ComponentParameter/NoPort/ReadWrite//False/False/The command line argument that specifies the input into this application
# @param output_redirection /String/ComponentParameter/NoPort/ReadWrite//False/False/The command line argument that specifies the output from this application
# @param command_line_arguments /String/ComponentParameter/NoPort/ReadWrite//False/False/Additional command line arguments to be added to the command line to be executed
# @param paramValueSeparator " "/String/ComponentParameter/NoPort/ReadWrite//False/False/Separator character(s) between parameters on the command line
# @param argumentPrefix "--"/String/ComponentParameter/NoPort/ReadWrite//False/False/Prefix to each keyed argument on the command line
# @param dropclass dlg.apps.mpi.MPIApp/String/ComponentParameter/NoPort/ReadWrite//False/False/Drop class
# @param mpi construct/String/ComponentParameter/NoPort/ReadOnly//False/False/Base name of application class
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
# @param num_cpus 1/Integer/ConstraintParameter/NoPort/ReadOnly//False/False/Number of cores used
# @param group_end False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the end of a group?
Expand All @@ -66,33 +76,105 @@ class MPIApp(BarrierAppDROP):
This drop will gather the individual exit codes from the launched
applications and transition to ERROR if any of them did not exit cleanly,
or to FINISHED if all of them finished successfully.
"""

input_parser: DropParser = dlg_enum_param(DropParser, "input_parser", DropParser.PICKLE) # type: ignore

def initialize(self, **kwargs):
super(MPIApp, self).initialize(**kwargs)

self._command = self._popArg(kwargs, "command", None)
self._maxprocs = self._popArg(kwargs, "maxprocs", 1)
self._use_wrapper = self._popArg(kwargs, "use_wrapper", False)
self._args = self._popArg(kwargs, "args", [])
self._args = self._popArg(kwargs, "args", "")
self._applicationArgs = self._popArg(kwargs, "applicationArgs", {})
self._argumentPrefix = self._popArg(kwargs, "argumentPrefix", "--")
self._paramValueSeparator = self._popArg(kwargs, "paramValueSeparator", " ")
self._inputRedirect = self._popArg(kwargs, "input_redirection", "")
self._outputRedirect = self._popArg(kwargs, "output_redirection", "")

self._command = self._popArg(kwargs, "command", None)
if not self._command:
raise InvalidDropException(
self, "No command specified, cannot create MPIApp"
)
self._recompute_data = {}

def run(self):
from mpi4py import MPI

cmd, args = self._command, self._args
inputs = self._inputs
outputs = self._outputs

logger.debug("Parameters found: %s", json.dumps(self.parameters))
logger.debug("MPI Inputs: %s; MPI Outputs: %s", inputs, outputs)
# we only support passing a path for bash apps
fsInputs = {uid: i for uid, i in inputs.items() if droputils.has_path(i)}
fsOutputs = {uid: o for uid, o in outputs.items() if droputils.has_path(o)}
dataURLInputs = {
uid: i for uid, i in inputs.items() if not droputils.has_path(i)
}
dataURLOutputs = {
uid: o for uid, o in outputs.items() if not droputils.has_path(o)
}
# deal with named ports
inport_names = self.parameters["inputs"] if "inputs" in self.parameters else []
outport_names = (
self.parameters["outputs"] if "outputs" in self.parameters else []
)
reader = get_port_reader_function(self.input_parser)
keyargs, pargs = replace_named_ports(
inputs.items(),
outputs.items(),
inport_names,
outport_names,
self._applicationArgs,
argumentPrefix=self._argumentPrefix,
separator=self._paramValueSeparator,
parser=reader,
)
argumentString = (
f"{' '.join(map(str,pargs + keyargs))}" # add kwargs to end of pargs
)
# complete command including all additional parameters and optional redirects
if len(argumentString.strip()) > 0:
# the _cmdLineArgs would very likely make the command line invalid
cmd = f"{self._command} {argumentString} "
else:
cmd = f"{self._command} {argumentString} {args} "
if self._outputRedirect:
cmd = f"{cmd} > {self._outputRedirect}"
if self._inputRedirect:
cmd = f"cat {self._inputRedirect} > {cmd}"
cmd = cmd.strip()

app_uid = self.uid

# Replace inputs/outputs in command line with paths or data URLs
cmd = droputils.replace_path_placeholders(cmd, fsInputs, fsOutputs)

cmd = droputils.replace_dataurl_placeholders(cmd, dataURLInputs, dataURLOutputs)

# Pass down daliuge-specific information to the subprocesses as environment variables
env = os.environ.copy()
env.update({"DLG_UID": self._uid})
if self._dlg_session_id:
env.update({"DLG_SESSION_ID": self._dlg_session_id})

env.update({"DLG_ROOT": utils.getDlgDir()})

logger.info("Command after wrapping is: %s", cmd)

if self._use_wrapper:
# We spawn this very same module
# When invoked as a program (see at the bottom) this module
# will get the parent communicator, run the program we're giving in the
# command line, and send back the exit code.
# Likewise, we barrier on the children communicator, and thus
# we wait until all children processes are completed
args = ["-m", __name__, cmd]
cmd = sys.executable
args = ["-m", __name__, self._command] + self._args

errcodes = []

Expand Down
87 changes: 60 additions & 27 deletions daliuge-engine/dlg/deploy/configs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
# ===================
# Deployment defaults
# ====================
USER = ""
ACCOUNT = ""
LOGIN_NODE = ""
HOME_DIR = os.environ["HOME"] if "HOME" in os.environ else ""
Expand All @@ -34,6 +35,7 @@
VENV = f"{DLG_ROOT}/venv"
DEFAULT_MON_HOST = "dlg-mon.icrar.org" # TODO: need to get this running
DEFAULT_MON_PORT = 8898
EXEC_PREFIX = "srun -l"


__sub_tpl_str = """#!/bin/bash --login
Expand All @@ -48,35 +50,64 @@
export DLG_ROOT=$DLG_ROOT
$VENV
srun -l $PY_BIN -m dlg.deploy.start_dlg_cluster --log_dir $LOG_DIR $GRAPH_PAR $PROXY_PAR $GRAPH_VIS_PAR $LOGV_PAR $ZERORUN_PAR $MAXTHREADS_PAR $SNC_PAR $NUM_ISLANDS_PAR $ALL_NICS $CHECK_WITH_SESSION --ssid $SESSION_ID
$EXEC_PREFIX $PY_BIN -m dlg.deploy.start_dlg_cluster --log_dir $LOG_DIR $GRAPH_PAR $PROXY_PAR $GRAPH_VIS_PAR $LOGV_PAR $ZERORUN_PAR $MAXTHREADS_PAR $SNC_PAR $NUM_ISLANDS_PAR $ALL_NICS $CHECK_WITH_SESSION --ssid $SESSION_ID
"""
init_tpl = string.Template(__sub_tpl_str)


class DefaultConfig(object):

def __init__(self):
def __init__(self, user=None):
self._dict = dict()
self.setpar("host", self.LOGIN_NODE)
self.setpar("account", self.ACCOUNT)
self.setpar("home_dir", self.HOME_DIR.strip())
self.setpar("dlg_root", self.DLG_ROOT.strip())
self.setpar("log_root", self.LOG_DIR)
self.setpar("modules", self.MODULES.strip())
self.setpar("venv", self.VENV.strip())
if user:
print(f"Setting user to {user}")
self._dict["user"] = user
self.setpar("host", "LOGIN_NODE")
self.setpar("account", "ACCOUNT")
self.setpar("home_dir", "HOME_DIR")
self.setpar("dlg_root", "DLG_ROOT")
self.setpar("log_root", "LOG_DIR")
self.setpar("modules", "MODULES")
self.setpar("venv", "VENV")
self.setpar("exec_prefix", "EXEC_PREFIX")

@abstractmethod
def init_list(self):
pass

def setpar(self, k, v):
self._dict[k] = v
if hasattr(self, v):
value = getattr(self, v)
if "user" in self._dict:
pardict = {"USER": self._dict["user"]}
value = string.Template(value).safe_substitute(pardict)
self._dict[k] = value
else:
print(f"default[{v}] = '{globals()[v]}'")
self._dict[k] = globals()[v]

def getpar(self, k):
return self._dict.get(k)


#############################
class ICRARHyadesConfig(DefaultConfig):
MODULES = """
"""
# The following is more a workaround than a solution
# requires the user to have a venv exectly in that place
LOGIN_NODE = "hyades.icrar.org"
HOME_DIR = "/home/$USER"
DLG_ROOT = "/home/$USER/dlg"
LOG_DIR = "/home/$USER/dlg/log"
VENV = "source /home/$USER/dlg/venv/bin/activate"
EXEC_PREFIX = ""

def __init__(self, user=None):
super(ICRARHyadesConfig, self).__init__(user=user)

def init_list(self): # TODO please fill in
return [self.ACCOUNT, self.LOG_DIR, self.MODULES, self.VENV]


class ICRARoodConfig(DefaultConfig):
Expand All @@ -85,14 +116,14 @@ class ICRARoodConfig(DefaultConfig):
"""
# The following is more a workaround than a solution
# requires the user to have a venv exectly in that place
ACCOUNT = os.environ["USER"]
LOGIN_NODE = "hyades.icrar.org"
HOME_DIR = os.environ["HOME"] if "HOME" in os.environ else ""
DLG_ROOT = f"{HOME_DIR}/dlg"
LOG_DIR = f"{DLG_ROOT}/log"
VENV = f"source {HOME_DIR}/dlg/venv/bin/activate"

def __init__(self):
super(ICRARoodConfig, self).__init__()
def __init__(self, user=None):
super(ICRARoodConfig, self).__init__(user=user)

def init_list(self): # TODO please fill in
return [self.ACCOUNT, self.LOG_DIR, self.MODULES, self.VENV]
Expand All @@ -108,16 +139,16 @@ class ICRARoodCldConfig(DefaultConfig):
# The compute nodes have have required python and DALiuGE but just in case....
VENV = f"source {DLG_ROOT}/venv/bin/activate"

def __init__(self):
super(ICRARoodCldConfig, self).__init__()
def __init__(self, user=None):
super(ICRARoodCldConfig, self).__init__(user=user)

def init_list(self): # TODO please fill in
return [self.ACCOUNT, self.LOG_DIR, self.VENV]


class GalaxyMWAConfig(DefaultConfig):
def __init__(self):
super(GalaxyMWAConfig, self).__init__()
def __init__(self, user=None):
super(GalaxyMWAConfig, self).__init__(user=user)

def init_list(self):
return ["mwaops", "/group/mwaops/cwu/dfms/logs"]
Expand All @@ -131,8 +162,8 @@ class GalaxyASKAPConfig(DefaultConfig):
"""
VENV = ""

def __init__(self):
super(GalaxyASKAPConfig, self).__init__()
def __init__(self, user=None):
super(GalaxyASKAPConfig, self).__init__(user=user)

def init_list(self):
return [
Expand All @@ -143,8 +174,8 @@ def init_list(self):


class MagnusConfig(DefaultConfig):
def __init__(self):
super(MagnusConfig, self).__init__()
def __init__(self, user=None):
super(MagnusConfig, self).__init__(user=user)

def init_list(self):
return ["pawsey0129", "/group/pawsey0129/daliuge_logs"]
Expand All @@ -159,15 +190,15 @@ class Setonix411Config(DefaultConfig):
ACCOUNT = "pawsey0411"
USER = os.environ["USER"] if "USER" in os.environ else ""
HOME_DIR = f"/scratch/{ACCOUNT}"
DLG_ROOT = f"{HOME_DIR}/{USER}/dlg"
DLG_ROOT = f"{HOME_DIR}/$USER/dlg"
LOG_DIR = f"{DLG_ROOT}/log"
MODULES = ""
VENV = f"source /software/projects/{ACCOUNT}/venv/bin/activate"

MODULES = ""

def __init__(self):
super(Setonix411Config, self).__init__()
def __init__(self, user=None):
super(Setonix411Config, self).__init__(user=user)

def init_list(self):
return [self.ACCOUNT, f"{self.HOME_DIR}/logs"]
Expand All @@ -192,7 +223,8 @@ class ConfigFactory:
"galaxy": GalaxyASKAPConfig,
"setonix": Setonix411Config,
"shao": TianHe2Config,
"hyades.icrar.org": ICRARoodConfig,
"hyades": ICRARHyadesConfig,
"ood": ICRARoodConfig,
"ood_cloud": ICRARoodCldConfig,
}

Expand All @@ -201,6 +233,7 @@ def available():
return list(ConfigFactory.mapping.keys())

@staticmethod
def create_config(facility=None):
def create_config(facility=None, user=None):
facility = facility.lower() if (facility is not None) else facility
return ConfigFactory.mapping.get(facility)()
config = ConfigFactory.mapping.get(facility)(user=user)
return config
4 changes: 3 additions & 1 deletion daliuge-engine/dlg/deploy/create_dlg_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,9 @@ def main():
# you can specify:
# either a single directory
if opts.log_root is None:
config = ConfigFactory.create_config(facility=opts.facility)
config = ConfigFactory.create_config(
facility=opts.facility, user=opts.username
)
log_root = config.getpar("log_root")
else:
log_root = opts.log_root
Expand Down
Loading

0 comments on commit e7b5115

Please sign in to comment.