
Commit

Log job failure even when it does not cause a change in task state.
Added fuller message
Added test
wxtim committed Jul 22, 2024
1 parent c029d6b commit 1341355
Showing 3 changed files with 35 additions and 66 deletions.
1 change: 1 addition & 0 deletions changes.d/fix.6169.md
@@ -0,0 +1 @@
+Ensure that job failure is logged, even when the presence of retries causes the task not to change state.
21 changes: 15 additions & 6 deletions cylc/flow/task_events_mgr.py
@@ -742,7 +742,7 @@ def process_message(
                 # Already failed.
                 return True
             if self._process_message_failed(
-                itask, event_time, self.JOB_FAILED, forced
+                itask, event_time, self.JOB_FAILED, forced, message
             ):
                 self.spawn_children(itask, TASK_OUTPUT_FAILED)

@@ -790,7 +790,7 @@ def process_message(
             self.workflow_db_mgr.put_update_task_jobs(
                 itask, {"run_signal": signal})
             if self._process_message_failed(
-                itask, event_time, self.JOB_FAILED, forced
+                itask, event_time, self.JOB_FAILED, forced, message
             ):
                 self.spawn_children(itask, TASK_OUTPUT_FAILED)

@@ -807,7 +807,7 @@ def process_message(
             self.workflow_db_mgr.put_update_task_jobs(
                 itask, {"run_signal": aborted_with})
             if self._process_message_failed(
-                itask, event_time, aborted_with, forced
+                itask, event_time, aborted_with, forced, message
             ):
                 self.spawn_children(itask, TASK_OUTPUT_FAILED)

@@ -1292,10 +1292,17 @@ def _retry_task(self, itask, wallclock_time, submit_retry=False):
         if itask.state_reset(TASK_STATUS_WAITING):
             self.data_store_mgr.delta_task_state(itask)
 
-    def _process_message_failed(self, itask, event_time, message, forced):
+    def _process_message_failed(
+        self, itask, event_time, message, forced, full_message
+    ):
         """Helper for process_message, handle a failed message.
 
         Return True if no retries (hence go to the failed state).
+
+        Args:
+            full_message:
+                If we have retries lined up we still tell users what
+                happened to cause this attempt to fail.
         """
         no_retries = False
         if event_time is None:
@@ -1325,9 +1332,11 @@ def _process_message_failed(self, itask, event_time, message, forced):
         else:
             # There is an execution retry lined up.
             timer = itask.try_timers[TimerFlags.EXECUTION_RETRY]
-            self._retry_task(itask, timer.timeout)
             delay_msg = f"retrying in {timer.delay_timeout_as_str()}"
-            LOG.warning(f"[{itask}] {delay_msg}")
+            LOG.warning(
+                f'[{itask}] => {TASK_OUTPUT_FAILED} with {full_message}'
+                f' \n({delay_msg})')
+            self._retry_task(itask, timer.timeout)
             msg = f"{self.JOB_FAILED}, {delay_msg}"
             self.setup_event_handlers(itask, self.EVENT_RETRY, msg)
         self._reset_job_timers(itask)
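The behavioural core of the change above is that the reason for the failed attempt is now logged before the retry is scheduled, so it is recorded even though the task simply returns to the waiting state rather than failing. The following is a minimal, self-contained sketch of that pattern, not the real cylc-flow implementation: `Task`, `process_failed`, the state strings and the delay string are hypothetical stand-ins.

```python
import logging
from dataclasses import dataclass

logging.basicConfig(level=logging.INFO)
LOG = logging.getLogger(__name__)


@dataclass
class Task:
    """Hypothetical stand-in for a cylc-flow task proxy (not the real class)."""
    name: str
    retries_left: int = 1
    state: str = "running"


def process_failed(task: Task, full_message: str) -> bool:
    """Return True if there are no retries left (the task goes to 'failed').

    Mirrors the shape of the patched ``_process_message_failed``: the failure
    detail is logged before the retry is set up, so it is recorded even though
    the task never enters the failed state.
    """
    if task.retries_left <= 0:
        # No retry lined up: definitive failure.
        task.state = "failed"
        LOG.critical("[%s] %s", task.name, full_message)
        return True
    # A retry is lined up: still tell users why this attempt failed.
    delay_msg = "retrying in PT1M"  # placeholder for the real delay string
    LOG.warning(
        "[%s] => failed with %s\n(%s)", task.name, full_message, delay_msg
    )
    task.retries_left -= 1
    task.state = "waiting"  # the state change alone would hide the failure
    return False


if __name__ == "__main__":
    t = Task("1/example")
    process_failed(t, "failed/ERR")  # logged even though a retry follows
    process_failed(t, "failed/ERR")  # no retries left: task fails
```

Run as a script, the first call logs the failure detail alongside the retry notice, and the second call (with no retries left) logs a definitive failure; the retry case is the one this commit makes visible in the scheduler log.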
79 changes: 19 additions & 60 deletions tests/integration/test_task_events_mgr.py
@@ -108,65 +108,24 @@ async def test__insert_task_job(flow, one_conf, scheduler, start, validate):
     ] == [1, 2]
 
 
-async def test__always_insert_task_job(
-    flow, scheduler, mock_glbl_cfg, start, run
-):
-    """Insert Task Job _Always_ inserts a task into the data store.
-
-    Bug https://github.com/cylc/cylc-flow/issues/6172 was caused
-    by passing task state to data_store_mgr.insert_job: Where
-    a submission retry was in progress the task state would be
-    "waiting" which caused the data_store_mgr.insert_job
-    to return without adding the task to the data store.
-    This is testing two different cases:
+async def test__process_message_failed_with_retry(one, start):
+    """Log job failure, even if a retry is scheduled.
 
-    * Could not select host from platform
-    * Could not select host from platform group
-    """
-    global_config = """
-        [platforms]
-            [[broken1]]
-                hosts = no-such-host-1
-            [[broken2]]
-                hosts = no-such-host-2
-        [platform groups]
-            [[broken]]
-                platforms = broken1
+    See: https://github.com/cylc/cylc-flow/pull/6169
     """
-    mock_glbl_cfg('cylc.flow.platforms.glbl_cfg', global_config)
-
-    id_ = flow({
-        'scheduling': {'graph': {'R1': 'broken & broken2'}},
-        'runtime': {
-            'root': {'submission retry delays': 'PT10M'},
-            'broken': {'platform': 'broken'},
-            'broken2': {'platform': 'broken2'}
-        }
-    })
-
-    schd = scheduler(id_, run_mode='live')
-    schd.bad_hosts = {'no-such-host-1', 'no-such-host-2'}
-    async with start(schd):
-        schd.task_job_mgr.submit_task_jobs(
-            schd.workflow,
-            schd.pool.get_tasks(),
-            schd.server.curve_auth,
-            schd.server.client_pub_key_dir,
-            is_simulation=False
-        )
-
-        # Both tasks are in a waiting state:
-        assert all(
-            i.state.status == TASK_STATUS_WAITING
-            for i in schd.pool.get_tasks())
-
-        # Both tasks have updated the data store with info
-        # about a failed job:
-        updates = {
-            k.split('//')[-1]: v.state
-            for k, v in schd.data_store_mgr.updated[JOBS].items()
-        }
-        assert updates == {
-            '1/broken/01': 'submit-failed',
-            '1/broken2/01': 'submit-failed'
-        }
+    async with start(one) as LOG:
+        fail_once = one.pool.get_tasks()[0]
+
+        # Add retry timers:
+        one.task_job_mgr._set_retry_timers(
+            fail_once, {
+                'execution retry delays': [1],
+                'submission retry delays': [1]
+            })
+
+        # Process failed message:
+        one.task_events_mgr._process_message_failed(
+            fail_once, None, 'failed', False, 'failed/OOK')
+
+        # Check that failure reported:
+        assert 'failed with failed/OOK' in LOG.messages[-1]

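As a cross-check of the substring the new test asserts on, the warning text can be recomposed from the format strings in the task_events_mgr.py hunk above. The task ID and delay below are placeholder values, and `TASK_OUTPUT_FAILED` is taken to be the string "failed", which is what the expected substring implies:

```python
# Recompose the warning text from the format strings in the diff above.
itask = "1/one/01"               # placeholder task identity
TASK_OUTPUT_FAILED = "failed"    # value implied by the test's expected substring
full_message = "failed/OOK"      # the message passed in the test
delay_msg = "retrying in PT1S"   # placeholder retry delay string

warning = (
    f'[{itask}] => {TASK_OUTPUT_FAILED} with {full_message}'
    f' \n({delay_msg})'
)
# The integration test only asserts on this substring:
assert 'failed with failed/OOK' in warning
```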