Skip to content

Commit

Permalink
refactor!: rename study --> study statement / statement
Browse files Browse the repository at this point in the history
  • Loading branch information
korikuzma committed Nov 14, 2024
1 parent 126c19a commit 1217429
Show file tree
Hide file tree
Showing 16 changed files with 763 additions and 740 deletions.
133 changes: 67 additions & 66 deletions src/metakb/load_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ def _add_mappings_and_exts_to_obj(obj: dict, obj_keys: list[str]) -> None:
obj_keys.append(f"{name}:${name}")


def _add_method(tx: ManagedTransaction, method: dict, ids_in_studies: set[str]) -> None:
def _add_method(tx: ManagedTransaction, method: dict, ids_in_stmts: set[str]) -> None:
"""Add Method node and its relationships to DB
:param tx: Transaction object provided to transaction functions
:param method: CDM method object
:param ids_in_studies: IDs found in studies
:param ids_in_stmts: IDs found in statements
"""
if method["id"] not in ids_in_studies:
if method["id"] not in ids_in_stmts:
return

query = """
Expand All @@ -80,7 +80,7 @@ def _add_method(tx: ManagedTransaction, method: dict, ids_in_studies: set[str])
# Method's documents are unique and do not currently have IDs
# They also only have one document
document = is_reported_in[0]
_add_document(tx, document, ids_in_studies)
_add_document(tx, document, ids_in_stmts)
doc_doi = document["doi"]
query += f"""
MERGE (d:Document {{ doi:'{doc_doi}' }})
Expand All @@ -91,16 +91,16 @@ def _add_method(tx: ManagedTransaction, method: dict, ids_in_studies: set[str])


def _add_gene_or_disease(
tx: ManagedTransaction, obj_in: dict, ids_in_studies: set[str]
tx: ManagedTransaction, obj_in: dict, ids_in_stmts: set[str]
) -> None:
"""Add gene or disease node and its relationships to DB
:param tx: Transaction object provided to transaction functions
:param obj_in: CDM gene or disease object
:param ids_in_studies: IDs found in studies
:param ids_in_stmts: IDs found in statements
:raises TypeError: When `obj_in` is not a disease or gene
"""
if obj_in["id"] not in ids_in_studies:
if obj_in["id"] not in ids_in_stmts:
return

obj = obj_in.copy()
Expand Down Expand Up @@ -129,16 +129,16 @@ def _add_gene_or_disease(
def _add_therapeutic_procedure(
tx: ManagedTransaction,
therapeutic_procedure: dict,
ids_in_studies: set[str],
ids_in_stmts: set[str],
) -> None:
"""Add therapeutic procedure node and its relationships
:param tx: Transaction object provided to transaction functions
:param therapeutic_procedure: Therapeutic procedure CDM object
:param ids_in_studies: IDs found in studies
:param ids_in_stmts: IDs found in statements
:raises TypeError: When therapeutic procedure type is invalid
"""
if therapeutic_procedure["id"] not in ids_in_studies:
if therapeutic_procedure["id"] not in ids_in_stmts:
return

tp = therapeutic_procedure.copy()
Expand Down Expand Up @@ -264,15 +264,15 @@ def _add_variation(tx: ManagedTransaction, variation_in: dict) -> None:
def _add_categorical_variant(
tx: ManagedTransaction,
categorical_variant_in: dict,
ids_in_studies: set[str],
ids_in_stmts: set[str],
) -> None:
"""Add categorical variant objects to DB.
:param tx: Transaction object provided to transaction functions
:param categorical_variant_in: Categorical variant CDM object
:param ids_in_studies: IDs found in studies
:param ids_in_stmts: IDs found in statements
"""
if categorical_variant_in["id"] not in ids_in_studies:
if categorical_variant_in["id"] not in ids_in_stmts:
return

cv = categorical_variant_in.copy()
Expand Down Expand Up @@ -311,19 +311,19 @@ def _add_categorical_variant(


def _add_document(
tx: ManagedTransaction, document_in: dict, ids_in_studies: set[str]
tx: ManagedTransaction, document_in: dict, ids_in_stmts: set[str]
) -> None:
"""Add Document object to DB.
:param tx: Transaction object provided to transaction functions
:param document: Document CDM object
:param ids_in_studies: IDs found in studies
:param ids_in_stmts: IDs found in statements
"""
# Not all document's have IDs. These are the fields that can uniquely identify
# a document
if "id" in document_in:
query = "MATCH (n:Document {id:$id}) RETURN n"
if document_in["id"] not in ids_in_studies:
if document_in["id"] not in ids_in_stmts:
return
elif "doi" in document_in:
query = "MATCH (n:Document {doi:$doi}) RETURN n"
Expand Down Expand Up @@ -351,81 +351,81 @@ def _add_document(
tx.run(query, **document)


def _get_ids_from_studies(studies: list[dict]) -> set[str]:
"""Get unique IDs from studies
def _get_ids_from_stmts(statements: list[dict]) -> set[str]:
"""Get unique IDs from statements
:param studies: List of studies
:return: Set of IDs found in studies
:param statements: List of statements
:return: Set of IDs found in statements
"""

def _add_obj_id_to_set(obj: dict, ids_set: set[str]) -> None:
"""Add object id to set of IDs
:param obj: Object to get ID for
:param ids_set: IDs found in studies. This will be mutated.
:param ids_set: IDs found in statements. This will be mutated.
"""
obj_id = obj.get("id")
if obj_id:
ids_set.add(obj_id)

ids_in_studies = set()
ids_in_stmts = set()

for study in studies:
for statement in statements:
for obj in [
study.get("specifiedBy"), # method
study.get("reportedIn"),
study.get("subjectVariant"),
study.get("objectTherapeutic"),
study.get("conditionQualifier"),
study.get("geneContextQualifier"),
statement.get("specifiedBy"), # method
statement.get("reportedIn"),
statement.get("subjectVariant"),
statement.get("objectTherapeutic"),
statement.get("conditionQualifier"),
statement.get("geneContextQualifier"),
]:
if obj:
if isinstance(obj, list):
for item in obj:
_add_obj_id_to_set(item, ids_in_studies)
_add_obj_id_to_set(item, ids_in_stmts)
else: # This is a dictionary
_add_obj_id_to_set(obj, ids_in_studies)
_add_obj_id_to_set(obj, ids_in_stmts)

return ids_in_studies
return ids_in_stmts


def _add_study(tx: ManagedTransaction, study_in: dict) -> None:
"""Add study node and its relationships
def _add_statement(tx: ManagedTransaction, statement_in: dict) -> None:
"""Add statement node and its relationships
:param tx: Transaction object provided to transaction functions
:param study_in: Statement CDM object
:param statement_in: Statement CDM object
"""
study = study_in.copy()
study_type = study["type"]
study_keys = _create_parameterized_query(
study, ("id", "description", "direction", "predicate", "type")
statement = statement_in.copy()
statement_type = statement["type"]
statement_keys = _create_parameterized_query(
statement, ("id", "description", "direction", "predicate", "type")
)

match_line = ""
rel_line = ""

is_reported_in_docs = study.get("reportedIn", [])
is_reported_in_docs = statement.get("reportedIn", [])
for ri_doc in is_reported_in_docs:
ri_doc_id = ri_doc["id"]
name = f"doc_{ri_doc_id.split(':')[-1]}"
match_line += f"MERGE ({name} {{ id: '{ri_doc_id}'}})\n"
rel_line += f"MERGE (s) -[:IS_REPORTED_IN] -> ({name})\n"

allele_origin = study.get("alleleOriginQualifier")
allele_origin = statement.get("alleleOriginQualifier")
if allele_origin:
study["alleleOriginQualifier"] = allele_origin
statement["alleleOriginQualifier"] = allele_origin
match_line += "SET s.alleleOriginQualifier=$alleleOriginQualifier\n"

gene_context_id = study.get("geneContextQualifier", {}).get("id")
gene_context_id = statement.get("geneContextQualifier", {}).get("id")
if gene_context_id:
match_line += f"MERGE (g:Gene {{id: '{gene_context_id}'}})\n"
rel_line += "MERGE (s) -[:HAS_GENE_CONTEXT] -> (g)\n"

method_id = study["specifiedBy"]["id"]
method_id = statement["specifiedBy"]["id"]
match_line += f"MERGE (m {{ id: '{method_id}' }})\n"
rel_line += "MERGE (s) -[:IS_SPECIFIED_BY] -> (m)\n"

coding = study.get("strength")
coding = statement.get("strength")
if coding:
coding_key_fields = ("code", "label", "system")

Expand All @@ -435,75 +435,76 @@ def _add_study(tx: ManagedTransaction, study_in: dict) -> None:
for k in coding_key_fields:
v = coding.get(k)
if v:
study[f"coding_{k}"] = v
statement[f"coding_{k}"] = v

match_line += f"MERGE (c:Coding {{ {coding_keys} }})\n"
rel_line += "MERGE (s) -[:HAS_STRENGTH] -> (c)\n"

variant_id = study["subjectVariant"]["id"]
variant_id = statement["subjectVariant"]["id"]
match_line += f"MERGE (v:Variation {{ id: '{variant_id}' }})\n"
rel_line += "MERGE (s) -[:HAS_VARIANT] -> (v)\n"

therapeutic_id = study["objectTherapeutic"]["id"]
therapeutic_id = statement["objectTherapeutic"]["id"]
match_line += f"MERGE (t:TherapeuticProcedure {{ id: '{therapeutic_id}' }})\n"
rel_line += "MERGE (s) -[:HAS_THERAPEUTIC] -> (t)\n"

tumor_type_id = study["conditionQualifier"]["id"]
tumor_type_id = statement["conditionQualifier"]["id"]
match_line += f"MERGE (tt:Condition {{ id: '{tumor_type_id}' }})\n"
rel_line += "MERGE (s) -[:HAS_TUMOR_TYPE] -> (tt)\n"

query = f"""
MERGE (s:{study_type}:StudyStatement:Statement {{ {study_keys} }})
MERGE (s:{statement_type}:StudyStatement:Statement {{ {statement_keys} }})
{match_line}
{rel_line}
"""

tx.run(query, **study)
tx.run(query, **statement)


def add_transformed_data(driver: Driver, data: dict) -> None:
"""Add set of data formatted per Common Data Model to DB.
:param data: contains key/value pairs for data objects to add to DB, including
studies, variation, therapeutic procedures, conditions, genes, methods,
statements, variation, therapeutic procedures, conditions, genes, methods,
documents, etc.
"""
# Used to keep track of IDs that are in studies. This is used to prevent adding
# nodes that aren't associated to studies
ids_in_studies = _get_ids_from_studies(data.get("studies", []))
# Used to keep track of IDs that are in statements. This is used to prevent adding
# nodes that aren't associated to statements
statements = data.get("statements", [])
ids_in_stmts = _get_ids_from_stmts(statements)

with driver.session() as session:
loaded_study_count = 0
loaded_stmt_count = 0

for cv in data.get("categorical_variants", []):
session.execute_write(_add_categorical_variant, cv, ids_in_studies)
session.execute_write(_add_categorical_variant, cv, ids_in_stmts)

for doc in data.get("documents", []):
session.execute_write(_add_document, doc, ids_in_studies)
session.execute_write(_add_document, doc, ids_in_stmts)

for method in data.get("methods", []):
session.execute_write(_add_method, method, ids_in_studies)
session.execute_write(_add_method, method, ids_in_stmts)

for obj_type in {"genes", "conditions"}:
for obj in data.get(obj_type, []):
session.execute_write(_add_gene_or_disease, obj, ids_in_studies)
session.execute_write(_add_gene_or_disease, obj, ids_in_stmts)

for tp in data.get("therapeutic_procedures", []):
session.execute_write(_add_therapeutic_procedure, tp, ids_in_studies)
session.execute_write(_add_therapeutic_procedure, tp, ids_in_stmts)

# This should always be done last
for study in data.get("studies", []):
session.execute_write(_add_study, study)
loaded_study_count += 1
for statement in statements:
session.execute_write(_add_statement, statement)
loaded_stmt_count += 1

_logger.info("Successfully loaded %s studies.", loaded_study_count)
_logger.info("Successfully loaded %s statements.", loaded_stmt_count)


def load_from_json(src_transformed_cdm: Path, driver: Driver | None = None) -> None:
"""Load evidence into DB from given CDM JSON file.
:param src_transformed_cdm: path to file for a source's transformed data to
common data model containing studies, variation, therapeutic procedures,
common data model containing statements, variation, therapeutic procedures,
conditions, genes, methods, documents, etc.
:param driver: Neo4j graph driver, if available
"""
Expand Down
Loading

0 comments on commit 1217429

Please sign in to comment.