diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py index 6eb20b10..91b2dfa1 100644 --- a/pulsar/client/action_mapper.py +++ b/pulsar/client/action_mapper.py @@ -543,6 +543,9 @@ def __init__(self, source, file_lister=None, url=None): def from_dict(cls, action_dict): return JsonTransferAction(source=action_dict["source"], url=action_dict["url"]) + def to_dict(self): + return self._extend_base_dict(url=self.url) + def write_to_path(self, path): self._to_path = path diff --git a/pulsar/client/client.py b/pulsar/client/client.py index d6d20166..64879e2c 100644 --- a/pulsar/client/client.py +++ b/pulsar/client/client.py @@ -498,6 +498,7 @@ def launch( # 4. stage outputs back using manifest [handled by ARC] pass + class CoexecutionLaunchMixin(BaseRemoteConfiguredJobClient): execution_type: ExecutionType pulsar_container_image: str diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py index 57b7b300..5a01dd94 100644 --- a/pulsar/client/staging/up.py +++ b/pulsar/client/staging/up.py @@ -72,6 +72,13 @@ def submit_job(client, client_job_description, job_config=None): launch_kwds["dynamic_file_sources"] = client_job_description.client_outputs.dynamic_file_sources launch_kwds["token_endpoint"] = client.token_endpoint + # populate `to_path` + for action in file_stager.action_mapper.actions: + name = basename(action.path) + input_type = "input" + path = file_stager.job_directory.calculate_path(name, input_type) + action.write_to_path(path) + staging_manifest = file_stager.action_mapper.finalize() if staging_manifest: launch_kwds["staging_manifest"] = staging_manifest @@ -564,7 +571,8 @@ def register_rewrite(self, local_path, remote_path, type, force=False): def register_rewrite_action(self, action, remote_path, force=False): if action.staging_needed or force: path = getattr(action, 'path', None) - if path: + if path and path not in self.file_renames: + # this should only happen in unit testing ... don't really know why self.file_renames[path] = remote_path def rewrite_input_paths(self): diff --git a/pulsar/scripts/staging_arc.py b/pulsar/scripts/staging_arc.py new file mode 100644 index 00000000..693f0bf9 --- /dev/null +++ b/pulsar/scripts/staging_arc.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python + +"""Stage files in or out of a compute environment made available via the Advanced Resource Connector (ARC) [1]. + +This script reads a set of source and target URL (with `http`, `https` or `file` as URL scheme) and/or path pairs passed +either as command line arguments and/or from a file in the form of a JSON array. It then reads the files from the source +URLs and posts (copies them for `file://` urls) them to the target URLs. + +Example usage: + +```shell +$ ./staging_arc.py --stage https://example.org file.dat --stage file:///home/user/text.txt https://example.org/files \ + --json staging_manifest.json +``` + +_staging_manifest.json_ +```json +[ + { + "source": "file:///home/user/data.txt", + "target": "file:///home/person/file.txt" + }, + { + "source": "file:///home/user/analysis.txt", + "target": "https://example.org/files/analysis.txt" + } +] +``` + +Retrieve files from a set of source URLs and save them to a set of target URLs. + +References: +- [1] https://www.nordugrid.org/arc/about-arc.html +""" + +# When the URL is the target, use POST. + +import aiohttp +import json +import sys +from typing import Iterable +from typing import Literal +from dataclasses import dataclass, field +from argparse import ArgumentParser +from typing import Optional + + +@dataclass +class StagingDeclaration: + """Declare where to read a file from and where to save it to.""" + + source: str # a URL + target: str # a URL + + +... + + +def parse_json_manifest() -> tuple[StagingDeclaration]: + ... + + +HELP_STAGE = "Read a file from `source` and save it to `target`." +HELP_JSON = "Read a list of `source` and `target` URLs from a JSON file." + + +def make_parser() -> ArgumentParser: + """Construct an argument parser used to call the script from the command line.""" + + module_docstring = sys.modules[__name__].__doc__ + + parser = ArgumentParser(description=module_docstring) + + parser.add_argument( + "--stage", dest="stage", metavar=("source", "target"), nargs=2, action="append", help=HELP_STAGE + ) + parser.add_argument("--json", dest="json", nargs=1, action="append", help=HELP_JSON) + + return parser + + +if __name__ == "__main__": + """Invoke script from the command line.""" + argument_parser = make_parser() + args = argument_parser.parse_args(sys.argv[1:]) diff --git a/pulsar/scripts/submit.py b/pulsar/scripts/submit.py index bf0c2f17..04be8db8 100644 --- a/pulsar/scripts/submit.py +++ b/pulsar/scripts/submit.py @@ -15,6 +15,7 @@ def main(args=None): add_common_submit_args(arg_parser) arg_parser.add_argument('--wait', action='store_true') arg_parser.add_argument('--no_wait', "--no-wait", dest='wait', action='store_false') + arg_parser.add_argument('--build_client_manager', action='store_true') arg_parser.set_defaults(wait=True) args = arg_parser.parse_args(args) run_server_for_job(args) diff --git a/pulsar/scripts/submit_util.py b/pulsar/scripts/submit_util.py index c00a2c32..85257e2c 100644 --- a/pulsar/scripts/submit_util.py +++ b/pulsar/scripts/submit_util.py @@ -4,6 +4,9 @@ import logging import time +from pulsar.client import ClientJobDescription, ClientOutputs, ClientInput +from pulsar.client import submit_job as submit_client_job +from pulsar.client.manager import build_client_manager from pulsar.client.util import from_base64_json from pulsar.main import ( load_pulsar_app, @@ -32,6 +35,23 @@ def run_server_for_job(args): submit_job(manager, job_config) if wait: log.info("Co-execution job setup, now waiting for job completion and postprocessing.") + if args.build_client_manager: + client_manager = build_client_manager(arc_enabled=True) + client = client_manager.get_client({"arc_url": "http://localhost:8082", "jobs_directory": app.staging_directory}, job_id=job_config["job_id"], default_file_action=job_config["remote_staging"]["action_mapper"]["default_action"], files_endpoint=job_config["remote_staging"]["action_mapper"]["files_endpoint"]) + # FIXME: we can probably only test the input staging here, so adjust tests accordingly + client_inputs = [ + ClientInput(path=action_source["path"], input_type="input_path") + for action_source in job_config["remote_staging"]["client_inputs"] + ] + client_outputs = ClientOutputs.from_dict(job_config["remote_staging"]["client_outputs"]) + job_description = ClientJobDescription( + command_line=job_config["command_line"], + working_directory=client_outputs.working_directory, + client_inputs=client_inputs, + client_outputs=client_outputs, + ) + job_config["working_directory"] = client_outputs.working_directory + submit_client_job(client, job_description) wait_for_job(manager, job_config) log.info("Leaving finish_execution and shutting down app") except BaseException: diff --git a/test/test_cli_submit.py b/test/test_cli_submit.py index 9c8897a3..c02335fc 100644 --- a/test/test_cli_submit.py +++ b/test/test_cli_submit.py @@ -9,7 +9,7 @@ temp_directory_persist, ) -from pulsar.client import ClientOutputs +from pulsar.client import ClientOutputs, ClientInput from pulsar.client.util import to_base64_json from pulsar.scripts import submit @@ -25,25 +25,40 @@ def setup_action_mapper(self, files_endpoint): def run_and_check_submission(self): job_id = "0" galaxy_working = temp_directory_persist() + input_name = "dataset_1.dat" output_name = "dataset_1211231231231231231.dat" + # TODO: input should not be in working directory + galaxy_input = os.path.join(galaxy_working, input_name) + with open(galaxy_input, "w") as handle: + handle.write("cow_file_contents\n") galaxy_output = os.path.join(galaxy_working, output_name) + pulsar_input = os.path.join(self.staging_directory, job_id, "inputs", input_name) pulsar_output = os.path.join( self.staging_directory, job_id, "outputs", output_name ) - pulsar_input = os.path.join(self.staging_directory, job_id, "inputs", "cow") with files_server("/") as test_files_server: files_endpoint = test_files_server.application_url action = { "name": "cow", "type": "input", - "action": {"action_type": "message", "contents": "cow file contents\n"}, + "action": { + "action_type": "json_transfer", + "files_endpoint": files_endpoint, + "path": galaxy_input, + }, } + client_inputs = [ + ClientInput( + path=galaxy_input, + input_type="input_path", + ).action_source + ] client_outputs = ClientOutputs( working_directory=galaxy_working, output_files=[os.path.join(galaxy_working, output_name)], ) launch_params = dict( - command_line="cat '{}' > '{}'".format(pulsar_input, pulsar_output), + command_line="cat '{}' > '{}'".format(galaxy_input, galaxy_output), job_id=job_id, setup_params=dict( job_id=job_id, @@ -52,12 +67,13 @@ def run_and_check_submission(self): remote_staging={ "setup": [action], "action_mapper": self.setup_action_mapper(files_endpoint), + "client_inputs": client_inputs, "client_outputs": client_outputs.to_dict(), }, ) base64 = to_base64_json(launch_params) assert not os.path.exists(galaxy_output) - submit.main(["--build_client_manager", "true", "--base64", base64] + self._encode_application()) + submit.main(["--build_client_manager", "--base64", base64] + self._encode_application()) assert os.path.exists(galaxy_output) out_contents = open(galaxy_output).read() assert out_contents == "cow file contents\n", out_contents