Add retry logic to PRM watch thread #116
Changes from 4 commits
@@ -6,6 +6,7 @@
 from typing import Dict, List, Optional, Set
 import copy
 import dataclasses
+import sys

 # Third Party
 from kubernetes import watch

@@ -14,6 +15,7 @@
 import alog

 # Local
 from .... import config
 from ....deploy_manager import DeployManagerBase, KubeEventType, KubeWatchEvent
 from ....managed_object import ManagedObject
 from ..filters import FilterManager, get_configured_filter

@@ -24,6 +26,7 @@
     ResourceId,
     WatchedResource,
     WatchRequest,
+    parse_time_delta,
 )
 from .base import ThreadBase
@@ -95,6 +98,12 @@ def __init__(  # pylint: disable=too-many-arguments
         # Lock for adding/gathering watch requests
         self.watch_request_lock = Lock()

+        # Variables for tracking retries
+        self.attempts_left = config.python_watch_manager.watch_retry_count
+        self.retry_delay = parse_time_delta(
+            config.python_watch_manager.watch_retry_delay or ""
+        )
+
     def run(self):
         """The WatchThread's control loop continuously watches the DeployManager for any new
         events. For every event it gets it gathers all the WatchRequests whose `watched` value
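For reference, here is a minimal sketch of what a `parse_time_delta` helper of this shape could look like, assuming it converts compact duration strings such as "30s" or "1h30m" into a `datetime.timedelta` and returns `None` for empty or unparseable input (which would explain the `or ""` guard above). The actual helper in this repo may differ.

```python
# Illustrative sketch only -- NOT necessarily the repo's parse_time_delta.
# Assumed behavior: "1h30m" -> timedelta(hours=1, minutes=30); "" -> None.
import re
from datetime import timedelta
from typing import Optional

_UNIT_NAMES = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}

def parse_time_delta(time_str: str) -> Optional[timedelta]:
    """Parse a compact duration string into a timedelta, or None if empty."""
    matches = re.findall(r"(\d+)([smhd])", time_str)
    if not matches:
        return None
    kwargs: dict = {}
    for amount, unit in matches:
        name = _UNIT_NAMES[unit]
        kwargs[name] = kwargs.get(name, 0) + int(amount)
    return timedelta(**kwargs)

# Example: parse_time_delta("30s").total_seconds() == 30.0
```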
@@ -110,56 +119,73 @@ def run(self):
                 log.debug("Checking preconditions failed. Shuting down")
                 return

-            for event in self.deploy_manager.watch_objects(
-                self.kind,
-                self.api_version,
-                namespace=self.namespace,
-                resource_version=list_resource_version,
-                watch_manager=self.kubernetes_watch,
-            ):
-                # Validate leadership on each event
-                if not self.check_preconditions():
-                    log.debug("Checking preconditions failed. Shuting down")
-                    return
-
-                resource = event.resource
-
-                # Gather all the watch requests which apply to this event
-                watch_requests = self._gather_resource_requests(resource)
-                if not watch_requests:
-                    log.debug2("Skipping resource without requested watch")
-                    self._clean_event(event)
-                    continue
-
-                # Ensure a watched object exists for every resource
-                if resource.uid not in self.watched_resources:
-                    self._create_watched_resource(resource, watch_requests)
-
-                # Check both global and watch specific filters
-                watch_requests = self._check_filters(
-                    watch_requests, resource, event.type
-                )
-                if not watch_requests:
-                    log.debug2(
-                        "Skipping event %s as all requests failed filters", event
-                    )
-                    self._clean_event(event)
-                    continue
-
-                # Push a reconcile request for each watch requested
-                for watch_request in watch_requests:
-                    log.debug(
-                        "Requesting reconcile for %s",
-                        resource,
-                        extra={"resource": watch_request.requester.get_resource()},
-                    )
-                    self._request_reconcile(event, watch_request)
-
-                # Clean up any resources used for the event
-                self._clean_event(event)
-
-            # Update the resource version to only get new events
-            list_resource_version = self.kubernetes_watch.resource_version
+            try:
+                for event in self.deploy_manager.watch_objects(
+                    self.kind,
+                    self.api_version,
+                    namespace=self.namespace,
+                    resource_version=list_resource_version,
+                    watch_manager=self.kubernetes_watch,
+                ):
+                    # Validate leadership on each event
+                    if not self.check_preconditions():
+                        log.debug("Checking preconditions failed. Shuting down")
+                        return
+
+                    resource = event.resource
+
+                    # Gather all the watch requests which apply to this event
+                    watch_requests = self._gather_resource_requests(resource)
+                    if not watch_requests:
+                        log.debug2("Skipping resource without requested watch")
+                        self._clean_event(event)
+                        continue
+
+                    # Ensure a watched object exists for every resource
+                    if resource.uid not in self.watched_resources:
+                        self._create_watched_resource(resource, watch_requests)
+
+                    # Check both global and watch specific filters
+                    watch_requests = self._check_filters(
+                        watch_requests, resource, event.type
+                    )
+                    if not watch_requests:
+                        log.debug2(
+                            "Skipping event %s as all requests failed filters", event
+                        )
+                        self._clean_event(event)
+                        continue
+
+                    # Push a reconcile request for each watch requested
+                    for watch_request in watch_requests:
+                        log.debug(
+                            "Requesting reconcile for %s",
+                            resource,
+                            extra={"resource": watch_request.requester.get_resource()},
+                        )
+                        self._request_reconcile(event, watch_request)
+
+                    # Clean up any resources used for the event
+                    self._clean_event(event)
+
+                # Update the resource version to only get new events
+                list_resource_version = self.kubernetes_watch.resource_version
+            except Exception as exc:
+                log.info(
+                    "Exception raised when attempting to watch %s",
+                    repr(exc),
+                    exc_info=exc,
+                )
+                if self.attempts_left <= 0:
+                    log.error(
+                        "Unable to start watch within %d attempts",
+                        config.python_watch_manager.watch_retry_count,
+                    )
+                    sys.exit(1)
+
+                self.wait_on_precondition(self.retry_delay.total_seconds())
Review comment on the wait_on_precondition call: This returns a bool, but I'm not seeing it used anywhere. Can the preconditions fail?

Reply: Good catch. It should be checked in the retry logic.

Reply: Yep! If the thread should shut down then it returns `False`.
+                self.attempts_left = self.attempts_left - 1
+                log.info("Restarting watch with %d attempts left", self.attempts_left)
Review comment: This may be because I'm reviewing on my phone and can't see indentation well, but I'm not seeing how this actually performs the retry. There isn't a loop above the `try`, is there?

Reply: The `try`/`except` sits inside the enclosing loop at the top of `run`, so after the except handler waits and decrements the counter, the loop re-enters the `try` and restarts the watch.

Reply: This is why I shouldn't review on a phone!

     ## Class Interface ###################################################
Review comment on the moved log.debug("Checking preconditions failed. Shuting down") line: I know this is only an indentation change, but "Shuting" should be "Shutting".
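Tying the review threads together: the retry works because the `try`/`except` is nested inside an enclosing loop in `run`, and the agreed follow-up is to check the bool returned by `wait_on_precondition` so a shutdown request ends the retries early. Below is a simplified, self-contained sketch of that pattern; `watch_once`, `RETRY_COUNT`, and `RETRY_DELAY` are hypothetical stand-ins for the real thread's methods and config values, not the repo's actual API.

```python
# Simplified sketch of the retry control flow discussed above. All names here
# (watch_once, wait_on_precondition, RETRY_COUNT, RETRY_DELAY) are stand-ins
# invented for illustration; the real thread uses config.python_watch_manager
# values and its own wait_on_precondition/check_preconditions methods.
import sys
import time

RETRY_COUNT = 5     # stand-in for watch_retry_count
RETRY_DELAY = 1.0   # stand-in for watch_retry_delay, in seconds

def run_with_retries(watch_once, wait_on_precondition):
    attempts_left = RETRY_COUNT
    while True:  # the enclosing loop the reviewer was looking for
        try:
            watch_once()  # one pass of the watch; raises on API errors
        except Exception as exc:
            print(f"watch failed: {exc!r}", file=sys.stderr)
            if attempts_left <= 0:
                sys.exit(1)  # out of retries: terminate
            # Per the first review thread, check the returned bool so a
            # shutdown request ends the retries instead of being ignored.
            if not wait_on_precondition(RETRY_DELAY):
                return
            attempts_left -= 1

# Toy usage: a watch that always fails, and a precondition that just sleeps.
def flaky_watch():
    raise RuntimeError("connection reset")

def sleepy_precondition(delay: float) -> bool:
    time.sleep(delay)
    return True  # True means "keep running"

# run_with_retries(flaky_watch, sleepy_precondition)  # exits after retries
```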