From b535adf2d17c495875835a7a6806485d767a01ba Mon Sep 17 00:00:00 2001 From: Glenn Hickey Date: Mon, 2 Oct 2023 22:37:46 -0400 Subject: [PATCH] flesh out rgfa interface --- build-tools/downloadPangenomeTools | 3 +- src/cactus/cactus_progressive_config.xml | 2 + src/cactus/refmap/cactus_graphmap_join.py | 121 ++++++++++++++++------ test/evolverTest.py | 31 +++++- 4 files changed, 122 insertions(+), 35 deletions(-) diff --git a/build-tools/downloadPangenomeTools b/build-tools/downloadPangenomeTools index 3ba214552..dbe5907a0 100755 --- a/build-tools/downloadPangenomeTools +++ b/build-tools/downloadPangenomeTools @@ -278,7 +278,8 @@ fi # vg cd ${pangenomeBuildDir} -wget -q https://github.com/vgteam/vg/releases/download/v1.51.0/vg +#wget -q https://github.com/vgteam/vg/releases/download/v1.51.0/vg +wget -q http://public.gi.ucsc.edu/~hickey/vg-patch/vg.30ac0c77e91fbeff06b6e7bc9a7f172607ff84ba -O vg chmod +x vg if [[ $STATIC_CHECK -ne 1 || $(ldd vg | grep so | wc -l) -eq 0 ]] then diff --git a/src/cactus/cactus_progressive_config.xml b/src/cactus/cactus_progressive_config.xml index 41ea7bf1a..a777118eb 100644 --- a/src/cactus/cactus_progressive_config.xml +++ b/src/cactus/cactus_progressive_config.xml @@ -381,6 +381,7 @@ + diff --git a/src/cactus/refmap/cactus_graphmap_join.py b/src/cactus/refmap/cactus_graphmap_join.py index 6517329a2..5a7913399 100644 --- a/src/cactus/refmap/cactus_graphmap_join.py +++ b/src/cactus/refmap/cactus_graphmap_join.py @@ -54,7 +54,9 @@ from sonLib.nxnewick import NXNewick from sonLib.bioio import getTempDirectory, getTempFile, catFiles - + +rgfa_sample_name = '_rGFA_' + def main(): parser = ArgumentParser() Job.Runner.addToilOptions(parser) @@ -112,7 +114,7 @@ def graphmap_join_options(parser): parser.add_argument("--gfa", nargs='*', default=None, help = "Produce a GFA for given graph type(s) if specified. Valid types are 'full', 'clip', and 'filter'. If no type specified 'clip' will be used ('full' used if clipping disabled). Multiple types can be provided separated by a space. [--gfa clip assumed by default]") - parser.add_argument("--rgfa", nargs='*', default=None, help = "Produce a rGFA cover for given graph type(s) if specified. The rGFA cover will be output as a separate .rgfa.gz file, as well as included in the .gbz is reference path segements. This cover will be based on the (first) reference sample only. Valid types are 'full', 'clip', and 'filter'. If no type specified 'clip' will be used ('full' used if clipping disabled). Multiple types can be provided separated by a space. [--gfa clip assumed by default]") + parser.add_argument("--rgfa", nargs='*', default=None, help = "Produce a rGFA cover for given graph type(s) if specified. The rGFA cover will be output as a separate .rgfa.gz file, as well as included in the .gbz is reference path segements. This cover will be based on the (first) reference sample only. Valid types are 'full', 'clip', and 'filter'. If no type specified 'clip' will be used ('full' used if clipping disabled). Multiple types can be provided separated by a space.") parser.add_argument("--gbz", nargs='*', default=None, help = "Generate GBZ/snarls indexes for the given graph type(s) if specified. Valid types are 'full', 'clip' and 'filter'. If no type specified 'clip' will be used ('full' used if clipping disabled). Multiple types can be provided separated by a space. --giraffe will also produce these (and other) indexes") @@ -176,7 +178,7 @@ def graphmap_join_validate_options(options): if gfa == 'filter' and not options.filter: raise RuntimeError('--gfa cannot be set to filter since filtering is disabled') - if not options.rgfa: + if options.rgfa == []: options.rgfa = ['clip'] if options.clip else ['full'] options.rgfa = list(set(options.rgfa)) for rgfa in options.rgfa: @@ -383,18 +385,29 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids): full_vg_ids = [join_job.rv(i) for i in range(len(vg_ids))] prev_job = join_job + # by default, we don't leave rGFA path fragments in the chromosome graphs, they will only be in the .gbz and .rgfa.gz output + drop_rgfa_fragments = options.rgfa and not getOptionalAttrib(findRequiredNode(config.xmlRoot, "graphmap_join"), "keepRGFAFragmentsInChromGraphs", typeFn=bool, default=False) + # take out the _MINIGRAPH_ paths if 'full' in options.chrom_vg: output_full_vg_ids = [] for vg_path, vg_id, full_vg_id in zip(options.vg, vg_ids, full_vg_ids): drop_graph_event_job = join_job.addFollowOnJobFn(drop_graph_event, config, vg_path, full_vg_id, disk=vg_id.size * 3, memory=vg_id.size * 6) - output_full_vg_ids.append(drop_graph_event_job.rv()) + if drop_rgfa_fragments: + # take out the rGFA path fragments + drop_rgfa_job = drop_graph_event_job.addFollowOnJobFn(drop_rgfa_path_fragments, vg_path, drop_graph_event_job.rv(), + disk=vg_id.size * 3, memory=vg_id.size & 6) + output_full_vg_ids.append(drop_rgfa_job.rv()) + else: + output_full_vg_ids.append(drop_graph_event_job.rv()) else: output_full_vg_ids = full_vg_ids + # run the "clip" phase to do the clip-vg clipping clip_vg_ids = [] + output_clip_vg_ids = [] clipped_stats = None if options.clip or options.filter: clip_root_job = Job() @@ -411,6 +424,11 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids): clip_og_job = clip_job.addFollowOnJobFn(vg_to_og, options, config, vg_path, clip_job.rv(0), disk=input_vg_id.size * 16, memory=max(og_min_size, input_vg_id.size * 32)) og_chrom_ids['clip']['og'].append(clip_og_job.rv()) + if drop_rgfa_fragments: + drop_rgfa_job = clip_job.addFollowOnJobFn(drop_rgfa_path_fragments, vg_path, clip_job.rv(0), + disk=input_vg_id.size * 3, memory=input_vg_id.size & 6) + output_clip_vg_ids.append(drop_rgfa_job.rv()) + # join the stats clipped_stats = clip_root_job.addFollowOnJobFn(cat_stats, clip_vg_stats).rv() @@ -418,6 +436,7 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids): # run the "filter" phase to do the vg clip clipping filter_vg_ids = [] + output_filter_vg_ids = [] if options.filter: filter_root_job = Job() prev_job.addFollowOn(filter_root_job) @@ -429,8 +448,11 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids): if 'filter' in options.odgi + options.chrom_og + options.viz + options.draw: filter_og_job = filter_job.addFollowOnJobFn(vg_to_og, options, config, vg_path, filter_job.rv(), disk=input_vg_id.size * 16, memory=max(og_min_size, input_vg_id.size * 64)) - og_chrom_ids['filter']['og'].append(filter_og_job.rv()) - + og_chrom_ids['filter']['og'].append(filter_og_job.rv()) + if drop_rgfa_fragments: + drop_rgfa_job = filter_job.addFollowOnJobFn(drop_rgfa_path_fragments, vg_path, filter_job.rv(), + disk=input_vg_id.size * 3, memory=input_vg_id.size & 6) + output_filter_vg_ids.append(drop_rgfa_job.rv()) prev_job = filter_root_job @@ -475,6 +497,7 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids): rgfa_ids = [] current_out_dict = None do_gbz = workflow_phase in options.gbz + options.vcf + options.giraffe + do_rgfa = workflow_phase in options.rgfa if do_gbz or workflow_phase in options.gfa: assert len(options.vg) == len(phase_vg_ids) == len(vg_ids) for vg_path, vg_id, input_vg_id in zip(options.vg, phase_vg_ids, vg_ids): @@ -485,9 +508,9 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids): gfa_merge_job = gfa_root_job.addFollowOnJobFn(make_vg_indexes, options, config, gfa_ids, tag=workflow_phase + '.', - do_gbz=do_gbz, + do_gbz=do_gbz, do_rgfa=do_rgfa, cores=options.indexCores, - disk=sum(f.size for f in vg_ids) * 6, + disk=sum(f.size for f in vg_ids) * 16, memory=index_mem) out_dicts.append(gfa_merge_job.rv()) prev_job = gfa_merge_job @@ -519,6 +542,16 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids): memory=index_mem) out_dicts.append(deconstruct_job.rv()) + if do_rgfa and vcf_ref == options.vcfReference[0]: + rgfa_deconstruct_job = prev_job.addFollowOnJobFn(make_vcf, config, options.outName, vcf_ref, current_out_dict, + max_ref_allele=options.vcfbub, + tag=vcftag + '.', ref_tag = workflow_phase + '.', + do_rgfa=True, + cores=options.indexCores, + disk = sum(f.size for f in vg_ids) * 6, + memory=index_mem) + out_dicts.append(rgfa_deconstruct_job.rv()) + # optional giraffe if workflow_phase in options.giraffe: giraffe_job = gfa_merge_job.addFollowOnJobFn(make_giraffe_indexes, options, config, current_out_dict, @@ -555,7 +588,7 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids): if do_draw: og_chrom_ids[workflow_phase]['draw'].append(viz_job.rv(1) if viz_job else None) - return output_full_vg_ids, clip_vg_ids, clipped_stats, filter_vg_ids, out_dicts, og_chrom_ids + return output_full_vg_ids, output_clip_vg_ids, clipped_stats, output_filter_vg_ids, out_dicts, og_chrom_ids def clip_vg(job, options, config, vg_path, vg_id, phase): """ run clip-vg @@ -755,6 +788,16 @@ def drop_graph_event(job, config, vg_path, full_vg_id): cactus_call(parameters=['vg', 'paths', '-d', '-S', graph_event, '-x', full_vg_path], outfile=out_path) return job.fileStore.writeGlobalFile(out_path) + +def drop_rgfa_path_fragments(job, vg_path, vg_id): + """ drop out rgfa path fragments """ + work_dir = job.fileStore.getLocalTempDir() + vg_path = os.path.join(work_dir, os.path.splitext(os.path.basename(vg_path))[0]) + '.vg' + out_vg_path = os.path.join(work_dir, os.path.splitext(os.path.basename(vg_path))[0]) + 'norgfa.vg' + job.fileStore.readGlobalFile(vg_id, vg_path) + + cactus_call(parameters=['vg', 'paths', '-d', '-S', rgfa_sample_name, '-v', vg_path], outfile=out_vg_path) + return job.fileStore.writeGlobalFile(out_vg_path) def vg_to_gfa(job, options, config, vg_path, vg_id, rgfa=False): """ run gfa conversion """ @@ -809,7 +852,7 @@ def merge_rgfa(job, options, config, rgfa_ids, tag=''): return { '{}rgfa.gz'.format(tag) : job.fileStore.writeGlobalFile(merge_rgfa_path + '.gz') } -def make_vg_indexes(job, options, config, gfa_ids, tag="", do_gbz=False): +def make_vg_indexes(job, options, config, gfa_ids, tag="", do_gbz=False, do_rgfa=False): """ merge of the gfas, then make gbz / snarls / trans """ work_dir = job.fileStore.getLocalTempDir() @@ -825,8 +868,8 @@ def make_vg_indexes(job, options, config, gfa_ids, tag="", do_gbz=False): job.fileStore.readGlobalFile(gfa_id, gfa_path, mutable=True) if i == 0: # make sure every --reference sample is in the GFA header RS tag (which won't be the case if one sample - # is completely missing from the first graph, as vg may filter it out apparently) - cmd = [['head', '-1', gfa_path], ['sed', '-e', '1s/{}//'.format(graph_event)]] + # is completely missing from the first graph, as vg may filter it out apparently) + cmd = [['head', '-1', gfa_path], ['sed', '-e', '1s/ {}//'.format(graph_event), '-e', '1s/{} //'.format(graph_event)]] gfa_header = cactus_call(parameters=cmd, check_output=True).strip().split('\t') for i in range(len(gfa_header)): if gfa_header[i].startswith('RS:Z:'): @@ -841,44 +884,59 @@ def make_vg_indexes(job, options, config, gfa_ids, tag="", do_gbz=False): cactus_call(parameters=cmd, outfile=merge_gfa_path, outappend=True, job_memory=job.memory) job.fileStore.deleteGlobalFile(gfa_id) - # make the gbz + out_dict = {} + + if do_rgfa: + if do_gbz: + # make a GBZ that includes the rGFA fragments + rgfa_gbz_path = os.path.join(work_dir, '{}merged.rgfa.gbz'.format(tag)) + cactus_call(parameters=['vg', 'gbwt', '-G', merge_gfa_path, '--gbz-format', '-g', rgfa_gbz_path], job_memory=job.memory) + out_dict['{}rgfa.gbz'.format(tag)] = job.fileStore.writeGlobalFile(rgfa_gbz_path) + + # remove rGFA paths from the GFA going forward + cactus_call(parameters=[['grep', '-v', '^W\t{}'.format(rgfa_sample_name), merge_gfa_path], + ['sed', '-e', '1s/ {}//g'.format(rgfa_sample_name), '-e', '1s/{} //g'.format(rgfa_sample_name)]], + outfile=merge_gfa_path + '.clean') + cactus_call(parameters=['mv', merge_gfa_path +'.clean', merge_gfa_path]) + + # make the gbz without the rGFA fragments if do_gbz: gbz_path = os.path.join(work_dir, '{}merged.gbz'.format(tag)) cactus_call(parameters=['vg', 'gbwt', '-G', merge_gfa_path, '--gbz-format', '-g', gbz_path], job_memory=job.memory) - - # zip the gfa and remove any rgfa paths (they will be kept in the GBZ and .rgfa - # but not here as it would be just too confusing) - rgfa_sample_name = '_rGFA_' - gfa_path = merge_gfa_path + '.gz' - cactus_call(parameters=[['grep', '-v', '^W\t{}'.format(rgfa_sample_name), merge_gfa_path], - ['bgzip', '--threads', str(job.cores)]], - outfile=gfa_path) - os.remove(merge_gfa_path) - + out_dict['{}gbz'.format(tag)] = job.fileStore.writeGlobalFile(gbz_path) + + # zip the gfa + cactus_call(parameters=['bgzip', '--threads', str(job.cores), merge_gfa_path]) + out_dict['{}gfa.gz'.format(tag)] = job.fileStore.writeGlobalFile(merge_gfa_path + '.gz') # make the snarls if do_gbz: snarls_path = os.path.join(work_dir, '{}merged.snarls'.format(tag)) cactus_call(parameters=['vg', 'snarls', gbz_path, '-T', '-t', str(job.cores)], outfile=snarls_path, job_memory=job.memory) + out_dict['{}snarls'.format(tag)] = job.fileStore.writeGlobalFile(snarls_path) - out_dict = { '{}gfa.gz'.format(tag) : job.fileStore.writeGlobalFile(gfa_path) } if do_gbz: out_dict['{}gbz'.format(tag)] = job.fileStore.writeGlobalFile(gbz_path) - out_dict['{}snarls'.format(tag)] = job.fileStore.writeGlobalFile(snarls_path) + return out_dict -def make_vcf(job, config, out_name, vcf_ref, index_dict, tag='', ref_tag='', max_ref_allele=None): +def make_vcf(job, config, out_name, vcf_ref, index_dict, tag='', ref_tag='', max_ref_allele=None, do_rgfa=False): """ make the vcf """ work_dir = job.fileStore.getLocalTempDir() gbz_path = os.path.join(work_dir, ref_tag + os.path.basename(out_name) + '.gbz') snarls_path = os.path.join(work_dir, ref_tag + os.path.basename(out_name) + '.snarls') - job.fileStore.readGlobalFile(index_dict['{}gbz'.format(ref_tag)], gbz_path) + job.fileStore.readGlobalFile(index_dict['{}{}gbz'.format(ref_tag, 'rgfa.' if do_rgfa else '')], gbz_path) job.fileStore.readGlobalFile(index_dict['{}snarls'.format(ref_tag)], snarls_path) + if do_rgfa: + tag = 'rgfa.{}'.format(tag) + # make the vcf vcf_path = os.path.join(work_dir, '{}merged.vcf.gz'.format(tag)) decon_cmd = ['vg', 'deconstruct', gbz_path, '-P', vcf_ref, '-C', '-a', '-r', snarls_path, '-t', str(job.cores)] + if do_rgfa: + decon_cmd += ['-P', rgfa_sample_name] if getOptionalAttrib(findRequiredNode(config.xmlRoot, "graphmap_join"), "GFANodeIDsInVCF", typeFn=bool, default=True): decon_cmd.append('-O') cactus_call(parameters=[decon_cmd, ['bgzip', '--threads', str(job.cores)]], outfile=vcf_path, job_memory=job.memory) @@ -890,12 +948,15 @@ def make_vcf(job, config, out_name, vcf_ref, index_dict, tag='', ref_tag='', max RealtimeLogger.warning('WARNING: tabix failed on VCF with this error {}'.format(str(e))) tbi_path = None - output_dict = { '{}raw.vcf.gz'.format(tag) : job.fileStore.writeGlobalFile(vcf_path) } + # dont do vcfbub when doing rGFA + out_tag = '{}raw.'.format(tag) if not do_rgfa else tag + + output_dict = { '{}vcf.gz'.format(out_tag) : job.fileStore.writeGlobalFile(vcf_path) } if tbi_path: - output_dict['{}raw.vcf.gz.tbi'.format(tag)] = job.fileStore.writeGlobalFile(tbi_path) + output_dict['{}vcf.gz.tbi'.format(out_tag)] = job.fileStore.writeGlobalFile(tbi_path) # make the filtered vcf - if max_ref_allele: + if max_ref_allele and not do_rgfa: vcfbub_path = os.path.join(work_dir, 'merged.filtered.vcf.gz') cactus_call(parameters=[['vcfbub', '--input', vcf_path, '--max-ref-length', str(max_ref_allele), '--max-level', '0'], ['bgzip', '--threads', str(job.cores)]], diff --git a/test/evolverTest.py b/test/evolverTest.py index c17946580..6e8e1eeb7 100644 --- a/test/evolverTest.py +++ b/test/evolverTest.py @@ -468,7 +468,7 @@ def _run_yeast_pangenome(self, binariesMode): cactus_pangenome_cmd = ['cactus-pangenome', self._job_store(binariesMode), orig_seq_file_path, '--outDir', join_path, '--outName', 'yeast', '--refContigs'] + chroms + ['--reference', 'S288C', 'DBVPG6044', '--vcf', '--vcfReference','DBVPG6044', 'S288C', '--giraffe', 'clip', 'filter', '--chrom-vg', 'clip', 'filter', - '--viz', '--chrom-og', 'clip', 'full', '--odgi', '--haplo', 'clip', + '--viz', '--chrom-og', 'clip', 'full', '--odgi', '--haplo', 'clip', '--rgfa', '--indexCores', '4', '--consCores', '2'] subprocess.check_call(cactus_pangenome_cmd + cactus_opts) @@ -476,7 +476,7 @@ def _run_yeast_pangenome(self, binariesMode): subprocess.check_call(['mkdir', '-p', os.path.join(self.tempDir, 'chroms')]) subprocess.check_call(['mv', os.path.join(join_path, 'chrom-subproblems', 'contig_sizes.tsv'), os.path.join(self.tempDir, 'chroms')]) - def _check_yeast_pangenome(self, binariesMode, other_ref=None, expect_odgi=False, expect_haplo=False): + def _check_yeast_pangenome(self, binariesMode, other_ref=None, expect_odgi=False, expect_haplo=False, expect_rgfa=False): """ yeast pangenome chromosome by chromosome pipeline """ @@ -541,6 +541,23 @@ def _check_yeast_pangenome(self, binariesMode, other_ref=None, expect_odgi=False for haplo_idx in ['yeast.ri', 'yeast.hapl']: idx_bytes = os.path.getsize(os.path.join(join_path, haplo_idx)) self.assertGreaterEqual(idx_bytes, 10000000) + + if expect_rgfa: + # make sure we have the rgfa-related files + for rgfa_idx in ['yeast.rgfa.gz', 'yeast.rgfa.gbz', 'yeast.rgfa.vcf.gz', 'yeast.rgfa.vcf.gz.tbi']: + idx_bytes = os.path.getsize(os.path.join(join_path, haplo_idx)) + if not rgfa_idx.endswith('.tbi'): + self.assertGreaterEqual(idx_bytes, 500000) + + # make sure we have some off-ref nodes in the rgfa file + for rank, expected_count in [(0, 200000), (1, 5000), (2, 100), (3, 20), (4, 5), (5, 2)]: + rank_count = int(subprocess.check_output('gzip -dc {} | grep "SR:i:{}" | wc -l'.format(os.path.join(join_path, 'yeast.rgfa.gz'),rank), shell=True).decode('utf-8')) + self.assertGreaterEqual(rank_count, expected_count) + + # make sure the vcf is bigger + vcf_lines = int(subprocess.check_output('gzip -dc {} | wc -l'.format(vcf_paths[0]), shell=True).decode('utf-8')) + rgfa_vcf_lines = int(subprocess.check_output('gzip -dc {} | wc -l'.format(vcf_paths[0].replace('.vcf','.rgfa.vcf')), shell=True).decode('utf-8')) + self.assertGreaterEqual(rgfa_vcf_lines - vcf_lines, 1000) # make sure the chrom splitting stats are somewhat sane contig_sizes = {} @@ -572,6 +589,9 @@ def _check_yeast_pangenome(self, binariesMode, other_ref=None, expect_odgi=False clip_nodes = int(subprocess.check_output(['vg', 'stats', '-N', os.path.join(join_path, 'yeast.gbz')]).strip().decode('utf-8').strip()) self.assertGreaterEqual(clip_nodes, 400000) self.assertLessEqual(clip_nodes, 500000) + if expect_rgfa: + rgfa_clip_nodes = int(subprocess.check_output(['vg', 'stats', '-N', os.path.join(join_path, 'yeast.rgfa.gbz')]).strip().decode('utf-8').strip()) + self.assertEqual(rgfa_clip_nodes, clip_nodes) filter_nodes = int(subprocess.check_output(['vg', 'stats', '-N', os.path.join(join_path, 'yeast.d2.gbz')]).strip().decode('utf-8').strip()) self.assertGreaterEqual(filter_nodes, 300000) @@ -580,7 +600,10 @@ def _check_yeast_pangenome(self, binariesMode, other_ref=None, expect_odgi=False clip_edges = int(subprocess.check_output(['vg', 'stats', '-E', os.path.join(join_path, 'yeast.gbz')]).strip().decode('utf-8').strip()) self.assertGreaterEqual(clip_edges, 550000) self.assertLessEqual(clip_edges, 650000) - + if expect_rgfa: + rgfa_clip_edges = int(subprocess.check_output(['vg', 'stats', '-E', os.path.join(join_path, 'yeast.rgfa.gbz')]).strip().decode('utf-8').strip()) + self.assertEqual(rgfa_clip_edges, clip_edges) + filter_edges = int(subprocess.check_output(['vg', 'stats', '-E', os.path.join(join_path, 'yeast.d2.gbz')]).strip().decode('utf-8').strip()) self.assertGreaterEqual(filter_edges, 400000) self.assertLessEqual(filter_edges, 500000) @@ -946,7 +969,7 @@ def testYeastPangenomeLocal(self): self._run_yeast_pangenome(name) # check the output - self._check_yeast_pangenome(name, other_ref='DBVPG6044', expect_odgi=True, expect_haplo=True) + self._check_yeast_pangenome(name, other_ref='DBVPG6044', expect_odgi=True, expect_haplo=True, expect_rgfa=True) if __name__ == '__main__':