#
# This file is part of Sequana software
#
# Copyright (c) 2016-2021 - Sequana Development Team
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
import glob
import json
import os
import shutil
import time
import urllib
from os.path import expanduser
from pathlib import Path
import colorlog
import colormap
import plotly.express as px
import requests
from bioservices import KEGG
from tqdm import tqdm
from sequana.enrichment.gsea import GSEA
from sequana.lazy import numpy as np
from sequana.lazy import pandas as pd
from sequana.lazy import pylab
from sequana.summary import Summary
logger = colorlog.getLogger(__name__)
__all__ = ["KEGGPathwayEnrichment"]
[docs]class KEGGPathwayEnrichment:
"""KEGG Pathways enrichment from DGE results
DGE = Differentially Gene Expression
Current input is the output of the RNADiff analysis. This is a file
than can be read by RNADiffResults
This class takes as input a dictionary with 3 lists of genes: 'up',
'down' and 'all'.
For example, using the output of the RNA-seq DGE, use::
from sequana import RNADiffResults
r = RNADiffResults("rnadiff.csv")
r._log2_fc = 1
gl = r.get_gene_lists(annot_col="gene_name")
gl['KO_vs_cont"]
ke = KEGGPathwayEnrichment(gl, "mmu") # mmu for mouse
this calls ke.compute_enrichment() that stores the up, down and all results
in the attribute :attr:`enrichment` as a dictionary.
You can now plot the results::
ke.barplot('down')
and save enriched pathways as follows::
up = ke.save_significant_pathways("up")
down = ke.save_significant_pathways("down")
up.to_csv("kegg_pathway_up_regulated.csv")
down.to_csv("kegg_pathway_down_regulated.csv")
This class works like a charm for ecoli with GFF that uses gene names.
For mus musculus, organism is **mmu** (not **mus*). you will need to have
a mapping of the Ensembl ID into KEGG IDs (actually gene name).
You can perform the conversion using BioServices/BioMart. We have
implemented a simple function inside Sequana::
from sequana import Mart
conv = Mart("mmusculus_gene_ensembl")
df = conf.query()
conf.save(df)
You can then import the dataframe, as follows using the mapper argument::
import pandas as pd
df = pd.read_csv("biomart.csv")
df = df.rename({"external_gene_name":"name", "ensembl_gene_id": "ensembl"},
axis=1)
df = df.set_index("ensembl", inplace=True)
ke = KEGGPathwayEnrichment("path_to_rnadiff", "mmu", mapper=df)
ke.scatterplot('down')
savefig("B4052_T1vsT0_KE_scatterplot_down.png")
ke.scatterplot('up')
savefig("B4052_T1vsT0_KE_scatterplot_up.png")
Pathways are loaded from KEGG which may take some time. For development or production, you may want
to save the KEGG pathways in a given place. Note however, that the pathways that are enriched will
still be downloaded for the final annotation and image creation
To save the pathways locally and load them later do as follows (here for human)::
ke = KEGGPathwayEnrichment({}, organism="hsa")
ke.save_pathways("all_pathways/")
# load them back next time
ke = KEGGPathwayEnrichment({}, organism="hsa", preload_directory="all_pathways")
"""
def __init__(
self,
gene_lists,
organism,
progress=True,
mapper=None,
background=None,
padj_cutoff=0.05,
preload_directory=None,
convert_input_gene_to_upper_case=False,
color_node_with_annotation="Name",
used_genes=None,
):
"""
In some cases, the input identifiers are converted into names thanks to
the input mapper (csv file). Yet, if the external name are from one
species and you use another species in kegg, the kegg names may be upper case
while your species' name are in lower case. In such situations, you may
set input identifiers are upper case setting the
convert_input_gene_to_upper_case parameter to True
"""
self.convert_input_gene_to_upper_case = convert_input_gene_to_upper_case
self.color_node_with_annotation = color_node_with_annotation
self.kegg = KEGG(cache=True)
self.kegg.organism = organism
self.summary = Summary("KEGGPathwayEnrichment")
self.gene_lists = gene_lists
# What is the intersection between the gene names found in KEGG and
# the used names used within the RNAdiff analysis (and therefore the
# gene names to be found in gene_lists)
# First, we get the list of KEGG names and print number of genes
kegg_gene_names = self.kegg.list(self.kegg.organism).strip()
N_kegg_genes = len(kegg_gene_names.split("\n"))
logger.info(f"Number of KEGG genes for {organism}: {N_kegg_genes}")
# then, we extract the fourth columns that contains gene names and synonyms
kegg_gene_names = [x.split("\t")[3] for x in kegg_gene_names.split("\n") if len(x.split("\t")) == 4]
kegg_gene_names = [y.strip() for x in kegg_gene_names for y in x.split(";")]
kegg_gene_names = [y.strip() for x in kegg_gene_names for y in x.split(",")]
def _get_intersection_kegg_genes(genes):
if len(genes) == 0:
return "NA"
else:
return len(set(kegg_gene_names).intersection(set(genes))) / len(genes) * 100
dge_genes = set([x for k, v in gene_lists.items() for x in v])
mapped = [
_get_intersection_kegg_genes(gene_lists["down"]),
_get_intersection_kegg_genes(gene_lists["up"]),
_get_intersection_kegg_genes(gene_lists["all"]),
]
lengths = [len(gene_lists["down"]), len(gene_lists["up"]), len(gene_lists["all"])]
category = ["down", "up", "all"]
if used_genes is not None:
lengths.append(len(used_genes))
category.append("all genes")
intersection = _get_intersection_kegg_genes(used_genes)
logger.info(f"Percentage of genes used in RNAdiff and found in KEGG: {intersection}")
mapped.append(intersection)
# if no background provided, let us use the number of found genes
if background is None:
background = len(used_genes)
self.overlap_stats = pd.DataFrame({"category": category, "mapped_percentage": mapped, "N": lengths})
# Define the background
if background:
self.background = background
logger.info(f"User-defined background: {self.background}")
else:
self.background = N_kegg_genes
logger.info(f"Background defined as number of KEGG genes: {self.background} ")
self.summary.add_params(
{
"organism": organism,
"mapper": (True if mapper is not None else False),
"background": self.background,
}
)
self.padj_cutoff = padj_cutoff
self._load_pathways(progress=progress, preload_directory=preload_directory)
if isinstance(mapper, str):
df = pd.read_csv(mapper)
df = df.rename({"external_gene_name": "name", "ensembl_gene_id": "ensembl"}, axis=1)
df.set_index("ensembl", inplace=True)
self.mapper = df
else: # the dataframe should already contain the correct columns and index
self.mapper = mapper
try:
self.compute_enrichment()
except Exception as err: # pragma: no cover
logger.critical(err)
logger.critical("An error occured while computing enrichments. ")
def _check_category(self, cat):
if cat not in self.gene_lists.keys():
raise ValueError(f"category must be set to one of {self.gene_lists.keys()}. You provided {cat}")
[docs] def save_pathways(self, out_directory):
outdir = Path(out_directory)
outdir.mkdir(exist_ok=True)
with open(outdir / f"{self.kegg.organism}.json", "w") as fout:
json.dump(self.pathways, fout)
def _load_pathways(self, progress=True, preload_directory=None):
# This is just loading all pathways once for all
self.pathways = {}
if preload_directory:
# preload is a directory with all pathways in it
indir = Path(preload_directory)
logger.info(
f"Loading pathways from local files in {preload_directory}. Expecting a file named {self.kegg.organism}.json"
)
with open(indir / f"{self.kegg.organism}.json", "r") as fin:
data = json.load(fin)
self.pathways = data
logger.info(f"Loaded {len(self.pathways)} pathways.")
else: # pragma: no cover #not tested due to slow call
for ID in tqdm(self.kegg.pathwayIds, desc="Downloading KEGG pathways"):
self.pathways[ID.replace("path:", "")] = self.kegg.parse(self.kegg.get(ID))
# Some cleanup. Note that if we read the json file, this is different
# since already cleanup but this code does no harm
for ID in self.pathways.keys():
name = self.pathways[ID]["NAME"]
if isinstance(name, list):
name = name[0]
self.pathways[ID]["NAME"] = name.split(" - ", 1)[0]
# save gene sets
self.gene_sets = {}
for ID in self.pathways.keys():
res = self.pathways[ID]
if "GENE" in res.keys():
results = []
# some pathways reports genes as a dictionary id:'gene name; description' ('.eg. eco')
# others reports genes as a dictionary id:'description'
for geneID, description in res["GENE"].items():
if ";" in description:
name = description.split(";")[0]
else:
name = geneID
results.append(name)
self.gene_sets[ID] = results
else:
logger.debug("SKIPPED (no genes) {}: {}".format(ID, res["NAME"]))
# save all pathways info
self.df_pathways = pd.DataFrame(self.pathways).T
del self.df_pathways["ENTRY"]
del self.df_pathways["REFERENCE"]
go = [x["GO"] if isinstance(x, dict) and "GO" in x.keys() else None for x in self.df_pathways.DBLINKS]
self.df_pathways["GO"] = go
del self.df_pathways["DBLINKS"]
[docs] def plot_genesets_hist(self, bins=20):
N = len(self.gene_sets.keys())
pylab.clf()
pylab.hist([len(v) for k, v in self.gene_sets.items()], bins=bins, lw=1, ec="k")
pylab.title("{} gene sets".format(N))
pylab.xlabel("Gene set sizes")
pylab.grid(True, alpha=0.5)
a, b = pylab.xlim()
pylab.xlim([0, b])
[docs] def compute_enrichment(self, background=None):
if background is None:
background = self.background
self.summary.data["missing_genes"] = {}
self.summary.data["input_gene_list"] = {}
self.enrichment = {
category: self._enrichr(category, background=background) for category in self.gene_lists.keys()
}
self.dfs = {
category: self._get_final_df(self.enrichment[category].results) for category in self.gene_lists.keys()
}
def _enrichr(self, category, background, verbose=True):
if isinstance(category, list):
gene_list = category
else:
self._check_category(category)
gene_list = self.gene_lists[category]
logger.info(f"Computing enrichment for category '{category}'. Input gene list of {len(gene_list)} ids")
self.summary.data["input_gene_list"][category] = len(gene_list)
if self.mapper is not None:
missing = [x for x in gene_list if x not in self.mapper.index]
logger.info(f"Missing genes from mapper dataframe: {len(missing)}")
self.summary.data["missing_genes"][category] = ",".join(missing)
gene_list = [x for x in gene_list if x in self.mapper.index]
identifiers = self.mapper.loc[gene_list]["name"].drop_duplicates().values
if self.convert_input_gene_to_upper_case:
identifiers = [x.upper() for x in identifiers if isinstance(x, str)]
logger.info(f"Mapped gene list of {len(identifiers)} ids")
gene_list = list(identifiers)
gs = GSEA(self.gene_sets)
enr = gs.compute_enrichment(
gene_list=gene_list,
verbose=verbose,
background=background,
)
return enr
def _get_final_df(self, df):
# takes the df and populate the name and size of the found pathways
# we also sort by adjusted p-value
if len(df) == 0:
return df
df = df.copy()
df["name"] = [self.pathways[x]["NAME"] for x in df.Term]
df = df.sort_values("Adjusted P-value")
df["Adjusted P-value (-log10)"] = -np.log10(df["Adjusted P-value"])
df.reset_index(drop=True, inplace=True)
df = df.sort_values("Adjusted P-value")
df = df.rename({"Term": "pathway_id"}, axis=1)
df = df[df.columns]
df["significative"] = df["Adjusted P-value"] < self.padj_cutoff
return df
[docs] def barplot(self, category, nmax=15):
self._check_category(category)
df = self.dfs[category].query("significative == True").head(nmax)
df = df.sort_values("Adjusted P-value (-log10)")
fig = px.bar(df, x="Adjusted P-value (-log10)", y="name", orientation="h")
# To have all labels displayed
if len(df) > 20:
fig.update_layout(height=800)
return fig
[docs] def barplot_up_and_down(self, nmax=15):
import plotly.graph_objects as go
dfup = self.dfs["up"].query("significative == True").head(nmax)
dfdown = self.dfs["down"].query("significative == True").head(nmax)
names = sorted(list(set(list(dfdown["name"].values) + list(dfup["name"].values))))
names = names[::-1]
U = [dfup.query("name == @name")["size"].values[0] if name in dfup["name"].values else 0 for name in names]
D = [-dfdown.query("name == @name")["size"].values[0] if name in dfdown["name"].values else 0 for name in names]
fig = go.Figure()
fig.add_trace(go.Bar(y=names, x=U, marker_color="blue", orientation="h", name="Up"))
fig.add_trace(go.Bar(y=names, x=D, base=0, orientation="h", marker_color="red", name="Down"))
fig.update_layout(barmode="stack", xaxis_title="Number of Genes")
return fig
[docs] def scatterplot(self, category, nmax=15):
self._check_category(category)
df = self.dfs[category].query("significative == True").head(nmax)
df.sort_values("Odds Ratio", inplace=True)
df = df.round({"Odds Ratio": 2, "Combined Score": 2})
fig = px.scatter(
df,
x="Odds Ratio",
y="name",
color="Combined Score",
size="size",
hover_data=["Combined Score", "Adjusted P-value", "Overlap"],
color_continuous_scale="Viridis",
)
# To have all labels displayed
if len(df) > 20:
fig.update_layout(height=800)
return fig
def _get_summary_pathway(self, pathway_ID, df, l2fc=0, padj=0.05):
genes = self.df_pathways.loc[pathway_ID]["GENE"]
df_down = df.query("padj<=@padj and log2FoldChange<=-@l2fc").copy()
df_up = df.query("padj<=@padj and log2FoldChange>=@l2fc").copy()
# for the color, we need to find the match between names and the annotation
# since names is suppose to have been populated with the annotaion, it should be
# found. no need for sanity checks.
df_down["Name"] = df_down[self.color_node_with_annotation]
df_up["Name"] = df_up[self.color_node_with_annotation]
# in principle, the KEGG pathays field 'GENE' is stored as
# "GENE":{key: value} and the values are the NAME, a semi column, and a description
# hence the split on ; herebelow:
# unfortunately, there are special cases to handle such as vibrio cholera (vc)
mapper = {}
if self.kegg.organism.startswith("vc"): # pragma: no cover
for k, v in genes.items():
# the value is just the description. Let us assume that the name is also the ID
mapper[k] = k
else:
for k, v in genes.items():
mapper[v.split(";")[0]] = k
# may not be used ?
self.genes = genes
self.df_down = df_down
self.df_up = df_up
summary_names = []
summary_keggids = []
summary_types = []
summary_pvalues = []
summary_fcs = []
if self.mapper is not None:
if "Name" not in df_down.columns:
df_down["Name"] = df_down["ID"]
Names = []
for index in df_down.index:
Names.append(self.mapper.loc[index]["name"][0])
df_down["Name"] = Names
if "Name" not in df_up.columns:
df_up["Name"] = df_up["ID"]
Names = []
for index in df_up.index:
Names.append(self.mapper.loc[index]["name"][0])
df_up["Name"] = Names
#
identifiers = []
new_mapper = {}
for name, kegg_id in mapper.items():
try:
identifier = self.mapper.query("name == @name")["name"].drop_duplicates().index[0]
identifiers.append(identifier)
new_mapper[identifier] = kegg_id
except:
logger.warning("Skipped {}(kegg ID {}). could not find mapping".format(name, kegg_id))
mapper = new_mapper
df_down["Name"] = df_down["Name"].str.lower()
df_up["Name"] = df_up["Name"].str.lower()
for name, kegg_id in mapper.items():
summary_names.append(name)
summary_keggids.append(kegg_id)
if name.lower() in {x.lower() for x in df_down.Name.fillna("undefined")}:
padj = -pylab.log10(df_down.query("Name==@name.lower()").padj.values[0])
fc = df_down.query("Name==@name.lower()").log2FoldChange.values[0]
summary_fcs.append(fc)
summary_pvalues.append(padj)
summary_types.append("-")
elif name.lower() in {x.lower() for x in df_up.Name.fillna("undefined")}:
padj = -pylab.log10(df_up.query("Name==@name.lower()").padj.values[0])
summary_pvalues.append(padj)
fc = df_up.query("Name==@name.lower()").log2FoldChange.values[0]
summary_fcs.append(fc)
summary_types.append("+")
else:
summary_pvalues.append(None)
summary_fcs.append(None)
summary_types.append("=")
summary = pd.DataFrame(
{
"type": summary_types,
"name": summary_names,
"padj": summary_pvalues,
"fc": summary_fcs,
"keggid": summary_keggids,
}
)
summary["description"] = [self.pathways[pathway_ID]["GENE"][x] for x in summary.keggid]
return summary
def _get_colors(self, summary):
colors = {}
# replace iterrows by faster method
for _, row in summary.iterrows():
row = row.fillna(0)
l2fc = row["fc"]
kegg_id = row["keggid"]
# Orange to Blue using _r means Blue to Orange where Blue is cold
# (negative fold change)
cmap = colormap.get_cmap.cmap_builder("PuOr_r")
if l2fc <= -4:
l2fc = -4
elif l2fc >= 4:
l2fc = 4
# to get a value in the range 0,1 expected by the colormap
l2fc = (l2fc + 4) / 8
color = colormap.rgb2hex(*cmap(l2fc)[0:3], normalised=True)
# this is to reduce URL length until a POST solution is found.
# all entries with low l2fc below |1| are ignored
# 0.625 <=> l2fc = 1
# 0.375 <=> l2fc = -1
if l2fc >= 0.625 or l2fc <= 0.375:
colors[kegg_id] = f"{color}"
# 0.125 <==> l2fc = -3 here color is very dark, text should be in another color
if l2fc < 0.125 or l2fc > 0.875:
colors[kegg_id] = f"{color},white"
# else: #FIXME for now, genes not in the annotation/rnadiff are also ignored
# to keep URL short
# colors[kegg_id] = f"{color}"
return colors
[docs] def save_pathway(self, pathway_ID, df, scale=None, show=False, filename=None):
summary = self._get_summary_pathway(pathway_ID, df)
colors = self._get_colors(summary)
count_up = len(summary.query("type == '+'"))
count_down = len(summary.query("type == '-'"))
logger.info(
f"pathway {pathway_ID} total genes: {len(summary)}. Found {count_down} down-regulated and {count_up} up-regulated"
)
url = "https://www.kegg.jp/kegg-bin/show_pathway"
params = {
"map": pathway_ID,
"multi_query": "\r\n".join([f"{k} {v}" for k, v in colors.items()]),
}
if filename is None:
filename = f"{pathway_ID}.png"
try:
# let us try the selenium solution. If it works, we overwrite the previous image
urlsel = url + f"?map={pathway_ID}&multi_query="
# print(urlsel)
# print("\\r\\n".join([f"{k} {v}" for k, v in colors.items()]))
urlsel += urllib.parse.quote("\r\n".join([f"{k} {v}" for k, v in colors.items()]))
if len(urlsel) >= 2048:
logger.warning(f"EXCESS. consider increasing l2fc: {len(urlsel)}, {len(colors)}")
self._download_kegg_image(urlsel, pathway_ID)
# in theory one should have an image called PATHWAYID_date_ID.png
# we need to renae it
filenames = glob.glob(f"{pathway_ID}@2x_*png")
if len(filenames) > 1:
logger.warning(
f"expected one PNG file. found several KEGG PNG files: {filenames}. please remove them. Moving the first one found"
)
if len(filenames) == 0:
logger.warning("no files were saved using headless call. roll back to requests ")
# filename is defined above by the user or as pathway_ID.png
shutil.move(filenames[0], filename)
except Exception as err: # pragma: no cover
print(err)
# color do not work anymore since Aug 2023 but this allows us to get the image
# in case the new implementation (above) fails
self.params = params
html_page = requests.post(url, data=params)
self.tmp = html_page
html_page = html_page.content.decode()
try:
links_to_png = [x for x in html_page.split() if "png" in x and x.startswith("src")]
link_to_png = links_to_png[0].replace("src=", "").replace('"', "")
r = requests.get(f"https://www.kegg.jp/{link_to_png}")
with open(filename, "wb") as fout:
fout.write(r.content)
except IndexError:
logger.warning(f"could not create image for {pathway_ID}")
return summary
def _download_kegg_image(self, url, pathwayID):
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
# Define the path to the Chrome WebDriver executable
# Drivers can be found here : https://pypi.org/project/selenium/
home = expanduser("~")
webdriver_path = f"{home}/.config/sequana/chromedriver"
if os.path.exists(webdriver_path) is False:
logger.critical(
"To annotate KEGG image, you currently need chrome and a valid/compatible selenium driver for chroe. Please see https://pypi.org/project/selenium"
)
# Set up the WebDriver with options
chrome_options = webdriver.ChromeOptions()
chrome_options.binary_location = "/usr/bin/google-chrome" # Path to your Chrome executable
chrome_options.add_argument("--headless") # Optional: run Chrome in headless mode (no GUI)
# Create a ChromeService object with the WebDriver path
chrome_service = ChromeService(executable_path=webdriver_path)
logger.info(f"Creating driver to access KEGG website for {pathwayID}")
# Create a WebDriver instance
driver = webdriver.Chrome(service=chrome_service, options=chrome_options)
# Navigate to the web page. Wait 10s for the page to be updated with the requests
# somehow the WebDriverWait is not enough. Requests larger than 2048 characters will fail
# here, FIXME with a javascript of post requests but not easy.
driver.get(url)
time.sleep(10)
# Find and click the menu tab 'download-menu'
download_menu = WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "downloadMenu")))
# Use ActionChains to hover over the "Download" menu item
actions = ActionChains(driver)
actions.move_to_element(download_menu).perform()
logger.info("Downloading image\n")
image_link = WebDriverWait(driver, 10).until(EC.element_to_be_clickable((By.LINK_TEXT, "Image (png) file 2x")))
actions.move_to_element(image_link).click().perform()
# Wait for the download to complete (you can use a more robust approach to wait for the file to be ready)
time.sleep(5)
# Close the WebDriver
driver.quit()
[docs] def save_significant_pathways(self, category, nmax=20, tag="", outdir="."): # pragma: no cover
"""category should be up, down or all"""
# select the relevant pathways
df = self.dfs[category].query("significative == True")
logger.warning("Found {} pathways to save".format(len(df)))
if len(df) > nmax:
logger.warning("Restricted pathways to {}".format(nmax))
df = df.head(nmax)
logger.info("saving {} deregulated pathways".format(len(df)))
summaries = {}
for ID in df["pathway_id"]:
summary = self.save_pathway(ID, df, filename=(Path(outdir) / f"{ID}_{category}.png"))
summaries[ID] = summary
return summaries
[docs] def find_pathways_by_gene(self, gene_name):
"""Returns pathways that contain the gene name
::
ke.find_pathways_by_gene("ysgA")
"""
# we are going to search for the gene name and lower case onyl
candidates = [gene_name, gene_name.lower()]
found_paths = []
for key in self.pathways.keys():
if "GENE" in self.pathways[key]:
genes = set([x.split(";")[0] for x in self.pathways[key]["GENE"].values()])
for candidate in candidates:
if candidate in genes:
found_paths.append(key)
return list(set(found_paths))
[docs] def save_project(self, tag, outdir="."):
"""Save tables and visualisations of the complete enrichmment analysis."""
outdir = Path(outdir)
outdir.mkdir(parents=True, exist_ok=True)
for category in self.gene_lists.keys():
common_out = Path(f"{tag}_kegg_gsea_{category}_degs")
df = self.dfs[category]
if not df.empty:
df.to_csv(outdir / (common_out.name + ".csv"))
df.query("significative == True").to_csv(outdir / (common_out.name + "_significant.csv"))
self.barplot(category).write_image(outdir / (common_out.name + "_barplot.png"))
self.scatterplot(category).write_image(outdir / (common_out.name + "_scatterplot.png"))
# In case of no enrichment results, create empty files stating so
else:
(outdir / (common_out.name + "_NO_RESULTS")).touch()