Skip to content

Commit

Permalink
Suspend execution feature
Browse files Browse the repository at this point in the history
This commit introduces the suspend execution feature to the nrunner.
Suspending execution was available in the legacy runner, but it was never
ported to the nrunner. With this feature, it is possible to pause the
execution of Python-based tasks on the process spawner by sending the
SIGTSTP signal (ctrl+z). It is helpful for debugging test execution.

Reference: #6059
Signed-off-by: Jan Richter <[email protected]>
  • Loading branch information
richtja committed Jan 20, 2025
1 parent f442648 commit 52ef928
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 18 deletions.
56 changes: 39 additions & 17 deletions avocado/core/nrunner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from avocado.core.nrunner.runnable import RUNNERS_REGISTRY_STANDALONE_EXECUTABLE
from avocado.core.plugin_interfaces import RunnableRunner
from avocado.core.utils import messages
from avocado.utils import process

#: The amount of time (in seconds) between each internal status check
RUNNER_RUN_CHECK_INTERVAL = 0.01
Expand Down Expand Up @@ -99,14 +100,30 @@ class PythonBaseRunner(BaseRunner, abc.ABC):
Base class for Python runners
"""

@staticmethod
def signal_handler(signum, frame): # pylint: disable=W0613
def __init__(self):
super().__init__()
self.proc = None
self.sigtstp = multiprocessing.Lock()
self.sigstopped = False
self.timeout = float("inf")

def signal_handler(self, signum, frame): # pylint: disable=W0613
if signum == signal.SIGTERM.value:
raise TestInterrupt("Test interrupted: Timeout reached")

@staticmethod
def _monitor(proc, time_started, queue):
timeout = float("inf")
elif signum == signal.SIGTSTP.value:
if self.sigstopped:
self.sigstopped = False
sign = signal.SIGCONT
else:
self.sigstopped = True
sign = signal.SIGSTOP
if not self.proc: # Ignore ctrl+z when proc not yet started
return
with self.sigtstp:
self.timeout = float("inf")
process.kill_process_tree(self.proc.pid, sign, False)

def _monitor(self, time_started, queue):
next_status_time = None
while True:
time.sleep(RUNNER_RUN_CHECK_INTERVAL)
Expand All @@ -115,37 +132,42 @@ def _monitor(proc, time_started, queue):
if next_status_time is None or now > next_status_time:
next_status_time = now + RUNNER_RUN_STATUS_INTERVAL
yield messages.RunningMessage.get()
if (now - time_started) > timeout:
proc.terminate()
if (now - time_started) > self.timeout:
self.proc.terminate()
else:
message = queue.get()
if message.get("type") == "early_state":
timeout = float(message.get("timeout") or float("inf"))
self.timeout = float(message.get("timeout") or float("inf"))
else:
yield message
if message.get("status") == "finished":
break
while self.sigstopped:
time.sleep(RUNNER_RUN_CHECK_INTERVAL)

def run(self, runnable):
# pylint: disable=W0201
signal.signal(signal.SIGTSTP, signal.SIG_IGN)
signal.signal(signal.SIGTERM, self.signal_handler)
signal.signal(signal.SIGTSTP, self.signal_handler)
# pylint: disable=W0201
self.runnable = runnable
yield messages.StartedMessage.get()
try:
queue = multiprocessing.SimpleQueue()
process = multiprocessing.Process(
self.proc = multiprocessing.Process(
target=self._run, args=(self.runnable, queue)
)

process.start()

while self.sigstopped:
pass
with self.sigtstp:
self.proc.start()
time_started = time.monotonic()
for message in self._monitor(process, time_started, queue):
for message in self._monitor(time_started, queue):
yield message

except TestInterrupt:
process.terminate()
for message in self._monitor(process, time_started, queue):
self.proc.terminate()
for message in self._monitor(time_started, queue):
yield message
except Exception as e:
yield messages.StderrMessage.get(traceback.format_exc())
Expand Down
22 changes: 22 additions & 0 deletions avocado/core/plugin_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,28 @@ async def terminate_task(self, runtime_task):
:rtype: bool
"""

async def stop_task(self, runtime_task):
    """Stop (pause) a task that has already been spawned.

    This default implementation only signals that the operation is not
    supported; spawners able to stop their tasks override it.

    :param runtime_task: wrapper for a Task with additional runtime
                         information.
    :type runtime_task: :class:`avocado.core.task.runtime.RuntimeTask`
    :returns: whether the task has been stopped or not.
    :rtype: bool
    :raises NotImplementedError: always, in this default implementation.
    """
    raise NotImplementedError

async def resume_task(self, runtime_task):
    """Resume a task that has previously been stopped.

    This default implementation only signals that the operation is not
    supported; spawners able to resume their tasks override it.

    :param runtime_task: wrapper for a Task with additional runtime
                         information.
    :type runtime_task: :class:`avocado.core.task.runtime.RuntimeTask`
    :returns: whether the task has been resumed or not.
    :rtype: bool
    :raises NotImplementedError: always, in this default implementation.
    """
    raise NotImplementedError

@staticmethod
@abc.abstractmethod
async def check_task_requirements(runtime_task):
Expand Down
1 change: 1 addition & 0 deletions avocado/core/task/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class RuntimeTaskStatus(Enum):
FAIL_TRIAGE = "FINISHED WITH FAILURE ON TRIAGE"
FAIL_START = "FINISHED FAILING TO START"
STARTED = "STARTED"
PAUSED = "PAUSED"

@staticmethod
def finished_statuses():
Expand Down
26 changes: 26 additions & 0 deletions avocado/core/task/statemachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time

from avocado.core.exceptions import JobFailFast
from avocado.core.output import LOG_UI
from avocado.core.task.runtime import RuntimeTaskStatus
from avocado.core.teststatus import STATUSES_NOT_OK
from avocado.core.utils import messages
Expand Down Expand Up @@ -493,6 +494,31 @@ async def terminate_tasks_interrupted(self):
terminated = await self._terminate_tasks(task_status)
await self._send_finished_tasks_message(terminated, "Interrupted by user")

@staticmethod
async def stop_resume_tasks(state_machine, spawner):
    """Toggle every monitored task between STARTED and PAUSED.

    While holding ``state_machine.lock``, each task in
    ``state_machine.monitored`` that is STARTED is stopped through the
    spawner and marked PAUSED, and each task that is PAUSED is resumed
    and marked STARTED.  Tasks in any other status are left untouched.
    Every transition is reported on the UI log.

    If the spawner raises :class:`NotImplementedError`, the iteration is
    abandoned and a warning is written to both the module log and the UI
    log.

    :param state_machine: object providing the ``lock`` and ``monitored``
                          attributes used here.
    :param spawner: spawner providing ``stop_task`` and ``resume_task``.
    """
    async with state_machine.lock:
        try:
            for task in state_machine.monitored:
                if task.status == RuntimeTaskStatus.STARTED:
                    await spawner.stop_task(task)
                    task.status = RuntimeTaskStatus.PAUSED
                elif task.status == RuntimeTaskStatus.PAUSED:
                    await spawner.resume_task(task)
                    task.status = RuntimeTaskStatus.STARTED
                else:
                    # Neither running nor paused: nothing to toggle
                    continue
                LOG_UI.warning(f"{task.task.identifier}: {task.status.value}")
        except NotImplementedError:
            for log in (LOG, LOG_UI):
                log.warning(
                    f"Sending signals to tasks is not implemented for spawner: {spawner}"
                )

async def run(self):
"""Pushes Tasks forward and makes them do something with their lives."""
while True:
Expand Down
10 changes: 10 additions & 0 deletions avocado/plugins/runner_nrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import os
import platform
import random
import signal
import tempfile

from avocado.core.dispatcher import SpawnerDispatcher
Expand Down Expand Up @@ -269,6 +270,10 @@ def _abort_if_missing_runners(runnables):
)
raise JobError(msg)

@staticmethod
def signal_handler(spawner, state_machine):
    """Schedule a stop/resume toggle of the tasks of ``state_machine``.

    The work itself is done by ``Worker.stop_resume_tasks``; this handler
    only wraps that coroutine in an asyncio task, so it has to be called
    while an event loop is running.

    :param spawner: spawner used to stop or resume the tasks.
    :param state_machine: task state machine whose tasks are toggled.
    """
    toggle = Worker.stop_resume_tasks(state_machine, spawner)
    asyncio.create_task(toggle)

def run_suite(self, job, test_suite):
summary = set()

Expand Down Expand Up @@ -335,6 +340,11 @@ def run_suite(self, job, test_suite):
]
asyncio.ensure_future(self._update_status(job))
loop = asyncio.get_event_loop()
if hasattr(signal, "SIGTSTP"):
loop.add_signal_handler(
signal.SIGTSTP,
lambda: self.signal_handler(spawner, self.tsm),
)
try:
try:
loop.run_until_complete(
Expand Down
11 changes: 11 additions & 0 deletions avocado/plugins/spawners/process.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import os
import signal
import socket

from avocado.core.dependencies.requirements import cache
Expand Down Expand Up @@ -109,6 +110,16 @@ async def terminate_task(self, runtime_task):
pass
return returncode is not None

async def stop_task(self, runtime_task):
    """Send SIGTSTP to the process backing ``runtime_task``.

    :param runtime_task: task whose ``spawner_handle.process`` receives
                         the signal.
    :returns: ``True`` when the signal was sent, ``False`` when the
              process no longer exists.
    :rtype: bool
    """
    try:
        runtime_task.spawner_handle.process.send_signal(signal.SIGTSTP)
    except ProcessLookupError:
        return False
    # Report success explicitly: the interface promises a bool, and a bare
    # ``return`` would hand ``None`` (falsy) back to the caller.
    return True

async def resume_task(self, runtime_task):
    """Resume ``runtime_task`` by delivering the same signal used to stop it.

    Resuming is delegated to :meth:`stop_task`, so the task receives the
    very same signal a second time.

    :param runtime_task: task to be resumed.
    :returns: the outcome reported by :meth:`stop_task`.
    :rtype: bool
    """
    # Propagate the result instead of discarding it, so callers can tell
    # whether the signal was actually delivered.
    return await self.stop_task(runtime_task)

@staticmethod
async def check_task_requirements(runtime_task):
"""Check the runtime task requirements needed to be able to run"""
Expand Down
18 changes: 18 additions & 0 deletions docs/source/guides/contributor/chapters/tips.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,24 @@ During the execution look for::

avocado --show avocado.utils.debug run examples/tests/assets.py

Interrupting test
-----------------

If you want to "pause" a running test, you can send the SIGTSTP (ctrl+z)
signal to the main avocado process. This signal is forwarded to the test
and its child processes. To resume testing, send the same signal again.

.. note::
   The job timeouts are still enabled on stopped processes.

.. note::
   It is supported on the process spawner only.

.. warning::
   This feature is meant only for debugging purposes, and it can
   cause unreliable behavior, especially if the signal is sent during
   test initialization. Therefore, use it with caution.

Line-profiler
-------------

Expand Down
2 changes: 1 addition & 1 deletion selftests/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"unit": 682,
"jobs": 11,
"functional-parallel": 317,
"functional-serial": 7,
"functional-serial": 8,
"optional-plugins": 0,
"optional-plugins-golang": 2,
"optional-plugins-html": 3,
Expand Down
52 changes: 52 additions & 0 deletions selftests/functional/serial/basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import os
import re
import signal
import time

from avocado.utils import process, script
from selftests.utils import AVOCADO, TestCaseTmpDir

SLEEP_TEST = """import time
from avocado import Test
class SleepTest(Test):
timeout = 10
def test(self):
self.log.debug("Sleeping starts: %s", time.time())
time.sleep(9)
self.log.debug("Sleeping ends: %s", time.time())
"""


class RunnerOperationTest(TestCaseTmpDir):
    """Functional checks of runner operations triggered by signals."""

    #: Maximum time (in seconds) to wait for the test to be reported as
    #: started before giving up
    START_TIMEOUT = 60

    def test_pause(self):
        """Pause and resume a running test by sending SIGTSTP twice.

        The sleeping test is started, avocado receives SIGTSTP once the
        test is reported as STARTED, and a second SIGTSTP is sent ten
        seconds later.  The job log must then show both state changes and
        a sleep that lasted at least seven seconds.
        """
        with script.TemporaryScript(
            "sleep.py",
            SLEEP_TEST,
        ) as tst:
            cmd_line = f"{AVOCADO} run --disable-sysinfo --job-results-dir {self.tmpdir.name} -- {tst}"
            proc = process.SubProcess(cmd_line)
            proc.start()
            # Poll with a short sleep and a deadline: a tight loop burns a
            # CPU core and never ends if the test is never reported.
            deadline = time.monotonic() + self.START_TIMEOUT
            while b"STARTED" not in proc.get_stdout():
                if time.monotonic() > deadline:
                    proc.kill()
                    self.fail("Test was not reported as STARTED in time")
                time.sleep(0.1)
            proc.send_signal(signal.SIGTSTP)
            time.sleep(10)
            proc.send_signal(signal.SIGTSTP)
            proc.wait()
            full_log_path = os.path.join(self.tmpdir.name, "latest", "full.log")
            with open(full_log_path, encoding="utf-8") as full_log_file:
                full_log = full_log_file.read()
            # "+" rather than "*": an empty match would make int() raise
            regex_start = re.search(r"Sleeping starts: ([0-9]+)", full_log)
            regex_end = re.search(r"Sleeping ends: ([0-9]+)", full_log)
            self.assertIsNotNone(regex_start, "Sleep start not found in the log")
            self.assertIsNotNone(regex_end, "Sleep end not found in the log")
            start_time = int(regex_start.group(1))
            end_time = int(regex_end.group(1))
            self.assertIn("SleepTest.test: PAUSED", full_log)
            self.assertIn("SleepTest.test: STARTED", full_log)
            self.assertGreaterEqual(end_time - start_time, 7)

0 comments on commit 52ef928

Please sign in to comment.