diff --git a/config/datasets/gcp.yaml b/config/datasets/gcp.yaml index e8d949ecd..ecab8af57 100644 --- a/config/datasets/gcp.yaml +++ b/config/datasets/gcp.yaml @@ -11,16 +11,16 @@ anderson: gs://genetics-portal-input/v2g_input/andersson2014/enhancer_tss_associ javierre: gs://genetics-portal-input/v2g_input/javierre_2016_preprocessed.parquet jung: gs://genetics-portal-raw/pchic_jung2019/jung2019_pchic_tableS3.csv thurman: gs://genetics-portal-input/v2g_input/thurman2012/genomewideCorrs_above0.7_promoterPlusMinus500kb_withGeneNames_32celltypeCategories.bed8.gz -catalog_associations: ${datasets.inputs}/v2d/gwas_catalog_v1.0.2-associations_e110_r2023-11-24.tsv +catalog_associations: ${datasets.inputs}/v2d/gwas_catalog_v1.0.2-associations_e110_r2023-12-21.tsv catalog_studies: # To get a complete representation of all GWAS Catalog studies, we need to # ingest the list of unpublished studies from a different file. - - ${datasets.inputs}/v2d/gwas-catalog-v1.0.3-studies-r2023-11-24.tsv - - ${datasets.inputs}/v2d/gwas-catalog-v1.0.3-unpublished-studies-r2023-11-24.tsv + - ${datasets.inputs}/v2d/gwas-catalog-v1.0.3-studies-r2023-12-21.tsv + - ${datasets.inputs}/v2d/gwas-catalog-v1.0.3-unpublished-studies-r2023-12-21.tsv catalog_ancestries: - - ${datasets.inputs}/v2d/gwas-catalog-v1.0.3-ancestries-r2023-11-24.tsv - - ${datasets.inputs}/v2d/gwas-catalog-v1.0.3-unpublished-ancestries-r2023-11-24.tsv -catalog_sumstats_lut: ${datasets.inputs}/v2d/harmonised_list-r2023-11-24a.txt + - ${datasets.inputs}/v2d/gwas-catalog-v1.0.3-ancestries-r2023-12-21.tsv + - ${datasets.inputs}/v2d/gwas-catalog-v1.0.3-unpublished-ancestries-r2023-12-21.tsv +catalog_sumstats_lut: ${datasets.inputs}/v2d/harmonised_list-r2023-12-21.txt ukbiobank_manifest: gs://genetics-portal-input/ukb_phenotypes/neale2_saige_study_manifest.190430.tsv l2g_gold_standard_curation: ${datasets.inputs}/l2g/gold_standard/curation.json gene_interactions: ${datasets.inputs}/l2g/interaction # 23.09 data @@ -40,6 +40,7 @@ v2g: ${datasets.outputs}/v2g ld_index: ${datasets.outputs}/ld_index catalog_study_index: ${datasets.study_index}/catalog catalog_study_locus: ${datasets.study_locus}/catalog_study_locus +gwas_catalog_study_curation: ${datasets.inputs}/v2d/GWAS_Catalog_study_curation.tsv finngen_study_index: ${datasets.study_index}/finngen finngen_summary_stats: ${datasets.summary_statistics}/finngen from_sumstats_study_locus: ${datasets.study_locus}/from_sumstats diff --git a/config/step/clump.yaml b/config/step/clump.yaml deleted file mode 100644 index 04790663f..000000000 --- a/config/step/clump.yaml +++ /dev/null @@ -1,6 +0,0 @@ -_target_: otg.clump.ClumpStep -input_path: ??? -ld_index_path: null -study_index_path: null -locus_collect_distance: null -clumped_study_locus_path: ??? diff --git a/config/step/gwas_catalog_curation_update.yaml b/config/step/gwas_catalog_curation_update.yaml new file mode 100644 index 000000000..979439153 --- /dev/null +++ b/config/step/gwas_catalog_curation_update.yaml @@ -0,0 +1,6 @@ +_target_: otg.gwas_catalog_study_curation.GWASCatalogStudyCurationStep +catalog_study_files: ${datasets.catalog_studies} +catalog_ancestry_files: ${datasets.catalog_ancestries} +catalog_sumstats_lut: ${datasets.catalog_sumstats_lut} +gwas_catalog_study_curation_file: ${datasets.gwas_catalog_study_curation} +gwas_catalog_study_curation_out: ??? 
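A note on the step configs above (here and in the files that follow): the `???` markers are OmegaConf mandatory-value placeholders that the DAGs fill in at submission time via `step.*` overrides, while `null` fields stay optional. A minimal sketch of that behaviour, assuming nothing beyond `omegaconf` itself (the bucket path is made up for illustration):

```python
# Minimal sketch (assumes only omegaconf): how `???` and `null` fields in the
# step configs behave before and after a `step.*`-style override.
from omegaconf import MissingMandatoryValue, OmegaConf

cfg = OmegaConf.create(
    {
        "inclusion_list_path": "???",  # mandatory: must be supplied at launch
        "locus_collect_distance": None,  # optional: YAML `null` maps to None
    }
)

try:
    _ = cfg.inclusion_list_path  # accessing a missing mandatory value raises
except MissingMandatoryValue:
    pass

# An override such as `step.inclusion_list_path=...` fills the placeholder
# (the gs:// path below is a hypothetical example):
cfg = OmegaConf.merge(
    cfg, OmegaConf.from_dotlist(["inclusion_list_path=gs://bucket/manifest"])
)
assert cfg.inclusion_list_path == "gs://bucket/manifest"
assert cfg.locus_collect_distance is None
```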
diff --git a/config/step/gwas_catalog_ingestion.yaml b/config/step/gwas_catalog_ingestion.yaml index 863c75ac2..66fe37ce9 100644 --- a/config/step/gwas_catalog_ingestion.yaml +++ b/config/step/gwas_catalog_ingestion.yaml @@ -1,4 +1,4 @@ -_target_: otg.gwas_catalog.GWASCatalogStep +_target_: otg.gwas_catalog_ingestion.GWASCatalogIngestionStep catalog_study_files: ${datasets.catalog_studies} catalog_ancestry_files: ${datasets.catalog_ancestries} catalog_associations_file: ${datasets.catalog_associations} @@ -6,3 +6,5 @@ catalog_sumstats_lut: ${datasets.catalog_sumstats_lut} variant_annotation_path: ${datasets.variant_annotation} catalog_studies_out: ${datasets.catalog_study_index} catalog_associations_out: ${datasets.catalog_study_locus} +gwas_catalog_study_curation_file: ${datasets.gwas_catalog_study_curation} +inclusion_list_path: ??? diff --git a/config/step/gwas_study_inclusion.yaml b/config/step/gwas_study_inclusion.yaml new file mode 100644 index 000000000..60916381b --- /dev/null +++ b/config/step/gwas_study_inclusion.yaml @@ -0,0 +1,10 @@ +_target_: otg.gwas_catalog_study_inclusion.GWASCatalogInclusionGenerator +catalog_study_files: ${datasets.catalog_studies} +catalog_ancestry_files: ${datasets.catalog_ancestries} +catalog_associations_file: ${datasets.catalog_associations} +variant_annotation_path: ${datasets.variant_annotation} +gwas_catalog_study_curation_file: ${datasets.gwas_catalog_study_curation} +harmonised_study_file: ??? +criteria: ??? +inclusion_list_path: ??? +exclusion_list_path: ??? diff --git a/config/step/ld_based_clumping.yaml b/config/step/ld_based_clumping.yaml new file mode 100644 index 000000000..eac354e4f --- /dev/null +++ b/config/step/ld_based_clumping.yaml @@ -0,0 +1,5 @@ +_target_: otg.ld_based_clumping.LdBasedClumpingStep +study_locus_input_path: ??? +ld_index_path: ??? +study_index_path: ??? +clumped_study_locus_output_path: ??? diff --git a/config/step/window_based_clumping.yaml b/config/step/window_based_clumping.yaml new file mode 100644 index 000000000..55cb952c8 --- /dev/null +++ b/config/step/window_based_clumping.yaml @@ -0,0 +1,5 @@ +_target_: otg.window_based_clumping.WindowBasedClumpingStep +summary_statistics_input_path: ??? +study_locus_output_path: ??? +inclusion_list_path: ??? +locus_collect_distance: null diff --git a/docs/python_api/step/clump.md b/docs/python_api/step/clump.md deleted file mode 100644 index aae56ad9e..000000000 --- a/docs/python_api/step/clump.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -title: Clump ---- - -::: otg.clump.ClumpStep diff --git a/docs/python_api/step/gwas_catalog_curation.md b/docs/python_api/step/gwas_catalog_curation.md new file mode 100644 index 000000000..d7ea38d7f --- /dev/null +++ b/docs/python_api/step/gwas_catalog_curation.md @@ -0,0 +1,5 @@ +--- +title: Apply in-house curation on GWAS Catalog studies +--- + +::: otg.gwas_catalog_study_curation.GWASCatalogStudyCurationStep diff --git a/docs/python_api/step/gwas_catalog_inclusion.md b/docs/python_api/step/gwas_catalog_inclusion.md new file mode 100644 index 000000000..b9f088a76 --- /dev/null +++ b/docs/python_api/step/gwas_catalog_inclusion.md @@ -0,0 +1,5 @@ +--- +title: Generate inclusion and exclusion lists for GWAS Catalog study ingestion
+--- + +::: otg.gwas_catalog_study_inclusion.GWASCatalogInclusionGenerator diff --git a/docs/python_api/step/ld_clump.md b/docs/python_api/step/ld_clump.md new file mode 100644 index 000000000..28973f8bf --- /dev/null +++ b/docs/python_api/step/ld_clump.md @@ -0,0 +1,5 @@ +--- +title: LD-based clumping +--- + +::: otg.ld_based_clumping.LdBasedClumpingStep diff --git a/docs/python_api/step/window_based_clumping.md b/docs/python_api/step/window_based_clumping.md new file mode 100644 index 000000000..ac8ea506a --- /dev/null +++ b/docs/python_api/step/window_based_clumping.md @@ -0,0 +1,5 @@ +--- +title: Window-based clumping +--- + +::: otg.window_based_clumping.WindowBasedClumpingStep diff --git a/src/airflow/dags/common_airflow.py b/src/airflow/dags/common_airflow.py index a859a870e..b2deb9268 100644 --- a/src/airflow/dags/common_airflow.py +++ b/src/airflow/dags/common_airflow.py @@ -19,8 +19,7 @@ from pathlib import Path # Code version. It has to be repeated here as well as in `pyproject.toml`, because Airflow isn't able to look at files outside of its `dags/` directory. -OTG_VERSION = "1.0.0" - +OTG_VERSION = "0.0.0" # Cloud configuration. GCP_PROJECT = "open-targets-genetics-dev" @@ -29,7 +28,6 @@ GCP_DATAPROC_IMAGE = "2.1" GCP_AUTOSCALING_POLICY = "otg-etl" - # Cluster init configuration. INITIALISATION_BASE_PATH = ( f"gs://genetics_etl_python_playground/initialisation/{OTG_VERSION}" @@ -40,18 +38,17 @@ f"{INITIALISATION_BASE_PATH}/install_dependencies_on_cluster.sh" ] - # CLI configuration. CLUSTER_CONFIG_DIR = "/config" CONFIG_NAME = "config" PYTHON_CLI = "cli.py" - # Shared DAG construction parameters. shared_dag_args = { "owner": "Open Targets Data Team", "retries": 0, } + shared_dag_kwargs = { "tags": ["genetics_etl", "experimental"], "start_date": pendulum.now(tz="Europe/London").subtract(days=1), diff --git a/src/airflow/dags/gwas_catalog_harmonisation.py b/src/airflow/dags/gwas_catalog_harmonisation.py index 180c7048a..41815caa2 100644 --- a/src/airflow/dags/gwas_catalog_harmonisation.py +++ b/src/airflow/dags/gwas_catalog_harmonisation.py @@ -99,7 +99,6 @@ def submit_jobs(**kwargs: Any) -> None: ], ) - # list_inputs >> ( [list_inputs, list_outputs] >> create_to_do_list() diff --git a/src/airflow/dags/gwas_catalog_preprocess.py b/src/airflow/dags/gwas_catalog_preprocess.py index e21dcd9bd..c3d094dd2 100644 --- a/src/airflow/dags/gwas_catalog_preprocess.py +++ b/src/airflow/dags/gwas_catalog_preprocess.py @@ -5,14 +5,39 @@ import common_airflow as common from airflow.models.dag import DAG +from airflow.operators.python import PythonOperator +from airflow.providers.google.cloud.hooks.gcs import GCSHook +from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator from airflow.utils.task_group import TaskGroup -from airflow.utils.trigger_rule import TriggerRule CLUSTER_NAME = "otg-preprocess-gwascatalog" AUTOSCALING = "otg-preprocess-gwascatalog" -SUMSTATS = "gs://open-targets-gwas-summary-stats/harmonised" RELEASEBUCKET = "gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX" +RELEASEBUCKET_NAME = "genetics_etl_python_playground" +SUMMARY_STATS_BUCKET_NAME = "open-targets-gwas-summary-stats" +SUMSTATS = "gs://open-targets-gwas-summary-stats/harmonised" +MANIFESTS_PATH = f"{RELEASEBUCKET}/manifests/" + + +def upload_harmonized_study_list( + concatenated_studies: str, bucket_name: str, object_name: str +) -> None: + """This function uploads a file to GCS. + + Args: + concatenated_studies (str): Concatenated list of harmonized summary statistics.
+ bucket_name (str): Bucket name + object_name (str): Name of the object + """ + hook = GCSHook(gcp_conn_id="google_cloud_default") + hook.upload( + bucket_name=bucket_name, + object_name=object_name, + data=concatenated_studies, + encoding="utf-8", + ) + with DAG( dag_id=Path(__file__).stem, @@ -20,77 +45,149 @@ default_args=common.shared_dag_args, **common.shared_dag_kwargs, ): - with TaskGroup(group_id="summary_stats_preprocessing") as summary_stats_group: - summary_stats_window_clumping = common.submit_step( + # Getting list of folders (each a GWAS study with summary statistics) + list_harmonised_sumstats = GCSListObjectsOperator( + task_id="list_harmonised_parquet", + bucket=SUMMARY_STATS_BUCKET_NAME, + prefix="harmonised", + match_glob="**/_SUCCESS", + ) + + # Upload resulting list to a bucket: + upload_task = PythonOperator( + task_id="uploader", + python_callable=upload_harmonized_study_list, + op_kwargs={ + "concatenated_studies": '{{ "\n".join(ti.xcom_pull( key="return_value", task_ids="list_harmonised_parquet")) }}', + "bucket_name": RELEASEBUCKET_NAME, + "object_name": "output/python_etl/parquet/XX.XX/manifests/harmonised_sumstats.txt", + }, + ) + + # Processing curated GWAS Catalog studies top to bottom: + with TaskGroup(group_id="curation_processing") as curation_processing: + # Generate inclusion list: + curation_calculate_inclusion_list = common.submit_step( cluster_name=CLUSTER_NAME, - step_id="clump", - task_id="catalog_sumstats_window_clumping", + step_id="gwas_study_inclusion", + task_id="catalog_curation_inclusion_list", other_args=[ - f"step.input_path={SUMSTATS}", - f"step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/window_clumped/from_sumstats/catalog", + "step.criteria=curation", + f"step.inclusion_list_path={MANIFESTS_PATH}manifest_curation", + f"step.exclusion_list_path={MANIFESTS_PATH}exclusion_curation", + f"step.harmonised_study_file={MANIFESTS_PATH}harmonised_sumstats.txt", ], ) - summary_stats_ld_clumping = common.submit_step( + + # Ingest curated associations from GWAS Catalog: + curation_ingest_data = common.submit_step( cluster_name=CLUSTER_NAME, - step_id="clump", - task_id="catalog_sumstats_ld_clumping", + step_id="gwas_catalog_ingestion", + task_id="ingest_curated_gwas_catalog_data", + other_args=[f"step.inclusion_list_path={MANIFESTS_PATH}manifest_curation"], + ) + + # Run LD-annotation and clumping on curated data: + curation_ld_clumping = common.submit_step( + cluster_name=CLUSTER_NAME, + step_id="ld_based_clumping", + task_id="catalog_curation_ld_clumping", other_args=[ - f"step.input_path={RELEASEBUCKET}/study_locus/window_clumped/from_sumstats/catalog", - "step.ld_index_path={RELEASEBUCKET}/ld_index", - "step.study_index_path={RELEASEBUCKET}/study_index/catalog", - "step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/ld_clumped/from_sumstats/catalog", + f"step.study_locus_input_path={RELEASEBUCKET}/study_locus/catalog_curated", + f"step.ld_index_path={RELEASEBUCKET}/ld_index", + f"step.study_index_path={RELEASEBUCKET}/study_index/catalog", + f"step.clumped_study_locus_output_path={RELEASEBUCKET}/study_locus/ld_clumped/catalog_curated", ], - trigger_rule=TriggerRule.ALL_DONE, ) - summary_stats_pics = common.submit_step( + + # Do PICS-based finemapping: + curation_pics = common.submit_step( cluster_name=CLUSTER_NAME, step_id="pics", - task_id="catalog_sumstats_pics", + task_id="catalog_curation_pics", other_args=[ - "step.study_locus_ld_annotated_in={RELEASEBUCKET}/study_locus/ld_clumped/from_sumstats/catalog", - 
"step.picsed_study_locus_out={RELEASEBUCKET}/credible_set/from_sumstats/catalog", + f"step.study_locus_ld_annotated_in={RELEASEBUCKET}/study_locus/ld_clumped/catalog_curated", + f"step.picsed_study_locus_out={RELEASEBUCKET}/credible_set/catalog_curated", ], - trigger_rule=TriggerRule.ALL_DONE, ) - summary_stats_window_clumping >> summary_stats_ld_clumping >> summary_stats_pics - with TaskGroup(group_id="curation_preprocessing") as curation_group: - parse_study_and_curated_assocs = common.submit_step( + # Define order of steps: + ( + curation_calculate_inclusion_list + >> curation_ingest_data + >> curation_ld_clumping + >> curation_pics + ) + + # Processing summary statistics from GWAS Catalog: + with TaskGroup( + group_id="summary_satistics_processing" + ) as summary_satistics_processing: + # Generate inclusion study lists: + summary_stats_calculate_inclusion_list = common.submit_step( cluster_name=CLUSTER_NAME, - step_id="gwas_catalog_ingestion", - task_id="catalog_ingestion", + step_id="gwas_study_inclusion", + task_id="catalog_sumstats_inclusion_list", + other_args=[ + "step.criteria=summary_stats", + f"step.inclusion_list_path={MANIFESTS_PATH}manifest_sumstats", + f"step.exclusion_list_path={MANIFESTS_PATH}exclusion_sumstats", + f"step.harmonised_study_file={MANIFESTS_PATH}harmonised_sumstats.txt", + ], ) - curation_ld_clumping = common.submit_step( + # Run window-based clumping: + summary_stats_window_based_clumping = common.submit_step( cluster_name=CLUSTER_NAME, - step_id="clump", - task_id="catalog_curation_ld_clumping", + step_id="window_based_clumping", + task_id="catalog_sumstats_window_clumping", other_args=[ - "step.input_path={RELEASEBUCKET}/study_locus/catalog_curated", - "step.ld_index_path={RELEASEBUCKET}/ld_index", - "step.study_index_path={RELEASEBUCKET}/study_index/catalog", - "step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/ld_clumped/catalog_curated", + f"step.summary_statistics_input_path={SUMSTATS}", + f"step.study_locus_output_path={RELEASEBUCKET}/study_locus/window_clumped/from_sumstats/catalog", + f"step.inclusion_list_path={MANIFESTS_PATH}manifest_sumstats", ], - trigger_rule=TriggerRule.ALL_DONE, ) - curation_pics = common.submit_step( + # Run LD based clumping: + summary_stats_ld_clumping = common.submit_step( + cluster_name=CLUSTER_NAME, + step_id="ld_based_clumping", + task_id="catalog_sumstats_ld_clumping", + other_args=[ + f"step.study_locus_input_path={RELEASEBUCKET}/study_locus/window_clumped/from_sumstats/catalog", + f"step.ld_index_path={RELEASEBUCKET}/ld_index", + f"step.study_index_path={RELEASEBUCKET}/study_index/catalog", + f"step.clumped_study_locus_output_path={RELEASEBUCKET}/study_locus/ld_clumped/from_sumstats/catalog", + ], + ) + + # Run PICS finemapping: + summary_stats_pics = common.submit_step( cluster_name=CLUSTER_NAME, step_id="pics", - task_id="catalog_curation_pics", + task_id="catalog_sumstats_pics", other_args=[ - "step.study_locus_ld_annotated_in={RELEASEBUCKET}/study_locus/ld_clumped/catalog_curated", - "step.picsed_study_locus_out={RELEASEBUCKET}/credible_set/catalog_curated", + f"step.study_locus_ld_annotated_in={RELEASEBUCKET}/study_locus/ld_clumped/from_sumstats/catalog", + f"step.picsed_study_locus_out={RELEASEBUCKET}/credible_set/from_sumstats/catalog", ], - trigger_rule=TriggerRule.ALL_DONE, ) - parse_study_and_curated_assocs >> curation_ld_clumping >> curation_pics + # Order of steps within the group: + ( + summary_stats_calculate_inclusion_list + >> summary_stats_window_based_clumping + >> 
summary_stats_ld_clumping + >> summary_stats_pics + ) + + # DAG description: ( common.create_cluster( CLUSTER_NAME, autoscaling_policy=AUTOSCALING, num_workers=5 ) >> common.install_dependencies(CLUSTER_NAME) - >> [summary_stats_group, curation_group] - >> common.delete_cluster(CLUSTER_NAME) + >> list_harmonised_sumstats + >> upload_task + >> curation_processing + >> summary_statistics_processing ) diff --git a/src/airflow/dags/gwas_curation_update.py b/src/airflow/dags/gwas_curation_update.py new file mode 100644 index 000000000..1ef0f39f9 --- /dev/null +++ b/src/airflow/dags/gwas_curation_update.py @@ -0,0 +1,33 @@ +"""DAG for updating GWAS Catalog curation table.""" +from __future__ import annotations + +from datetime import datetime +from pathlib import Path + +import common_airflow as common +from airflow.models.dag import DAG + +CLUSTER_NAME = "otg-gwascatalog-curation" +RUN_DATE = datetime.today().strftime("%Y-%m-%d") + +with DAG( + dag_id=Path(__file__).stem, + description="Open Targets Genetics — GWAS Catalog curation update", + default_args=common.shared_dag_args, + **common.shared_dag_kwargs, ): + update_gwas_curation = common.submit_step( + cluster_name=CLUSTER_NAME, + step_id="gwas_catalog_curation_update", + task_id="gwas_catalog_curation_update", + other_args=[ + f"step.gwas_catalog_study_curation_out=gs://genetics_etl_python_playground/input/v2d/GWAS_Catalog_study_curation_{RUN_DATE}.tsv", + ], + ) + + # DAG description: + ( + common.create_cluster(CLUSTER_NAME, num_workers=2) + >> common.install_dependencies(CLUSTER_NAME) + >> update_gwas_curation + ) diff --git a/src/otg/assets/schemas/study_index.json b/src/otg/assets/schemas/study_index.json index f7e577921..08f61c072 100644 --- a/src/otg/assets/schemas/study_index.json +++ b/src/otg/assets/schemas/study_index.json @@ -193,6 +193,26 @@ "nullable": true, "metadata": {} }, + { + "name": "qualityControls", + "type": { + "type": "array", + "elementType": "string", + "containsNull": true + }, + "nullable": true, + "metadata": {} + }, + { + "name": "analysisFlags", + "type": { + "type": "array", + "elementType": "string", + "containsNull": true + }, + "nullable": true, + "metadata": {} + }, { "name": "summarystatsLocation", "type": "string", diff --git a/src/otg/clump.py b/src/otg/clump.py deleted file mode 100644 index 6ace21076..000000000 --- a/src/otg/clump.py +++ /dev/null @@ -1,73 +0,0 @@ -"""Step to run clump associations from summary statistics or study locus.""" -from __future__ import annotations - -from dataclasses import dataclass, field - -from omegaconf import MISSING - -from otg.common.session import Session -from otg.dataset.ld_index import LDIndex -from otg.dataset.study_index import StudyIndex -from otg.dataset.study_locus import StudyLocus -from otg.dataset.summary_statistics import SummaryStatistics - - -@dataclass -class ClumpStep: - """Perform clumping of an association dataset to identify independent signals. - - Two types of clumping are supported and are applied based on the input dataset: - - Clumping of summary statistics based on a window-based approach. - - Clumping of study locus based on LD. - - Both approaches yield a StudyLocus dataset. - - Attributes: - session (Session): Session object. - input_path (str): Input path for the study locus or summary statistics files. - study_index_path (str): Path to study index. - ld_index_path (str): Path to LD index. - locus_collect_distance (int | None): The distance to collect locus around semi-indices. 
- clumped_study_locus_path (str): Output path for the clumped study locus dataset. - """ - - session: Session = MISSING - input_path: str = MISSING - clumped_study_locus_path: str = MISSING - study_index_path: str | None = field(default=None) - ld_index_path: str | None = field(default=None) - - locus_collect_distance: int | None = field(default=None) - - def __post_init__(self: ClumpStep) -> None: - """Run the clumping step. - - Raises: - ValueError: If study index and LD index paths are not provided for study locus. - """ - input_cols = self.session.spark.read.parquet( - self.input_path, recursiveFileLookup=True - ).columns - if "studyLocusId" in input_cols: - if self.study_index_path is None or self.ld_index_path is None: - raise ValueError( - "Study index and LD index paths are required for clumping study locus." - ) - study_locus = StudyLocus.from_parquet(self.session, self.input_path) - ld_index = LDIndex.from_parquet(self.session, self.ld_index_path) - study_index = StudyIndex.from_parquet(self.session, self.study_index_path) - - clumped_study_locus = study_locus.annotate_ld( - study_index=study_index, ld_index=ld_index - ).clump() - else: - sumstats = SummaryStatistics.from_parquet( - self.session, self.input_path, recursiveFileLookup=True - ).coalesce(4000) - clumped_study_locus = sumstats.window_based_clumping( - locus_collect_distance=self.locus_collect_distance - ) - - clumped_study_locus.df.write.mode(self.session.write_mode).parquet( - self.clumped_study_locus_path - ) diff --git a/src/otg/common/session.py b/src/otg/common/session.py index a985d4dcd..ae1d36651 100644 --- a/src/otg/common/session.py +++ b/src/otg/common/session.py @@ -124,21 +124,22 @@ def _create_merged_config( def read_parquet( self: Session, - path: str, + path: str | list[str], schema: StructType, **kwargs: bool | float | int | str | None, ) -> DataFrame: - """Reads parquet dataset with a provided schema. + """Reads parquet dataset (provided as a single path or a list of paths) with a provided schema. Args: - path (str): parquet dataset path + path (str | list[str]): path to the parquet dataset schema (StructType): Spark schema **kwargs (bool | float | int | str | None): Additional arguments to pass to spark.read.parquet Returns: DataFrame: Dataframe with provided schema """ - return self.spark.read.schema(schema).parquet(path, **kwargs) + path = [path] if isinstance(path, str) else path + return self.spark.read.schema(schema).parquet(*path, **kwargs) class Log4j: diff --git a/src/otg/dataset/dataset.py b/src/otg/dataset/dataset.py index 0d04a2779..bc79b4159 100644 --- a/src/otg/dataset/dataset.py +++ b/src/otg/dataset/dataset.py @@ -72,14 +72,14 @@ def get_schema(cls: type[Self]) -> StructType: def from_parquet( cls: type[Self], session: Session, - path: str, + path: str | list[str], **kwargs: bool | float | int | str | None, ) -> Self: - """Reads a parquet file into a Dataset with a given schema. + """Reads parquet into a Dataset with a given schema. 
Args: session (Session): Spark session - path (str): Path to the parquet file + path (str | list[str]): Path to the parquet dataset **kwargs (bool | float | int | str | None): Additional arguments to pass to spark.read.parquet Returns: @@ -89,7 +89,7 @@ def from_parquet( ValueError: Parquet file is empty """ schema = cls.get_schema() - df = session.read_parquet(path=path, schema=schema, **kwargs) + df = session.read_parquet(path, schema=schema, **kwargs) if df.isEmpty(): raise ValueError(f"Parquet file is empty: {path}") return cls(_df=df, _schema=schema) diff --git a/src/otg/dataset/study_index.py b/src/otg/dataset/study_index.py index f928e84b5..6fc84aa1a 100644 --- a/src/otg/dataset/study_index.py +++ b/src/otg/dataset/study_index.py @@ -139,3 +139,47 @@ def study_type_lut(self: StudyIndex) -> DataFrame: DataFrame: A dataframe containing `studyId` and `studyType` columns. """ return self.df.select("studyId", "studyType") + + def is_qtl(self: StudyIndex) -> Column: + """Return a boolean column with true values for QTL studies. + + Returns: + Column: True if the study is a QTL study. + """ + return self.df.studyType.endswith("qtl") + + def is_gwas(self: StudyIndex) -> Column: + """Return a boolean column with true values for GWAS studies. + + Returns: + Column: True if the study is a GWAS study. + """ + return self.df.studyType == "gwas" + + def has_mapped_trait(self: StudyIndex) -> Column: + """Return a boolean column indicating if a study has mapped disease. + + Returns: + Column: True if the study has mapped disease. + """ + return f.size(self.df.traitFromSourceMappedIds) > 0 + + def is_quality_flagged(self: StudyIndex) -> Column: + """Return a boolean column indicating if a study is flagged due to quality issues. + + Returns: + Column: True if the study is flagged. + """ + # Testing for the presence of the qualityControls column: + if "qualityControls" not in self.df.columns: + return f.lit(False) + else: + return f.size(self.df.qualityControls) != 0 + + def has_summarystats(self: StudyIndex) -> Column: + """Return a boolean column indicating if a study has harmonized summary statistics. + + Returns: + Column: True if the study has harmonized summary statistics. + """ + return self.df.hasSumstats diff --git a/src/otg/datasource/gwas_catalog/associations.py b/src/otg/datasource/gwas_catalog/associations.py index 9c3e6293f..94cf3c465 100644 --- a/src/otg/datasource/gwas_catalog/associations.py +++ b/src/otg/datasource/gwas_catalog/associations.py @@ -1116,3 +1116,19 @@ def qc_ambiguous_study(self: StudyLocusGWASCatalog) -> StudyLocusGWASCatalog: ), ) return self + + def apply_inclusion_list( + self: StudyLocusGWASCatalog, inclusion_list: DataFrame + ) -> StudyLocusGWASCatalog: + """Restricting GWAS Catalog studies based on a list of accepted study identifiers. + + Args: + inclusion_list (DataFrame): List of accepted GWAS Catalog study identifiers + + Returns: + StudyLocusGWASCatalog: Filtered dataset. 
+ """ + return StudyLocusGWASCatalog( + _df=self.df.join(inclusion_list, on="studyId", how="inner"), + _schema=StudyLocusGWASCatalog.get_schema(), + ) diff --git a/src/otg/datasource/gwas_catalog/study_index.py b/src/otg/datasource/gwas_catalog/study_index.py index 27988c29d..c2fe1239b 100644 --- a/src/otg/datasource/gwas_catalog/study_index.py +++ b/src/otg/datasource/gwas_catalog/study_index.py @@ -6,7 +6,9 @@ import pyspark.sql.functions as f import pyspark.sql.types as t +from pyspark import SparkFiles +from otg.common.session import Session from otg.common.spark_helpers import column2camel_case from otg.common.utils import parse_efos from otg.dataset.study_index import StudyIndex @@ -15,6 +17,48 @@ from pyspark.sql import Column, DataFrame +def read_curation_table( + curation_path: str | None, session: Session +) -> DataFrame | None: + """Read curation table if path or URL is given. + + Curation itself is fully optional everything should work without it. + + Args: + curation_path (str | None): Optionally given path the curation tsv. + session (Session): session object + + Returns: + DataFrame | None: if curation was provided, + """ + # If no curation path provided, we are returning none: + if curation_path is None: + return None + # Read curation from the web: + elif curation_path.startswith("http"): + # Registering file: + session.spark.sparkContext.addFile(curation_path) + + # Reading file: + curation_df = session.spark.read.csv( + SparkFiles.get(curation_path.split("/")[-1]), sep="\t", header=True + ) + # Read curation from file: + else: + curation_df = session.spark.read.csv(curation_path, sep="\t", header=True) + return curation_df.select( + "studyId", + "studyType", + f.when(f.col("analysisFlag").isNotNull(), f.array(f.col("analysisFlag"))) + .otherwise(f.array()) + .alias("analysisFlags"), + f.when(f.col("qualityControl").isNotNull(), f.array(f.col("qualityControl"))) + .otherwise(f.array()) + .alias("qualityControls"), + f.col("isCurated").cast(t.BooleanType()), + ) + + @dataclass class StudyIndexGWASCatalogParser: """GWAS Catalog study index parser. @@ -340,6 +384,138 @@ def update_study_id( return self + def annotate_from_study_curation( + self: StudyIndexGWASCatalog, curation_table: DataFrame | None + ) -> StudyIndexGWASCatalog: + """Annotating study index with curation. + + Args: + curation_table (DataFrame | None): Curated GWAS Catalog studies with summary statistics + + Returns: + StudyIndexGWASCatalog: Updated study index + """ + # Providing curation table is optional. However once this method is called, the quality and studyFlag columns are added. 
+ if curation_table is None: + return self + + columns = self.df.columns + + # Adding prefix to columns in the curation table: + curation_table = curation_table.select( + *[ + f.col(column).alias(f"curation_{column}") + if column != "studyId" + else f.col(column) + for column in curation_table.columns + ] + ) + + # Create an expression for updating/creating the quality controls column: + qualityControls_expression = ( + f.col("curation_qualityControls") + if "qualityControls" not in columns + else f.when( + f.col("curation_qualityControls").isNotNull(), + f.array_union( + f.col("qualityControls"), f.array(f.col("curation_qualityControls")) + ), + ).otherwise(f.col("qualityControls")) + ) + + # Create an expression for updating/creating the analysis flags column: + analysis_expression = ( + f.col("curation_analysisFlags") + if "analysisFlags" not in columns + else f.when( + f.col("curation_analysisFlags").isNotNull(), + f.array_union( + f.col("analysisFlags"), f.array(f.col("curation_analysisFlags")) + ), + ).otherwise(f.col("analysisFlags")) + ) + + # Updating the column list. Columns might be listed twice, but that doesn't matter: a unique set will be generated: + columns = list(set(columns + ["qualityControls", "analysisFlags"])) + + # Based on the curation table, columns need to be updated: + curated_df = ( + self.df.join(curation_table, on="studyId", how="left") + # Updating study type: + .withColumn( + "studyType", f.coalesce(f.col("curation_studyType"), f.col("studyType")) + ) + # Updating quality controls: + .withColumn("qualityControls", qualityControls_expression) + # Updating study annotation flags: + .withColumn("analysisFlags", analysis_expression) + # Dropping columns coming from the curation table: + .select(*columns) + ) + return StudyIndexGWASCatalog( + _df=curated_df, _schema=StudyIndexGWASCatalog.get_schema() + ) + + def extract_studies_for_curation( + self: StudyIndexGWASCatalog, curation: DataFrame | None + ) -> DataFrame: + """Extract studies for curation. + + Args: + curation (DataFrame | None): Dataframe with curation. + + Returns: + DataFrame: Updated curation table. New studies have `isCurated` set to False. 
+ """ + # If no curation table provided, assume all studies needs curation: + if curation is None: + return ( + self.df + # Curation only applyed on studies with summary statistics: + .filter(f.col("hasSumstats")) + # Adding columns expected in the curation table - array columns aready flattened: + .withColumn("studyType", f.lit(None).cast(t.StringType())) + .withColumn("analysisFlags", f.lit(None).cast(t.StringType())) + .withColumn("qualityControls", f.lit(None).cast(t.StringType())) + .withColumn("isCurated", f.lit(False).cast(t.StringType())) + ) + + # Adding prefix to columns in the curation table: + curation = curation.select( + *[ + f.col(column).alias(f"curation_{column}") + if column != "studyId" + else f.col(column) + for column in curation.columns + ] + ) + + return ( + self.df + # Curation only applyed on studies with summary statistics: + .filter(f.col("hasSumstats")) + .join(curation, on="studyId", how="left") + .select( + "studyId", + # Propagate existing curation - array columns are being flattened: + f.col("curation_studyType").alias("studyType"), + f.array_join(f.col("curation_analysisFlags"), "|").alias( + "analysisFlags" + ), + f.array_join(f.col("curation_qualityControls"), "|").alias( + "qualityControls" + ), + # This boolean flag needs to be casted to string, because saving to tsv would fail otherwise: + f.coalesce(f.col("curation_isCurated"), f.lit(False)) + .cast(t.StringType()) + .alias("isCurated"), + # The following columns are propagated to make curation easier: + "pubmedId", + "publicationTitle", + "traitFromSource", + ) + ) + def annotate_ancestries( self: StudyIndexGWASCatalog, ancestry_lut: DataFrame ) -> StudyIndexGWASCatalog: @@ -463,8 +639,9 @@ def annotate_ancestries( parsed_ancestry_lut = ancestry_stages.join( europeans_deconvoluted, on="studyId", how="outer" + ).select( + "studyId", "discoverySamples", "ldPopulationStructure", "replicationSamples" ) - self.df = self.df.join(parsed_ancestry_lut, on="studyId", how="left").join( cohorts, on="studyId", how="left" ) @@ -480,11 +657,19 @@ def annotate_sumstats_info( Returns: StudyIndexGWASCatalog: including `summarystatsLocation` and `hasSumstats` columns + + Raises: + ValueError: if the sumstats_lut table doesn't have the right columns """ gwas_sumstats_base_uri = ( "ftp://ftp.ebi.ac.uk/pub/databases/gwas/summary_statistics/" ) + if "_c0" not in sumstats_lut.columns: + raise ValueError( + f'Sumstats look-up table needs to have `_c0` column. However it has: {",".join(sumstats_lut.columns)}' + ) + parsed_sumstats_lut = sumstats_lut.withColumn( "summarystatsLocation", f.concat( @@ -492,13 +677,10 @@ def annotate_sumstats_info( f.regexp_replace(f.col("_c0"), r"^\.\/", ""), ), ).select( - f.regexp_extract(f.col("summarystatsLocation"), r"\/(GCST\d+)\/", 1).alias( - "studyId" - ), + self._parse_gwas_catalog_study_id("summarystatsLocation").alias("studyId"), "summarystatsLocation", f.lit(True).alias("hasSumstats"), ) - self.df = ( self.df.drop("hasSumstats") .join(parsed_sumstats_lut, on="studyId", how="left") @@ -550,3 +732,41 @@ def annotate_discovery_sample_sizes( ) self.df = self.df.join(sample_size_lut, on="studyId", how="left") return self + + def apply_inclusion_list( + self: StudyIndexGWASCatalog, inclusion_list: DataFrame + ) -> StudyIndexGWASCatalog: + """Restricting GWAS Catalog studies based on a list of accepted study identifiers. + + Args: + inclusion_list (DataFrame): List of accepted GWAS Catalog study identifiers + + Returns: + StudyIndexGWASCatalog: Filtered dataset. 
+ """ + return StudyIndexGWASCatalog( + _df=self.df.join(inclusion_list, on="studyId", how="inner"), + _schema=StudyIndexGWASCatalog.get_schema(), + ) + + @staticmethod + def _parse_gwas_catalog_study_id(sumstats_path_column: str) -> Column: + """Extract GWAS Catalog study accession from the summary statistics path. + + Args: + sumstats_path_column (str): column *name* for the summary statistics path + + Returns: + Column: GWAS Catalog study accession. + + Examples: + >>> data = [ + ... ('./GCST90086001-GCST90087000/GCST90086758/harmonised/35078996-GCST90086758-EFO_0007937.h.tsv.gz',), + ... ('gs://open-targets-gwas-summary-stats/harmonised/GCST000568.parquet/',), + ... (None,) + ... ] + >>> spark.createDataFrame(data, ['testColumn']).select(StudyIndexGWASCatalog._parse_gwas_catalog_study_id('testColumn').alias('accessions')).collect() + [Row(accessions='GCST90086758'), Row(accessions='GCST000568'), Row(accessions=None)] + """ + accesions = f.expr(rf"regexp_extract_all({sumstats_path_column}, '(GCST\\d+)')") + return accesions[f.size(accesions) - 1] diff --git a/src/otg/datasource/gwas_catalog/study_splitter.py b/src/otg/datasource/gwas_catalog/study_splitter.py index d03c5c8c5..673db82fa 100644 --- a/src/otg/datasource/gwas_catalog/study_splitter.py +++ b/src/otg/datasource/gwas_catalog/study_splitter.py @@ -10,8 +10,8 @@ if TYPE_CHECKING: from pyspark.sql import Column + from otg.datasource.gwas_catalog.associations import StudyLocusGWASCatalog from otg.datasource.gwas_catalog.study_index import StudyIndexGWASCatalog - from otg.datasource.gwas_catalog.study_locus_dataset import StudyLocusGWASCatalog class GWASCatalogStudySplitter: diff --git a/src/otg/gwas_catalog_ingestion.py b/src/otg/gwas_catalog_ingestion.py index bea0bbed2..d47f82337 100644 --- a/src/otg/gwas_catalog_ingestion.py +++ b/src/otg/gwas_catalog_ingestion.py @@ -10,7 +10,10 @@ from otg.datasource.gwas_catalog.associations import ( GWASCatalogCuratedAssociationsParser, ) -from otg.datasource.gwas_catalog.study_index import StudyIndexGWASCatalogParser +from otg.datasource.gwas_catalog.study_index import ( + StudyIndexGWASCatalogParser, + read_curation_table, +) from otg.datasource.gwas_catalog.study_splitter import GWASCatalogStudySplitter @@ -26,10 +29,11 @@ class GWASCatalogIngestionStep: catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog. catalog_sumstats_lut (str): GWAS Catalog summary statistics lookup table. catalog_associations_file (str): Raw GWAS catalog associations file. + gwas_catalog_study_curation_file (str | None): file of the curation table. Optional. variant_annotation_path (str): Input variant annotation path. - ld_populations (list): List of populations to include. catalog_studies_out (str): Output GWAS catalog studies path. catalog_associations_out (str): Output GWAS catalog associations path. 
+ inclusion_list_path (str | None): Optional inclusion list (parquet). """ session: Session = MISSING catalog_study_files: list[str] = MISSING catalog_ancestry_files: list[str] = MISSING catalog_sumstats_lut: str = MISSING catalog_associations_file: str = MISSING + gwas_catalog_study_curation_file: str | None = None variant_annotation_path: str = MISSING catalog_studies_out: str = MISSING catalog_associations_out: str = MISSING + inclusion_list_path: str | None = None def __post_init__(self: GWASCatalogIngestionStep) -> None: """Run step.""" # Extract va = VariantAnnotation.from_parquet(self.session, self.variant_annotation_path) catalog_studies = self.session.spark.read.csv( - self.catalog_study_files, sep="\t", header=True + list(self.catalog_study_files), sep="\t", header=True ) ancestry_lut = self.session.spark.read.csv( - self.catalog_ancestry_files, sep="\t", header=True + list(self.catalog_ancestry_files), sep="\t", header=True ) sumstats_lut = self.session.spark.read.csv( self.catalog_sumstats_lut, sep="\t", header=False @@ -57,15 +63,27 @@ def __post_init__(self: GWASCatalogIngestionStep) -> None: catalog_associations = self.session.spark.read.csv( self.catalog_associations_file, sep="\t", header=True ).persist() + gwas_catalog_study_curation = read_curation_table( + self.gwas_catalog_study_curation_file, self.session + ) # Transform study_index, study_locus = GWASCatalogStudySplitter.split( StudyIndexGWASCatalogParser.from_source( catalog_studies, ancestry_lut, sumstats_lut - ), + ).annotate_from_study_curation(gwas_catalog_study_curation), GWASCatalogCuratedAssociationsParser.from_source(catalog_associations, va), ) + # If an inclusion list is provided, apply filter: + if self.inclusion_list_path: + inclusion_list = self.session.spark.read.parquet( + self.inclusion_list_path + ) + + study_index = study_index.apply_inclusion_list(inclusion_list) + study_locus = study_locus.apply_inclusion_list(inclusion_list) + # Load study_index.df.write.mode(self.session.write_mode).parquet( self.catalog_studies_out diff --git a/src/otg/gwas_catalog_study_curation.py b/src/otg/gwas_catalog_study_curation.py new file mode 100644 index 000000000..97f3a0b0a --- /dev/null +++ b/src/otg/gwas_catalog_study_curation.py @@ -0,0 +1,62 @@ +"""Step to update GWAS Catalog study curation file based on newly released GWAS Catalog dataset.""" +from __future__ import annotations + +from dataclasses import dataclass + +from omegaconf import MISSING + +from otg.common.session import Session +from otg.datasource.gwas_catalog.study_index import ( + StudyIndexGWASCatalogParser, + read_curation_table, +) + + +@dataclass +class GWASCatalogStudyCurationStep: + """Create an updated curation table for GWAS Catalog study table. + + Attributes: + session (Session): Session object. + catalog_study_files (list[str]): List of raw GWAS catalog studies files. + catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog. + catalog_sumstats_lut (str): GWAS Catalog summary statistics lookup table. + gwas_catalog_study_curation_file (str | None): Path to the original curation table. Optional. + gwas_catalog_study_curation_out (str): Path for the updated curation table. 
+ """ + + session: Session = MISSING + catalog_study_files: list[str] = MISSING + catalog_ancestry_files: list[str] = MISSING + catalog_sumstats_lut: str = MISSING + gwas_catalog_study_curation_file: str | None = MISSING + gwas_catalog_study_curation_out: str = MISSING + + def __post_init__(self: GWASCatalogStudyCurationStep) -> None: + """Run step.""" + catalog_studies = self.session.spark.read.csv( + list(self.catalog_study_files), sep="\t", header=True + ) + ancestry_lut = self.session.spark.read.csv( + list(self.catalog_ancestry_files), sep="\t", header=True + ) + sumstats_lut = self.session.spark.read.csv( + self.catalog_sumstats_lut, sep="\t", header=False + ) + gwas_catalog_study_curation = read_curation_table( + self.gwas_catalog_study_curation_file, self.session + ) + + # Process GWAS Catalog studies and get list of studies for curation: + ( + StudyIndexGWASCatalogParser.from_source( + catalog_studies, ancestry_lut, sumstats_lut + ) + # Adding existing curation: + .annotate_from_study_curation(gwas_catalog_study_curation) + # Extract new studies for curation: + .extract_studies_for_curation(gwas_catalog_study_curation) + # Save table: + .toPandas() + .to_csv(self.gwas_catalog_study_curation_out, sep="\t", index=False) + ) diff --git a/src/otg/gwas_catalog_study_inclusion.py b/src/otg/gwas_catalog_study_inclusion.py new file mode 100644 index 000000000..8dd95d85f --- /dev/null +++ b/src/otg/gwas_catalog_study_inclusion.py @@ -0,0 +1,170 @@ +"""Step to generate an GWAS Catalog study identifier inclusion and exclusion list.""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +from omegaconf import MISSING +from pyspark.sql import functions as f + +from otg.common.session import Session +from otg.dataset.variant_annotation import VariantAnnotation +from otg.datasource.gwas_catalog.associations import ( + GWASCatalogCuratedAssociationsParser, +) +from otg.datasource.gwas_catalog.study_index import ( + StudyIndexGWASCatalog, + StudyIndexGWASCatalogParser, + read_curation_table, +) +from otg.datasource.gwas_catalog.study_splitter import GWASCatalogStudySplitter + +if TYPE_CHECKING: + from pyspark.sql import Column, DataFrame + + +@dataclass +class GWASCatalogInclusionGenerator: + """Generate GWAS Catalog studies eligible for ingestion based on curation and the provided criteria. + + Attributes: + session (Session): Session object. + catalog_study_files (list[str]): List of raw GWAS catalog studies file. + catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog. + catalog_associations_file (str): Raw GWAS catalog associations file. + variant_annotation_path (str): Input variant annotation path. + gwas_catalog_study_curation_file (str | None): file of the curation table. Optional. + harmonised_study_file (str): file with the list of all successfully harmonized summary statistics. + criteria (str): inclusion criteria depending on the dataset ("summary_stats" or "curation") + inclusion_list_path (str): output file name of the list of study identifiers that can be ingested. + exclusion_list_path (str): output file name of the list of study identifiers that excluded from ingestion. 
+ """ + + session: Session = MISSING + # GWAS Catalog sources: + catalog_study_files: list[str] = MISSING + catalog_ancestry_files: list[str] = MISSING + catalog_associations_file: str = MISSING + # Custom curation: + gwas_catalog_study_curation_file: str | None = None + # GnomAD source: + variant_annotation_path: str = MISSING + # Dynamic sources: + harmonised_study_file: str = MISSING + criteria: str = MISSING + # Output: + inclusion_list_path: str = MISSING + exclusion_list_path: str = MISSING + + @staticmethod + def flag_eligible_studies( + study_index: StudyIndexGWASCatalog, criteria: str + ) -> DataFrame: + """Apply filter on GWAS Catalog studies based on the provided criteria. + + Args: + study_index (StudyIndexGWASCatalog): complete study index to be filtered based on the provided filter set + criteria (str): name of the filter set to be applied. + + Raises: + ValueError: if the provided filter set is not in the accepted values. + + Returns: + DataFrame: filtered dataframe containing only eligible studies. + """ + filters: dict[str, Column] = { + # Filters applied on studies for ingesting curated associations: + "curation": (study_index.is_gwas() & study_index.has_mapped_trait()), + # Filters applied on studies for ingesting summary statistics: + "summary_stats": ( + study_index.is_gwas() + & study_index.has_mapped_trait() + & (~study_index.is_quality_flagged()) + & study_index.has_summarystats() + ), + } + + if criteria not in filters: + raise ValueError( + f'Wrong value as filter set ({criteria}). Accepted: {",".join(filters.keys())}' + ) + + # Applying the relevant filter to the study: + return study_index.df.select( + "studyId", + "studyType", + "traitFromSource", + "traitFromSourceMappedIds", + "qualityControls", + "hasSumstats", + filters[criteria].alias("isEligible"), + ) + + @staticmethod + def process_harmonised_list(studies: list[str], session: Session) -> DataFrame: + """Generate spark dataframe from the provided list. + + Args: + studies (list[str]): list of path pointing to harmonised summary statistics. + session (Session): session + + Returns: + DataFrame: column name is consistent with original implementatin + """ + return session.spark.createDataFrame([{"_c0": path} for path in studies]) + + def get_gwas_catalog_study_index( + self: GWASCatalogInclusionGenerator, + ) -> StudyIndexGWASCatalog: + """Return GWAS Catalog study index. + + Returns: + StudyIndexGWASCatalog: Completely processed and fully annotated study index. 
+ """ + # Extract + va = VariantAnnotation.from_parquet(self.session, self.variant_annotation_path) + catalog_studies = self.session.spark.read.csv( + list(self.catalog_study_files), sep="\t", header=True + ) + ancestry_lut = self.session.spark.read.csv( + list(self.catalog_ancestry_files), sep="\t", header=True + ) + sumstats_lut = self.session.spark.read.csv( + self.harmonised_study_file, sep="\t", header=False + ) + catalog_associations = self.session.spark.read.csv( + self.catalog_associations_file, sep="\t", header=True + ).persist() + gwas_catalog_study_curation = read_curation_table( + self.gwas_catalog_study_curation_file, self.session + ) + + # Transform + study_index, study_locus = GWASCatalogStudySplitter.split( + StudyIndexGWASCatalogParser.from_source( + catalog_studies, + ancestry_lut, + sumstats_lut, + ).annotate_from_study_curation(gwas_catalog_study_curation), + GWASCatalogCuratedAssociationsParser.from_source(catalog_associations, va), + ) + + return study_index + + def __post_init__(self: GWASCatalogInclusionGenerator) -> None: + """Run step.""" + # Create study index: + study_index = self.get_gwas_catalog_study_index() + + # Get study indices for inclusion: + flagged_studies = self.flag_eligible_studies(study_index, self.criteria) + + # Output inclusion list: + eligible = ( + flagged_studies.filter(f.col("isEligible")).select("studyId").persist() + ) + eligible.write.mode(self.session.write_mode).parquet(self.inclusion_list_path) + + # Output exclusion list: + excluded = flagged_studies.filter(~f.col("isEligible")).persist() + excluded.write.mode(self.session.write_mode).parquet(self.exclusion_list_path) diff --git a/src/otg/ld_based_clumping.py b/src/otg/ld_based_clumping.py new file mode 100644 index 000000000..6158f9d36 --- /dev/null +++ b/src/otg/ld_based_clumping.py @@ -0,0 +1,51 @@ +"""Step to apply linkageg based clumping on study-locus dataset.""" +from __future__ import annotations + +from dataclasses import dataclass + +from omegaconf import MISSING + +from otg.common.session import Session +from otg.dataset.ld_index import LDIndex +from otg.dataset.study_index import StudyIndex +from otg.dataset.study_locus import StudyLocus + + +@dataclass +class LdBasedClumpingStep: + """Step to perform LD-based clumping on study locus dataset. + + As a first step, study locus is enriched with population specific linked-variants. + That's why the study index and the ld index is required for this step. Study loci are flaggged + in the resulting dataset, which can be explained by a more significant association + from the same study. + + Attributes: + session (Session): Session object + study_locus_input_path (str): Path to the input study locus. + study_index_path (str): Path to the study index. + ld_index_path (str): Path to the LD index. + clumped_study_locus_output_path (str): path of the resulting, clumped study-locus dataset. 
+ """ + + session: Session = MISSING + study_locus_input_path: str = MISSING + study_index_path: str = MISSING + ld_index_path: str = MISSING + clumped_study_locus_output_path: str = MISSING + + def __post_init__(self: LdBasedClumpingStep) -> None: + """Run clumping step.""" + study_locus = StudyLocus.from_parquet(self.session, self.study_locus_input_path) + ld_index = LDIndex.from_parquet(self.session, self.ld_index_path) + study_index = StudyIndex.from_parquet(self.session, self.study_index_path) + + ( + study_locus + # Annotating study locus with LD information: + .annotate_ld(study_index, ld_index) + .clump() + # Save result: + .df.write.mode(self.session.write_mode) + .parquet(self.clumped_study_locus_output_path) + ) diff --git a/src/otg/window_based_clumping.py b/src/otg/window_based_clumping.py new file mode 100644 index 000000000..690bdd882 --- /dev/null +++ b/src/otg/window_based_clumping.py @@ -0,0 +1,56 @@ +"""Step to run window based clumping on summary statistics datasts.""" +from __future__ import annotations + +from dataclasses import dataclass, field + +from omegaconf import MISSING + +from otg.common.session import Session +from otg.dataset.summary_statistics import SummaryStatistics + + +@dataclass +class WindowBasedClumpingStep: + """Apply window based clumping on summary statistics datasets. + + Attributes: + session (Session): Session object. + summary_statistics_input_path (str): Path to the harmonized summary statistics dataset. + study_locus_output_path (str): Output path for the resulting study locus dataset. + inclusion_list_path (str | None): Path to the inclusion list (list of white-listed study identifier). Optional. + locus_collect_distance (int | None): Distance, within which tagging variants are collected around the semi-index. Optional. 
+ """ + + session: Session = MISSING + summary_statistics_input_path: str = MISSING + study_locus_output_path: str = MISSING + inclusion_list_path: str | None = field(default=None) + locus_collect_distance: int | None = field(default=None) + + def __post_init__(self: WindowBasedClumpingStep) -> None: + """Run the clumping step.""" + # If inclusion list path is provided, only these studies will be read: + if self.inclusion_list_path: + study_ids_to_ingest = [ + f'{self.summary_statistics_input_path}/{row["studyId"]}.parquet' + for row in self.session.spark.read.parquet( + self.inclusion_list_path + ).collect() + ] + else: + # If no inclusion list is provided, read all summary stats in folder: + study_ids_to_ingest = [self.summary_statistics_input_path] + + ( + SummaryStatistics.from_parquet( + self.session, + study_ids_to_ingest, + recursiveFileLookup=True, + ) + .coalesce(4000) + # Applying window based clumping: + .window_based_clumping(locus_collect_distance=self.locus_collect_distance) + # Save resulting study locus dataset: + .df.write.mode(self.session.write_mode) + .parquet(self.study_locus_output_path) + ) diff --git a/tests/conftest.py b/tests/conftest.py index 29f90d7e5..e73dd1a95 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -449,7 +449,7 @@ def sample_gwas_catalog_associations(spark: SparkSession) -> DataFrame: def sample_summary_satistics(spark: SparkSession) -> SummaryStatistics: """Sample GWAS raw associations sample data.""" return SummaryStatistics( - _df=spark.read.parquet("tests/data_samples/GCST005523_chr18.parquet"), + _df=spark.read.parquet("tests/data_samples/sumstats_sample"), _schema=SummaryStatistics.get_schema(), ) diff --git a/tests/data_samples/GCST005523_chr18.parquet b/tests/data_samples/sumstats_sample/GCST005523_chr18.parquet similarity index 100% rename from tests/data_samples/GCST005523_chr18.parquet rename to tests/data_samples/sumstats_sample/GCST005523_chr18.parquet diff --git a/tests/datasource/gwas_catalog/test_gwas_catalog_curation.py b/tests/datasource/gwas_catalog/test_gwas_catalog_curation.py new file mode 100644 index 000000000..cfd71108e --- /dev/null +++ b/tests/datasource/gwas_catalog/test_gwas_catalog_curation.py @@ -0,0 +1,258 @@ +"""Test GWAS Catalog study curation logic.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pyspark.sql.functions as f +import pyspark.sql.types as t +import pytest +from otg.datasource.gwas_catalog.study_index import StudyIndexGWASCatalog +from pyspark.sql import DataFrame + +if TYPE_CHECKING: + from pyspark.sql import SparkSession + + +class TestGWASCatalogStudyCuration: + """Test suite for GWAS Catalog study curation logic.""" + + @pytest.fixture(scope="class") + def mock_gwas_study_index( + self: TestGWASCatalogStudyCuration, + spark: SparkSession, + ) -> StudyIndexGWASCatalog: + """Generate a dataset with a given set of studies with minimalistic schema.""" + # Generating a study index with 7 studies where the first study (s0) has no summary statistics: + study_data = [ + (f"s{count}", False) if count == 0 else (f"s{count}", True) + for count in range(0, 7) + ] + columns = [ + "projectId", + "studyType", + "traitFromSource", + "publicationTitle", + "publicationFirstAuthor", + "pubmedId", + ] + return StudyIndexGWASCatalog( + _df=( + spark.createDataFrame(study_data, ["studyId", "hasSumstats"]).select( + "*", *[f.lit("foo").alias(colname) for colname in columns] + ) + ), + _schema=StudyIndexGWASCatalog.get_schema(), + ) + + @pytest.fixture(scope="class") + def 
mock_study_curation( + self: TestGWASCatalogStudyCuration, spark: SparkSession + ) -> DataFrame: + """Generate a mocked curation dataset with matching studies.""" + curation_data = [ + ("s2", None, None, None, True), # Good study + ("s3", "pQTL", None, None, True), # Update type + ("s4", None, ["analysis 1"], None, True), # Add analysis flag + ("s5", None, None, ["QC flag"], True), # Add QC flag + ] + + curation_columns = [ + "studyId", + "studyType", + "analysisFlags", + "qualityControls", + "isCurated", + ] + return ( + spark.createDataFrame(curation_data, curation_columns) + .withColumn("isCurated", f.col("isCurated").cast(t.BooleanType())) + .persist() + ) + + @staticmethod + def test_curation__return_type( + mock_gwas_study_index: StudyIndexGWASCatalog, mock_study_curation: DataFrame + ) -> None: + """Test return types.""" + # Method should work on empty curation: + assert isinstance( + mock_gwas_study_index.annotate_from_study_curation(None), + StudyIndexGWASCatalog, + ), f"Applying curation without curation table should yield a study table, but got: {type(mock_gwas_study_index.annotate_from_study_curation(None))}" + + # Return type should work: + assert isinstance( + mock_gwas_study_index.annotate_from_study_curation(mock_study_curation), + StudyIndexGWASCatalog, + ), f"Applying curation should return a study table, however got: {type(mock_gwas_study_index.annotate_from_study_curation(mock_study_curation))}" + + @staticmethod + def test_curation__returned_rows( + mock_gwas_study_index: StudyIndexGWASCatalog, mock_study_curation: DataFrame + ) -> None: + """Test the number of returned rows.""" + expected_count = mock_gwas_study_index.df.count() + zero_return_count = mock_gwas_study_index.annotate_from_study_curation( + None + ).df.count() + return_count = mock_gwas_study_index.annotate_from_study_curation( + mock_study_curation + ).df.count() + # Method should work on empty curation: + assert ( + zero_return_count == expected_count + ), f"When applying None to the curation function, the size of the returned data was not as expected ({zero_return_count} vs {expected_count})." + + # Row counts should also match when curation is applied: + assert ( + return_count == expected_count + ), f"When applying curation data, the size of the returned data was not as expected ({return_count} vs {expected_count})."
+ + # Test updated type + @staticmethod + def test_curation__study_type_update( + mock_gwas_study_index: StudyIndexGWASCatalog, mock_study_curation: DataFrame + ) -> None: + """Test for making sure the study type got updated.""" + curated = mock_gwas_study_index.annotate_from_study_curation( + mock_study_curation + ) + + # Expected studyIds: + expected = [ + row["studyId"] + for row in ( + mock_study_curation.filter(f.col("studyType").isNotNull()) + .select("studyId") + .distinct() + .collect() + ) + ] + + observed = [ + row["studyId"] + for row in ( + curated.df.filter(f.col("studyType") != "foo") + .select("studyId") + .distinct() + .collect() + ) + ] + + assert expected == observed + + # Test update qc flag + @staticmethod + def test_curation__quality_controls( + mock_gwas_study_index: StudyIndexGWASCatalog, mock_study_curation: DataFrame + ) -> None: + """Test for making sure the quality controls got updated.""" + curated = mock_gwas_study_index.annotate_from_study_curation( + mock_study_curation + ) + + # Expected studyIds: + expected = [ + row["studyId"] + for row in ( + mock_study_curation.filter(f.col("qualityControls").isNotNull()) + .select("studyId") + .distinct() + .collect() + ) + ] + + observed = [ + row["studyId"] + for row in ( + curated.df.filter(f.size(f.col("qualityControls")) > 0) + .select("studyId") + .distinct() + .collect() + ) + ] + + assert expected == observed + + # Test updated method flag + @staticmethod + def test_curation__analysis_flags( + mock_gwas_study_index: StudyIndexGWASCatalog, mock_study_curation: DataFrame + ) -> None: + """Test for making sure the analysis flags got updated.""" + curated = mock_gwas_study_index.annotate_from_study_curation( + mock_study_curation + ) + + # Expected studyIds: + expected = [ + row["studyId"] + for row in ( + mock_study_curation.filter(f.col("analysisFlags").isNotNull()) + .select("studyId") + .distinct() + .collect() + ) + ] + + observed = [ + row["studyId"] + for row in ( + curated.df.filter(f.size(f.col("analysisFlags")) > 0) + .select("studyId") + .distinct() + .collect() + ) + ] + + assert expected == observed + + # Test curation extraction: + @staticmethod + def test_extract_curation__return_type( + mock_gwas_study_index: StudyIndexGWASCatalog, mock_study_curation: DataFrame + ) -> None: + """Testing if the extracted curation table has the right type.""" + assert isinstance( + mock_gwas_study_index.extract_studies_for_curation(mock_study_curation), + DataFrame, + ) + # We don't need to have existing curation: + assert isinstance( + mock_gwas_study_index.extract_studies_for_curation(None), + DataFrame, + ) + + @staticmethod + def test_extract_curation__rows( + mock_gwas_study_index: StudyIndexGWASCatalog, mock_study_curation: DataFrame + ) -> None: + """Testing if the extracted curation table has the right number of rows.""" + new_curation_count = mock_gwas_study_index.extract_studies_for_curation( + mock_study_curation + ).count() + new_empty_count = mock_gwas_study_index.extract_studies_for_curation( + None + ).count() + expected_count = mock_gwas_study_index.df.filter(f.col("hasSumstats")).count() + assert new_curation_count == expected_count + assert new_empty_count == expected_count + + @staticmethod + def test_extract_curation__new_studies_to_curate( + mock_gwas_study_index: StudyIndexGWASCatalog, mock_study_curation: DataFrame + ) -> None: + """Testing if the right number of new studies is flagged for curation.""" + new_curation_count = ( + mock_gwas_study_index.extract_studies_for_curation(mock_study_curation) + 
.filter(f.lower(f.col("isCurated")) == "false") + .count() + ) + new_empty_count = ( + mock_gwas_study_index.extract_studies_for_curation(None) + .filter(f.lower(f.col("isCurated")) == "false") + .count() + ) + assert new_curation_count == 2 + assert new_empty_count == 6 diff --git a/tests/step/test_clump_step.py b/tests/step/test_clump_step.py index d5f706f9c..f928d78ed 100644 --- a/tests/step/test_clump_step.py +++ b/tests/step/test_clump_step.py @@ -5,7 +5,7 @@ from pathlib import Path from typing import TYPE_CHECKING -from otg.clump import ClumpStep +from otg.window_based_clumping import WindowBasedClumpingStep if TYPE_CHECKING: from otg.common.session import Session @@ -18,9 +18,9 @@ def test_clumpstep_summary_stats(self, session: Session) -> None: """Test clump step on summary statistics writes results to a temporary directory.""" with tempfile.TemporaryDirectory() as temp_dir: clumped_study_locus_path = Path(temp_dir, "GCST005523_chr18_clumped") - ClumpStep( + WindowBasedClumpingStep( session=session, - input_path="tests/data_samples/GCST005523_chr18.parquet", - clumped_study_locus_path=str(clumped_study_locus_path), + summary_statistics_input_path="tests/data_samples/sumstats_sample", + study_locus_output_path=str(clumped_study_locus_path), ) assert Path(clumped_study_locus_path).exists(), "Output directory exists."
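For reviewers, a minimal sketch of the `Session.read_parquet` change exercised by the tests above: a single path is wrapped into a list so `str` and `list[str]` inputs share one `spark.read.parquet(*paths)` call. The sketch uses a plain local SparkSession rather than the project's `Session` wrapper, and the sample path mirrors `tests/data_samples`; both are assumptions for illustration:

```python
# Hedged sketch of the list-of-paths handling added to Session.read_parquet,
# using a plain local SparkSession instead of the project's Session wrapper.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

path = "tests/data_samples/sumstats_sample"  # may also be a list[str]
# A single string is wrapped into a list so both input types share one call:
paths = [path] if isinstance(path, str) else path
df = spark.read.parquet(*paths, recursiveFileLookup=True)
print(df.count())
```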