Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add k8s event interpretation #382

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions acto/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from acto.kubectl_client import KubectlClient
from acto.snapshot import Snapshot
from acto.utils import acto_timer, get_thread_logger
from acto.utils.k8s_event_watcher import K8sEventWatcher

RunnerHookType = Callable[[kubernetes.client.ApiClient], None]
CustomSystemStateHookType = Callable[
Expand Down Expand Up @@ -456,7 +457,7 @@ def wait_for_system_converge(self, hard_timeout=480) -> bool:
while True:
try:
event = combined_event_queue.get(timeout=self.wait_time)
if event == "timeout":
if event in ["timeout", K8sEventWatcher.ABORT]:
converge = False
break
except queue.Empty:
Expand Down Expand Up @@ -590,9 +591,13 @@ def wait_for_system_converge(self, hard_timeout=480) -> bool:

def watch_system_events(self, event_stream, q: multiprocessing.Queue):
"""A process that watches namespaced events"""
for _ in event_stream:

watcher = K8sEventWatcher(q)

for payload in event_stream:
try:
q.put("event")
watcher.observe(payload)
except (ValueError, AssertionError):
pass

Expand Down
135 changes: 135 additions & 0 deletions acto/utils/k8s_event_watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
"""determines whether cluster is stuck in an unhealthy state through interpreting K8s events"""

import json
import multiprocessing
from typing import Callable, Optional
import copy
from acto.utils.thread_logger import get_thread_logger


class Predicate:
"""Predicate for deciding abort"""

def __init__(
self,
reason: str,
message_filter: Callable[[Optional[str]], bool] = lambda x: True,
threshold: Optional[int] = None,
) -> None:
self.reason = reason
self.message_filter = (
message_filter # event count threshold for deciding an abort
)
self.threshold = threshold

def match(self, reason: str, message: str = ""):
"""decide whether a k8s event reason and message matches predicate"""
return self.reason == reason and self.message_filter(message)

def __str__(self) -> str:
return f"(reason: {self.reason}, count_threshold: {self.threshold})"


# todo: unify this with other Acto configs
k8s_event_watcher_config = {
"default_threshold": 3,
"abort_predicates": [
# a full list of kubelet emitted reason can be found at
# https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/events/event.go
# event reason emitted by K8s controllers however, needs a scan
# from source code unfortunately
# kublet reasons, todo: add fine-calibrated message filters
Predicate("Failed"),
Predicate("BackOff", threshold=5),
Predicate("FailedCreatePodContainer"),
Predicate("ErrImageNeverPull"),
Predicate("FailedAttachVolume"),
Predicate("FailedMount"),
Predicate("VolumeResizeFailed"),
Predicate("FileSystemResizeFailed"),
Predicate("FailedMapVolume"),
# it is possible that scheduling fails due to node.kubernetes.io/not-ready
# which should be transient. We need to filter for truly alarming ones
Predicate(
"FailedScheduling",
lambda msg: any(
keyword in msg
for keyword in [
"affinity",
"Insufficient memory",
"Insufficient cpu",
]
),
),
],
}


class K8sEventWatcher:
"""watch for K8s events that might signal an unresolvable state
and request Acto to abort the convergence wait"""

ABORT = "k8s_event_watcher_abort_request"

def __init__(self, output_channel: multiprocessing.Queue) -> None:
self.logger = get_thread_logger(with_prefix=True)
self.output_channel = output_channel
self.counter = dict()
self.abort_requested = False
self.abort_predicates = copy.deepcopy(
k8s_event_watcher_config.get("abort_predicates", [])
)
for predicate in self.abort_predicates:
t = predicate.threshold
predicate.threshold = (
t
if t is not None and t > 0
else k8s_event_watcher_config.get("default_threshold", 3)
)

def observe(self, payload: bytes) -> None:
"""observe a K8s event in json byte string format"""
if (
self.abort_requested
): # do nothing since we have already requested Acto to abort the convergence wait
return

try:
event: dict = json.loads(payload.decode("utf-8"))
reason = event.get("object", {}).get("reason")
message = event.get("object", {}).get("message")
except Exception:
self.logger.warning(
"Failed to deserialize K8s event from payload %s", str(payload)
)
return

for predicate in self.abort_predicates:
if predicate.match(reason, message):
involved_object = event["object"].get("involvedObject", {})
self.logger.info(
"Observered K8s event matching abort predicate %s for object %s"
,predicate, str(involved_object)
)
object_id = involved_object.get("uid", "")

need_abort = self._inc_and_check(object_id, predicate)
if need_abort:
self.logger.warning(
"Aborting convergence wait due to failed predicate %s",
predicate
)
self.output_channel.put(self.ABORT)
self.abort_requested = True
break

def _inc_and_check(self, object_id: str, predicate: Predicate) -> bool:
if not predicate in self.counter:
self.counter[predicate] = {}

if not object_id in self.counter[predicate]:
self.counter[predicate][object_id] = 0

self.counter[predicate][object_id] += 1
need_abort = self.counter[predicate][object_id] >= predicate.threshold
return need_abort
124 changes: 124 additions & 0 deletions test/e2e_tests/test_convergence_wait_abort.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"""This module tests quick abort from waiting for system convergence upon observing unresolvable
errors from K8s events when deploying a testcase"""

import logging
import os
import pathlib
from typing import Callable
import tempfile
import unittest
from acto import utils
from acto.kubernetes_engine.kind import Kind
from acto.utils.k8s_event_watcher import k8s_event_watcher_config
from acto.runner import Runner

test_dir = pathlib.Path(__file__).parent.resolve()
test_data_dir = os.path.join(test_dir, "test_data")


class TestConvergenceWaitAbort(unittest.TestCase):
"""tests if K8sEventWatcher correctly issues abort request by examing log output"""

def __init__(self, methodName: str = "runTest") -> None:
super().__init__(methodName)
# lower threshold for the sake of faster test
k8s_event_watcher_config["default_threshold"] = 2

def test_unsatisfiable_affinity_rule(self):
"""should issue abort when affinity cannot be satisfied"""

def log_file_test(log_file_path) -> bool:
keyword = "Aborting convergence wait due to failed predicate (reason: FailedScheduling,"
with open(log_file_path, "r", encoding="utf-8") as log_file:
for log_line in log_file:
if keyword in log_line:
return True
return False

resource_manifest_path = os.path.join(
test_data_dir, "k8s-event-watcher", "unsatisfiable-affinity.yaml"
)
self._test_convergence_wait_abort(
"unsatisfiable-affinity", resource_manifest_path, log_file_test
)

def test_invalid_image(self):
"""should abort when detecting image pull errors"""
def log_file_test(log_file_path) -> bool:
keyword = "Aborting convergence wait due to failed predicate (reason: Failed,"
with open(log_file_path, "r", encoding="utf-8") as log_file:
for log_line in log_file:
if keyword in log_line:
return True
return False

resource_manifest_path = os.path.join(
test_data_dir, "k8s-event-watcher", "invalid-image.yaml"
)
self._test_convergence_wait_abort(
"invalid-image", resource_manifest_path, log_file_test
)

def test_satisfiable_deployment(self):
"should never abort a convergence wait for satisfiable deployments"
def log_file_test(log_file_path) -> bool:
keyword = "Aborting convergence"
with open(log_file_path, "r", encoding="utf-8") as log_file:
for log_line in log_file:
if keyword in log_line:
return False
return True

resource_manifest_path = os.path.join(
test_data_dir, "k8s-event-watcher", "satisfiable-deployment.yaml"
)
self._test_convergence_wait_abort(
"satisfiable", resource_manifest_path, log_file_test
)


def _test_convergence_wait_abort(
self,
cluster_name: str,
resource_file_path: str,
log_test_predicate: Callable[[str], bool],
) -> str:
"""apply a resource manifest and examine the log file"""

tmp_dir = tempfile.TemporaryDirectory()

log_file_path = os.path.join(tmp_dir.name, "test.log")

logging.basicConfig(
filename=log_file_path,
level=logging.WARN,
format="%(message)s",
force=True,
)

kube_config_path = os.path.join(
os.path.expanduser("~"), ".kube/test-" + cluster_name
)

cluster = Kind(acto_namespace=0, num_nodes=3, version="v1.27.3")

cluster.create_cluster(cluster_name, kube_config_path)

runner = Runner(
context={
"namespace": "test",
"crd": None,
"preload_images": set(),
},
trial_dir=tmp_dir.name,
kubeconfig=kube_config_path,
context_name="kind-" + cluster_name,
)

utils.create_namespace(runner.apiclient, "test")

runner.run_without_collect(resource_file_path)
cluster.delete_cluster(cluster_name, kube_config_path)

assert log_test_predicate(log_file_path)
tmp_dir.cleanup()
30 changes: 30 additions & 0 deletions test/e2e_tests/test_data/k8s-event-watcher/invalid-image.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: busybox-deployment
spec:
selector:
matchLabels:
app: busybox
replicas: 12
template:
metadata:
labels:
app: busybox
spec:
containers:
- name: busybox
image: strange-invalid-image
resources:
requests:
cpu: 1

limits:
cpu: 1

command:
- "sh"
- "-c"
- "while true; do sleep 3600; done"


Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: busybox-deployment
spec:
selector:
matchLabels:
app: busybox
replicas: 3
template:
metadata:
labels:
app: busybox
spec:
containers:
- name: busybox
image: busybox
command:
- "sh"
- "-c"
- "while true; do sleep 3600; done"


Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: busybox-deployment
spec:
selector:
matchLabels:
app: busybox
replicas: 5 #unsatisfiable affinity rule when we have only 3 worker nodes
template:
metadata:
labels:
app: busybox
spec:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app
operator: In
values:
- busybox
topologyKey: kubernetes.io/hostname
containers:
- name: busybox
image: busybox
command:
- "sh"
- "-c"
- "while true; do sleep 3600; done"