Skip to content

Commit

Permalink
Merge pull request #6031 from hjoliver/fix-workflow-state-alt-run-dir
Browse files Browse the repository at this point in the history
Fix workflow-state alternate run-dir
  • Loading branch information
hjoliver authored Apr 4, 2024
2 parents e1701b0 + 8fa278f commit 74a2ab2
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 36 deletions.
1 change: 1 addition & 0 deletions changes.d/6031.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed workflow-state command and xtrigger for alternate cylc-run directory.
11 changes: 8 additions & 3 deletions cylc/flow/id_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ async def parse_ids_async(
constraint: str = 'tasks',
max_workflows: Optional[int] = None,
max_tasks: Optional[int] = None,
alt_run_dir: Optional[str] = None,
) -> Tuple[Dict[str, List[Tokens]], Any]:
"""Parse IDs from the command line.
Expand Down Expand Up @@ -237,6 +238,8 @@ async def parse_ids_async(
max_tasks:
Specify the maximum number of tasks permitted to be specified
in the ids.
alt_run_dir:
Specify a non-standard cylc-run location, e.g. for another user.
Returns:
With src=True":
Expand Down Expand Up @@ -299,7 +302,8 @@ async def parse_ids_async(

# infer the run number if not specified the ID (and if possible)
if infer_latest_runs:
_infer_latest_runs(*tokens_list, src_path=src_path)
_infer_latest_runs(
*tokens_list, src_path=src_path, alt_run_dir=alt_run_dir)

_validate_number(
*tokens_list,
Expand Down Expand Up @@ -414,12 +418,13 @@ def _validate_workflow_ids(*tokens_list, src_path):
detect_both_flow_and_suite(src_path)


def _infer_latest_runs(*tokens_list, src_path):
def _infer_latest_runs(*tokens_list, src_path, alt_run_dir=None):
for ind, tokens in enumerate(tokens_list):
if ind == 0 and src_path:
# source workflow passed in as a path
continue
tokens['workflow'] = infer_latest_run_from_id(tokens['workflow'])
tokens['workflow'] = infer_latest_run_from_id(
tokens['workflow'], alt_run_dir)
pass


Expand Down
23 changes: 18 additions & 5 deletions cylc/flow/pathutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,30 @@ def get_remote_workflow_run_job_dir(
return get_remote_workflow_run_dir(workflow_id, 'log', 'job', *args)


def get_cylc_run_dir() -> str:
"""Return the cylc-run dir path with vars/user expanded."""
return expand_path(_CYLC_RUN_DIR)
def get_cylc_run_dir(alt_run_dir: Optional[str] = None) -> str:
"""Return the cylc-run dir, or alt path, with vars/user expanded."""
return expand_path(alt_run_dir or _CYLC_RUN_DIR)


def get_alt_workflow_run_dir(
alt_run_dir: Union[Path, str],
workflow_id: Union[Path, str],
*args: Union[Path, str]
) -> str:
"""Return alternate workflow run directory.
Join any extra args, and expand vars and user.
Does not check that the directory exists.
"""
return expand_path(alt_run_dir, workflow_id, *args)


def get_workflow_run_dir(
workflow_id: Union[Path, str], *args: Union[Path, str]
) -> str:
"""Return local workflow run directory, joining any extra args, and
expanding vars and user.
"""Return local workflow run directory.
Join any extra args, and expand vars and user.
Does not check that the directory exists.
"""
return expand_path(_CYLC_RUN_DIR, workflow_id, *args)
Expand Down
20 changes: 6 additions & 14 deletions cylc/flow/scripts/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@

from cylc.flow.exceptions import CylcError, InputError
import cylc.flow.flags
from cylc.flow.id_cli import parse_id
from cylc.flow.option_parsers import (
WORKFLOW_ID_ARG_DOC,
CylcOptionParser as COP,
Expand All @@ -68,7 +67,8 @@
from cylc.flow.task_state import TASK_STATUSES_ORDERED
from cylc.flow.terminal import cli_function
from cylc.flow.cycling.util import add_offset
from cylc.flow.pathutil import expand_path, get_cylc_run_dir
from cylc.flow.pathutil import get_cylc_run_dir
from cylc.flow.workflow_files import infer_latest_run_from_id

from metomi.isodatetime.parsers import TimePointParser

Expand Down Expand Up @@ -162,7 +162,7 @@ def get_option_parser() -> COP:
help="The top level cylc run directory if non-standard. The "
"database should be DIR/WORKFLOW_ID/log/db. Use to interrogate "
"workflows owned by others, etc.; see note above.",
metavar="DIR", action="store", dest="run_dir", default=None)
metavar="DIR", action="store", dest="alt_run_dir", default=None)

parser.add_option(
"-s", "--offset",
Expand Down Expand Up @@ -196,10 +196,6 @@ def get_option_parser() -> COP:

@cli_function(get_option_parser, remove_opts=["--db"])
def main(parser: COP, options: 'Values', workflow_id: str) -> None:
workflow_id, *_ = parse_id(
workflow_id,
constraint='workflows',
)

if options.use_task_point and options.cycle:
raise InputError(
Expand Down Expand Up @@ -231,15 +227,11 @@ def main(parser: COP, options: 'Values', workflow_id: str) -> None:
options.status not in CylcWorkflowDBChecker.STATE_ALIASES):
raise InputError(f"invalid status '{options.status}'")

# this only runs locally
if options.run_dir:
run_dir = expand_path(options.run_dir)
else:
run_dir = get_cylc_run_dir()
workflow_id = infer_latest_run_from_id(workflow_id, options.alt_run_dir)

pollargs = {
'workflow_id': workflow_id,
'run_dir': run_dir,
'run_dir': get_cylc_run_dir(alt_run_dir=options.alt_run_dir),
'task': options.task,
'cycle': options.cycle,
'status': options.status,
Expand All @@ -256,7 +248,7 @@ def main(parser: COP, options: 'Values', workflow_id: str) -> None:
connected, formatted_pt = spoller.connect()

if not connected:
raise CylcError("cannot connect to the workflow_id DB")
raise CylcError(f"Cannot connect to the {workflow_id} DB")

Check warning on line 251 in cylc/flow/scripts/workflow_state.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scripts/workflow_state.py#L251

Added line #L251 was not covered by tests

if options.status and options.task and options.cycle:
# check a task status
Expand Down
19 changes: 15 additions & 4 deletions cylc/flow/workflow_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
expand_path,
get_cylc_run_dir,
get_workflow_run_dir,
get_alt_workflow_run_dir,
make_localhost_symlinks,
)
from cylc.flow.remote import (
Expand Down Expand Up @@ -840,16 +841,23 @@ def check_reserved_dir_names(name: Union[Path, str]) -> None:
raise WorkflowFilesError(err_msg.format('run<number>'))


def infer_latest_run_from_id(workflow_id: str) -> str:
run_dir = Path(get_workflow_run_dir(workflow_id))
_, id_ = infer_latest_run(run_dir)
def infer_latest_run_from_id(
workflow_id: str, alt_run_dir: Optional[str] = None
) -> str:
"""Wrapper to make the workflow run-dir absolute."""
if alt_run_dir is not None:
run_dir = Path(get_alt_workflow_run_dir(alt_run_dir, workflow_id))
else:
run_dir = Path(get_workflow_run_dir(workflow_id))
_, id_ = infer_latest_run(run_dir, alt_run_dir=alt_run_dir)
return id_


def infer_latest_run(
path: Path,
implicit_runN: bool = True,
warn_runN: bool = True,
alt_run_dir: Optional[str] = None,
) -> Tuple[Path, str]:
"""Infer the numbered run dir if the workflow has a runN symlink.
Expand All @@ -858,6 +866,7 @@ def infer_latest_run(
implicit_runN: If True, add runN on the end of the path if the path
doesn't include it.
warn_runN: If True, warn that explicit use of runN is unnecessary.
alt_run_dir: Path to alternate cylc-run location (e.g. for other user).
Returns:
path: Absolute path of the numbered run dir if applicable, otherwise
Expand All @@ -868,15 +877,17 @@ def infer_latest_run(
- WorkflowFilesError if the runN symlink is not valid.
- InputError if the path does not exist.
"""
cylc_run_dir = get_cylc_run_dir()
cylc_run_dir = get_cylc_run_dir(alt_run_dir)
try:
id_ = str(path.relative_to(cylc_run_dir))
except ValueError:
raise ValueError(f"{path} is not in the cylc-run directory")

if not path.exists():
raise InputError(
f'Workflow ID not found: {id_}\n(Directory not found: {path})'
)

if path.name == WorkflowFiles.RUN_N:
runN_path = path
if warn_runN:
Expand Down
17 changes: 7 additions & 10 deletions cylc/flow/xtriggers/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from pathlib import Path
import sqlite3
from typing import Dict, Optional, Tuple

from metomi.isodatetime.parsers import TimePointParser

from cylc.flow.cycling.util import add_offset
from cylc.flow.dbstatecheck import CylcWorkflowDBChecker
from cylc.flow.pathutil import expand_path, get_cylc_run_dir
from cylc.flow.workflow_files import infer_latest_run
from cylc.flow.pathutil import get_cylc_run_dir
from cylc.flow.workflow_files import infer_latest_run_from_id


def workflow_state(
Expand Down Expand Up @@ -58,9 +57,8 @@ def workflow_state(
.. note::
This cannot be specified in conjunction with ``status``.
cylc_run_dir:
The directory in which the workflow to interrogate.
Alternate cylc-run directory, e.g. for another user.
.. note::
Expand All @@ -78,13 +76,12 @@ def workflow_state(
to this xtrigger.
"""
if cylc_run_dir:
cylc_run_dir = expand_path(cylc_run_dir)
else:
cylc_run_dir = get_cylc_run_dir()
workflow = infer_latest_run_from_id(workflow, cylc_run_dir)
cylc_run_dir = get_cylc_run_dir(cylc_run_dir)

if offset is not None:
point = str(add_offset(point, offset))
_, workflow = infer_latest_run(Path(cylc_run_dir, workflow))

try:
checker = CylcWorkflowDBChecker(cylc_run_dir, workflow)
except (OSError, sqlite3.Error):
Expand Down
27 changes: 27 additions & 0 deletions tests/unit/test_id_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import os
from pathlib import Path
import pytest
from shutil import copytree, rmtree

from cylc.flow import CYLC_LOG
from cylc.flow.async_util import pipe
Expand Down Expand Up @@ -248,6 +249,32 @@ async def test_parse_ids_infer_run_name(tmp_run_dir):
)
assert list(workflows) == ['bar']

# Now test we can see workflows in alternate cylc-run directories
# e.g. for `cylc workflow-state` or xtriggers targetting another user.
cylc_run_dir = get_cylc_run_dir()
alt_cylc_run_dir = cylc_run_dir + "_alt"

# copy the cylc-run dir to alt location and delete the original.
copytree(cylc_run_dir, alt_cylc_run_dir, symlinks=True)
rmtree(cylc_run_dir)

# It can no longer parse IDs in the original cylc-run location.
with pytest.raises(InputError):
workflows, *_ = await parse_ids_async(
'bar//',
constraint='workflows',
infer_latest_runs=True,
)

# But it can if we specify the alternate location.
workflows, *_ = await parse_ids_async(
'bar//',
constraint='workflows',
infer_latest_runs=True,
alt_run_dir=alt_cylc_run_dir
)
assert list(workflows) == ['bar/run2']


@pytest.fixture
def patch_expand_workflow_tokens(monkeypatch):
Expand Down
24 changes: 24 additions & 0 deletions tests/unit/xtriggers/test_workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from pathlib import Path
import pytest
import sqlite3
from typing import Callable
from unittest.mock import Mock
from shutil import copytree, rmtree

from cylc.flow.exceptions import InputError
from cylc.flow.pathutil import get_cylc_run_dir
from cylc.flow.workflow_files import WorkflowFiles
from cylc.flow.xtriggers.workflow_state import workflow_state
from ..conftest import MonkeyMock
Expand All @@ -41,6 +45,26 @@ def test_inferred_run(tmp_run_dir: Callable, monkeymock: MonkeyMock):
mock_db_checker.assert_called_once_with(cylc_run_dir, expected_workflow_id)
assert results['workflow'] == expected_workflow_id

# Now test we can see workflows in alternate cylc-run directories
# e.g. for `cylc workflow-state` or xtriggers targetting another user.
alt_cylc_run_dir = cylc_run_dir + "_alt"

# copy the cylc-run dir to alt location and delete the original.
copytree(cylc_run_dir, alt_cylc_run_dir, symlinks=True)
rmtree(cylc_run_dir)

# It can no longer parse IDs in the original cylc-run location.
with pytest.raises(InputError):
_, results = workflow_state(id_, task='precious', point='3000')

# But it can via an explicit alternate run directory.
mock_db_checker.reset_mock()
_, results = workflow_state(
id_, task='precious', point='3000', cylc_run_dir=alt_cylc_run_dir)
mock_db_checker.assert_called_once_with(
alt_cylc_run_dir, expected_workflow_id)
assert results['workflow'] == expected_workflow_id


def test_back_compat(tmp_run_dir):
"""Test workflow_state xtrigger backwards compatibility with Cylc 7
Expand Down

0 comments on commit 74a2ab2

Please sign in to comment.