Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xtriggers: differentiate "not satisfied" from "not yet evaluated" #6560

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cylc/flow/data_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ message PbTrigger {
optional string message = 3;
optional bool satisfied = 4;
optional double time = 5;
optional string status = 6;
}

message PbTaskProxy {
Expand Down
73 changes: 36 additions & 37 deletions cylc/flow/data_messages_pb2.py

Large diffs are not rendered by default.

68 changes: 35 additions & 33 deletions cylc/flow/data_messages_pb2.pyi

Large diffs are not rendered by default.

19 changes: 18 additions & 1 deletion cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
TIME_ZONE_UTC_INFO,
get_utc_mode
)
from cylc.flow.xtrigger_mgr import should_run_xtriggers

if TYPE_CHECKING:
from cylc.flow.cycling import PointBase
Expand Down Expand Up @@ -1519,10 +1520,11 @@ def _process_internal_task_proxy(
for label, satisfied in itask.state.xtriggers.items():
sig = self.schd.xtrigger_mgr.get_xtrig_ctx(
itask, label).get_signature()
xtrig = tproxy.xtriggers[sig]
xtrig = tproxy.xtriggers[sig] # this creates a new PbTrigger
xtrig.id = sig
xtrig.label = label
xtrig.satisfied = satisfied
xtrig.status = get_xtrigger_status(itask, satisfied)
self.xtrigger_tasks.setdefault(sig, set()).add((tproxy.id, label))

if tproxy.state in self.latest_state_tasks:
Expand Down Expand Up @@ -2327,6 +2329,10 @@ def delta_task_state(self, itask: 'TaskProxy') -> None:
self.updated[TASKS].setdefault(
t_id,
PbTask(id=t_id)).MergeFrom(t_delta)

for label, satisfied in itask.state.xtriggers.items():
sig = self.schd.xtrigger_mgr.get_xtrig_ctx(itask, label).get_signature()
tproxy.xtriggers[sig].status = get_xtrigger_status(itask, satisfied)
self.state_update_families.add(tproxy.first_parent)
self.updates_pending = True

Expand Down Expand Up @@ -2539,6 +2545,7 @@ def delta_task_xtrigger(self, sig, satisfied):
xtrigger.label = label
xtrigger.satisfied = satisfied
xtrigger.time = update_time
xtrigger.status = 'running' if not satisfied else 'succeeded'
self.updates_pending = True

def delta_from_task_proxy(self, itask: TaskProxy) -> None:
Expand Down Expand Up @@ -2791,3 +2798,13 @@ def edge_id(self, left_tokens: Tokens, right_tokens: Tokens) -> str:
f'$edge|{left_tokens.relative_id}|{right_tokens.relative_id}'
)
).id


def get_xtrigger_status(itask: 'TaskProxy', xtrig_satisfied: bool):
if xtrig_satisfied:
status = 'satisfied'
elif should_run_xtriggers(itask):
status = 'running'
else:
status = 'waiting'
return status
1 change: 1 addition & 0 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,7 @@ class Meta:
message = String()
satisfied = Boolean()
time = Float()
status = String()


class TaskProxy(ObjectType):
Expand Down
8 changes: 2 additions & 6 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@
AutoRestartMode,
StopMode,
)
from cylc.flow.xtrigger_mgr import XtriggerManager
from cylc.flow.xtrigger_mgr import XtriggerManager, should_run_xtriggers


if TYPE_CHECKING:
Expand Down Expand Up @@ -1759,11 +1759,7 @@ async def _main_loop(self) -> None:
# Unqueued tasks with satisfied prerequisites must be waiting on
# xtriggers or ext_triggers. Check these and queue tasks if ready.
for itask in self.pool.get_tasks():
if (
not itask.state(TASK_STATUS_WAITING)
or itask.state.is_queued
or itask.state.is_runahead
):
if not should_run_xtriggers(itask):
continue

if (
Expand Down
27 changes: 18 additions & 9 deletions cylc/flow/scripts/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import json
import sys
from textwrap import indent
from typing import Any, Dict, TYPE_CHECKING
from typing import Any, Dict, Optional, TYPE_CHECKING

from ansimarkup import ansiprint

Expand All @@ -62,7 +62,7 @@
ID_MULTI_ARG_DOC,
Options,
)
from cylc.flow.terminal import cli_function
from cylc.flow.terminal import DIM, cli_function
from cylc.flow.util import BOOL_SYMBOLS


Expand Down Expand Up @@ -142,6 +142,7 @@
id
label
satisfied
status
}
runtime {
completion
Expand All @@ -153,13 +154,16 @@

SATISFIED = BOOL_SYMBOLS[True]
UNSATISFIED = BOOL_SYMBOLS[False]
PENDING = 'o'


def print_msg_state(msg, state):
if state:
def print_msg_state(msg, state: Optional[bool]):
if state is False:
ansiprint(f'<green> {SATISFIED} {msg}</green>')
else:
elif state is True:
ansiprint(f'<red> {UNSATISFIED} {msg}</red>')
else:
ansiprint(f'<{DIM}> {PENDING} {msg}</{DIM}>')


def print_completion_state(t_proxy):
Expand Down Expand Up @@ -391,7 +395,8 @@ async def prereqs_and_outputs_query(
):
ansiprint(
"<bold>other:</bold>"
f" ('<red>{UNSATISFIED}</red>': not satisfied)"
f" ('<red>{UNSATISFIED}</red>': not satisfied,"
f" '<{DIM}>{PENDING}</{DIM}>': not yet evaluated)"
)
for ext_trig in t_proxy['externalTriggers']:
state = ext_trig['satisfied']
Expand All @@ -400,11 +405,15 @@ async def prereqs_and_outputs_query(
state)
for xtrig in t_proxy['xtriggers']:
label = get_wallclock_label(xtrig) or xtrig['id']
state = xtrig['satisfied']
satisfied = xtrig['satisfied']
status = xtrig.get(
'status',
'succeeded' if satisfied else 'running'
)
print_msg_state(
f'xtrigger "{xtrig["label"]} = {label}"',
state)

None if status == 'waiting' else satisfied == 'succeeded'
)
print_completion_state(t_proxy)

if not task_proxies:
Expand Down
9 changes: 9 additions & 0 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from cylc.flow.hostuserutil import get_user
from cylc.flow.subprocctx import add_kwarg_to_sig
from cylc.flow.subprocpool import get_xtrig_func
from cylc.flow.task_state import TASK_STATUS_WAITING
from cylc.flow.xtriggers.wall_clock import _wall_clock
from cylc.flow.xtriggers.workflow_state import (
workflow_state,
Expand Down Expand Up @@ -776,3 +777,11 @@ def callback(self, ctx: 'SubFuncContext'):
LOG.info('xtrigger satisfied: %s = %s', ctx.label, sig)
self.sat_xtrig[sig] = results
self.do_housekeeping = True


def should_run_xtriggers(itask: 'TaskProxy'):
return (
itask.state(TASK_STATUS_WAITING)
and not itask.state.is_queued
and not itask.state.is_runahead
)
Loading