diff --git a/src/pheval/cli_pheval_utils.py b/src/pheval/cli_pheval_utils.py index 629ab000c..c2abe0739 100644 --- a/src/pheval/cli_pheval_utils.py +++ b/src/pheval/cli_pheval_utils.py @@ -253,22 +253,19 @@ def update_phenopackets_command( mutually_exclusive=["phenopacket_path"], ) @click.option( - "--template-vcf-path", - "-t", - cls=MutuallyExclusiveOptionError, + "--hg19-template-vcf", + "-hg19", metavar="PATH", required=False, - help="Template VCF file", - mutually_exclusive=["vcf_dir"], + help="Template hg19 VCF file", type=Path, ) @click.option( - "--vcf-dir", - "-v", - cls=MutuallyExclusiveOptionError, + "--hg38-template-vcf", + "-hg38", metavar="PATH", - help="Directory containing template VCF files", - mutually_exclusive=["template_vcf"], + required=False, + help="Template hg38 VCF file", type=Path, ) @click.option( @@ -284,13 +281,22 @@ def create_spiked_vcfs_command( phenopacket_path: Path, phenopacket_dir: Path, output_dir: Path, - template_vcf_path: Path = None, - vcf_dir: Path = None, + hg19_template_vcf: Path = None, + hg38_template_vcf: Path = None, ): - """Spikes variants into a template VCF file for a directory of phenopackets.""" + """ + Create spiked VCF from either a Phenopacket or a Phenopacket directory. + + Args: + phenopacket_path (Path): Path to a single Phenopacket file (optional). + phenopacket_dir (Path): Path to a directory containing Phenopacket files (optional). + output_dir (Path): The directory to store the generated spiked VCF file(s). + hg19_template_vcf (Path): Path to the hg19 template VCF file (optional). + hg38_template_vcf (Path): Path to the hg38 template VCF file (optional). + """ if phenopacket_path is None and phenopacket_dir is None: raise InputError("Either a phenopacket or phenopacket directory must be specified") - spike_vcfs(output_dir, phenopacket_path, phenopacket_dir, template_vcf_path, vcf_dir) + spike_vcfs(output_dir, phenopacket_path, phenopacket_dir, hg19_template_vcf, hg38_template_vcf) @click.command() diff --git a/src/pheval/prepare/create_spiked_vcf.py b/src/pheval/prepare/create_spiked_vcf.py index d3dd1f5df..e76ef8ed6 100644 --- a/src/pheval/prepare/create_spiked_vcf.py +++ b/src/pheval/prepare/create_spiked_vcf.py @@ -1,7 +1,6 @@ import gzip import logging import re -import secrets import urllib.parse from copy import copy from dataclasses import dataclass @@ -10,6 +9,8 @@ from phenopackets import Family, File, Phenopacket +from pheval.prepare.custom_exceptions import InputError +from pheval.utils.file_utils import files_with_suffix, is_gzipped from pheval.utils.phenopacket_utils import ( IncompatibleGenomeAssemblyError, PhenopacketRebuilder, @@ -19,9 +20,6 @@ write_phenopacket, ) -from .custom_exceptions import InputError -from ..utils.file_utils import all_files, files_with_suffix, is_gzipped - info_log = logging.getLogger("info") genome_assemblies = { @@ -91,39 +89,6 @@ class VcfHeader: chr_status: bool -class VcfPicker: - """Choose a VCF file randomly from a directory if provided, otherwise selects the single template.""" - - def __init__(self, template_vcf: Path or None, vcf_dir: Path or None): - """ - Initialise the VcfPicker. - - Args: - template_vcf (Path or None): The path to a template VCF file, or None if not provided. - vcf_dir (Path or None): The directory containing VCF files, or None if not provided. - """ - self.template_vcf = template_vcf - self.vcf_dir = vcf_dir - - def pick_file_from_dir(self) -> Path: - """ - Selects a VCF file from a directory at random. - - Returns: - Path: The randomly selected VCF file path from the directory. - """ - return secrets.choice(all_files(self.vcf_dir)) - - def pick_file(self) -> Path: - """ - Select a VCF file randomly when given a directory; if not, the template VCF is assigned. - - Returns: - Path: The selected VCF file path. - """ - return self.pick_file_from_dir() if self.vcf_dir is not None else self.template_vcf - - def read_vcf(vcf_file: Path) -> List[str]: """ Read the contents of a VCF file into memory, handling both uncompressed and gzipped files. @@ -206,6 +171,72 @@ def parse_vcf_header(self) -> VcfHeader: return VcfHeader(sample_id, assembly, chr_status) +@dataclass +class VcfFile: + """ + Represents a VCF file with its name, contents, and header information. + + Attributes: + vcf_file_name (str): The name of the VCF file. + vcf_contents (List[str]): The contents of the VCF file. + vcf_header (VcfHeader): The parsed header information of the VCF file. + """ + + vcf_file_name: str = None + vcf_contents: List[str] = None + vcf_header: VcfHeader = None + + @staticmethod + def populate_fields(template_vcf: Path): + """ + Populate the fields of the VcfFile instance using the contents of a template VCF file. + + Args: + template_vcf (Path): The path to the template VCF file. + + Returns: + VcfFile: An instance of VcfFile with populated fields. + + """ + contents = read_vcf(template_vcf) + return VcfFile(template_vcf.name, contents, VcfHeaderParser(contents).parse_vcf_header()) + + +def select_vcf_template( + phenopacket_path: Path, + proband_causative_variants: List[ProbandCausativeVariant], + hg19_vcf_info: VcfFile, + hg38_vcf_info: VcfFile, +) -> VcfFile: + """ + Select the appropriate VCF template based on the assembly information of the proband causative variants. + + Args: + phenopacket_path (Path): The path to the Phenopacket file. + proband_causative_variants (List[ProbandCausativeVariant]): A list of causative variants from the proband. + hg19_vcf_info (VcfFile): VCF file info for hg19 template vcf. + hg38_vcf_info (VcfFile): CF file info for hg38 template vcf. + + Returns: + VcfFile: The selected VCF template file based on the assembly information of the proband causative variants. + + """ + if proband_causative_variants[0].assembly in ["hg19", "GRCh37"]: + if hg19_vcf_info: + return hg19_vcf_info + else: + raise InputError("Must specify hg19 template VCF!") + elif proband_causative_variants[0].assembly in ["hg38", "GRCh38"]: + if hg38_vcf_info: + return hg38_vcf_info + else: + raise InputError("Must specify hg38 template VCF!") + else: + raise IncompatibleGenomeAssemblyError( + proband_causative_variants[0].assembly, phenopacket_path + ) + + def check_variant_assembly( proband_causative_variants: list[ProbandCausativeVariant], vcf_header: VcfHeader, @@ -229,7 +260,13 @@ def check_variant_assembly( raise ValueError("Too many genome assemblies!") if phenopacket_assembly[0] not in compatible_genome_assembly: raise IncompatibleGenomeAssemblyError(phenopacket_assembly, phenopacket_path) - if phenopacket_assembly[0] != vcf_header.assembly: + if ( + phenopacket_assembly[0] in {"hg19", "GRCh37"} + and vcf_header.assembly not in {"hg19", "GRCh37"} + ) or ( + phenopacket_assembly[0] in {"hg38", "GRCh38"} + and vcf_header.assembly not in {"hg38", "GRCh38"} + ): raise IncompatibleGenomeAssemblyError( assembly=phenopacket_assembly, phenopacket=phenopacket_path ) @@ -387,7 +424,8 @@ def write_vcf_file(self) -> None: def spike_vcf_contents( phenopacket: Union[Phenopacket, Family], phenopacket_path: Path, - chosen_template_vcf: Path, + hg19_vcf_info: VcfFile, + hg38_vcf_info: VcfFile, ) -> tuple[str, List[str]]: """ Spike VCF records with variants obtained from a Phenopacket or Family. @@ -395,22 +433,28 @@ def spike_vcf_contents( Args: phenopacket (Union[Phenopacket, Family]): Phenopacket or Family containing causative variants. phenopacket_path (Path): Path to the Phenopacket file. - chosen_template_vcf (Path): Path to the chosen template VCF file. + hg19_vcf_info (VcfFile): VCF file info for hg19 template vcf. + hg38_vcf_info (VcfFile): VCF file info for hg38 template vcf. Returns: A tuple containing: assembly (str): The genome assembly information extracted from VCF header. modified_vcf_contents (List[str]): Modified VCF records with spiked variants. """ - # this is a separate function to a click command as it will fail if annotated with click annotations - # and referenced from another click command phenopacket_causative_variants = PhenopacketUtil(phenopacket).causative_variants() - vcf_contents = read_vcf(chosen_template_vcf) - vcf_header = VcfHeaderParser(vcf_contents).parse_vcf_header() - check_variant_assembly(phenopacket_causative_variants, vcf_header, phenopacket_path) + chosen_template_vcf = select_vcf_template( + phenopacket_path, phenopacket_causative_variants, hg19_vcf_info, hg38_vcf_info + ) + check_variant_assembly( + phenopacket_causative_variants, chosen_template_vcf.vcf_header, phenopacket_path + ) return ( - vcf_header.assembly, - VcfSpiker(vcf_contents, phenopacket_causative_variants, vcf_header).construct_vcf(), + chosen_template_vcf.vcf_header.assembly, + VcfSpiker( + chosen_template_vcf.vcf_contents, + phenopacket_causative_variants, + chosen_template_vcf.vcf_header, + ).construct_vcf(), ) @@ -418,7 +462,8 @@ def generate_spiked_vcf_file( output_dir: Path, phenopacket: Union[Phenopacket, Family], phenopacket_path: Path, - chosen_template_vcf: Path, + hg19_vcf_info: VcfFile, + hg38_vcf_info: VcfFile, ) -> File: """ Write spiked VCF contents to a new file. @@ -427,21 +472,17 @@ def generate_spiked_vcf_file( output_dir (Path): Path to the directory to store the generated file. phenopacket (Union[Phenopacket, Family]): Phenopacket or Family containing causative variants. phenopacket_path (Path): Path to the Phenopacket file. - chosen_template_vcf (Path): Path to the chosen template VCF file. - + hg19_vcf_info (VcfFile): VCF file info for hg19 template vcf. + hg38_vcf_info (VcfFile): VCF file info for hg38 template vcf. Returns: File: The generated File object representing the newly created spiked VCF file. """ output_dir.mkdir(exist_ok=True) info_log.info(f" Created a directory {output_dir}") vcf_assembly, spiked_vcf = spike_vcf_contents( - phenopacket, phenopacket_path, chosen_template_vcf - ) - spiked_vcf_path = ( - output_dir.joinpath(phenopacket_path.name.replace(".json", ".vcf.gz")) - if is_gzipped(chosen_template_vcf) - else output_dir.joinpath(phenopacket_path.name.replace(".json", ".vcf")) + phenopacket, phenopacket_path, hg19_vcf_info, hg38_vcf_info ) + spiked_vcf_path = output_dir.joinpath(phenopacket_path.name.replace(".json", ".vcf.gz")) VcfWriter(spiked_vcf, spiked_vcf_path).write_vcf_file() return File( uri=urllib.parse.unquote(spiked_vcf_path.as_uri()), @@ -449,8 +490,19 @@ def generate_spiked_vcf_file( ) +def spike_and_update_phenopacket(hg19_vcf_info, hg38_vcf_info, output_dir, phenopacket_path): + phenopacket = phenopacket_reader(phenopacket_path) + spiked_vcf_file_message = generate_spiked_vcf_file( + output_dir, phenopacket, phenopacket_path, hg19_vcf_info, hg38_vcf_info + ) + updated_phenopacket = PhenopacketRebuilder(phenopacket).add_spiked_vcf_path( + spiked_vcf_file_message + ) + write_phenopacket(updated_phenopacket, phenopacket_path) + + def create_spiked_vcf( - output_dir: Path, phenopacket_path: Path, template_vcf_path: Path, vcf_dir: Path + output_dir: Path, phenopacket_path: Path, hg19_template_vcf: Path, hg38_template_vcf: Path ) -> None: """ Create a spiked VCF for a Phenopacket. @@ -458,27 +510,21 @@ def create_spiked_vcf( Args: output_dir (Path): The directory to store the generated spiked VCF file. phenopacket_path (Path): Path to the Phenopacket file. - template_vcf_path (Path): Path to the template VCF file (optional). - vcf_dir (Path): Path to the directory containing VCF files (optional). + hg19_template_vcf (Path): Path to the hg19 template VCF file (optional). + hg38_template_vcf (Path): Path to the hg38 template VCF file (optional). Raises: - InputError: If both template_vcf_path and vcf_dir are None. + InputError: If both hg19_template_vcf and hg38_template_vcf are None. """ - if template_vcf_path is None and vcf_dir is None: - raise InputError("Either a template_vcf or vcf_dir must be specified") - vcf_file_path = VcfPicker(template_vcf_path, vcf_dir).pick_file() - phenopacket = phenopacket_reader(phenopacket_path) - spiked_vcf_file_message = generate_spiked_vcf_file( - output_dir, phenopacket, phenopacket_path, vcf_file_path - ) - updated_phenopacket = PhenopacketRebuilder(phenopacket).add_spiked_vcf_path( - spiked_vcf_file_message - ) - write_phenopacket(updated_phenopacket, phenopacket_path) + if hg19_template_vcf is None and hg38_template_vcf is None: + raise InputError("Either a hg19 template vcf or hg38 template vcf must be specified") + hg19_vcf_info = VcfFile.populate_fields(hg19_template_vcf) if hg19_template_vcf else None + hg38_vcf_info = VcfFile.populate_fields(hg38_template_vcf) if hg38_template_vcf else None + spike_and_update_phenopacket(hg19_vcf_info, hg38_vcf_info, output_dir, phenopacket_path) def create_spiked_vcfs( - output_dir: Path, phenopacket_dir: Path, template_vcf_path: Path, vcf_dir: Path + output_dir: Path, phenopacket_dir: Path, hg19_template_vcf: Path, hg38_template_vcf: Path ) -> None: """ Create a spiked VCF for a directory of Phenopackets. @@ -486,35 +532,26 @@ def create_spiked_vcfs( Args: output_dir (Path): The directory to store the generated spiked VCF file. phenopacket_dir (Path): Path to the Phenopacket directory. - template_vcf_path (Path): Path to the template VCF file (optional). - vcf_dir (Path): Path to the directory containing VCF files (optional). + hg19_template_vcf (Path): Path to the template hg19 VCF file (optional). + hg38_template_vcf (Path): Path to the template hg19 VCF file (optional). Raises: - InputError: If both template_vcf_path and vcf_dir are None. + InputError: If both hg19_template_vcf and hg38_template_vcf are None. """ - if template_vcf_path is None and vcf_dir is None: - raise InputError("Either a template_vcf or vcf_dir must be specified") + if hg19_template_vcf is None and hg38_template_vcf is None: + raise InputError("Either a hg19 template vcf or hg38 template vcf must be specified") + hg19_vcf_info = VcfFile.populate_fields(hg19_template_vcf) if hg19_template_vcf else None + hg38_vcf_info = VcfFile.populate_fields(hg38_template_vcf) if hg38_template_vcf else None for phenopacket_path in files_with_suffix(phenopacket_dir, ".json"): - vcf_file_path = VcfPicker(template_vcf_path, vcf_dir).pick_file() - phenopacket = phenopacket_reader(phenopacket_path) - spiked_vcf_file_message = generate_spiked_vcf_file( - output_dir, phenopacket, phenopacket_path, vcf_file_path - ) - updated_phenopacket = PhenopacketRebuilder(phenopacket).add_spiked_vcf_path( - spiked_vcf_file_message - ) - write_phenopacket(updated_phenopacket, phenopacket_path) - # or made a lambda one-liner for maximum wtf... - # [spike_vcf(path, output_dir, template_vcf, vcf_dir) for path in phenopacket_dir.iterdir() if path.suffix == - # ".json"] + spike_and_update_phenopacket(hg19_vcf_info, hg38_vcf_info, output_dir, phenopacket_path) def spike_vcfs( output_dir: Path, phenopacket_path: Path, phenopacket_dir: Path, - template_vcf_path: Path, - vcf_dir: Path, + hg19_template_vcf: Path, + hg38_template_vcf: Path, ) -> None: """ Create spiked VCF from either a Phenopacket or a Phenopacket directory. @@ -523,10 +560,10 @@ def spike_vcfs( output_dir (Path): The directory to store the generated spiked VCF file(s). phenopacket_path (Path): Path to a single Phenopacket file (optional). phenopacket_dir (Path): Path to a directory containing Phenopacket files (optional). - template_vcf_path (Path): Path to the template VCF file (optional). - vcf_dir (Path): Path to the directory containing VCF files (optional). + hg19_template_vcf (Path): Path to the hg19 template VCF file (optional). + hg38_template_vcf (Path): Path to the hg38 template VCF file (optional). """ if phenopacket_path is not None: - create_spiked_vcf(output_dir, phenopacket_path, template_vcf_path, vcf_dir) + create_spiked_vcf(output_dir, phenopacket_path, hg19_template_vcf, hg38_template_vcf) elif phenopacket_dir is not None: - create_spiked_vcfs(output_dir, phenopacket_dir, template_vcf_path, vcf_dir) + create_spiked_vcfs(output_dir, phenopacket_dir, hg19_template_vcf, hg38_template_vcf) diff --git a/tests/test_create_spiked_vcf.py b/tests/test_create_spiked_vcf.py index f8465fa50..1667ec0c2 100644 --- a/tests/test_create_spiked_vcf.py +++ b/tests/test_create_spiked_vcf.py @@ -4,7 +4,6 @@ from pheval.prepare.create_spiked_vcf import ( VcfHeader, VcfHeaderParser, - VcfPicker, VcfSpiker, check_variant_assembly, read_vcf, @@ -131,21 +130,6 @@ ] -class TestVcfPicker(unittest.TestCase): - @classmethod - def setUpClass(cls) -> None: - cls.vcf_file = Path("./tests/input_dir/test_vcf_dir/test_1.vcf") - cls.vcf_dir = Path("./tests/input_dir/test_vcf_dir/") - - def test_pick_file_from_dir(self): - self.assertTrue( - "test_1.vcf" or "test_2.vcf.gz" == VcfPicker(None, self.vcf_dir).pick_file_from_dir() - ) - - def test_pick_file(self): - self.assertEqual("test_1.vcf", VcfPicker(self.vcf_file, None).pick_file().name) - - class TestReadVcf(unittest.TestCase): @classmethod def setUpClass(cls) -> None: