Skip to content

Commit

Permalink
Merge pull request #5 from nkongenelly/arteria_runfolder_handlers
Browse files Browse the repository at this point in the history
Arteria runfolder handlers
  • Loading branch information
Aratz authored Sep 11, 2024
2 parents f144191 + a83ca30 commit 345a2db
Show file tree
Hide file tree
Showing 13 changed files with 752 additions and 409 deletions.
5 changes: 4 additions & 1 deletion .gitattributes
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
*.xml binary
* text=auto

poetry.lock text -diff
*.xml text -diff
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
.venv
.coverage
.idea/
__pycache__
dist/
*.log
3 changes: 3 additions & 0 deletions arteria/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
"""
Arteria
"""
import importlib.metadata

__version__ = importlib.metadata.version("arteria")
2 changes: 1 addition & 1 deletion arteria/config_schemas/schema_arteria_runfolder.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@
"port",
"logger_config_file",
]
}
}
173 changes: 173 additions & 0 deletions arteria/handlers/arteria_runfolder_handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
from aiohttp import web
from pathlib import Path
from arteria import __version__
import logging
from arteria.models.state import State
from arteria.models.config import Config
from arteria.handlers.base import base_routes
from arteria.models.runfolder_utils import Runfolder, list_runfolders


routes = base_routes
log = logging.getLogger(__name__)


@routes.post("/runfolders/path/{runfolder:.*}")
async def post_runfolders(request):
"""
When this is called with payload {"state": "STARTED"},
the state of the runfolder is set to STARTED
"""

data = await request.post()
runfolder_path = Path(request.match_info['runfolder'])

if not any([
runfolder_path.parent == Path(monitored_directory)
for monitored_directory in Config()['monitored_directories']
]):
raise web.HTTPBadRequest(
reason=f"{runfolder_path} does not belong to a monitored directory")

try:
runfolder = Runfolder(runfolder_path)
except AssertionError as exc:
log.exception(exc)
raise web.HTTPNotFound(reason=exc) from exc

state = data["state"]
try:
runfolder.state = State[state]
except KeyError as exc:
raise web.HTTPBadRequest(reason=f"The state '{state}' is not valid") from exc

return web.Response(status=200)


@routes.get("/runfolders/path/{runfolder:.*}")
async def get_runfolders(request):
"""
Returns some information about the runfolder as json
"""
runfolder_path = Path(request.match_info['runfolder'])

if not any([
runfolder_path.parent == Path(monitored_directory)
for monitored_directory in Config()['monitored_directories']
]):
raise web.HTTPBadRequest(
reason=f"{runfolder_path} does not belong to a monitored directory")

try:
runfolder = Runfolder(runfolder_path)
except AssertionError as exc:
log.exception(exc)
raise web.HTTPNotFound(reason=exc) from exc

return web.json_response(
data=serialize_runfolder_path(runfolder, request),
status=200
)


@routes.get("/runfolders/next")
async def get_next_runfolder(request):
"""
Finds unprocessed runfolder (state=ready) and then
returns some information about this runfolder.
"""
try:
runfolders = list_runfolders(
Config()['monitored_directories'],
filter_key=lambda r: r.state == State.READY
)

if len(runfolders) > 0:
runfolder_dict = serialize_runfolder_path(runfolders[0], request)

return web.json_response(
data=runfolder_dict,
status=200
)
else:
raise web.HTTPNoContent(
reason="No ready runfolder found."
)
except AssertionError as exc:
log.exception(exc)
raise web.HTTPNotFound(reason=exc) from exc


@routes.get("/runfolders/pickup")
async def get_pickup_runfolder(request):
"""
Used to start processing runfolders and also sets the runfolder to PENDING state.
"""
try:
runfolders = list_runfolders(
Config()['monitored_directories'],
filter_key=lambda r: r.state == State.READY
)

if runfolders:
runfolders[0].state = State.PENDING
runfolder_dict = serialize_runfolder_path(runfolders[0], request)
return web.json_response(
data=runfolder_dict,
status=200
)
else:
raise web.HTTPNoContent(
reason="No ready runfolders available."
)
except AssertionError as exc:
log.exception(exc)
raise web.HTTPNotFound(reason=exc) from exc


@routes.get("/runfolders")
async def get_all_runfolders(request):
"""
Returns information about all the runfolders that
match the state specified (or all runfolders when state
is not specified)
"""
try:
runfolders = list_runfolders(
Config()['monitored_directories'],
filter_key=lambda r: r.state == State.READY
)

runfolders = [
serialize_runfolder_path(runfolder, request)
for runfolder in runfolders
]

return web.json_response(
data={"runfolders": runfolders},
status=200
)
except AssertionError as exc:
log.exception(exc)
raise web.HTTPNotFound(reason=exc) from exc


def get_host_link(request, runfolder_path, ):
host = request.url.raw_host
link = f"{request.scheme}://{host}/api/1.0/runfolders/path{runfolder_path}"

return host, link


def serialize_runfolder_path(runfolder_cls, request):
"""
Get the path uri as web.json_response gives an error when
self.path is of type Path
"""
runfolder_dict = runfolder_cls.to_dict()
runfolder_dict['service_version'] = __version__
runfolder_path = Path(runfolder_dict['path'])
runfolder_dict['path'] = runfolder_path.as_uri()
runfolder_dict['host'], runfolder_dict['link'] = get_host_link(request, runfolder_path)

return runfolder_dict
3 changes: 0 additions & 3 deletions arteria/handlers/base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
"""
Base routes that should be included in all services
"""

from aiohttp import web

from arteria import __version__


base_routes = web.RouteTableDef()


Expand Down
49 changes: 37 additions & 12 deletions arteria/models/runfolder_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,41 @@ def list_runfolders(monitored_directories, filter_key=lambda r: True):
runfolders when no state filter is given.
"""
runfolders = []
for monitored_directory in monitored_directories:
monitored_dir_path = Path(monitored_directory)
for subdir in monitored_dir_path.iterdir():
try:
if filter_key(runfolder := Runfolder(monitored_dir_path / subdir)):
runfolders.append(runfolder)
except AssertionError as e:
if e == f"File [Rr]unParameters.xml not found in runfolder {subdir}":
continue
monitored_runfolders_paths = get_monitored_subdirs(monitored_directories)
for monitored_runfolders_path in monitored_runfolders_paths:
try:
if filter_key(runfolder := Runfolder(monitored_runfolders_path)):
runfolders.append(runfolder)
except AssertionError as e:
if (
str(e) != (
"File [Rr]unParameters.xml not found in runfolder "
f"{monitored_runfolders_path}"
)
):
raise

return runfolders


def get_monitored_subdirs(monitored_directories):
return [
Path(monitored_directory) / subdir
for monitored_directory in monitored_directories
for subdir in Path(monitored_directory).iterdir()
]


class Runfolder():
"""
A class to manipulate runfolders on disk
"""
def __init__(self, path):
self.config = Config(DEFAULT_CONFIG)
self.path = Path(path)
assert self.path.is_dir()
self.path = path

assert self.path.is_dir(), f"Runfolder '{path.name}' does not exist"

try:
run_parameter_file = next(
path
Expand All @@ -53,10 +68,13 @@ def __init__(self, path):
)
self.run_parameters = xmltodict.parse(run_parameter_file.read_text())["RunParameters"]
except StopIteration as exc:
raise AssertionError(f"File [Rr]unParameters.xml not found in runfolder {path}") from exc
raise AssertionError(
f"File [Rr]unParameters.xml not found in runfolder {path}"
) from exc

marker_file_name = Instrument(self.run_parameters).completed_marker_file
marker_file = (self.path / marker_file_name)

assert (
marker_file.exists()
and (
Expand Down Expand Up @@ -116,6 +134,13 @@ def metadata(self):

return metadata

def to_dict(self):
return {
"metadata": self.metadata,
"path": self.path,
"state": self.state.name
}


class Instrument:
RUNPARAMETERS_INSTRUMENT_ID_KEYS = [
Expand Down
5 changes: 2 additions & 3 deletions arteria/services/arteria_runfolder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
import logging.config

from aiohttp import web

from arteria.models.config import Config
from arteria.handlers.base import base_routes as routes
from arteria.handlers.arteria_runfolder_handlers import routes
from arteria.config_schemas.schema_arteria_runfolder import runfolder_schema


Expand Down Expand Up @@ -36,4 +35,4 @@ def main():
config_dict = yaml.safe_load(config_file.read())

app = get_app(config_dict)
web.run_app(app, port=config.get("port"))
web.run_app(app, port=config_dict.get("port"))
Loading

0 comments on commit 345a2db

Please sign in to comment.