diff --git a/oper8/config/config.yaml b/oper8/config/config.yaml
index 8f1f229..6caeda2 100644
--- a/oper8/config/config.yaml
+++ b/oper8/config/config.yaml
@@ -82,6 +82,10 @@ python_watch_manager:
     heartbeat_file: null
     heartbeat_period: 30s
 
+    # Number of times to retry a watch before exiting
+    watch_retry_count: 5
+    watch_retry_delay: 5s
+
     # watch filter used to reduce the amount of reconciles. The following
     # filters are available by default, or you can supply a custom filter via
     # the module.Filter notation.
diff --git a/oper8/test_helpers/helpers.py b/oper8/test_helpers/helpers.py
index a61764c..7bc67c7 100644
--- a/oper8/test_helpers/helpers.py
+++ b/oper8/test_helpers/helpers.py
@@ -209,6 +209,8 @@ def __init__(
         disable_raise=False,
         get_state_fail=False,
         get_state_raise=False,
+        watch_fail=False,
+        watch_raise=False,
         generate_resource_version=True,
         set_status_fail=False,
         set_status_raise=False,
@@ -235,6 +237,7 @@ def __init__(
             resources, generate_resource_version=generate_resource_version, **kwargs
         )
 
+        self.watch_fail = "assert" if watch_raise else watch_fail
         self.deploy_fail = "assert" if deploy_raise else deploy_fail
         self.disable_fail = "assert" if disable_raise else disable_fail
         self.get_state_fail = "assert" if get_state_raise else get_state_fail
@@ -270,6 +273,9 @@ def enable_mocks(self):
                 self.set_status_fail, super().set_status, (False, False)
             )
         )
+        self.watch_objects = mock.Mock(
+            side_effect=get_failable_method(self.watch_fail, super().watch_objects, [])
+        )
 
     def get_obj(self, kind, name, namespace=None, api_version=None):
         return self.get_object_current_state(kind, name, namespace, api_version)[1]
diff --git a/oper8/watch_manager/python_watch_manager/threads/base.py b/oper8/watch_manager/python_watch_manager/threads/base.py
index 661f6eb..ce4eb64 100644
--- a/oper8/watch_manager/python_watch_manager/threads/base.py
+++ b/oper8/watch_manager/python_watch_manager/threads/base.py
@@ -84,3 +84,10 @@ def check_preconditions(self) -> bool:
 
         self.leadership_manager.acquire()
         return True
+
+    def wait_on_precondition(self, timeout: float) -> bool:
+        """Helper function to allow threads to wait for a certain period of time
+        only being interrupted for preconditions"""
+        self.shutdown.wait(timeout)
+
+        return self.check_preconditions()
diff --git a/oper8/watch_manager/python_watch_manager/threads/watch.py b/oper8/watch_manager/python_watch_manager/threads/watch.py
index 11ecd19..6a97e87 100644
--- a/oper8/watch_manager/python_watch_manager/threads/watch.py
+++ b/oper8/watch_manager/python_watch_manager/threads/watch.py
@@ -6,6 +6,7 @@
 from typing import Dict, List, Optional, Set
 import copy
 import dataclasses
+import sys
 
 # Third Party
 from kubernetes import watch
@@ -14,6 +15,7 @@
 import alog
 
 # Local
+from .... import config
 from ....deploy_manager import DeployManagerBase, KubeEventType, KubeWatchEvent
 from ....managed_object import ManagedObject
 from ..filters import FilterManager, get_configured_filter
@@ -24,6 +26,7 @@
     ResourceId,
     WatchedResource,
     WatchRequest,
+    parse_time_delta,
 )
 from .base import ThreadBase
 
@@ -95,6 +98,12 @@ def __init__(  # pylint: disable=too-many-arguments
         # Lock for adding/gathering watch requests
         self.watch_request_lock = Lock()
 
+        # Variables for tracking retries
+        self.attempts_left = config.python_watch_manager.watch_retry_count
+        self.retry_delay = parse_time_delta(
+            config.python_watch_manager.watch_retry_delay or ""
+        )
+
     def run(self):
         """The WatchThread's control loop continuously watches the DeployManager for any new
         events. For every event it gets it gathers all the WatchRequests whose `watched` value
@@ -106,60 +115,81 @@ def run(self):
         # Check for leadership and shutdown at the start
         list_resource_version = 0
         while True:
-            if not self.check_preconditions():
-                log.debug("Checking preconditions failed. Shuting down")
-                return
-
-            for event in self.deploy_manager.watch_objects(
-                self.kind,
-                self.api_version,
-                namespace=self.namespace,
-                resource_version=list_resource_version,
-                watch_manager=self.kubernetes_watch,
-            ):
-                # Validate leadership on each event
+            try:
                 if not self.check_preconditions():
-                    log.debug("Checking preconditions failed. Shuting down")
+                    log.debug("Checking preconditions failed. Shutting down")
                     return
 
-                resource = event.resource
+                for event in self.deploy_manager.watch_objects(
+                    self.kind,
+                    self.api_version,
+                    namespace=self.namespace,
+                    resource_version=list_resource_version,
+                    watch_manager=self.kubernetes_watch,
+                ):
+                    # Validate leadership on each event
+                    if not self.check_preconditions():
+                        log.debug("Checking preconditions failed. Shutting down")
+                        return
+
+                    resource = event.resource
+
+                    # Gather all the watch requests which apply to this event
+                    watch_requests = self._gather_resource_requests(resource)
+                    if not watch_requests:
+                        log.debug2("Skipping resource without requested watch")
+                        self._clean_event(event)
+                        continue
 
-                # Gather all the watch requests which apply to this event
-                watch_requests = self._gather_resource_requests(resource)
-                if not watch_requests:
-                    log.debug2("Skipping resource without requested watch")
-                    self._clean_event(event)
-                    continue
+                    # Ensure a watched object exists for every resource
+                    if resource.uid not in self.watched_resources:
+                        self._create_watched_resource(resource, watch_requests)
+
+                    # Check both global and watch specific filters
+                    watch_requests = self._check_filters(
+                        watch_requests, resource, event.type
+                    )
+                    if not watch_requests:
+                        log.debug2(
+                            "Skipping event %s as all requests failed filters", event
+                        )
+                        self._clean_event(event)
+                        continue
 
-                # Ensure a watched object exists for every resource
-                if resource.uid not in self.watched_resources:
-                    self._create_watched_resource(resource, watch_requests)
+                    # Push a reconcile request for each watch requested
+                    for watch_request in watch_requests:
+                        log.debug(
+                            "Requesting reconcile for %s",
+                            resource,
+                            extra={"resource": watch_request.requester.get_resource()},
+                        )
+                        self._request_reconcile(event, watch_request)
+
+                    # Clean up any resources used for the event
+                    self._clean_event(event)
 
-                # Check both global and watch specific filters
-                watch_requests = self._check_filters(
-                    watch_requests, resource, event.type
+                # Update the resource version to only get new events
+                list_resource_version = self.kubernetes_watch.resource_version
+            except Exception as exc:
+                log.info(
+                    "Exception raised when attempting to watch %s",
+                    repr(exc),
+                    exc_info=exc,
                 )
-                if not watch_requests:
-                    log.debug2(
-                        "Skipping event %s as all requests failed filters", event
+                if self.attempts_left <= 0:
+                    log.error(
+                        "Unable to start watch within %d attempts",
+                        config.python_watch_manager.watch_retry_count,
                     )
-                    self._clean_event(event)
-                    continue
+                    sys.exit(1)
 
-                # Push a reconcile request for each watch requested
-                for watch_request in watch_requests:
+                if not self.wait_on_precondition(self.retry_delay.total_seconds()):
                     log.debug(
-                        "Requesting reconcile for %s",
-                        resource,
-                        extra={"resource": watch_request.requester.get_resource()},
+                        "Checking preconditions failed during retry. Shutting down"
                     )
-                    self._request_reconcile(event, watch_request)
-
-                # Clean up any resources used for the event
-                self._clean_event(event)
-
-                # Update the resource version to only get new events
-                list_resource_version = self.kubernetes_watch.resource_version
+                    return
+                self.attempts_left = self.attempts_left - 1
+                log.info("Restarting watch with %d attempts left", self.attempts_left)
 
     ## Class Interface ###################################################
 
diff --git a/tests/watch_manager/python_watch_manager/threads/test_watch_thread.py b/tests/watch_manager/python_watch_manager/threads/test_watch_thread.py
index b644810..98aa624 100644
--- a/tests/watch_manager/python_watch_manager/threads/test_watch_thread.py
+++ b/tests/watch_manager/python_watch_manager/threads/test_watch_thread.py
@@ -2,6 +2,7 @@
 Tests for the WatchThread
 """
 # Standard
+from unittest.mock import patch
 import time
 
 # Third Party
@@ -10,7 +11,7 @@
 # Local
 from oper8.deploy_manager.dry_run_deploy_manager import DryRunDeployManager
 from oper8.deploy_manager.kube_event import KubeEventType
-from oper8.test_helpers.helpers import library_config
+from oper8.test_helpers.helpers import MockDeployManager, library_config
 from oper8.test_helpers.pwm_helpers import (
     DisabledLeadershipManager,
     MockedReconcileThread,
@@ -32,11 +33,13 @@
 @pytest.mark.timeout(5)
 def test_watch_thread_happy_path():
     dm = DryRunDeployManager()
-    mocked_reconcile_thread = MockedReconcileThread()
     watched_object = make_resource(spec={"test": "value"})
     watched_object_id = ResourceId.from_resource(watched_object)
 
-    with library_config(python_watch_manager={"filter": None}):
+    with library_config(
+        python_watch_manager={"process_context": "fork", "filter": None}
+    ):
+        mocked_reconcile_thread = MockedReconcileThread()
         watch_thread = WatchThread(
             reconcile_thread=mocked_reconcile_thread,
             kind="Foo",
@@ -66,11 +69,13 @@
 @pytest.mark.timeout(5)
 def test_watch_thread_filtered():
     dm = DryRunDeployManager()
-    mocked_reconcile_thread = MockedReconcileThread()
     watched_object = make_resource(spec={"test": "value"})
     watched_object_id = ResourceId.from_resource(watched_object)
 
-    with library_config(python_watch_manager={"filter": DisableFilter}):
+    with library_config(
+        python_watch_manager={"process_context": "fork", "filter": DisableFilter}
+    ):
+        mocked_reconcile_thread = MockedReconcileThread()
         watch_thread = WatchThread(
             reconcile_thread=mocked_reconcile_thread,
             kind="Foo",
@@ -99,11 +104,13 @@
 @pytest.mark.timeout(5)
 def test_watch_thread_deleted():
     dm = DryRunDeployManager()
-    mocked_reconcile_thread = MockedReconcileThread()
     watched_object = make_resource(spec={"test": "value"})
     watched_object_id = ResourceId.from_resource(watched_object)
 
-    with library_config(python_watch_manager={"filter": None}):
+    with library_config(
+        python_watch_manager={"process_context": "fork", "filter": None}
+    ):
+        mocked_reconcile_thread = MockedReconcileThread()
         watch_thread = WatchThread(
             reconcile_thread=mocked_reconcile_thread,
             kind="Foo",
@@ -132,7 +139,6 @@
 @pytest.mark.timeout(5)
 def test_watch_thread_owner_watch():
     dm = DryRunDeployManager()
-    mocked_reconcile_thread = MockedReconcileThread()
     owner_object = make_resource(
         kind="DifferentKind", name="owner", spec={"test": "value"}
     )
@@ -145,7 +151,10 @@
     # Deploy owner before watch has started
     dm.deploy([owner_object])
 
-    with library_config(python_watch_manager={"filter": None}):
+    with library_config(
+        python_watch_manager={"process_context": "fork", "filter": None}
+    ):
+        mocked_reconcile_thread = MockedReconcileThread()
         watch_thread = WatchThread(
             reconcile_thread=mocked_reconcile_thread,
             kind="Foo",
@@ -178,7 +187,6 @@
 @pytest.mark.timeout(5)
 def test_watch_thread_global_watch():
     dm = DryRunDeployManager()
-    mocked_reconcile_thread = MockedReconcileThread()
     owner_object = make_resource(
         kind="DifferentKind", name="owner", spec={"test": "value"}
     )
@@ -189,7 +197,10 @@
 
     dm.deploy([owner_object])
 
-    with library_config(python_watch_manager={"filter": None}):
+    with library_config(
+        python_watch_manager={"process_context": "fork", "filter": None}
+    ):
+        mocked_reconcile_thread = MockedReconcileThread()
         watch_thread = WatchThread(
             reconcile_thread=mocked_reconcile_thread,
             kind="Foo",
@@ -226,13 +237,15 @@
 @pytest.mark.timeout(5)
 def test_watch_thread_all_events():
     dm = DryRunDeployManager()
-    mocked_reconcile_thread = MockedReconcileThread()
     watched_object = make_resource(spec={"test": "value"})
     request_resource_id = ResourceId(
         api_version=watched_object.get("apiVersion"), kind=watched_object.get("kind")
     )
 
-    with library_config(python_watch_manager={"filter": None}):
+    with library_config(
+        python_watch_manager={"process_context": "fork", "filter": None}
+    ):
+        mocked_reconcile_thread = MockedReconcileThread()
         watch_thread = WatchThread(
             reconcile_thread=mocked_reconcile_thread,
             kind="Foo",
@@ -266,7 +279,6 @@ @pytest.mark.timeout(5)
 def test_watch_thread_global_watch_two_owners():
     dm = DryRunDeployManager()
-    mocked_reconcile_thread = MockedReconcileThread()
     owner_object = make_resource(kind="OwnerKind", name="owner", spec={"test": "value"})
     owner_2_object = make_resource(
         kind="OwnerKind", name="owner2", spec={"test": "value"}
     )
@@ -280,7 +292,10 @@
     dm.deploy([owner_object])
     dm.deploy([owner_2_object])
 
-    with library_config(python_watch_manager={"filter": None}):
+    with library_config(
+        python_watch_manager={"process_context": "fork", "filter": None}
+    ):
+        mocked_reconcile_thread = MockedReconcileThread()
         watch_thread = WatchThread(
             reconcile_thread=mocked_reconcile_thread,
             kind="Foo",
@@ -329,10 +344,12 @@
 @pytest.mark.timeout(5)
 def test_watch_thread_no_watch():
     dm = DryRunDeployManager()
-    mocked_reconcile_thread = MockedReconcileThread()
     watched_object = make_resource(spec={"test": "value"})
 
-    with library_config(python_watch_manager={"filter": DisableFilter}):
+    with library_config(
+        python_watch_manager={"process_context": "fork", "filter": DisableFilter}
+    ):
+        mocked_reconcile_thread = MockedReconcileThread()
         watch_thread = WatchThread(
             reconcile_thread=mocked_reconcile_thread,
             kind="Foo",
@@ -354,11 +371,13 @@
 @pytest.mark.timeout(5)
 def test_watch_thread_not_leader():
     dm = DryRunDeployManager()
-    mocked_reconcile_thread = MockedReconcileThread()
     watched_object = make_resource(spec={"test": "value"})
     watched_object_id = ResourceId.from_resource(watched_object)
 
-    with library_config(python_watch_manager={"filter": None}):
+    with library_config(
+        python_watch_manager={"process_context": "fork", "filter": None}
+    ):
+        mocked_reconcile_thread = MockedReconcileThread()
         watch_thread = WatchThread(
             leadership_manager=DisabledLeadershipManager(),
             reconcile_thread=mocked_reconcile_thread,
@@ -378,3 +397,42 @@ def test_watch_thread_not_leader():
     time.sleep(1.5)
     watch_thread.stop_thread()
     assert mocked_reconcile_thread.requests.empty()
+
+
+@pytest.mark.timeout(5)
+@pytest.mark.filterwarnings("ignore::pytest.PytestUnhandledThreadExceptionWarning")
+def test_watch_thread_invalid_rbac():
+    dm = MockDeployManager(watch_raise=True)
+    watched_object = make_resource(spec={"test": "value"})
+    watched_object_id = ResourceId.from_resource(watched_object)
+
+    with patch(
+        "oper8.watch_manager.python_watch_manager.threads.watch.sys.exit",
+        side_effect=Exception("EndTest"),
+    ) as exit_mock, library_config(
+        python_watch_manager={
+            "process_context": "fork",
+            "filter": None,
+            "watch_retry_count": 3,
+            "watch_retry_delay": "0.1s",
+        }
+    ):
+        mocked_reconcile_thread = MockedReconcileThread()
+        watch_thread = WatchThread(
+            reconcile_thread=mocked_reconcile_thread,
+            kind="Foo",
+            api_version="foo.bar.com/v1",
+            namespace="test",
+            deploy_manager=dm,
+        )
+
+        request = WatchRequest(watched=watched_object_id, requester=watched_object_id)
+        watch_thread.request_watch(request)
+        watch_thread.start_thread()
+
+        # Wait for the retries
+        time.sleep(1)
+
+        # Assert we tried to watch 4 times (3 retries plus the initial)
+        assert dm.watch_objects.call_count == 4
+        assert exit_mock.called
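
Note on the new settings: watch_retry_count and watch_retry_delay are read once in WatchThread.__init__ and bound the number of times a failed watch is restarted before the thread calls sys.exit(1). The thread makes one initial attempt plus watch_retry_count retries, which is why the new test above expects four watch_objects calls with a retry count of three. Below is a minimal sketch of overriding the defaults in an operator's config; the values are illustrative only, and watch_retry_delay is assumed to accept the same time-delta strings seen elsewhere in this change (for example 5s or 0.1s), since it is parsed with parse_time_delta.

python_watch_manager:
    # Allow up to 10 restarts of a failed watch, waiting 30s between attempts
    watch_retry_count: 10
    watch_retry_delay: 30s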