"""This is a MultiQC plugin re-used temporarely here"""
import re
import zipfile
from sequana.lazy import pandas as pd
from sequana.lazy import pylab
[docs]
class FastQC:
"""A temporary class to manipulate fastqc statistics
This class can also read the output of Falco. Note, however, that
falco has txt file instead of zip file.
"""
def __init__(self):
self.fastqc_data = {}
[docs]
def read_sample(self, filename, s_name):
"""reads the fastqc stats
:param filename:
:param s_name: sample name.
This method was copied from multiqc.modules.fastqc.fastqc modules as a
temporary hack to read the sample data.
"""
try:
zz = zipfile.ZipFile(filename)
file_contents = zz.open("{}{}".format(zz.namelist()[0], "fastqc_data.txt")).read().decode("utf8")
except:
with open(filename, "r") as fin:
file_contents = fin.read()
self.fastqc_data[s_name] = {"statuses": dict()}
section = None
s_headers = None
self.dup_keys = []
for l in file_contents.splitlines():
# Hack to fix falco bug found in some versions where this
# line is missing the # character in front of it
if l.startswith("Length\tCount"):
l = "#" + l
if l == ">>END_MODULE":
section = None
s_headers = None
elif l.startswith(">>"):
(section, status) = l[2:].split("\t", 1)
section = section.lower().replace(" ", "_")
self.fastqc_data[s_name]["statuses"][section] = status
elif section is not None:
if l.startswith("#"):
s_headers = l[1:].split("\t")
# Special case: Total Deduplicated Percentage header line
if s_headers[0] == "Total Deduplicated Percentage":
self.fastqc_data[s_name]["basic_statistics"].append(
{
"measure": "total_deduplicated_percentage",
"value": float(s_headers[1]),
}
)
else:
if s_headers[1] == "Relative count":
s_headers[1] = "Percentage of total"
s_headers = [s.lower().replace(" ", "_") for s in s_headers]
self.fastqc_data[s_name][section] = list()
elif s_headers is not None:
s = l.split("\t")
row = dict()
for i, v in enumerate(s):
v.replace("NaN", "0")
try:
v = float(v)
except ValueError:
pass
row[s_headers[i]] = v
self.fastqc_data[s_name][section].append(row)
# Special case - need to remember order of duplication keys
if section == "sequence_duplication_levels":
try:
self.dup_keys.append(float(s[0]))
except ValueError:
self.dup_keys.append(s[0])
# Tidy up the Basic Stats
self.fastqc_data[s_name]["basic_statistics"] = {
d["measure"]: d["value"] for d in self.fastqc_data[s_name]["basic_statistics"]
}
# TC: may 2020 Here we add the mean quality, which surprisingly is not
# to be found in the basic statistics.
try:
quality_sum = sum(
[x["quality"] * x["count"] for x in self.fastqc_data[s_name]["per_sequence_quality_scores"]]
)
nreads = sum([x["count"] for x in self.fastqc_data[s_name]["per_sequence_quality_scores"]])
mean_quality = quality_sum / float(nreads)
self.fastqc_data[s_name]["basic_statistics"]["mean_quality"] = mean_quality
except:
# if no sequence to be found;
self.fastqc_data[s_name]["basic_statistics"]["mean_quality"] = 0
self.fastqc_data[s_name]["basic_statistics"]["avg_sequence_length"] = 0
# Calculate the average sequence length (Basic Statistics gives a range)
length_bp = 0
total_count = 0
for d in self.fastqc_data[s_name].get("sequence_length_distribution", {}):
length_bp += d["count"] * self._avg_bp_from_range(d["length"])
total_count += d["count"]
if total_count > 0:
self.fastqc_data[s_name]["basic_statistics"]["avg_sequence_length"] = length_bp / total_count
def _avg_bp_from_range(self, bp):
"""Helper function - FastQC often gives base pair ranges (eg. 10-15)
which are not helpful when plotting. This returns the average from such
ranges as an int, which is helpful. If not a range, just returns the int
"""
# copied from multiqc v1.6
try:
if "-" in bp:
maxlen = float(bp.split("-", 1)[1])
minlen = float(bp.split("-", 1)[0])
bp = ((maxlen - minlen) / 2) + minlen
except TypeError:
pass
return int(bp)
[docs]
def plot_sequence_quality(self, max_score=40, ax=None):
ymax = max_score + 1
xmax = 0
for sample in self.fastqc_data.keys():
if "per_sequence_quality_scores" in self.fastqc_data[sample]:
data = {
self._avg_bp_from_range(d["base"]): d["mean"]
for d in self.fastqc_data[sample]["per_base_sequence_quality"]
}
df = pd.Series(data)
df.plot(color="k", alpha=0.5)
if df.max() > ymax:
ymax = df.max()
if df.index.max() > xmax:
xmax = df.index.max()
if ax:
pylab.sca(ax)
pylab.fill_between([0, xmax], [0, 0], [20, 20], color="red", alpha=0.4)
pylab.fill_between([0, xmax], [20, 20], [30, 30], color="orange", alpha=0.4)
pylab.fill_between([0, xmax], [30, 30], [ymax, ymax], color="green", alpha=0.4)
X = range(1, xmax + 1)
pylab.ylim([0, ymax])
if xmax != 0:
pylab.xlim([0, xmax])
pylab.title("Quality scores across all bases")
pylab.xlabel("Position in read (bp)")
pylab.ylabel("Phred Score", fontsize=12)
pylab.grid(axis="x")