Skip to content

Commit

Permalink
cylc set: log warning when no outputs specified and task has no req…
Browse files Browse the repository at this point in the history
…uired outputs
  • Loading branch information
MetRonnie committed Nov 29, 2024
1 parent 9ff50f8 commit b438799
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 9 deletions.
3 changes: 2 additions & 1 deletion cylc/flow/id_match.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ def filter_ids(
* If IDTokens.Cycle all CyclePoints with any matching tasks will
be returned.
warn:
Whether to log a warning if no matching tasks are found.
Whether to log a warning if no matching tasks are found in the
pool.
TODO:
Consider using wcmatch which would add support for
Expand Down
34 changes: 27 additions & 7 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1943,14 +1943,15 @@ def set_prereqs_and_outputs(
"""
# Get matching pool tasks and inactive task definitions.
itasks, inactive_tasks, unmatched = self.filter_task_proxies(
itasks, inactive_tasks, _unmatched = self.filter_task_proxies(
items,
inactive=True,
warn_no_active=False,
)

flow_nums = self._get_flow_nums(flow, flow_descr)

nothing_set: Set[str] = set()
# Set existing task proxies.
for itask in itasks:
if flow == ['none'] and itask.flow_nums != set():
Expand All @@ -1966,7 +1967,8 @@ def set_prereqs_and_outputs(
# Spawn as if seq xtrig of parentless task was satisfied,
# with associated task producing these outputs.
self.check_spawn_psx_task(itask)
self._set_outputs_itask(itask, outputs)
if not self._set_outputs_itask(itask, outputs):
nothing_set.add(itask.identity)

# Spawn and set inactive tasks.
if not flow:
Expand All @@ -1982,8 +1984,18 @@ def set_prereqs_and_outputs(
point, tdef, flow_nums,
flow_wait=flow_wait, transient=True
)
if trans is not None:
self._set_outputs_itask(trans, outputs)
if trans is not None and self._set_outputs_itask(
trans, outputs
):
nothing_set.add(trans.identity)

if nothing_set:
msg = "Task has" if len(nothing_set) == 1 else "Tasks have"
msg += (
" no required outputs to set: "
f"{', '.join(sorted(nothing_set))}"
)
LOG.warning(msg)

if self.compute_runahead():
self.release_runahead_tasks()
Expand All @@ -1992,10 +2004,16 @@ def _set_outputs_itask(
self,
itask: 'TaskProxy',
outputs: List[str],
) -> None:
"""Set requested outputs on a task proxy and spawn children."""
) -> bool:
"""Set requested outputs on a task proxy and spawn children.
Return False if no outputs were specified and the task has no required
outputs to set
"""
if not outputs:
outputs = list(itask.state.outputs.iter_required_messages())
if not outputs:
return False
else:
outputs = self._standardise_outputs(
itask.point, itask.tdef, outputs
Expand All @@ -2018,6 +2036,7 @@ def _set_outputs_itask(
self.workflow_db_mgr.put_update_task_state(itask)
self.workflow_db_mgr.put_update_task_outputs(itask)
self.workflow_db_mgr.process_queued_ops()
return True

def _set_prereqs_itask(
self,
Expand Down Expand Up @@ -2316,7 +2335,8 @@ def filter_task_proxies(
ids:
ID strings.
warn_no_active:
Whether to log a warning if no matching active tasks are found.
Whether to log a warning if no matching tasks are found in the
pool.
inactive:
If True, unmatched IDs will be checked against taskdefs
and cycle, and any matches will be returned in the second
Expand Down
66 changes: 65 additions & 1 deletion tests/integration/scripts/test_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,21 @@
Note: see also functional tests
"""

import logging

from cylc.flow.commands import (
run_cmd,
set_prereqs_and_outputs,
)
from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.data_messages_pb2 import PbTaskProxy
from cylc.flow.data_store_mgr import TASK_PROXIES
from cylc.flow.flow_mgr import FLOW_ALL
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_state import TASK_STATUS_SUCCEEDED, TASK_STATUS_WAITING
from cylc.flow.task_state import (
TASK_STATUS_SUCCEEDED,
TASK_STATUS_WAITING,
)


async def test_set_parentless_spawning(
Expand Down Expand Up @@ -164,3 +174,57 @@ async def test_pre_all(flow, scheduler, run):
schd.pool.set_prereqs_and_outputs(['1/z'], [], ['all'], ['all'])
warn_or_higher = [i for i in log.records if i.levelno > 30]
assert warn_or_higher == []


async def test_logging(flow, scheduler, start, log_filter):
"""Test logging of a mixture of valid and invalid tasks, tasks with
some required and no required outputs."""
schd: Scheduler = scheduler(
flow({
'scheduler': {
'cycle point format': 'CCYY',
},
'scheduling': {
'initial cycle point': '2000',
'graph': {
'R3//P1Y': 'a? & a:x & b? => c?',
},
},
'runtime': {
'a': {
'outputs': {'x': 'whatever'}
}
}
})
)
tasks_to_set = [
# Tasks with required outputs:
'2000/a',
# Tasks without required outputs:
'2000/b', '2000/c',
# Glob that matches future tasks:
'2002/*',
# Invalid tasks:
'2005/a', '2000/doh',
]
async with start(schd):
await run_cmd(set_prereqs_and_outputs(schd, tasks_to_set, [FLOW_ALL]))

assert log_filter(
logging.WARNING,
"Tasks have no required outputs to set: 2000/a, 2000/b, 2002/a, 2002/b",
)
assert log_filter(
logging.WARNING, "Invalid cycle point for task: a, 2005"
)
assert log_filter(logging.WARNING, "No matching tasks found: doh")
assert len(log_filter(logging.WARNING)) == 3

# Check singular form of the above message
await run_cmd(set_prereqs_and_outputs(schd, ['2000/b'], [FLOW_ALL]))

assert log_filter(
logging.WARNING,
"Task has no required outputs to set: 2000/b",
)
assert len(log_filter(logging.WARNING)) == 4

0 comments on commit b438799

Please sign in to comment.