Source code for sequana.taxonomy

#
#  This file is part of Sequana software
#
#  Copyright (c) 2016-2020 - Sequana Development Team
#
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#
#  website: https://github.com/sequana/sequana
#  documentation: http://sequana.readthedocs.io
#
##############################################################################
import ftplib
import gzip
import os
import re
import shutil
import tarfile
import tempfile
from collections import Counter
from functools import wraps
from pathlib import Path

import colorlog
from tqdm import tqdm

from sequana import sequana_config_path
from sequana.lazy import pandas as pd
from sequana.misc import wget
from sequana.utils.singleton import Singleton

logger = colorlog.getLogger(__name__)


__all__ = ["NCBITaxonomy", "Taxonomy"]


[docs]class NCBITaxonomy: """ """ def __init__(self, names, nodes): """ :param names: can be a local file or URL :param nodes: can be a local file or URL """ # Path to existing files logger.info("Reading input files") self.names = names self.nodes = nodes # First, the nodes if os.path.exists(nodes): self.df_nodes = pd.read_csv(nodes, sep="|", header=None) else: with tempfile.TemporaryFile() as fout_nodes: logger.info("Loading nodes.dmp from an URL {}".format(nodes)) wget(nodes, fout_nodes.name) self.df_nodes = pd.read_csv(fout_nodes.name, sep="|", header=None) for i, _type in enumerate(self.df_nodes.dtypes): if _type == "O": self.df_nodes[i] = self.df_nodes[i].str.strip("\t") """ tax_id -- node id in GenBank taxonomy database parent tax_id -- parent node id in GenBank taxonomy database rank -- rank of this node (superkingdom, kingdom, ...) embl code -- locus-name prefix; not unique division id -- see division.dmp file inherited div flag (1 or 0) -- 1 if node inherits division from parent genetic code id -- see gencode.dmp file inherited GC flag (1 or 0) -- 1 if node inherits genetic code from parent mitochondrial genetic code id -- see gencode.dmp file inherited MGC flag (1 or 0) -- 1 if node inherits mitochondrial gencode from parent GenBank hidden flag (1 or 0) -- 1 if name is suppressed in GenBank entry lineage hidden subtree root flag (1 or 0) -- 1 if this subtree has no sequence data yet comments -- free-text comments and citations """ try: self.df_nodes.columns = [ "taxid", "parent", "rank", 4, 5, "gc_id", "mt_id", 7, 8, 9, 10, 11, 12, 13, ] del self.df_nodes[13] except Exception: # pragma: no cover self.df_nodes.columns = ["taxid", "parent", "rank", 4, 5] del self.df_nodes[5] # make sure they are ordered by taxon ID self.df_nodes.sort_values("taxid", inplace=True) self.df_nodes.set_index("taxid", inplace=True) # now we read the names if os.path.exists(names): self.df_names = pd.read_csv(names, sep="|", header=None) else: with tempfile.TemporaryFile() as fout_names: logger.info(f"Loading names.dmp from an URL {names}") wget(names, fout_names.name) self.df_names = pd.read_csv(fout_names.name, sep="|", header=None) for i, _type in enumerate(self.df_names.dtypes): if _type == "O": self.df_names[i] = self.df_names[i].str.strip("\t") del self.df_names[4] self.df_names.columns = ["taxid", "name", "unique_name", "key"] self.df_names.set_index("taxid", inplace=True)
[docs] def create_taxonomy_file(self, filename="taxonomy.csv.gz"): filename = Path(filename) logger.info("Please wait while creating the output file. This may take a few minutes") if filename.suffixes != [".csv", ".gz"]: raise ValueError(f"{filename} extension must be '.csv.gz'") count = 0 df_names = self.df_names.query("key == 'scientific name'").copy() # first we create the CSV file logger.info("Creating CSV file") with filename.with_suffix("").open(mode="w") as fout: fout.write("id,parent,rank,scientific_name\n") for taxid in tqdm(self.df_nodes.index): row = self.df_nodes.loc[taxid] sc = df_names.loc[taxid, "name"] fout.write(f"{taxid},{row.parent},{row['rank']},{sc}\n") # second, we gzip it. input must be read as binary logger.info("Compressing CSV file") with filename.with_suffix("").open(mode="rb") as fin: bindata = fin.read() with gzip.open(filename, "wb") as f: f.write(bindata)
def load_taxons(f): @wraps(f) def wrapper(*args, **kargs): if len(args[0].records) == 0: args[0].load_records() return f(*args, **kargs) return wrapper
[docs]class Taxonomy(metaclass=Singleton): """This class should ease the retrieval and manipulation of Taxons There are many resources to retrieve information about a Taxon. For instance, from BioServices, one can use UniProt, Ensembl, or EUtils. This is convenient to retrieve a Taxon (see :meth:`fetch_by_name` and :meth:`fetch_by_id` that rely on Ensembl). However, you can also download a flat file from EBI ftp server, which stores a set or records (2.8M (april 2020). Note that the Ensembl database does not seem to be as up to date as the flat files but entries contain more information. for instance taxon 2 is in the flat file but not available through the :meth:`fetch_by_id`, which uses ensembl. So, you may access to a taxon in 2 different ways getting differnt dictionary. However, 3 keys are common (id, parent, scientific_name) :: >>> t = taxonomy.Taxonomy() >>> t.fetch_by_id(9606) # Get a dictionary from Ensembl >>> t.records[9606] # or just try with the get >>> t[9606] >>> t.get_lineage(9606) Possible ranks are various. You may have biotype, clade, etc ub generally speaking ranks are about lineage. For a given rank, e.g. kingdom, you may have sub division such as superkingdom and subkingdom. order has even more subdivisions (infra, parv, sub, super) Since version 0.8.3 we use NCBI that is updated more often than the ebi ftp according to their README. ftp://ncbi.nlm.nih.gov/pub/taxonomy/ We use Ensembl to retrieve various information regarding taxons. """ def __init__(self, filename=None, verbose=True, online=True, source="ncbi"): """.. rubric:: constructor :param offline: if you do not have internet, the connction to Ensembl may hang for a while and fail. If so, set **offline** to True :param from: download taxonomy databases from ncbi """ assert source in ["ncbi", "ena"] self.source = source if online: from bioservices import Ensembl, EUtils self.ensembl = Ensembl(verbose=False) self.records = {} # empty to start with. self.verbose = verbose if filename is None: self._dbname = "taxonomy.csv.gz" self.database = sequana_config_path + os.sep + self._dbname else: assert str(filename).endswith(".csv.gz") self.database = filename self._custom_db = sequana_config_path self._custom_db += "/taxonomy/taxonomy_custom.csv.gz"
[docs] def download_taxonomic_file(self, overwrite=False): # pragma: no cover """Loads entire flat file from NCBI Do not overwrite the file by default. """ if os.path.exists(self.database) and overwrite is False: logger.info(f"Found {self.database} file in sequana your path {sequana_config_path}") return if self.source == "ena": url = "ftp.ebi.ac.uk" else: url = "ftp.ncbi.nlm.nih.gov" self.ftp = ftplib.FTP(url) self.ftp.login() if self.source == "ena": # for the EBI ftp only: self.ftp.cwd('databases') self.ftp.cwd("pub") self.ftp.cwd("databases") self.ftp.cwd("taxonomy") logger.warning("Downloading and saving data in {self.database}") self.ftp.retrbinary("RETR taxonomy.dat", open(self.database, "wb").write) ftp.close() else: self.ftp.cwd("pub") self.ftp.cwd("taxonomy") logger.warning(f"Downloading and saving data in {self.database}") with tempfile.TemporaryDirectory() as tmpdir: filename = tmpdir + os.sep + "taxdump.tar.gz" self.ftp.retrbinary("RETR taxdump.tar.gz", open(filename, "wb").write) tf = tarfile.open(filename) assert "nodes.dmp" in tf.getnames() assert "names.dmp" in tf.getnames() logger.info("Extracting nodes.dmp") tf.extract("nodes.dmp", tmpdir) logger.info("Extracting names.dmp") tf.extract("names.dmp", tmpdir) logger.info("Extracting and building local database") ncbi = NCBITaxonomy(tmpdir + os.sep + "names.dmp", tmpdir + os.sep + "nodes.dmp") tmp_taxfile = tmpdir + os.sep + "taxonomy.csv.gz" ncbi.create_taxonomy_file(tmp_taxfile) shutil.move(tmp_taxfile, self.database) self.ftp.close()
[docs] def load_records(self, overwrite=False): """Load a flat file and store records in :attr:`records`""" self.download_taxonomic_file(overwrite=overwrite) # for usecols=range(4) cause last column may contain extra commas try: self.records = pd.read_csv(self.database, index_col=0, compression="gzip", usecols=range(4)) except gzip.BadGzipFile: logger.error(f"input file {self.database} should be gzipped") raise gzip.BadGzipFile
[docs] def load_records_from_csv(self, filename): df = pd.read_csv(filename)
[docs] def find_taxon(self, taxid, mode="ncbi"): taxid = str(taxid) if mode == "ncbi": from bioservices import EUtils self.eutils = EUtils(verbose=False) res = self.eutils.taxonomy_summary(taxid) else: res = self.ensembl.get_taxonomy_by_id(taxid) return res
[docs] @load_taxons def fetch_by_id(self, taxon): """Search for a taxon by identifier :return; a dictionary. :: >>> ret = s.search_by_id('10090') >>> ret['name'] 'Mus Musculus' """ res = self.ensembl.get_taxonomy_by_id(taxon) return res
[docs] @load_taxons def fetch_by_name(self, name): """Search a taxon by its name. :param str name: name of an organism. SQL cards possible e.g., _ and % characters. :return: a list of possible matches. Each item being a dictionary. :: >>> ret = s.search_by_name('Mus Musculus') >>> ret[0]['id'] 10090 """ res = self.ensembl.get_taxonomy_by_name(name) return res
[docs] @load_taxons def get_lineage(self, taxon): """Get lineage of a taxon :param int taxon: a known taxon :return: list containing the lineage """ # important to reinit the second argument to [] lineage = self._gen_lineage_and_rank(taxon, []) return [x[0] for x in lineage]
@load_taxons def _gen_lineage_and_rank(self, taxon, lineage_rank=[]): # recursively filling the lineage argument try: record = self.records.loc[taxon] except: return [("unknown_taxon:{}".format(taxon), "no rank")] parent = int(record["parent"]) if taxon == 1: lineage_rank.append((record["scientific_name"], record["rank"])) lineage_rank.reverse() return lineage_rank else: lineage_rank.append((record["scientific_name"], record["rank"])) return self._gen_lineage_and_rank(parent, lineage_rank)
[docs] @load_taxons def get_parent_taxon(self, taxon): return self.records.loc[taxon, "parent"]
[docs] @load_taxons def get_parent_name(self, taxon): taxid = self.get_parent_taxon(taxon) return self.records.loc[taxid, "scientific_name"]
[docs] @load_taxons def get_lineage_and_rank(self, taxon): """Get lineage and rank of a taxon :param int taxon: :return: a list of tuples. Each tuple is a pair of taxon name/rank The list is the lineage for to the input taxon. """ return self._gen_lineage_and_rank(taxon, [])
[docs] @load_taxons def get_ranks(self): return self.records.groupby("rank").count().parent.sort_values()
[docs] def get_records_for_given_rank(self, rank): return self.records.query("rank == @rank")
[docs] @load_taxons def get_names_for_given_rank(self, rank): return self.records.query("rank == @rank").scientific_name.unique()
[docs] @load_taxons def get_children(self, taxon): return self.records.query("parent == @taxon").index.values
@load_taxons def __len__(self): return len(self.records) def __getitem__(self, taxon): return self.records.loc[taxon]