#
# This file is part of Sequana software
#
# Copyright (c) 2016-2021 - Sequana Development Team
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
import json
import xml.etree.ElementTree as ET
from io import BytesIO
from pathlib import Path
import colorlog
import requests
from tqdm import tqdm
from sequana.enrichment.gsea import GSEA
from sequana.lazy import bioservices, colormap
from sequana.lazy import numpy as np
from sequana.lazy import pandas as pd
from sequana.lazy import plotly_express as px
from sequana.lazy import pylab
from sequana.summary import Summary
logger = colorlog.getLogger(__name__)
__all__ = ["KEGGPathwayEnrichment"]
[docs]
class KEGGPathwayEnrichment:
"""KEGG Pathways enrichment from DGE results
DGE = Differentially Gene Expression
Current input is the output of the RNADiff analysis. This is a file
than can be read by RNADiffResults
This class takes as input a dictionary with 3 lists of genes: 'up',
'down' and 'all'.
For example, using the output of the RNA-seq DGE, use::
from sequana import RNADiffResults
r = RNADiffResults("rnadiff.csv")
r._log2_fc = 1
gl = r.get_gene_lists(annot_col="gene_name")
gl['KO_vs_cont"]
ke = KEGGPathwayEnrichment(gl, "mmu") # mmu for mouse
this calls ke.compute_enrichment() that stores the up, down and all results
in the attribute :attr:`enrichment` as a dictionary.
You can now plot the results::
ke.barplot('down')
and save enriched pathways as follows::
up = ke.save_significant_pathways("up")
down = ke.save_significant_pathways("down")
up.to_csv("kegg_pathway_up_regulated.csv")
down.to_csv("kegg_pathway_down_regulated.csv")
This class works like a charm for ecoli with GFF that uses gene names.
For mus musculus, organism is **mmu** (not **mus*). you will need to have
a mapping of the Ensembl ID into KEGG IDs (actually gene name).
You can perform the conversion using BioServices/BioMart. We have
implemented a simple function inside Sequana::
from sequana import Mart
conv = Mart("mmusculus_gene_ensembl")
df = conf.query()
conf.save(df)
You can then import the dataframe, as follows using the mapper argument::
import pandas as pd
df = pd.read_csv("biomart.csv")
df = df.rename({"external_gene_name":"name", "ensembl_gene_id": "ensembl"},
axis=1)
df = df.set_index("ensembl", inplace=True)
ke = KEGGPathwayEnrichment("path_to_rnadiff", "mmu", mapper=df)
ke.scatterplot('down')
savefig("B4052_T1vsT0_KE_scatterplot_down.png")
ke.scatterplot('up')
savefig("B4052_T1vsT0_KE_scatterplot_up.png")
Pathways are loaded from KEGG which may take some time. For development or production, you may want
to save the KEGG pathways in a given place. Note however, that the pathways that are enriched will
still be downloaded for the final annotation and image creation
To save the pathways locally and load them later do as follows (here for human)::
ke = KEGGPathwayEnrichment({}, organism="hsa")
ke.save_pathways("all_pathways/")
# load them back next time
ke = KEGGPathwayEnrichment({}, organism="hsa", preload_directory="all_pathways")
"""
def __init__(
self,
gene_lists,
organism,
progress=True,
mapper=None,
background=None,
padj_cutoff=0.05,
preload_directory=None,
convert_input_gene_to_upper_case=False,
color_node_with_annotation="Name",
used_genes=None,
):
"""
In some cases, the input identifiers are converted into names thanks to
the input mapper (csv file). Yet, if the external name are from one
species and you use another species in kegg, the kegg names may be upper case
while your species' name are in lower case. In such situations, you may
set input identifiers are upper case setting the
convert_input_gene_to_upper_case parameter to True
"""
self.convert_input_gene_to_upper_case = convert_input_gene_to_upper_case
self.color_node_with_annotation = color_node_with_annotation
self.kegg = bioservices.KEGG(cache=True)
self.kegg.organism = organism
self.summary = Summary("KEGGPathwayEnrichment")
self.gene_lists = gene_lists
# What is the intersection between the gene names found in KEGG and
# the used names used within the RNAdiff analysis (and therefore the
# gene names to be found in gene_lists)
# First, we get the list of KEGG names and print number of genes
kegg_gene_names = self.kegg.list(self.kegg.organism).strip()
N_kegg_genes = len(kegg_gene_names.split("\n"))
logger.info(f"Number of KEGG genes for {organism}: {N_kegg_genes}")
# then, we extract the fourth columns that contains gene names and synonyms
kegg_gene_names = [x.split("\t")[3] for x in kegg_gene_names.split("\n") if len(x.split("\t")) == 4]
kegg_gene_names = [y.strip() for x in kegg_gene_names for y in x.split(";")]
kegg_gene_names = [y.strip() for x in kegg_gene_names for y in x.split(",")]
def _get_intersection_kegg_genes(genes):
if len(genes) == 0:
return "NA"
else:
return len(set(kegg_gene_names).intersection(set(genes))) / len(genes) * 100
dge_genes = set([x for k, v in gene_lists.items() for x in v])
mapped = [
_get_intersection_kegg_genes(gene_lists["down"]),
_get_intersection_kegg_genes(gene_lists["up"]),
_get_intersection_kegg_genes(gene_lists["all"]),
]
lengths = [len(gene_lists["down"]), len(gene_lists["up"]), len(gene_lists["all"])]
category = ["down", "up", "all"]
if used_genes is not None:
lengths.append(len(used_genes))
category.append("all genes")
intersection = _get_intersection_kegg_genes(used_genes)
logger.info(f"Percentage of genes used in RNAdiff and found in KEGG: {intersection}")
mapped.append(intersection)
# if no background provided, let us use the number of found genes
if background is None:
background = len(used_genes)
self.overlap_stats = pd.DataFrame({"category": category, "mapped_percentage": mapped, "N": lengths})
# Define the background
if background:
self.background = background
logger.info(f"User-defined background: {self.background}")
else:
self.background = N_kegg_genes
logger.info(f"Background defined as number of KEGG genes: {self.background} ")
self.summary.add_params(
{
"organism": organism,
"mapper": (True if mapper is not None else False),
"background": self.background,
}
)
self.padj_cutoff = padj_cutoff
self._load_pathways(progress=progress, preload_directory=preload_directory)
if isinstance(mapper, str):
df = pd.read_csv(mapper)
df = df.rename({"external_gene_name": "name", "ensembl_gene_id": "ensembl"}, axis=1)
df.set_index("ensembl", inplace=True)
self.mapper = df
else: # the dataframe should already contain the correct columns and index
self.mapper = mapper
try:
self.compute_enrichment()
except Exception as err: # pragma: no cover
logger.critical(err)
logger.critical("An error occured while computing enrichments. ")
def _check_category(self, cat):
if cat not in self.gene_lists.keys():
raise ValueError(f"category must be set to one of {self.gene_lists.keys()}. You provided {cat}")
[docs]
def save_pathways(self, out_directory):
outdir = Path(out_directory)
outdir.mkdir(exist_ok=True)
with open(outdir / f"{self.kegg.organism}.json", "w") as fout:
json.dump(self.pathways, fout)
def _load_pathways(self, progress=True, preload_directory=None):
# This is just loading all pathways once for all
self.pathways = {}
if preload_directory:
# preload is a directory with all pathways in it
indir = Path(preload_directory)
logger.info(
f"Loading pathways from local files in {preload_directory}. Expecting a file named {self.kegg.organism}.json"
)
with open(indir / f"{self.kegg.organism}.json", "r") as fin:
data = json.load(fin)
self.pathways = data
logger.info(f"Loaded {len(self.pathways)} pathways.")
else: # pragma: no cover #not tested due to slow call
for ID in tqdm(self.kegg.pathwayIds, desc="Downloading KEGG pathways"):
self.pathways[ID.replace("path:", "")] = self.kegg.parse(self.kegg.get(ID))
# Some cleanup. Note that if we read the json file, this is different
# since already cleanup but this code does no harm
for ID in self.pathways.keys():
name = self.pathways[ID]["NAME"]
if isinstance(name, list):
name = name[0]
self.pathways[ID]["NAME"] = name.split(" - ", 1)[0]
# save gene sets
self.gene_sets = {}
for ID in self.pathways.keys():
res = self.pathways[ID]
if "GENE" in res.keys():
results = []
# some pathways reports genes as a dictionary id:'gene name; description' ('.eg. eco')
# others reports genes as a dictionary id:'description'
for geneID, description in res["GENE"].items():
if ";" in description:
name = description.split(";")[0]
else:
name = geneID
results.append(name)
self.gene_sets[ID] = results
else:
logger.debug("SKIPPED (no genes) {}: {}".format(ID, res["NAME"]))
# save all pathways info
self.df_pathways = pd.DataFrame(self.pathways).T
del self.df_pathways["ENTRY"]
del self.df_pathways["REFERENCE"]
go = [x["GO"] if isinstance(x, dict) and "GO" in x.keys() else None for x in self.df_pathways.DBLINKS]
self.df_pathways["GO"] = go
del self.df_pathways["DBLINKS"]
[docs]
def plot_genesets_hist(self, bins=20):
N = len(self.gene_sets.keys())
pylab.clf()
pylab.hist([len(v) for k, v in self.gene_sets.items()], bins=bins, lw=1, ec="k")
pylab.title("{} gene sets".format(N))
pylab.xlabel("Gene set sizes")
pylab.grid(True, alpha=0.5)
a, b = pylab.xlim()
pylab.xlim([0, b])
[docs]
def compute_enrichment(self, background=None):
if background is None:
background = self.background
self.summary.data["missing_genes"] = {}
self.summary.data["input_gene_list"] = {}
self.enrichment = {
category: self._enrichr(category, background=background) for category in self.gene_lists.keys()
}
self.dfs = {
category: self._get_final_df(self.enrichment[category].results) for category in self.gene_lists.keys()
}
def _enrichr(self, category, background, verbose=True):
if isinstance(category, list):
gene_list = category
else:
self._check_category(category)
gene_list = self.gene_lists[category]
logger.info(f"Computing enrichment for category '{category}'. Input gene list of {len(gene_list)} ids")
self.summary.data["input_gene_list"][category] = len(gene_list)
if self.mapper is not None:
missing = [x for x in gene_list if x not in self.mapper.index]
logger.info(f"Missing genes from mapper dataframe: {len(missing)}")
self.summary.data["missing_genes"][category] = ",".join(missing)
gene_list = [x for x in gene_list if x in self.mapper.index]
identifiers = self.mapper.loc[gene_list]["name"].drop_duplicates().values
if self.convert_input_gene_to_upper_case:
identifiers = [x.upper() for x in identifiers if isinstance(x, str)]
logger.info(f"Mapped gene list of {len(identifiers)} ids")
gene_list = list(identifiers)
gs = GSEA(self.gene_sets)
enr = gs.compute_enrichment(
gene_list=gene_list,
verbose=verbose,
background=background,
)
return enr
def _get_final_df(self, df):
# takes the df and populate the name and size of the found pathways
# we also sort by adjusted p-value
if len(df) == 0:
return df
df = df.copy()
df["name"] = [self.pathways[x]["NAME"] for x in df.Term]
df = df.sort_values("Adjusted P-value")
df["Adjusted P-value (-log10)"] = -np.log10(df["Adjusted P-value"])
df.reset_index(drop=True, inplace=True)
df = df.sort_values("Adjusted P-value")
df = df.rename({"Term": "pathway_id"}, axis=1)
df = df[df.columns]
df["significative"] = df["Adjusted P-value"] < self.padj_cutoff
return df
[docs]
def barplot(self, category, nmax=15):
self._check_category(category)
df = self.dfs[category].query("significative == True").head(nmax)
df = df.sort_values("Adjusted P-value (-log10)")
fig = px.bar(df, x="Adjusted P-value (-log10)", y="name", orientation="h")
# To have all labels displayed
if len(df) > 20:
fig.update_layout(height=800)
return fig
[docs]
def barplot_up_and_down(self, nmax=15):
import plotly.graph_objects as go
dfup = self.dfs["up"].query("significative == True").head(nmax)
dfdown = self.dfs["down"].query("significative == True").head(nmax)
names = sorted(list(set(list(dfdown["name"].values) + list(dfup["name"].values))))
names = names[::-1]
U = [dfup.query("name == @name")["size"].values[0] if name in dfup["name"].values else 0 for name in names]
D = [-dfdown.query("name == @name")["size"].values[0] if name in dfdown["name"].values else 0 for name in names]
fig = go.Figure()
fig.add_trace(go.Bar(y=names, x=U, marker_color="blue", orientation="h", name="Up"))
fig.add_trace(go.Bar(y=names, x=D, base=0, orientation="h", marker_color="red", name="Down"))
fig.update_layout(barmode="stack", xaxis_title="Number of Genes")
return fig
[docs]
def scatterplot(self, category, nmax=15):
self._check_category(category)
df = self.dfs[category].query("significative == True").head(nmax)
df.sort_values("Odds Ratio", inplace=True)
df = df.round({"Odds Ratio": 2, "Combined Score": 2})
fig = px.scatter(
df,
x="Odds Ratio",
y="name",
color="Combined Score",
size="size",
hover_data=["Combined Score", "Adjusted P-value", "Overlap"],
color_continuous_scale="Viridis",
)
# To have all labels displayed
if len(df) > 20:
fig.update_layout(height=800)
return fig
def _get_summary_pathway(self, pathway_ID, df, l2fc=0, padj=0.05):
genes = self.df_pathways.loc[pathway_ID]["GENE"]
df_down = df.query("padj<=@padj and log2FoldChange<=-@l2fc").copy()
df_up = df.query("padj<=@padj and log2FoldChange>=@l2fc").copy()
# for the color, we need to find the match between names and the annotation
# since names is suppose to have been populated with the annotaion, it should be
# found. no need for sanity checks.
df_down["Name"] = df_down[self.color_node_with_annotation]
df_up["Name"] = df_up[self.color_node_with_annotation]
# in principle, the KEGG pathays field 'GENE' is stored as
# "GENE":{key: value} and the values are the NAME, a semi column, and a description
# hence the split on ; herebelow:
# unfortunately, there are special cases to handle such as vibrio cholera (vc)
mapper = {}
if self.kegg.organism.startswith("vc"): # pragma: no cover
for k, v in genes.items():
# the value is just the description. Let us assume that the name is also the ID
mapper[k] = k
else:
for k, v in genes.items():
mapper[v.split(";")[0]] = k
# may not be used ?
self.genes = genes
self.df_down = df_down
self.df_up = df_up
summary_names = []
summary_keggids = []
summary_types = []
summary_pvalues = []
summary_fcs = []
if self.mapper is not None:
if "Name" not in df_down.columns:
df_down["Name"] = df_down["ID"]
Names = []
for index in df_down.index:
Names.append(self.mapper.loc[index]["name"][0])
df_down["Name"] = Names
if "Name" not in df_up.columns:
df_up["Name"] = df_up["ID"]
Names = []
for index in df_up.index:
Names.append(self.mapper.loc[index]["name"][0])
df_up["Name"] = Names
#
identifiers = []
new_mapper = {}
for name, kegg_id in mapper.items():
try:
identifier = self.mapper.query("name == @name")["name"].drop_duplicates().index[0]
identifiers.append(identifier)
new_mapper[identifier] = kegg_id
except:
logger.warning("Skipped {}(kegg ID {}). could not find mapping".format(name, kegg_id))
mapper = new_mapper
df_down["Name"] = df_down["Name"].str.lower()
df_up["Name"] = df_up["Name"].str.lower()
for name, kegg_id in mapper.items():
summary_names.append(name)
summary_keggids.append(kegg_id)
if name.lower() in {x.lower() for x in df_down.Name.fillna("undefined")}:
padj = -pylab.log10(df_down.query("Name==@name.lower()").padj.values[0])
fc = df_down.query("Name==@name.lower()").log2FoldChange.values[0]
summary_fcs.append(fc)
summary_pvalues.append(padj)
summary_types.append("-")
elif name.lower() in {x.lower() for x in df_up.Name.fillna("undefined")}:
padj = -pylab.log10(df_up.query("Name==@name.lower()").padj.values[0])
summary_pvalues.append(padj)
fc = df_up.query("Name==@name.lower()").log2FoldChange.values[0]
summary_fcs.append(fc)
summary_types.append("+")
else:
summary_pvalues.append(None)
summary_fcs.append(None)
summary_types.append("=")
summary = pd.DataFrame(
{
"type": summary_types,
"name": summary_names,
"padj": summary_pvalues,
"fc": summary_fcs,
"keggid": summary_keggids,
}
)
summary["description"] = [self.pathways[pathway_ID]["GENE"][x] for x in summary.keggid]
return summary
def _get_colors(self, summary):
colors = {}
# replace iterrows by faster method
for _, row in summary.iterrows():
row = row.fillna(0)
l2fc = row["fc"]
kegg_id = row["keggid"]
# Orange to Blue using _r means Blue to Orange where Blue is cold
# (negative fold change)
cmap = colormap.get_cmap.cmap_builder("PuOr_r")
if l2fc <= -4:
l2fc = -4
elif l2fc >= 4:
l2fc = 4
# to get a value in the range 0,1 expected by the colormap
l2fc = (l2fc + 4) / 8
color = colormap.rgb2hex(*cmap(l2fc)[0:3], normalised=True)
# this is to reduce URL length until a POST solution is found.
# all entries with low l2fc below |1| are ignored
# 0.625 <=> l2fc = 1
# 0.375 <=> l2fc = -1
if l2fc >= 0.625 or l2fc <= 0.375:
colors[kegg_id] = f"{color}"
# 0.125 <==> l2fc = -3 here color is very dark, text should be in another color
if l2fc < 0.125 or l2fc > 0.875:
colors[kegg_id] = f"{color},white"
# else: #FIXME for now, genes not in the annotation/rnadiff are also ignored
# to keep URL short
# colors[kegg_id] = f"{color}"
return colors
[docs]
def save_pathway(self, pathway_ID, df, scale=None, show=False, filename=None):
summary = self._get_summary_pathway(pathway_ID, df)
colors = self._get_colors(summary)
count_up = len(summary.query("type == '+'"))
count_down = len(summary.query("type == '-'"))
logger.info(
f"pathway {pathway_ID} total genes: {len(summary)}. Found {count_down} down-regulated and {count_up} up-regulated"
)
if filename is None:
filename = f"{pathway_ID}.png"
try:
self._render_pathway_image(pathway_ID, colors, filename)
except Exception as err:
logger.warning(f"Could not create image for {pathway_ID}: {err}")
return summary
def _render_pathway_image(self, pathway_id, colors, filename):
"""Download KEGG pathway static image and overlay gene colors using KGML coordinates.
Uses the KEGG REST API:
- ``https://rest.kegg.jp/get/{pathway_id}/image`` for the background PNG
- ``https://rest.kegg.jp/get/{pathway_id}/kgml`` for node bounding boxes
:param pathway_id: KEGG pathway ID (e.g. ``eco00010``)
:param colors: dict mapping KEGG gene entry IDs to hex color strings
(optionally ``"#rrggbb,white"`` for contrasting text, only the hex part is used)
:param filename: output PNG file path
"""
from matplotlib import colormaps
from PIL import Image, ImageDraw, ImageFont
try:
cmap = colormaps["PuOr_r"]
except Exception:
from matplotlib.cm import get_cmap
cmap = get_cmap("PuOr_r")
img_resp = requests.get(f"https://rest.kegg.jp/get/{pathway_id}/image")
img_resp.raise_for_status()
img = Image.open(BytesIO(img_resp.content)).convert("RGBA")
kgml_resp = requests.get(f"https://rest.kegg.jp/get/{pathway_id}/kgml")
kgml_resp.raise_for_status()
root = ET.fromstring(kgml_resp.text)
overlay = Image.new("RGBA", img.size, (0, 0, 0, 0))
draw = ImageDraw.Draw(overlay)
try:
font = ImageFont.load_default(size=9)
except TypeError:
font = ImageFont.load_default()
logger.info(f"colors dict: {len(colors)} entries, sample keys: {list(colors.keys())[:5]}")
gene_entries = root.findall("entry[@type='gene']")
logger.info(f"KGML gene entries: {len(gene_entries)}")
_RED_BORDER = (200, 0, 0, 255) # red outline for map/pathway-link boxes
_PURPLE_BORDER = (128, 0, 128, 255) # purple outline for compound/metabolite circles
def _draw_node(gx, gy, gw, gh, bg, text_col, label, shape="rectangle", outline=(0, 0, 0, 255), outline_width=1):
box = (gx - gw // 2, gy - gh // 2, gx + gw // 2, gy + gh // 2)
if shape == "circle":
draw.ellipse(box, fill=bg, outline=outline, width=outline_width)
elif shape == "roundrectangle":
draw.rounded_rectangle(box, radius=4, fill=bg, outline=outline, width=outline_width)
else:
draw.rectangle(box, fill=bg, outline=outline, width=outline_width)
if label:
lbbox = draw.textbbox((0, 0), label, font=font)
lw, lh = lbbox[2] - lbbox[0], lbbox[3] - lbbox[1]
draw.text((gx - lw // 2, gy - lh // 2), label, fill=text_col, font=font)
# Pass 1a: paint all gene nodes white so DE colors stand out
for entry in gene_entries:
gr = entry.find("graphics")
if gr is None:
continue
_draw_node(
int(gr.get("x", 0)),
int(gr.get("y", 0)),
int(gr.get("width", 46)),
int(gr.get("height", 17)),
bg=(255, 255, 255, 255),
text_col=(0, 0, 0, 255),
label=gr.get("name", "").split(",")[0].strip(),
)
# Pass 1b: paint ortholog boxes and compound circles green to restore visual variety
for etype in ("ortholog", "compound"):
for entry in root.findall(f"entry[@type='{etype}']"):
gr = entry.find("graphics")
if gr is None:
continue
shape = gr.get("type", "rectangle")
label = "" # original text already in the KEGG image; avoid overwriting
_draw_node(
int(gr.get("x", 0)),
int(gr.get("y", 0)),
int(gr.get("width", 8)),
int(gr.get("height", 8)),
bg=(0, 0, 0, 0), # transparent fill
text_col=(0, 0, 0, 255),
label=label,
shape=shape,
outline=_PURPLE_BORDER,
outline_width=2,
)
# Pass 1c: map (pathway link) boxes — red border only, no fill so original text shows through
for entry in root.findall("entry[@type='map']"):
gr = entry.find("graphics")
if gr is None:
continue
_draw_node(
int(gr.get("x", 0)),
int(gr.get("y", 0)),
int(gr.get("width", 100)),
int(gr.get("height", 25)),
bg=(0, 0, 0, 0), # transparent fill
text_col=(0, 0, 0, 255),
label="",
shape=gr.get("type", "roundrectangle"),
outline=_RED_BORDER,
outline_width=3,
)
# Pass 2: color DE genes
colored = 0
for entry in gene_entries:
gene_ids = [gid.split(":")[-1] for gid in entry.get("name", "").split()]
gr = entry.find("graphics")
if gr is None:
continue
for gid in gene_ids:
if gid in colors:
color_str = colors[gid]
parts = color_str.split(",")
hex_color = parts[0]
text_col = (255, 255, 255, 255) if len(parts) > 1 else (0, 0, 0, 255)
cr, cg, cb = int(hex_color[1:3], 16), int(hex_color[3:5], 16), int(hex_color[5:7], 16)
_draw_node(
int(gr.get("x", 0)),
int(gr.get("y", 0)),
int(gr.get("width", 46)),
int(gr.get("height", 17)),
bg=(cr, cg, cb, 255),
text_col=text_col,
label=gr.get("name", "").split(",")[0].strip(),
)
colored += 1
break
# Pass 3: draw legend
self._draw_log2fc_legend(draw, img.size, font, cmap)
logger.info(f"Colored {colored}/{len(gene_entries)} gene nodes in {pathway_id}")
Image.alpha_composite(img, overlay).convert("RGB").save(filename)
def _draw_log2fc_legend(self, draw, img_size, font, cmap):
"""Draw a log2FC color gradient legend in the bottom-right corner of the overlay."""
from PIL import ImageFont
# Try to load a TrueType font for crisp rendering; fall back to default bitmap font
_FONT_PATHS = [
"/usr/share/fonts/dejavu-sans-fonts/DejaVuSans.ttf",
"/usr/share/fonts/dejavu/DejaVuSans.ttf",
"/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"/usr/share/fonts/liberation-sans/LiberationSans-Regular.ttf",
"/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf",
"/usr/share/fonts/google-noto/NotoSans-Regular.ttf",
]
legend_font = None
for fp in _FONT_PATHS:
try:
legend_font = ImageFont.truetype(fp, 11)
break
except (IOError, OSError):
continue
if legend_font is None:
legend_font = font # bitmap fallback
bar_w, bar_h = 200, 16
pad = 10
sample_bbox = draw.textbbox((0, 0), "0", font=legend_font)
th = sample_bbox[3] - sample_bbox[1] # text height
# Layout: title / gap / bar / gap / tick labels
content_h = th + 4 + bar_h + 4 + th
img_w, img_h = img_size
lx = img_w - bar_w - pad - 10 # left edge of bar
ly = pad + 8 # top of title — top-right corner
# Semi-transparent white background box
draw.rectangle(
(lx - 8, ly - 5, lx + bar_w + 8, ly + content_h + 5),
fill=(255, 255, 255, 220),
outline=(100, 100, 100, 255),
)
# Title — use plain "log2FC" to avoid Unicode rendering issues with bitmap fonts
title = "log2FC"
tbbox = draw.textbbox((0, 0), title, font=legend_font)
tw = tbbox[2] - tbbox[0]
draw.text((lx + bar_w // 2 - tw // 2, ly), title, fill=(40, 40, 40, 255), font=legend_font)
# Gradient bar
bar_y = ly + th + 4
for i in range(bar_w):
rgba = cmap(i / (bar_w - 1))
draw.line(
[(lx + i, bar_y), (lx + i, bar_y + bar_h - 1)],
fill=(int(rgba[0] * 255), int(rgba[1] * 255), int(rgba[2] * 255), 255),
)
draw.rectangle([lx, bar_y, lx + bar_w, bar_y + bar_h], outline=(80, 80, 80, 255))
# Tick marks and labels below bar
tick_y = bar_y + bar_h + 4
for l2fc_val, tick_label in [(-4, "-4"), (-1, "-1"), (0, "0"), (1, "+1"), (4, "+4")]:
pos = int((l2fc_val + 4) / 8 * (bar_w - 1))
tx = lx + pos
draw.line([(tx, bar_y + bar_h), (tx, bar_y + bar_h + 3)], fill=(80, 80, 80, 255))
lbbox = draw.textbbox((0, 0), tick_label, font=legend_font)
lw = lbbox[2] - lbbox[0]
draw.text((tx - lw // 2, tick_y), tick_label, fill=(40, 40, 40, 255), font=legend_font)
[docs]
def save_significant_pathways(self, category, nmax=20, tag="", outdir="."): # pragma: no cover
"""category should be up, down or all"""
# select the relevant pathways
df = self.dfs[category].query("significative == True")
logger.warning("Found {} pathways to save".format(len(df)))
if len(df) > nmax:
logger.warning("Restricted pathways to {}".format(nmax))
df = df.head(nmax)
logger.info("saving {} deregulated pathways".format(len(df)))
summaries = {}
for ID in df["pathway_id"]:
summary = self.save_pathway(ID, df, filename=(Path(outdir) / f"{ID}_{category}.png"))
summaries[ID] = summary
return summaries
[docs]
def find_pathways_by_gene(self, gene_name):
"""Returns pathways that contain the gene name
::
ke.find_pathways_by_gene("ysgA")
"""
# we are going to search for the gene name and lower case onyl
candidates = [gene_name, gene_name.lower()]
found_paths = []
for key in self.pathways.keys():
if "GENE" in self.pathways[key]:
genes = set([x.split(";")[0] for x in self.pathways[key]["GENE"].values()])
for candidate in candidates:
if candidate in genes:
found_paths.append(key)
return list(set(found_paths))
[docs]
def save_project(self, tag, outdir="."):
"""Save tables and visualisations of the complete enrichmment analysis."""
outdir = Path(outdir)
outdir.mkdir(parents=True, exist_ok=True)
for category in self.gene_lists.keys():
common_out = Path(f"{tag}_kegg_gsea_{category}_degs")
df = self.dfs[category]
if not df.empty:
df.to_csv(outdir / (common_out.name + ".csv"))
df.query("significative == True").to_csv(outdir / (common_out.name + "_significant.csv"))
self.barplot(category).write_image(outdir / (common_out.name + "_barplot.png"))
self.scatterplot(category).write_image(outdir / (common_out.name + "_scatterplot.png"))
# In case of no enrichment results, create empty files stating so
else:
(outdir / (common_out.name + "_NO_RESULTS")).touch()