Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
 into xg1-finngen-finemapping-ingestion
  • Loading branch information
xyg123 committed Jan 10, 2024
2 parents cff5e02 + 77dee8e commit 02f180c
Show file tree
Hide file tree
Showing 33 changed files with 1,170 additions and 163 deletions.
13 changes: 7 additions & 6 deletions config/datasets/gcp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ anderson: gs://genetics-portal-input/v2g_input/andersson2014/enhancer_tss_associ
javierre: gs://genetics-portal-input/v2g_input/javierre_2016_preprocessed.parquet
jung: gs://genetics-portal-raw/pchic_jung2019/jung2019_pchic_tableS3.csv
thurman: gs://genetics-portal-input/v2g_input/thurman2012/genomewideCorrs_above0.7_promoterPlusMinus500kb_withGeneNames_32celltypeCategories.bed8.gz
catalog_associations: ${datasets.inputs}/v2d/gwas_catalog_v1.0.2-associations_e110_r2023-11-24.tsv
catalog_associations: ${datasets.inputs}/v2d/gwas_catalog_v1.0.2-associations_e110_r2023-12-21.tsv
catalog_studies:
# To get a complete representation of all GWAS Catalog studies, we need to
# ingest the list of unpublished studies from a different file.
- ${datasets.inputs}/v2d/gwas-catalog-v1.0.3-studies-r2023-11-24.tsv
- ${datasets.inputs}/v2d/gwas-catalog-v1.0.3-unpublished-studies-r2023-11-24.tsv
- ${datasets.inputs}/v2d/gwas-catalog-v1.0.3-studies-r2023-12-21.tsv
- ${datasets.inputs}/v2d/gwas-catalog-v1.0.3-unpublished-studies-r2023-12-21.tsv
catalog_ancestries:
- ${datasets.inputs}/v2d/gwas-catalog-v1.0.3-ancestries-r2023-11-24.tsv
- ${datasets.inputs}/v2d/gwas-catalog-v1.0.3-unpublished-ancestries-r2023-11-24.tsv
catalog_sumstats_lut: ${datasets.inputs}/v2d/harmonised_list-r2023-11-24a.txt
- ${datasets.inputs}/v2d/gwas-catalog-v1.0.3-ancestries-r2023-12-21.tsv
- ${datasets.inputs}/v2d/gwas-catalog-v1.0.3-unpublished-ancestries-r2023-12-21.tsv
catalog_sumstats_lut: ${datasets.inputs}/v2d/harmonised_list-r2023-12-21.txt
ukbiobank_manifest: gs://genetics-portal-input/ukb_phenotypes/neale2_saige_study_manifest.190430.tsv
l2g_gold_standard_curation: ${datasets.inputs}/l2g/gold_standard/curation.json
gene_interactions: ${datasets.inputs}/l2g/interaction # 23.09 data
Expand All @@ -43,6 +43,7 @@ v2g: ${datasets.outputs}/v2g
ld_index: ${datasets.outputs}/ld_index
catalog_study_index: ${datasets.study_index}/catalog
catalog_study_locus: ${datasets.study_locus}/catalog_study_locus
gwas_catalog_study_curation: ${datasets.inputs}/v2d/GWAS_Catalog_study_curation.tsv
finngen_study_index: ${datasets.study_index}/finngen
finngen_summary_stats: ${datasets.summary_statistics}/finngen
finngen_finemapping_out: gs://genetics-portal-dev-analysis/xg1/cs_centric_fm_catalog
Expand Down
6 changes: 0 additions & 6 deletions config/step/clump.yaml

This file was deleted.

6 changes: 6 additions & 0 deletions config/step/gwas_catalog_curation_update.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
_target_: otg.gwas_catalog_study_curation.GWASCatalogStudyCurationStep
catalog_study_files: ${datasets.catalog_studies}
catalog_ancestry_files: ${datasets.catalog_ancestries}
catalog_sumstats_lut: ${datasets.catalog_sumstats_lut}
gwas_catalog_study_curation_file: ${datasets.gwas_catalog_study_curation}
gwas_catalog_study_curation_out: ???
4 changes: 3 additions & 1 deletion config/step/gwas_catalog_ingestion.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
_target_: otg.gwas_catalog.GWASCatalogStep
_target_: otg.gwas_catalog_ingestion.GWASCatalogIngestionStep
catalog_study_files: ${datasets.catalog_studies}
catalog_ancestry_files: ${datasets.catalog_ancestries}
catalog_associations_file: ${datasets.catalog_associations}
catalog_sumstats_lut: ${datasets.catalog_sumstats_lut}
variant_annotation_path: ${datasets.variant_annotation}
catalog_studies_out: ${datasets.catalog_study_index}
catalog_associations_out: ${datasets.catalog_study_locus}
gwas_catalog_study_curation_file: ${datasets.gwas_catalog_study_curation}
inclusion_list_path: ???
10 changes: 10 additions & 0 deletions config/step/gwas_study_inclusion.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
_target_: otg.gwas_catalog_study_inclusion.GWASCatalogInclusionGenerator
catalog_study_files: ${datasets.catalog_studies}
catalog_ancestry_files: ${datasets.catalog_ancestries}
catalog_associations_file: ${datasets.catalog_associations}
variant_annotation_path: ${datasets.variant_annotation}
gwas_catalog_study_curation_file: ${datasets.gwas_catalog_study_curation}
harmonised_study_file: ???
criteria: ???
inclusion_list_path: ???
exclusion_list_path: ???
5 changes: 5 additions & 0 deletions config/step/ld_based_clumping.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
_target_: otg.ld_based_clumping.LdBasedClumpingStep
study_locus_input_path: ???
ld_index_path: ???
study_index_path: ???
clumped_study_locus_output_path: ???
5 changes: 5 additions & 0 deletions config/step/window_based_clumping.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
_target_: otg.window_based_clumping.WindowBasedClumpingStep
summary_statistics_input_path: ???
study_locus_output_path: ???
inclusion_list_path: ???
locus_collect_distance: null
5 changes: 0 additions & 5 deletions docs/python_api/step/clump.md

This file was deleted.

5 changes: 5 additions & 0 deletions docs/python_api/step/gwas_catalog_curation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
title: Apply in-house curation on GWAS Catalog studies
---

::: otg.gwas_catalog_study_curation.GWASCatalogStudyCurationStep
5 changes: 5 additions & 0 deletions docs/python_api/step/gwas_catalog_inclusion.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
title: Generate inclusion and exclusion lists for GWAS Catalog study ingestion.
---

::: otg.gwas_catalog_study_inclusion.GWASCatalogInclusionGenerator
5 changes: 5 additions & 0 deletions docs/python_api/step/ld_clump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
title: LD-based clumping
---

::: otg.ld_based_clumping.LdBasedClumpingStep
5 changes: 5 additions & 0 deletions docs/python_api/step/window_based_clumping.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
title: Window-based clumping
---

::: otg.window_based_clumping.WindowBasedClumpingStep
7 changes: 2 additions & 5 deletions src/airflow/dags/common_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
from pathlib import Path

# Code version. It has to be repeated here as well as in `pyproject.toml`, because Airflow isn't able to look at files outside of its `dags/` directory.
OTG_VERSION = "1.0.0"

OTG_VERSION = "0.0.0"

# Cloud configuration.
GCP_PROJECT = "open-targets-genetics-dev"
Expand All @@ -29,7 +28,6 @@
GCP_DATAPROC_IMAGE = "2.1"
GCP_AUTOSCALING_POLICY = "otg-etl"


# Cluster init configuration.
INITIALISATION_BASE_PATH = (
f"gs://genetics_etl_python_playground/initialisation/{OTG_VERSION}"
Expand All @@ -40,18 +38,17 @@
f"{INITIALISATION_BASE_PATH}/install_dependencies_on_cluster.sh"
]


# CLI configuration.
CLUSTER_CONFIG_DIR = "/config"
CONFIG_NAME = "config"
PYTHON_CLI = "cli.py"


# Shared DAG construction parameters.
shared_dag_args = {
"owner": "Open Targets Data Team",
"retries": 0,
}

shared_dag_kwargs = {
"tags": ["genetics_etl", "experimental"],
"start_date": pendulum.now(tz="Europe/London").subtract(days=1),
Expand Down
1 change: 0 additions & 1 deletion src/airflow/dags/gwas_catalog_harmonisation.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ def submit_jobs(**kwargs: Any) -> None:
],
)

# list_inputs >>
(
[list_inputs, list_outputs]
>> create_to_do_list()
Expand Down
181 changes: 139 additions & 42 deletions src/airflow/dags/gwas_catalog_preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,92 +5,189 @@

import common_airflow as common
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule

CLUSTER_NAME = "otg-preprocess-gwascatalog"
AUTOSCALING = "otg-preprocess-gwascatalog"

SUMSTATS = "gs://open-targets-gwas-summary-stats/harmonised"
RELEASEBUCKET = "gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX"
RELEASEBUCKET_NAME = "genetics_etl_python_playground"
SUMMARY_STATS_BUCKET_NAME = "open-targets-gwas-summary-stats"
SUMSTATS = "gs://open-targets-gwas-summary-stats/harmonised"
MANIFESTS_PATH = f"{RELEASEBUCKET}/manifests/"


def upload_harmonized_study_list(
    concatenated_studies: str, bucket_name: str, object_name: str
) -> None:
    """Upload a newline-concatenated list of harmonised studies to a GCS bucket.

    Writes the given string as a single UTF-8 text object, using the default
    Airflow Google Cloud connection (``google_cloud_default``).

    Args:
        concatenated_studies (str): Concatenated list of harmonized summary statistics.
        bucket_name (str): Name of the destination GCS bucket.
        object_name (str): Full object path (key) to write within the bucket.
    """
    # GCSHook resolves credentials from the named Airflow connection.
    hook = GCSHook(gcp_conn_id="google_cloud_default")
    hook.upload(
        bucket_name=bucket_name,
        object_name=object_name,
        data=concatenated_studies,
        encoding="utf-8",
    )


with DAG(
dag_id=Path(__file__).stem,
description="Open Targets Genetics — GWAS Catalog preprocess",
default_args=common.shared_dag_args,
**common.shared_dag_kwargs,
):
with TaskGroup(group_id="summary_stats_preprocessing") as summary_stats_group:
summary_stats_window_clumping = common.submit_step(
# Getting list of folders (each a gwas study with summary statistics)
list_harmonised_sumstats = GCSListObjectsOperator(
task_id="list_harmonised_parquet",
bucket=SUMMARY_STATS_BUCKET_NAME,
prefix="harmonised",
match_glob="**/_SUCCESS",
)

    # Upload resulting list to a bucket:
upload_task = PythonOperator(
task_id="uploader",
python_callable=upload_harmonized_study_list,
op_kwargs={
"concatenated_studies": '{{ "\n".join(ti.xcom_pull( key="return_value", task_ids="list_harmonised_parquet")) }}',
"bucket_name": RELEASEBUCKET_NAME,
"object_name": "output/python_etl/parquet/XX.XX/manifests/harmonised_sumstats.txt",
},
)

# Processing curated GWAS Catalog top-bottom:
with TaskGroup(group_id="curation_processing") as curation_processing:
# Generate inclusion list:
curation_calculate_inclusion_list = common.submit_step(
cluster_name=CLUSTER_NAME,
step_id="clump",
task_id="catalog_sumstats_window_clumping",
step_id="gwas_study_inclusion",
task_id="catalog_curation_inclusion_list",
other_args=[
f"step.input_path={SUMSTATS}",
f"step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/window_clumped/from_sumstats/catalog",
"step.criteria=curation",
f"step.inclusion_list_path={MANIFESTS_PATH}manifest_curation",
f"step.exclusion_list_path={MANIFESTS_PATH}exclusion_curation",
f"step.harmonised_study_file={MANIFESTS_PATH}harmonised_sumstats.txt",
],
)
summary_stats_ld_clumping = common.submit_step(

# Ingest curated associations from GWAS Catalog:
curation_ingest_data = common.submit_step(
cluster_name=CLUSTER_NAME,
step_id="clump",
task_id="catalog_sumstats_ld_clumping",
step_id="gwas_catalog_ingestion",
task_id="ingest_curated_gwas_catalog_data",
other_args=[f"step.inclusion_list_path={MANIFESTS_PATH}manifest_curation"],
)

# Run LD-annotation and clumping on curated data:
curation_ld_clumping = common.submit_step(
cluster_name=CLUSTER_NAME,
step_id="ld_based_clumping",
task_id="catalog_curation_ld_clumping",
other_args=[
f"step.input_path={RELEASEBUCKET}/study_locus/window_clumped/from_sumstats/catalog",
"step.ld_index_path={RELEASEBUCKET}/ld_index",
"step.study_index_path={RELEASEBUCKET}/study_index/catalog",
"step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/ld_clumped/from_sumstats/catalog",
f"step.study_locus_input_path={RELEASEBUCKET}/study_locus/catalog_curated",
f"step.ld_index_path={RELEASEBUCKET}/ld_index",
f"step.study_index_path={RELEASEBUCKET}/study_index/catalog",
f"step.clumped_study_locus_output_path={RELEASEBUCKET}/study_locus/ld_clumped/catalog_curated",
],
trigger_rule=TriggerRule.ALL_DONE,
)
summary_stats_pics = common.submit_step(

# Do PICS based finemapping:
curation_pics = common.submit_step(
cluster_name=CLUSTER_NAME,
step_id="pics",
task_id="catalog_sumstats_pics",
task_id="catalog_curation_pics",
other_args=[
"step.study_locus_ld_annotated_in={RELEASEBUCKET}/study_locus/ld_clumped/from_sumstats/catalog",
"step.picsed_study_locus_out={RELEASEBUCKET}/credible_set/from_sumstats/catalog",
f"step.study_locus_ld_annotated_in={RELEASEBUCKET}/study_locus/ld_clumped/catalog_curated",
f"step.picsed_study_locus_out={RELEASEBUCKET}/credible_set/catalog_curated",
],
trigger_rule=TriggerRule.ALL_DONE,
)
summary_stats_window_clumping >> summary_stats_ld_clumping >> summary_stats_pics

with TaskGroup(group_id="curation_preprocessing") as curation_group:
parse_study_and_curated_assocs = common.submit_step(
# Define order of steps:
(
curation_calculate_inclusion_list
>> curation_ingest_data
>> curation_ld_clumping
>> curation_pics
)

# Processing summary statistics from GWAS Catalog:
with TaskGroup(
group_id="summary_satistics_processing"
) as summary_satistics_processing:
# Generate inclusion study lists:
summary_stats_calculate_inclusion_list = common.submit_step(
cluster_name=CLUSTER_NAME,
step_id="gwas_catalog_ingestion",
task_id="catalog_ingestion",
step_id="gwas_study_inclusion",
task_id="catalog_sumstats_inclusion_list",
other_args=[
"step.criteria=summary_stats",
f"step.inclusion_list_path={MANIFESTS_PATH}manifest_sumstats",
f"step.exclusion_list_path={MANIFESTS_PATH}exclusion_sumstats",
f"step.harmonised_study_file={MANIFESTS_PATH}harmonised_sumstats.txt",
],
)

curation_ld_clumping = common.submit_step(
# Run window-based clumping:
summary_stats_window_based_clumping = common.submit_step(
cluster_name=CLUSTER_NAME,
step_id="clump",
task_id="catalog_curation_ld_clumping",
step_id="window_based_clumping",
task_id="catalog_sumstats_window_clumping",
other_args=[
"step.input_path={RELEASEBUCKET}/study_locus/catalog_curated",
"step.ld_index_path={RELEASEBUCKET}/ld_index",
"step.study_index_path={RELEASEBUCKET}/study_index/catalog",
"step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/ld_clumped/catalog_curated",
f"step.summary_statistics_input_path={SUMSTATS}",
f"step.study_locus_output_path={RELEASEBUCKET}/study_locus/window_clumped/from_sumstats/catalog",
f"step.inclusion_list_path={MANIFESTS_PATH}manifest_sumstats",
],
trigger_rule=TriggerRule.ALL_DONE,
)

curation_pics = common.submit_step(
# Run LD based clumping:
summary_stats_ld_clumping = common.submit_step(
cluster_name=CLUSTER_NAME,
step_id="ld_based_clumping",
task_id="catalog_sumstats_ld_clumping",
other_args=[
f"step.study_locus_input_path={RELEASEBUCKET}/study_locus/window_clumped/from_sumstats/catalog",
f"step.ld_index_path={RELEASEBUCKET}/ld_index",
f"step.study_index_path={RELEASEBUCKET}/study_index/catalog",
f"step.clumped_study_locus_output_path={RELEASEBUCKET}/study_locus/ld_clumped/from_sumstats/catalog",
],
)

# Run PICS finemapping:
summary_stats_pics = common.submit_step(
cluster_name=CLUSTER_NAME,
step_id="pics",
task_id="catalog_curation_pics",
task_id="catalog_sumstats_pics",
other_args=[
"step.study_locus_ld_annotated_in={RELEASEBUCKET}/study_locus/ld_clumped/catalog_curated",
"step.picsed_study_locus_out={RELEASEBUCKET}/credible_set/catalog_curated",
f"step.study_locus_ld_annotated_in={RELEASEBUCKET}/study_locus/ld_clumped/from_sumstats/catalog",
f"step.picsed_study_locus_out={RELEASEBUCKET}/credible_set/from_sumstats/catalog",
],
trigger_rule=TriggerRule.ALL_DONE,
)
parse_study_and_curated_assocs >> curation_ld_clumping >> curation_pics

# Order of steps within the group:
(
summary_stats_calculate_inclusion_list
>> summary_stats_window_based_clumping
>> summary_stats_ld_clumping
>> summary_stats_pics
)

# DAG description:
(
common.create_cluster(
CLUSTER_NAME, autoscaling_policy=AUTOSCALING, num_workers=5
)
>> common.install_dependencies(CLUSTER_NAME)
>> [summary_stats_group, curation_group]
>> common.delete_cluster(CLUSTER_NAME)
>> list_harmonised_sumstats
>> upload_task
>> curation_processing
>> summary_satistics_processing
)
Loading

0 comments on commit 02f180c

Please sign in to comment.