Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Finngen R10 harmonisation and preprocessing #370

Merged
merged 6 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion config/datasets/gcp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ catalog_sumstats_lut: ${datasets.inputs}/v2d/harmonised_list-r2023-11-24a.txt
ukbiobank_manifest: gs://genetics-portal-input/ukb_phenotypes/neale2_saige_study_manifest.190430.tsv
l2g_gold_standard_curation: ${datasets.inputs}/l2g/gold_standard/curation.json
gene_interactions: ${datasets.inputs}/l2g/interaction # 23.09 data
finngen_phenotype_table_url: https://r9.finngen.fi/api/phenos
eqtl_catalogue_paths_imported: ${datasets.inputs}/preprocess/eqtl_catalogue/tabix_ftp_paths_imported.tsv

# Output datasets
Expand Down
3 changes: 0 additions & 3 deletions config/step/finngen.yaml

This file was deleted.

2 changes: 2 additions & 0 deletions config/step/finngen_studies.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
_target_: otg.finngen_studies.FinnGenStudiesStep
finngen_study_index_out: ${datasets.finngen_study_index}
3 changes: 3 additions & 0 deletions config/step/finngen_sumstat_preprocess.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
_target_: otg.finngen_sumstat_preprocess.FinnGenSumstatPreprocessStep
raw_sumstats_path: ???
out_sumstats_path: ???
5 changes: 0 additions & 5 deletions docs/python_api/step/finngen.md

This file was deleted.

5 changes: 5 additions & 0 deletions docs/python_api/step/finngen_studies.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
title: FinnGen Studies
---

::: otg.finngen_studies.FinnGenStudiesStep
5 changes: 5 additions & 0 deletions docs/python_api/step/finngen_sumstat_preprocess.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
title: FinnGen Preprocess Summary Stats
---

::: otg.finngen_sumstat_preprocess.FinnGenSumstatPreprocessStep
77 changes: 77 additions & 0 deletions src/airflow/dags/finngen_harmonisation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""Airflow DAG for the harmonisation part of the pipeline."""
from __future__ import annotations

import re
import time
from pathlib import Path
from typing import Any

import common_airflow as common
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator

CLUSTER_NAME = "otg-finngen-harmonisation"
AUTOSCALING = "gwascatalog-harmonisation" # same as GWAS Catalog harmonisation
SUMMARY_STATS_BUCKET_NAME = "finngen-public-data-r10"
RELEASEBUCKET = "gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX"
SUMSTATS_PARQUET = f"{RELEASEBUCKET}/summary_statistics/finngen"

with DAG(
dag_id=Path(__file__).stem,
description="Open Targets Genetics — Finngen harmonisation",
default_args=common.shared_dag_args,
**common.shared_dag_kwargs,
):
# List raw harmonised files from GWAS Catalog
list_inputs = GCSListObjectsOperator(
task_id="list_raw_sumstats",
bucket=SUMMARY_STATS_BUCKET_NAME,
prefix="summary_stats",
match_glob="**/*.gz",
)

# Submit jobs to dataproc
@task(task_id="submit_jobs")
def submit_jobs(**kwargs: Any) -> None:
"""Submit jobs to dataproc.

Args:
**kwargs (Any): Keyword arguments.
"""
ti = kwargs["ti"]
todo = ti.xcom_pull(task_ids="list_raw_sumstats", key="return_value")
print("Number of jobs to submit: ", len(todo)) # noqa: T201
for i in range(len(todo)):
# Not to exceed default quota 400 jobs per minute
if i > 0 and i % 399 == 0:
time.sleep(60)
input_path = todo[i]
match_result = re.search(r"summary_stats/finngen_(.*).gz", input_path)
if match_result:
study_id = match_result.group(1)
print("Submitting job for study: ", study_id) # noqa: T201
common.submit_pyspark_job_no_operator(
cluster_name=CLUSTER_NAME,
step_id="finngen_sumstat_preprocess",
other_args=[
f"step.raw_sumstats_path=gs://{SUMMARY_STATS_BUCKET_NAME}/{input_path}",
f"step.out_sumstats_path={SUMSTATS_PARQUET}/{study_id}.parquet",
],
)

# list_inputs >>
(
list_inputs
>> common.create_cluster(
CLUSTER_NAME,
autoscaling_policy=AUTOSCALING,
num_workers=8,
# num_preemptible_workers=8,
master_machine_type="n1-highmem-32",
worker_machine_type="n1-standard-2",
)
>> common.install_dependencies(CLUSTER_NAME)
>> submit_jobs()
>> common.delete_cluster(CLUSTER_NAME)
)
31 changes: 18 additions & 13 deletions src/airflow/dags/finngen_preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,43 @@
AUTOSCALING = "finngen-preprocess"

RELEASEBUCKET = "gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX"
SUMSTATS = "{RELEASEBUCKET}/summary_statistics/finngen"
SUMSTATS = f"{RELEASEBUCKET}/summary_statistics/finngen"
WINDOWBASED_CLUMPED = (
"{RELEASEBUCKET}/study_locus/from_sumstats_study_locus_window_clumped/finngen"
f"{RELEASEBUCKET}/study_locus/from_sumstats_study_locus_window_clumped/finngen"
)
LD_CLUMPED = "{RELEASEBUCKET}/study_locus/from_sumstats_study_locus_ld_clumped/finngen"
PICSED = "{RELEASEBUCKET}/credible_set/from_sumstats_study_locus/finngen"
LD_CLUMPED = f"{RELEASEBUCKET}/study_locus/from_sumstats_study_locus_ld_clumped/finngen"
PICSED = f"{RELEASEBUCKET}/credible_set/from_sumstats_study_locus/finngen"

with DAG(
dag_id=Path(__file__).stem,
description="Open Targets Genetics — Finngen preprocess",
default_args=common.shared_dag_args,
**common.shared_dag_kwargs,
):
study_and_sumstats = common.submit_step(
study_index = common.submit_step(
cluster_name=CLUSTER_NAME,
step_id="finngen",
task_id="finngen_sumstats_and_study_index",
step_id="finngen_studies",
task_id="finngen_studies",
)

window_based_clumping = common.submit_step(
cluster_name=CLUSTER_NAME,
step_id="clump",
task_id="finngen_window_based_clumping",
other_args=[
"step.input_path={SUMSTATS}",
"step.clumped_study_locus_path={WINDOWBASED_CLUMPED}",
f"step.input_path={SUMSTATS}",
f"step.clumped_study_locus_path={WINDOWBASED_CLUMPED}",
],
)
ld_clumping = common.submit_step(
cluster_name=CLUSTER_NAME,
step_id="clump",
task_id="finngen_ld_clumping",
other_args=[
"step.input_path={WINDOWBASED_CLUMPED}",
"step.clumped_study_locus_path={LD_CLUMPED}",
f"step.input_path={WINDOWBASED_CLUMPED}",
f"step.ld_index_path={RELEASEBUCKET}/ld_index",
f"step.study_index_path={RELEASEBUCKET}/study_index/finngen",
f"step.clumped_study_locus_path={LD_CLUMPED}",
],
trigger_rule=TriggerRule.ALL_DONE,
)
Expand All @@ -64,10 +66,13 @@

(
common.create_cluster(
CLUSTER_NAME, autoscaling_policy=AUTOSCALING, master_disk_size=2000
CLUSTER_NAME,
autoscaling_policy=AUTOSCALING,
master_disk_size=2000,
num_workers=6,
)
>> common.install_dependencies(CLUSTER_NAME)
>> study_and_sumstats
>> study_index
>> window_based_clumping
>> ld_clumping
>> pics
Expand Down
8 changes: 4 additions & 4 deletions src/otg/datasource/finngen/study_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ class FinnGenStudyIndex:

The following information is aggregated/extracted:

- Study ID in the special format (FINNGEN_R9_*)
- Study ID in the special format (e.g. FINNGEN_R10_*)
- Trait name (for example, Amoebiasis)
- Number of cases and controls
- Link to the summary statistics location

Some fields are also populated as constants, such as study type and the initial sample size.
"""

finngen_phenotype_table_url: str = "https://r9.finngen.fi/api/phenos"
finngen_release_prefix: str = "FINNGEN_R9"
finngen_phenotype_table_url: str = "https://r10.finngen.fi/api/phenos"
finngen_release_prefix: str = "FINNGEN_R10"
finngen_summary_stats_url_prefix: str = (
"gs://finngen-public-data-r9/summary_stats/finngen_R9_"
"gs://finngen-public-data-r10/summary_stats/finngen_R10_"
)
finngen_summary_stats_url_suffix: str = ".gz"

Expand Down
14 changes: 8 additions & 6 deletions src/otg/datasource/finngen/summary_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,27 @@ class FinnGenSummaryStats:
def from_source(
cls: type[FinnGenSummaryStats],
spark: SparkSession,
raw_files: list[str],
raw_file: str,
) -> SummaryStatistics:
"""Ingests all summary statst for all FinnGen studies.

Args:
spark (SparkSession): Spark session object.
raw_files (list[str]): Paths to raw summary statistics .gz files.
raw_file (str): Path to a raw summary statistics .gz file.

Returns:
SummaryStatistics: Processed summary statistics dataset
"""
study_id = raw_file.split("/")[-1].split(".")[0].upper()
processed_summary_stats_df = (
spark.read.schema(cls.raw_schema)
.option("delimiter", "\t")
.csv(raw_files, header=True)
.csv(raw_file, header=True)
# Drop rows which don't have proper position.
.filter(f.col("pos").cast(t.IntegerType()).isNotNull())
.select(
# From the full path, extracts just the filename, and converts to upper case to get the study ID.
f.upper(f.regexp_extract(f.input_file_name(), r"([^/]+)\.gz", 1)).alias(
"studyId"
),
f.lit(study_id).alias("studyId"),
# Add variant information.
f.concat_ws(
"_",
Expand All @@ -82,6 +81,9 @@ def from_source(
.filter(
f.col("pos").cast(t.IntegerType()).isNotNull() & (f.col("beta") != 0)
)
# Average ~20Mb partitions with 30 partitions per study
.repartitionByRange(30, "chromosome", "position")
.sortWithinPartitions("chromosome", "position")
)

# Initializing summary statistics object:
Expand Down
50 changes: 0 additions & 50 deletions src/otg/finngen.py

This file was deleted.

31 changes: 31 additions & 0 deletions src/otg/finngen_studies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Step to run FinnGen study table ingestion."""

from __future__ import annotations

from dataclasses import dataclass

from omegaconf import MISSING

from otg.common.session import Session
from otg.datasource.finngen.study_index import FinnGenStudyIndex


@dataclass
class FinnGenStudiesStep:
"""FinnGen study index generation step.

Attributes:
session (Session): Session object.
finngen_study_index_out (str): Output path for the FinnGen study index dataset.
"""

session: Session = MISSING
finngen_study_index_out: str = MISSING
finngen_summary_stats_out: str = MISSING

def __post_init__(self: FinnGenStudiesStep) -> None:
"""Run step."""
# Fetch study index.
FinnGenStudyIndex.from_source(self.session.spark).df.write.mode(
self.session.write_mode
).parquet(self.finngen_study_index_out)
36 changes: 36 additions & 0 deletions src/otg/finngen_sumstat_preprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Step to run FinnGen study table ingestion."""

from __future__ import annotations

from dataclasses import dataclass

from omegaconf import MISSING

from otg.common.session import Session
from otg.datasource.finngen.summary_stats import FinnGenSummaryStats


@dataclass
class FinnGenSumstatPreprocessStep:
"""FinnGen sumstats preprocessing.

Attributes:
session (Session): Session object.
finngen_study_index_out (str): Output path for the FinnGen study index dataset.
finngen_summary_stats_out (str): Output path for the FinnGen summary statistics.
"""

session: Session = MISSING
raw_sumstats_path: str = MISSING
out_sumstats_path: str = MISSING

def __post_init__(self: FinnGenSumstatPreprocessStep) -> None:
"""Run step."""
# Process summary stats.
(
FinnGenSummaryStats.from_source(
self.session.spark, raw_file=self.raw_sumstats_path
)
.df.write.mode(self.session.write_mode)
.parquet(self.out_sumstats_path)
)
2 changes: 1 addition & 1 deletion tests/datasource/finngen/test_finngen_summary_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def test_finngen_summary_stats_from_source(spark: SparkSession) -> None:
assert isinstance(
FinnGenSummaryStats.from_source(
spark=spark,
raw_files=["tests/data_samples/finngen_R9_AB1_ACTINOMYCOSIS.gz"],
raw_file="tests/data_samples/finngen_R9_AB1_ACTINOMYCOSIS.gz",
),
SummaryStatistics,
)