Skip to content

Commit

Permalink
TSPS-341 remove tasks for recovering variants not in the reference pa…
Browse files Browse the repository at this point in the history
…nel (#1468)

* remove tasks for recovering variants not in the reference panel and separate out beagle tasks from imputation tasks

* remove prechunk wdl and references to it
remove "Beagle" from task names in BeagleTasks.wdl

---------

Co-authored-by: Jose Soto <[email protected]>
  • Loading branch information
jsotobroad and Jose Soto authored Jan 9, 2025
1 parent e51628a commit 79d3510
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 678 deletions.
4 changes: 0 additions & 4 deletions .dockstore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,6 @@ workflows:
subclass: WDL
primaryDescriptorPath: /pipelines/broad/arrays/imputation_beagle/ImputationBeagle.wdl

- name: ImputationBeaglePreChunk
subclass: WDL
primaryDescriptorPath: /pipelines/broad/arrays/imputation_beagle/ImputationBeaglePreChunk.wdl

- name: LiftoverVcfs
subclass: WDL
primaryDescriptorPath: /pipelines/broad/arrays/imputation_beagle/LiftoverVcfs.wdl
Expand Down
101 changes: 17 additions & 84 deletions pipelines/broad/arrays/imputation_beagle/ImputationBeagle.wdl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
version 1.0

import "../../../../tasks/broad/ImputationTasks.wdl" as tasks
import "../../../../tasks/broad/ImputationBeagleTasks.wdl" as beagleTasks

workflow ImputationBeagle {

Expand Down Expand Up @@ -84,58 +85,38 @@ workflow ImputationBeagle {
gatk_docker = gatk_docker
}

call tasks.CountVariantsInChunksBeagle {
call beagleTasks.CountVariantsInChunks {
input:
vcf = GenerateChunk.output_vcf,
vcf_index = GenerateChunk.output_vcf_index,
panel_bed_file = referencePanelContig.bed,
gatk_docker = gatk_docker
}

call tasks.CheckChunksBeagle {
call beagleTasks.CheckChunks {
input:
var_in_original = CountVariantsInChunksBeagle.var_in_original,
var_also_in_reference = CountVariantsInChunksBeagle.var_also_in_reference
}

# create chunk without overlaps to get sites to impute
call tasks.SubsetVcfToRegion {
input:
vcf = CreateVcfIndex.vcf,
vcf_index = CreateVcfIndex.vcf_index,
output_basename = "input_samples_subset_to_chunk",
contig = referencePanelContig.contig,
start = start,
end = end,
gatk_docker = gatk_docker
}

call tasks.SetIDs as SetIdsVcfToImpute {
input:
vcf = SubsetVcfToRegion.output_vcf,
output_basename = "input_samples_with_variant_ids"
var_in_original = CountVariantsInChunks.var_in_original,
var_also_in_reference = CountVariantsInChunks.var_also_in_reference
}
}

Array[File] chunkedVcfsWithOverlapsForImputation = GenerateChunk.output_vcf
Array[File] chunkedVcfsWithoutOverlapsForSiteIds = SetIdsVcfToImpute.output_vcf
Array[File] chunkedVcfIndexesWithoutOverlapsForSiteIds = SetIdsVcfToImpute.output_vcf_index

call tasks.StoreChunksInfo as StoreContigLevelChunksInfo {
input:
chroms = chunk_contig,
starts = start,
ends = end,
vars_in_array = CountVariantsInChunksBeagle.var_in_original,
vars_in_panel = CountVariantsInChunksBeagle.var_also_in_reference,
valids = CheckChunksBeagle.valid,
vars_in_array = CountVariantsInChunks.var_in_original,
vars_in_panel = CountVariantsInChunks.var_also_in_reference,
valids = CheckChunks.valid,
basename = output_basename
}

# if any chunk for any chromosome fail CheckChunks, then we will not impute run any task in the next scatter,
# namely phasing and imputing which would be the most costly to throw away
Int n_failed_chunks_int = select_first([error_count_override, read_int(StoreContigLevelChunksInfo.n_failed_chunks)])
call tasks.ErrorWithMessageIfErrorCountNotZero as FailQCNChunks {
call beagleTasks.ErrorWithMessageIfErrorCountNotZero as FailQCNChunks {
input:
errorCount = n_failed_chunks_int,
message = "contig " + referencePanelContig.contig + " had " + n_failed_chunks_int + " failing chunks"
Expand All @@ -144,21 +125,14 @@ workflow ImputationBeagle {
scatter (i in range(num_chunks)) {
String chunk_basename_imputed = referencePanelContig.contig + "_chunk_" + i + "_imputed"

call tasks.ExtractIDs as ExtractIdsVcfToImpute {
input:
vcf = chunkedVcfsWithoutOverlapsForSiteIds[i],
output_basename = "imputed_sites",
for_dependency = FailQCNChunks.done # these shenanigans can be replaced with `after` in wdl 1.1
}

# max amount of cpus you can ask for is 96 so at a max of 10k samples we can only ask for 9 cpu a sample.
# these values are based on trying to optimize for pre-emptibility using a 400k sample referene panel
# and up to a 10k sample input vcf
Int beagle_cpu = if (CountSamples.nSamples <= 1000) then 8 else floor(CountSamples.nSamples / 1000) * 9
Int beagle_phase_memory_in_gb = if (CountSamples.nSamples <= 1000) then 22 else ceil(beagle_cpu * 1.5)
Int beagle_impute_memory_in_gb = if (CountSamples.nSamples <= 1000) then 30 else ceil(beagle_cpu * 4.3)

call tasks.PhaseBeagle {
call beagleTasks.Phase {
input:
dataset_vcf = chunkedVcfsWithOverlapsForImputation[i],
ref_panel_bref3 = referencePanelContig.bref3,
Expand All @@ -171,9 +145,9 @@ workflow ImputationBeagle {
memory_mb = beagle_phase_memory_in_gb * 1024
}

call tasks.ImputeBeagle {
call beagleTasks.Impute {
input:
dataset_vcf = PhaseBeagle.vcf,
dataset_vcf = Phase.vcf,
ref_panel_bref3 = referencePanelContig.bref3,
chrom = referencePanelContig.contig,
basename = chunk_basename_imputed,
Expand All @@ -186,7 +160,7 @@ workflow ImputationBeagle {

call tasks.CreateVcfIndex as IndexImputedBeagle {
input:
vcf_input = ImputeBeagle.vcf,
vcf_input = Impute.vcf,
gatk_docker = gatk_docker
}

Expand Down Expand Up @@ -214,50 +188,9 @@ workflow ImputationBeagle {
output_basename = chunk_basename_imputed,
gatk_docker = gatk_docker
}

call tasks.SetIDs {
input:
vcf = RemoveSymbolicAlleles.output_vcf,
output_basename = chunk_basename_imputed
}

call tasks.ExtractIDs {
input:
vcf = SetIDs.output_vcf,
output_basename = "imputed_sites"
}

call tasks.FindSitesUniqueToFileTwoOnly {
input:
file1 = select_first([ExtractIDs.ids, write_lines([])]),
file2 = ExtractIdsVcfToImpute.ids,
ubuntu_docker = ubuntu_docker
}

call tasks.SelectVariantsByIds {
input:
vcf = chunkedVcfsWithoutOverlapsForSiteIds[i],
vcf_index = chunkedVcfIndexesWithoutOverlapsForSiteIds[i],
ids = FindSitesUniqueToFileTwoOnly.missing_sites,
basename = "imputed_sites_to_recover",
gatk_docker = gatk_docker
}

call tasks.RemoveAnnotations {
input:
vcf = SelectVariantsByIds.output_vcf,
basename = "imputed_sites_to_recover_annotations_removed"
}

call tasks.InterleaveVariants {
input:
vcfs = select_all([RemoveAnnotations.output_vcf, SetIDs.output_vcf]),
basename = output_basename, # TODO consider using a contig/chunk labeled basename
gatk_docker = gatk_docker
}
}

Array[File] chromosome_vcfs = select_all(InterleaveVariants.output_vcf)
Array[File] chromosome_vcfs = select_all(RemoveSymbolicAlleles.output_vcf)
}

call tasks.GatherVcfs {
Expand All @@ -272,9 +205,9 @@ workflow ImputationBeagle {
chroms = flatten(chunk_contig),
starts = flatten(start),
ends = flatten(end),
vars_in_array = flatten(CountVariantsInChunksBeagle.var_in_original),
vars_in_panel = flatten(CountVariantsInChunksBeagle.var_also_in_reference),
valids = flatten(CheckChunksBeagle.valid),
vars_in_array = flatten(CountVariantsInChunks.var_in_original),
vars_in_panel = flatten(CountVariantsInChunks.var_also_in_reference),
valids = flatten(CheckChunks.valid),
basename = output_basename
}

Expand Down

This file was deleted.

Loading

0 comments on commit 79d3510

Please sign in to comment.