#  This file is part of Sequana software
#  Copyright (c) 2016-2022 - Sequana Development Team
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#  website:
#  documentation:
import os
from collections import defaultdict

import colorlog

from sequana.errors import BadFileFormat

logger = colorlog.getLogger(__name__)

from sequana.lazy import pandas as pd
from sequana.lazy import pysam

__all__ = ["GFF3"]

[docs]class GFF3: """Read a GFF file, version 3 .. seealso:: :: g = GFF3(filename) # first call is slow g.df # print info about the different feature types g.features # prints info about duplicated attributes: g.get_duplicated_attributes_per_genetic_type(self) On eukaryotes, the reading and processing of the GFF may take a while. On prokaryotes, it should be pretty fast (a few seconds). To speed up the eukaryotes case, we skip the processing biological_regions (50% of the data in mouse). """ def __init__(self, filename, skip_types=["biological_region"]): self.filename = filename self.skip_types = skip_types self._df = None self._features = None self._attributes = None def _get_features(self): """Extract unique GFF feature types This is equivalent to awk '{print $3}' | sort | uniq to extract unique GFF types. No sanity check, this is suppose to be fast. Less than a few seconds for mammals. """ # This is used by the rnaseq pipeline and should be kept fast count = 0 if self._features: features = self._features else: features = set() with open(self.filename, "r") as reader: for line in reader: # Skip metadata and comments if line.startswith("#"): continue # Skip empty lines if not line.strip(): # pragma: no cover continue split = line.rstrip().split("\t") L = len(split) if L == 9: features.add(split[2]) count += 1 # FIXME may be overwritten by get_df self._features = features return sorted(features) features = property(_get_features)
[docs] def get_attributes(self, feature=None, sep="; "): """Return list of possible attributes If feature is provided, must be valid and used as a filter to keep only entries for that feature. ~10 seconds on mouse genome GFF file. sep must be "; " with extra space to cope with special cases where an attribute has several entries separated by ; e.g.: BP="GO:0006412"; MF="GO:0005524;GO:0004004;" """ # This is used by the rnaseq pipeline and should be kept fast if self._attributes: return self._attributes attributes = set() with open(self.filename, "r") as reader: for line in reader: # stop once FASTA starts if line.startswith("##FASTA"): break # Skip metadata and comments and empty lines if line.startswith("#") or not line.strip(): # pragma: no cover continue split = line.rstrip().split("\t") if feature and split[2] != feature: continue for item in split[8].split(sep): item = item.strip() if len(item) == 0: # empty final string #pragma: no cover continue # Here, some GFF use = some others use spaces... very # annoying. if "=" in item: item = item.split("=")[0].strip() else: item = item.split()[0].strip() attributes.add(item) self._attributes = sorted(attributes) return self._attributes
attributes = property(get_attributes)
[docs] def read(self): """Read annotations one by one creating a generator""" self._features = set() with open(self.filename, "r") as reader: line = None for line in reader: # stop once FASTA starts if line.startswith("##FASTA"): break # Skip metadata and comments if line.startswith("#"): continue # Skip empty lines if not line.strip(): continue # Format checking. skip rows that do not have 9 columns since # it is comments or fasta sequence split = line.rstrip().split("\t") if len(split) != 9: continue # skipping biological_region saves lots of time if split[2].strip() in self.skip_types: continue # we process main fields and attributes. This takes most of the # time self._features.add(split[2]) # the main first 8 fields annotation = self._process_main_fields(split[0:8]) # all attributes as key/values added to all annotations. annotation["attributes"] = self._process_attributes(split[8]) annotation.update(annotation["attributes"]) yield annotation
def _get_df(self): if self._df is not None: return self._df"Processing GFF file. 1. Reading the input file. Please be patient") # ~ 30 seconds on mouse df = pd.DataFrame( self._df = df return self._df df = property(_get_df)
[docs] def get_duplicated_attributes_per_genetic_type(self): results = {} for typ in self.features: results[typ] = {} print("{}: {} entries".format(typ, len(self.df.query("genetic_type==@typ")))) for attr in sorted(self.attributes): L = len(self.df.query("genetic_type==@typ")[attr].dropna()) dups = self.df.query("genetic_type==@typ")[attr].dropna().duplicated().sum() if dups > 0: print(f" - {attr}:{dups} duplicates ({L} in total)") else: print(f" - {attr}:No duplicates ({L} in total)") results[typ][attr] = dups df = pd.DataFrame(results) return df
[docs] def transcript_to_gene_mapping(self, feature="all", attribute="transcript_id"): """ :param feature: not used yet :param attribute: the attribute to be usde. should be transcript_id for salmon compatability but could use soething different. """ # entries may have transcripts set to None transcripts = [x for x in self.df[attribute] if x] # retrieve only the data with transcript id defined transcripts_df = self.df.set_index(attribute) transcripts_df = transcripts_df.loc[transcripts] transcripts_df = transcripts_df.reset_index() results = {} results2 = defaultdict(list) for _id, data in transcripts_df[["ID", "Parent"]].iterrows(): results[data.values[0]] = data.values[1] results2[data.values[1]].append(data.values[0]) return results, results2
[docs] def save_annotation_to_csv(self, filename="annotations.csv"): self.df.to_csv(filename, index=False)
[docs] def read_and_save_selected_features(self, outfile, features=["gene"]): count = 0 with open(self.filename, "r") as fin, open(outfile, "w") as fout: for line in fin: # stop once FASTA starts if line.startswith("##FASTA"): break split = line.rstrip().split("\t") # skipping biological_region saves lots of time try: if split[2].strip() in features: fout.write(line) count += 1 except IndexError: pass"Found {count} entries and saved into {outfile}")
[docs] def save_gff_filtered(self, filename="filtered.gff", features=["gene"], replace_seqid=None): """ save_gff_filtered("test.gff", features=['misc_RNA', 'rRNA'], replace_seqid='locus_tag') """ with open(filename, "w") as fout: fout.write("#gff-version 3\n#Custom gff from sequana\n") count = 0 from collections import defaultdict counter = defaultdict(int) for x, y in self.df.iterrows(): if y["genetic_type"] in features: if replace_seqid: y["seqid"] = y["attributes"][replace_seqid] fout.write( "{}\tfeature\tcustom\t{}\t{}\t.\t{}\t{}\t{}\n".format( y["seqid"], y["start"], y["stop"], y["strand"], y["phase"], ";".join([f"{a}={b}" for a, b in y["attributes"].items()]), ) ) counter[y["genetic_type"]] += 1 count += 1"# kept {} entries".format(count)) for feature in features: counter[feature] += 0"# {}: {} entries".format(feature, counter[feature]))
def _process_main_fields(self, fields): annotation = {} # Unique id of the sequence annotation["seqid"] = fields[0] # Optional source if fields[1] != ".": annotation["source"] = fields[1] # Annotation type annotation["genetic_type"] = fields[2] # Start and stop annotation["start"] = int(fields[3]) annotation["stop"] = int(fields[4]) # Optional score field if fields[5] != ".": annotation["score"] = float(fields[5]) # Strand if fields[6] == "+" or fields[6] == "-" or fields[6] == "?" or fields[6] == ".": annotation["strand"] = fields[6] # Phase if fields[7] != ".": annotation["phase"] = int(fields[7]) % 3 else: annotation["phase"] = fields[7] return annotation def _process_attributes(self, text): attributes = {} # some GFF/GTF use different conventions: # - "ID=1;DB=2" this is the standard # - "ID 1;DB 2" some gtf uses spaces but should be fine # - "ID=1;DB=2;Note=some text with ; character " worst case scenario # In the later case, there is no easy way to fix this. I believe this is # a non-compatible GFF file. # we first figure out whether this is a = or space convention sep = None text = text.strip() for x in text: if x in ["=", " "]: sep = x break if sep is None: logger.error(f"Your GFF/GTF does not seem to be correct ({text}). Expected a = or space as separator") sys.exit(1) # ugly but fast replacement. text = text.replace("%09", "\t").replace("%0A", "\n").replace("%0D", "\r") text = text.replace("%25", "%").replace("%3D", "=").replace("%26", "&").replace("%2C", ",") text = text.replace("%28", "(").replace("%29", ")") # brackets # we do not convert the special %3B into ; or %20 into spaces for now # split into mutliple attributes # GTF ends in ; so we need to strip it split = text.rstrip(";") # some GFF uses ; and a space as separator but usually, it is just ; with no surrounding spaces. if "; " in split: split = text.split("; ") else: split = text.split(";") for attr in split: # make sure there is no trailing spaces attr = attr.strip() # find the separator. Sometimes it is spaces, sometimes a = sign idx = attr.find(sep) value = attr[idx + 1 :] # replace " by nothing (GTF case) attributes[attr[:idx]] = value.replace('"', "").replace("%3B", ";").replace("%20", " ") return attributes
[docs] def to_gtf(self, output_filename="test.gtf", mapper={"ID": "{}_id"}): # experimental . used by rnaseq pipeline to convert input gff to gtf, # used by RNA-seqc tools fout = open(output_filename, "w") with open(self.filename, "r") as reader: for line in reader: # stop once FASTA starts if line.startswith("##FASTA"): break # Skip metadata and comments if line.startswith("#"): fout.write(line) continue # Skip empty lines if not line.strip(): # pragma: no cover continue split = line.rstrip().split("\t") L = len(split) name = split[0] source = split[1] feature = split[2] start = split[3] stop = split[4] a = split[5] strand = split[6] b = split[7] attributes = split[8] new_attributes = "" for item in attributes.split(";"): try: key, value = item.split("=") if key in mapper.keys(): key = mapper[key].format(feature) new_attributes += '{} "{}";'.format(key, value) except: pass # Here we need some cooking due to gtf/gff clumsy conventiom # 1. looks like attributes' values must have "" surrounding their content # 2. if feature is e.g. exon, then gtf expects the exon_id attribute msg = f"{name}\t{source}\t{feature}\t{start}\t{stop}\t{a}\t{strand}\t{b}\t{new_attributes}\n" fout.write(msg) fout.close()
[docs] def to_fasta(self, ref_fasta, fasta_out): """From a genomic FASTA file ref_fasta, extract regions stored in the gff. Export the corresponding regions to a FASTA file fasta_out. :param ref_fasta: path to genomic FASTA file to extract rRNA regions from. :param fasta_out: path to FASTA file where rRNA regions will be exported to. """ count = 0 with pysam.Fastafile(ref_fasta) as fas: with open(fasta_out, "w") as fas_out: for region in self.df.to_dict("records"): name = f"{region['seqid']}:{region['start']}-{region['stop']}" seq_record = f">{name}\n{fas.fetch(region=name)}\n" fas_out.write(seq_record) count += 1"{count} regions were extracted from '{ref_fasta}' to '{fasta_out}'")
[docs] def to_bed(self, output_filename, attribute_name): """Experimental export to BED format to be used with rseqc scripts :param str attribute_name: the attribute_name name to be found in the GFF attributes """ # rseqc expects a BED12 file. The format is not clear from the # documentation. The first 6 columns are clear (e.g., chromosome name # positions, etc) but last one are not. From the examples, it should be # block sizes, starts of the transcript but they recommend bedops # gff2bed tool that do not extract such information. For now, for # prokaryotes, the block sizes version have been implemented and worked # on a leptospira example. fout = open(output_filename, "w") with open(self.filename, "r") as reader: for line in reader: # stop once FASTA starts if line.startswith("##FASTA"): break # Skip metadata and comments if line.startswith("#"): continue # Skip empty lines if not line.strip(): # pragma: no cover continue # a line is read and split on tabulations split = line.rstrip().split("\t") chrom_name = split[0] # source = split[1] #keep this code commented for book-keeping feature = split[2] gene_start = int(split[3]) gene_stop = int(split[4]) cds_start = gene_start # for prokaryotes, for now cds=gene cds_stop = gene_stop a = split[5] # not used apparently strand = split[6] b = split[7] # not used apparently attributes = split[8] # may be required for eukaryotes score = 0 # in examples for rseqc, the score is always zero unknown = 0 # a field not documented in rseqc block_count = 1 block_sizes = f"{cds_stop-cds_start}," # fixme +1 ? block_starts = "0," # commas are important at the end. no spaces # according to rseqc ( code , the expected bed format is # chrom, chrom_start, chrom_end, gene name, score, strand, cdsStart, cdsEnd, # blockcount, blocksizes, blockstarts where blocksizes and blocks # starts are comma separated list. Here is a line example on # human: # chr1 1676716 1678658 NM_001145277 0 + 1676725 1678549 0 4 182,101,105, 0,2960,7198 # for now only the feature 'gene' is implemented. We can # generalize this later on. if feature == "gene": gene_name = None for item in attributes.split(";"): if item.split("=")[0].strip() == attribute_name: gene_name = item.split("=")[-1] assert gene_name # should be the cds start/stop but for now we use the gene # info start/stop msg = f"{chrom_name}\t{gene_start}\t{gene_stop}\t{gene_name}\t{score}\t{strand}\t{cds_start}\t{cds_stop}\t{unknown}\t{block_count}\t{block_sizes}\t{block_starts}\n" fout.write(msg) fout.close()
[docs] def clean_gff_line_special_characters(self, text): """Simple leaner of gff lines that may contain special characters""" text = text.replace("%09", "\t").replace("%0A", "\n").replace("%0D", "\r") text = text.replace("%25", "%").replace("%3D", "=").replace("%26", "&").replace("%2C", ",") text = text.replace("%28", "(").replace("%29", ")") # brackets return text
[docs] def get_simplify_dataframe(self): """Method to simplify the gff and keep only the most informative features.""" # Set weight for genetic type to sort them and keep only the most informative if self.df.empty: raise BadFileFormat("%s file is not a GFF3.", self.filename) genetype = ["tRNA", "rRNA", "ncRNA", "CDS", "exon", "gene", "tRNA"] worst_score = len(genetype) + 1 weight = {k: i for i, k in enumerate(genetype)} # Note seems optional tokeep = [ x for x in [ "seqid", "genetic_type", "start", "stop", "strand", "gene", "gene_id", "gene_name", "locus_tag", "Note", "product", ] if x in self.df.columns ] df = self.df.filter(tokeep, axis=1) # remove region and chromosome row df = df.drop(df.loc[df.genetic_type.isin({"region", "chromosome"})].index) try: df["gene"] = df["gene"].fillna(df.locus_tag) except (KeyError, AttributeError): pass df["score"] = [weight.get(g_t, worst_score) for g_t in df.genetic_type] # keep most informative features if on the same region best_idx = df.groupby(["seqid", "start", "stop"])["score"].idxmin() return df.loc[best_idx].reset_index(drop=True)
[docs] def get_features_dict(self): """Format feature dict for sequana_coverage.""" df = self.get_simplify_dataframe() # rename column to fit for sequana_coverage df = df.set_index("seqid").rename(columns={"start": "gene_start", "stop": "gene_end", "genetic_type": "type"}) return {chr: df.loc[chr].to_dict("records") for chr in df.index.unique()}