Source code for sequana.enrichment.quickgo
#
# This file is part of Sequana software
#
# Copyright (c) 2016-2021 - Sequana Development Team
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
import shutil
import colorlog
import networkx as nx
from sequana.datatools import sequana_data
from sequana.lazy import bioservices
from sequana.lazy import pandas as pd
logger = colorlog.getLogger(__name__)
__all__ = ["QuickGOGraph"]
[docs]
class QuickGOGraph:
"""Used by :class:`PantherEnrichment` and :class:`UniprotEnrichment`"""
def __init__(self):
self._ancestors = {
"MF": "GO:0003674",
"CC": "GO:0005575",
"BP": "GO:0008150",
"SLIM_MF": "GO:0003674",
"SLIM_CC": "GO:0005575",
"SLIM_BP": "GO:0008150",
}
self.quickgo = bioservices.quickgo.QuickGO(cache=True)
self.quickgo.requests_per_sec = 10
self.quickgo.services.settings.TIMEOUT = 120
self.obsolets = []
def _get_graph(self, go_ids, ontologies=None):
# Here we filter the data to keep only the relevant go terms as shown in
# panther pie chart
gg = nx.DiGraph()
if isinstance(ontologies, str): # pragma: no cover
ontologies = [ontologies]
# for pantherDB only
for x in ontologies:
if "PROTEIN" in x or "PATHWAY" in x: # pragma: no cover
return {}
ancestors = [self._ancestors[x] for x in ontologies]
go_ids = [x for x in go_ids if x not in ancestors]
# levels = []
renamed_ids = {}
obsolets = []
logger.info(f"Retrieving info for {len(go_ids)} enriched go terms")
annotations = {}
for i, go_id in enumerate(go_ids):
# retrieve info about a given GO ID
info = self.quickgo.get_go_terms(go_id)
annotations[go_id] = info
# Some go terms may be renamed.
if info == 400:
logger.warning(f"{go_id} not valid GO term. skipped")
continue
elif info[0]["id"] != go_id: # pragma: no cover
_id = info[0]["id"]
logger.warning("changed {} to {}".format(go_id, _id))
annotations[_id] = info
renamed_ids[go_id] = _id
else:
_id = go_id
# Some go terms may be obsolete
if info[0]["isObsolete"] is True: # pragma: no cover
logger.warning("Skipping obsolete go terms: {}".format(go_id))
obsolets.append(go_id)
continue
# now figure out the distance to main ancestor
# we can try several times
# if _id != self.ancestors[ontology]:
for ancestor in ancestors:
edges = self.quickgo.get_go_paths(_id, ancestor)
if edges == 400:
logger.warning(f"Could not retrieve {_id} to {ancestor}")
continue
if edges["numberOfHits"] == 0:
continue
if len(edges["results"]) >= 1:
for path in edges["results"]:
for edge in path:
gg.add_edge(edge["child"], edge["parent"])
else:
print(_id, edges["results"])
self.obsolets += obsolets
self.annotations = annotations
self.graph = gg
all_paths = {}
for ancestor in ancestors:
if ancestor not in gg:
continue
paths = nx.shortest_path_length(gg, target=ancestor)
for obsolet in obsolets:
paths[obsolet] = 100
all_paths[ancestor] = paths
for key in all_paths.keys():
for old, new in renamed_ids.items():
if new in all_paths[key]:
all_paths[key][old] = all_paths[key][new]
return all_paths
[docs]
def save_chart(self, data, filename="chart.png"):
"""
pe = PantherEnrichment("B4052-V1.T1vsT0.complete.xls", fc_threshold=5,
padj_threshold=0.05)
df = pe.plot_go_terms("down", log=True, compute_levels=False)
pe.save_chart(df, "chart.png")
"""
# if dataframe, get 'id' column, otherwise expect a list or string of go
# terms separated by commas
if isinstance(data, list):
goids = ",".join(data)
elif isinstance(data, str):
goids = data
elif "id" in data:
goids = ",".join(list(data["id"].values))
try:
goids = [x for x in goids.split(",") if x not in self.obsolets]
except:
logger.error("Could not save chart")
goids = ",".join(goids)
# remove obsolets
try:
res = self.quickgo.get_go_chart(goids)
if res is None:
raise Exception
with open(filename, "wb") as fout:
fout.write(res.content)
except:
logger.warning("Could not create the GO chart. Maybe too many go IDs ({})".format(len(goids.split(","))))
no_data = sequana_data("no_data.png")
shutil.copy(no_data, filename)
[docs]
def get_go_description(self, go_ids):
obsolets = []
descriptions = []
annotations = {}
for i, go_id in enumerate(go_ids):
# retrieve info about a given GO ID
info = self.quickgo.get_go_terms(go_id)
annotations[go_id] = info
# Some go terms may be renamed.
if info[0]["id"] != go_id:
_id = info[0]["id"]
logger.warning("changed {} to {}".format(go_id, _id))
annotations[_id] = info
# renamed_ids[go_id] = _id
else:
_id = go_id
# Some go terms may be obsolete
if info[0]["isObsolete"] is True:
logger.warning("Skipping obsolete go terms: {}".format(go_id))
obsolets.append(go_id)
descriptions.append(info[0]["name"])
return descriptions