#
# This file is part of Sequana software
#
# Copyright (c) 2016-2022 - Sequana Development Team
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
import os
import re
from pathlib import Path
import colorlog
from sequana.lazy import numpy as np
from sequana.lazy import pandas as pd
from sequana.lazy import pylab
from sequana.rnadiff import RNADiffTable
logger = colorlog.getLogger(__name__)
__all__ = ["RNADiffCompare"]
class Compare:
def __init__(self):
pass
[docs]class RNADiffCompare(Compare):
"""An object representation of results coming from a RNADiff analysis.
::
from sequana.compare import RNADiffCompare
c = RNADiffCompare("data.csv", "data2.csv")
# change the l2fc to update venn plots
c.plot_venn_up()
c.r1.log2_fc = 1
c.r2.log2_fc = 1
c.plot_venn_up()
"""
def __init__(self, *args, design=None):
self.rns = []
for rnadiff_csv in args:
if isinstance(rnadiff_csv, RNADiffTable):
self.rns.append(rnadiff_csv)
elif os.path.exists(rnadiff_csv):
self.rns.append(RNADiffTable(rnadiff_csv))
else:
raise NotImplementedError
# aliases
self.r1 = self.rns[0]
self.r2 = self.rns[1]
# keep only entries in common
A = self.r1.df.index
B = self.r2.df.index
common = set(A).intersection(B)
if len(A) != len(B):
self.r1.df = self.r1.df.loc[list(common)]
self.r2.df = self.r2.df.loc[list(common)]
self.r1.filt_df = self.r1.filter()
self.r2.filt_df = self.r2.filter()
self.r1.set_gene_lists()
self.r2.set_gene_lists()
logger.info(f"Two sets are not equal. Kept {len(self.r1.df)} in common")
[docs] def plot_venn_down(self, labels=None, ax=None, title="Down expressed genes", mode="all", l2fc=0):
assert l2fc <= 0, "l2fc must be negative"
kargs = {}
kargs["title"] = title
kargs["labels"] = labels
kargs["ax"] = ax
kargs["data1"] = set(self.r1.gene_lists["down"])
kargs["data2"] = set(self.r2.gene_lists["down"])
self._venn(**kargs)
[docs] def plot_venn_up(self, labels=None, ax=None, title="Up expressed genes", mode="all", l2fc=0):
"""Venn diagram of cond1 from RNADiff result1 vs cond2 in RNADiff
result 2
.. plot::
:include-source:
from sequana import sequana_data
from sequana.compare import RNADiffCompare
c = RNADiffCompare(
sequana_data("rnadiff_salmon.csv", "doc/rnadiff_compare"),
sequana_data("rnadiff_bowtie.csv", "doc/rnadiff_compare")
)
c.plot_venn_up()
"""
assert l2fc >= 0, "l2fc must be positive"
kargs = {}
kargs["title"] = title
kargs["labels"] = labels
kargs["ax"] = ax
kargs["data1"] = set(self.r1.gene_lists["up"])
kargs["data2"] = set(self.r2.gene_lists["up"])
self._venn(**kargs)
def _venn(self, data1, data2, labels=None, ax=None, title="expressed genes"):
from sequana.viz.venn import plot_venn
if labels is None:
labels = ["A", "B"]
plot_venn([data1, data2], labels=labels, ax=ax, title=title)
[docs] def plot_venn_all(self, labels=None, ax=None, title="all expressed genes", mode="all"):
kargs = {}
kargs["title"] = title
kargs["labels"] = labels
kargs["ax"] = ax
kargs["data1"] = set(self.r1.gene_lists["all"])
kargs["data2"] = set(self.r2.gene_lists["all"])
self._venn(**kargs)
[docs] def plot_corrplot_counts_raw(self, samples=None, log2=True, lower="pie", upper="text"):
from sequana.viz import corrplot
if samples is None:
samples = self.r1.counts_raw.columns
df1 = self.r1.counts_raw[samples]
df2 = self.r2.counts_raw[samples]
df = pd.concat([df1, df2], keys=["r1", "r2"], axis=1)
if log2:
df = pylab.log2(df)
c = corrplot.Corrplot(df).plot(upper=upper, lower=lower)
return df.corr()
[docs] def plot_corrplot_counts_normed(self, samples=None, log2=True, lower="pie", upper="text"):
from sequana.viz import corrplot
if samples is None:
samples = self.r1.counts_raw.columns
df1 = self.r1.counts_norm[samples]
df2 = self.r2.counts_norm[samples]
df = pd.concat([df1, df2], keys=["r1", "r2"], axis=1)
if log2:
df = pylab.log2(df)
c = corrplot.Corrplot(df).plot(upper=upper, lower=lower)
return df.corr()
[docs] def plot_jaccard_distance(
self,
mode,
padjs=[0.0001, 0.001, 0.01, 0.05, 0.1],
Nfc=50,
smooth=False,
window=5,
):
assert mode in ["down", "up", "all"]
pylab.clf()
if mode == "down":
m1 = self.r1.df.log2FoldChange.min()
m2 = self.r2.df.log2FoldChange.min()
minimum = min(m1, m2)
print(m1, m2)
X = pylab.linspace(0, minimum, Nfc)
elif mode == "up":
m1 = self.r1.df.log2FoldChange.max()
m2 = self.r2.df.log2FoldChange.max()
maximum = max(m1, m2)
X = pylab.linspace(0, maximum, Nfc)
else:
minmax1 = self.r1.df.log2FoldChange.abs().max()
minmax2 = self.r2.df.log2FoldChange.abs().max()
maximum = max(minmax1, minmax2)
X = pylab.linspace(0, maximum, Nfc)
common = {}
for padj in padjs:
I = []
common[padj] = []
for x in X:
if mode == "down":
# less than a given fold change that is negative
A = set(self.r1.df.query("log2FoldChange<=@x and padj<@padj").index)
B = set(self.r2.df.query("log2FoldChange<=@x and padj<@padj").index)
elif mode == "up":
# greater than a given fold change that is positive
A = set(self.r1.df.query("log2FoldChange>=@x and padj<@padj").index)
B = set(self.r2.df.query("log2FoldChange>=@x and padj<@padj").index)
else:
A = set(self.r1.df.query("(log2FoldChange>=@x or log2FoldChange<=-@x) and padj<@padj").index)
B = set(self.r2.df.query("(log2FoldChange>=@x or log2FoldChange<=-@x) and padj<@padj").index)
if len(A) == 0 or len(B) == 0:
# no overlap yet
I.append(0)
else:
res = len(A.intersection(B)) / (len(A) + len(B) - len(A.intersection(B))) * 100
I.append(res)
common[padj].append(len(A.intersection(B)))
try:
if smooth:
I = pd.Series(I).rolling(window).median().values
else:
assert False
except:
pass
pylab.plot(X, I, "o-", label=str(padj))
ax = pylab.gca()
ax.set_ylabel("Jaccard similarity (intersection/union)")
ax.set_xlabel("Fold change (log2)")
ax2 = ax.twinx()
for padj in padjs:
ax2.plot(X, common[padj], ls="--")
ax2.set_ylabel("Cardinality of the union ")
ax.legend()
ax.set_ylim([0, 100])
# ax2.set_ylim([0,100])
if mode == "down":
ax.axvline(-2, ls="--", color="r")
else:
ax.axvline(2, ls="--", color="r")
return I, common[padj]
[docs] def plot_common_major_counts(
self,
mode,
labels=None,
switch_up_down_cond2=False,
add_venn=True,
xmax=None,
title="",
fontsize=12,
sortby="log2FoldChange",
):
"""
:param mode: down, up or all
.. plot::
:include-source:
from sequana import sequana_data
from sequana.compare import RNADiffCompare
c = RNADiffCompare(
sequana_data("rnadiff_salmon.csv", "doc/rnadiff_compare"),
sequana_data("rnadiff_bowtie.csv", "doc/rnadiff_compare")
)
c.plot_common_major_counts("down")
"""
# cond1, cond2 = self._get_cond1_cond2()
if labels is None:
labels = ["r1", "r2"]
if mode in ["down"]:
# Negative values !
gl1 = list(set(self.r1.gene_lists["down"]))
gl2 = list(set(self.r2.gene_lists["down"]))
A = self.r1.df.loc[gl1].sort_values(by=sortby)
B = self.r2.df.loc[gl1].sort_values(by=sortby)
else:
gl1 = list(set(self.r1.gene_lists[mode]))
gl2 = list(set(self.r2.gene_lists[mode]))
A = self.r1.df.loc[gl1].sort_values(by=sortby, ascending=False)
B = self.r2.df.loc[gl1].sort_values(by=sortby, ascending=False)
# sometimes, up and down may be inverted as compared to the other
# conditions
N = []
for i in range(1, max(len(A), len(B))):
a = A.iloc[0:i].index
b = B.iloc[0:i].index
n = len(set(b).intersection(set(a)))
N.append(n / i * 100)
max_common = len(set(A.index).intersection(set(B.index)))
pylab.clf()
if len(A) > len(B):
pylab.axhline(
max_common / len(A) * 100,
color="r",
ls="--",
label="min set intersection",
)
pylab.axvline(len(B), ls="--", color="k", label="rank of minor set")
else:
pylab.axhline(max_common / len(B) * 100, color="r", ls="--", label="min set intersect")
pylab.axvline(len(A), ls="--", color="k", label="rank of minor set")
pylab.plot(N)
pylab.xlabel("rank", fontsize=fontsize)
pylab.ylabel("% common features", fontsize=fontsize)
pylab.grid(True)
pylab.ylim([0, 100])
if xmax:
pylab.xlim([0, xmax])
else:
pylab.xlim([0, max(len(A), len(B))])
pylab.title(title, fontsize=fontsize)
ax = pylab.gca()
ax2 = ax.twinx()
ax2.plot(A[sortby].values, "orange", label=sortby)
ax2.set_ylabel(sortby)
pylab.legend(loc="lower left")
ax.legend(loc="lower right")
if add_venn:
f = pylab.gcf()
ax = f.add_axes([0.5, 0.5, 0.35, 0.35], facecolor="grey")
if mode == "down":
self.plot_venn_down(ax=ax, title=None, labels=labels, mode="two_only")
elif mode == "up":
self.plot_venn_up(ax=ax, title=None, labels=labels, mode="two_only")
elif mode == "all":
self.plot_venn_all(ax=ax, title=None, labels=labels, mode="two_only")
[docs] def plot_foldchange(self):
mode = "all"
# it may happen that list are not identical due to salmon and bowtie not
# using same input gff for instance.
X = self.r1.df.index
Y = self.r2.df.index
common = list(set(X).intersection(set(Y)))
A = self.r1.df.loc[self.r1.gene_lists[mode]]
B = self.r2.df.loc[self.r2.gene_lists[mode]]
# cast set to list to avoid future error in pandas (june 2022)
AB = list(set(A.index).intersection(set(B.index)))
Ao = A.loc[list(set(A.index).difference(set(B.index)))]
Bo = B.loc[list(set(B.index).difference(set(A.index)))]
Ac = A.loc[AB]
Bc = B.loc[AB]
pylab.plot(
self.r1.df.loc[common].log2FoldChange,
self.r2.df.loc[common].log2FoldChange,
"ko",
alpha=0.5,
markersize=1,
)
pylab.plot(Ac.log2FoldChange, Bc.log2FoldChange, "or", alpha=0.5)
pylab.plot(Ao.log2FoldChange, self.r2.df.loc[Ao.index].log2FoldChange, "*b", alpha=0.5)
pylab.plot(
Bo.log2FoldChange,
self.r1.df.loc[Bo.index].log2FoldChange,
color="cyan",
marker="o",
lw=0,
alpha=0.5,
)
[docs] def plot_volcano_differences(self, mode="all"):
cond1, cond2 = "cond1", "cond2"
labels = [cond1, cond2]
A = self.r1.df.loc[self.r1.gene_lists[mode]]
B = self.r2.df.loc[self.r2.gene_lists[mode]]
# cast set to list to avoid future error in pandas (june 2022)
AB = list(set(A.index).intersection(set(B.index)))
Aonly = A.loc[list(set(A.index).difference(set(B.index)))]
Bonly = B.loc[list(set(B.index).difference(set(A.index)))]
Acommon = A.loc[AB]
Bcommon = B.loc[AB]
pylab.clf()
pylab.plot(
Acommon.log2FoldChange,
-np.log10(Acommon.padj),
marker="o",
alpha=0.5,
color="r",
lw=0,
label="Common in experiment 1",
pickradius=4,
picker=True,
)
pylab.plot(
Bcommon.log2FoldChange,
-np.log10(Bcommon.padj),
marker="o",
alpha=0.5,
color="orange",
lw=0,
label="Common in experiment 2",
pickradius=4,
picker=True,
)
for x in AB:
a_l = A.loc[x].log2FoldChange
a_p = -np.log10(A.loc[x].padj)
b_l = B.loc[x].log2FoldChange
b_p = -np.log10(B.loc[x].padj)
pylab.plot([a_l, b_l], [a_p, b_p], "k", alpha=0.5)
pylab.plot(
Bonly.log2FoldChange,
-np.log10(Bonly.padj),
marker="*",
alpha=0.5,
color="blue",
lw=0,
label="In experiment 2 only",
pickradius=4,
picker=True,
)
pylab.plot(
Aonly.log2FoldChange,
-np.log10(Aonly.padj),
marker="*",
alpha=0.5,
color="cyan",
lw=0,
label="In experiment 1 only",
pickradius=4,
picker=True,
)
for name, x in Bonly.iterrows():
x1 = x.log2FoldChange
y1 = -np.log10(x.padj)
x2 = self.r1.df.loc[name].log2FoldChange
y2 = -np.log10(self.r1.df.loc[name].padj)
pylab.plot([x1, x2], [y1, y2], ls="--", color="r")
for name, x in Aonly.iterrows():
x1 = x.log2FoldChange
y1 = -np.log10(x.padj)
x2 = self.r2.df.loc[name].log2FoldChange
y2 = -np.log10(self.r2.df.loc[name].padj)
pylab.plot([x1, x2], [y1, y2], ls="-", color="r")
pylab.axhline(1.33, alpha=0.5, ls="--", color="r")
pylab.xlabel("log2 fold Change")
pylab.ylabel("log10 adjusted p-values")
pylab.legend()
pylab.grid(True)
return Aonly, Bonly, Acommon, Bcommon
[docs] def plot_volcano(self, labels=None):
"""Volcano plot of log2 fold change versus log10 of adjusted p-value
.. plot::
:include-source:
from sequana import sequana_data
from sequana.compare import RNADiffCompare
c = RNADiffCompare(
sequana_data("rnadiff_salmon.csv", "doc/rnadiff_compare"),
sequana_data("rnadiff_bowtie.csv", "doc/rnadiff_compare")
)
c.plot_volcano()
"""
cond1, cond2 = "cond1", "cond2"
if labels is None:
labels = [cond1, cond2]
A = self.r1.df.loc[self.r1.gene_lists["all"]]
B = self.r2.df.loc[self.r2.gene_lists["all"]]
if cond1 == cond2:
cond1 += "(1)"
cond2 += "(2)"
pylab.clf()
pylab.plot(
A.log2FoldChange,
-np.log10(A.padj),
marker="o",
alpha=0.5,
color="r",
lw=0,
label=labels[0],
pickradius=4,
picker=True,
)
pylab.plot(
B.log2FoldChange,
-np.log10(B.padj),
marker="x",
alpha=0.5,
color="k",
lw=0,
label=labels[1],
pickradius=4,
picker=True,
)
genes = list(A.index) + list(B.index)
pylab.grid(True)
pylab.xlabel("fold change")
pylab.ylabel("log10 adjusted p-value")
pylab.legend(loc="lower right")
ax = pylab.gca()
def onpick(event):
thisline = event.artist
self.event = event
label = thisline.get_label()
if label == cond1:
gene_name = A.index[event.ind[0]]
x1 = round(A.loc[gene_name].log2FoldChange, 1)
y1 = round(-np.log10(A.loc[gene_name].padj), 1)
try:
x2 = round(B.loc[gene_name].log2FoldChange, 1)
y2 = round(-np.log10(B.loc[gene_name].padj), 1)
except:
x2, y2 = None, None
else:
gene_name = B.index[event.ind[0]]
x1 = round(B.loc[gene_name].log2FoldChange, 1)
y1 = round(-np.log10(B.loc[gene_name].padj), 1)
try:
x2 = round(A.loc[gene_name].log2FoldChange, 1)
y2 = round(-np.log10(A.loc[gene_name].padj), 1)
except:
x2, y2 = None, None
try:
if x2 is None:
ax.title.set_text("{} at pos [{},{}]".format(gene_name, x1, y1))
else:
ax.title.set_text("{} at pos [{},{}] and [{},{}]".format(gene_name, x1, y1, x2, y2))
except:
print("exception")
ax.title.set_text("")
pylab.draw()
fig = pylab.gcf()
fig.canvas.mpl_connect("pick_event", onpick)
[docs] def plot_geneset(
self,
indices,
showlines=True,
showdots=True,
colors={
"bodies": "blue",
"cbars": "k",
"dot": "blue",
"cmins": "k",
"cmaxes": "k",
},
):
"""indices is a list that represents a gene sets
cmins, cmaxes, cbars are the colors of the bars inside the body of the violin plots
.. plot::
from sequana import sequana_data
from sequana.compare import RNADiffCompare
c = RNADiffCompare(
sequana_data("rnadiff_salmon.csv", "doc/rnadiff_compare"),
sequana_data("rnadiff_bowtie.csv", "doc/rnadiff_compare")
)
c.plot_volcano()
indices = c.r1.df.query("log2FoldChange>1 or log2FoldChange<-1").index.values
indices = [x for x in indices if x in c.r1.df.index and x in c.r2.df.index]
c.plot_geneset(indices, showlines=True)
"""
from matplotlib.pyplot import violinplot
from pylab import axhline, clf, plot, violinplot, xticks, ylabel
N = len(self.rns)
data = [self.rns[i].df.loc[indices]["log2FoldChange"].values for i in range(0, N)]
clf()
axhline(0, color="k", ls="--", zorder=-1)
vp = violinplot(data)
for x in vp["bodies"]:
x.set_color(colors["bodies"])
vp["cbars"].set_color(colors["cbars"])
vp["cmins"].set_color(colors["cmins"])
vp["cmaxes"].set_color(colors["cmaxes"])
for i in range(N - 1):
for x, y in zip(data[i], data[i + 1]):
if showlines:
plot([i + 1, i + 2], [x, y], "or-", alpha=0.5)
else:
plot([i + 1, i + 2], [x, y], "or", alpha=0.5)
xticks(range(1, N + 1), [f"C{i}" for i in range(1, N + 1)], fontsize=16)
ylabel("log2 Fold Change", fontsize=16)
return data