Source code for sequana.demultiplex

#
#  This file is part of Sequana software
#
#  Copyright (c) 2019-2022 - Sequana Development Team
#
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#
#  website: https://github.com/sequana/sequana
#  documentation: http://sequana.readthedocs.io
#
##############################################################################
import json

import colorlog

from sequana.lazy import pandas as pd
from sequana.lazy import pylab

logger = colorlog.getLogger(__name__)


__all__ = ["StatsFile"]


[docs]class StatsFile(object): """Reads a bcl2fastq Stats.json file and produces useful plots for QCs""" def __init__(self, filename="Stats.json"): with open(filename) as fh: self.data = json.load(fh)
[docs] def to_summary_reads(self, filename): with open(filename, "w") as fh: fh.write("%4s %32s %16s\n" % ("Lane", "Sample", "NumberReads")) for i, lane in enumerate(self.data["ConversionResults"]): total = 0 for this in lane["DemuxResults"]: fh.write("%4s %32s %16s\n" % (i + 1, this["SampleId"], this["NumberReads"])) total += this["NumberReads"] fh.write("%4s %32s %16s\n" % (i + 1, "Total", total)) try: fh.write("%4s %32s %16s\n" % (i + 1, "Undetermined", lane["Undetermined"]["NumberReads"])) except: pass
[docs] def get_data_reads(self): lanes = [] names = [] reads = [] for i, lane in enumerate(self.data["ConversionResults"]): total = 0 for this in lane["DemuxResults"]: lanes.append(lane["LaneNumber"]) names.append(this["SampleId"]) reads.append(this["NumberReads"]) total += this["NumberReads"] # if only undetermined (no sample sheet), no data should be found # meaning that there is 0 determined and no names associated # so we call it determined and store 0 if total == 0: assert total == 0 names.append("Determined") reads.append(0) lanes.append(lane["LaneNumber"]) if "Undetermined" in lane: names.append("Undetermined") lanes.append(i + 1) reads.append(lane["Undetermined"]["NumberReads"]) else: names.append("Undetermined") lanes.append(i + 1) reads.append(0) # print(lanes, names, reads) df = pd.DataFrame({"lane": lanes, "name": names, "count": reads}) return df
[docs] def barplot(self, filename="lane{}_status.png", lanes=None): df = self.get_data_reads() if lanes is None: lanes = df.lane.unique() for lane in lanes: pylab.clf() query = "lane==@lane and name!='Undetermined'" counts = df.query(query)["count"] total = counts.sum() L = len(counts) query = "lane==@lane and name=='Undetermined'" under = df.query(query)["count"].sum() if total > 0: pylab.bar(range(L), counts, color="b", label="reads") if total == 0: color = "red" else: if 100 * under / total < 20: color = "green" elif 100 * under / total < 50: color = "orange" else: color = "red" pylab.bar(range(L, L + 1), under, color=color, label="undetermined") pylab.xticks([]) pylab.ylabel("Number of reads") try: pylab.legend(loc="lower left") except: pass pylab.title("Lane {}".format(lane)) pylab.savefig(filename.format(lane), dpi=200)
[docs] def barplot_per_sample(self, alpha=0.5, width=0.8, filename=None): df = self.get_data_reads() # this is ugly but will do the job for now under = df.query("name=='Undetermined'") others = df.query("name!='Undetermined'") # we group all lanes under = under.groupby("name").sum().reset_index() others = others.groupby("name").sum().reset_index() under = under[["name", "count"]].set_index("name") others = others[["name", "count"]].set_index("name") all_data = others.sort_index(ascending=False) all_data.columns = ["samples"] # appended at the end all_data.loc["undetermined"] = 0 # revert back all_data = all_data.loc[::-1] # just for legend under.columns = ["undetermined"] if all_data.sum().min() > 1e6: all_data /= 1e6 under /= 1e6 M = True else: M = False all_data.plot(kind="barh", alpha=alpha, zorder=1, width=width, ec="k") under.plot( kind="barh", alpha=alpha, color="red", ax=pylab.gca(), zorder=1, width=width, ec="k", ) self.all_data = all_data self.under = under if len(all_data) < 100: pylab.yticks(range(len(all_data)), all_data.index) pylab.ylim([0.5, len(all_data) + 0.5]) pylab.legend() pylab.grid(True, zorder=-1) if M: pylab.xlabel("Number of reads (M)") else: pylab.xlabel("Number of reads") try: pylab.gcf().set_layout_engine("tight") except: pass if filename: pylab.savefig(filename, dpi=200)
[docs] def barplot_summary(self, filename=None, color=["green", "red"], alpha=0.8): df = self.get_data_reads() under = df.query("name=='Undetermined'") total = df.query("name!='Undetermined'") total = total.groupby("lane").sum().reset_index() total["name"] = "Determined" df = pd.concat([under, total]) # sort=True) df = df.pivot(index="lane", columns="name", values="count") df = df[["Determined", "Undetermined"]] if df.sum().min() > 1e6: df /= 1e6 df.plot.barh(stacked=True, color=color, alpha=alpha, ec="k") pylab.xlabel("Number of reads (M)") else: df.plot.barh(stacked=True, color=color, alpha=alpha, ec="k") pylab.xlabel("Number of reads") pylab.legend() if filename: pylab.savefig(filename, dpi=200) return df
[docs] def plot_unknown_barcodes(self, N=20): ub = self.data["UnknownBarcodes"] df = pd.DataFrame({x["Lane"]: x["Barcodes"] for x in ub}) if "unknown" in df.index and len(df) == 1: df.loc["known"] = [0 for i in df.columns] # if data is made of undetermined only, the dataframe is just made of # N lanes with one entry : unknown S = df.sum(axis=1).sort_values(ascending=False).index[0:N] data = df.loc[S][::-1] # print(data) data.columns = ["Lane {}".format(x) for x in data.columns] from matplotlib import rcParams rcParams["axes.axisbelow"] = True pylab.figure(figsize=(10, 8)) ax = pylab.gca() data.plot(kind="barh", width=1, ec="k", ax=ax) rcParams["axes.axisbelow"] = False pylab.xlabel("Number of reads", fontsize=12) pylab.ylabel("") pylab.grid(True) pylab.legend( ["Lane {}".format(x) for x in range(1, len(df.columns) + 1)], loc="lower right", ) # here we plot a vertical line corresponding to the median of reads # found in determined indices try: N = self.get_data_reads().query('name!="Undetermined"')["count"].median() pylab.axvline(N, ls="--", color="r") except: pass try: pylab.gcf().set_layout_engine("tight") except Exception as err: print(err) return data