Skip to content

Commit

Permalink
Implement restart timeout.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed May 3, 2023
1 parent e404147 commit 41c97a4
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 49 deletions.
9 changes: 9 additions & 0 deletions cylc/flow/cfgspec/globalcfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,13 @@
.. versionchanged:: 8.0.0
{REPLACES}``abort on inactivity``.
''',
'restart timeout': '''
How long to wait for intervention on restarting a completed workflow.
The timer stops if any task is triggered.
.. versionadded:: 8.1.0
'''
}

Expand Down Expand Up @@ -839,6 +846,8 @@ def default_for(
vdr_type = VDR.V_INTERVAL
if item == "stall timeout":
default = DurationFloat(3600)
elif item == "restart timeout":
default = DurationFloat(120)
else:
default = None
Conf(item, vdr_type, default, desc=desc)
Expand Down
122 changes: 73 additions & 49 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from time import sleep, time
import traceback
from typing import (
TYPE_CHECKING,
Callable,
Iterable,
NoReturn,
Expand Down Expand Up @@ -149,9 +148,6 @@
get_utc_mode)
from cylc.flow.xtrigger_mgr import XtriggerManager

if TYPE_CHECKING:
from cylc.flow.task_proxy import TaskProxy


class SchedulerStop(CylcError):
"""Scheduler normal stop."""
Expand All @@ -169,9 +165,10 @@ class Scheduler:
EVENT_SHUTDOWN = WorkflowEventHandler.EVENT_SHUTDOWN
EVENT_ABORTED = WorkflowEventHandler.EVENT_ABORTED
EVENT_WORKFLOW_TIMEOUT = WorkflowEventHandler.EVENT_WORKFLOW_TIMEOUT
EVENT_STALL = WorkflowEventHandler.EVENT_STALL
EVENT_STALL_TIMEOUT = WorkflowEventHandler.EVENT_STALL_TIMEOUT
EVENT_RESTART_TIMEOUT = WorkflowEventHandler.EVENT_RESTART_TIMEOUT
EVENT_INACTIVITY_TIMEOUT = WorkflowEventHandler.EVENT_INACTIVITY_TIMEOUT
EVENT_STALL = WorkflowEventHandler.EVENT_STALL

# Intervals in seconds
INTERVAL_MAIN_LOOP = 1.0
Expand Down Expand Up @@ -251,6 +248,7 @@ class Scheduler:
is_paused = False
is_updated = False
is_stalled = False
is_restart_timeout_wait = False
is_reloaded = False

# main loop
Expand Down Expand Up @@ -499,7 +497,8 @@ async def configure(self):
for event, start_now, log_reset_func in [
(self.EVENT_INACTIVITY_TIMEOUT, True, LOG.debug),
(self.EVENT_WORKFLOW_TIMEOUT, True, None),
(self.EVENT_STALL_TIMEOUT, False, None)
(self.EVENT_STALL_TIMEOUT, False, None),
(self.EVENT_RESTART_TIMEOUT, False, None)
]:
interval = self._get_events_conf(event)
if interval is not None:
Expand All @@ -508,6 +507,17 @@ async def configure(self):
timer.reset()
self.timers[event] = timer

if self.is_restart and not self.pool.get_all_tasks():
# This workflow completed before restart; wait for intervention.
with suppress(KeyError):
self.timers[self.EVENT_RESTART_TIMEOUT].reset()
self.is_restart_timeout_wait = True
LOG.warning(
"This workflow already ran to completion."
"\nTo make it continue, trigger new tasks"
" before the restart timeout."
)

# Main loop plugins
self.main_loop_plugins = main_loop.load(
self.cylc_config.get('main loop', {}),
Expand Down Expand Up @@ -1413,7 +1423,7 @@ async def workflow_shutdown(self):

# Is the workflow ready to shut down now?
if self.pool.can_stop(self.stop_mode):
await self.update_data_structure()
await self.update_data_structure(self.is_reloaded)
self.proc_pool.close()
if self.stop_mode != StopMode.REQUEST_NOW_NOW:
# Wait for process pool to complete,
Expand Down Expand Up @@ -1633,11 +1643,36 @@ async def main_loop(self) -> None:

# Update state summary, database, and uifeed
self.workflow_db_mgr.put_task_event_timers(self.task_events_mgr)
has_updated = await self.update_data_structure()
if has_updated and not self.is_stalled:
# Stop the stalled timer.

# List of task whose states have changed.
updated_task_list = [
t for t in self.pool.get_tasks() if t.state.is_updated]
has_updated = updated_task_list or self.is_updated

if updated_task_list and self.is_restart_timeout_wait:
# Stop restart timeout if action has been triggered.
with suppress(KeyError):
self.timers[self.EVENT_STALL_TIMEOUT].stop()
self.timers[self.EVENT_RESTART_TIMEOUT].stop()
self.is_restart_timeout_wait = False

if has_updated:
# Update the datastore.
await self.update_data_structure(self.is_reloaded)

if not self.is_reloaded:
# (A reload cannot unstall workflow by itself)
self.is_stalled = False
self.is_reloaded = False

# Reset workflow and task updated flags.
self.is_updated = False
for itask in updated_task_list:
itask.state.is_updated = False

if not self.is_stalled:
# Stop the stalled timer.
with suppress(KeyError):
self.timers[self.EVENT_STALL_TIMEOUT].stop()

self.process_workflow_db_queue()

Expand Down Expand Up @@ -1686,17 +1721,12 @@ async def main_loop(self) -> None:
self.main_loop_intervals.append(time() - tinit)
# END MAIN LOOP

async def update_data_structure(self) -> Union[bool, List['TaskProxy']]:
async def update_data_structure(self, reloaded: bool = False):
"""Update DB, UIS, Summary data elements"""
updated_tasks = [
t for t in self.pool.get_tasks() if t.state.is_updated]
has_updated = self.is_updated or updated_tasks
reloaded = self.is_reloaded
# Add tasks that have moved moved from runahead to live pool.
if has_updated or self.data_store_mgr.updates_pending:
# Add tasks that have moved from runahead to live pool.
if self.data_store_mgr.updates_pending:
# Collect/apply data store updates/deltas
self.data_store_mgr.update_data_structure(reloaded=reloaded)
self.is_reloaded = False
# Publish updates:
if self.data_store_mgr.publish_pending:
self.data_store_mgr.publish_pending = False
Expand All @@ -1705,17 +1735,9 @@ async def update_data_structure(self) -> Union[bool, List['TaskProxy']]:
# Non-async sleep - yield to other threads rather
# than event loop
sleep(0)
if has_updated:
# Database update
self.workflow_db_mgr.put_task_pool(self.pool)
# Reset workflow and task updated flags.
self.is_updated = False
if not reloaded: # (A reload cannot unstall workflow by itself)
self.is_stalled = False
for itask in updated_tasks:
itask.state.is_updated = False
self.update_data_store()
return has_updated
# Database update
self.workflow_db_mgr.put_task_pool(self.pool)
self.update_data_store()

def check_workflow_timers(self):
"""Check timers, and abort or run event handlers as configured."""
Expand All @@ -1728,6 +1750,10 @@ def check_workflow_timers(self):
raise SchedulerError(f'"{abort_conf}" is set')
if self._get_events_conf(f"{event} handlers") is not None:
self.run_event_handlers(event)
if event == self.EVENT_RESTART_TIMEOUT:
# Unset wait flag to allow normal shutdown.
with suppress(KeyError):
self.is_restart_timeout_wait = False

def check_workflow_stalled(self) -> bool:
"""Check if workflow is stalled or not."""
Expand Down Expand Up @@ -1896,27 +1922,25 @@ def stop_clock_done(self):

def check_auto_shutdown(self):
"""Check if we should shut down now."""
if self.is_paused:
# Don't if paused.
return False

if self.check_workflow_stalled():
return False

if any(
itask for itask in self.pool.get_tasks()
if itask.state(
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING
)
or (
itask.state(TASK_STATUS_WAITING)
and not itask.state.is_runahead
if (
self.is_paused or
self.is_restart_timeout_wait or
self.check_workflow_stalled() or
# if more tasks to run (if waiting and not
# runahead, then held, queued, or xtriggered).
any(
itask for itask in self.pool.get_tasks()
if itask.state(
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING
)
or (
itask.state(TASK_STATUS_WAITING)
and not itask.state.is_runahead
)
)
):
# Don't if there are more tasks to run (if waiting and not
# runahead, then held, queued, or xtriggered).
return False

# Can shut down.
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/workflow_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ class WorkflowEventHandler():
EVENT_INACTIVITY_TIMEOUT = 'inactivity timeout'
EVENT_STALL = 'stall'
EVENT_STALL_TIMEOUT = 'stall timeout'
EVENT_RESTART_TIMEOUT = 'restart timeout'

WORKFLOW_EVENT_HANDLER = 'workflow-event-handler'
WORKFLOW_EVENT_MAIL = 'workflow-event-mail'
Expand Down

0 comments on commit 41c97a4

Please sign in to comment.