Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log job failure even when there are retries configured #6169

Open
wants to merge 13 commits into
base: 8.4.x
Choose a base branch
from
1 change: 1 addition & 0 deletions changes.d/6169.fix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ensure that job failure is logged, even when the presence of retries causes the task not to change state.
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
20 changes: 15 additions & 5 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ def process_message(
# Already failed.
return True
if self._process_message_failed(
itask, event_time, self.JOB_FAILED, forced
itask, event_time, self.JOB_FAILED, forced, message
wxtim marked this conversation as resolved.
Show resolved Hide resolved
):
self.spawn_children(itask, TASK_OUTPUT_FAILED)

Expand Down Expand Up @@ -795,7 +795,7 @@ def process_message(
self.workflow_db_mgr.put_update_task_jobs(
itask, {"run_signal": signal})
if self._process_message_failed(
itask, event_time, self.JOB_FAILED, forced
itask, event_time, self.JOB_FAILED, forced, message
):
wxtim marked this conversation as resolved.
Show resolved Hide resolved
self.spawn_children(itask, TASK_OUTPUT_FAILED)

Expand All @@ -812,7 +812,7 @@ def process_message(
self.workflow_db_mgr.put_update_task_jobs(
itask, {"run_signal": aborted_with})
if self._process_message_failed(
itask, event_time, aborted_with, forced
itask, event_time, aborted_with, forced, message
):
self.spawn_children(itask, TASK_OUTPUT_FAILED)

Expand Down Expand Up @@ -1297,10 +1297,17 @@ def _retry_task(self, itask, wallclock_time, submit_retry=False):
if itask.state_reset(TASK_STATUS_WAITING):
self.data_store_mgr.delta_task_state(itask)

def _process_message_failed(self, itask, event_time, message, forced):
def _process_message_failed(
self, itask, event_time, message, forced, full_message
):
"""Helper for process_message, handle a failed message.

Return True if no retries (hence go to the failed state).

Args:
full_message:
If we have retries lined up we still tell users what
happened to cause this attempt to fail.
"""
no_retries = False
if event_time is None:
Expand Down Expand Up @@ -1332,7 +1339,10 @@ def _process_message_failed(self, itask, event_time, message, forced):
timer = itask.try_timers[TimerFlags.EXECUTION_RETRY]
self._retry_task(itask, timer.timeout)
delay_msg = f"retrying in {timer.delay_timeout_as_str()}"
LOG.warning(f"[{itask}] {delay_msg}")
LOG.warning(
f'[{itask}] {full_message or self.EVENT_FAILED} - '
f'{delay_msg}'
)
msg = f"{self.JOB_FAILED}, {delay_msg}"
self.setup_event_handlers(itask, self.EVENT_RETRY, msg)
self._reset_job_timers(itask)
Expand Down
27 changes: 24 additions & 3 deletions tests/integration/test_task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from itertools import product
import logging
from typing import Any as Fixture

from cylc.flow.task_events_mgr import TaskJobLogsRetrieveContext
from cylc.flow.scheduler import Scheduler
from cylc.flow.data_store_mgr import (
JOBS,
TASK_STATUSES_ORDERED,
TASK_STATUS_WAITING,
TASK_STATUS_SUBMIT_FAILED,
)


Expand Down Expand Up @@ -170,3 +167,27 @@ async def test__always_insert_task_job(
'1/broken/01': 'submit-failed',
'1/broken2/01': 'submit-failed'
}


async def test__process_message_failed_with_retry(one, start):
"""Log job failure, even if a retry is scheduled.

See: https://github.com/cylc/cylc-flow/pull/6169

"""

async with start(one) as LOG:
fail_once = one.pool.get_tasks()[0]
# Add retry timers:
one.task_job_mgr._set_retry_timers(
fail_once, {
'execution retry delays': [1],
'submission retry delays': [1]
})
Comment on lines +187 to +191
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Less fragile if this was just set in the workflow config for the test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it really make any difference? - the aim was to avoid fiddling with one. Can do if you insist.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Surely creating a workflow config with these retry delays set is not any more involved than fiddling the internal retry timers?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not going to argue. Will change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Went away and had a look at it - essentially this is more unit-testy than integration test-like: If you set these timers in the config you still need to run this function here to set the retry timers. So I think that I'll leave it.


# Process failed message:
one.task_events_mgr._process_message_failed(
fail_once, None, 'failed', False, 'failed/OOK')

# Check that failure reported:
assert 'failed/OOK' in LOG.messages[-1]
Loading