Merge pull request #116 from HonakerM/add_watch_retry_and_exit
Add retry logic to PRM watch thread
gabe-l-hart authored Sep 6, 2024
2 parents 41ff81a + 13b7f89 commit 9ac5c17
Showing 5 changed files with 167 additions and 62 deletions.
4 changes: 4 additions & 0 deletions oper8/config/config.yaml
@@ -82,6 +82,10 @@ python_watch_manager:
   heartbeat_file: null
   heartbeat_period: 30s

+  # Amount of times to retry a watch before exiting
+  watch_retry_count: 5
+  watch_retry_delay: 5s
+
   # watch filter used to reduce the amount of reconciles. The following
   # filters are available by default, or you can supply a custom filter via
   # the module.Filter notation.
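These two settings drive the retry behavior added to the watch thread below: watch_retry_count bounds how many times a failed watch is restarted, and watch_retry_delay is the pause between attempts, read through `config.python_watch_manager` and converted with `parse_time_delta`. A minimal sketch of the implied worst case, assuming `parse_time_delta("5s")` yields a `datetime.timedelta` of five seconds (that return type is an assumption, not verified against the utils module):

# Sketch only: worst-case retry window implied by the defaults above
from datetime import timedelta

watch_retry_count = 5                      # restarts before the thread gives up
watch_retry_delay = timedelta(seconds=5)   # assumed result of parse_time_delta("5s")

# Roughly 25 seconds of back-off before the process exits with code 1
max_retry_window = watch_retry_count * watch_retry_delay
print(max_retry_window.total_seconds())    # 25.0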
6 changes: 6 additions & 0 deletions oper8/test_helpers/helpers.py
@@ -209,6 +209,8 @@ def __init__(
         disable_raise=False,
         get_state_fail=False,
         get_state_raise=False,
+        watch_fail=False,
+        watch_raise=False,
         generate_resource_version=True,
         set_status_fail=False,
         set_status_raise=False,
@@ -235,6 +237,7 @@ def __init__(
             resources, generate_resource_version=generate_resource_version, **kwargs
         )

+        self.watch_fail = "assert" if watch_raise else watch_fail
         self.deploy_fail = "assert" if deploy_raise else deploy_fail
         self.disable_fail = "assert" if disable_raise else disable_fail
         self.get_state_fail = "assert" if get_state_raise else get_state_fail
@@ -270,6 +273,9 @@ def enable_mocks(self):
                 self.set_status_fail, super().set_status, (False, False)
             )
         )
+        self.watch_objects = mock.Mock(
+            side_effect=get_failable_method(self.watch_fail, super().watch_objects, [])
+        )

     def get_obj(self, kind, name, namespace=None, api_version=None):
         return self.get_object_current_state(kind, name, namespace, api_version)[1]
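A hypothetical test sketch using the new flags; the keyword arguments come from the diff above, but the class name `MockDeployManager` and the surrounding usage are assumptions about this helpers module rather than code taken from the PR:

# Illustrative only: drive the new retry path by making the mocked watch fail
from oper8.test_helpers.helpers import MockDeployManager  # assumed class name

dm_raise = MockDeployManager(watch_raise=True)  # watch_objects raises (the "assert" path)
dm_fail = MockDeployManager(watch_fail=True)    # watch_objects returns the failure default ([])

# A WatchThread built on dm_raise repeatedly hits the new except branch in run(),
# decrementing attempts_left until it finally calls sys.exit(1).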
7 changes: 7 additions & 0 deletions oper8/watch_manager/python_watch_manager/threads/base.py
@@ -84,3 +84,10 @@ def check_preconditions(self) -> bool:
             self.leadership_manager.acquire()

         return True
+
+    def wait_on_precondition(self, timeout: float) -> bool:
+        """Helper function to allow threads to wait for a certain period of time
+        only being interrupted for preconditions"""
+        self.shutdown.wait(timeout)
+
+        return self.check_preconditions()
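A usage sketch for the new helper: it turns a plain sleep into an interruptible wait, returning False as soon as shutdown is requested or preconditions (for example leadership) stop holding. The subclass and `_do_work` below are illustrative, not part of the repo:

# Illustrative pattern for a ThreadBase subclass
class RetryingThread(ThreadBase):
    def run(self):
        while True:
            try:
                self._do_work()  # hypothetical unit of work
                return
            except Exception:
                # Sleep between attempts, but wake immediately on shutdown and
                # stop cleanly if preconditions no longer hold
                if not self.wait_on_precondition(5.0):
                    return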
116 changes: 73 additions & 43 deletions oper8/watch_manager/python_watch_manager/threads/watch.py
@@ -6,6 +6,7 @@
 from typing import Dict, List, Optional, Set
 import copy
 import dataclasses
+import sys

 # Third Party
 from kubernetes import watch
@@ -14,6 +15,7 @@
 import alog

 # Local
+from .... import config
 from ....deploy_manager import DeployManagerBase, KubeEventType, KubeWatchEvent
 from ....managed_object import ManagedObject
 from ..filters import FilterManager, get_configured_filter
@@ -24,6 +26,7 @@
     ResourceId,
     WatchedResource,
     WatchRequest,
+    parse_time_delta,
 )
 from .base import ThreadBase

@@ -95,6 +98,12 @@ def __init__( # pylint: disable=too-many-arguments
         # Lock for adding/gathering watch requests
         self.watch_request_lock = Lock()

+        # Variables for tracking retries
+        self.attempts_left = config.python_watch_manager.watch_retry_count
+        self.retry_delay = parse_time_delta(
+            config.python_watch_manager.watch_retry_delay or ""
+        )
+
     def run(self):
         """The WatchThread's control loop continuously watches the DeployManager for any new
         events. For every event it gets it gathers all the WatchRequests whose `watched` value
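The constructor changes above cache the retry budget and delay once at start-up. `parse_time_delta` is assumed here to turn a suffixed string such as "5s" into a `datetime.timedelta`, which is why the loop below can call `.total_seconds()` on `self.retry_delay`; the import paths are inferred from the relative imports above and the example value is an assumption, not captured output:

# Assumed helper behaviour (sketch, not verified output):
#   parse_time_delta("5s") -> timedelta(seconds=5)
from oper8 import config  # path inferred from "from .... import config"
from oper8.watch_manager.python_watch_manager.utils import parse_time_delta

retry_delay = parse_time_delta(config.python_watch_manager.watch_retry_delay or "")
wait_seconds = retry_delay.total_seconds()  # fed to wait_on_precondition during retries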
@@ -106,60 +115,81 @@ def run(self):
         # Check for leadership and shutdown at the start
         list_resource_version = 0
         while True:
-            if not self.check_preconditions():
-                log.debug("Checking preconditions failed. Shuting down")
-                return
-
-            for event in self.deploy_manager.watch_objects(
-                self.kind,
-                self.api_version,
-                namespace=self.namespace,
-                resource_version=list_resource_version,
-                watch_manager=self.kubernetes_watch,
-            ):
-                # Validate leadership on each event
-                if not self.check_preconditions():
-                    log.debug("Checking preconditions failed. Shuting down")
-                    return
-
-                resource = event.resource
-
-                # Gather all the watch requests which apply to this event
-                watch_requests = self._gather_resource_requests(resource)
-                if not watch_requests:
-                    log.debug2("Skipping resource without requested watch")
-                    self._clean_event(event)
-                    continue
-
-                # Ensure a watched object exists for every resource
-                if resource.uid not in self.watched_resources:
-                    self._create_watched_resource(resource, watch_requests)
-
-                # Check both global and watch specific filters
-                watch_requests = self._check_filters(
-                    watch_requests, resource, event.type
-                )
-                if not watch_requests:
-                    log.debug2(
-                        "Skipping event %s as all requests failed filters", event
-                    )
-                    self._clean_event(event)
-                    continue
-
-                # Push a reconcile request for each watch requested
-                for watch_request in watch_requests:
-                    log.debug(
-                        "Requesting reconcile for %s",
-                        resource,
-                        extra={"resource": watch_request.requester.get_resource()},
-                    )
-                    self._request_reconcile(event, watch_request)
-
-                # Clean up any resources used for the event
-                self._clean_event(event)
-
-            # Update the resource version to only get new events
-            list_resource_version = self.kubernetes_watch.resource_version
+            try:
+                if not self.check_preconditions():
+                    log.debug("Checking preconditions failed. Shutting down")
+                    return
+
+                for event in self.deploy_manager.watch_objects(
+                    self.kind,
+                    self.api_version,
+                    namespace=self.namespace,
+                    resource_version=list_resource_version,
+                    watch_manager=self.kubernetes_watch,
+                ):
+                    # Validate leadership on each event
+                    if not self.check_preconditions():
+                        log.debug("Checking preconditions failed. Shutting down")
+                        return
+
+                    resource = event.resource
+
+                    # Gather all the watch requests which apply to this event
+                    watch_requests = self._gather_resource_requests(resource)
+                    if not watch_requests:
+                        log.debug2("Skipping resource without requested watch")
+                        self._clean_event(event)
+                        continue
+
+                    # Ensure a watched object exists for every resource
+                    if resource.uid not in self.watched_resources:
+                        self._create_watched_resource(resource, watch_requests)
+
+                    # Check both global and watch specific filters
+                    watch_requests = self._check_filters(
+                        watch_requests, resource, event.type
+                    )
+                    if not watch_requests:
+                        log.debug2(
+                            "Skipping event %s as all requests failed filters", event
+                        )
+                        self._clean_event(event)
+                        continue
+
+                    # Push a reconcile request for each watch requested
+                    for watch_request in watch_requests:
+                        log.debug(
+                            "Requesting reconcile for %s",
+                            resource,
+                            extra={"resource": watch_request.requester.get_resource()},
+                        )
+                        self._request_reconcile(event, watch_request)
+
+                    # Clean up any resources used for the event
+                    self._clean_event(event)
+
+                # Update the resource version to only get new events
+                list_resource_version = self.kubernetes_watch.resource_version
+            except Exception as exc:
+                log.info(
+                    "Exception raised when attempting to watch %s",
+                    repr(exc),
+                    exc_info=exc,
+                )
+                if self.attempts_left <= 0:
+                    log.error(
+                        "Unable to start watch within %d attempts",
+                        config.python_watch_manager.watch_retry_count,
+                    )
+                    sys.exit(1)
+
+                if not self.wait_on_precondition(self.retry_delay.total_seconds()):
+                    log.debug(
+                        "Checking preconditions failed during retry. Shutting down"
+                    )
+                    return
+                self.attempts_left = self.attempts_left - 1
+                log.info("Restarting watch with %d attempts left", self.attempts_left)

     ## Class Interface ###################################################

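Taken together, the new control flow is roughly the following condensed sketch; names follow the diff above, while `watch_and_dispatch_events`, the bare `wait_on_precondition` call, and the module-level shape are stand-ins rather than the real method:

# Condensed sketch of the retry-and-exit flow added to WatchThread.run
attempts_left = config.python_watch_manager.watch_retry_count
retry_delay = parse_time_delta(config.python_watch_manager.watch_retry_delay or "")

while True:
    try:
        watch_and_dispatch_events()  # stand-in for the watch/filter/reconcile loop
    except Exception as exc:
        log.info("Exception raised when attempting to watch %s", repr(exc), exc_info=exc)
        if attempts_left <= 0:
            # Retry budget exhausted: exit the process so the operator restarts cleanly
            sys.exit(1)
        # Interruptible back-off; stop cleanly if shutdown/leadership checks fail
        if not wait_on_precondition(retry_delay.total_seconds()):
            break
        attempts_left -= 1
        log.info("Restarting watch with %d attempts left", attempts_left)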