Skip to content

Commit

Permalink
Fix bug 97 in develop
Browse files Browse the repository at this point in the history
  • Loading branch information
jdidion committed May 4, 2020
1 parent 03f1883 commit 82a726c
Showing 1 changed file with 208 additions and 134 deletions.
342 changes: 208 additions & 134 deletions atropos/commands/legacy_reports.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABCMeta, abstractmethod
from argparse import Namespace

import math
import textwrap
from typing import (
Expand Down Expand Up @@ -901,157 +901,231 @@ def _print_metrics_report(cls, data, outfile):
outfile: The output file.
"""
paired = "read2" in data
max_count = data["read1"]["counts"]

if paired:
max_count = max(max_count, data["read2"]["counts"])

printer = PairedEndMetricsPrinter(data, outfile)
else:
printer = SingleEndMetricsPrinter(data, outfile)

printer.print_header()

# Sequence-level stats
printer.print_counts()
printer.print_histogram("Sequence lengths:", "lengths", "hist")
printer.print_histogram("Sequence qualities:", "qualities", "hist")
printer.print_histogram("Sequence GC content (%)", "gc", "hist")
printer.print_tile_histograms(
"per-tile sequence qualities (%)", "tile_sequence_qualities")

# Base-level stats
printer.print_base_histograms("base qualities (%)", "base_qualities")
printer.print_base_histograms("base composition (%)", "bases")
printer.print_tile_base_histograms(
"per-tile base qualities (%)", "tile_base_qualities")


class MetricsPrinter(metaclass=ABCMeta):
def __init__(self, data, outfile):
self._data = data
self._title_printer = TitlePrinter(outfile)
max_count = self._max_count()
max_width = len(str(max_count))
# add space for commas and column separation
max_width += (max_width // 3) + 1
_print_title = TitlePrinter(outfile)
_print = RowPrinter(outfile, (35, max_width))
self._printer = RowPrinter(outfile, (35, max_width))

def _print_histogram(title, hist1, hist2=None):
_print_title(title, level=2)
@abstractmethod
def _max_count(self):
pass

if hist1 is None:
_print("No Data")
return
def _print_histogram(self, title, hist1, hist2=None):
self._title_printer(title, level=2)

if hist2:
hist = (
(key, hist1.get(key, 0), hist2.get(key, 0))
for key in sorted(set(hist1.keys()) | set(hist2.keys()))
)
else:
hist = sorted(hist1.items(), key=lambda x: x[0])
if hist1 is None:
self._printer("No Data")
return

for histbin in hist:
_print(*histbin)
if hist2:
hist = (
(key, hist1.get(key, 0), hist2.get(key, 0))
for key in sorted(set(hist1.keys()) | set(hist2.keys())))
else:
hist = sorted(hist1.items(), key=lambda x: x[0])

def _print_base_histogram(title, hist, extra_width=4, index_name="Pos"):
_print_title(title, level=2)
for histbin in hist:
self._printer(*histbin)

if hist is None:
_print("No Data")
return
def _print_base_histogram(self, title, hist, extra_width=4, index_name="Pos"):
self._title_printer(title, level=2)

_print(index_name, *hist["columns"], header=True, extra_width=extra_width)
if hist is None:
self._printer("No Data")
return

for pos, row in hist["rows"].items():
total_count = sum(row)
base_pcts = (round(count * 100 / total_count, 1) for count in row)
_print(pos, *base_pcts, extra_width=extra_width)
self._printer(
index_name, *hist["columns"], header=True, extra_width=extra_width)

def _print_tile_histogram(title, hist):
if hist is None:
_print_title(title, level=2)
_print("No Data")
return
for pos, row in hist["rows"].items():
total_count = sum(row)
base_pcts = (
round(count * 100 / total_count, 1)
for count in row)
self._printer(pos, *base_pcts, extra_width=extra_width)

ncol = len(hist["columns"])
max_tile_width = (
max(4, len(str(math.ceil(data["read1"]["counts"] / ncol)))) + 1
)
_print_base_histogram(
title, hist, extra_width=max_tile_width, index_name="Tile"
)

def _print_tile_base_histogram(title: str, hist: dict):
"""Print a histogram of position x tile, with values as the median
base quality.
"""
_print_title(title, level=2)
if hist is None:
_print("No Data")
return

quals = hist["columns"]
tiles = hist["columns2"]
ncol = len(tiles)
max_tile_width = (
max(4, len(str(math.ceil(data["read1"]["counts"] / ncol)))) + 1
)
_print("Pos", *tiles, header=True, extra_width=max_tile_width)

for pos, tiles in hist["rows"].items():
# compute the weighted median for each tile at each position
_print(
pos,
*(
weighted_median(quals, tile_counts)
for tile_counts in tiles.values()
),
extra_width=max_tile_width,
)
def _print_tile_histogram(self, title, hist):
if hist is None:
self._title_printer(title, level=2)
self._printer("No Data")
return

_print("", "Read1", "Read2", header=True)
ncol = len(hist["columns"])
max_tile_width = max(
4, len(str(math.ceil(self._data["read1"]["counts"] / ncol)))) + 1
self._print_base_histogram(
title, hist, extra_width=max_tile_width, index_name="Tile")

# Sequence-level metrics
_print(
"Read pairs:" if paired else "Reads:",
data["read1"]["counts"],
data["read2"]["counts"],
)
_print()
_print_histogram(
"Sequence lengths:",
data["read1"]["lengths"]["hist"],
data["read2"]["lengths"]["hist"],
)
_print()
if "qualities" in data["read1"]:
_print_histogram(
"Sequence qualities:",
data["read1"]["qualities"]["hist"],
data["read2"]["qualities"]["hist"],
)
_print()
_print_histogram(
"Sequence GC content (%)",
data["read1"]["gc"]["hist"],
data["read2"]["gc"]["hist"],
)
_print()
if "tile_sequence_qualities" in data["read1"]:
_print_tile_histogram(
"Read 1 per-tile sequence qualities (%)",
data["read1"]["tile_sequence_qualities"],
)
_print()
_print_tile_histogram(
"Read 2 per-tile sequence qualities (%)",
data["read2"]["tile_sequence_qualities"],
)
_print()
def _print_tile_base_histogram(self, title, hist):
"""Print a histogram of position x tile, with values as the median
base quality.
"""
self._title_printer(title, level=2)

# Base-level metrics
if "base_qualities" in data["read1"]:
_print_base_histogram(
"Read 1 base qualities (%)", data["read1"]["base_qualities"]
)
_print()
_print_base_histogram(
"Read 2 base qualities (%)", data["read2"]["base_qualities"]
)
_print()
_print_base_histogram("Read 1 base composition (%)", data["read1"]["bases"])
_print()
_print_base_histogram("Read 2 base composition (%)", data["read2"]["bases"])
_print()
if hist is None:
self._printer("No Data")
return

if "tile_base_qualities" in data["read1"]:
_print_tile_base_histogram(
"Read 1 per-tile base qualities (%)",
data["read1"]["tile_base_qualities"],
)
_print()
_print_tile_base_histogram(
"Read 2 per-tile base qualities (%)",
data["read2"]["tile_base_qualities"],
)
_print()
quals = hist["columns"]
tiles = hist["columns2"]
ncol = len(tiles)
max_tile_width = max(
4, len(str(math.ceil(self._data["read1"]["counts"] / ncol)))) + 1
self._printer("Pos", *tiles, header=True, extra_width=max_tile_width)

for pos, tiles in hist["rows"].items():
# compute the weighted median for each tile at each position
self._printer(
pos,
*(
weighted_median(quals, tile_counts)
for tile_counts in tiles.values()
),
extra_width=max_tile_width)

@abstractmethod
def print_header(self):
pass

@abstractmethod
def print_counts(self):
pass

@abstractmethod
def print_histogram(self, title, key1, key2):
pass

@abstractmethod
def print_tile_histograms(self, title, key):
pass

@abstractmethod
def print_base_histograms(self, title, key):
pass

@abstractmethod
def print_tile_base_histograms(self, title, key):
pass


class SingleEndMetricsPrinter(MetricsPrinter):
def _max_count(self):
return self._data["read1"]["counts"]

def print_header(self):
self._printer("", "Read1", header=True)

def print_counts(self):
self._printer("Reads:", self._data["read1"]["counts"])
self._printer()

def print_histogram(self, title, key1, key2):
if key1 in self._data["read1"]:
self._print_histogram(title, self._data["read1"][key1][key2])
self._printer()

def print_tile_histograms(self, title, key):
if key in self._data["read1"]:
self._print_tile_histogram(
"Read 1 {}".format(title),
self._data["read1"][key])
self._printer()

def print_base_histograms(self, title, key):
if key in self._data["read1"]:
self._print_base_histogram(
"Read 1 {}".format(title),
self._data["read1"][key])
self._printer()

def print_tile_base_histograms(self, title, key):
if key in self._data["read1"]:
self._print_tile_base_histogram(
"Read 1 {}".format(title),
self._data["read1"][key])


class PairedEndMetricsPrinter(MetricsPrinter):
def _max_count(self):
return max(self._data["read1"]["counts"], self._data["read2"]["counts"])

def print_header(self):
self._printer("", "Read1", "Read2", header=True)

def print_counts(self):
self._printer(
"Read pairs:",
self._data["read1"]["counts"],
self._data["read2"]["counts"])
self._printer()

def print_histogram(self, title, key1, key2):
if key1 in self._data:
self._print_histogram(
title,
self._data["read1"][key1][key2],
self._data["read2"][key1][key2])
self._printer()

def print_tile_histograms(self, title, key):
if "tile_sequence_qualities" in self._data["read1"]:
self._print_tile_histogram(
"Read 1 {}".format(title),
self._data["read1"][key])
self._printer()
self._print_tile_histogram(
"Read 2 {}".format(title),
self._data["read2"][key])
self._printer()

def print_base_histograms(self, title, key):
if key in self._data:
self._print_base_histogram(
"Read 1 {}".format(title),
self._data["read1"][key])
self._printer()
self._print_base_histogram(
"Read 2 {}".format(title),
self._data["read2"][key])
self._printer()

def print_tile_base_histograms(self, title, key):
if key in self._data["read1"]:
self._print_tile_base_histogram(
"Read 1 {}".format(title),
self._data["read1"][key])
self._printer()
self._print_tile_base_histogram(
"Read 2 {}".format(title),
self._data["read2"][key])
self._printer()


def sizeof(*x: Union[str, int, float], seps: bool = True, prec: int = 1):
Expand Down

0 comments on commit 82a726c

Please sign in to comment.