Merge pull request #216 from opentargets/tskir-3106-ingest-finngen

Implement Preprocess DAG following recent architecture updates

tskir authored Nov 5, 2023
2 parents 5a63093 + cc3a1e5 commit 90e7d44
Showing 7 changed files with 103 additions and 75 deletions.
41 changes: 39 additions & 2 deletions src/airflow/dags/common_airflow.py
@@ -5,6 +5,7 @@
 from typing import Any

 import pendulum
+import yaml
 from airflow.providers.google.cloud.operators.dataproc import (
     ClusterGenerator,
     DataprocCreateClusterOperator,
@@ -24,18 +25,23 @@
 image_version = "2.1"


-# Executable configuration.
+# Cluster init configuration.
 initialisation_base_path = (
     f"gs://genetics_etl_python_playground/initialisation/{otg_version}"
 )
-python_cli = f"{initialisation_base_path}/cli.py"
 config_tar = f"{initialisation_base_path}/config.tar.gz"
 package_wheel = f"{initialisation_base_path}/otgenetics-{otg_version}-py3-none-any.whl"
 initialisation_executable_file = [
     f"{initialisation_base_path}/install_dependencies_on_cluster.sh"
 ]


+# CLI configuration.
+cluster_config_dir = "/config"
+config_name = "config"
+python_cli = "cli.py"
+
+
 # Shared DAG construction parameters.
 shared_dag_args = dict(
     owner="Open Targets Data Team",
@@ -153,6 +159,20 @@ def submit_pyspark_job(
     )


+def submit_step(cluster_name: str, step_id: str) -> DataprocSubmitJobOperator:
+    """Submit a PySpark job to execute a specific CLI step."""
+    return submit_pyspark_job(
+        cluster_name=cluster_name,
+        task_id=step_id,
+        python_module_path=python_cli,
+        args=[
+            f"step={step_id}",
+            f"--config-dir={cluster_config_dir}",
+            f"--config-name={config_name}",
+        ],
+    )
+
+
 def install_dependencies(cluster_name: str) -> DataprocSubmitJobOperator:
     """Install dependencies on a Dataproc cluster.
@@ -197,3 +217,20 @@ def delete_cluster(cluster_name: str) -> DataprocDeleteClusterOperator:
         trigger_rule=TriggerRule.ALL_DONE,
         deferrable=True,
     )
+
+
+def read_yaml_config(config_path):
+    """Parse a YAML config file and do all necessary checks."""
+    assert config_path.exists(), f"YAML config path {config_path} does not exist."
+    with open(config_path, "r") as config_file:
+        return yaml.safe_load(config_file)
+
+
+def generate_dag(cluster_name, tasks):
+    """For a list of tasks, generate a complete DAG."""
+    return (
+        create_cluster(cluster_name)
+        >> install_dependencies(cluster_name)
+        >> tasks
+        >> delete_cluster(cluster_name)
+    )
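
Note: the new generate_dag helper leans on Airflow's fan-out semantics for >> with task lists. A minimal, self-contained sketch of the resulting graph shape (not part of this commit; the DAG id and task ids are made up):

import pendulum
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="chaining_demo",
    start_date=pendulum.datetime(2023, 11, 1, tz="UTC"),
    schedule=None,
):
    create = EmptyOperator(task_id="create_cluster")
    install = EmptyOperator(task_id="install_dependencies")
    steps = [EmptyOperator(task_id=f"step_{i}") for i in range(3)]
    delete = EmptyOperator(task_id="delete_cluster")
    # A list on either side of >> fans out and back in: every step runs
    # after install_dependencies and before delete_cluster, with no
    # ordering among the steps themselves.
    create >> install >> steps >> delete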
66 changes: 19 additions & 47 deletions src/airflow/dags/dag_genetics_etl.py
@@ -3,60 +3,32 @@

 from pathlib import Path

-import yaml
+import common_airflow as common
 from airflow.models.dag import DAG
-from common_airflow import (
-    create_cluster,
-    delete_cluster,
-    install_dependencies,
-    shared_dag_args,
-    shared_dag_kwargs,
-    submit_pyspark_job,
-)

 SOURCE_CONFIG_FILE_PATH = Path(__file__).parent / "configs" / "dag.yaml"
-PYTHON_CLI = "cli.py"
-CONFIG_NAME = "config"
-CLUSTER_CONFIG_DIR = "/config"
-CLUSTER_NAME = "workflow-otg-cluster"
+CLUSTER_NAME = "otg-etl"


 with DAG(
     dag_id=Path(__file__).stem,
     description="Open Targets Genetics ETL workflow",
-    default_args=shared_dag_args,
-    **shared_dag_kwargs,
+    default_args=common.shared_dag_args,
+    **common.shared_dag_kwargs,
 ):
-    assert (
-        SOURCE_CONFIG_FILE_PATH.exists()
-    ), f"Config path {SOURCE_CONFIG_FILE_PATH} does not exist."
-
-    with open(SOURCE_CONFIG_FILE_PATH, "r") as config_file:
-        # Parse and define all steps and their prerequisites.
-        tasks = {}
-        steps = yaml.safe_load(config_file)
-        for step in steps:
-            # Define task for the current step.
-            step_id = step["id"]
-            this_task = submit_pyspark_job(
-                cluster_name=CLUSTER_NAME,
-                task_id=step_id,
-                python_module_path=PYTHON_CLI,
-                args=[
-                    f"step={step_id}",
-                    f"--config-dir={CLUSTER_CONFIG_DIR}",
-                    f"--config-name={CONFIG_NAME}",
-                ],
-            )
-            # Chain prerequisites.
-            tasks[step_id] = this_task
-            for prerequisite in step.get("prerequisites", []):
-                this_task.set_upstream(tasks[prerequisite])
-
-        # Construct the DAG with all tasks.
-        (
-            create_cluster(CLUSTER_NAME)
-            >> install_dependencies(CLUSTER_NAME)
-            >> list(tasks.values())
-            >> delete_cluster(CLUSTER_NAME)
-        )
+    # Parse and define all steps and their prerequisites.
+    tasks = {}
+    steps = common.read_yaml_config(SOURCE_CONFIG_FILE_PATH)
+    for step in steps:
+        # Define task for the current step.
+        step_id = step["id"]
+        this_task = common.submit_step(
+            cluster_name=CLUSTER_NAME,
+            step_id=step_id,
+        )
+        # Chain prerequisites.
+        tasks[step_id] = this_task
+        for prerequisite in step.get("prerequisites", []):
+            this_task.set_upstream(tasks[prerequisite])
+    # Construct the DAG with all tasks.
+    dag = common.generate_dag(cluster_name=CLUSTER_NAME, tasks=list(tasks.values()))
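
Note: configs/dag.yaml itself is unchanged by this commit; the loop above only assumes each entry carries an "id" plus an optional "prerequisites" list. A hedged sketch of that contract (the step names are made up):

import yaml

SAMPLE_CONFIG = """
- id: study_index
- id: summary_stats
  prerequisites:
    - study_index
"""

for step in yaml.safe_load(SAMPLE_CONFIG):
    print(step["id"], "<-", step.get("prerequisites", []))
# study_index <- []
# summary_stats <- ['study_index']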
26 changes: 26 additions & 0 deletions src/airflow/dags/dag_preprocess.py
@@ -0,0 +1,26 @@
+"""Airflow DAG for the Preprocess part of the pipeline."""
+from __future__ import annotations
+
+from pathlib import Path
+
+import common_airflow as common
+from airflow.models.dag import DAG
+
+CLUSTER_NAME = "otg-preprocess"
+
+ALL_STEPS = [
+    "finngen",
+]
+
+
+with DAG(
+    dag_id=Path(__file__).stem,
+    description="Open Targets Genetics — Preprocess",
+    default_args=common.shared_dag_args,
+    **common.shared_dag_kwargs,
+):
+    all_tasks = [
+        common.submit_step(cluster_name=CLUSTER_NAME, step_id=step)
+        for step in ALL_STEPS
+    ]
+    dag = common.generate_dag(cluster_name=CLUSTER_NAME, tasks=all_tasks)
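
Note: unlike the ETL DAG, the Preprocess DAG declares no prerequisites: generate_dag receives a flat list, so every step runs in parallel between cluster setup and teardown. Onboarding another source should then be a one-line change, as in this hypothetical extension (the extra step name is made up and would need to match a registered CLI step):

ALL_STEPS = [
    "finngen",
    "gwas_catalog",  # hypothetical additional preprocess step
]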
2 changes: 1 addition & 1 deletion src/otg/datasource/finngen/__init__.py
@@ -1,3 +1,3 @@
-"""GWAS Catalog Data Source."""
+"""FinnGen datasource classes."""

 from __future__ import annotations
2 changes: 1 addition & 1 deletion src/otg/datasource/finngen/summary_stats.py
@@ -20,7 +20,7 @@ class FinnGenSummaryStats(SummaryStatistics):
     """Summary statistics dataset for FinnGen."""

     @classmethod
-    def from_finngen_harmonized_summary_stats(
+    def from_source(
         cls: type[FinnGenSummaryStats],
         summary_stats_df: DataFrame,
     ) -> FinnGenSummaryStats:
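
Note: the rename aligns FinnGen with the from_source convention already used by FinnGenStudyIndex. A hedged usage sketch (the Spark session and input path are assumptions, not part of this commit):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Harmonised FinnGen summary statistics are tab-separated text files.
raw_df = spark.read.option("delimiter", "\t").csv(
    "gs://finngen-public-data-r9/summary_stats/finngen_R9_EXAMPLE.gz",  # hypothetical path
    header=True,
)
summary_stats = FinnGenSummaryStats.from_source(raw_df)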
37 changes: 16 additions & 21 deletions src/otg/finngen.py
@@ -37,38 +37,33 @@ class FinnGenStep:

     def __post_init__(self: FinnGenStep) -> None:
         """Run step."""
-        # Read the JSON data from the URL.
+        # Fetch study index.
         json_data = urlopen(self.finngen_phenotype_table_url).read().decode("utf-8")
         rdd = self.session.spark.sparkContext.parallelize([json_data])
         df = self.session.spark.read.json(rdd)

-        # Parse the study index data.
-        finngen_studies = FinnGenStudyIndex.from_source(
+        # Process study index.
+        study_index = FinnGenStudyIndex.from_source(
             df,
             self.finngen_release_prefix,
             self.finngen_sumstat_url_prefix,
             self.finngen_sumstat_url_suffix,
         )
-
-        # Write the study index output.
-        finngen_studies.df.write.mode(self.session.write_mode).parquet(
+        # Write study index.
+        study_index.df.write.mode(self.session.write_mode).parquet(
             self.finngen_study_index_out
         )
-
-        # Prepare list of files for ingestion.
-        input_filenames = [
-            row.summarystatsLocation for row in finngen_studies.collect()
-        ]
+        # Fetch summary stats.
+        input_filenames = [row.summarystatsLocation for row in study_index.collect()]
         summary_stats_df = self.session.spark.read.option("delimiter", "\t").csv(
             input_filenames, header=True
         )
-
-        # Specify data processing instructions.
-        summary_stats_df = FinnGenSummaryStats.from_finngen_harmonized_summary_stats(
-            summary_stats_df
-        ).df
-
-        # Sort and partition for output.
-        summary_stats_df.sortWithinPartitions("position").write.partitionBy(
-            "studyId", "chromosome"
-        ).mode(self.session.write_mode).parquet(self.finngen_summary_stats_out)
+        # Process summary stats.
+        summary_stats_df = FinnGenSummaryStats.from_source(summary_stats_df).df
+        # Write summary stats.
+        (
+            summary_stats_df.sortWithinPartitions("position")
+            .write.partitionBy("studyId", "chromosome")
+            .mode(self.session.write_mode)
+            .parquet(self.finngen_summary_stats_out)
+        )
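
Note: partitionBy("studyId", "chromosome") combined with sortWithinPartitions("position") yields one Hive-style directory per study and chromosome, with rows ordered by position inside each file, which keeps per-study and per-region scans cheap. An illustrative output layout (the study ID is hypothetical):

<finngen_summary_stats_out>/
    studyId=FINNGEN_R9_EXAMPLE/
        chromosome=1/
            part-00000-*.snappy.parquet
        chromosome=2/
            part-00000-*.snappy.parquet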
4 changes: 1 addition & 3 deletions tests/datasource/finngen/test_finngen_summary_stats.py
@@ -13,8 +13,6 @@ def test_finngen_summary_stats_from_source(
 ) -> None:
     """Test summary statistics from source."""
     assert isinstance(
-        FinnGenSummaryStats.from_finngen_harmonized_summary_stats(
-            sample_finngen_summary_stats
-        ),
+        FinnGenSummaryStats.from_source(sample_finngen_summary_stats),
         SummaryStatistics,
     )