From 18d48217a85e5e844d95d9bb995f40a2422fbb5d Mon Sep 17 00:00:00 2001 From: Matthew Lyon Date: Mon, 9 Mar 2020 17:02:17 +0000 Subject: [PATCH 01/10] moved to temp file space --- gwas_results.py => gwas.py | 114 +++++++++++++++++------ harmonise.py | 55 ------------ main.py | 119 +++++++++++------------- param.py | 14 +-- requirements.txt | 1 + test/test_gwas.py | 18 ++++ test/test_gwas_results.py | 14 --- test/test_vcf.py | 23 +++-- vcf.py | 180 +++++++++++++++++++------------------ 9 files changed, 271 insertions(+), 267 deletions(-) rename gwas_results.py => gwas.py (65%) delete mode 100644 harmonise.py create mode 100644 test/test_gwas.py delete mode 100644 test/test_gwas_results.py diff --git a/gwas_results.py b/gwas.py similarity index 65% rename from gwas_results.py rename to gwas.py index 3b4e45c..daddabc 100755 --- a/gwas_results.py +++ b/gwas.py @@ -1,11 +1,12 @@ import logging import gzip import os +import tempfile +import pickle +import re -""" Class to store single GWAS result i.e. single SNP or row""" - -class GwasResult: +class Gwas: def __init__(self, chrom, pos, ref, alt, b, se, pval, n, alt_freq, dbsnpid, ncase, imp_info, imp_z, vcf_filter="PASS"): @@ -40,16 +41,20 @@ def reverse_sign(self): except TypeError: self.alt_freq = None - def are_alleles_iupac(self): - for bp in self.alt: - if bp != 'A' and bp != 'T' and bp != 'C' and bp != 'G': - return False + def check_reference_allele(self, fasta): + assert self.ref == str( + fasta.fetch(region="{}:{}-{}".format( + self.chrom, + self.pos, + self.pos + len(self.ref) - 1 + )) + ).upper() + def check_alleles_iupac(self): + for bp in self.alt: + assert bp in {"A", "T", "G", "C"} for bp in self.ref: - if bp != 'A' and bp != 'T' and bp != 'C' and bp != 'G': - return False - - return True + assert bp in {"A", "T", "G", "C"} def __str__(self): return str(self.__dict__) @@ -59,6 +64,7 @@ def __str__(self): @staticmethod def read_from_file( path, + fasta, chrom_field, pos_field, ea_field, @@ -69,7 +75,7 @@ def read_from_file( delimiter, header, ncase_field=None, - dbsnp_field=None, + rsid_field=None, ea_af_field=None, nea_af_field=None, imp_z_field=None, @@ -78,6 +84,8 @@ def read_from_file( rm_chr_prefix=False ): + rsid_pattern = re.compile("^rs[0-9]*") + logging.info("Reading summary stats and mapping to FASTA: {}".format(path)) logging.debug("File path: {}".format(path)) logging.debug("CHR field: {}".format(chrom_field)) @@ -90,14 +98,21 @@ def read_from_file( logging.debug("Delimiter: {}".format(delimiter)) logging.debug("Header: {}".format(header)) logging.debug("ncase Field: {}".format(ncase_field)) - logging.debug("dbsnp Field: {}".format(dbsnp_field)) + logging.debug("dbsnp Field: {}".format(rsid_field)) logging.debug("EA AF Field: {}".format(ea_af_field)) logging.debug("NEA AF Field: {}".format(nea_af_field)) logging.debug("IMP Z Score Field: {}".format(imp_z_field)) logging.debug("IMP INFO Field: {}".format(imp_info_field)) logging.debug("N Control Field: {}".format(ncontrol_field)) - total_variants = 0 + metadata = { + 'TotalVariants': 0, + 'VariantsNotRead': 0, + 'HarmonisedVariants': 0, + 'VariantsNotHarmonised': 0, + 'SwitchedAlleles': 0 + } + file_idx = dict() filename, file_extension = os.path.splitext(path) if file_extension == '.gz': @@ -111,11 +126,11 @@ def read_from_file( if header: logging.info("Skipping header: {}".format(f.readline().strip())) - results = [] - i = 0 - for n, l in enumerate(f): - total_variants += 1 - i+=1 + # store results in a serialised temp file to reduce memory usage + results = 
tempfile.TemporaryFile() + + for l in f: + metadata['TotalVariants'] += 1 s = l.strip().split(delimiter) logging.debug("Input row: {}".format(s)) @@ -125,15 +140,17 @@ def read_from_file( chrom = s[chrom_field].replace("chr", "") else: chrom = s[chrom_field] - except: - logging.warning("Skipping {}".format(s)) - exit + except Exception as e: + logging.warning("Skipping {}: {}".format(s, e)) + metadata['VariantsNotRead'] += 1 + continue try: pos = int(float(s[pos_field])) # float is for scientific notation assert pos > 0 except Exception as e: logging.warning("Skipping {}: {}".format(s, e)) + metadata['VariantsNotRead'] += 1 continue ref = s[nea_field].upper() @@ -143,18 +160,21 @@ def read_from_file( b = float(s[effect_field]) except Exception as e: logging.warning("Skipping {}: {}".format(s, e)) + metadata['VariantsNotRead'] += 1 continue try: se = float(s[se_field]) except Exception as e: logging.warning("Skipping {}: {}".format(s, e)) + metadata['VariantsNotRead'] += 1 continue try: pval = float(s[pval_field]) except Exception as e: logging.warning("Skipping line {}, {}".format(s, e)) + metadata['VariantsNotRead'] += 1 continue try: @@ -169,10 +189,11 @@ def read_from_file( alt_freq = None try: - dbsnpid = s[dbsnp_field] - except (IndexError, TypeError, ValueError) as e: + rsid = s[rsid_field] + assert rsid_pattern.match(rsid) + except (IndexError, TypeError, ValueError, AssertionError) as e: logging.debug("Could not parse dbsnp identifier: {}".format(e)) - dbsnpid = None + rsid = None try: ncase = float(s[ncase_field]) @@ -204,7 +225,7 @@ def read_from_file( logging.debug("Could not parse imputation Z score: {}".format(e)) imp_z = None - result = GwasResult( + result = Gwas( chrom, pos, ref, @@ -214,7 +235,7 @@ def read_from_file( pval, n, alt_freq, - dbsnpid, + rsid, ncase, imp_info, imp_z @@ -222,8 +243,43 @@ def read_from_file( logging.debug("Extracted row: {}".format(result)) - results.append(result) + # check alleles + try: + result.check_alleles_iupac() + except AssertionError as e: + logging.warning("Skipping {}: {}".format(s, e)) + metadata['VariantsNotRead'] += 1 + continue + + # harmonise alleles + try: + result.check_reference_allele(fasta) + except AssertionError: + try: + result.reverse_sign() + result.check_reference_allele(fasta) + metadata['SwitchedAlleles'] += 1 + except AssertionError as e: + logging.warning("Could not harmonise {}: {}".format(s, e)) + metadata['VariantsNotHarmonised'] += 1 + continue + metadata['HarmonisedVariants'] += 1 + + # keep file position for sorted recall later + if result.chrom not in file_idx: + file_idx[result.chrom] = [] + file_idx[result.chrom].append((result.pos, results.tell())) + pickle.dump(result, results) f.close() - return results, total_variants + logging.info("Total variants: {}".format(metadata['TotalVariants'])) + logging.info("Variants could not be read: {}".format(metadata['VariantsNotRead'])) + logging.info("Variants harmonised: {}".format(metadata['HarmonisedVariants'])) + logging.info("Variants discarded during harmonisation: {}".format(metadata['VariantsNotHarmonised'])) + logging.info("Alleles switched: {}".format(metadata['SwitchedAlleles'])) + logging.info("Skipped {} of {}".format( + metadata['VariantsNotRead'] + metadata['VariantsNotHarmonised'], metadata['TotalVariants']) + ) + + return results, file_idx, metadata diff --git a/harmonise.py b/harmonise.py deleted file mode 100644 index 5206f76..0000000 --- a/harmonise.py +++ /dev/null @@ -1,55 +0,0 @@ -import logging - - -class Harmonise: - - @staticmethod - def 
align_gwas_to_fasta(variants, fasta): - flipped_variants = 0 - harmonised = [] - - for variant in variants: - - if not variant.are_alleles_iupac(): - logging.warning("Skipping record {}: allele(s) are not standard IUPAC".format(variant)) - continue - - # get expected FASTA REF - try: - expected_ref_allele = str( - fasta.fetch(region="{}:{}-{}".format( - variant.chrom, - variant.pos, - variant.pos + (len(variant.ref) - 1) - )) - ).upper() - except: - logging.warning("Skipping record {}: problem getting ref allele".format(variant)) - continue - - if variant.ref != expected_ref_allele: - variant.reverse_sign() - flipped_variants += 1 - - if len(variant.ref) != len(variant.alt): - - # get expected FASTA REF - try: - expected_ref_allele = str( - fasta.fetch(region="{}:{}-{}".format( - variant.chrom, - variant.pos, - variant.pos + (len(variant.ref) - 1) - )) - ).upper() - except: - logging.warning("Skipping record {}: problem getting ref allele".format(variant)) - continue - - if variant.ref != expected_ref_allele: - logging.warning("Skipping record {}: could not match alleles to FASTA".format(variant)) - continue - - harmonised.append(variant) - - return harmonised, flipped_variants diff --git a/main.py b/main.py index 11697c8..fc0da80 100644 --- a/main.py +++ b/main.py @@ -1,26 +1,29 @@ import argparse import logging import marshmallow -from gwas_results import GwasResult +from gwas import Gwas from vcf import Vcf import pysam -from harmonise import Harmonise from datetime import datetime import json from param import Param import sys import os + def main(): - version = "1.1.1" + version = "1.2.0" parser = argparse.ArgumentParser(description='Map GWAS summary statistics to VCF/BCF') parser.add_argument('-v', '--version', action='version', version='%(prog)s {}'.format(version)) - parser.add_argument('--out', dest='out', required=False, help='Path to output VCF/BCF. If not present then must be specified as \'out\' in json file') - parser.add_argument('--data', dest='data', required=False, help='Path to GWAS summary stats. If not present then must be specified as \'data\' in json file') + parser.add_argument('--out', dest='out', required=False, + help='Path to output VCF/BCF. If not present then must be specified as \'out\' in json file') + parser.add_argument('--data', dest='data', required=False, + help='Path to GWAS summary stats. If not present then must be specified as \'data\' in json file') parser.add_argument('--ref', dest='ref', required=True, help='Path to reference FASTA') parser.add_argument('--json', dest='json', required=True, help='Path to parameters JSON') - parser.add_argument('--id', dest='id', required=False, help='Study identifier. If not present then must be specified as \'id\' in json file') + parser.add_argument('--id', dest='id', required=False, + help='Study identifier. If not present then must be specified as \'id\' in json file') parser.add_argument('--cohort_controls', type=int, dest='cohort_controls', required=False, default=None, help='Total study number of controls (if case/control) or total sample size if continuous. 
Overwrites value if present in json file.') parser.add_argument('--cohort_cases', type=int, dest='cohort_cases', required=False, default=None, @@ -38,7 +41,7 @@ def main(): if args.log: logging.basicConfig(level=getattr(logging, args.log), format='%(asctime)s %(levelname)s %(message)s') - logging.info("GWAS2VCF {}".format(version)) + logging.info("Gwas2VCF {}".format(version)) logging.info("Arguments: {}".format(vars(args))) logging.info("Reading JSON parameters") try: @@ -96,7 +99,6 @@ def main(): logging.error("{} file does not exist".format(args.data)) sys.exit() - if not os.path.isfile(args.ref): logging.error("{} file does not exist".format(args.ref)) sys.exit() @@ -105,70 +107,55 @@ def main(): logging.error("{} output directory does not exist".format(args.out)) sys.exit() - - # read in GWAS and harmonise alleles to reference fasta (ref) - gwas, total_variants = GwasResult.read_from_file( - args.data, - j['chr_col'], - j['pos_col'], - j['ea_col'], - j['oa_col'], - j['beta_col'], - j['se_col'], - j['pval_col'], - j['delimiter'], - j['header'], - ncase_field=j.get('ncase_col'), - dbsnp_field=j.get('snp_col'), - ea_af_field=j.get('eaf_col'), - nea_af_field=j.get('oaf_col'), - imp_z_field=j.get('imp_z_col'), - imp_info_field=j.get('imp_info_col'), - ncontrol_field=j.get('ncontrol_col'), - rm_chr_prefix=args.rm_chr_prefix - ) - - logging.info("Total variants: {}".format(total_variants)) - logging.info("Variants could not be read: {}".format(total_variants - len(gwas))) - + # read in data + # harmonise on-the-fly and write to pickle format + # keep file index for each record and chromosome position to write out karyotypically sorted records later with pysam.FastaFile(args.ref) as fasta: + gwas, idx, sample_metadata = Gwas.read_from_file( + args.data, + fasta, + j['chr_col'], + j['pos_col'], + j['ea_col'], + j['oa_col'], + j['beta_col'], + j['se_col'], + j['pval_col'], + j['delimiter'], + j['header'], + ncase_field=j.get('ncase_col'), + rsid_field=j.get('snp_col'), + ea_af_field=j.get('eaf_col'), + nea_af_field=j.get('oaf_col'), + imp_z_field=j.get('imp_z_col'), + imp_info_field=j.get('imp_info_col'), + ncontrol_field=j.get('ncontrol_col'), + rm_chr_prefix=args.rm_chr_prefix + ) + + # metadata + file_metadata = { + 'Gwas2VCF_command': ' '.join(sys.argv[1:]) + "; " + version, + 'file_date': datetime.now().isoformat() + } - # harmonise to FASTA - harmonised, flipped_variants = Harmonise.align_gwas_to_fasta(gwas, fasta) - - logging.info("Variants harmonised: {}".format(len(harmonised))) - logging.info("Variants discarded during harmonisation: {}".format(len(gwas) - len(harmonised))) - logging.info("Alleles switched: {}".format(flipped_variants - (len(gwas) - len(harmonised)))) - - # number of skipped records - logging.info("Skipped {} of {}".format(total_variants - len(harmonised), total_variants)) - - file_metadata = { - 'gwas_harmonisation_command': ' '.join(sys.argv[1:]) + "; " + version, - 'file_date': datetime.now().isoformat() - } - - sample_metadata = { - 'TotalVariants': total_variants, - 'VariantsNotRead': total_variants - len(gwas), - 'HarmonisedVariants': len(harmonised), - 'VariantsNotHarmonised': len(gwas) - len(harmonised), - 'SwitchedAlleles': flipped_variants - (len(gwas) - len(harmonised)) - } + if args.cohort_controls is not None: + sample_metadata['TotalControls'] = args.cohort_controls - if args.cohort_controls is not None: - sample_metadata['TotalControls'] = args.cohort_controls + if args.cohort_cases is not None: + sample_metadata['TotalCases'] = args.cohort_cases - if 
args.cohort_cases is not None: - sample_metadata['TotalCases'] = args.cohort_cases + if 'ncase_col' in j or args.cohort_cases is not None: + sample_metadata['StudyType'] = 'CaseControl' + else: + sample_metadata['StudyType'] = 'Continuous' - if 'ncase_col' in j or args.cohort_cases is not None: - sample_metadata['StudyType'] = 'CaseControl' - else: - sample_metadata['StudyType'] = 'Continuous' + # write to VCF + # loop over sorted chromosome position and get record using random access + Vcf.write_to_file(gwas, idx, args.out, fasta, j['build'], args.id, sample_metadata, file_metadata, args.csi) - # write to vcf - Vcf.write_to_file(harmonised, args.out, fasta, j['build'], args.id, sample_metadata, file_metadata, args.csi) + # close temp file to release disk space + gwas.close() if __name__ == "__main__": diff --git a/param.py b/param.py index 310d8a1..944d2cb 100644 --- a/param.py +++ b/param.py @@ -24,14 +24,18 @@ class Param(Schema): ncontrol_col = fields.Int(required=False, description="Column number for number of controls (if case/control) or total sample size if continuous") build = fields.Str(required=True, description="Name of the genome build i.e. GRCh36, GRCh37, GRCh38") - data = fields.Str(required=False, description="Path to input file, required if not passed in main command line parameters") - out = fields.Str(required=False, description="Path to output vcf file, required if not passed in main command line parameters") - id = fields.Str(required=False, description="Unique identified or name of GWAS study, required if not passed in main command line parameters") + data = fields.Str(required=False, + description="Path to input file, required if not passed in main command line parameters") + out = fields.Str(required=False, + description="Path to output vcf file, required if not passed in main command line parameters") + id = fields.Str(required=False, + description="Unique identified or name of GWAS study, required if not passed in main command line parameters") cohort_cases = fields.Float(required=False, - description="Total study number of cases") + description="Total study number of cases") cohort_controls = fields.Float(required=False, - description="Total study number of controls (if case/control) or total sample size if continuous") + description="Total study number of controls (if case/control) or total sample size if continuous") jsonmeta = fields.Str(required=False, description="Path to input metadata file file") + @validates_schema(pass_original=True) def check_unknown_fields(self, data, original_data): unknown = set(original_data) - set(self.fields) diff --git a/requirements.txt b/requirements.txt index f551730..f85173a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +pytest biopython==1.72 pysam==0.15.2 marshmallow==2.18.1 diff --git a/test/test_gwas.py b/test/test_gwas.py new file mode 100644 index 0000000..2aadb24 --- /dev/null +++ b/test/test_gwas.py @@ -0,0 +1,18 @@ +import unittest +from gwas import Gwas +import pytest + + +class TestGwasResults(unittest.TestCase): + def test_are_alleles_iupac(self): + # should not raise exec + g = Gwas('1', 100, 'A', 'T', 1, 0.1, 0.1, 100, 0.05, 0, "snp", None, None) + g.check_alleles_iupac() + + with pytest.raises(AssertionError): + g = Gwas('1', 100, 'A', 'wdeT', 1, 0.1, 0.1, 100, 0.05, 0, "snp", None, None) + g.check_alleles_iupac() + + +if __name__ == '__main__': + unittest.main() diff --git a/test/test_gwas_results.py b/test/test_gwas_results.py deleted file mode 100644 index 81b2c51..0000000 --- 
a/test/test_gwas_results.py +++ /dev/null @@ -1,14 +0,0 @@ -import unittest -from gwas_results import GwasResult - - -class TestGwasResults(unittest.TestCase): - def test_are_alleles_iupac(self): - g = GwasResult('1', 100, 'A', 'T', 1, 0.1, 0.1, 100, 0.05, 0, "snp", None, None) - self.assertTrue(g.are_alleles_iupac()) - g = GwasResult('1', 100, 'A', 'wdeT', 1, 0.1, 0.1, 100, 0.05, 0, "snp", None, None) - self.assertFalse(g.are_alleles_iupac()) - - -if __name__ == '__main__': - unittest.main() diff --git a/test/test_vcf.py b/test/test_vcf.py index afc98f2..e3ed403 100644 --- a/test/test_vcf.py +++ b/test/test_vcf.py @@ -5,23 +5,22 @@ class TestVcf(unittest.TestCase): def test_convert_pval_to_neg_log10(self): - self.assertEqual(0, Vcf.convert_pval_to_neg_log10(1)) - self.assertEqual(999, Vcf.convert_pval_to_neg_log10(0)) + assert 0 == Vcf.convert_pval_to_neg_log10(1) + assert 999 == Vcf.convert_pval_to_neg_log10(0) def test_is_valid_float32(self): # small - self.assertFalse(Vcf.is_float32_lossy(1e-37)) - self.assertFalse(Vcf.is_float32_lossy(np.finfo(np.float32).tiny)) - self.assertTrue(Vcf.is_float32_lossy(1e-50)) - self.assertFalse(None) - self.assertFalse(Vcf.is_float32_lossy(-1e-37)) - self.assertTrue(Vcf.is_float32_lossy(-1e-50)) + assert Vcf.is_float32_lossy(1e-37) == False + assert Vcf.is_float32_lossy(np.finfo(np.float32).tiny) == False + assert Vcf.is_float32_lossy(1e-50) == True + assert Vcf.is_float32_lossy(-1e-37) == False + assert Vcf.is_float32_lossy(-1e-50) == True # large - self.assertTrue(Vcf.is_float32_lossy(999999999999999999999999999999999999999)) - self.assertTrue(Vcf.is_float32_lossy(-999999999999999999999999999999999999999)) - self.assertFalse(Vcf.is_float32_lossy(-99999999999999999999999999999999999999)) - self.assertFalse(Vcf.is_float32_lossy(99999999999999999999999999999999999999)) + assert Vcf.is_float32_lossy(999999999999999999999999999999999999999) == True + assert Vcf.is_float32_lossy(-999999999999999999999999999999999999999) == True + assert Vcf.is_float32_lossy(-99999999999999999999999999999999999999) == False + assert Vcf.is_float32_lossy(99999999999999999999999999999999999999) == False if __name__ == '__main__': diff --git a/vcf.py b/vcf.py index 3fe04dc..c9b2b67 100644 --- a/vcf.py +++ b/vcf.py @@ -1,6 +1,7 @@ import pysam import logging import numpy as np +import pickle class Vcf: @@ -37,8 +38,14 @@ def remove_illegal_chars(s): r = r.replace(",", "_") return r + """ + Write GWAS file to VCF + Expects an open file handle to a Picle file of GWAS results & file index dict(chromosome[(position, offset)]) + """ + @staticmethod - def write_to_file(gwas_results, path, fasta, build, trait_id, sample_metadata=None, file_metadata=None, csi=False): + def write_to_file(gwas_file, gwas_idx, path, fasta, build, trait_id, sample_metadata=None, file_metadata=None, + csi=False): logging.info("Writing headers to BCF/VCF: {}".format(path)) header = pysam.VariantHeader() @@ -101,92 +108,93 @@ def write_to_file(gwas_results, path, fasta, build, trait_id, sample_metadata=No vcf = pysam.VariantFile(path, "w", header=header) - records = dict() - for result in gwas_results: - lpval = -np.log10(result.pval) - - # check floats - if Vcf.is_float32_lossy(result.b): - logging.warning( - "Effect field cannot fit into float32. Expect loss of precision for: {}".format( - result.b) - ) - if Vcf.is_float32_lossy(result.se): - result.se = np.float64(np.finfo(np.float32).tiny).item() - logging.warning( - "Standard error field cannot fit into float32. 
Expect loss of precision for: {}".format( - result.se) - ) - if Vcf.is_float32_lossy(lpval): - logging.warning( - "-log10(pval) field cannot fit into float32. Expect loss of precision for: {}".format( - lpval) - ) - if Vcf.is_float32_lossy(result.alt_freq): - logging.warning( - "Allele frequency field cannot fit into float32. Expect loss of precision for: {}".format( - result.alt_freq) - ) - if Vcf.is_float32_lossy(result.n): - logging.warning( - "Sample size field cannot fit into float32. Expect loss of precision for: {}".format( - result.n) - ) - if Vcf.is_float32_lossy(result.imp_z): - logging.warning( - "Imputation Z score field cannot fit into float32. Expect loss of precision for: {}".format( - result.imp_z) - ) - if Vcf.is_float32_lossy(result.imp_info): - logging.warning( - "Imputation INFO field cannot fit into float32. Expect loss of precision for: {}".format( - result.imp_info) - ) - - record = vcf.new_record() - record.chrom = result.chrom - assert " " not in record.chrom - record.pos = result.pos - assert record.pos > 0 - record.id = Vcf.remove_illegal_chars(result.dbsnpid) - record.alleles = (result.ref, result.alt) - record.filter.add(result.vcf_filter) - - if result.alt_freq is not None: - record.info['AF'] = result.alt_freq - - if result.b is not None: - record.samples[trait_id]['ES'] = result.b - if result.se is not None: - record.samples[trait_id]['SE'] = result.se - if lpval is not None: - record.samples[trait_id]['LP'] = lpval - if result.alt_freq is not None: - record.samples[trait_id]['AF'] = result.alt_freq - if result.n is not None: - record.samples[trait_id]['SS'] = result.n - if result.imp_z is not None: - record.samples[trait_id]['EZ'] = result.imp_z - if result.imp_info is not None: - record.samples[trait_id]['SI'] = result.imp_info - if result.ncase is not None: - record.samples[trait_id]['NC'] = result.ncase - if result.dbsnpid is not None: - record.samples[trait_id]['ID'] = record.id - - # bank variants by chromosome - if result.chrom not in records: - records[result.chrom] = [] - - records[result.chrom].append(record) - - # sort records & write records to VCF/BCF - logging.info("Sorting records by chromosome and position") + # recall variant objects in chromosome position order for contig in fasta.references: - if contig in records: - records[contig].sort(key=lambda x: x.pos) - for record in records[contig]: - vcf.write(record) + if contig not in gwas_idx: + continue + + # sort by chromosome position + gwas_idx[contig].sort() + + for chr_pos in gwas_idx[contig]: + + # load GWAS result + gwas_file.seek(chr_pos[1]) + result = pickle.load(gwas_file) + + lpval = -np.log10(result.pval) + + # check floats + if Vcf.is_float32_lossy(result.b): + logging.warning( + "Effect field cannot fit into float32. Expect loss of precision for: {}".format( + result.b) + ) + if Vcf.is_float32_lossy(result.se): + result.se = np.float64(np.finfo(np.float32).tiny).item() + logging.warning( + "Standard error field cannot fit into float32. Expect loss of precision for: {}".format( + result.se) + ) + if Vcf.is_float32_lossy(lpval): + logging.warning( + "-log10(pval) field cannot fit into float32. Expect loss of precision for: {}".format( + lpval) + ) + if Vcf.is_float32_lossy(result.alt_freq): + logging.warning( + "Allele frequency field cannot fit into float32. Expect loss of precision for: {}".format( + result.alt_freq) + ) + if Vcf.is_float32_lossy(result.n): + logging.warning( + "Sample size field cannot fit into float32. 
Expect loss of precision for: {}".format( + result.n) + ) + if Vcf.is_float32_lossy(result.imp_z): + logging.warning( + "Imputation Z score field cannot fit into float32. Expect loss of precision for: {}".format( + result.imp_z) + ) + if Vcf.is_float32_lossy(result.imp_info): + logging.warning( + "Imputation INFO field cannot fit into float32. Expect loss of precision for: {}".format( + result.imp_info) + ) + + record = vcf.new_record() + record.chrom = result.chrom + assert " " not in record.chrom + record.pos = result.pos + assert record.pos > 0 + record.id = Vcf.remove_illegal_chars(result.dbsnpid) + record.alleles = (result.ref, result.alt) + record.filter.add(result.vcf_filter) + + if result.alt_freq is not None: + record.info['AF'] = result.alt_freq + + if result.b is not None: + record.samples[trait_id]['ES'] = result.b + if result.se is not None: + record.samples[trait_id]['SE'] = result.se + if lpval is not None: + record.samples[trait_id]['LP'] = lpval + if result.alt_freq is not None: + record.samples[trait_id]['AF'] = result.alt_freq + if result.n is not None: + record.samples[trait_id]['SS'] = result.n + if result.imp_z is not None: + record.samples[trait_id]['EZ'] = result.imp_z + if result.imp_info is not None: + record.samples[trait_id]['SI'] = result.imp_info + if result.ncase is not None: + record.samples[trait_id]['NC'] = result.ncase + if result.dbsnpid is not None: + record.samples[trait_id]['ID'] = record.id + + # write to file + vcf.write(record) vcf.close() From d607a21c8076810604837c28b60d0f1dd875f53c Mon Sep 17 00:00:00 2001 From: Matthew Lyon Date: Tue, 10 Mar 2020 21:08:28 +0000 Subject: [PATCH 02/10] implemeted heap queue to sort --- gwas.py | 5 +++-- vcf.py | 7 +++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/gwas.py b/gwas.py index daddabc..861eb66 100755 --- a/gwas.py +++ b/gwas.py @@ -4,6 +4,7 @@ import tempfile import pickle import re +from heapq import heappush class Gwas: @@ -84,7 +85,7 @@ def read_from_file( rm_chr_prefix=False ): - rsid_pattern = re.compile("^rs[0-9]*") + rsid_pattern = re.compile("^rs[0-9]*$") logging.info("Reading summary stats and mapping to FASTA: {}".format(path)) logging.debug("File path: {}".format(path)) @@ -268,7 +269,7 @@ def read_from_file( # keep file position for sorted recall later if result.chrom not in file_idx: file_idx[result.chrom] = [] - file_idx[result.chrom].append((result.pos, results.tell())) + heappush(file_idx[result.chrom], (result.pos, results.tell())) pickle.dump(result, results) f.close() diff --git a/vcf.py b/vcf.py index c9b2b67..21a4f59 100644 --- a/vcf.py +++ b/vcf.py @@ -2,6 +2,7 @@ import logging import numpy as np import pickle +from heapq import heappop class Vcf: @@ -113,10 +114,8 @@ def write_to_file(gwas_file, gwas_idx, path, fasta, build, trait_id, sample_meta if contig not in gwas_idx: continue - # sort by chromosome position - gwas_idx[contig].sort() - - for chr_pos in gwas_idx[contig]: + while gwas_idx[contig]: + chr_pos = heappop(gwas_idx[contig]) # load GWAS result gwas_file.seek(chr_pos[1]) From 553363e1613c1c5d21006434045b451807df4997 Mon Sep 17 00:00:00 2001 From: Matthew Lyon Date: Tue, 10 Mar 2020 21:14:28 +0000 Subject: [PATCH 03/10] implemeted heap queue to sort --- gwas.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gwas.py b/gwas.py index 861eb66..a44b77d 100755 --- a/gwas.py +++ b/gwas.py @@ -266,7 +266,7 @@ def read_from_file( continue metadata['HarmonisedVariants'] += 1 - # keep file position for sorted recall later + # keep file 
position sorted by chromosome position for recall later if result.chrom not in file_idx: file_idx[result.chrom] = [] heappush(file_idx[result.chrom], (result.pos, results.tell())) From 6b19844ba0763cf95035aabe385d1903ec095361 Mon Sep 17 00:00:00 2001 From: Matthew Lyon Date: Tue, 10 Mar 2020 21:33:24 +0000 Subject: [PATCH 04/10] added log --- vcf.py | 1 + 1 file changed, 1 insertion(+) diff --git a/vcf.py b/vcf.py index 21a4f59..85d5aef 100644 --- a/vcf.py +++ b/vcf.py @@ -110,6 +110,7 @@ def write_to_file(gwas_file, gwas_idx, path, fasta, build, trait_id, sample_meta vcf = pysam.VariantFile(path, "w", header=header) # recall variant objects in chromosome position order + logging.info("Writing variants to BCF/VCF: {}".format(path)) for contig in fasta.references: if contig not in gwas_idx: continue From 5c557002a35d552d3015d4d4493b12498a7c3e99 Mon Sep 17 00:00:00 2001 From: Matthew Lyon Date: Wed, 11 Mar 2020 09:27:27 +0000 Subject: [PATCH 05/10] updated loggings --- gwas.py | 14 +++++++------- vcf.py | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/gwas.py b/gwas.py index a44b77d..337c973 100755 --- a/gwas.py +++ b/gwas.py @@ -142,7 +142,7 @@ def read_from_file( else: chrom = s[chrom_field] except Exception as e: - logging.warning("Skipping {}: {}".format(s, e)) + logging.debug("Skipping {}: {}".format(s, e)) metadata['VariantsNotRead'] += 1 continue @@ -150,7 +150,7 @@ def read_from_file( pos = int(float(s[pos_field])) # float is for scientific notation assert pos > 0 except Exception as e: - logging.warning("Skipping {}: {}".format(s, e)) + logging.debug("Skipping {}: {}".format(s, e)) metadata['VariantsNotRead'] += 1 continue @@ -160,21 +160,21 @@ def read_from_file( try: b = float(s[effect_field]) except Exception as e: - logging.warning("Skipping {}: {}".format(s, e)) + logging.debug("Skipping {}: {}".format(s, e)) metadata['VariantsNotRead'] += 1 continue try: se = float(s[se_field]) except Exception as e: - logging.warning("Skipping {}: {}".format(s, e)) + logging.debug("Skipping {}: {}".format(s, e)) metadata['VariantsNotRead'] += 1 continue try: pval = float(s[pval_field]) except Exception as e: - logging.warning("Skipping line {}, {}".format(s, e)) + logging.debug("Skipping line {}, {}".format(s, e)) metadata['VariantsNotRead'] += 1 continue @@ -248,7 +248,7 @@ def read_from_file( try: result.check_alleles_iupac() except AssertionError as e: - logging.warning("Skipping {}: {}".format(s, e)) + logging.debug("Skipping {}: {}".format(s, e)) metadata['VariantsNotRead'] += 1 continue @@ -261,7 +261,7 @@ def read_from_file( result.check_reference_allele(fasta) metadata['SwitchedAlleles'] += 1 except AssertionError as e: - logging.warning("Could not harmonise {}: {}".format(s, e)) + logging.debug("Could not harmonise {}: {}".format(s, e)) metadata['VariantsNotHarmonised'] += 1 continue metadata['HarmonisedVariants'] += 1 diff --git a/vcf.py b/vcf.py index 85d5aef..a18a9fe 100644 --- a/vcf.py +++ b/vcf.py @@ -41,7 +41,7 @@ def remove_illegal_chars(s): """ Write GWAS file to VCF - Expects an open file handle to a Picle file of GWAS results & file index dict(chromosome[(position, offset)]) + Expects an open file handle to a Pickle file of GWAS results & file index dict(chromosome[(position, offset)]) """ @staticmethod From 888870ef140665e295f492bed04823e051616688 Mon Sep 17 00:00:00 2001 From: Matthew Lyon Date: Wed, 11 Mar 2020 09:34:36 +0000 Subject: [PATCH 06/10] updated docs --- README.md | 29 +++++++++-------------------- test/test_vcf.py | 35 
+++++++++++++++-------------------- 2 files changed, 24 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index ec102a4..1e1f467 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ Unit tests: ``` cd gwas2vcf -python -m unittest discover test +python -m pytest -v test ``` End-to-end tests: @@ -55,33 +55,24 @@ bash test/example.sh ## Usage ``` -Map GWAS summary statistics to VCF/BCF - -usage: main.py [-h] [-v] [--out OUT] [--data DATA] --ref REF --json JSON - [--id ID] [--cohort_controls COHORT_CONTROLS] - [--cohort_cases COHORT_CASES] [--rm_chr_prefix] [--csi] - [--log {DEBUG,INFO,WARNING,ERROR,CRITICAL}] +usage: main.py [-h] [-v] [--out OUT] [--data DATA] --ref REF --json JSON [--id ID] [--cohort_controls COHORT_CONTROLS] + [--cohort_cases COHORT_CASES] [--rm_chr_prefix] [--csi] [--log {DEBUG,INFO,WARNING,ERROR,CRITICAL}] Map GWAS summary statistics to VCF/BCF optional arguments: -h, --help show this help message and exit -v, --version show program's version number and exit - --out OUT Path to output VCF/BCF. If not present then must be - specified as 'out' in json file - --data DATA Path to GWAS summary stats. If not present then must - be specified as 'data' in json file + --out OUT Path to output VCF/BCF. If not present then must be specified as 'out' in json file + --data DATA Path to GWAS summary stats. If not present then must be specified as 'data' in json file --ref REF Path to reference FASTA --json JSON Path to parameters JSON - --id ID Study identifier. If not present then must be - specified as 'id' in json file + --id ID Study identifier. If not present then must be specified as 'id' in json file --cohort_controls COHORT_CONTROLS - Total study number of controls (if case/control) or - total sample size if continuous. Overwrites value if + Total study number of controls (if case/control) or total sample size if continuous. Overwrites value if present in json file. --cohort_cases COHORT_CASES - Total study number of cases. Overwrites value if - present in json file. + Total study number of cases. Overwrites value if present in json file. --rm_chr_prefix Remove chr prefix from GWAS chromosome --csi Default is to index tbi but use this flag to index csi --log {DEBUG,INFO,WARNING,ERROR,CRITICAL} @@ -92,7 +83,7 @@ See `param.py` for JSON specification ## Combine multiallelics -Merge variants at single genetic position on to a single row +Merge variants at single genetic position on to a single row. This step is **highly** recommended to avoid duplicate RSIDs. ``` bcftools norm \ @@ -139,8 +130,6 @@ file.vcf.gz ## Merge multiple GWAS summary stats into a single file -Note: Merged GWAS BCFs are significantly slower to query; for best performance do not do this. 
- ``` bcftools merge \ -O z \ diff --git a/test/test_vcf.py b/test/test_vcf.py index e3ed403..8f361ad 100644 --- a/test/test_vcf.py +++ b/test/test_vcf.py @@ -1,27 +1,22 @@ -import unittest from vcf import Vcf import numpy as np -class TestVcf(unittest.TestCase): - def test_convert_pval_to_neg_log10(self): - assert 0 == Vcf.convert_pval_to_neg_log10(1) - assert 999 == Vcf.convert_pval_to_neg_log10(0) +def test_convert_pval_to_neg_log10(): + assert Vcf.convert_pval_to_neg_log10(1) == 0 + assert Vcf.convert_pval_to_neg_log10(0) == 999 - def test_is_valid_float32(self): - # small - assert Vcf.is_float32_lossy(1e-37) == False - assert Vcf.is_float32_lossy(np.finfo(np.float32).tiny) == False - assert Vcf.is_float32_lossy(1e-50) == True - assert Vcf.is_float32_lossy(-1e-37) == False - assert Vcf.is_float32_lossy(-1e-50) == True - # large - assert Vcf.is_float32_lossy(999999999999999999999999999999999999999) == True - assert Vcf.is_float32_lossy(-999999999999999999999999999999999999999) == True - assert Vcf.is_float32_lossy(-99999999999999999999999999999999999999) == False - assert Vcf.is_float32_lossy(99999999999999999999999999999999999999) == False +def test_is_valid_float32(): + # small + assert not Vcf.is_float32_lossy(1e-37) + assert not Vcf.is_float32_lossy(np.finfo(np.float32).tiny) + assert Vcf.is_float32_lossy(1e-50) + assert not Vcf.is_float32_lossy(-1e-37) + assert Vcf.is_float32_lossy(-1e-50) - -if __name__ == '__main__': - unittest.main() + # large + assert Vcf.is_float32_lossy(999999999999999999999999999999999999999) + assert Vcf.is_float32_lossy(-999999999999999999999999999999999999999) + assert not Vcf.is_float32_lossy(-99999999999999999999999999999999999999) + assert not Vcf.is_float32_lossy(99999999999999999999999999999999999999) From 4f10761e4e13395e71e9bab4aa9de6d862d9dbc4 Mon Sep 17 00:00:00 2001 From: Matthew Lyon Date: Wed, 11 Mar 2020 09:35:00 +0000 Subject: [PATCH 07/10] updated test --- test/test_gwas.py | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/test/test_gwas.py b/test/test_gwas.py index 2aadb24..988e622 100644 --- a/test/test_gwas.py +++ b/test/test_gwas.py @@ -1,18 +1,12 @@ -import unittest from gwas import Gwas import pytest -class TestGwasResults(unittest.TestCase): - def test_are_alleles_iupac(self): - # should not raise exec - g = Gwas('1', 100, 'A', 'T', 1, 0.1, 0.1, 100, 0.05, 0, "snp", None, None) - g.check_alleles_iupac() - - with pytest.raises(AssertionError): - g = Gwas('1', 100, 'A', 'wdeT', 1, 0.1, 0.1, 100, 0.05, 0, "snp", None, None) - g.check_alleles_iupac() +def test_are_alleles_iupac(): + # should not raise exec + g = Gwas('1', 100, 'A', 'T', 1, 0.1, 0.1, 100, 0.05, 0, "snp", None, None) + g.check_alleles_iupac() - -if __name__ == '__main__': - unittest.main() + with pytest.raises(AssertionError): + g = Gwas('1', 100, 'A', 'wdeT', 1, 0.1, 0.1, 100, 0.05, 0, "snp", None, None) + g.check_alleles_iupac() From d85392a09c6a58347bb809a8daec85a0f4b4186b Mon Sep 17 00:00:00 2001 From: Matthew Lyon Date: Wed, 11 Mar 2020 09:37:10 +0000 Subject: [PATCH 08/10] updated odcs --- README.md | 10 ++++------ test/example.json | 15 --------------- test/example.sh | 19 ------------------- 3 files changed, 4 insertions(+), 40 deletions(-) delete mode 100644 test/example.json delete mode 100644 test/example.sh diff --git a/README.md b/README.md index 1e1f467..37357cc 100644 --- a/README.md +++ b/README.md @@ -46,12 +46,6 @@ cd gwas2vcf python -m pytest -v test ``` -End-to-end tests: - -``` -bash test/example.sh -``` - ## Usage 
``` @@ -81,6 +75,10 @@ optional arguments: See `param.py` for JSON specification +## Example + +See [gwas-vcf-performance](https://github.com/MRCIEU/gwas-vcf-performance/blob/master/workflow.Rmd) for a full implementation + ## Combine multiallelics Merge variants at single genetic position on to a single row. This step is **highly** recommended to avoid duplicate RSIDs. diff --git a/test/example.json b/test/example.json deleted file mode 100644 index 4a532bd..0000000 --- a/test/example.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "chr_col": 0, - "pos_col": 1, - "snp_col": 2, - "ea_col": 3, - "oa_col": 4, - "beta_col": 5, - "se_col": 6, - "ncontrol_col": 7, - "pval_col": 8, - "eaf_col": 9, - "delimiter": "\t", - "header": true, - "build": "GRCh37" -} diff --git a/test/example.sh b/test/example.sh deleted file mode 100644 index 2ea254b..0000000 --- a/test/example.sh +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/env bash - -# get data -curl -L http://csg.sph.umich.edu/abecasis/public/lipids2013/jointGwasMc_LDL.txt.gz | \ -gzip -dc | \ -cut -s -f2- | \ -sed 's/^chr//g' | \ -tr ':' '\t' | \ -sed $'s/^SNP_hg19/chr\tpos/g' > example.txt - -# harmonise against reference FASTA - -source ./venv/bin/activate -./venv/bin/python main.py \ ---out example.vcf \ ---data example.txt \ ---ref human_g1k_v37.fasta \ ---id '300' \ ---json example.json From 62aae53466cf9befe5a6c662557d042aa4cb9181 Mon Sep 17 00:00:00 2001 From: Matthew Lyon Date: Wed, 11 Mar 2020 09:38:04 +0000 Subject: [PATCH 09/10] updated req --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index f85173a..68c6257 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -pytest +pytest==5.3.5 biopython==1.72 pysam==0.15.2 marshmallow==2.18.1 From c01b20136a1abe4267806447ae56744a6adefe5f Mon Sep 17 00:00:00 2001 From: Matthew Lyon Date: Wed, 11 Mar 2020 10:17:37 +0000 Subject: [PATCH 10/10] updated test --- test/test_vcf.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/test/test_vcf.py b/test/test_vcf.py index 8f361ad..9f0927c 100644 --- a/test/test_vcf.py +++ b/test/test_vcf.py @@ -1,5 +1,9 @@ from vcf import Vcf import numpy as np +import pickle +from gwas import Gwas +import tempfile +from heapq import heappush, heappop def test_convert_pval_to_neg_log10(): @@ -20,3 +24,27 @@ def test_is_valid_float32(): assert Vcf.is_float32_lossy(-999999999999999999999999999999999999999) assert not Vcf.is_float32_lossy(-99999999999999999999999999999999999999) assert not Vcf.is_float32_lossy(99999999999999999999999999999999999999) + + +def test_gwas_unpickle(): + g1 = [Gwas("1", 101, "A", "T", 1, 0, 5e-8, 1000, 0.4, "rs1234", None, None, None), + Gwas("1", 105, "A", "T", 1, 0, 5e-8, 1000, 0.4, "rs1234", None, None, None), + Gwas("1", 102, "A", "T", 1, 0, 5e-8, 1000, 0.4, "rs1234", None, None, None)] + g2 = [] + idx = [] + results = tempfile.TemporaryFile() + for result in g1: + heappush(idx, (result.pos, results.tell())) + pickle.dump(result, results) + + while idx: + pos = heappop(idx) + results.seek(pos[1]) + result = pickle.load(results) + g2.append(result) + + assert g2[0].pos == 101 + assert g2[1].pos == 102 + assert g2[2].pos == 105 + + results.close()
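
Below is a minimal sketch (not part of the patch series itself) of the storage pattern these commits introduce in `gwas.py` and `vcf.py`: rows are pickled to a `tempfile.TemporaryFile()` as they are read, a per-chromosome heap of `(position, byte offset)` tuples is built with `heappush`, and records are later recalled in karyotypic order via `heappop`, `seek`, and `pickle.load`. The toy tuples and the hard-coded contig list are assumptions for illustration only; in the tool the records are `Gwas` objects and the contig order comes from `fasta.references`.

```python
import pickle
import tempfile
from heapq import heappush, heappop

# toy records standing in for parsed GWAS rows (chrom, pos, ref, alt)
rows = [("2", 300, "A", "G"), ("1", 200, "C", "T"), ("1", 100, "A", "T")]

results = tempfile.TemporaryFile()  # binary mode by default, suitable for pickle
file_idx = {}                       # chromosome -> heap of (position, byte offset)

# write phase: serialise each record and remember where it starts in the file
for chrom, pos, ref, alt in rows:
    if chrom not in file_idx:
        file_idx[chrom] = []
    heappush(file_idx[chrom], (pos, results.tell()))
    pickle.dump((chrom, pos, ref, alt), results)

# read phase: recall records contig by contig, position-sorted within each contig
for contig in ["1", "2"]:  # in the tool this loop iterates fasta.references
    while file_idx.get(contig):
        pos, offset = heappop(file_idx[contig])
        results.seek(offset)
        print(pickle.load(results))

results.close()  # releases the temporary disk space
```

The heap keeps insertion cheap while the write path in `vcf.py` pops records in ascending position order, so the full result set never has to be held in memory or explicitly sorted; the final `test_gwas_unpickle` test in PATCH 10 exercises the same round trip with `Gwas` objects.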