Merge pull request #29 from glrs/feature/core
Feature/core: Cleanup
glrs authored Dec 5, 2024
2 parents 10ef0c8 + 4710224 commit 430889d
Showing 3 changed files with 13 additions and 132 deletions.
26 changes: 0 additions & 26 deletions lib/core_utils/destiny_interface.py

This file was deleted.

11 changes: 11 additions & 0 deletions lib/core_utils/singleton_decorator.py
@@ -23,6 +23,17 @@ def singleton(cls: Type[Any]) -> Type[Any]:
Returns:
Type[Any]: The singleton class with SingletonMeta as its metaclass.
Limitations:
- **Pickling Not Supported:** Singleton instances created with this decorator
cannot be pickled. Attempting to pickle such an instance will result in a
`TypeError` or `AttributeError`. If pickling is required, consider implementing
custom pickling methods or using a different singleton pattern.
- **Incompatible with Custom Metaclasses:** The singleton decorator cannot be
applied to classes that already have a custom metaclass. Doing so will raise a
`TypeError` due to metaclass conflicts. To use the singleton pattern with such
classes, you'll need to implement the singleton behavior manually or adjust
your class design.
"""

# Create a new class with SingletonMeta as its metaclass
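The limitations listed in this docstring follow from the metaclass-based approach. As a rough illustration, here is a minimal, self-contained sketch of such a pattern; `SingletonMeta` and `singleton` are re-declared locally, and `Config`, `CustomMeta`, `Base`, and `Service` are hypothetical names, so none of this is the repository's actual `lib/core_utils/singleton_decorator.py` code. It demonstrates the metaclass-conflict `TypeError`; whether and how pickling fails depends on how the real decorator rebuilds the class, so that limitation is only noted in the docstring above rather than reproduced here.

```python
from typing import Any, Dict, Type


class SingletonMeta(type):
    """Metaclass that caches a single instance per class (illustrative only)."""

    _instances: Dict[type, Any] = {}

    def __call__(cls, *args: Any, **kwargs: Any) -> Any:
        # Create the instance on first use, then always return the cached one.
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


def singleton(cls: Type[Any]) -> Type[Any]:
    # Rebuild the class with SingletonMeta as its metaclass; drop the slot
    # descriptors that cannot be re-declared in a freshly created class.
    namespace = {
        key: value
        for key, value in cls.__dict__.items()
        if key not in ("__dict__", "__weakref__")
    }
    return SingletonMeta(cls.__name__, cls.__bases__, namespace)


@singleton
class Config:
    def __init__(self) -> None:
        self.values: Dict[str, str] = {}


assert Config() is Config()  # every call returns the same cached instance


class CustomMeta(type):
    """Stand-in for a pre-existing custom metaclass."""


class Base(metaclass=CustomMeta):
    pass


# Metaclass-conflict limitation: SingletonMeta is not a subclass of CustomMeta,
# so rebuilding a class derived from Base raises TypeError, as documented.
try:

    @singleton
    class Service(Base):
        pass

except TypeError as exc:
    print(f"metaclass conflict, as documented: {exc}")
```

The conflict arises because Python requires the metaclass of a class to be a (non-strict) subclass of the metaclasses of all its bases; once the decorator rebuilds the class through `SingletonMeta`, a base carrying an unrelated custom metaclass makes that impossible.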
108 changes: 2 additions & 106 deletions lib/module_utils/sjob_manager.py
@@ -9,112 +9,6 @@

logging = custom_logger(__name__.split(".")[-1])

# import asyncio
# import logging
# import re

# from pathlib import Path

# from lib.utils.config_loader import configs

# class SlurmJobManager:
# def __init__(self, polling_interval=1.0, command_timeout=8.0):
# self.polling_interval = polling_interval
# self.command_timeout = command_timeout

# # TODO: Make sure the path to the slurm_manager.sh script exists or log an error
# self.slurm_script_path = Path(configs['yggdrasil_script_dir']) / "slurm_manager.sh" # Adjust this path as necessary

# async def submit_job(self, script_path):
# command = [self.slurm_script_path, "submit", script_path]

# print(">>>> COMMAND: ", command)
# try:
# process = await asyncio.create_subprocess_exec(
# *command,
# stdout=asyncio.subprocess.PIPE,
# stderr=asyncio.subprocess.PIPE
# )
# stdout, stderr = await asyncio.wait_for(process.communicate(), self.command_timeout)

# if process.returncode != 0:
# logging.error("Error submitting job. STDOUT: %s, STDERR: %s", stdout.decode(), stderr.decode())
# return None

# logging.debug(f"Slurm RAW submit output: {stdout}")
# logging.debug(f"STDOUT from slurm_manager.sh: {stdout.decode().strip()}")
# logging.debug(f"STDERR from slurm_manager.sh: {stderr.decode().strip()}")
# stdout_decoded = stdout.decode().strip()
# logging.debug(f"Slurm submit output: {stdout_decoded}")

# # Improved regex to capture the job ID from a string like "Submitted batch job 123456"
# match = re.search(r'Submitted batch job (\d+)', stdout_decoded)
# job_id = match.group(1) if match else None

# if job_id:
# logging.info(f"Job submitted with ID: {job_id}")
# return job_id
# else:
# logging.error("Failed to extract job ID from sbatch output.")

# except asyncio.TimeoutError:
# logging.error("Timeout while submitting job.")
# except Exception as e:
# logging.error(f"Unexpected error: {e}")

# return None

# async def monitor_job(self, job_id, sample):
# """Monitors the specified job and calls the sample's post-process method based on job status."""
# while True:
# status = await self._job_status(job_id)
# print(f">>>> RECEIVED STATUS: {status}")
# if status in ["COMPLETED", "FAILED", "CANCELLED"]:
# logging.info(f"Job {job_id} status: {status}")
# self.check_status(job_id, status, sample)
# break
# await asyncio.sleep(self.polling_interval)

# async def _job_status(self, job_id):
# command = [self.slurm_script_path, "monitor", job_id]
# try:
# process = await asyncio.create_subprocess_exec(
# *command,
# stdout=asyncio.subprocess.PIPE,
# stderr=asyncio.subprocess.PIPE
# )
# stdout, stderr = await asyncio.wait_for(process.communicate(), self.command_timeout)

# if process.returncode == 0:
# return stdout.decode().strip()

# except asyncio.TimeoutError:
# logging.error(f"Timeout while checking status of job {job_id}.")
# except Exception as e:
# logging.error(f"Unexpected error while checking status of job {job_id}: {e}")

# return None

# @staticmethod
# def check_status(job_id, status, sample):
# """
# Checks the status of a job and calls the appropriate method on the sample object.

# Args:
# job_id (str): The job ID.
# status (str): The status of the job.
# sample (object): The sample object (must have a post_process method and id attribute).
# """
# print(f"Job {job_id} status: {status}")
# if status == "COMPLETED":
# print(f"Sample {sample.id} processing completed.")
# sample.post_process()
# sample.status = "completed"
# elif status in ["FAILED", "CANCELLED"]:
# sample.status = "failed"
# print(f"Sample {sample.id} processing failed.")


#################################################################################################
######### CLASS BELOW ASSUMES ACCESS TO THE HOST SYSTEM TO SUBMIT SLURM JOBS ####################
#################################################################################################
@@ -241,6 +135,8 @@ async def _job_status(self, job_id: str) -> Optional[str]:
return stdout_decoded
except asyncio.TimeoutError:
logging.error(f"Timeout while checking status of job {job_id}.")
except UnicodeDecodeError:
logging.error(f"Failed to decode sbatch stdout for job {job_id}.")
except Exception as e:
logging.error(
f"Unexpected error while checking status of job {job_id}: {e}"
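The new `except UnicodeDecodeError` branch protects the `stdout.decode()` step on the raw bytes returned by the monitoring subprocess. Below is a self-contained sketch of that overall pattern under assumed names: `job_status`, its `command` and `timeout` parameters, and the `sacct` call in the usage comment are illustrative, not the repository's API.

```python
import asyncio
import logging
from typing import List, Optional

logger = logging.getLogger(__name__)


async def job_status(command: List[str], timeout: float = 8.0) -> Optional[str]:
    """Run a status command and return its decoded stdout, or None on any failure."""
    try:
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _stderr = await asyncio.wait_for(process.communicate(), timeout)
        if process.returncode == 0:
            # stdout is raw bytes and may not be valid UTF-8; this decode is
            # the step the new except branch guards.
            return stdout.decode().strip()
    except asyncio.TimeoutError:
        logger.error("Timeout while running %s.", command)
    except UnicodeDecodeError:
        # Mirrors the handler added in this commit: log and fall through to None.
        logger.error("Failed to decode stdout from %s.", command)
    except Exception as exc:
        logger.error("Unexpected error while running %s: %s", command, exc)
    return None


# Hypothetical usage:
#     asyncio.run(job_status(["sacct", "-j", "12345", "--format=State", "--noheader"]))
```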
