# This file is part of Sequana software
#
# Copyright (c) 2016-2021 - Sequana Development Team
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
import json
import colorlog
from sequana.lazy import pandas as pd
from sequana.lazy import pylab
logger = colorlog.getLogger(__name__)
__all__ = ["MultiKrakenResults", "MultiKrakenResults2"]
[docs]class MultiKrakenResults:
"""Select several kraken output and creates summary plots
::
import glob
mkr = MultiKrakenResults(glob.glob("*/*/kraken.csv"))
mkr.plot_stacked_hist()
"""
def __init__(self, filenames, sample_names=None):
self.filenames = filenames
if sample_names is None:
self.sample_names = list(range(1, len(filenames) + 1))
else:
self.sample_names = sample_names
self.ranks = [
"kingdom",
"phylum",
"class",
"order",
"family",
"genus",
"species",
"name",
]
[docs] def get_df(self, limit=5, rank="kingdom"):
if rank not in self.ranks:
raise ValueError(f"rank must be in {self.ranks}")
data = {}
for sample, filename in zip(self.sample_names, self.filenames):
df = pd.read_csv(filename)
count = df["count"].sum()
if rank not in df.columns: # pragma: no cover
for name in self.ranks:
df[name] = "Unclassified"
df = df.groupby(rank)["percentage"].sum()
# if a taxon is obsolete, the kingdom is empty.
# We will set the kingdom as Unclassified and raise a warning
# if the count is > 5%
if " " in df.index:
percent = df.loc[" "]
if percent > limit:
logger.warning(f"Found {percent}% of taxons with no {rank}")
if "Unclassified" in df.index:
df.loc["Unclassified"] += df.loc[" "]
df.drop(" ", inplace=True)
else:
df.loc["Unclassified"] = df.loc[" "]
df.drop(" ", inplace=True)
df["Count"] = count
data[sample] = df
df = pd.DataFrame(data)
df = df.fillna(0)
if self.sample_names is None:
df = df.sort_index(ascending=False, axis=1)
return df
[docs] def plot_stacked_hist(
self,
output_filename=None,
dpi=200,
kind="barh",
fontsize=10,
edgecolor="k",
lw=1,
width=1,
ytick_fontsize=10,
max_labels=50,
max_sample_name_length=30,
rank="kingdom",
):
"""Summary plot of reads classified."""
df = self.get_df(rank=rank)
# remove count from the summary graph
df.drop("Count", inplace=True)
fig, ax = pylab.subplots(figsize=(9.5, 7))
labels = []
for kingdom in sorted(df.index): # pragma: no cover
if kingdom == "Eukaryota":
color = "purple"
elif kingdom == "Unclassified":
color = "grey"
elif kingdom == "Bacteria":
color = "red"
elif kingdom == "Viruses":
color = "darkgreen"
elif kingdom == "Archaea":
color = "yellow"
else:
color = "darkblue"
if kind == "barh":
pylab.barh(
range(0, len(df.columns)),
df.loc[kingdom],
height=width,
left=df.loc[labels].sum().values,
edgecolor=edgecolor,
lw=lw,
color=color,
alpha=0.8,
)
else:
pylab.bar(
range(0, len(df.columns)),
df.loc[kingdom],
width=width,
bottom=df.loc[labels].sum().values,
edgecolor=edgecolor,
lw=lw,
color=color,
alpha=0.8,
)
labels.append(kingdom)
if kind == "barh":
pylab.xlabel("Percentage (%)", fontsize=fontsize)
pylab.ylabel("Sample index/name", fontsize=fontsize)
if len(self.sample_names) < max_labels:
pylab.yticks(
range(len(self.sample_names)),
[str(x)[0:max_sample_name_length] for x in df.columns],
fontsize=ytick_fontsize,
)
else: # pragma: no cover
pylab.yticks([1], [""])
pylab.xlim([0, 100])
pylab.ylim([-0.5, len(df.columns) - 0.5])
else:
pylab.ylabel("Percentage (%)", fontsize=fontsize)
pylab.xlabel("Sample index/name", fontsize=fontsize)
pylab.ylim([0, 100])
if len(self.sample_names) < max_labels:
pylab.xticks(
range(len(self.sample_names)),
[str(x)[0:max_sample_name_length] for x in df.columns],
fontsize=ytick_fontsize,
)
else: # pragma: no cover
pylab.xticks([1], [""])
pylab.xlim([-0.5, len(df.columns) - 0.5])
ax.legend(labels, title=f"{rank}", bbox_to_anchor=(1, 1))
try: # pragma: no cover
pylab.gcf().set_layout_engine("tight")
except: # pragma: no cover
pass
if output_filename:
pylab.savefig(output_filename, dpi=dpi)
[docs]class MultiKrakenResults2:
"""Select several kraken output and creates summary plots
::
import glob
mkr = MultiKrakenResults2(glob.glob("*/*/summary.json"))
mkr.plot_stacked_hist()
"""
def __init__(self, filenames, sample_names=None):
self.filenames = filenames
if sample_names is None:
self.sample_names = list(range(1, len(filenames) + 1))
else:
self.sample_names = sample_names
[docs] def get_df(self, limit=5, sorting_method="sample_name"):
data = {}
for sample, filename in zip(self.sample_names, self.filenames):
summary = json.loads(open(filename, "r").read())
total = max(1, summary["total"])
if "unclassified" not in summary:
summary["unclassified"] = 0
data[sample] = {
"unclassified": round(summary["unclassified"] / total * 100, 2),
"nreads": summary["total"],
}
for db in summary["databases"]:
try:
data[sample][db] = round(summary[db]["C"] / total * 100, 2)
except KeyError: # pragma: no cover
data[sample][db] = 0
df = pd.DataFrame(data)
df = df.fillna(0)
df = df.loc[["unclassified"] + [x for x in df.index if x != "unclassified"]]
# df = df.sort_index(ascending=False)
if sorting_method == "sample_name":
df = df.sort_index(ascending=False, axis=1)
return df
[docs] def plot_stacked_hist(
self,
output_filename=None,
dpi=200,
kind="barh",
fontsize=10,
edgecolor="k",
lw=2,
width=1,
ytick_fontsize=10,
max_labels=50,
alpha=0.8,
colors=None,
cmap="viridis",
sorting_method="sample_name",
max_sample_name_length=30,
):
"""Summary plot of reads classified.
:param sorting_method: only by sample name for now
:param cmap: a valid matplotlib colormap. viridis is the default sequana colormap.
if you prefer to use a colormap, you can use::
from matplotlib import cm
cm = matplotlib.colormap
colors = [cm.get_cmap(cmap)(x) for x in pylab.linspace(0.2, 1, L)]
"""
df = self.get_df(sorting_method=sorting_method)
df = df.T
del df["nreads"]
fig, ax = pylab.subplots(figsize=(9.5, 7))
# we will store grey/unclassified first and then other DB with a max of
# 10 DBs
L = len(df.columns) - 1
from matplotlib import colormaps
if colors is None:
colors = [colormaps.get_cmap(cmap)(x) for x in pylab.linspace(0.2, 1, L)]
colors = ["grey"] + colors
df.plot(
kind="barh",
stacked=True,
width=width,
edgecolor=edgecolor,
color=colors,
lw=lw,
alpha=alpha,
ax=ax,
legend=False,
)
pylab.xlabel("Percentage (%)", fontsize=fontsize)
pylab.ylabel("Sample index/name", fontsize=fontsize)
if len(self.sample_names) < max_labels:
pylab.yticks(
range(len(self.sample_names)),
[str(x)[0:max_sample_name_length] for x in df.index],
fontsize=ytick_fontsize,
)
else: # pragma: no cover
pylab.yticks([1], [""])
pylab.xlim([0, 100])
pylab.ylim([0 - 0.5, len(df.index) - 0.5]) # +1 for the legends
ax.legend(df.columns, title="DBs ", bbox_to_anchor=(1, 1))
try: # pragma: no cover
pylab.gcf().set_layout_engine("tight")
except: # pragma: no cover
pass
if output_filename:
pylab.savefig(output_filename, dpi=dpi)
return df