From 25e39a31c73a99808e4d42ebc8d18eb1517afa24 Mon Sep 17 00:00:00 2001 From: glrs <5999366+glrs@users.noreply.github.com> Date: Thu, 5 Dec 2024 18:50:40 +0100 Subject: [PATCH 1/3] Enhance docstring with limitations of the singleton decorator --- lib/core_utils/singleton_decorator.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/lib/core_utils/singleton_decorator.py b/lib/core_utils/singleton_decorator.py index 475e8f2..8713a9d 100644 --- a/lib/core_utils/singleton_decorator.py +++ b/lib/core_utils/singleton_decorator.py @@ -23,6 +23,17 @@ def singleton(cls: Type[Any]) -> Type[Any]: Returns: Type[Any]: The singleton class with SingletonMeta as its metaclass. + + Limitations: + - **Pickling Not Supported:** Singleton instances created with this decorator + cannot be pickled. Attempting to pickle such an instance will result in a + `TypeError` or `AttributeError`. If pickling is required, consider implementing + custom pickling methods or using a different singleton pattern. + - **Incompatible with Custom Metaclasses:** The singleton decorator cannot be + applied to classes that already have a custom metaclass. Doing so will raise a + `TypeError` due to metaclass conflicts. To use the singleton pattern with such + classes, you'll need to implement the singleton behavior manually or adjust + your class design. 
""" # Create a new class with SingletonMeta as its metaclass From 9eef82278007efd7d87905ce2420bc763af00c6d Mon Sep 17 00:00:00 2001 From: glrs <5999366+glrs@users.noreply.github.com> Date: Thu, 5 Dec 2024 18:51:12 +0100 Subject: [PATCH 2/3] Cleanup old commented code --- lib/module_utils/sjob_manager.py | 108 +------------------------------ 1 file changed, 2 insertions(+), 106 deletions(-) diff --git a/lib/module_utils/sjob_manager.py b/lib/module_utils/sjob_manager.py index a1f1a25..2eeed5b 100644 --- a/lib/module_utils/sjob_manager.py +++ b/lib/module_utils/sjob_manager.py @@ -9,112 +9,6 @@ logging = custom_logger(__name__.split(".")[-1]) -# import asyncio -# import logging -# import re - -# from pathlib import Path - -# from lib.utils.config_loader import configs - -# class SlurmJobManager: -# def __init__(self, polling_interval=1.0, command_timeout=8.0): -# self.polling_interval = polling_interval -# self.command_timeout = command_timeout - -# # TODO: Make sure the path to the slurm_manager.sh script exists or log an error -# self.slurm_script_path = Path(configs['yggdrasil_script_dir']) / "slurm_manager.sh" # Adjust this path as necessary - -# async def submit_job(self, script_path): -# command = [self.slurm_script_path, "submit", script_path] - -# print(">>>> COMMAND: ", command) -# try: -# process = await asyncio.create_subprocess_exec( -# *command, -# stdout=asyncio.subprocess.PIPE, -# stderr=asyncio.subprocess.PIPE -# ) -# stdout, stderr = await asyncio.wait_for(process.communicate(), self.command_timeout) - -# if process.returncode != 0: -# logging.error("Error submitting job. 
STDOUT: %s, STDERR: %s", stdout.decode(), stderr.decode()) -# return None - -# logging.debug(f"Slurm RAW submit output: {stdout}") -# logging.debug(f"STDOUT from slurm_manager.sh: {stdout.decode().strip()}") -# logging.debug(f"STDERR from slurm_manager.sh: {stderr.decode().strip()}") -# stdout_decoded = stdout.decode().strip() -# logging.debug(f"Slurm submit output: {stdout_decoded}") - -# # Improved regex to capture the job ID from a string like "Submitted batch job 123456" -# match = re.search(r'Submitted batch job (\d+)', stdout_decoded) -# job_id = match.group(1) if match else None - -# if job_id: -# logging.info(f"Job submitted with ID: {job_id}") -# return job_id -# else: -# logging.error("Failed to extract job ID from sbatch output.") - -# except asyncio.TimeoutError: -# logging.error("Timeout while submitting job.") -# except Exception as e: -# logging.error(f"Unexpected error: {e}") - -# return None - -# async def monitor_job(self, job_id, sample): -# """Monitors the specified job and calls the sample's post-process method based on job status.""" -# while True: -# status = await self._job_status(job_id) -# print(f">>>> RECEIVED STATUS: {status}") -# if status in ["COMPLETED", "FAILED", "CANCELLED"]: -# logging.info(f"Job {job_id} status: {status}") -# self.check_status(job_id, status, sample) -# break -# await asyncio.sleep(self.polling_interval) - -# async def _job_status(self, job_id): -# command = [self.slurm_script_path, "monitor", job_id] -# try: -# process = await asyncio.create_subprocess_exec( -# *command, -# stdout=asyncio.subprocess.PIPE, -# stderr=asyncio.subprocess.PIPE -# ) -# stdout, stderr = await asyncio.wait_for(process.communicate(), self.command_timeout) - -# if process.returncode == 0: -# return stdout.decode().strip() - -# except asyncio.TimeoutError: -# logging.error(f"Timeout while checking status of job {job_id}.") -# except Exception as e: -# logging.error(f"Unexpected error while checking status of job {job_id}: {e}") - -# return 
None - -# @staticmethod -# def check_status(job_id, status, sample): -# """ -# Checks the status of a job and calls the appropriate method on the sample object. - -# Args: -# job_id (str): The job ID. -# status (str): The status of the job. -# sample (object): The sample object (must have a post_process method and id attribute). -# """ -# print(f"Job {job_id} status: {status}") -# if status == "COMPLETED": -# print(f"Sample {sample.id} processing completed.") -# sample.post_process() -# sample.status = "completed" -# elif status in ["FAILED", "CANCELLED"]: -# sample.status = "failed" -# print(f"Sample {sample.id} processing failed.") - - ################################################################################################# ######### CLASS BELOW ASSUMES ACCESS TO THE HOST SYSTEM TO SUBMIT SLURM JOBS #################### ################################################################################################# @@ -241,6 +135,8 @@ async def _job_status(self, job_id: str) -> Optional[str]: return stdout_decoded except asyncio.TimeoutError: logging.error(f"Timeout while checking status of job {job_id}.") + except UnicodeDecodeError: + logging.error(f"Failed to decode sbatch stdout for job {job_id}.") except Exception as e: logging.error( f"Unexpected error while checking status of job {job_id}: {e}" From 4710224996df5bc783127aab9b064c905c6c1ac4 Mon Sep 17 00:00:00 2001 From: glrs <5999366+glrs@users.noreply.github.com> Date: Thu, 5 Dec 2024 18:53:17 +0100 Subject: [PATCH 3/3] Never picked up, remove. 
Could revisit in the future --- lib/core_utils/destiny_interface.py | 26 -------------------------- 1 file changed, 26 deletions(-) delete mode 100644 lib/core_utils/destiny_interface.py diff --git a/lib/core_utils/destiny_interface.py b/lib/core_utils/destiny_interface.py deleted file mode 100644 index 8279d21..0000000 --- a/lib/core_utils/destiny_interface.py +++ /dev/null @@ -1,26 +0,0 @@ -import warnings -from abc import ABC, abstractmethod - - -class DestinyInterface(ABC): - """ - DestinyInterface serves as an abstract base for different 'destiny' strategies in the Yggdrasil application. - It defines the interface for processing documents, where each destiny encapsulates a unique pathway - or processing logic, akin to the diverse fates woven by the Norns under Yggdrasil. - """ - - def __init__(self): - warnings.warn( - "DestinyInterface is deprecated and will be removed in future releases, use RealmTemplate instead.", - DeprecationWarning, - ) - - @abstractmethod - def process(self, doc): - """ - Process a document according to the specific destiny (strategy). This method needs to be implemented - by each concrete destiny class, defining how each document's journey unfolds. - - :param doc: The document to be processed. - """ - pass