diff --git a/.gitmodules b/.gitmodules
index 3d3092143..18b4a0d56 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -31,3 +31,6 @@
 [submodule "submodules/red"]
 	path = submodules/red
 	url = https://github.com/glennhickey/red.git
+[submodule "submodules/collapse-bubble"]
+	path = submodules/collapse-bubble
+	url = https://github.com/Han-Cao/collapse-bubble.git
diff --git a/Makefile b/Makefile
index 6ab7d972b..0cfcdde14 100644
--- a/Makefile
+++ b/Makefile
@@ -6,7 +6,7 @@ modules = api setup caf bar hal reference pipeline preprocessor

 # submodules are in multiple pass to handle dependencies cactus2hal being dependent on
 # both cactus and sonLib
-submodules1 = sonLib cPecan hal matchingAndOrdering pinchesAndCacti abPOA lastz paffy red
+submodules1 = sonLib cPecan hal matchingAndOrdering pinchesAndCacti abPOA lastz paffy red collapse-bubble
 submodules2 = cactus2hal

 submodules = ${submodules1} ${submodules2}
@@ -239,12 +239,12 @@ suball.cPecan: suball.sonLib
 suball.cactus2hal: suball.sonLib suball.hal all_libs.api
 	cd submodules/cactus2hal && ${MAKE}
-	mkdir -p bin
+	mkdir -p ${BINDIR} ${LIBDIR} ${INCLDIR}
 	-ln -f submodules/cactus2hal/bin/* bin/

 suball.hal: suball.sonLib
 	cd submodules/hal && ${MAKE}
-	mkdir -p bin
+	mkdir -p ${BINDIR} ${LIBDIR} ${INCLDIR}
 	-ln -f submodules/hal/bin/* bin/
 	-ln -f submodules/hal/lib/libHal.a submodules/hal/lib/halLib.a
@@ -256,7 +256,7 @@ suball.abPOA:
 suball.lastz:
 	cd submodules/lastz && ${MAKE}
-	mkdir -p bin
+	mkdir -p ${BINDIR} ${LIBDIR} ${INCLDIR}
 	ln -f submodules/lastz/src/lastz bin

 suball.paffy:
@@ -271,8 +271,12 @@ suball.red:
 	cd submodules/red && ${MAKE}
 	ln -f submodules/red/bin/Red ${BINDIR}

+suball.collapse-bubble:
+	chmod +x submodules/collapse-bubble/scripts/merge_duplicates.py
+	ln -f submodules/collapse-bubble/scripts/merge_duplicates.py ${BINDIR}
+
 subclean.%:
-	cd submodules/$* && ${MAKE} clean
+	if [ $(shell ls submodules/$* | grep -i makefile | wc -l) != 0 ]; then cd submodules/$* && ${MAKE} clean; fi

 ##
 # docker
diff --git a/README.md b/README.md
index 66040d196..132607026 100644
--- a/README.md
+++ b/README.md
@@ -43,6 +43,8 @@ Cactus uses many different algorithms and individual code contributions, princip
 - Andrea Guarracino, Erik Garrison and co-authors for [odgi](https://github.com/pangenome/odgi). Make sure to [cite odgi](https://doi.org/10.1093/bioinformatics/btac308) when using it or its visualizations.
 - Hani Z. Girgis for [RED](http://toolsmith.ens.utulsa.edu/)
 - Erik Garrison and co-authors for [vcfwave](https://github.com/vcflib/vcflib/blob/master/doc/vcfwave.md). [vcflib citation](https://doi.org/10.1371/journal.pcbi.1009123)
+- Martin Frith and collaborators for `last-train` from [last](https://gitlab.com/mcfrith/last). [last-train citation](https://academic.oup.com/bioinformatics/article-lookup/doi/10.1093/bioinformatics/btw742)
+- Han Cao for the [merge_duplicates](https://github.com/Han-Cao/collapse-bubble) VCF cleanup script

 ## Installing Manually From Source
diff --git a/build-tools/downloadPangenomeTools b/build-tools/downloadPangenomeTools
index 7e3fbe1cf..c6664fc65 100755
--- a/build-tools/downloadPangenomeTools
+++ b/build-tools/downloadPangenomeTools
@@ -34,6 +34,7 @@ set -x
 rm -rf ${pangenomeBuildDir}
 mkdir -p ${pangenomeBuildDir}
 mkdir -p ${binDir}
+mkdir -p ${libDir}

 # minimap2
 if [ -z ${arm+x} ]
diff --git a/build-tools/downloadVCFWave b/build-tools/downloadVCFWave
index e0d7b7991..26ffa0fb7 100755
--- a/build-tools/downloadVCFWave
+++ b/build-tools/downloadVCFWave
@@ -20,6 +20,7 @@ set -x
 rm -rf ${pangenomeBuildDir}
 mkdir -p ${pangenomeBuildDir}
 mkdir -p ${binDir}
+mkdir -p ${libDir}

 cd ${pangenomeBuildDir}
 git clone --recursive https://github.com/vcflib/vcflib.git
@@ -29,7 +30,7 @@ mkdir build
 cd build
 cmake -DZIG=OFF -DWFA_GITMODULE=ON -DCMAKE_BUILD_TYPE=Debug ..
 cmake --build . -- -j ${numcpu}
-mv vcfwave vcfcreatemulti vcfbreakmulti vcfuniq ${binDir}
+mv vcfwave vcfcreatemulti vcfbreakmulti vcfuniq vcffixup ${binDir}
 mv ./contrib/WFA2-lib/libwfa2.so.0 ${libDir}

 cd ${CWD}
diff --git a/setup.py b/setup.py
index 877b43fa8..7c8d9a620 100644
--- a/setup.py
+++ b/setup.py
@@ -42,7 +42,8 @@ def run(self):
         'networkx>=2,<2.8.1',
         'pytest',
         'cigar',
-        'biopython'],
+        'biopython',
+        'pysam'],

     cmdclass = {
         'install': PostInstallCommand,
diff --git a/src/cactus/cactus_progressive_config.xml b/src/cactus/cactus_progressive_config.xml
index 5d5edc656..dd4ab0653 100644
--- a/src/cactus/cactus_progressive_config.xml
+++ b/src/cactus/cactus_progressive_config.xml
@@ -420,8 +420,10 @@
+
+
diff --git a/src/cactus/refmap/cactus_graphmap_join.py b/src/cactus/refmap/cactus_graphmap_join.py
index 7f1925b72..0918aff7f 100644
--- a/src/cactus/refmap/cactus_graphmap_join.py
+++ b/src/cactus/refmap/cactus_graphmap_join.py
@@ -27,6 +27,8 @@
 import timeit
 import re
 import subprocess
+import gzip
+import shutil
 from operator import itemgetter
 from collections import defaultdict
@@ -57,7 +59,9 @@
 from sonLib.nxnewick import NXNewick
 from sonLib.bioio import getTempDirectory, getTempFile, catFiles
-
+
+import pysam
+
 def main():
     parser = ArgumentParser()
     Job.Runner.addToilOptions(parser)
@@ -392,6 +396,14 @@
     if options.vcfwave and options.vcf:
         vcfwave_check_job = root_job.addFollowOnJobFn(check_vcfwave)
         root_job = vcfwave_check_job
+
+    # vcffixup isn't included in the static binary release, so we start by checking that it's available
+    merge_dup = getOptionalAttrib(findRequiredNode(config.xmlRoot, "graphmap_join"), "mergeDuplicatesOptions", typeFn=str, default=None) not in [None, "0"]
+    wave_norm = getOptionalAttrib(findRequiredNode(config.xmlRoot, "graphmap_join"), "vcfwaveNorm", typeFn=str, default=True)
+    bub_norm = getOptionalAttrib(findRequiredNode(config.xmlRoot, "graphmap_join"), "bcftoolsNorm", typeFn=str, default=False)
+    if options.vcf and merge_dup and (bub_norm or (options.vcfwave and wave_norm)):
+        vcffixup_check_job = root_job.addFollowOnJobFn(check_vcffixup)
+        root_job = vcffixup_check_job

     # make sure reference doesn't have a haplotype suffix, as it will have been changed upstream
     ref_base, ref_ext = os.path.splitext(options.reference[0])
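
The gating added above reduces to a small predicate: the vcffixup preflight check is only scheduled when a VCF is requested, merge_duplicates is enabled via mergeDuplicatesOptions, and at least one normalization pass (bcftoolsNorm, or vcfwaveNorm together with --vcfwave) will actually run. A minimal sketch of that decision, with illustrative names that are not part of the patch:

```python
def needs_vcffixup_check(vcf_requested, vcfwave_requested,
                         merge_duplicates_options, bcftools_norm, vcfwave_norm):
    """Restate the new preflight gating: vcffixup is only exercised by the
    merge_duplicates post-processing inside a normalization pass, so the
    availability check is only worth scheduling in that case."""
    merge_dup = merge_duplicates_options not in (None, "0")
    return vcf_requested and merge_dup and (bcftools_norm or (vcfwave_requested and vcfwave_norm))

# illustrative cases ("--merge" is just a stand-in option string)
assert not needs_vcffixup_check(True, True, None, False, True)        # merging disabled
assert needs_vcffixup_check(True, False, "--merge", True, True)       # bcftoolsNorm path
assert needs_vcffixup_check(True, True, "--merge", False, True)       # vcfwave + vcfwaveNorm path
assert not needs_vcffixup_check(True, True, "--merge", False, False)  # no normalization pass at all
```
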
@@ -598,7 +610,6 @@
                                      ref_fasta_job.rv() if ref_fasta_job else None)
     if ref_fasta_job:
         ref_fasta_job.addFollowOn(vcf_job)
-    out_dicts.append(vcf_job.rv())

     # optional giraffe
@@ -1036,7 +1047,7 @@ def make_vcf(job, config, options, workflow_phase, index_mem, vcf_ref, vg_ids, r
             bub_vcf_tbi_ids.append((bub_vcf_id, bub_tbi_id))

         if options.vcfwave:
-            vcfwave_job = deconstruct_job.addFollowOnJobFn(vcfwave, config, options.outName, vcf_ref,
+            vcfwave_job = deconstruct_job.addFollowOnJobFn(chunked_vcfwave, config, options.outName, vcf_ref,
                                                            raw_vcf_id, raw_tbi_id, options.vcfbub,
                                                            ref_fasta_dict,
@@ -1048,17 +1059,23 @@
             wave_vcf_tbi_ids.append((wave_vcf_id, wave_tbi_id))

     merge_vcf_job = root_job.addFollowOnJobFn(vcf_cat, raw_vcf_tbi_ids, vcftag + '.raw.',
-                                              disk = sum(f.size for f in vg_ids) * 16)
+                                              fix_ploidies=True,
+                                              disk = sum(f.size for f in vg_ids) * 16,
+                                              memory = cactus_clamp_memory(sum(f.size for f in vg_ids)))
     out_dict = {'{}.raw.vcf.gz'.format(vcftag) : merge_vcf_job.rv(0),
                 '{}.raw.vcf.gz.tbi'.format(vcftag) : merge_vcf_job.rv(1) }
     if bub_vcf_tbi_ids:
         merge_bub_job = root_job.addFollowOnJobFn(vcf_cat, bub_vcf_tbi_ids, vcftag + '.bub.',
-                                                  disk = sum(f.size for f in vg_ids) * 16)
+                                                  fix_ploidies=True,
+                                                  disk = sum(f.size for f in vg_ids) * 16,
+                                                  memory = cactus_clamp_memory(sum(f.size for f in vg_ids)))
         out_dict['{}.vcf.gz'.format(vcftag)] = merge_bub_job.rv(0)
         out_dict['{}.vcf.gz.tbi'.format(vcftag)] = merge_bub_job.rv(1)
     if wave_vcf_tbi_ids:
         merge_wave_job = root_job.addFollowOnJobFn(vcf_cat, wave_vcf_tbi_ids, vcftag + '.wave.',
-                                                   disk = sum(f.size for f in vg_ids) * 16)
+                                                   fix_ploidies=True,
+                                                   disk = sum(f.size for f in vg_ids) * 16,
+                                                   memory = cactus_clamp_memory(sum(f.size for f in vg_ids)))
         out_dict['{}.wave.vcf.gz'.format(vcftag)] = merge_wave_job.rv(0)
         out_dict['{}.wave.vcf.gz.tbi'.format(vcftag)] = merge_wave_job.rv(1)
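
The merge jobs above lean on Toil's promise mechanism: each vcf_cat invocation returns a (vcf, tbi) pair, and rv(0)/rv(1) are placeholders that resolve into the output dictionary once the job has run. A toy fan-out/fan-in workflow showing the same pattern (the job bodies and return values are illustrative, not cactus code):

```python
from toil.common import Toil
from toil.job import Job

def make_part(job, name):
    # stand-in for a per-chromosome VCF job: returns a (vcf, tbi) pair
    return name + '.vcf.gz', name + '.vcf.gz.tbi'

def concat(job, pairs):
    # stand-in for vcf_cat: by the time this runs, the promises are resolved tuples
    return [p[0] for p in pairs], [p[1] for p in pairs]

def root(job):
    pairs = [job.addChildJobFn(make_part, name).rv() for name in ('chr1', 'chr2')]
    # the follow-on only runs after every child has finished
    return job.addFollowOnJobFn(concat, pairs).rv()

if __name__ == '__main__':
    opts = Job.Runner.getDefaultOptions('./toil-jobstore')
    opts.clean = 'always'
    with Toil(opts) as toil:
        print(toil.start(Job.wrapJobFn(root)))
```
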
@@ -1100,35 +1117,128 @@ def vcfbub(job, config, out_name, vcf_ref, vcf_id, tbi_id, max_ref_allele, fasta
     # run vcfbub
     vcfbub_path = os.path.join(work_dir, os.path.basename(out_name) + '.' + tag + 'bub.vcf.gz')
     assert max_ref_allele
-    bub_cmd = [['vcfbub', '--input', vcf_path, '--max-ref-length', str(max_ref_allele), '--max-level', '0']]
-    if getOptionalAttrib(findRequiredNode(config.xmlRoot, "graphmap_join"), "bcftoolsNorm", typeFn=bool, default=False):
-        fa_ref_path = os.path.join(work_dir, tag + 'fa.gz')
-        job.fileStore.readGlobalFile(fasta_ref_dict[vcf_ref], fa_ref_path)
-        bub_cmd.append(['bcftools', 'norm', '-f', fa_ref_path])
-    bub_cmd.append(['bcftools', 'norm', '-m', '+any'])
-    bub_cmd.append(['bcftools', 'sort', '-T', os.path.join(work_dir, 'bcftools.XXXXXX')])
-    bub_cmd.append(['bgzip', '--threads', str(job.cores)])
-    cactus_call(parameters=bub_cmd, outfile=vcfbub_path)
+    cactus_call(parameters = [['vcfbub', '--input', vcf_path, '--max-ref-length', str(max_ref_allele), '--max-level', '0'],
+                              ['bgzip']], outfile = vcfbub_path)
+
     try:
         cactus_call(parameters=['tabix', '-p', 'vcf', vcfbub_path])
         tbi_path = vcfbub_path + '.tbi'
     except Exception as e:
         cactus_call(parameters=['bcftools', 'index', '-c', vcfbub_path])
-        tbi_path = vcfbub_path + '.csi'
+        tbi_path = vcfbub_path + '.csi'
+
+    bub_vcf_id = job.fileStore.writeGlobalFile(vcfbub_path)
+    bub_tbi_id = job.fileStore.writeGlobalFile(tbi_path)
+
+    if getOptionalAttrib(findRequiredNode(config.xmlRoot, "graphmap_join"), "bcftoolsNorm", typeFn=bool, default=False):
+        norm_job = job.addChildJobFn(vcfnorm, config, vcf_ref, bub_vcf_id, vcfbub_path, bub_tbi_id, fasta_ref_dict,
+                                     disk=bub_vcf_id.size * 6)
+        return norm_job.rv()
+    else:
+        return bub_vcf_id, bub_tbi_id

-    return job.fileStore.writeGlobalFile(vcfbub_path), job.fileStore.writeGlobalFile(tbi_path)
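
When bcftoolsNorm is set, vcfbub now hands its output to the vcfnorm child job defined next rather than doing everything in one pipeline. That job writes the header first, then appends the body after splitting multi-allelics, left-aligning against the reference and stable-sorting. A rough standalone equivalent, assuming bcftools, bgzip and GNU sort are on PATH (the function name is illustrative):

```python
import subprocess

def normalize_vcf(vcf_path, ref_fasta, out_path, work_dir='.'):
    """Header first, then the body through: split multi-allelics -> left-align
    against the reference -> drop the header -> stable sort by CHROM/POS -> bgzip.
    Concatenated bgzip blocks form a valid bgzipped VCF, which is why the body
    can simply be appended after the header."""
    with open(out_path, 'wb') as out:
        subprocess.run(['bcftools', 'view', '-h', '-Oz', vcf_path], stdout=out, check=True)
        p1 = subprocess.Popen(['bcftools', 'norm', '-m', '-any', vcf_path], stdout=subprocess.PIPE)
        p2 = subprocess.Popen(['bcftools', 'norm', '-f', ref_fasta], stdin=p1.stdout, stdout=subprocess.PIPE)
        p3 = subprocess.Popen(['bcftools', 'view', '-H'], stdin=p2.stdout, stdout=subprocess.PIPE)
        p4 = subprocess.Popen(['sort', '-k1,1d', '-k2,2n', '-s', '-T', work_dir], stdin=p3.stdout, stdout=subprocess.PIPE)
        p5 = subprocess.Popen(['bgzip'], stdin=p4.stdout, stdout=out)
        for p in (p1, p2, p3, p4):
            p.stdout.close()   # let SIGPIPE propagate if a downstream stage exits early
        p5.communicate()
        for p in (p1, p2, p3, p4):
            p.wait()
```

After this step the real job re-joins multi-allelics with `bcftools norm -m +any` and, when merge_duplicates ran, recomputes AC/AF/NS with vcffixup.
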
+def vcfnorm(job, config, vcf_ref, vcf_id, vcf_path, tbi_id, fasta_ref_dict):
+    """ run bcftools norm and merge_duplicates.py """
+    work_dir = job.fileStore.getLocalTempDir()
+    vcf_path = os.path.join(work_dir, os.path.basename(vcf_path))
+    job.fileStore.readGlobalFile(vcf_id, vcf_path)
+    job.fileStore.readGlobalFile(tbi_id, vcf_path + '.tbi')
+    fa_ref_path = os.path.join(work_dir, vcf_ref + '.fa.gz')
+    job.fileStore.readGlobalFile(fasta_ref_dict[vcf_ref], fa_ref_path)
+
+    norm_path = os.path.join(work_dir, 'norm.' + os.path.basename(vcf_path))
+    cactus_call(parameters=['bcftools', 'view', '-h', '-Oz', vcf_path], outfile=norm_path)
+    cactus_call(parameters=[['bcftools', 'norm', '-m', '-any', vcf_path],
+                            ['bcftools', 'norm', '-f', fa_ref_path],
+                            ['bcftools', 'view', '-H'],
+                            ['sort', '-k1,1d', '-k2,2n', '-s', '-T', work_dir],
+                            ['bgzip']], outfile=norm_path, outappend=True)
+
+    merge_duplicates_opts = getOptionalAttrib(findRequiredNode(config.xmlRoot, "graphmap_join"), "mergeDuplicatesOptions", typeFn=str, default=None)
+    if merge_duplicates_opts not in [None, "0"]:
+        # note: merge_duplicates complains about not having a .tbi but I don't think it actually affects anything
+        merge_path = os.path.join(work_dir, 'merge.' + os.path.basename(vcf_path))
+        merge_cmd = ['merge_duplicates.py', '-i', norm_path, '-o', merge_path]
+        if merge_duplicates_opts:
+            merge_cmd.append(merge_duplicates_opts.split(' '))
+        cactus_call(parameters=merge_cmd)
+        norm_path = merge_path
+
+    multi_path = os.path.join(work_dir, 'multi.' + os.path.basename(vcf_path))
+    postmerge_cmd = [['bcftools', 'norm', '-m', '+any', norm_path]]
+    if merge_duplicates_opts not in [None, "0"]:
+        # note: we use vcffixup to fix up AC, AF, NS tags.  bcftools +fill-tags is about 1000 times faster
+        # and does more, but isn't in the static binary being delivered with cactus, so we use vcflib
+        # for now (it's not in the binary either, but is already necessary for vcfwave)
+        postmerge_cmd.append(['vcffixup', '-'])
+    postmerge_cmd.append(['bgzip'])
+    cactus_call(parameters=postmerge_cmd, outfile=multi_path)
+
+    try:
+        cactus_call(parameters=['tabix', '-p', 'vcf', multi_path])
+        tbi_path = multi_path + '.tbi'
+    except Exception as e:
+        cactus_call(parameters=['bcftools', 'index', '-c', multi_path])
+        tbi_path = multi_path + '.csi'
+
+    job.fileStore.deleteGlobalFile(vcf_id)
+    job.fileStore.deleteGlobalFile(tbi_id)
+
+    return job.fileStore.writeGlobalFile(multi_path), job.fileStore.writeGlobalFile(tbi_path)
+
+def fix_vcf_ploidies(in_vcf_path, out_vcf_path):
+    """ since we're deconstructing chromosomes independently, we can have cases where a sample
+    is haploid in one chromosome (e.g. Y) but diploid in other chromosomes. this will almost
+    certainly upset some downstream tools, which will be expecting consistent ploidies
+    across the whole vcf (which you would get if deconstructing the whole genome at once).
+    this function smooths it over, by doing two scans. 1) find the max ploidy of each sample
+    2) add dots to each GT to make sure each line gets this ploidy
+    """
+    sample_to_ploidy = {}
+    in_vcf = pysam.VariantFile(in_vcf_path, 'rb')
+
+    # pass 1: find the (max) ploidy of every sample, assuming phased
+    for var in in_vcf.fetch():
+        for sample in var.samples.values():
+            ploidy = len(sample['GT'])
+            cur_ploidy = 0 if sample.name not in sample_to_ploidy else sample_to_ploidy[sample.name]
+            sample_to_ploidy[sample.name] = max(ploidy, cur_ploidy)
+
+    # pass 2: correct the GTs
+    out_vcf = pysam.VariantFile(out_vcf_path, 'w', header=in_vcf.header)
+    for var in in_vcf.fetch():
+        for sample in var.samples.values():
+            ploidy_delta = sample_to_ploidy[sample.name] - len(sample['GT'])
+            if ploidy_delta > 0:
+                gt = list(sample['GT'])
+                for i in range(ploidy_delta):
+                    gt.append(None)
+                sample['GT'] = tuple(gt)
+                sample.phased = True
+
+        out_vcf.write(var)
+
+    in_vcf.close()
+    out_vcf.close()
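
A toy illustration of the per-genotype padding that fix_vcf_ploidies applies: calls below a sample's maximum observed ploidy are extended with missing alleles (None, rendered as '.'), so every record carries a consistent ploidy. The helper name is illustrative:

```python
def pad_genotype(gt, target_ploidy):
    """Pad a GT tuple with missing alleles up to the sample's max ploidy."""
    return tuple(gt) + (None,) * max(0, target_ploidy - len(gt))

# a haploid chrY call padded to the diploid ploidy seen on the autosomes -> written as "1|."
assert pad_genotype((1,), 2) == (1, None)
# calls already at the target ploidy are left alone
assert pad_genotype((0, 1), 2) == (0, 1)
```
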
 def check_vcfwave(job):
     """ check to make sure vcfwave is installed """
     try:
         cactus_call(parameters=['vcfwave', '-h'])
     except:
-        raise RuntimeError('--vcfwave option used, but vcfwave tool not found in PATH. vcfwave is *not* included in the cactus binary release, but it is in the cactus Docker image. If you have Docker installed, you can try running again with --binariesMode docker. Or running your whole command with docker run. If you cannot use Docker, then you will need to build vcflib yourself before retrying: source code and details here: https://github.com/vcflib/vcflib')
+        raise RuntimeError('--vcfwave option used, but vcfwave tool not found in PATH. vcfwave is *not* included in the cactus binary release, but it is in the cactus Docker image. If you have Docker installed, you can try running again with --binariesMode docker. Or running your whole command with docker run. If you cannot use Docker, then you will need to build vcflib yourself before retrying: source code and details here: https://github.com/vcflib/vcflib. Running the ./build-tools/downloadVCFWave script (from the cactus/ directory) will attempt to download and build vcfwave.')

-def vcfwave(job, config, out_name, vcf_ref, vcf_id, tbi_id, max_ref_allele, fasta_ref_dict, tag):
-    """ run vcfwave """
+def check_vcffixup(job):
+    """ check to make sure vcffixup is installed """
+    try:
+        cactus_call(parameters=[['echo', '##fileformat=VCFv4.2'], ['vcffixup', '-']])
+    except:
+        raise RuntimeError('vcf normalization with merge_duplicates enabled, but vcffixup tool (used in postprocessing) not found in PATH. vcffixup is *not* included in the cactus binary release, but it is in the cactus Docker image. If you have Docker installed, you can try running again with --binariesMode docker. Or running your whole command with docker run. If you cannot use Docker, then you will need to build vcflib yourself before retrying: source code and details here: https://github.com/vcflib/vcflib. Running the ./build-tools/downloadVCFWave script (from the cactus/ directory) will attempt to download and build vcfwave.')
+
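
check_vcffixup verifies the tool by piping a minimal VCF header into it before the expensive part of the workflow starts. A sketch of the same preflight idea with subprocess directly (the helper name is an assumption, and whether a given tool accepts a bare ##fileformat line is tool-dependent):

```python
import subprocess

def tool_accepts_vcf_header(tool_cmd):
    """Return True if tool_cmd (e.g. ['vcffixup', '-']) runs cleanly on a
    minimal VCF header fed through stdin, False if it is missing or fails."""
    try:
        subprocess.run(tool_cmd, input=b'##fileformat=VCFv4.2\n',
                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
        return True
    except (OSError, subprocess.CalledProcessError):
        return False

# fail fast before any chunking or normalization jobs are scheduled
# if not tool_accepts_vcf_header(['vcffixup', '-']):
#     raise RuntimeError('vcffixup not found in PATH; see https://github.com/vcflib/vcflib')
```
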
+def chunked_vcfwave(job, config, out_name, vcf_ref, vcf_id, tbi_id, max_ref_allele, fasta_ref_dict, tag):
+    """ run vcfwave in parallel chunks """
     work_dir = job.fileStore.getLocalTempDir()
-    vcf_path = os.path.join(work_dir, os.path.basename(out_name) + '.' + vcf_ref + tag + 'raw.vcf.gz')
+    vcf_path = os.path.join(work_dir, os.path.basename(out_name) + '.' + vcf_ref + '.' + tag + 'raw.vcf.gz')
     job.fileStore.readGlobalFile(vcf_id, vcf_path)
     job.fileStore.readGlobalFile(tbi_id, vcf_path + '.tbi')
@@ -1137,33 +1247,113 @@
                                        shell=True).decode('utf-8').strip()) == 0:
         return vcf_id, tbi_id

-    # run vcfbub and vcfwave, using original HPRC recipe
-    vcfwave_path = os.path.join(work_dir, os.path.basename(out_name) + '.' + vcf_ref + tag + 'wave.vcf.gz')
-    wave_opts = getOptionalAttrib(findRequiredNode(config.xmlRoot, "graphmap_join"), "vcfwaveOptions", typeFn=str, default=None)
-    assert wave_opts
-    bubwave_cmd = [['vcfbub', '--input', vcf_path, '-l', '0', '-a', str(max_ref_allele)],
-                   ['bcftools', 'annotate', '-x', 'INFO/AT'],
-                   ['vcfwave'] + wave_opts.split(' ') + ['-t', str(job.cores)]]
+    # run vcfbub using original HPRC recipe
+    # allele splitting added here as vcfwave has history of trouble with multi-allelic sites
+    vcfbub_path = os.path.join(work_dir, os.path.basename(out_name) + '.' + vcf_ref + '.' + tag + 'bub.vcf.gz')
+    bub_cmd = [['vcfbub', '--input', vcf_path, '-l', '0', '-a', str(max_ref_allele)],
+               ['bcftools', 'annotate', '-x', 'INFO/AT'],
+               ['bcftools', 'norm', '-m', '-any'],
+               ['bgzip', '--threads', str(job.cores)]]
+    cactus_call(parameters=bub_cmd, outfile=vcfbub_path)
+    # count the lines in the vcf
+    lines = int(cactus_call(parameters=[['bcftools', 'view', '-H', vcfbub_path], ['wc', '-l']], check_output=True).strip())
+
+    # get the chunk size
+    chunk_lines = getOptionalAttrib(findRequiredNode(config.xmlRoot, "graphmap_join"), "vcfwaveChunkLines", typeFn=int, default=None)
+    # sanity check
+    if chunk_lines and lines / chunk_lines >= 1000:
+        chunk_lines = int(lines / 1000)
+
+    # make a bunch of chunks, storing their paths here
+    chunk_paths = []
+    if chunk_lines and lines > chunk_lines:
+        header_path = os.path.join(work_dir, 'header.vcf')
+        cactus_call(parameters=['bcftools', 'view', '-h', vcfbub_path], outfile=header_path)
+        with gzip.open(vcfbub_path, 'rb') as bubfile:
+            chunk_file = None
+            i = 0
+            for line in bubfile:
+                line = line.decode()
+                if line.startswith('#'):
+                    continue
+                if i % chunk_lines == 0:
+                    chunk_path = os.path.join(work_dir, os.path.basename(out_name) + '.' +
+                                              vcf_ref + tag + 'chunk{}.vcf'.format(len(chunk_paths)))
+                    if chunk_file:
+                        chunk_file.close()
+                        cactus_call(parameters=['bgzip', chunk_paths[-1], '--threads', str(job.cores)])
+                        chunk_paths[-1] += '.gz'
+                    shutil.copy(header_path, chunk_path)
+                    chunk_file = open(chunk_path, 'a')
+                    chunk_paths.append(chunk_path)
+                chunk_file.write(line)
+                i += 1
+        assert chunk_file
+        chunk_file.close()
+        cactus_call(parameters=['bgzip', chunk_paths[-1], '--threads', str(job.cores)])
+        chunk_paths[-1] += '.gz'
+    else:
+        # no chunks, just use the original
+        chunk_paths = [vcfbub_path]
+
+    # distribute on the chunks
+    root_job = Job()
+    job.addChild(root_job)
+    chunk_vcf_tbi_ids = []
+    for chunk_path in chunk_paths:
+        chunk_id = job.fileStore.writeGlobalFile(chunk_path)
+        vcfwave_job = root_job.addChildJobFn(vcfwave, config, chunk_path, chunk_id,
+                                             disk=chunk_id.size * 10, cores=job.cores, memory=job.memory)
+        chunk_vcf_tbi_ids.append(vcfwave_job.rv())
+
+    # combine the chunks
+    ##
+    ## Note: fix_ploidies should be false here. But due to a bug in deconstruct we set it to true
+    ## it's resolved here: https://github.com/vgteam/vg/pull/4497
+    ## so we can toggle it back once cactus is updated to use the next vg release
+    ##
+    vcfwave_cat_job = root_job.addFollowOnJobFn(vcf_cat, chunk_vcf_tbi_ids, tag, sort=True,
+                                                fix_ploidies=False,
+                                                disk=vcf_id.size * 10,
+                                                memory=cactus_clamp_memory(vcf_id.size*5))
+
+    # normalize the output
     if getOptionalAttrib(findRequiredNode(config.xmlRoot, "graphmap_join"), "vcfwaveNorm", typeFn=bool, default=True):
-        fa_ref_path = os.path.join(work_dir, tag + 'fa.gz')
-        job.fileStore.readGlobalFile(fasta_ref_dict[vcf_ref], fa_ref_path)
-        bubwave_cmd.append(['bcftools', 'norm', '-f', fa_ref_path])
-        bubwave_cmd.append(['bcftools', 'sort', '-T', os.path.join(work_dir, 'bcftools.XXXXXX')])
-        bubwave_cmd.append(['bgzip'])
+        vcfwave_path = os.path.join(work_dir, os.path.basename(out_name) + '.' + vcf_ref + '.' + tag + 'wave.vcf.gz')
-    cactus_call(parameters=bubwave_cmd, outfile=vcfwave_path)
-    try:
-        cactus_call(parameters=['tabix', '-p', 'vcf', vcfwave_path])
-        tbi_path = vcfwave_path + '.tbi'
-    except Exception as e:
-        cactus_call(parameters=['bcftools', 'index', '-c', vcfwave_path])
-        tbi_path = vcfwave_path + '.csi'
+        norm_job = vcfwave_cat_job.addFollowOnJobFn(vcfnorm, config, vcf_ref, vcfwave_cat_job.rv(0),
+                                                    vcfwave_path, vcfwave_cat_job.rv(1), fasta_ref_dict,
+                                                    disk=vcf_id.size*12,
+                                                    memory=cactus_clamp_memory(vcf_id.size*5))
+        return norm_job.rv()
+    else:
+        return vcfwave_cat_job.rv()
+
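
The core of chunked_vcfwave is splitting the vcfbub output into fixed-size pieces that each begin with the full header, so every piece is a valid VCF that an independent vcfwave job can realign. A simplified sketch of that splitting step (uncompressed chunks, no cap on the chunk count, illustrative names):

```python
import gzip
import os
import shutil

def chunk_vcf(vcf_gz_path, header_path, chunk_lines, out_dir):
    """Write chunk_lines variant records per chunk, each chunk seeded with a
    copy of the header so it parses as a standalone VCF."""
    chunk_paths = []
    chunk_file = None
    i = 0
    with gzip.open(vcf_gz_path, 'rt') as vcf:
        for line in vcf:
            if line.startswith('#'):
                continue                      # header comes from header_path instead
            if i % chunk_lines == 0:          # start a new chunk
                if chunk_file:
                    chunk_file.close()
                chunk_path = os.path.join(out_dir, 'chunk{}.vcf'.format(len(chunk_paths)))
                shutil.copy(header_path, chunk_path)
                chunk_file = open(chunk_path, 'a')
                chunk_paths.append(chunk_path)
            chunk_file.write(line)
            i += 1
    if chunk_file:
        chunk_file.close()
    return chunk_paths
```

The real job additionally bgzips each finished chunk, grows chunk_lines so that at most roughly 1000 chunks are produced, and falls back to a single chunk when the VCF is smaller than one chunk.
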
+def vcfwave(job, config, vcf_path, vcf_id):
+    """ run vcfwave """
+    work_dir = job.fileStore.getLocalTempDir()
+    vcf_path = os.path.join(work_dir, os.path.basename(vcf_path))
+    job.fileStore.readGlobalFile(vcf_id, vcf_path)
+
+    # short circuit on empty file (note zcat -> head exits 141, so we can't use cactus_call)
+    if int(subprocess.check_output('gzip -dc {} | grep -v ^# | head | wc -l'.format(vcf_path),
+                                   shell=True).decode('utf-8').strip()) == 0:
+        return vcf_id, None
+
+    # run vcfwave
+    vcfwave_path = os.path.join(work_dir, 'wave.{}'.format(os.path.basename(vcf_path)))
+    wave_opts = getOptionalAttrib(findRequiredNode(config.xmlRoot, "graphmap_join"), "vcfwaveOptions", typeFn=str, default=None)
+    assert wave_opts
+    cactus_call(parameters=[['zcat', vcf_path],
+                            ['vcfwave'] + wave_opts.split(' ') + ['-t', str(job.cores)],
+                            ['bgzip']], outfile=vcfwave_path)

-    return job.fileStore.writeGlobalFile(vcfwave_path), job.fileStore.writeGlobalFile(tbi_path)
+    job.fileStore.deleteGlobalFile(vcf_id)
+    return job.fileStore.writeGlobalFile(vcfwave_path), None

-def vcf_cat(job, vcf_tbi_ids, tag):
-    """ concat some vcf files """
+def vcf_cat(job, vcf_tbi_ids, tag, sort=False, fix_ploidies=True):
+    """ concat some vcf files, optionally do a (stable) sort of the results """
     work_dir = job.fileStore.getLocalTempDir()
     vcf_paths = []
     sample_sets = []
@@ -1171,7 +1361,8 @@
     for i, (vcf_id, tbi_id) in enumerate(vcf_tbi_ids):
         vcf_path = os.path.join(work_dir, '{}.{}vcf.gz'.format(i, tag))
         job.fileStore.readGlobalFile(vcf_id, vcf_path)
-        job.fileStore.readGlobalFile(tbi_id, vcf_path + '.tbi')
+        if not sort:
+            job.fileStore.readGlobalFile(tbi_id, vcf_path + '.tbi')
         vcf_paths.append(vcf_path)
         samples = cactus_call(parameters=['bcftools', 'query', '-l', vcf_path], check_output=True).strip().split('\n')
         all_sample_set.update(samples)
@@ -1213,6 +1404,21 @@
                              [os.path.basename(vcf_path) for vcf_path in vcf_paths],
                              work_dir=work_dir, outfile=cat_vcf_path)

+    if sort:
+        # stable sort, which is apparently not guaranteed by bcftools sort
+        # (this could be useful for merge_duplicates.py)
+        sort_vcf_path = os.path.join(work_dir, '{}sort.vcf.gz'.format(tag))
+        cactus_call(parameters=['bcftools', 'view', '-Oz', '-h', cat_vcf_path], outfile=sort_vcf_path)
+        cactus_call(parameters=[['bcftools', 'view', '-H', cat_vcf_path],
+                                ['sort', '-k1,1d', '-k2,2n', '-s', '-T', work_dir],
+                                ['bgzip']], outfile=sort_vcf_path, outappend=True)
+        cat_vcf_path = sort_vcf_path
+
+    if fix_ploidies:
+        ploidy_vcf_path = os.path.join(work_dir, '{}ploidy.vcf.gz'.format(tag))
+        fix_vcf_ploidies(cat_vcf_path, ploidy_vcf_path)
+        cat_vcf_path = ploidy_vcf_path
+
     try:
         cactus_call(parameters=['tabix', '-p', 'vcf', cat_vcf_path])
         tbi_path = cat_vcf_path + '.tbi'
@@ -1222,7 +1428,8 @@
     for vcf_id, tbi_id in vcf_tbi_ids:
         job.fileStore.deleteGlobalFile(vcf_id)
-        job.fileStore.deleteGlobalFile(tbi_id)
+        if not sort:
+            job.fileStore.deleteGlobalFile(tbi_id)

     return job.fileStore.writeGlobalFile(cat_vcf_path), job.fileStore.writeGlobalFile(tbi_path)
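
vcf_cat's new sort mode pipes records through `sort -k1,1d -k2,2n -s` rather than bcftools sort because a stable sort preserves the relative order of records that share CHROM and POS, which may matter for merge_duplicates.py downstream. A toy illustration of the property using Python's (also stable) sort:

```python
def stable_sort_records(records):
    """Sort (CHROM, POS, ID) tuples by CHROM then POS only; ties keep their
    original relative order, mirroring the -s flag given to GNU sort."""
    return sorted(records, key=lambda rec: (rec[0], rec[1]))

recs = [('chr1', 100, 'b'), ('chr1', 50, 'a'), ('chr1', 100, 'c')]
# 'b' stays ahead of 'c' because the sort is stable
assert stable_sort_records(recs) == [('chr1', 50, 'a'), ('chr1', 100, 'b'), ('chr1', 100, 'c')]
```
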
diff --git a/submodules/collapse-bubble b/submodules/collapse-bubble
new file mode 160000
index 000000000..76070b8ab
--- /dev/null
+++ b/submodules/collapse-bubble
@@ -0,0 +1 @@
+Subproject commit 76070b8abb009b38972f97bb21ec2dd3ac4a7037
diff --git a/test/evolverTest.py b/test/evolverTest.py
index f89edec41..c63e3aff7 100644
--- a/test/evolverTest.py
+++ b/test/evolverTest.py
@@ -434,8 +434,11 @@ def _run_evolver_primates_pangenome(self, binariesMode):
         config_path = 'src/cactus/cactus_progressive_config.xml'
         xml_root = ET.parse(config_path).getroot()
         graphmap_elem = xml_root.find("graphmap")
+        graphmap_join_elem = xml_root.find("graphmap_join")
         # force cactus to use minigraph chunking
         graphmap_elem.attrib["minigraphConstructBatchSize"] = "2"
+        # force cactus to use vcfwave chunking
+        graphmap_join_elem.attrib["vcfwaveChunkLines"] = "1000"
         mc_config_path = os.path.join(self.tempDir, "config.mc.xml")
         with open(mc_config_path, 'w') as mc_config_file:
             xmlString = ET.tostring(xml_root, encoding='unicode')
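
The test override above follows the usual cactus test pattern: parse the shipped progressive config with ElementTree, tweak one attribute, and write a private copy that the test then passes to the pangenome pipeline. A minimal sketch of that pattern on its own (paths and the output filename are illustrative):

```python
import xml.etree.ElementTree as ET

# parse the shipped config and force the chunked vcfwave code path,
# even on tiny test data, by setting a small-ish chunk size
xml_root = ET.parse('src/cactus/cactus_progressive_config.xml').getroot()
graphmap_join_elem = xml_root.find('graphmap_join')
graphmap_join_elem.attrib['vcfwaveChunkLines'] = '1000'

# write a private copy of the modified config for the test run
with open('config.mc.xml', 'w') as mc_config_file:
    mc_config_file.write(ET.tostring(xml_root, encoding='unicode'))
```
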