From e3b3be6e33d85c1246f0f2e0025d0baaa7bb39a1 Mon Sep 17 00:00:00 2001 From: Chris Hammond Date: Thu, 17 Oct 2024 15:48:46 -0400 Subject: [PATCH] RTR Robustness Updates (#205) * Resolves #187 and some other general RTR snags. Also bumps dependencies. * Resolves linting issues * Caracara 0.9.0: Preparation for Python 3.13 (and huge code cleanup) (#206) * Major code formatting cleanup, deprecation of Python 3.8, and removal of setuptools requirement * Add note about Python version compatibility * Temporarily set Python 3.13 CI version to 3.13 RC2 * Changes to sate isort and black together * Unified development dependencies * Bumps py7zr to 0.22.0 as we no longer support Python 3.7. Also, bumps caracara-filters to 1.0.0+. * Bumps dependencies * Resolves #187 and some other general RTR snags. Also bumps dependencies. * Resolves linting issues * Resolves linting issues after merge --- caracara/modules/rtr/batch_session.py | 111 +++++++++++++++++++------- 1 file changed, 81 insertions(+), 30 deletions(-) diff --git a/caracara/modules/rtr/batch_session.py b/caracara/modules/rtr/batch_session.py index c7a1923..af192bb 100644 --- a/caracara/modules/rtr/batch_session.py +++ b/caracara/modules/rtr/batch_session.py @@ -5,8 +5,9 @@ from dataclasses import dataclass from datetime import datetime, timedelta from functools import partial, wraps +from itertools import repeat from threading import current_thread -from typing import Dict, List +from typing import Dict, List, Optional, Tuple from falconpy import RealTimeResponse, RealTimeResponseAdmin @@ -44,11 +45,6 @@ class InnerRTRBatchSession: # pylint: disable=too-few-public-methods a list of InnerRTRBatchSession objects. """ - batch_id: str = None - devices: Dict = None - expiry: datetime = None - logger: logging.Logger = None - def __init__(self, batch_id: str, devices: Dict, expiry: datetime, logger: logging.Logger): """Configure an inner batch of RTR sessions.""" self.batch_id = batch_id @@ -74,7 +70,7 @@ def generic_rtr_worker( func: partial, logger: logging.Logger, device_ids: List[str] = None, -): +) -> Tuple[str, Dict]: """ Execute a partial function against an RTR batch session. @@ -96,7 +92,7 @@ def generic_rtr_worker( func.keywords["optional_hosts"] = device_ids_in_batch response = func(batch_id=session.batch_id)["body"] logger.debug("%s | %s", thread_name, response) - return response + return thread_name, response class RTRBatchSession: @@ -121,14 +117,14 @@ class RTRBatchSession: @_batch_session_required def device_ids(self) -> List[str]: """Return a list of device IDs from all inner batch sessions.""" - return [x.devices.keys() for x in self.batch_sessions] + return [x.devices for x in self.batch_sessions] def connect( self, device_ids: List[str], queueing: bool = False, timeout: int = default_timeout, - ): + ) -> bool: """ Establish a connection to one or more hosts. @@ -142,7 +138,10 @@ def connect( batches.append(device_ids[i : i + MAX_BATCH_SESSION_HOSTS]) self.logger.info("Divided up devices into %d batches", len(batches)) - def worker(batch_device_ids: List[str], batch_func: partial): + def worker( + batch_device_ids: List[str], + batch_func: partial, + ) -> Tuple[str, Optional[InnerRTRBatchSession]]: thread_name = current_thread().name self.logger.info( "%s | Batch worker started with a list of %d devices", @@ -150,16 +149,34 @@ def worker(batch_device_ids: List[str], batch_func: partial): len(batch_device_ids), ) response = batch_func(host_ids=batch_device_ids)["body"] + self.logger.debug("%s | %s", thread_name, str(response)) resources = response["resources"] - self.logger.info("%s | Connected to %s systems", thread_name, len(resources)) + + # Identify devices that failed to connect and/or returned an error + # Resolves GitHub issue #187 + if not resources: + self.logger.info("%s | Resource list is empty", thread_name) + return thread_name, None + + successful_devices = { + device_id: device_data + for device_id, device_data in resources.items() + if not device_data.get("errors") + } + + if not successful_devices: + self.logger.info("%s | Successful device list is empty", thread_name) + return thread_name, None + + self.logger.info("%s | Connected to %s systems", thread_name, len(successful_devices)) self.logger.debug("%s | %s", thread_name, response) batch_data = InnerRTRBatchSession( batch_id=response["batch_id"], - devices=resources, + devices=successful_devices, expiry=datetime.now() + timedelta(seconds=SESSION_EXPIRY), logger=self.logger, ) - return batch_data + return thread_name, batch_data batch_func = partial( self.api.batch_init_sessions, @@ -171,17 +188,27 @@ def worker(batch_device_ids: List[str], batch_func: partial): with concurrent.futures.ThreadPoolExecutor( max_workers=MAX_BATCH_SESSION_THREADS ) as executor: - completed = executor.map(worker, batches, [batch_func]) + completed = executor.map(worker, batches, repeat(batch_func)) self.batch_sessions = [] - for complete in completed: - self.logger.info("Completed a batch of RTR connections") - self.batch_sessions.append(complete) + for thread_name, thread_data in completed: + self.logger.info("%s | Completed a batch of RTR connections", thread_name) + if thread_data is None: + self.logger.info("%s | Batch contained no successful connections", thread_name) + else: + self.logger.info( + "%s | Batch contained %d successful connections", + thread_name, + len(thread_data.devices), + ) + self.batch_sessions.append(thread_data) device_count = sum(len(d.devices) for d in self.batch_sessions) self.logger.info("Connected to %d devices", device_count) self.logger.debug(self.batch_sessions) + return len(self.batch_sessions) > 0 + @_batch_session_required def disconnect(self): """Disconnect the RTR batch session.""" @@ -218,15 +245,29 @@ def get( completed: List[Dict] = executor.map( partial_worker, self.batch_sessions, - [partial_get_func], + repeat(partial_get_func), ) batch_get_cmd_reqs: List[BatchGetCmdRequest] = [] - for complete in completed: - self.logger.info("Executed commands on a batch of %d hosts", len(complete)) + for thread_name, response in completed: + try: + devices = response["combined"]["resources"] + except KeyError: + self.logger.warning( + "%s | Batch contained no successful get command executions", + thread_name, + ) + continue + + self.logger.info( + "%s | Executed get command on a batch of %d devices", + thread_name, + len(devices), + ) + batch_get_cmd_req = BatchGetCmdRequest( - batch_get_cmd_req_id=complete["batch_get_cmd_req_id"], - devices=complete["combined"]["resources"], + batch_get_cmd_req_id=response["batch_get_cmd_req_id"], + devices=devices, ) batch_get_cmd_reqs.append(batch_get_cmd_req) @@ -278,7 +319,7 @@ def worker(batch_get_cmd_req: BatchGetCmdRequest, func: partial) -> List[GetFile timeout_duration=f"{timeout}s", ) with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: - completed = executor.map(worker, batch_get_cmd_reqs, [partial_func]) + completed = executor.map(worker, batch_get_cmd_reqs, repeat(partial_func)) all_get_files: List[GetFile] = [] @@ -345,13 +386,13 @@ def worker(session: InnerRTRBatchSession, func: partial): with concurrent.futures.ThreadPoolExecutor( max_workers=MAX_BATCH_SESSION_THREADS ) as executor: - completed = executor.map(worker, self.batch_sessions, [batch_func]) + completed = executor.map(worker, self.batch_sessions, repeat(batch_func)) for complete in completed: self.logger.info("Refreshed session %s", complete) @_batch_session_required - def run_generic_command( + def run_generic_command( # pylint: disable=too-many-locals self, command_string: str, device_ids: List[str] = None, @@ -399,13 +440,23 @@ def run_generic_command( completed: List[Dict] = executor.map( partial_worker, self.batch_sessions, - [partial_cmd_func], + repeat(partial_cmd_func), ) all_responses: Dict = {} - for complete in completed: - self.logger.info("Executed commands on a batch of %d hosts", len(complete)) - all_responses.update(complete["combined"]["resources"]) + for thread_name, response in completed: + try: + devices = response["combined"]["resources"] + except KeyError: + self.logger.warning( + "%s | Batch contained no successful command executions", + thread_name, + ) + continue + self.logger.info( + "%s | Executed commands on a batch of %d hosts", thread_name, len(devices) + ) + all_responses.update(devices) return all_responses