From 0525bb9affb615d37a488ce8593b9b7815d9c998 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Tue, 12 Nov 2024 14:44:28 +0100 Subject: [PATCH] WIP: work in progress --- pulsar/client/client.py | 16 ++++++++++-- pulsar/client/test/check.py | 4 +++ pulsar/manager_endpoint_util.py | 1 + pulsar/scripts/run.py | 14 +++++++++- pulsar/scripts/submit_util.py | 46 ++++++++++++++++++++++++++++++--- test/test_cli_submit.py | 21 ++++++++++++++- 6 files changed, 94 insertions(+), 8 deletions(-) diff --git a/pulsar/client/client.py b/pulsar/client/client.py index 8478742d..affe6f64 100644 --- a/pulsar/client/client.py +++ b/pulsar/client/client.py @@ -167,8 +167,17 @@ def __init__(self, destination_params, job_id, job_manager_interface): super().__init__(destination_params, job_id) self.job_manager_interface = job_manager_interface - def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, - dynamic_file_sources=None, token_endpoint=None, staging_manifest=None): + def launch( + self, + command_line, + dependencies_description=None, + env=None, + remote_staging=None, + job_config=None, + dynamic_file_sources=None, + token_endpoint=None, + staging_manifest=None + ): """ Queue up the execution of the supplied `command_line` on the remote server. Called launch for historical reasons, should be renamed to @@ -780,6 +789,9 @@ class LocalSequentialClient(BaseMessageCoexecutionJobClient, LocalSequentialLaun def __init__(self, destination_params, job_id, client_manager): super().__init__(destination_params, job_id, client_manager) + def put_file(self): + ... + class TesPollingCoexecutionJobClient(BasePollingCoexecutionJobClient, LaunchesTesContainersMixin): """A client that co-executes pods via GA4GH TES and depends on amqp for status updates.""" diff --git a/pulsar/client/test/check.py b/pulsar/client/test/check.py index 5ddc00f4..bd200d81 100644 --- a/pulsar/client/test/check.py +++ b/pulsar/client/test/check.py @@ -503,6 +503,8 @@ def extract_client_options(options): client_options["tes_url"] = options.tes_url if hasattr(options, "container"): client_options["container"] = options.container + if hasattr(options, "arc_url"): + client_options["arc_url"] = options.arc_url return client_options @@ -532,6 +534,8 @@ def client_manager_from_args(options): manager_args['tes_url'] = options.tes_url if getattr(options, "k8s_enabled", None): manager_args['k8s_enabled'] = options.k8s_enabled + if getattr(options, 'arc_enabled'): + manager_args['arc_enabled'] = options.arc_enabled cm = build_client_manager(**manager_args) return cm diff --git a/pulsar/manager_endpoint_util.py b/pulsar/manager_endpoint_util.py index 1749f568..5bfe2d87 100644 --- a/pulsar/manager_endpoint_util.py +++ b/pulsar/manager_endpoint_util.py @@ -115,6 +115,7 @@ def submit_job(manager, job_config): "token_endpoint": token_endpoint, } manager.preprocess_and_launch(job_id, launch_config) + return launch_config except Exception: manager.handle_failure_before_launch(job_id) raise diff --git a/pulsar/scripts/run.py b/pulsar/scripts/run.py index 9d27076d..34039dc5 100644 --- a/pulsar/scripts/run.py +++ b/pulsar/scripts/run.py @@ -1,7 +1,9 @@ """ CLI related utilities for submitting Pulsar jobs. """ import fnmatch +from typing import Optional import sys +from dataclasses import dataclass, field import uuid from pulsar.client import ( @@ -32,11 +34,15 @@ HELP_AMQP_URL = "Communicate with Pulsar listining on a message queue at this URL." HELP_SERVER = "Run a Pulsar server locally instead of contacting a remote one." HELP_COMMAND = "Shell command to execute on Pulsar server." +HELP_JOBS_DIRECTORY = "Local jobs directory" HELP_WORKING_DIRECTORY = "Local working directory (will be translated to a new directory)." HELP_OUTPUT = "Output glob to collect from job (relative to remote working directory)." HELP_OUTPUT_PATTERN = "Output pattern to collect from job (relative to remote working directory)." +HELP_ARC = "Submit job to an ARC endpoint." +HELP_ARC_URL = "Use this URL as ARC endpoint." DEFAULT_CLIENT_URL = 'http://localhost:8913/' +DEFAULT_ARC_URL = 'http://localhost:8082' def main(argv=None): @@ -55,10 +61,14 @@ def main(argv=None): arg_parser.add_argument('--server', default=False, action="store_true", help=HELP_SERVER) arg_parser.add_argument('--job_id', default=None, help=HELP_JOB_ID) arg_parser.add_argument('--command', help=HELP_COMMAND) + # arg_parser.add_argument('--jobs_directory', default=".", help=HELP_JOBS_DIRECTORY) arg_parser.add_argument('--working_directory', default=".", help=HELP_WORKING_DIRECTORY) arg_parser.add_argument('--result_json', default=None) arg_parser.add_argument('--output', default=[], action="append", help=HELP_OUTPUT) arg_parser.add_argument('--output_pattern', default=[], action="append", help=HELP_OUTPUT_PATTERN) + # arg_parser.add_argument('--arc_enabled', default=False, action="store_true", help=HELP_ARC) + # arg_parser.add_argument('--arc_url', default=DEFAULT_ARC_URL, help=HELP_ARC_URL) + args = arg_parser.parse_args(argv) if args.server: @@ -71,7 +81,9 @@ def main(argv=None): return 0 -def _run_client_for_job(args): +def _run_client_for_job( + args: JobDescription +): if args.job_id is None: args.job_id = str(uuid.uuid4()) output_patterns = [] diff --git a/pulsar/scripts/submit_util.py b/pulsar/scripts/submit_util.py index 071069fe..12cf94f5 100644 --- a/pulsar/scripts/submit_util.py +++ b/pulsar/scripts/submit_util.py @@ -3,6 +3,8 @@ import json import logging import time +from dataclasses import dataclass, field +from typing import Optional from pulsar.client.util import from_base64_json from pulsar.client.manager import build_client_manager @@ -18,6 +20,40 @@ DEFAULT_POLL_TIME = 2 +HELP_AMQP_URL = "Communicate with Pulsar listining on a message queue at this URL." +HELP_SERVER = "Run a Pulsar server locally instead of contacting a remote one." +HELP_COMMAND = "Shell command to execute on Pulsar server." +HELP_JOBS_DIRECTORY = "Local jobs directory" +HELP_WORKING_DIRECTORY = "Local working directory (will be translated to a new directory)." +HELP_OUTPUT = "Output glob to collect from job (relative to remote working directory)." +HELP_OUTPUT_PATTERN = "Output pattern to collect from job (relative to remote working directory)." +HELP_ARC = "Submit job to an ARC endpoint." +HELP_ARC_URL = "Use this URL as ARC endpoint." + +DEFAULT_CLIENT_URL = 'http://localhost:8913/' +DEFAULT_ARC_URL = 'http://localhost:8082' + + +@dataclass +class JobDescription: + + command: str + working_directory: str = "." + result_json: Optional[str] = None + output: list[str] = field(default_factory=list) + output_pattern: list[str] = field(default_factory=list) + private_token: Optional[str] = None + url: str = DEFAULT_CLIENT_URL + amqp_url: str = DEFAULT_CLIENT_URL + default_file_action: str = "none" + file_action_config: Optional[str] = None + transport: Optional[str] = None + suppress_output: bool = False + cleanup: bool = True + server: bool = False + job_id: Optional[str] = None + + def add_common_submit_args(arg_parser): arg_parser.add_argument("--file", default=None) arg_parser.add_argument("--base64", default=None) @@ -31,13 +67,15 @@ def run_server_for_job(args): manager, app = manager_from_args(config_builder) try: job_config = _load_job_config(args) - submit_job(manager, job_config) + launch_config = submit_job(manager, job_config) if wait: log.info("Co-execution job setup, now waiting for job completion and postprocessing.") if args.build_client_manager: - client_manager = build_client_manager(arc_enabled=True) - client = client_manager.get_client({"arc_url": "http://localhost:8082", "jobs_directory": "/works"}, job_id=job_config["job_id"]) - client.launch() + job_description = JobDescription(**) + # client_manager = build_client_manager(arc_enabled=True) + # client = client_manager.get_client({"arc_url": "http://localhost:8082", "jobs_directory": "/works"}, job_id=job_config["job_id"]) + # launch_config = {key: value for key, value in launch_config.items() if key not in {"submit_params", "setup_params"}} + client.launch(**launch_config) wait_for_job(manager, job_config) log.info("Leaving finish_execution and shutting down app") except BaseException: diff --git a/test/test_cli_submit.py b/test/test_cli_submit.py index 9c8897a3..888464b8 100644 --- a/test/test_cli_submit.py +++ b/test/test_cli_submit.py @@ -1,4 +1,6 @@ + import os +import threading import yaml from .test_utils import ( @@ -11,7 +13,7 @@ from pulsar.client import ClientOutputs from pulsar.client.util import to_base64_json -from pulsar.scripts import submit +from pulsar.scripts import run, submit class BaseCliTestCase(TempDirectoryTestCase): @@ -57,7 +59,24 @@ def run_and_check_submission(self): ) base64 = to_base64_json(launch_params) assert not os.path.exists(galaxy_output) + submit.main(["--build_client_manager", "true", "--base64", base64] + self._encode_application()) + + # run server + # server = threading.Thread( + # target=lambda: submit.main(["--base64", base64] + self._encode_application()), + # name="server" + # ) + # server.start() + + # run client + # client = threading.Thread( + # target=lambda: run.main(["--base64", base64, "--job_id", job_id, "--arc_enabled", "--command", launch_params["command_line"], "--working_directory", "empty"] + self._encode_application()), + # name="client", + # ) + # client.start() + + # server.join(), client.join() assert os.path.exists(galaxy_output) out_contents = open(galaxy_output).read() assert out_contents == "cow file contents\n", out_contents