#
# This file is part of Sequana software
#
# Copyright (c) 2016-2022 - Sequana Development Team
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
import os
import sys
from collections import defaultdict
import colorlog
import natsort
from sequana.errors import BadFileFormat
from sequana.lazy import pandas as pd
from sequana.lazy import pysam
logger = colorlog.getLogger(__name__)
__all__ = ["GFF3"]
TEMPLATE = {
"seqid": None,
"source": None,
"genetic_type": None,
"start": None,
"stop": None,
"score": None,
"strand": None,
"phase": None,
}
[docs]
class GFF3:
"""Read a GFF file, version 3
.. seealso:: https://github.com/The-Sequence-Ontology/Specifications/blob/master/gff3.md
::
g = GFF3(filename)
# first call is slow
g.df
# print info about the different feature types
g.features
# prints info about duplicated attributes:
g.get_duplicated_attributes_per_genetic_type(self)
On eukaryotes, the reading and processing of the GFF may take a while.
On prokaryotes, it should be pretty fast (a few seconds).
To speed up the eukaryotes case, we skip the processing biological_regions
(50% of the data in mouse).
You can do a lot with this class because your GFF is stored as a dataframe and
and therefore be easily processed.
Sometimes, CDS are missing::
g = GFF3()
g.add_CDS_and_mRNA("test.gff")
"""
def __init__(self, filename, skip_types=["biological_region"], light=False):
"""Initialise a GFF3 reader.
:param str filename: path to the GFF3 file to read.
:param list skip_types: list of feature types to skip while reading.
Defaults to ``["biological_region"]`` which speeds up parsing of
large mammalian GFF files.
:raises IOError: if *filename* does not exist.
"""
self.filename = filename
if os.path.exists(filename) is False:
raise IOError(f"{filename} does not exist")
self.skip_types = skip_types
self._df = None
self._features = None
self._contig_names = None
self._attributes = {}
self._added_intergenic = False
self._added_CDS = False
self._directons = None # a place holder to store directons
self._light = light
def _get_features(self):
"""Extract unique GFF feature types
This is equivalent to awk '{print $3}' | sort | uniq to extract unique GFF
types. No sanity check, this is suppose to be fast.
Less than a few seconds for mammals.
"""
# This is used by the rnaseq pipeline and should be kept fast
count = 0
if self._features:
features = self._features
else:
features = set()
with open(self.filename, "r") as reader:
for line in reader:
# Skip metadata and comments
if line.startswith("#"):
continue
# Skip empty lines
if not line.strip(): # pragma: no cover
continue
split = line.rstrip().split("\t")
L = len(split)
if L == 9:
features.add(split[2])
count += 1
# FIXME may be overwritten by get_df
self._features = features
return sorted(features)
features = property(_get_features)
def _get_contig_names(self):
if self._contig_names is None:
contigs = set()
for record in self.read():
contigs.add(record["seqid"])
self._contig_names = sorted(list(contigs))
return self._contig_names
contig_names = property(_get_contig_names)
[docs]
def get_attributes(self, feature=None):
if feature in self._attributes.keys():
return self._attributes[feature]
attributes = set()
for record in self.read():
if feature and record["genetic_type"] != feature:
continue
attributes.update(record["attributes"].keys())
attributes = sorted(attributes)
self._attributes[feature] = attributes
return self._attributes[feature]
[docs]
def read(self):
"""Read annotations one by one creating a generator"""
self._features = set()
with open(self.filename, "r") as reader:
line = None
for line in reader:
# stop once FASTA starts
if line.startswith("##FASTA"):
break
# Skip metadata and comments
if line.startswith("#"):
continue
# Skip empty lines
if not line.strip():
continue
# Format checking. skip rows that do not have 9 columns since
# it is comments or fasta sequence
split = line.rstrip().split("\t")
if len(split) != 9:
continue
# skipping biological_region saves lots of time
if split[2].strip() in self.skip_types:
continue
# we process main fields and attributes. This takes most of the
# time
self._features.add(split[2])
# the main first 8 fields
annotation = self._process_main_fields(split[0:8])
# all attributes as key/values added to all annotations.
annotation["attributes"] = self._process_attributes(split[8])
# changes 0.20
# annotation.update(annotation["attributes"])
yield annotation
def _get_df(self):
"""Build and cache a :class:`pandas.DataFrame` from the GFF3 file.
The dataframe contains one row per GFF3 record. Column names match the
nine standard GFF3 fields (``seqid``, ``source``, ``genetic_type``,
``start``, ``stop``, ``score``, ``strand``, ``phase``, ``attributes``)
plus one column per attribute key found in the file.
The result is cached so subsequent accesses are instantaneous.
"""
if self._df is not None:
return self._df
logger.info("Processing GFF file. 1. Reading the input file. Please be patient")
# ~ 30 seconds on mouse
df = pd.DataFrame(self.read())
# now, let us populated some common attributes
# gene, gene_id, ID, Name
if not self._light:
try:
attrs_df = pd.json_normalize(df["attributes"])
except KeyError:
raise BadFileFormat
df = pd.concat([df, attrs_df], axis=1)
else:
common = ["gene_id", "ID", "Name"]
try:
attrs_df = pd.json_normalize(df["attributes"])
attrs_df = attrs_df[[c for c in common if c in attrs_df.columns]]
df = pd.concat([df, attrs_df], axis=1)
except KeyError:
raise BadFileFormat
self._df = df
return self._df
df = property(_get_df)
[docs]
def get_duplicated_attributes_per_genetic_type(self):
"""Report duplicated attribute values for each feature type.
Iterates over all feature types found in the GFF file and, for each
attribute column, counts how many values appear more than once. The
counts are printed to *stdout* and returned as a
:class:`pandas.DataFrame` whose columns are feature types and whose
rows are attribute names.
:return: DataFrame with feature types as columns and attribute names as
rows; each cell contains the number of duplicated values.
:rtype: pandas.DataFrame
"""
df = self.df
results = {}
for typ, group in df.groupby("genetic_type"):
results[typ] = {}
print(f"{typ}: {len(group)} entries")
for attr in group.columns[8:]:
series = group[attr].dropna()
dups = series.duplicated().sum()
if dups > 0:
print(f" - {attr}:{dups} duplicates ({len(series)} total)")
else:
print(f" - {attr}:No duplicates ({len(series)} total)")
results[typ][attr] = dups
return pd.DataFrame(results)
[docs]
def get_duplicated_attributes_per_genetic_type2(self):
results = {}
for typ in self.features:
results[typ] = {}
print("{}: {} entries".format(typ, len(self.df.query("genetic_type==@typ"))))
for attr in sorted(self.get_attributes(feature=typ)):
L = len(self.df.query("genetic_type==@typ")[attr].dropna())
dups = self.df.query("genetic_type==@typ")[attr].dropna().duplicated().sum()
if dups > 0:
print(f" - {attr}:{dups} duplicates ({L} in total)")
else:
print(f" - {attr}:No duplicates ({L} in total)")
results[typ][attr] = dups
df = pd.DataFrame(results)
return df
[docs]
def transcript_to_gene_mapping(self, feature="all", attribute="transcript_id"):
"""
:param feature: not used yet
:param attribute: the attribute to be usde. should be transcript_id for
salmon compatability but could use soething different.
"""
# entries may have transcripts set to None
transcripts = [x for x in self.df[attribute] if x]
# retrieve only the data with transcript id defined
transcripts_df = self.df.set_index(attribute)
transcripts_df = transcripts_df.loc[transcripts]
transcripts_df = transcripts_df.reset_index()
results = {}
results2 = defaultdict(list)
for _id, data in transcripts_df[["ID", "Parent"]].iterrows():
results[data.values[0]] = data.values[1]
results2[data.values[1]].append(data.values[0])
return results, results2
[docs]
def save_annotation_to_csv(self, filename="annotations.csv"):
"""Save the GFF annotation dataframe to a CSV file.
:param str filename: path to the output CSV file. Defaults to
``"annotations.csv"``.
"""
self.df.to_csv(filename, index=False)
[docs]
def read_and_save_selected_features(self, outfile, features=["gene"]):
"""Stream the GFF file and write only the requested feature types.
Unlike :meth:`save_gff_filtered`, this method does *not* load the whole
file into memory; it reads and filters line by line, making it suitable
for very large GFF files.
:param str outfile: path of the output GFF file.
:param list features: feature types to keep. Defaults to
``["gene"]``.
"""
count = 0
with open(self.filename, "r") as fin, open(outfile, "w") as fout:
for line in fin:
# stop once FASTA starts
if line.startswith("##FASTA"):
break
split = line.rstrip().split("\t")
# skipping biological_region saves lots of time
try:
if split[2].strip() in features:
fout.write(line)
count += 1
except IndexError:
pass
logger.info(f"Found {count} entries and saved into {outfile}")
[docs]
def get_intergenic_regions(self):
"""Compute intergenic regions between consecutive genes on each chromosome.
For every chromosome present in the GFF, gaps between consecutive
``gene`` features (sorted by start position) are returned as a
:class:`pandas.DataFrame`. Each row represents one intergenic region
and contains the columns ``seqid``, ``start``, ``stop``, ``strand``,
``attributes``, ``source``, ``genetic_type``, ``score``, and ``phase``.
:return: DataFrame of intergenic regions.
:rtype: pandas.DataFrame
"""
start = 0
def get_attributes(data):
"""Serialise an attributes dict to a GFF3 attributes string."""
return ";".join([f'{a}="{b}"' for a, b in data.items()])
data = []
for chrom in self.contig_names:
start = 1
for _, row in (
self.df.query("genetic_type=='gene' and seqid==@chrom")[["start", "stop", "strand", "attributes"]]
.sort_values("start")
.iterrows()
):
if row["start"] > start:
data.append([chrom, start, row["start"] - 1, row["strand"]])
start = row["stop"]
df = pd.DataFrame(data)
df.columns = ["seqid", "start", "stop", "strand"]
df["attributes"] = [{"ID": f"ncregion_{i}"} for i in range(1, len(df) + 1)]
df["source"] = "sequana"
df["genetic_type"] = "region"
df["score"] = 1
df["phase"] = "."
return df
[docs]
def add_CDS_and_mRNA(self, output, gene_ID="gene_id"):
"""Rewrite the GFF file adding synthetic mRNA and CDS entries for every gene.
Some annotation files (e.g. for bacteria or simple eukaryotes) contain
only ``gene`` features without the corresponding ``mRNA``/``CDS``
children required by certain downstream tools. This method reads the
source GFF line by line and, for every ``gene`` feature, emits the
original gene line followed by a synthetic ``mRNA`` line and a
synthetic ``CDS`` line sharing the same coordinates.
:param str output: path to the output GFF file.
:param str gene_ID: attribute key used to retrieve the gene identifier.
Defaults to ``"gene_id"``.
"""
header = "start"
with open(self.filename, "r") as fin, open(output, "w") as fout:
for line in fin.readlines():
if line.startswith("#") or not line.strip():
fout.write(line)
continue
if header == "start":
from sequana import version
fout.write(
f"# sequana gff --add-CDS-and-mRNA --gene-id {gene_ID} -o {output} {self.filename} #v{version}\n"
)
header = "done"
chrom, source, feature, start, end, score, strand, phase, attrs = line.strip().split("\t")
if feature == "gene":
fout.write(line)
# Extract gene ID
gene_id = None
for attr in attrs.split(";"):
if attr.startswith("gene_id="):
gene_id = attr.split("=")[1].strip('"')
break
# Synthetic mRNA
fout.write(
"\t".join(
[
chrom,
source,
"mRNA",
start,
end,
score,
strand,
".",
f'ID="{gene_id}.mRNA"; Parent="{gene_id}"\n',
]
)
)
# Synthetic CDS
fout.write(
"\t".join(
[
chrom,
source,
"CDS",
start,
end,
score,
strand,
"0",
f'ID="{gene_id}.CDS"; Parent="{gene_id}.mRNA"\n',
]
)
)
else:
fout.write(line)
[docs]
def add_intergenic_regions(self):
"""Append intergenic regions to the internal dataframe.
Calls :meth:`get_intergenic_regions` and concatenates the result with
the existing annotation dataframe. Subsequent calls are no-ops (the
operation is only performed once).
"""
if self._added_intergenic:
pass
else:
df = self.get_intergenic_regions()
self._df = pd.concat([self.df, df], ignore_index=True)
self._added_intergenic = True
[docs]
def add_regions_and_save_gff(self, filename):
"""add missing region from the sequence-region found in the header."""
regions = {}
current_region = None
with open(filename, "w") as fout:
with open(self.filename, "r") as fin:
for line in fin:
if line.startswith("##sequence-region"):
items = line.split()
name, start, stop = items[1:4]
regions[name] = (
"\t".join(
[
name,
"header",
"region",
start,
stop,
".",
"+",
".",
f"ID={name}:{start}..{stop};Name={name};chromosome={name}",
]
)
+ "\n"
)
if current_region is None: # first region
region = line.split("\t")[0]
if region in regions.keys():
fout.write(regions[region])
current_region = region
else: # next regions
region = line.split("\t")[0]
if region in regions.keys() and region != current_region:
fout.write(regions[region])
current_region = region
fout.write(line)
[docs]
def save_as_gff(self, filename, sortby=["seqid", "start", "stop", "genetic_type"]):
"""Write the (possibly modified) annotation back to a GFF3 file.
The header comment lines from the original file are preserved.
Records are sorted according to *sortby* using natural sort order so
that chromosome identifiers like ``chr2`` sort before ``chr10``.
:param str filename: path to the output GFF file.
:param list sortby: list of column names used for sorting. Defaults to
``["seqid", "start", "stop", "genetic_type"]``.
"""
def get_attributes(data):
"""Serialise an attributes dict to a GFF3 attributes string."""
return ";".join([f'{a}="{b}"' for a, b in data.items()])
from sequana import version
with open(filename, "w") as fout:
with open(self.filename, "r") as fin:
for line in fin:
if not line.startswith("#"):
break
fout.write(line)
fout.write(f"# Sequana {version}\n")
fout.write("# - sorting seqid, start, stop, genetic type. \n")
if self._added_intergenic:
fout.write("# - added intergenic region\n")
count = 0
for _, y in self.df.sort_values(by=sortby, key=natsort.natsort_keygen()).iterrows():
fout.write(
"\t".join(
[
str(x)
for x in [
y["seqid"],
y["source"],
y["genetic_type"],
y["start"],
y["stop"],
y["score"],
y["strand"],
y["phase"],
get_attributes(y["attributes"]),
]
]
)
+ "\n"
)
[docs]
def save_gff_filtered(self, filename="filtered.gff", features=["gene"], replace_seqid=None):
"""
save_gff_filtered("test.gff", features=['misc_RNA', 'rRNA'],
replace_seqid='locus_tag')
"""
with open(filename, "w") as fout:
fout.write("#gff-version 3\n#Custom gff from sequana\n")
count = 0
from collections import defaultdict
counter = defaultdict(int)
for x, y in self.df.iterrows():
if y["genetic_type"] in features:
if replace_seqid:
y["seqid"] = y["attributes"][replace_seqid]
fout.write(
"{}\tfeature\tcustom\t{}\t{}\t.\t{}\t{}\t{}\n".format(
y["seqid"],
y["start"],
y["stop"],
y["strand"],
y["phase"],
";".join([f"{a}={b}" for a, b in y["attributes"].items()]),
)
)
counter[y["genetic_type"]] += 1
count += 1
logger.info("# kept {} entries".format(count))
for feature in features:
counter[feature] += 0
logger.info("# {}: {} entries".format(feature, counter[feature]))
def _process_main_fields(self, fields):
"""Parse the first eight tab-separated columns of a GFF3 record.
:param list fields: the first eight fields of a GFF3 line (before the
attributes column).
:return: dict with keys ``seqid``, ``source``, ``genetic_type``,
``start``, ``stop``, ``score``, ``strand``, and ``phase``.
:rtype: dict
"""
annotation = {}
annotation = TEMPLATE.copy() # cheap shallow copy
annotation["seqid"] = fields[0]
annotation["source"] = fields[1]
annotation["genetic_type"] = fields[2]
annotation["start"] = int(fields[3])
annotation["stop"] = int(fields[4])
f = fields[5]
if f == "." or f == "?":
annotation["score"] = f
else:
annotation["score"] = float(f)
# Strand
s = fields[6]
annotation["strand"] = s if s in {"+", "-", ".", "?"} else None
# Phase
p = fields[7]
annotation["phase"] = int(p) % 3 if p != "." else p
return annotation
def _process_attributes(self, text):
"""Parse the ninth (attributes) column of a GFF3 or GTF record.
Handles both the standard ``key=value`` (GFF3) and ``key value``
(GTF) conventions. Quoted values (e.g. ``key="value"``) and values
containing semicolons (e.g. ``MF="GO:0005524;GO:0004004"``) are dealt
with correctly. URL-encoded characters (``%09``, ``%0A``, etc.) are
decoded before parsing.
:param str text: raw attributes string from column 9 of a GFF3 line.
:return: dict mapping attribute names to their (unquoted) values.
:rtype: dict
"""
attributes = {}
# some GFF/GTF use different conventions:
# - "ID=1;DB=2" this is the standard
# - "ID 1;DB 2" some gtf uses spaces but should be fine
# - "ID=1;DB=2;Note=some text with ; character " worst case scenario
# In the later case, there is no easy way to fix this. I believe this is
# a non-compatible GFF file.
# we first figure out whether this is a = or space convention
sep = None
text = text.strip()
# find the first = or space indicating the key=value operator (e.g. =)
for x in text:
if x in ["=", " "]:
sep = x
break
if sep is None:
logger.error(f"Your GFF/GTF does not seem to be correct ({text}). Expected a = or space as separator")
sys.exit(1)
# ugly but fast replacement of special characters.
text = text.replace("%09", "\t").replace("%0A", "\n").replace("%0D", "\r")
text = text.replace("%25", "%").replace("%3D", "=").replace("%26", "&").replace("%2C", ",")
text = text.replace("%28", "(").replace("%29", ")") # brackets
# we do not convert the special %3B into ; or %20 into spaces for now
import re
def parse_gff_attributes(attributes, sep="="):
"""parse attributes so handle
Quoted values (e.g., key="value")
Unquoted values (e.g., key=value)
Empty values (e.g., key="")
Values with semicolons (e.g., MF="GO:0005524;GO:0004004")
"""
# Regular expression to match key=value pairs with or without quotes
pattern = re.compile(r'(\S+?)[= ](".*?"|[^;]*)(?:;|$)')
# Dictionary to store parsed attributes
parsed_attributes = {}
# Find all matches for key=value pairs
matches = pattern.findall(attributes)
# Populate dictionary with matches
for key, value in matches:
# Remove quotes around the value if present
value = value.strip('"')
parsed_attributes[key] = value
return parsed_attributes
return parse_gff_attributes(text)
# replace " by nothing (GTF case)
# attributes[attr[:idx]] = value.replace('"', "").replace("%3B", ";").replace("%20", " ")
[docs]
def to_gtf(self, output_filename="test.gtf", mapper={"ID": "{}_id"}):
"""Convert the GFF3 file to GTF format (experimental).
Reads the source GFF3 file line by line and rewrites each record in
GTF format. Attribute keys are renamed according to *mapper* which
maps GFF3 attribute names to GTF attribute name templates; the
``{}`` placeholder in the template is replaced with the feature type.
:param str output_filename: path for the output GTF file. Defaults to
``"test.gtf"``.
:param dict mapper: mapping from GFF3 attribute keys to GTF attribute
name templates. Defaults to ``{"ID": "{}_id"}`` which converts
the GFF3 ``ID`` attribute to e.g. ``gene_id`` or ``exon_id``.
.. note::
This method is experimental and is used by the Sequana RNA-seq
pipeline to convert input GFF files to GTF format for RNA-seqc.
"""
# experimental . used by rnaseq pipeline to convert input gff to gtf,
# used by RNA-seqc tools
fout = open(output_filename, "w")
with open(self.filename, "r") as reader:
for line in reader:
# stop once FASTA starts
if line.startswith("##FASTA"):
break
# Skip metadata and comments
if line.startswith("#"):
fout.write(line)
continue
# Skip empty lines
if not line.strip(): # pragma: no cover
continue
split = line.rstrip().split("\t")
L = len(split)
name = split[0]
source = split[1]
feature = split[2]
start = split[3]
stop = split[4]
a = split[5]
strand = split[6]
b = split[7]
attributes = split[8]
new_attributes = ""
for item in attributes.split(";"):
try:
key, value = item.split("=")
if key in mapper.keys():
key = mapper[key].format(feature)
new_attributes += '{} "{}";'.format(key, value)
except:
pass
# Here we need some cooking due to gtf/gff clumsy conventiom
# 1. looks like attributes' values must have "" surrounding their content
# 2. if feature is e.g. exon, then gtf expects the exon_id attribute
msg = f"{name}\t{source}\t{feature}\t{start}\t{stop}\t{a}\t{strand}\t{b}\t{new_attributes}\n"
fout.write(msg)
fout.close()
[docs]
def to_fasta(self, ref_fasta, fasta_out, features=["gene"], identifier="ID"):
"""From a genomic FASTA file ref_fasta, extract regions stored in the
gff. Export the corresponding regions to a FASTA file fasta_out.
:param ref_fasta: path to genomic FASTA file to extract rRNA regions from.
:param fasta_out: path to FASTA file where rRNA regions will be exported to.
"""
count = 0
with pysam.Fastafile(ref_fasta) as fas:
with open(fasta_out, "w") as fas_out:
for record in self.df.to_dict("records"):
if record["genetic_type"] in features:
region = f"{record['seqid']}:{record['start']}-{record['stop']}"
ID = record[identifier]
seq_record = f">{ID}\n{fas.fetch(region=region)}\n"
fas_out.write(seq_record)
count += 1
logger.info(f"{count} regions were extracted from '{ref_fasta}' to '{fasta_out}'")
[docs]
def to_pep(self, ref_fasta, fasta_out):
"""Extract CDS, convert to proteines and save in file"""
raise NotImplementedError
df = self.df.query("genetic_type=='CDS'")
[docs]
def to_bed(self, output_filename, attribute_name, features=["gene"]):
"""Experimental export to BED format to be used with rseqc scripts
:param str attribute_name: the attribute_name name to be found in the
GFF attributes
"""
# rseqc expects a BED12 file. The format is not clear from the
# documentation. The first 6 columns are clear (e.g., chromosome name
# positions, etc) but last one are not. From the examples, it should be
# block sizes, starts of the transcript but they recommend bedops
# gff2bed tool that do not extract such information. For now, for
# prokaryotes, the block sizes version have been implemented and worked
# on a leptospira example.
fout = open(output_filename, "w")
with open(self.filename, "r") as reader:
for line in reader:
# stop once FASTA starts
if line.startswith("##FASTA"):
break
# Skip metadata and comments
if line.startswith("#"):
continue
# Skip empty lines
if not line.strip(): # pragma: no cover
continue
# a line is read and split on tabulations
split = line.rstrip().split("\t")
chrom_name = split[0]
# source = split[1] #keep this code commented for book-keeping
feature = split[2]
gene_start = int(split[3])
gene_stop = int(split[4])
cds_start = gene_start # for prokaryotes, for now cds=gene
cds_stop = gene_stop
a = split[5] # not used apparently
strand = split[6]
b = split[7] # not used apparently
attributes = split[8] # may be required for eukaryotes
score = 0 # in examples for rseqc, the score is always zero
unknown = 0 # a field not documented in rseqc
block_count = 1
block_sizes = f"{cds_stop-cds_start}," # fixme +1 ?
block_starts = "0," # commas are important at the end. no spaces
# according to rseqc (bed.py) code , the expected bed format is
# chrom, chrom_start, chrom_end, gene name, score, strand, cdsStart, cdsEnd,
# blockcount, blocksizes, blockstarts where blocksizes and blocks
# starts are comma separated list. Here is a line example on
# human:
# chr1 1676716 1678658 NM_001145277 0 + 1676725 1678549 0 4 182,101,105, 0,2960,7198
# for now only the feature 'gene' is implemented. We can
# generalize this later on.
if feature in features:
gene_name = None
for item in attributes.split(";"):
if item.split("=")[0].strip() == attribute_name:
gene_name = item.split("=")[-1]
assert gene_name
# should be the cds start/stop but for now we use the gene
# info start/stop
msg = f"{chrom_name}\t{gene_start}\t{gene_stop}\t{gene_name}\t{score}\t{strand}\t{cds_start}\t{cds_stop}\t{unknown}\t{block_count}\t{block_sizes}\t{block_starts}\n"
fout.write(msg)
fout.close()
[docs]
def clean_gff_line_special_characters(self, text):
"""Simple leaner of gff lines that may contain special characters"""
text = text.replace("%09", "\t").replace("%0A", "\n").replace("%0D", "\r")
text = text.replace("%25", "%").replace("%3D", "=").replace("%26", "&").replace("%2C", ",")
text = text.replace("%28", "(").replace("%29", ")") # brackets
return text
[docs]
def get_simplify_dataframe(self):
"""Method to simplify the gff and keep only the most informative features."""
# Set weight for genetic type to sort them and keep only the most informative
if self.df.empty:
raise BadFileFormat("%s file is not a GFF3.", self.filename)
genetype = ["tRNA", "rRNA", "ncRNA", "CDS", "exon", "gene", "tRNA"]
worst_score = len(genetype) + 1
weight = {k: i for i, k in enumerate(genetype)}
# Note seems optional
tokeep = [
x
for x in [
"seqid",
"genetic_type",
"start",
"stop",
"strand",
"gene",
"gene_id",
"gene_name",
"locus_tag",
"Note",
"product",
]
if x in self.df.columns
]
df = self.df.filter(tokeep, axis=1)
# remove region and chromosome row
df = df.drop(df.loc[df.genetic_type.isin({"region", "chromosome"})].index)
try:
df["gene"] = df["gene"].fillna(df.locus_tag)
except (KeyError, AttributeError):
pass
df["score"] = [weight.get(g_t, worst_score) for g_t in df.genetic_type]
# keep most informative features if on the same region
best_idx = df.groupby(["seqid", "start", "stop"])["score"].idxmin()
return df.loc[best_idx].reset_index(drop=True)
[docs]
def get_features_dict(self):
"""Format feature dict for sequana_coverage."""
df = self.get_simplify_dataframe()
# rename column to fit for sequana_coverage
df = df.set_index("seqid").rename(columns={"start": "gene_start", "stop": "gene_end", "genetic_type": "type"})
return {chr: df.loc[chr].to_dict("records") for chr in df.index.unique()}
[docs]
def get_seqid2size(self):
"""Return a mapping from sequence identifiers to their sizes.
Sizes are read from ``region`` features in the GFF file (the ``stop``
coordinate of a ``region`` record is used as the chromosome/contig
size).
:return: dict mapping sequence id strings to integer sizes.
:rtype: dict
"""
return dict([(row.seqid, row.stop) for _, row in self.df.query("genetic_type=='region'").iterrows()])
[docs]
def search(self, pattern):
"""Search all columns of the annotation dataframe for a pattern.
Converts every cell to a string and returns all rows where at least
one column contains *pattern* as a substring.
:param pattern: search term (converted to ``str`` internally).
:return: filtered copy of the annotation dataframe.
:rtype: pandas.DataFrame
"""
from numpy import logical_or, zeros
pattern = str(pattern)
hits = zeros(len(self.df))
for col in self.df.columns:
hits = logical_or(self.df[col].apply(lambda x: pattern in str(x)), hits)
return self.df.loc[hits].copy()
[docs]
def is_tRNA_or_ribosomal(self, x):
"""Return *True* if *x* is **not** a tRNA or ribosomal RNA name.
Despite the function name, this method is intended as a boolean *keep*
predicate: it returns ``True`` for genes that should be **retained**
(i.e. non-tRNA, non-rRNA genes) and ``False`` for genes that should be
**removed** (tRNA or rRNA).
Used internally to filter out tRNA and rRNA genes when computing
Polycistronic Transcription Units (PTUs).
:param x: gene name or annotation string to test.
:return: ``False`` if *x* matches a tRNA or rRNA pattern
(``"tRNA-"``, ``"28S"``, ``"5.8S"``, ``"18S"``, ``"5S"`` prefix,
or exact ``"tRNA"``), ``True`` otherwise.
:rtype: bool
"""
try:
for hit in ["tRNA-", "28S", "5.8S", "18S", "5S"]:
if x.startswith(hit):
return False
for hit in ["tRNA"]:
if x == hit:
return False
except AttributeError:
pass
return True
def _remove_tRNA_or_ribosomal(self):
"""Filter the internal dataframe to remove tRNA and ribosomal RNA entries.
The exact filtering strategy depends on whether a ``combinedAnnotation``
column is present (Leishmania donovani style) or not (L. infantum style
where tRNA/rRNA have dedicated ``genetic_type`` values).
"""
try:
# specific to leishmania donovani
self._df = self.df[[self.is_tRNA_or_ribosomal(x) for x in self.df.combinedAnnotation]]
except:
# for L infantum, tRNA and rRNA are separated with their own genetic_type (but duplicated as
# CDS/gene/tRNA/exon)
# so we first need to remove exon, then gene with gene_biotype in tRNA and rRNA and finally the genetic_type
# tRNA and rRNA or even simpler, keep only genes (removing exon and tRNA and rRNA) and amnogst the genes,
# filter out the gene_biotype tRNA and rRNA
self._df = self.df.query("genetic_type in ['region', 'gene'] and gene_biotype not in ['tRNA', 'rRNA']")
[docs]
def get_PTU(self):
"""Compute Polycistronic Transcription Units (PTUs).
PTUs are contiguous groups of genes transcribed from the same strand.
tRNA and rRNA genes are excluded before computing directons (groups of
genes with the same strand orientation).
:return: DataFrame with columns ``chromosome``, ``start``, ``stop``,
``strand``, and ``length`` (number of genes in the PTU).
:rtype: pandas.DataFrame
"""
self._remove_tRNA_or_ribosomal()
# make sure it is correct (df changed)
self._directons = None
directons = self.directons
data = []
for seqid in sorted(self.df.contig_names):
subdf = directons.query("seqid==@seqid")
chrom = str(seqid)
for _, row in subdf.iterrows():
start, stop = row["start"], row["stop"]
N = len(self.df.query("seqid==@chrom and start>=@start and stop<=@stop"))
strand = row["strand"]
data.append([seqid, start, stop, strand, N])
df = pd.DataFrame(data)
df.columns = ["chromosome", "start", "stop", "strand", "length"]
return df
def _get_ssr(self):
"""Compute Strand-Switch Regions (SSRs) between consecutive directons.
Iterates over pairs of adjacent directons on each chromosome and
classifies the gap between them:
* ``dSSR`` – divergent SSR (minus strand followed by plus strand)
* ``cSSR`` – convergent SSR (plus strand followed by minus strand)
* ``other`` – same strand (tandem arrangement)
:return: DataFrame with columns ``type``, ``chromosome``, ``start``,
and ``stop``.
:rtype: pandas.DataFrame
"""
# make sure it is correct (if df changed)
self.directons = None
directons = self.directons.copy()
data = []
for seqid in sorted(self.df.contig_names):
subdf = directons.query("seqid==@seqid")
for i in range(0, len(subdf) - 1):
x = subdf.iloc[i]
y = subdf.iloc[i + 1]
s1 = x.strand
s2 = y.strand
if s1 == "-" and s2 == "+":
data.append(["dSSR", seqid, x.stop, y.start])
elif s1 == "+" and s2 == "-":
data.append(["cSSR", seqid, x.stop, y.start])
else:
data.append(["other", seqid, x.stop, y.start])
df = pd.DataFrame(data)
df.columns = ["type", "chromosome", "start", "stop"]
return df
def _get_directons(self):
"""Compute directons: groups of consecutive genes sharing the same strand.
A *directon* is a maximal set of adjacent genes transcribed from the
same strand. The method groups the annotation dataframe by chromosome,
detects strand changes to assign each gene to a directon, then
aggregates per-directon statistics (start, stop, number of genes).
The result is cached; subsequent calls return the cached value.
:return: DataFrame with columns ``seqid``, ``strand``,
``directon_id``, ``start``, ``stop``, ``directon_length``,
and ``directon_name``.
:rtype: pandas.DataFrame
"""
if self._directons is not None:
return self._directons
def assign_directon_groups(group):
"""Label each gene in *group* with the directon it belongs to."""
# group is per-chromosome
group["strand_shift"] = group["strand"] != group["strand"].shift()
group["directon_id"] = group["strand_shift"].cumsum()
return group
df = self.df.groupby("seqid", group_keys=False).apply(assign_directon_groups, include_groups=False)
df["seqid"] = self.df["seqid"]
# Aggregate each directon into a single BED line
directons = (
df.groupby(["seqid", "strand", "directon_id"])
.agg(
{
"start": "min",
"stop": "max",
"source": len, # let us use the 'source' column to count entries/genes per group
}
)
.reset_index()
)
# rename source into meaningful name
directons.rename({"source": "directon_length"}, inplace=True, axis=1)
# Let us define a convention to get a directon ID as :
#
# CHROM<DIRECTION>_<ID>_START_END
#
# where DIRECTION is p or m for the plus or minus strand
# ID is a unique identifier on a given chromosme. related to position of appearance
names = []
for _, row in directons.iterrows():
seqid = row["seqid"]
start = row["start"]
stop = row["stop"]
ID = row["directon_id"]
strand = "p" if row["strand"] == "+" else "m"
names.append(f"{seqid}_{strand}{ID}_{start}_{stop}")
directons["directon_name"] = names
# we sort the directons by seq and start position and reassign unique identifiers
# since the first ones were used within chromosomes. not it is across the entire genome.
directons.sort_values(by=["seqid", "start"], inplace=True)
directons["directon_id"] = list(range(1, len(directons) + 1))
self._directons = directons
return self._directons
directons = property(_get_directons)
[docs]
def directon_to_bed(self, output="directons.bed", colors={"+": "255,0,0", "-": "0,0,255"}):
"""Write directon information to a BED9 file.
Each directon is written as one line in the output BED file with
colour-coding by strand (red for ``+``, blue for ``-`` by default).
:param str output: path for the output BED file. Defaults to
``"directons.bed"``.
:param dict colors: mapping from strand symbol to RGB colour string.
Defaults to ``{"+": "255,0,0", "-": "0,0,255"}``.
"""
df = self.df
directons = self.directons
# Add other BED fields
strand_colors = {"+": "255,0,0", "-": "0,0,255"}
directons["name"] = "."
directons["score"] = 1
directons["thickStart"] = directons["start"]
directons["thickEnd"] = directons["stop"]
directons["itemRgb"] = directons["strand"].map(strand_colors)
# Reorder columns to BED format
bed_df = directons[
["seqid", "start", "stop", "directon_name", "score", "strand", "thickStart", "thickEnd", "itemRgb"]
]
# Write to BED file
bed_df.to_csv(output, sep="\t", header=False, index=False)
[docs]
def add_directon_index(self):
"""Assign each gene its positional index within its directon.
For each gene, sets its ordinal position inside the directon it belongs
to (assuming the GFF is sorted by chromosome and start coordinate).
.. note::
Not yet implemented; this method is a stub reserved for a future
feature.
"""
# For each gene, set its position on the directon it belongs to
# assuming GFF is sorted by chromosome and start
pass
[docs]
def cluster_names_to_bed(self):
"""Identify cluster and output results in BED file"""
pass