Source code for sequana.compare

#
#  This file is part of Sequana software
#
#  Copyright (c) 2016-2022 - Sequana Development Team
#
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#
#  website: https://github.com/sequana/sequana
#  documentation: http://sequana.readthedocs.io
#
##############################################################################

import os
import re
from pathlib import Path

import colorlog
from matplotlib_venn import venn2_unweighted, venn3_unweighted

from sequana.lazy import numpy as np
from sequana.lazy import pandas as pd
from sequana.lazy import pylab
from sequana.rnadiff import RNADiffTable

logger = colorlog.getLogger(__name__)


__all__ = ["RNADiffCompare"]


class Compare:
    def __init__(self):
        pass


[docs]class RNADiffCompare(Compare): """An object representation of results coming from a RNADiff analysis. :: from sequana.compare import RNADiffCompare c = RNADiffCompare("data.csv", "data2.csv") # change the l2fc to update venn plots c.plot_venn_up() c.r1.log2_fc = 1 c.r2.log2_fc = 1 c.plot_venn_up() """ def __init__(self, *args, design=None): self.rns = [] for rnadiff_csv in args: if isinstance(rnadiff_csv, RNADiffTable): self.rns.append(rnadiff_csv) elif os.path.exists(rnadiff_csv): self.rns.append(RNADiffTable(rnadiff_csv)) else: raise NotImplementedError # aliases self.r1 = self.rns[0] self.r2 = self.rns[1] # keep only entries in common A = self.r1.df.index B = self.r2.df.index common = set(A).intersection(B) if len(A) != len(B): self.r1.df = self.r1.df.loc[list(common)] self.r2.df = self.r2.df.loc[list(common)] self.r1.filt_df = self.r1.filter() self.r2.filt_df = self.r2.filter() self.r1.set_gene_lists() self.r2.set_gene_lists() logger.info(f"Two sets are not equal. Kept {len(self.r1.df)} in common")
[docs] def plot_venn_down(self, labels=None, ax=None, title="Down expressed genes", mode="all", l2fc=0): assert l2fc <= 0, "l2fc must be negative" kargs = {} kargs["title"] = title kargs["labels"] = labels kargs["ax"] = ax kargs["data1"] = set(self.r1.gene_lists["down"]) kargs["data2"] = set(self.r2.gene_lists["down"]) self._venn(**kargs)
[docs] def plot_venn_up(self, labels=None, ax=None, title="Up expressed genes", mode="all", l2fc=0): """Venn diagram of cond1 from RNADiff result1 vs cond2 in RNADiff result 2 .. plot:: :include-source: from sequana import sequana_data from sequana.compare import RNADiffCompare c = RNADiffCompare( sequana_data("rnadiff_salmon.csv", "doc/rnadiff_compare"), sequana_data("rnadiff_bowtie.csv", "doc/rnadiff_compare") ) c.plot_venn_up() """ assert l2fc >= 0, "l2fc must be positive" kargs = {} kargs["title"] = title kargs["labels"] = labels kargs["ax"] = ax kargs["data1"] = set(self.r1.gene_lists["up"]) kargs["data2"] = set(self.r2.gene_lists["up"]) self._venn(**kargs)
def _venn(self, data1, data2, labels=None, ax=None, title="expressed genes"): from sequana.viz.venn import plot_venn if labels is None: labels = ["A", "B"] plot_venn([data1, data2], labels=labels, ax=ax, title=title)
[docs] def plot_venn_all(self, labels=None, ax=None, title="all expressed genes", mode="all"): kargs = {} kargs["title"] = title kargs["labels"] = labels kargs["ax"] = ax kargs["data1"] = set(self.r1.gene_lists["all"]) kargs["data2"] = set(self.r2.gene_lists["all"]) self._venn(**kargs)
[docs] def plot_corrplot_counts_raw(self, samples=None, log2=True, lower="pie", upper="text"): from sequana.viz import corrplot if samples is None: samples = self.r1.counts_raw.columns df1 = self.r1.counts_raw[samples] df2 = self.r2.counts_raw[samples] df = pd.concat([df1, df2], keys=["r1", "r2"], axis=1) if log2: df = pylab.log2(df) c = corrplot.Corrplot(df).plot(upper=upper, lower=lower) return df.corr()
[docs] def plot_corrplot_counts_normed(self, samples=None, log2=True, lower="pie", upper="text"): from sequana.viz import corrplot if samples is None: samples = self.r1.counts_raw.columns df1 = self.r1.counts_norm[samples] df2 = self.r2.counts_norm[samples] df = pd.concat([df1, df2], keys=["r1", "r2"], axis=1) if log2: df = pylab.log2(df) c = corrplot.Corrplot(df).plot(upper=upper, lower=lower) return df.corr()
[docs] def plot_jaccard_distance( self, mode, padjs=[0.0001, 0.001, 0.01, 0.05, 0.1], Nfc=50, smooth=False, window=5, ): assert mode in ["down", "up", "all"] pylab.clf() if mode == "down": m1 = self.r1.df.log2FoldChange.min() m2 = self.r2.df.log2FoldChange.min() minimum = min(m1, m2) print(m1, m2) X = pylab.linspace(0, minimum, Nfc) elif mode == "up": m1 = self.r1.df.log2FoldChange.max() m2 = self.r2.df.log2FoldChange.max() maximum = max(m1, m2) X = pylab.linspace(0, maximum, Nfc) else: minmax1 = self.r1.df.log2FoldChange.abs().max() minmax2 = self.r2.df.log2FoldChange.abs().max() maximum = max(minmax1, minmax2) X = pylab.linspace(0, maximum, Nfc) common = {} for padj in padjs: I = [] common[padj] = [] for x in X: if mode == "down": # less than a given fold change that is negative A = set(self.r1.df.query("log2FoldChange<=@x and padj<@padj").index) B = set(self.r2.df.query("log2FoldChange<=@x and padj<@padj").index) elif mode == "up": # greater than a given fold change that is positive A = set(self.r1.df.query("log2FoldChange>=@x and padj<@padj").index) B = set(self.r2.df.query("log2FoldChange>=@x and padj<@padj").index) else: A = set(self.r1.df.query("(log2FoldChange>=@x or log2FoldChange<=-@x) and padj<@padj").index) B = set(self.r2.df.query("(log2FoldChange>=@x or log2FoldChange<=-@x) and padj<@padj").index) if len(A) == 0 or len(B) == 0: # no overlap yet I.append(0) else: res = len(A.intersection(B)) / (len(A) + len(B) - len(A.intersection(B))) * 100 I.append(res) common[padj].append(len(A.intersection(B))) try: if smooth: I = pd.Series(I).rolling(window).median().values else: assert False except: pass pylab.plot(X, I, "o-", label=str(padj)) ax = pylab.gca() ax.set_ylabel("Jaccard similarity (intersection/union)") ax.set_xlabel("Fold change (log2)") ax2 = ax.twinx() for padj in padjs: ax2.plot(X, common[padj], ls="--") ax2.set_ylabel("Cardinality of the union ") ax.legend() ax.set_ylim([0, 100]) # ax2.set_ylim([0,100]) if mode == "down": ax.axvline(-2, ls="--", color="r") else: ax.axvline(2, ls="--", color="r") return I, common[padj]
[docs] def plot_common_major_counts( self, mode, labels=None, switch_up_down_cond2=False, add_venn=True, xmax=None, title="", fontsize=12, sortby="log2FoldChange", ): """ :param mode: down, up or all .. plot:: :include-source: from sequana import sequana_data from sequana.compare import RNADiffCompare c = RNADiffCompare( sequana_data("rnadiff_salmon.csv", "doc/rnadiff_compare"), sequana_data("rnadiff_bowtie.csv", "doc/rnadiff_compare") ) c.plot_common_major_counts("down") """ # cond1, cond2 = self._get_cond1_cond2() if labels is None: labels = ["r1", "r2"] if mode in ["down"]: # Negative values ! gl1 = list(set(self.r1.gene_lists["down"])) gl2 = list(set(self.r2.gene_lists["down"])) A = self.r1.df.loc[gl1].sort_values(by=sortby) B = self.r2.df.loc[gl1].sort_values(by=sortby) else: gl1 = list(set(self.r1.gene_lists[mode])) gl2 = list(set(self.r2.gene_lists[mode])) A = self.r1.df.loc[gl1].sort_values(by=sortby, ascending=False) B = self.r2.df.loc[gl1].sort_values(by=sortby, ascending=False) # sometimes, up and down may be inverted as compared to the other # conditions N = [] for i in range(1, max(len(A), len(B))): a = A.iloc[0:i].index b = B.iloc[0:i].index n = len(set(b).intersection(set(a))) N.append(n / i * 100) max_common = len(set(A.index).intersection(set(B.index))) pylab.clf() if len(A) > len(B): pylab.axhline( max_common / len(A) * 100, color="r", ls="--", label="min set intersection", ) pylab.axvline(len(B), ls="--", color="k", label="rank of minor set") else: pylab.axhline(max_common / len(B) * 100, color="r", ls="--", label="min set intersect") pylab.axvline(len(A), ls="--", color="k", label="rank of minor set") pylab.plot(N) pylab.xlabel("rank", fontsize=fontsize) pylab.ylabel("% common features", fontsize=fontsize) pylab.grid(True) pylab.ylim([0, 100]) if xmax: pylab.xlim([0, xmax]) else: pylab.xlim([0, max(len(A), len(B))]) pylab.title(title, fontsize=fontsize) ax = pylab.gca() ax2 = ax.twinx() ax2.plot(A[sortby].values, "orange", label=sortby) ax2.set_ylabel(sortby) pylab.legend(loc="lower left") ax.legend(loc="lower right") if add_venn: f = pylab.gcf() ax = f.add_axes([0.5, 0.5, 0.35, 0.35], facecolor="grey") if mode == "down": self.plot_venn_down(ax=ax, title=None, labels=labels, mode="two_only") elif mode == "up": self.plot_venn_up(ax=ax, title=None, labels=labels, mode="two_only") elif mode == "all": self.plot_venn_all(ax=ax, title=None, labels=labels, mode="two_only")
[docs] def plot_foldchange(self): mode = "all" # it may happen that list are not identical due to salmon and bowtie not # using same input gff for instance. X = self.r1.df.index Y = self.r2.df.index common = list(set(X).intersection(set(Y))) A = self.r1.df.loc[self.r1.gene_lists[mode]] B = self.r2.df.loc[self.r2.gene_lists[mode]] # cast set to list to avoid future error in pandas (june 2022) AB = list(set(A.index).intersection(set(B.index))) Ao = A.loc[list(set(A.index).difference(set(B.index)))] Bo = B.loc[list(set(B.index).difference(set(A.index)))] Ac = A.loc[AB] Bc = B.loc[AB] pylab.plot( self.r1.df.loc[common].log2FoldChange, self.r2.df.loc[common].log2FoldChange, "ko", alpha=0.5, markersize=1, ) pylab.plot(Ac.log2FoldChange, Bc.log2FoldChange, "or", alpha=0.5) pylab.plot(Ao.log2FoldChange, self.r2.df.loc[Ao.index].log2FoldChange, "*b", alpha=0.5) pylab.plot( Bo.log2FoldChange, self.r1.df.loc[Bo.index].log2FoldChange, color="cyan", marker="o", lw=0, alpha=0.5, )
[docs] def plot_volcano_differences(self, mode="all"): cond1, cond2 = "cond1", "cond2" labels = [cond1, cond2] A = self.r1.df.loc[self.r1.gene_lists[mode]] B = self.r2.df.loc[self.r2.gene_lists[mode]] # cast set to list to avoid future error in pandas (june 2022) AB = list(set(A.index).intersection(set(B.index))) Aonly = A.loc[list(set(A.index).difference(set(B.index)))] Bonly = B.loc[list(set(B.index).difference(set(A.index)))] Acommon = A.loc[AB] Bcommon = B.loc[AB] pylab.clf() pylab.plot( Acommon.log2FoldChange, -np.log10(Acommon.padj), marker="o", alpha=0.5, color="r", lw=0, label="Common in experiment 1", pickradius=4, picker=True, ) pylab.plot( Bcommon.log2FoldChange, -np.log10(Bcommon.padj), marker="o", alpha=0.5, color="orange", lw=0, label="Common in experiment 2", pickradius=4, picker=True, ) for x in AB: a_l = A.loc[x].log2FoldChange a_p = -np.log10(A.loc[x].padj) b_l = B.loc[x].log2FoldChange b_p = -np.log10(B.loc[x].padj) pylab.plot([a_l, b_l], [a_p, b_p], "k", alpha=0.5) pylab.plot( Bonly.log2FoldChange, -np.log10(Bonly.padj), marker="*", alpha=0.5, color="blue", lw=0, label="In experiment 2 only", pickradius=4, picker=True, ) pylab.plot( Aonly.log2FoldChange, -np.log10(Aonly.padj), marker="*", alpha=0.5, color="cyan", lw=0, label="In experiment 1 only", pickradius=4, picker=True, ) for name, x in Bonly.iterrows(): x1 = x.log2FoldChange y1 = -np.log10(x.padj) x2 = self.r1.df.loc[name].log2FoldChange y2 = -np.log10(self.r1.df.loc[name].padj) pylab.plot([x1, x2], [y1, y2], ls="--", color="r") for name, x in Aonly.iterrows(): x1 = x.log2FoldChange y1 = -np.log10(x.padj) x2 = self.r2.df.loc[name].log2FoldChange y2 = -np.log10(self.r2.df.loc[name].padj) pylab.plot([x1, x2], [y1, y2], ls="-", color="r") pylab.axhline(1.33, alpha=0.5, ls="--", color="r") pylab.xlabel("log2 fold Change") pylab.ylabel("log10 adjusted p-values") pylab.legend() pylab.grid(True) return Aonly, Bonly, Acommon, Bcommon
[docs] def plot_volcano(self, labels=None): """Volcano plot of log2 fold change versus log10 of adjusted p-value .. plot:: :include-source: from sequana import sequana_data from sequana.compare import RNADiffCompare c = RNADiffCompare( sequana_data("rnadiff_salmon.csv", "doc/rnadiff_compare"), sequana_data("rnadiff_bowtie.csv", "doc/rnadiff_compare") ) c.plot_volcano() """ cond1, cond2 = "cond1", "cond2" if labels is None: labels = [cond1, cond2] A = self.r1.df.loc[self.r1.gene_lists["all"]] B = self.r2.df.loc[self.r2.gene_lists["all"]] if cond1 == cond2: cond1 += "(1)" cond2 += "(2)" pylab.clf() pylab.plot( A.log2FoldChange, -np.log10(A.padj), marker="o", alpha=0.5, color="r", lw=0, label=labels[0], pickradius=4, picker=True, ) pylab.plot( B.log2FoldChange, -np.log10(B.padj), marker="x", alpha=0.5, color="k", lw=0, label=labels[1], pickradius=4, picker=True, ) genes = list(A.index) + list(B.index) pylab.grid(True) pylab.xlabel("fold change") pylab.ylabel("log10 adjusted p-value") pylab.legend(loc="lower right") ax = pylab.gca() def onpick(event): thisline = event.artist self.event = event label = thisline.get_label() if label == cond1: gene_name = A.index[event.ind[0]] x1 = round(A.loc[gene_name].log2FoldChange, 1) y1 = round(-np.log10(A.loc[gene_name].padj), 1) try: x2 = round(B.loc[gene_name].log2FoldChange, 1) y2 = round(-np.log10(B.loc[gene_name].padj), 1) except: x2, y2 = None, None else: gene_name = B.index[event.ind[0]] x1 = round(B.loc[gene_name].log2FoldChange, 1) y1 = round(-np.log10(B.loc[gene_name].padj), 1) try: x2 = round(A.loc[gene_name].log2FoldChange, 1) y2 = round(-np.log10(A.loc[gene_name].padj), 1) except: x2, y2 = None, None try: if x2 is None: ax.title.set_text("{} at pos [{},{}]".format(gene_name, x1, y1)) else: ax.title.set_text("{} at pos [{},{}] and [{},{}]".format(gene_name, x1, y1, x2, y2)) except: print("exception") ax.title.set_text("") pylab.draw() fig = pylab.gcf() fig.canvas.mpl_connect("pick_event", onpick)
[docs] def plot_geneset( self, indices, showlines=True, showdots=True, colors={ "bodies": "blue", "cbars": "k", "dot": "blue", "cmins": "k", "cmaxes": "k", }, ): """indices is a list that represents a gene sets cmins, cmaxes, cbars are the colors of the bars inside the body of the violin plots .. plot:: from sequana import sequana_data from sequana.compare import RNADiffCompare c = RNADiffCompare( sequana_data("rnadiff_salmon.csv", "doc/rnadiff_compare"), sequana_data("rnadiff_bowtie.csv", "doc/rnadiff_compare") ) c.plot_volcano() indices = c.r1.df.query("log2FoldChange>1 or log2FoldChange<-1").index.values indices = [x for x in indices if x in c.r1.df.index and x in c.r2.df.index] c.plot_geneset(indices, showlines=True) """ from matplotlib.pyplot import violinplot from pylab import axhline, clf, plot, violinplot, xticks, ylabel N = len(self.rns) data = [self.rns[i].df.loc[indices]["log2FoldChange"].values for i in range(0, N)] clf() axhline(0, color="k", ls="--", zorder=-1) vp = violinplot(data) for x in vp["bodies"]: x.set_color(colors["bodies"]) vp["cbars"].set_color(colors["cbars"]) vp["cmins"].set_color(colors["cmins"]) vp["cmaxes"].set_color(colors["cmaxes"]) for i in range(N - 1): for x, y in zip(data[i], data[i + 1]): if showlines: plot([i + 1, i + 2], [x, y], "or-", alpha=0.5) else: plot([i + 1, i + 2], [x, y], "or", alpha=0.5) xticks(range(1, N + 1), [f"C{i}" for i in range(1, N + 1)], fontsize=16) ylabel("log2 Fold Change", fontsize=16) return data