Skip to content

Commit

Permalink
flesh out rgfa interface
Browse files Browse the repository at this point in the history
  • Loading branch information
glennhickey committed Oct 4, 2023
1 parent 61dbed5 commit f34c56b
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 36 deletions.
3 changes: 2 additions & 1 deletion build-tools/downloadPangenomeTools
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ fi

# vg
cd ${pangenomeBuildDir}
wget -q https://github.com/vgteam/vg/releases/download/v1.51.0/vg
#wget -q https://github.com/vgteam/vg/releases/download/v1.51.0/vg
wget -q http://public.gi.ucsc.edu/~hickey/vg-patch/vg.30ac0c77e91fbeff06b6e7bc9a7f172607ff84ba -O vg
chmod +x vg
if [[ $STATIC_CHECK -ne 1 || $(ldd vg | grep so | wc -l) -eq 0 ]]
then
Expand Down
2 changes: 2 additions & 0 deletions src/cactus/cactus_progressive_config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@
<!-- rindexOptions: options to vg gbwt -r (r-index construction) as used to make haplotype sampling index -->
<!-- haplOptions: options to vg haplotypes as used to make haplotype sampling index -->
<!-- minRGFAFragment: smallest rGFA (rank > 0) cover path to write -->
<!-- keepRGFAFragmentsInChromGraphs: toggle whether rGFA fragments end up in chromosome graphs (by default only in gbz/rgfa) -->
<graphmap_join
gfaffix="1"
clipNonMinigraph="1"
Expand All @@ -394,6 +395,7 @@
rindexOptions="-p"
haplOptions="-v 2"
minRGFAFragment="50"
keepRGFAFragmentsInChromGraphs="0"
/>
<!-- hal2vg options -->
<!-- includeMinigraph: include minigraph node sequences as paths in output (note that cactus-graphmap-join will still remove them by default) -->
Expand Down
123 changes: 92 additions & 31 deletions src/cactus/refmap/cactus_graphmap_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@

from sonLib.nxnewick import NXNewick
from sonLib.bioio import getTempDirectory, getTempFile, catFiles


rgfa_sample_name = '_rGFA_'

def main():
parser = ArgumentParser()
Job.Runner.addToilOptions(parser)
Expand Down Expand Up @@ -112,7 +114,7 @@ def graphmap_join_options(parser):

parser.add_argument("--gfa", nargs='*', default=None, help = "Produce a GFA for given graph type(s) if specified. Valid types are 'full', 'clip', and 'filter'. If no type specified 'clip' will be used ('full' used if clipping disabled). Multiple types can be provided separated by a space. [--gfa clip assumed by default]")

parser.add_argument("--rgfa", nargs='*', default=None, help = "Produce a rGFA cover for given graph type(s) if specified. The rGFA cover will be output as a separate .rgfa.gz file, as well as included in the .gbz is reference path segements. This cover will be based on the (first) reference sample only. Valid types are 'full', 'clip', and 'filter'. If no type specified 'clip' will be used ('full' used if clipping disabled). Multiple types can be provided separated by a space. [--gfa clip assumed by default]")
parser.add_argument("--rgfa", nargs='*', default=None, help = "Produce a rGFA cover for given graph type(s) if specified. The rGFA cover will be output as a separate .rgfa.gz file, as well as included in the .gbz is reference path segements. This cover will be based on the (first) reference sample only. Valid types are 'full', 'clip', and 'filter'. If no type specified 'clip' will be used ('full' used if clipping disabled). Multiple types can be provided separated by a space.")

parser.add_argument("--gbz", nargs='*', default=None, help = "Generate GBZ/snarls indexes for the given graph type(s) if specified. Valid types are 'full', 'clip' and 'filter'. If no type specified 'clip' will be used ('full' used if clipping disabled). Multiple types can be provided separated by a space. --giraffe will also produce these (and other) indexes")

Expand Down Expand Up @@ -176,9 +178,9 @@ def graphmap_join_validate_options(options):
if gfa == 'filter' and not options.filter:
raise RuntimeError('--gfa cannot be set to filter since filtering is disabled')

if not options.rgfa:
if options.rgfa == []:
options.rgfa = ['clip'] if options.clip else ['full']
options.rgfa = list(set(options.rgfa))
options.rgfa = list(set(options.rgfa)) if options.rgfa else []
for rgfa in options.rgfa:
if rgfa not in ['clip', 'filter', 'full']:
raise RuntimeError('Unrecognized value for --gfa: {}. Must be one of {clip, filter, full}'.format(gfa))
Expand Down Expand Up @@ -383,18 +385,29 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids):
full_vg_ids = [join_job.rv(i) for i in range(len(vg_ids))]
prev_job = join_job

# by default, we don't leave rGFA path fragments in the chromosome graphs, they will only be in the .gbz and .rgfa.gz output
drop_rgfa_fragments = options.rgfa and not getOptionalAttrib(findRequiredNode(config.xmlRoot, "graphmap_join"), "keepRGFAFragmentsInChromGraphs", typeFn=bool, default=False)

# take out the _MINIGRAPH_ paths
if 'full' in options.chrom_vg:
output_full_vg_ids = []
for vg_path, vg_id, full_vg_id in zip(options.vg, vg_ids, full_vg_ids):
drop_graph_event_job = join_job.addFollowOnJobFn(drop_graph_event, config, vg_path, full_vg_id,
disk=vg_id.size * 3, memory=vg_id.size * 6)
output_full_vg_ids.append(drop_graph_event_job.rv())
if drop_rgfa_fragments:
# take out the rGFA path fragments
drop_rgfa_job = drop_graph_event_job.addFollowOnJobFn(drop_rgfa_path_fragments, vg_path, drop_graph_event_job.rv(),
disk=vg_id.size * 3, memory=vg_id.size & 6)
output_full_vg_ids.append(drop_rgfa_job.rv())
else:
output_full_vg_ids.append(drop_graph_event_job.rv())
else:
output_full_vg_ids = full_vg_ids


# run the "clip" phase to do the clip-vg clipping
clip_vg_ids = []
output_clip_vg_ids = []
clipped_stats = None
if options.clip or options.filter:
clip_root_job = Job()
Expand All @@ -411,13 +424,19 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids):
clip_og_job = clip_job.addFollowOnJobFn(vg_to_og, options, config, vg_path, clip_job.rv(0),
disk=input_vg_id.size * 16, memory=max(og_min_size, input_vg_id.size * 32))
og_chrom_ids['clip']['og'].append(clip_og_job.rv())
if drop_rgfa_fragments:
drop_rgfa_job = clip_job.addFollowOnJobFn(drop_rgfa_path_fragments, vg_path, clip_job.rv(0),
disk=input_vg_id.size * 3, memory=input_vg_id.size & 6)
output_clip_vg_ids.append(drop_rgfa_job.rv())


# join the stats
clipped_stats = clip_root_job.addFollowOnJobFn(cat_stats, clip_vg_stats).rv()
prev_job = clip_root_job

# run the "filter" phase to do the vg clip clipping
filter_vg_ids = []
output_filter_vg_ids = []
if options.filter:
filter_root_job = Job()
prev_job.addFollowOn(filter_root_job)
Expand All @@ -429,8 +448,11 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids):
if 'filter' in options.odgi + options.chrom_og + options.viz + options.draw:
filter_og_job = filter_job.addFollowOnJobFn(vg_to_og, options, config, vg_path, filter_job.rv(),
disk=input_vg_id.size * 16, memory=max(og_min_size, input_vg_id.size * 64))
og_chrom_ids['filter']['og'].append(filter_og_job.rv())

og_chrom_ids['filter']['og'].append(filter_og_job.rv())
if drop_rgfa_fragments:
drop_rgfa_job = filter_job.addFollowOnJobFn(drop_rgfa_path_fragments, vg_path, filter_job.rv(),
disk=input_vg_id.size * 3, memory=input_vg_id.size & 6)
output_filter_vg_ids.append(drop_rgfa_job.rv())

prev_job = filter_root_job

Expand Down Expand Up @@ -475,6 +497,7 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids):
rgfa_ids = []
current_out_dict = None
do_gbz = workflow_phase in options.gbz + options.vcf + options.giraffe
do_rgfa = workflow_phase in options.rgfa
if do_gbz or workflow_phase in options.gfa:
assert len(options.vg) == len(phase_vg_ids) == len(vg_ids)
for vg_path, vg_id, input_vg_id in zip(options.vg, phase_vg_ids, vg_ids):
Expand All @@ -485,9 +508,9 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids):

gfa_merge_job = gfa_root_job.addFollowOnJobFn(make_vg_indexes, options, config, gfa_ids,
tag=workflow_phase + '.',
do_gbz=do_gbz,
do_gbz=do_gbz, do_rgfa=do_rgfa,
cores=options.indexCores,
disk=sum(f.size for f in vg_ids) * 6,
disk=sum(f.size for f in vg_ids) * 16,
memory=index_mem)
out_dicts.append(gfa_merge_job.rv())
prev_job = gfa_merge_job
Expand Down Expand Up @@ -519,6 +542,16 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids):
memory=index_mem)
out_dicts.append(deconstruct_job.rv())

if do_rgfa and vcf_ref == options.vcfReference[0]:
rgfa_deconstruct_job = prev_job.addFollowOnJobFn(make_vcf, config, options.outName, vcf_ref, current_out_dict,
max_ref_allele=options.vcfbub,
tag=vcftag + '.', ref_tag = workflow_phase + '.',
do_rgfa=True,
cores=options.indexCores,
disk = sum(f.size for f in vg_ids) * 6,
memory=index_mem)
out_dicts.append(rgfa_deconstruct_job.rv())

# optional giraffe
if workflow_phase in options.giraffe:
giraffe_job = gfa_merge_job.addFollowOnJobFn(make_giraffe_indexes, options, config, current_out_dict,
Expand Down Expand Up @@ -555,7 +588,7 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids):
if do_draw:
og_chrom_ids[workflow_phase]['draw'].append(viz_job.rv(1) if viz_job else None)

return output_full_vg_ids, clip_vg_ids, clipped_stats, filter_vg_ids, out_dicts, og_chrom_ids
return output_full_vg_ids, output_clip_vg_ids, clipped_stats, output_filter_vg_ids, out_dicts, og_chrom_ids

def clip_vg(job, options, config, vg_path, vg_id, phase):
""" run clip-vg
Expand Down Expand Up @@ -755,6 +788,16 @@ def drop_graph_event(job, config, vg_path, full_vg_id):

cactus_call(parameters=['vg', 'paths', '-d', '-S', graph_event, '-x', full_vg_path], outfile=out_path)
return job.fileStore.writeGlobalFile(out_path)

def drop_rgfa_path_fragments(job, vg_path, vg_id):
""" drop out rgfa path fragments """
work_dir = job.fileStore.getLocalTempDir()
vg_path = os.path.join(work_dir, os.path.splitext(os.path.basename(vg_path))[0]) + '.vg'
out_vg_path = os.path.join(work_dir, os.path.splitext(os.path.basename(vg_path))[0]) + 'norgfa.vg'
job.fileStore.readGlobalFile(vg_id, vg_path)

cactus_call(parameters=['vg', 'paths', '-d', '-S', rgfa_sample_name, '-v', vg_path], outfile=out_vg_path)
return job.fileStore.writeGlobalFile(out_vg_path)

def vg_to_gfa(job, options, config, vg_path, vg_id, rgfa=False):
""" run gfa conversion """
Expand Down Expand Up @@ -809,7 +852,7 @@ def merge_rgfa(job, options, config, rgfa_ids, tag=''):

return { '{}rgfa.gz'.format(tag) : job.fileStore.writeGlobalFile(merge_rgfa_path + '.gz') }

def make_vg_indexes(job, options, config, gfa_ids, tag="", do_gbz=False):
def make_vg_indexes(job, options, config, gfa_ids, tag="", do_gbz=False, do_rgfa=False):
""" merge of the gfas, then make gbz / snarls / trans
"""
work_dir = job.fileStore.getLocalTempDir()
Expand All @@ -825,8 +868,8 @@ def make_vg_indexes(job, options, config, gfa_ids, tag="", do_gbz=False):
job.fileStore.readGlobalFile(gfa_id, gfa_path, mutable=True)
if i == 0:
# make sure every --reference sample is in the GFA header RS tag (which won't be the case if one sample
# is completely missing from the first graph, as vg may filter it out apparently)
cmd = [['head', '-1', gfa_path], ['sed', '-e', '1s/{}//'.format(graph_event)]]
# is completely missing from the first graph, as vg may filter it out apparently)
cmd = [['head', '-1', gfa_path], ['sed', '-e', '1s/ {}//'.format(graph_event), '-e', '1s/{} //'.format(graph_event)]]
gfa_header = cactus_call(parameters=cmd, check_output=True).strip().split('\t')
for i in range(len(gfa_header)):
if gfa_header[i].startswith('RS:Z:'):
Expand All @@ -841,44 +884,59 @@ def make_vg_indexes(job, options, config, gfa_ids, tag="", do_gbz=False):
cactus_call(parameters=cmd, outfile=merge_gfa_path, outappend=True, job_memory=job.memory)
job.fileStore.deleteGlobalFile(gfa_id)

# make the gbz
out_dict = {}

if do_rgfa:
if do_gbz:
# make a GBZ that includes the rGFA fragments
rgfa_gbz_path = os.path.join(work_dir, '{}merged.rgfa.gbz'.format(tag))
cactus_call(parameters=['vg', 'gbwt', '-G', merge_gfa_path, '--gbz-format', '-g', rgfa_gbz_path], job_memory=job.memory)
out_dict['{}rgfa.gbz'.format(tag)] = job.fileStore.writeGlobalFile(rgfa_gbz_path)

# remove rGFA paths from the GFA going forward
cactus_call(parameters=[['grep', '-v', '^W\t{}'.format(rgfa_sample_name), merge_gfa_path],
['sed', '-e', '1s/ {}//g'.format(rgfa_sample_name), '-e', '1s/{} //g'.format(rgfa_sample_name)]],
outfile=merge_gfa_path + '.clean')
cactus_call(parameters=['mv', merge_gfa_path +'.clean', merge_gfa_path])

# make the gbz without the rGFA fragments
if do_gbz:
gbz_path = os.path.join(work_dir, '{}merged.gbz'.format(tag))
cactus_call(parameters=['vg', 'gbwt', '-G', merge_gfa_path, '--gbz-format', '-g', gbz_path], job_memory=job.memory)

# zip the gfa and remove any rgfa paths (they will be kept in the GBZ and .rgfa
# but not here as it would be just too confusing)
rgfa_sample_name = '_rGFA_'
gfa_path = merge_gfa_path + '.gz'
cactus_call(parameters=[['grep', '-v', '^W\t{}'.format(rgfa_sample_name), merge_gfa_path],
['bgzip', '--threads', str(job.cores)]],
outfile=gfa_path)
os.remove(merge_gfa_path)

out_dict['{}gbz'.format(tag)] = job.fileStore.writeGlobalFile(gbz_path)

# zip the gfa
cactus_call(parameters=['bgzip', '--threads', str(job.cores), merge_gfa_path])
out_dict['{}gfa.gz'.format(tag)] = job.fileStore.writeGlobalFile(merge_gfa_path + '.gz')

# make the snarls
if do_gbz:
snarls_path = os.path.join(work_dir, '{}merged.snarls'.format(tag))
cactus_call(parameters=['vg', 'snarls', gbz_path, '-T', '-t', str(job.cores)], outfile=snarls_path, job_memory=job.memory)
out_dict['{}snarls'.format(tag)] = job.fileStore.writeGlobalFile(snarls_path)

out_dict = { '{}gfa.gz'.format(tag) : job.fileStore.writeGlobalFile(gfa_path) }
if do_gbz:
out_dict['{}gbz'.format(tag)] = job.fileStore.writeGlobalFile(gbz_path)
out_dict['{}snarls'.format(tag)] = job.fileStore.writeGlobalFile(snarls_path)

return out_dict

def make_vcf(job, config, out_name, vcf_ref, index_dict, tag='', ref_tag='', max_ref_allele=None):
def make_vcf(job, config, out_name, vcf_ref, index_dict, tag='', ref_tag='', max_ref_allele=None, do_rgfa=False):
""" make the vcf
"""
work_dir = job.fileStore.getLocalTempDir()
gbz_path = os.path.join(work_dir, ref_tag + os.path.basename(out_name) + '.gbz')
snarls_path = os.path.join(work_dir, ref_tag + os.path.basename(out_name) + '.snarls')
job.fileStore.readGlobalFile(index_dict['{}gbz'.format(ref_tag)], gbz_path)
job.fileStore.readGlobalFile(index_dict['{}{}gbz'.format(ref_tag, 'rgfa.' if do_rgfa else '')], gbz_path)
job.fileStore.readGlobalFile(index_dict['{}snarls'.format(ref_tag)], snarls_path)

if do_rgfa:
tag = 'rgfa.{}'.format(tag)

# make the vcf
vcf_path = os.path.join(work_dir, '{}merged.vcf.gz'.format(tag))
decon_cmd = ['vg', 'deconstruct', gbz_path, '-P', vcf_ref, '-C', '-a', '-r', snarls_path, '-t', str(job.cores)]
if do_rgfa:
decon_cmd += ['-P', rgfa_sample_name]
if getOptionalAttrib(findRequiredNode(config.xmlRoot, "graphmap_join"), "GFANodeIDsInVCF", typeFn=bool, default=True):
decon_cmd.append('-O')
cactus_call(parameters=[decon_cmd, ['bgzip', '--threads', str(job.cores)]], outfile=vcf_path, job_memory=job.memory)
Expand All @@ -890,12 +948,15 @@ def make_vcf(job, config, out_name, vcf_ref, index_dict, tag='', ref_tag='', max
RealtimeLogger.warning('WARNING: tabix failed on VCF with this error {}'.format(str(e)))
tbi_path = None

output_dict = { '{}raw.vcf.gz'.format(tag) : job.fileStore.writeGlobalFile(vcf_path) }
# dont do vcfbub when doing rGFA
out_tag = '{}raw.'.format(tag) if not do_rgfa else tag

output_dict = { '{}vcf.gz'.format(out_tag) : job.fileStore.writeGlobalFile(vcf_path) }
if tbi_path:
output_dict['{}raw.vcf.gz.tbi'.format(tag)] = job.fileStore.writeGlobalFile(tbi_path)
output_dict['{}vcf.gz.tbi'.format(out_tag)] = job.fileStore.writeGlobalFile(tbi_path)

# make the filtered vcf
if max_ref_allele:
if max_ref_allele and not do_rgfa:
vcfbub_path = os.path.join(work_dir, 'merged.filtered.vcf.gz')
cactus_call(parameters=[['vcfbub', '--input', vcf_path, '--max-ref-length', str(max_ref_allele), '--max-level', '0'],
['bgzip', '--threads', str(job.cores)]],
Expand Down
Loading

0 comments on commit f34c56b

Please sign in to comment.