From 338d55fdf2a700565e3c40f24ca07c938506a072 Mon Sep 17 00:00:00 2001
From: David Ochoa
Date: Wed, 20 Dec 2023 22:05:18 +0000
Subject: [PATCH 1/5] chore: remove unnecessary file

---
 config/datasets/gcp.yaml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/config/datasets/gcp.yaml b/config/datasets/gcp.yaml
index c198b4d96..e8d949ecd 100644
--- a/config/datasets/gcp.yaml
+++ b/config/datasets/gcp.yaml
@@ -24,7 +24,6 @@ catalog_sumstats_lut: ${datasets.inputs}/v2d/harmonised_list-r2023-11-24a.txt
 ukbiobank_manifest: gs://genetics-portal-input/ukb_phenotypes/neale2_saige_study_manifest.190430.tsv
 l2g_gold_standard_curation: ${datasets.inputs}/l2g/gold_standard/curation.json
 gene_interactions: ${datasets.inputs}/l2g/interaction # 23.09 data
-finngen_phenotype_table_url: https://r9.finngen.fi/api/phenos
 eqtl_catalogue_paths_imported: ${datasets.inputs}/preprocess/eqtl_catalogue/tabix_ftp_paths_imported.tsv
 
 # Output datasets

From f681f58e92a38a917eca64c57cedc23c3461f778 Mon Sep 17 00:00:00 2001
From: David Ochoa
Date: Wed, 20 Dec 2023 22:16:47 +0000
Subject: [PATCH 2/5] fix: several fixes on finngen harmonisation and preprocess

---
 config/step/finngen.yaml                    |  3 -
 config/step/finngen_studies.yaml            |  2 +
 config/step/finngen_sumstat_preprocess.yaml |  3 +
 src/airflow/dags/finngen_harmonisation.py   | 77 +++++++++++++++++++++
 src/airflow/dags/finngen_preprocess.py      | 31 +++++----
 src/otg/datasource/finngen/study_index.py   |  8 +--
 src/otg/datasource/finngen/summary_stats.py | 14 ++--
 src/otg/finngen.py                          | 50 -------------
 src/otg/finngen_studies.py                  | 31 +++++++++
 src/otg/finngen_sumstat_preprocess.py       | 36 ++++++++++
 10 files changed, 179 insertions(+), 76 deletions(-)
 delete mode 100644 config/step/finngen.yaml
 create mode 100644 config/step/finngen_studies.yaml
 create mode 100644 config/step/finngen_sumstat_preprocess.yaml
 create mode 100644 src/airflow/dags/finngen_harmonisation.py
 delete mode 100644 src/otg/finngen.py
 create mode 100644 src/otg/finngen_studies.py
 create mode 100644 src/otg/finngen_sumstat_preprocess.py

diff --git a/config/step/finngen.yaml b/config/step/finngen.yaml
deleted file mode 100644
index fb049db37..000000000
--- a/config/step/finngen.yaml
+++ /dev/null
@@ -1,3 +0,0 @@
-_target_: otg.finngen.FinnGenStep
-finngen_study_index_out: ${datasets.finngen_study_index}
-finngen_summary_stats_out: ${datasets.finngen_summary_stats}
diff --git a/config/step/finngen_studies.yaml b/config/step/finngen_studies.yaml
new file mode 100644
index 000000000..23b58c443
--- /dev/null
+++ b/config/step/finngen_studies.yaml
@@ -0,0 +1,2 @@
+_target_: otg.finngen_studies.FinnGenStudiesStep
+finngen_study_index_out: ${datasets.finngen_study_index}
diff --git a/config/step/finngen_sumstat_preprocess.yaml b/config/step/finngen_sumstat_preprocess.yaml
new file mode 100644
index 000000000..319e7af63
--- /dev/null
+++ b/config/step/finngen_sumstat_preprocess.yaml
@@ -0,0 +1,3 @@
+_target_: otg.finngen_sumstat_preprocess.FinnGenSumstatPreprocessStep
+raw_sumstats_path: ???
+out_sumstats_path: ???
diff --git a/src/airflow/dags/finngen_harmonisation.py b/src/airflow/dags/finngen_harmonisation.py
new file mode 100644
index 000000000..749d3fc4e
--- /dev/null
+++ b/src/airflow/dags/finngen_harmonisation.py
@@ -0,0 +1,77 @@
+"""Airflow DAG for the harmonisation part of the pipeline."""
+from __future__ import annotations
+
+import re
+import time
+from pathlib import Path
+from typing import Any
+
+import common_airflow as common
+from airflow.decorators import task
+from airflow.models.dag import DAG
+from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
+
+CLUSTER_NAME = "otg-finngen-harmonisation"
+AUTOSCALING = "gwascatalog-harmonisation"  # same as GWAS Catalog harmonisation
+SUMMARY_STATS_BUCKET_NAME = "finngen-public-data-r10"
+RELEASEBUCKET = "gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX"
+SUMSTATS_PARQUET = f"{RELEASEBUCKET}/summary_statistics/finngen"
+
+with DAG(
+    dag_id=Path(__file__).stem,
+    description="Open Targets Genetics — FinnGen harmonisation",
+    default_args=common.shared_dag_args,
+    **common.shared_dag_kwargs,
+):
+    # List the raw summary statistics files in the FinnGen public data bucket.
+    list_inputs = GCSListObjectsOperator(
+        task_id="list_raw_sumstats",
+        bucket=SUMMARY_STATS_BUCKET_NAME,
+        prefix="summary_stats",
+        match_glob="**/*.gz",
+    )
+
+    # Submit jobs to dataproc
+    @task(task_id="submit_jobs")
+    def submit_jobs(**kwargs: Any) -> None:
+        """Submit jobs to dataproc.
+
+        Args:
+            **kwargs (Any): Keyword arguments.
+        """
+        ti = kwargs["ti"]
+        todo = ti.xcom_pull(task_ids="list_raw_sumstats", key="return_value")
+        print("Number of jobs to submit: ", len(todo))  # noqa: T201
+        for i in range(len(todo)):
+            # Stay below the default quota of 400 job submissions per minute.
+            if i > 0 and i % 399 == 0:
+                time.sleep(60)
+            input_path = todo[i]
+            match_result = re.search(r"summary_stats/finngen_(.*).gz", input_path)
+            if match_result:
+                study_id = match_result.group(1)
+                print("Submitting job for study: ", study_id)  # noqa: T201
+                common.submit_pyspark_job_no_operator(
+                    cluster_name=CLUSTER_NAME,
+                    step_id="finngen_sumstat_preprocess",
+                    other_args=[
+                        f"step.raw_sumstats_path=gs://{SUMMARY_STATS_BUCKET_NAME}/{input_path}",
+                        f"step.out_sumstats_path={SUMSTATS_PARQUET}/{study_id}.parquet",
+                    ],
+                )
+
+    # list_inputs >>
+    (
+        list_inputs
+        >> common.create_cluster(
+            CLUSTER_NAME,
+            autoscaling_policy=AUTOSCALING,
+            num_workers=8,
+            # num_preemptible_workers=8,
+            master_machine_type="n1-highmem-32",
+            worker_machine_type="n1-standard-2",
+        )
+        >> common.install_dependencies(CLUSTER_NAME)
+        >> submit_jobs()
+        # >> common.delete_cluster(CLUSTER_NAME)
+    )
diff --git a/src/airflow/dags/finngen_preprocess.py b/src/airflow/dags/finngen_preprocess.py
index e1febd34f..3ca5f3907 100644
--- a/src/airflow/dags/finngen_preprocess.py
+++ b/src/airflow/dags/finngen_preprocess.py
@@ -11,12 +11,12 @@
 AUTOSCALING = "finngen-preprocess"
 
 RELEASEBUCKET = "gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX"
-SUMSTATS = "{RELEASEBUCKET}/summary_statistics/finngen"
+SUMSTATS = f"{RELEASEBUCKET}/summary_statistics/finngen"
 WINDOWBASED_CLUMPED = (
-    "{RELEASEBUCKET}/study_locus/from_sumstats_study_locus_window_clumped/finngen"
+    f"{RELEASEBUCKET}/study_locus/from_sumstats_study_locus_window_clumped/finngen"
 )
-LD_CLUMPED = "{RELEASEBUCKET}/study_locus/from_sumstats_study_locus_ld_clumped/finngen"
-PICSED = "{RELEASEBUCKET}/credible_set/from_sumstats_study_locus/finngen"
+LD_CLUMPED = f"{RELEASEBUCKET}/study_locus/from_sumstats_study_locus_ld_clumped/finngen"
+PICSED = f"{RELEASEBUCKET}/credible_set/from_sumstats_study_locus/finngen"
f"{RELEASEBUCKET}/credible_set/from_sumstats_study_locus/finngen" with DAG( dag_id=Path(__file__).stem, @@ -24,10 +24,10 @@ default_args=common.shared_dag_args, **common.shared_dag_kwargs, ): - study_and_sumstats = common.submit_step( + study_index = common.submit_step( cluster_name=CLUSTER_NAME, - step_id="finngen", - task_id="finngen_sumstats_and_study_index", + step_id="finngen_studies", + task_id="finngen_studies", ) window_based_clumping = common.submit_step( @@ -35,8 +35,8 @@ step_id="clump", task_id="finngen_window_based_clumping", other_args=[ - "step.input_path={SUMSTATS}", - "step.clumped_study_locus_path={WINDOWBASED_CLUMPED}", + f"step.input_path={SUMSTATS}", + f"step.clumped_study_locus_path={WINDOWBASED_CLUMPED}", ], ) ld_clumping = common.submit_step( @@ -44,8 +44,10 @@ step_id="clump", task_id="finngen_ld_clumping", other_args=[ - "step.input_path={WINDOWBASED_CLUMPED}", - "step.clumped_study_locus_path={LD_CLUMPED}", + f"step.input_path={WINDOWBASED_CLUMPED}", + f"step.ld_index_path={RELEASEBUCKET}/ld_index", + f"step.study_index_path={RELEASEBUCKET}/study_index/finngen", + f"step.clumped_study_locus_path={LD_CLUMPED}", ], trigger_rule=TriggerRule.ALL_DONE, ) @@ -64,10 +66,13 @@ ( common.create_cluster( - CLUSTER_NAME, autoscaling_policy=AUTOSCALING, master_disk_size=2000 + CLUSTER_NAME, + autoscaling_policy=AUTOSCALING, + master_disk_size=2000, + num_workers=6, ) >> common.install_dependencies(CLUSTER_NAME) - >> study_and_sumstats + >> study_index >> window_based_clumping >> ld_clumping >> pics diff --git a/src/otg/datasource/finngen/study_index.py b/src/otg/datasource/finngen/study_index.py index 0ebd1438a..5ab30ebe0 100644 --- a/src/otg/datasource/finngen/study_index.py +++ b/src/otg/datasource/finngen/study_index.py @@ -14,7 +14,7 @@ class FinnGenStudyIndex: The following information is aggregated/extracted: - - Study ID in the special format (FINNGEN_R9_*) + - Study ID in the special format (e.g. FINNGEN_R10_*) - Trait name (for example, Amoebiasis) - Number of cases and controls - Link to the summary statistics location @@ -22,10 +22,10 @@ class FinnGenStudyIndex: Some fields are also populated as constants, such as study type and the initial sample size. """ - finngen_phenotype_table_url: str = "https://r9.finngen.fi/api/phenos" - finngen_release_prefix: str = "FINNGEN_R9" + finngen_phenotype_table_url: str = "https://r10.finngen.fi/api/phenos" + finngen_release_prefix: str = "FINNGEN_R10" finngen_summary_stats_url_prefix: str = ( - "gs://finngen-public-data-r9/summary_stats/finngen_R9_" + "gs://finngen-public-data-r10/summary_stats/finngen_R10_" ) finngen_summary_stats_url_suffix: str = ".gz" diff --git a/src/otg/datasource/finngen/summary_stats.py b/src/otg/datasource/finngen/summary_stats.py index 281792a08..8fc966c5b 100644 --- a/src/otg/datasource/finngen/summary_stats.py +++ b/src/otg/datasource/finngen/summary_stats.py @@ -39,28 +39,27 @@ class FinnGenSummaryStats: def from_source( cls: type[FinnGenSummaryStats], spark: SparkSession, - raw_files: list[str], + raw_file: str, ) -> SummaryStatistics: """Ingests all summary statst for all FinnGen studies. Args: spark (SparkSession): Spark session object. - raw_files (list[str]): Paths to raw summary statistics .gz files. + raw_file (str): Path to raw summary statistics .gz files. 
 
         Returns:
             SummaryStatistics: Processed summary statistics dataset
         """
+        study_id = raw_file.split("/")[-1].split(".")[0].upper()
         processed_summary_stats_df = (
             spark.read.schema(cls.raw_schema)
             .option("delimiter", "\t")
-            .csv(raw_files, header=True)
+            .csv(raw_file, header=True)
             # Drop rows which don't have proper position.
             .filter(f.col("pos").cast(t.IntegerType()).isNotNull())
             .select(
                 # From the full path, extracts just the filename, and converts to upper case to get the study ID.
-                f.upper(f.regexp_extract(f.input_file_name(), r"([^/]+)\.gz", 1)).alias(
-                    "studyId"
-                ),
+                f.lit(study_id).alias("studyId"),
                 # Add variant information.
                 f.concat_ws(
                     "_",
@@ -82,6 +81,9 @@ def from_source(
             .filter(
                 f.col("pos").cast(t.IntegerType()).isNotNull() & (f.col("beta") != 0)
             )
+            # Repartition to about 30 partitions per study (~20 MB each) and sort by position.
+            .repartitionByRange(30, "chromosome", "position")
+            .sortWithinPartitions("chromosome", "position")
         )
 
         # Initializing summary statistics object:
diff --git a/src/otg/finngen.py b/src/otg/finngen.py
deleted file mode 100644
index 6b179e2c5..000000000
--- a/src/otg/finngen.py
+++ /dev/null
@@ -1,50 +0,0 @@
-"""Step to run FinnGen study table ingestion."""
-
-from __future__ import annotations
-
-from dataclasses import dataclass
-
-from omegaconf import MISSING
-
-from otg.common.session import Session
-from otg.datasource.finngen.study_index import FinnGenStudyIndex
-from otg.datasource.finngen.summary_stats import FinnGenSummaryStats
-
-
-@dataclass
-class FinnGenStep:
-    """FinnGen ingestion step.
-
-    Attributes:
-        session (Session): Session object.
-        finngen_study_index_out (str): Output path for the FinnGen study index dataset.
-        finngen_summary_stats_out (str): Output path for the FinnGen summary statistics.
-    """
-
-    session: Session = MISSING
-    finngen_study_index_out: str = MISSING
-    finngen_summary_stats_out: str = MISSING
-
-    def __post_init__(self: FinnGenStep) -> None:
-        """Run step."""
-        # Fetch study index.
-        # Process study index.
-        study_index = FinnGenStudyIndex.from_source(self.session.spark)
-        # Write study index.
-        study_index.df.write.mode(self.session.write_mode).parquet(
-            self.finngen_study_index_out
-        )
-
-        # Fetch summary stats locations
-        input_filenames = [row.summarystatsLocation for row in study_index.df.collect()]
-        # Process summary stats.
-        summary_stats = FinnGenSummaryStats.from_source(
-            self.session.spark, raw_files=input_filenames
-        )
-
-        # Write summary stats.
-        (
-            summary_stats.df.write.partitionBy("studyId")
-            .mode(self.session.write_mode)
-            .parquet(self.finngen_summary_stats_out)
-        )
diff --git a/src/otg/finngen_studies.py b/src/otg/finngen_studies.py
new file mode 100644
index 000000000..9a1d800e8
--- /dev/null
+++ b/src/otg/finngen_studies.py
@@ -0,0 +1,31 @@
+"""Step to run FinnGen study table ingestion."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+from omegaconf import MISSING
+
+from otg.common.session import Session
+from otg.datasource.finngen.study_index import FinnGenStudyIndex
+
+
+@dataclass
+class FinnGenStudiesStep:
+    """FinnGen study index generation step.
+
+    Attributes:
+        session (Session): Session object.
+        finngen_study_index_out (str): Output path for the FinnGen study index dataset.
+    """
+
+    session: Session = MISSING
+    finngen_study_index_out: str = MISSING
+    finngen_summary_stats_out: str = MISSING
+
+    def __post_init__(self: FinnGenStudiesStep) -> None:
+        """Run step."""
+        # Fetch study index.
+        FinnGenStudyIndex.from_source(self.session.spark).df.write.mode(
+            self.session.write_mode
+        ).parquet(self.finngen_study_index_out)
diff --git a/src/otg/finngen_sumstat_preprocess.py b/src/otg/finngen_sumstat_preprocess.py
new file mode 100644
index 000000000..959c000dc
--- /dev/null
+++ b/src/otg/finngen_sumstat_preprocess.py
@@ -0,0 +1,36 @@
+"""Step to preprocess raw FinnGen summary statistics."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+from omegaconf import MISSING
+
+from otg.common.session import Session
+from otg.datasource.finngen.summary_stats import FinnGenSummaryStats
+
+
+@dataclass
+class FinnGenSumstatPreprocessStep:
+    """FinnGen sumstats preprocessing.
+
+    Attributes:
+        session (Session): Session object.
+        raw_sumstats_path (str): Input path of the raw FinnGen summary statistics file.
+        out_sumstats_path (str): Output path for the preprocessed FinnGen summary statistics.
+    """
+
+    session: Session = MISSING
+    raw_sumstats_path: str = MISSING
+    out_sumstats_path: str = MISSING
+
+    def __post_init__(self: FinnGenSumstatPreprocessStep) -> None:
+        """Run step."""
+        # Process summary stats.
+        (
+            FinnGenSummaryStats.from_source(
+                self.session.spark, raw_file=self.raw_sumstats_path
+            )
+            .df.write.mode(self.session.write_mode)
+            .parquet(self.out_sumstats_path)
+        )
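For readers following the harmonisation DAG added in this patch, the per-study fan-out comes down to turning each listed GCS object name into a pair of Hydra overrides for the finngen_sumstat_preprocess step. The sketch below restates that mapping as a standalone Python function so it can be tried without Airflow or Dataproc. It is illustrative only: the bucket and output locations mirror the constants in finngen_harmonisation.py, while the function name and the __main__ block are hypothetical additions, not part of the patch.

"""Illustrative sketch: mirrors the per-study fan-out logic of finngen_harmonisation.py."""
from __future__ import annotations

import re

# Constants copied from the DAG in this patch.
SUMMARY_STATS_BUCKET_NAME = "finngen-public-data-r10"
RELEASEBUCKET = "gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX"
SUMSTATS_PARQUET = f"{RELEASEBUCKET}/summary_statistics/finngen"


def build_step_overrides(input_path: str) -> list[str] | None:
    """Turn one GCS object name into the overrides passed to the finngen_sumstat_preprocess step.

    Returns None when the object does not look like a FinnGen summary statistics file.
    """
    # Same pattern as the DAG: captures e.g. "R10_AB1_ACTINOMYCOSIS" from
    # "summary_stats/finngen_R10_AB1_ACTINOMYCOSIS.gz".
    match_result = re.search(r"summary_stats/finngen_(.*).gz", input_path)
    if not match_result:
        return None
    study_id = match_result.group(1)
    return [
        f"step.raw_sumstats_path=gs://{SUMMARY_STATS_BUCKET_NAME}/{input_path}",
        f"step.out_sumstats_path={SUMSTATS_PARQUET}/{study_id}.parquet",
    ]


if __name__ == "__main__":
    # Example object name, as returned by GCSListObjectsOperator.
    print(build_step_overrides("summary_stats/finngen_R10_AB1_ACTINOMYCOSIS.gz"))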
From 28e7e27cdfd7f811fd535ca1d0a754199f589ac8 Mon Sep 17 00:00:00 2001
From: David Ochoa
Date: Wed, 20 Dec 2023 22:22:58 +0000
Subject: [PATCH 3/5] docs: update docs

---
 docs/python_api/step/finngen.md                    | 5 -----
 docs/python_api/step/finngen_studies.md            | 5 +++++
 docs/python_api/step/finngen_sumstat_preprocess.md | 5 +++++
 3 files changed, 10 insertions(+), 5 deletions(-)
 delete mode 100644 docs/python_api/step/finngen.md
 create mode 100644 docs/python_api/step/finngen_studies.md
 create mode 100644 docs/python_api/step/finngen_sumstat_preprocess.md

diff --git a/docs/python_api/step/finngen.md b/docs/python_api/step/finngen.md
deleted file mode 100644
index fedefae50..000000000
--- a/docs/python_api/step/finngen.md
+++ /dev/null
@@ -1,5 +0,0 @@
----
-title: FinnGen
----
-
-::: otg.finngen.FinnGenStep
diff --git a/docs/python_api/step/finngen_studies.md b/docs/python_api/step/finngen_studies.md
new file mode 100644
index 000000000..cfd7342e9
--- /dev/null
+++ b/docs/python_api/step/finngen_studies.md
@@ -0,0 +1,5 @@
+---
+title: FinnGen Studies
+---
+
+::: otg.finngen_studies.FinnGenStudiesStep
diff --git a/docs/python_api/step/finngen_sumstat_preprocess.md b/docs/python_api/step/finngen_sumstat_preprocess.md
new file mode 100644
index 000000000..0b374a278
--- /dev/null
+++ b/docs/python_api/step/finngen_sumstat_preprocess.md
@@ -0,0 +1,5 @@
+---
+title: FinnGen Preprocess Summary Stats
+---
+
+::: otg.finngen_sumstat_preprocess.FinnGenSumstatPreprocessStep

From 3860353366d75d203c426109e3de908903592c7c Mon Sep 17 00:00:00 2001
From: David Ochoa
Date: Wed, 20 Dec 2023 22:40:19 +0000
Subject: [PATCH 4/5] fix: test

---
 tests/datasource/finngen/test_finngen_summary_stats.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/datasource/finngen/test_finngen_summary_stats.py b/tests/datasource/finngen/test_finngen_summary_stats.py
index 3a16d9a57..315d8cd64 100644
--- a/tests/datasource/finngen/test_finngen_summary_stats.py
+++ b/tests/datasource/finngen/test_finngen_summary_stats.py
@@ -12,7 +12,7 @@ def test_finngen_summary_stats_from_source(spark: SparkSession) -> None:
     assert isinstance(
         FinnGenSummaryStats.from_source(
             spark=spark,
raw_files=["tests/data_samples/finngen_R9_AB1_ACTINOMYCOSIS.gz"], + raw_file="tests/data_samples/finngen_R9_AB1_ACTINOMYCOSIS.gz", ), SummaryStatistics, ) From 86f0c5934b787efc5a4f18912421dd83f18d092a Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Thu, 21 Dec 2023 12:05:00 +0000 Subject: [PATCH 5/5] fix: uncomment line Co-authored-by: Daniel Suveges --- src/airflow/dags/finngen_harmonisation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/airflow/dags/finngen_harmonisation.py b/src/airflow/dags/finngen_harmonisation.py index 749d3fc4e..ad88e695c 100644 --- a/src/airflow/dags/finngen_harmonisation.py +++ b/src/airflow/dags/finngen_harmonisation.py @@ -73,5 +73,5 @@ def submit_jobs(**kwargs: Any) -> None: ) >> common.install_dependencies(CLUSTER_NAME) >> submit_jobs() - # >> common.delete_cluster(CLUSTER_NAME) + >> common.delete_cluster(CLUSTER_NAME) )