Source code for sequana.compare

#
#  This file is part of Sequana software
#
#  Copyright (c) 2016-2022 - Sequana Development Team
#
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#
#  website: https://github.com/sequana/sequana
#  documentation: http://sequana.readthedocs.io
#
##############################################################################

import os
import re
from pathlib import Path

import colorlog

from sequana.lazy import numpy as np
from sequana.lazy import pandas as pd
from sequana.lazy import pylab
from sequana.rnadiff import RNADiffTable

logger = colorlog.getLogger(__name__)


__all__ = ["RNADiffCompare"]


class Compare:
    def __init__(self):
        pass


[docs]class RNADiffCompare(Compare): """An object representation of results coming from a RNADiff analysis. :: from sequana.compare import RNADiffCompare c = RNADiffCompare("data.csv", "data2.csv") # change the l2fc to update venn plots c.plot_venn_up() c.r1.log2_fc = 1 c.r2.log2_fc = 1 c.plot_venn_up() """ def __init__(self, *args, design=None): self.rns = [] for rnadiff_csv in args: if isinstance(rnadiff_csv, RNADiffTable): self.rns.append(rnadiff_csv) elif os.path.exists(rnadiff_csv): self.rns.append(RNADiffTable(rnadiff_csv)) else: raise NotImplementedError # aliases self.r1 = self.rns[0] self.r2 = self.rns[1] # keep only entries in common A = self.r1.df.index B = self.r2.df.index common = set(A).intersection(B) if len(A) != len(B): self.r1.df = self.r1.df.loc[list(common)] self.r2.df = self.r2.df.loc[list(common)] self.r1.filt_df = self.r1.filter() self.r2.filt_df = self.r2.filter() self.r1.set_gene_lists() self.r2.set_gene_lists() logger.info(f"Two sets are not equal. Kept {len(self.r1.df)} in common")
[docs] def plot_venn_down(self, labels=None, ax=None, title="Down expressed genes", mode="all", l2fc=0): assert l2fc <= 0, "l2fc must be negative" kargs = {} kargs["title"] = title kargs["labels"] = labels kargs["ax"] = ax kargs["data1"] = set(self.r1.gene_lists["down"]) kargs["data2"] = set(self.r2.gene_lists["down"]) self._venn(**kargs)
[docs] def plot_venn_up(self, labels=None, ax=None, title="Up expressed genes", mode="all", l2fc=0): """Venn diagram of cond1 from RNADiff result1 vs cond2 in RNADiff result 2 .. plot:: :include-source: from sequana import sequana_data from sequana.compare import RNADiffCompare c = RNADiffCompare( sequana_data("rnadiff_salmon.csv", "doc/rnadiff_compare"), sequana_data("rnadiff_bowtie.csv", "doc/rnadiff_compare") ) c.plot_venn_up() """ assert l2fc >= 0, "l2fc must be positive" kargs = {} kargs["title"] = title kargs["labels"] = labels kargs["ax"] = ax kargs["data1"] = set(self.r1.gene_lists["up"]) kargs["data2"] = set(self.r2.gene_lists["up"]) self._venn(**kargs)
def _venn(self, data1, data2, labels=None, ax=None, title="expressed genes"): from sequana.viz.venn import plot_venn if labels is None: labels = ["A", "B"] plot_venn([data1, data2], labels=labels, ax=ax, title=title)
[docs] def plot_venn_all(self, labels=None, ax=None, title="all expressed genes", mode="all"): kargs = {} kargs["title"] = title kargs["labels"] = labels kargs["ax"] = ax kargs["data1"] = set(self.r1.gene_lists["all"]) kargs["data2"] = set(self.r2.gene_lists["all"]) self._venn(**kargs)
[docs] def plot_corrplot_counts_raw(self, samples=None, log2=True, lower="pie", upper="text"): from sequana.viz import corrplot if samples is None: samples = self.r1.counts_raw.columns df1 = self.r1.counts_raw[samples] df2 = self.r2.counts_raw[samples] df = pd.concat([df1, df2], keys=["r1", "r2"], axis=1) if log2: df = pylab.log2(df) c = corrplot.Corrplot(df).plot(upper=upper, lower=lower) return df.corr()
[docs] def plot_corrplot_counts_normed(self, samples=None, log2=True, lower="pie", upper="text"): from sequana.viz import corrplot if samples is None: samples = self.r1.counts_raw.columns df1 = self.r1.counts_norm[samples] df2 = self.r2.counts_norm[samples] df = pd.concat([df1, df2], keys=["r1", "r2"], axis=1) if log2: df = pylab.log2(df) c = corrplot.Corrplot(df).plot(upper=upper, lower=lower) return df.corr()
[docs] def plot_jaccard_distance( self, mode, padjs=[0.0001, 0.001, 0.01, 0.05, 0.1], Nfc=50, smooth=False, window=5, ): assert mode in ["down", "up", "all"] pylab.clf() if mode == "down": m1 = self.r1.df.log2FoldChange.min() m2 = self.r2.df.log2FoldChange.min() minimum = min(m1, m2) print(m1, m2) X = pylab.linspace(0, minimum, Nfc) elif mode == "up": m1 = self.r1.df.log2FoldChange.max() m2 = self.r2.df.log2FoldChange.max() maximum = max(m1, m2) X = pylab.linspace(0, maximum, Nfc) else: minmax1 = self.r1.df.log2FoldChange.abs().max() minmax2 = self.r2.df.log2FoldChange.abs().max() maximum = max(minmax1, minmax2) X = pylab.linspace(0, maximum, Nfc) common = {} for padj in padjs: I = [] common[padj] = [] for x in X: if mode == "down": # less than a given fold change that is negative A = set(self.r1.df.query("log2FoldChange<=@x and padj<@padj").index) B = set(self.r2.df.query("log2FoldChange<=@x and padj<@padj").index) elif mode == "up": # greater than a given fold change that is positive A = set(self.r1.df.query("log2FoldChange>=@x and padj<@padj").index) B = set(self.r2.df.query("log2FoldChange>=@x and padj<@padj").index) else: A = set(self.r1.df.query("(log2FoldChange>=@x or log2FoldChange<=-@x) and padj<@padj").index) B = set(self.r2.df.query("(log2FoldChange>=@x or log2FoldChange<=-@x) and padj<@padj").index) if len(A) == 0 or len(B) == 0: # no overlap yet I.append(0) else: res = len(A.intersection(B)) / (len(A) + len(B) - len(A.intersection(B))) * 100 I.append(res) common[padj].append(len(A.intersection(B))) try: if smooth: I = pd.Series(I).rolling(window).median().values else: assert False except: pass pylab.plot(X, I, "o-", label=str(padj)) ax = pylab.gca() ax.set_ylabel("Jaccard similarity (intersection/union)") ax.set_xlabel("Fold change (log2)") ax2 = ax.twinx() for padj in padjs: ax2.plot(X, common[padj], ls="--") ax2.set_ylabel("Cardinality of the union ") ax.legend() ax.set_ylim([0, 100]) # ax2.set_ylim([0,100]) if mode == "down": ax.axvline(-2, ls="--", color="r") else: ax.axvline(2, ls="--", color="r") return I, common[padj]
[docs] def plot_common_major_counts( self, mode, labels=None, switch_up_down_cond2=False, add_venn=True, xmax=None, title="", fontsize=12, sortby="log2FoldChange", ): """ :param mode: down, up or all .. plot:: :include-source: from sequana import sequana_data from sequana.compare import RNADiffCompare c = RNADiffCompare( sequana_data("rnadiff_salmon.csv", "doc/rnadiff_compare"), sequana_data("rnadiff_bowtie.csv", "doc/rnadiff_compare") ) c.plot_common_major_counts("down") """ # cond1, cond2 = self._get_cond1_cond2() if labels is None: labels = ["r1", "r2"] if mode in ["down"]: # Negative values ! gl1 = list(set(self.r1.gene_lists["down"])) gl2 = list(set(self.r2.gene_lists["down"])) A = self.r1.df.loc[gl1].sort_values(by=sortby) B = self.r2.df.loc[gl1].sort_values(by=sortby) else: gl1 = list(set(self.r1.gene_lists[mode])) gl2 = list(set(self.r2.gene_lists[mode])) A = self.r1.df.loc[gl1].sort_values(by=sortby, ascending=False) B = self.r2.df.loc[gl1].sort_values(by=sortby, ascending=False) # sometimes, up and down may be inverted as compared to the other # conditions N = [] for i in range(1, max(len(A), len(B))): a = A.iloc[0:i].index b = B.iloc[0:i].index n = len(set(b).intersection(set(a))) N.append(n / i * 100) max_common = len(set(A.index).intersection(set(B.index))) pylab.clf() if len(A) > len(B): pylab.axhline( max_common / len(A) * 100, color="r", ls="--", label="min set intersection", ) pylab.axvline(len(B), ls="--", color="k", label="rank of minor set") else: pylab.axhline(max_common / len(B) * 100, color="r", ls="--", label="min set intersect") pylab.axvline(len(A), ls="--", color="k", label="rank of minor set") pylab.plot(N) pylab.xlabel("rank", fontsize=fontsize) pylab.ylabel("% common features", fontsize=fontsize) pylab.grid(True) pylab.ylim([0, 100]) if xmax: pylab.xlim([0, xmax]) else: pylab.xlim([0, max(len(A), len(B))]) pylab.title(title, fontsize=fontsize) ax = pylab.gca() ax2 = ax.twinx() ax2.plot(A[sortby].values, "orange", label=sortby) ax2.set_ylabel(sortby) pylab.legend(loc="lower left") ax.legend(loc="lower right") if add_venn: f = pylab.gcf() ax = f.add_axes([0.5, 0.5, 0.35, 0.35], facecolor="grey") if mode == "down": self.plot_venn_down(ax=ax, title=None, labels=labels, mode="two_only") elif mode == "up": self.plot_venn_up(ax=ax, title=None, labels=labels, mode="two_only") elif mode == "all": self.plot_venn_all(ax=ax, title=None, labels=labels, mode="two_only")
[docs] def plot_foldchange(self): mode = "all" # it may happen that list are not identical due to salmon and bowtie not # using same input gff for instance. X = self.r1.df.index Y = self.r2.df.index common = list(set(X).intersection(set(Y))) A = self.r1.df.loc[self.r1.gene_lists[mode]] B = self.r2.df.loc[self.r2.gene_lists[mode]] # cast set to list to avoid future error in pandas (june 2022) AB = list(set(A.index).intersection(set(B.index))) Ao = A.loc[list(set(A.index).difference(set(B.index)))] Bo = B.loc[list(set(B.index).difference(set(A.index)))] Ac = A.loc[AB] Bc = B.loc[AB] pylab.plot( self.r1.df.loc[common].log2FoldChange, self.r2.df.loc[common].log2FoldChange, "ko", alpha=0.5, markersize=1, ) pylab.plot(Ac.log2FoldChange, Bc.log2FoldChange, "or", alpha=0.5) pylab.plot(Ao.log2FoldChange, self.r2.df.loc[Ao.index].log2FoldChange, "*b", alpha=0.5) pylab.plot( Bo.log2FoldChange, self.r1.df.loc[Bo.index].log2FoldChange, color="cyan", marker="o", lw=0, alpha=0.5, )
[docs] def plot_volcano_differences(self, mode="all"): cond1, cond2 = "cond1", "cond2" labels = [cond1, cond2] A = self.r1.df.loc[self.r1.gene_lists[mode]] B = self.r2.df.loc[self.r2.gene_lists[mode]] # cast set to list to avoid future error in pandas (june 2022) AB = list(set(A.index).intersection(set(B.index))) Aonly = A.loc[list(set(A.index).difference(set(B.index)))] Bonly = B.loc[list(set(B.index).difference(set(A.index)))] Acommon = A.loc[AB] Bcommon = B.loc[AB] pylab.clf() pylab.plot( Acommon.log2FoldChange, -np.log10(Acommon.padj), marker="o", alpha=0.5, color="r", lw=0, label="Common in experiment 1", pickradius=4, picker=True, ) pylab.plot( Bcommon.log2FoldChange, -np.log10(Bcommon.padj), marker="o", alpha=0.5, color="orange", lw=0, label="Common in experiment 2", pickradius=4, picker=True, ) for x in AB: a_l = A.loc[x].log2FoldChange a_p = -np.log10(A.loc[x].padj) b_l = B.loc[x].log2FoldChange b_p = -np.log10(B.loc[x].padj) pylab.plot([a_l, b_l], [a_p, b_p], "k", alpha=0.5) pylab.plot( Bonly.log2FoldChange, -np.log10(Bonly.padj), marker="*", alpha=0.5, color="blue", lw=0, label="In experiment 2 only", pickradius=4, picker=True, ) pylab.plot( Aonly.log2FoldChange, -np.log10(Aonly.padj), marker="*", alpha=0.5, color="cyan", lw=0, label="In experiment 1 only", pickradius=4, picker=True, ) for name, x in Bonly.iterrows(): x1 = x.log2FoldChange y1 = -np.log10(x.padj) x2 = self.r1.df.loc[name].log2FoldChange y2 = -np.log10(self.r1.df.loc[name].padj) pylab.plot([x1, x2], [y1, y2], ls="--", color="r") for name, x in Aonly.iterrows(): x1 = x.log2FoldChange y1 = -np.log10(x.padj) x2 = self.r2.df.loc[name].log2FoldChange y2 = -np.log10(self.r2.df.loc[name].padj) pylab.plot([x1, x2], [y1, y2], ls="-", color="r") pylab.axhline(1.33, alpha=0.5, ls="--", color="r") pylab.xlabel("log2 fold Change") pylab.ylabel("log10 adjusted p-values") pylab.legend() pylab.grid(True) return Aonly, Bonly, Acommon, Bcommon
[docs] def plot_volcano(self, labels=None): """Volcano plot of log2 fold change versus log10 of adjusted p-value .. plot:: :include-source: from sequana import sequana_data from sequana.compare import RNADiffCompare c = RNADiffCompare( sequana_data("rnadiff_salmon.csv", "doc/rnadiff_compare"), sequana_data("rnadiff_bowtie.csv", "doc/rnadiff_compare") ) c.plot_volcano() """ cond1, cond2 = "cond1", "cond2" if labels is None: labels = [cond1, cond2] A = self.r1.df.loc[self.r1.gene_lists["all"]] B = self.r2.df.loc[self.r2.gene_lists["all"]] if cond1 == cond2: cond1 += "(1)" cond2 += "(2)" pylab.clf() pylab.plot( A.log2FoldChange, -np.log10(A.padj), marker="o", alpha=0.5, color="r", lw=0, label=labels[0], pickradius=4, picker=True, ) pylab.plot( B.log2FoldChange, -np.log10(B.padj), marker="x", alpha=0.5, color="k", lw=0, label=labels[1], pickradius=4, picker=True, ) genes = list(A.index) + list(B.index) pylab.grid(True) pylab.xlabel("fold change") pylab.ylabel("log10 adjusted p-value") pylab.legend(loc="lower right") ax = pylab.gca() def onpick(event): thisline = event.artist self.event = event label = thisline.get_label() if label == cond1: gene_name = A.index[event.ind[0]] x1 = round(A.loc[gene_name].log2FoldChange, 1) y1 = round(-np.log10(A.loc[gene_name].padj), 1) try: x2 = round(B.loc[gene_name].log2FoldChange, 1) y2 = round(-np.log10(B.loc[gene_name].padj), 1) except: x2, y2 = None, None else: gene_name = B.index[event.ind[0]] x1 = round(B.loc[gene_name].log2FoldChange, 1) y1 = round(-np.log10(B.loc[gene_name].padj), 1) try: x2 = round(A.loc[gene_name].log2FoldChange, 1) y2 = round(-np.log10(A.loc[gene_name].padj), 1) except: x2, y2 = None, None try: if x2 is None: ax.title.set_text("{} at pos [{},{}]".format(gene_name, x1, y1)) else: ax.title.set_text("{} at pos [{},{}] and [{},{}]".format(gene_name, x1, y1, x2, y2)) except: print("exception") ax.title.set_text("") pylab.draw() fig = pylab.gcf() fig.canvas.mpl_connect("pick_event", onpick)
[docs] def plot_geneset( self, indices, showlines=True, showdots=True, colors={ "bodies": "blue", "cbars": "k", "dot": "blue", "cmins": "k", "cmaxes": "k", }, ): """indices is a list that represents a gene sets cmins, cmaxes, cbars are the colors of the bars inside the body of the violin plots .. plot:: from sequana import sequana_data from sequana.compare import RNADiffCompare c = RNADiffCompare( sequana_data("rnadiff_salmon.csv", "doc/rnadiff_compare"), sequana_data("rnadiff_bowtie.csv", "doc/rnadiff_compare") ) c.plot_volcano() indices = c.r1.df.query("log2FoldChange>1 or log2FoldChange<-1").index.values indices = [x for x in indices if x in c.r1.df.index and x in c.r2.df.index] c.plot_geneset(indices, showlines=True) """ from matplotlib.pyplot import violinplot from pylab import axhline, clf, plot, violinplot, xticks, ylabel N = len(self.rns) data = [self.rns[i].df.loc[indices]["log2FoldChange"].values for i in range(0, N)] clf() axhline(0, color="k", ls="--", zorder=-1) vp = violinplot(data) for x in vp["bodies"]: x.set_color(colors["bodies"]) vp["cbars"].set_color(colors["cbars"]) vp["cmins"].set_color(colors["cmins"]) vp["cmaxes"].set_color(colors["cmaxes"]) for i in range(N - 1): for x, y in zip(data[i], data[i + 1]): if showlines: plot([i + 1, i + 2], [x, y], "or-", alpha=0.5) else: plot([i + 1, i + 2], [x, y], "or", alpha=0.5) xticks(range(1, N + 1), [f"C{i}" for i in range(1, N + 1)], fontsize=16) ylabel("log2 Fold Change", fontsize=16) return data