Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EPP to recursively fetch last recorded derived sample UDF #384

Open
wants to merge 31 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
33498d8
draft script
kedhammar Dec 19, 2024
0cf2c0a
bugfix and none-case
kedhammar Dec 19, 2024
3c88aac
wip, add TODOs
kedhammar Dec 19, 2024
f3381bb
rename script file and bump docstring
kedhammar Dec 20, 2024
ff412b0
live testing dev
kedhammar Dec 20, 2024
89c7a80
fix
kedhammar Dec 20, 2024
a7c65e6
var ref fixes
kedhammar Dec 20, 2024
6e41824
bump logs
kedhammar Dec 20, 2024
3434eed
bump vlog
kedhammar Dec 20, 2024
d6ade1b
shut up mypy
kedhammar Dec 20, 2024
901ef86
wip
kedhammar Jan 7, 2025
f07296b
big swap, need testing
kedhammar Jan 7, 2025
391c274
fix so function can return None explicitly
kedhammar Jan 7, 2025
cd0ee61
try displaying steps used on exit 0
kedhammar Jan 7, 2025
5654317
bugfix
kedhammar Jan 7, 2025
e97f896
try stderr
kedhammar Jan 7, 2025
87a3a0c
try w/o
kedhammar Jan 7, 2025
94750dd
rename and change banner approach
kedhammar Jan 7, 2025
e796034
test exit 0 message
kedhammar Jan 8, 2025
f778f39
prev
kedhammar Jan 8, 2025
38a1ee5
prev
kedhammar Jan 8, 2025
16423cb
Add a very useful utility function to check for unpopulated UDFs and …
kedhammar Jan 8, 2025
58a22f8
improve logs
kedhammar Jan 8, 2025
29201df
ruff
kedhammar Jan 8, 2025
8d5b95e
ruff check fixes
kedhammar Jan 8, 2025
a5ab60c
bump docstring
kedhammar Jan 8, 2025
0b5913e
fix log
kedhammar Jan 8, 2025
b8e314e
fix issue with ont pooling, add argument to allow fetching from targe…
kedhammar Jan 8, 2025
b5ad584
remove superfluous handling of traceback
kedhammar Jan 8, 2025
b4165f9
ruff fixes
kedhammar Jan 8, 2025
5426656
shut up mypy
kedhammar Jan 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions VERSIONLOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Scilifelab_epps Version Log

## 20241220.1

Introduce EPP to fetch last recorded derived sample UDF.

## 20241211.1

No longer reserve PromethION column 3 for Clinical Genomics.
Expand Down
30 changes: 4 additions & 26 deletions scilifelab_epps/calc_from_args/udf_arg_methods.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#!/usr/bin/env python
import logging
from typing import Any

import yaml
from genologics.entities import Artifact, Process

from scilifelab_epps.utils import udf_tools
Expand All @@ -23,7 +21,6 @@ def fetch_from_arg(

"""

history: str | None = None
source: Artifact | Process
source_name: str

Expand All @@ -47,19 +44,11 @@ def fetch_from_arg(
value = process.udf[arg_dict["udf"]]
else:
if arg_dict["recursive"]:
# Fetch UDF recursively, back-tracking the input-output tuple
if arg_dict["source"] == "input":
use_current = False
else:
assert arg_dict["source"] == "output"
use_current = True

value, history = udf_tools.fetch_last(
currentStep=process,
art_tuple=art_tuple,
# Fetch UDF recursively

value = udf_tools.fetch_last(
target_art=source,
target_udfs=arg_dict["udf"],
use_current=use_current,
print_history=True,
)
else:
# Fetch UDF from input or output artifact
Expand All @@ -78,17 +67,6 @@ def fetch_from_arg(
else:
return on_fail

# Log what has been done
log_str = f"Fetched UDF '{arg_dict['udf']}': {value} from {arg_dict['source']} '{source_name}'."

if history:
history_yaml = yaml.load(history, Loader=yaml.FullLoader)
last_step_name = history_yaml[-1]["Step name"]
last_step_id = history_yaml[-1]["Step ID"]
log_str += f"\n\tUDF recusively fetched from step: '{last_step_name}' (ID: '{last_step_id}')"

logging.info(log_str)

return value


Expand Down
270 changes: 151 additions & 119 deletions scilifelab_epps/utils/udf_tools.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import json
from typing import Union
import logging
import xml.etree.ElementTree as ET
from typing import Any, Union

from genologics.entities import Artifact, Process
from requests.exceptions import HTTPError
Expand All @@ -9,6 +11,32 @@
"""


def process_has_udfs(process: Process, target_udfs: str | list[str]) -> list[str]:
    """Report which target UDFs are declared as sample fields of the process type.

    This check is necessary because a non-required sample UDF left blank will
    not be detected in the artifact object.

    Arguments:

        process         Process whose associated type is inspected, via the raw
                        XML returned by process.type.xml().

        target_udfs     The UDF name(s) to look for. Can be supplied as a string,
                        or as a list of strings.

    Returns a list of the found UDF names, in the order their "sample-field"
    elements appear in the XML, or an empty list if none were found.
    """

    # Accept a single UDF name, like fetch_last() does. Iterating a bare string
    # would compare its individual characters against the field names.
    if isinstance(target_udfs, str):
        target_udfs = [target_udfs]

    # Set membership replaces a nested scan and ignores repeated target names
    wanted = set(target_udfs)

    # Parse the raw XML of the process associated type as a tree object
    root = ET.fromstring(process.type.xml())

    # Collect the sample fields matching a target UDF. Using .get() means an
    # element without a "name" attribute is skipped instead of raising KeyError.
    target_udfs_found = []
    for sample_field in root.iter("sample-field"):
        field_name = sample_field.get("name")
        if field_name in wanted:
            target_udfs_found.append(field_name)

    return target_udfs_found


def put(target: Artifact | Process, target_udf: str, val, on_fail=AssertionError):
"""Try to put UDF on artifact or process, optionally without causing fatal error.
Evaluates true on success and error (default) or on_fail param on failure.
Expand Down Expand Up @@ -39,22 +67,6 @@ def is_filled(art: Artifact, target_udf: str) -> bool:
return False


def no_outputs(currentStep: Process) -> bool:
    """Tell whether the step lacks output artifacts.

    Evaluates True when the step yields no I/O tuples at all, or when the
    output element (index 1) of every tuple is None. Otherwise False.
    """

    io_pairs = get_art_tuples(currentStep)

    # A step without any I/O tuples counts as having no outputs
    if not io_pairs:
        return True

    # Pull out the output element of every tuple before judging them
    outputs = [pair[1] for pair in io_pairs]
    return all(output is None for output in outputs)


def get_art_tuples(currentStep: Process) -> list:
"""Return I/O tuples whose elements are either
1) both analytes
Expand Down Expand Up @@ -135,125 +147,145 @@ def list_udfs(art: Artifact) -> list:


def fetch_last(
currentStep: Process,
art_tuple: tuple,
target_art: Artifact,
target_udfs: str | list,
use_current=True,
print_history=False,
include_current=False,
log_traceback=False,
return_traceback=False,
on_fail=AssertionError,
):
) -> Any | tuple[Any, dict]:
"""Recursively look for target UDF.

Target UDF can be supplied as a string, or as a prioritized list of strings.
Arguments:

target_art Artifact to traceback.

target_udfs The UDF(s) to look for. Can be supplied as a string, or as a prioritized
list of strings.

If "print_history" == True, will return both the target metric and the lookup history as a string.
include_current If True, will pull target UDFs if found in the target artifact.

log_traceback If True, will log the full traceback.

return_traceback If True, will additionally return the traceback as a dict.

on_fail If this is a subclass of Exception, will raise this exception on failure.
If not, will return this value on failure instead of the UDF value.
"""

# Convert to list, to enable iteration
if isinstance(target_udfs, str):
target_udfs = [target_udfs]

history = []
# Instantiate traceback
traceback = []
steps_visited = []

while True:
history.append({"Step name": currentStep.type.name, "Step ID": currentStep.id})

# Try to grab input and output articles, if possible
try:
input_art = art_tuple[0]["uri"]
except:
input_art = None
try:
output_art = art_tuple[1]["uri"]
except:
output_art = None

if len(history) == 1 and use_current is not True:
# If we are in the original step and "use_current" is false, skip
pass
else:
# Look trough outputs
if output_art:
history[-1].update(
{
"Derived sample ID": output_art.id,
"Derived sample UDFs": dict(output_art.udf.items()),
# Instantiate recursion variables
current_art = target_art
n = 1
try:
# Start recursive search
while True:
# Dynamically reassign parent process
pp = current_art.parent_process

# Keep track of visited parent processes
if pp is not None:
steps_visited.append(f"'{pp.type.name}' ({pp.id})")
target_udfs_in_parent_process = process_has_udfs(pp, target_udfs)

traceback.append(
{
"Artifact": {
"Name": current_art.name,
"ID": current_art.id,
"UDFs": dict(current_art.udf.items()),
"Parent Step": {
"Name": pp.type.name if pp else None,
"ID": pp.id if pp else None,
},
}
)
}
)

for target_udf in target_udfs:
if target_udf in list_udfs(output_art):
if print_history is True:
return output_art.udf[target_udf], json.dumps(
history, indent=2
# Search for correct UDF
for target_udf in target_udfs:
if target_udf in list_udfs(current_art):
if include_current is not True and n == 1:
logging.info(
"Target UDF was found in specified artifact, but include_current is set to False. Skipping."
)
else:
if log_traceback is True:
logging.info(
f"Traceback:\n{json.dumps(traceback, indent=2)}"
)
logging.info(
f"Found target UDF '{target_udf}'"
+ f" with value '{current_art.udf[target_udf]}'"
+ f" in process {steps_visited[-1]}"
+ f" {'output' if pp else 'input'}"
+ f" artifact '{current_art.name}' ({current_art.id})"
)

if return_traceback:
return current_art.udf[target_udf], traceback
else:
return output_art.udf[target_udf]

# Look through inputs
if input_art:
if input_art.parent_process:
history[-1].update(
{
"Input sample parent step name": input_art.parent_process.type.name,
"Input sample parent step ID": input_art.parent_process.id,
}
)
history[-1].update(
{
"Input sample ID": input_art.id,
"Input sample UDFs": dict(input_art.udf.items()),
}
return current_art.udf[target_udf]

# Address the case that no target UDFs were found on the artifact, even though they were present in the parent process
if pp is not None and target_udfs_in_parent_process != []:
logging.warning(
f"Parent process '{pp.type.name}' ({pp.id})"
+ f" has target UDF(s) {target_udfs_in_parent_process},"
+ f" but it's not filled in for artifact '{current_art.name}' ({current_art.id})."
+ " Please double check that you haven't missed filling it in."
)
for target_udf in target_udfs:
if target_udf in list_udfs(input_art):
if print_history is True:
return input_art.udf[target_udf], json.dumps(
history, indent=2
)
else:
return input_art.udf[target_udf]

# Cycle to previous step, if possible
try:
pp = input_art.parent_process
assert pp is not None

pp_tuples = get_art_tuples(pp)
matching_tuples = []
for pp_tuple in pp_tuples:
try:
pp_input = pp_tuple[0]["uri"]
except:
pp_input = None
try:
pp_output = pp_tuple[1]["uri"]
except:
pp_output = None

if (pp_input and pp_input.id == input_art.id) or (
pp_output and pp_output.id == input_art.id
):
matching_tuples.append(pp_tuple)

assert (
len(matching_tuples) == 1
), "Target artifact matches multiple inputs/outputs in previous step."

# Back-tracking successful, re-assign variables to represent previous step
currentStep = pp
art_tuple = matching_tuples[0]

except AssertionError:
if isinstance(on_fail, type) and issubclass(on_fail, Exception):
if print_history is True:
print(json.dumps(history, indent=2))
raise on_fail(
f"Could not find matching UDF(s) [{', '.join(target_udfs)}] for artifact tuple {art_tuple}"
# Stop traceback if no parent process is found
if pp is None:
raise AssertionError(
f"Artifact '{current_art.name}' ({current_art.id}) has no parent process linked and can't be traced back further."
)

pp_art_tuples = get_art_tuples(pp)

# If parent process has valid input-output tuples, use for linkage
linked_input_arts = []
if pp_art_tuples != []:
for pp_tuple in pp_art_tuples:
if pp_tuple[1]["uri"].id == current_art.id:
linked_input_arts.append(pp_tuple[0]["uri"])
else:
raise NotImplementedError(
"Parent process has no valid input-output links, traceback can't continue."
)

if len(linked_input_arts) == 1:
# Dynamically reassign current artifact
current_art = linked_input_arts[0]
elif len(linked_input_arts) > 1:
raise AssertionError(
"Parent process has multiple input artifacts linked to the same output artifact, can't traceback."
)
else:
if print_history is True:
print(json.dumps(history, indent=2))
return on_fail, json.dumps(history, indent=2)
else:
return on_fail
raise AssertionError(
"Parent process has no input artifacts linked to the output artifact, can't traceback."
)

n += 1

except AssertionError:
if isinstance(on_fail, type) and issubclass(on_fail, Exception):
raise on_fail(
f"Could not find matching UDF(s) [{', '.join(target_udfs)}] for artifact '{target_art.name}' ({target_art.id})"
)
else:
logging.warning(
f"Failed traceback for artifact '{target_art.name}' ({target_art.id}), falling back to value '{on_fail}'"
)
if return_traceback:
return on_fail, traceback
else:
return on_fail
6 changes: 5 additions & 1 deletion scilifelab_epps/zika/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ def fetch_sample_data(currentStep: Process, to_fetch: dict) -> pd.DataFrame:
except KeyError:
row[col_name] = None
else:
row[col_name] = fetch_last(currentStep, art_tuple, udf_query)
row[col_name] = fetch_last(
target_art=art_tuple[0]["uri"],
target_udfs=udf_query,
include_current=True,
)
rows.append(row)

# Transform to dataframe
Expand Down
Loading
Loading