#
# This file is part of Sequana software
#
# Copyright (c) 2016-2022 - Sequana Development Team
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
"""feature counts related tools"""
import glob
import os
import re
import sys
from pathlib import Path
import colorlog
from sequana.lazy import pandas as pd
from sequana.lazy import pylab
logger = colorlog.getLogger(__name__)
__all__ = [
"get_most_probable_strand_consensus",
"get_most_probable_strand",
"MultiFeatureCount",
"FeatureCount",
"FeatureCountMerger",
]
def get_most_probable_strand(filenames, tolerance, sample_name):
    """Return most probable strand given 3 feature count files (strand of 0, 1, and 2)

    Return the total counts by strand from featureCount matrix folder, strandness and
    probable strand for a single sample (using a tolerance threshold for
    strandness). This assumes a single sample by featureCounts file.

    :param filenames: a list of 3 feature counts files for a given sample
        corresponding to the strand 0,1,2
    :param tolerance: a value below 0.5
    :param sample_name: the name of the sample corresponding to the list in filenames
    :return: a one-row :class:`pandas.DataFrame` indexed by *sample_name* with
        columns '0', '1', '2', 'strandness' and 'strand'

    Possible values returned are:

    * 0: unstranded
    * 1: stranded
    * 2: reversely stranded

    We compute the number of counts in case 1 and 2 and compute the ratio strand
    as :math:`RS = stranded / (stranded + reversely stranded )`. Then we decide
    on the possible strandness with the following criteria:

    * if RS < tolerance, reversely stranded
    * if RS in 0.5+-tolerance: unstranded.
    * if RS > 1-tolerance, stranded
    * otherwise, we cannot decide.
    """
    fc_files = [Path(x) for x in filenames]
    res_dict = {}

    for f in fc_files:
        # The parent directory name is expected to end with the strand
        # value (0, 1 or 2), e.g. feature_counts_1 or feature_counts/1
        strand = str(f.parent)[-1]
        # Feature counts may have extra columns (not just a Series),
        # the count is the last column though. So,
        # FeatureCount(f).df[df.columns[-1]] is a time series
        df = FeatureCount(f).df
        df = df[df.columns[-1]]
        res_dict[strand] = int(df.sum())

    # ratio of stranded counts over stranded + reversely stranded counts
    strandness = res_dict["1"] / (res_dict["1"] + res_dict["2"])
    res_dict["strandness"] = strandness

    if strandness < tolerance:
        res_dict["strand"] = 2
    elif strandness > 1 - tolerance:
        res_dict["strand"] = 1
    elif 0.5 - tolerance < strandness < 0.5 + tolerance:
        res_dict["strand"] = 0
    else:
        # ambiguous: strandness falls between the tolerance bands
        res_dict["strand"] = None

    return pd.DataFrame(res_dict, index=[sample_name])
def get_most_probable_strand_consensus(
    rnaseq_folder,
    tolerance,
    sample_pattern="*/feature_counts_[012]",
    file_pattern="feature_counts_[012]/*_feature.out",
):
    """From a sequana RNA-seq run folder get the most probable strand, based on the
    frequencies of counts assigned with '0', '1' or '2' type strandness
    (featureCounts nomenclature) across all samples.

    :param rnaseq_folder: the main directory
    :param tolerance: a value in the range 0-0.5. typically 0.1 or 0.15
    :param sample_pattern: the samples directory pattern
    :param file_pattern: the feature counts file pattern
    :return: a tuple (most probable strand or -1 if undecidable, summary DataFrame)

    If guess is not possible given the tolerance, fills with None

    Consider this tree structure::

        rnaseq_folder
        ├── sample1
        │   └── feature_counts
        │       ├── 0
        │       │   └── sample_feature.out
        │       ├── 1
        │       │   └── sample_feature.out
        │       └── 2
        │           └── sample_feature.out
        └── sample2
            └── feature_counts
                ├── 0
                │   └── sample_feature.out
                ├── 1
                │   └── sample_feature.out
                └── 2
                    └── sample_feature.out

    Then, the following command should scan all files and report the most
    probable strand (0,1,2) given the sample1 and sample2::

        get_most_probable_strand_consensus("rnaseq_folder", 0.15)

    This tree structure is understood automatically. If you have a different
    one, you can set the sample_pattern and file_pattern parameters.

    .. seealso:: :func:`get_most_probable_strand`
    """
    rnaseq_folder = Path(rnaseq_folder)

    sample_folders = list(set([x.parent for x in rnaseq_folder.glob(sample_pattern)]))
    if not sample_folders:
        # the new method holds 3 sub directories 0/, 1/ and 2/
        sample_pattern = "*/feature_counts"
        sample_folders = list(set([x.parent for x in rnaseq_folder.glob(sample_pattern)]))
        if not sample_folders:
            # fix: the original referenced an undefined name 'pattern' here (NameError)
            logger.error(f"Could not find sample directories in {rnaseq_folder} with pattern {sample_pattern}")
            sys.exit(1)

    results = []
    for sample in sample_folders:
        filenames = list(sample.glob(file_pattern))
        if len(filenames) == 0:
            # fall back to the new directory layout; the rebinding is kept so
            # that subsequent samples use the new pattern directly, as in the
            # original implementation
            file_pattern = "feature_counts/[012]/*_feature.out"
            filenames = list(sample.glob(file_pattern))
            if len(filenames) == 0:
                logger.warning(f"No files found for {sample}/{file_pattern}. skipped")
                continue
        result = get_most_probable_strand(filenames, tolerance, sample)
        results.append(result)

    df = pd.concat(results)
    df = df[["0", "1", "2", "strandness", "strand"]]

    logger.info(f"Strand guessing for each files (tolerance: {tolerance}):\n")
    logger.info(df)

    try:
        most_probable = df["strand"].value_counts().idxmax()
    except ValueError:
        # all strand values are None: value_counts() is empty, return -1
        most_probable = -1

    return most_probable, df
class MultiFeatureCount:
    """Read set of feature counts using different options of strandness

    .. plot::
        :include-source:

        from sequana import sequana_data
        from sequana.featurecounts import *
        directory = sequana_data("/rnaseq_0")
        ff = MultiFeatureCount(directory, 0.15)
        ff.plot_strandness()

    .. seealso:: :func:`get_most_probable_strand` for more information about the
        tolerance parameter and meaning of strandness.

    The expected data structure is ::

        rnaseq_folder
        ├── sample1
        │   ├── feature_counts_0
        │   │   └── sample_feature.out
        │   ├── feature_counts_1
        │   │   └── sample_feature.out
        │   ├── feature_counts_2
        │   │   └── sample_feature.out
        └── sample2
            ├── feature_counts_0
            │   └── sample_feature.out
            ├── feature_counts_1
            │   └── sample_feature.out
            ├── feature_counts_2
            │   └── sample_feature.out

    The new expected data structure is ::

        new_rnaseq_output/
        ├── sample1
        │   └── feature_counts
        │       ├── 0
        │       │   └── sample_feature.out
        │       ├── 1
        │       │   └── sample_feature.out
        │       └── 2
        │           └── sample_feature.out
        └── sample2
            └── feature_counts
                ├── 0
                │   └── sample_feature.out
                ├── 1
                │   └── sample_feature.out
                └── 2
                    └── sample_feature.out

    """

    # USED in rnaseq pipeline
    def __init__(self, rnaseq_folder=".", tolerance=0.1):
        """
        :param str rnaseq_folder: the rnaseq run directory to scan
        :param float tolerance: the tolerance between 0 and 0.25
        """
        self.tolerance = tolerance
        self.rnaseq_folder = rnaseq_folder

        # this should be called in the constructor once for all; it fills the
        # .probable_strand and .df attributes
        self._get_most_probable_strand_consensus()

    def _get_most_probable_strand_consensus(self):
        # delegate to the module-level function; stores the consensus strand
        # and the per-sample summary DataFrame
        self.probable_strand, self.df = get_most_probable_strand_consensus(self.rnaseq_folder, self.tolerance)

    def plot_strandness(self, fontsize=12, output_filename="strand_summary.png", savefig=False):
        """Horizontal bar plot of the strandness of each sample.

        Red dashed lines show the tolerance thresholds, the black dashed line
        marks the 0.5 (unstranded) position.

        :param int fontsize: x-label font size
        :param str output_filename: where to save the figure if *savefig* is True
        :param bool savefig: save the figure into *output_filename*
        """
        df = self.df.sort_index(ascending=False)
        # note: the original code assigned df["strandness"].T back to itself,
        # which is a no-op for a Series and has been removed
        df["strandness"].plot(kind="barh")
        pylab.xlim([0, 1])
        pylab.grid(True)
        pylab.axvline(self.tolerance, ls="--", color="r")
        pylab.axvline(1 - self.tolerance, ls="--", color="r")
        pylab.axvline(0.5, ls="--", color="k")
        pylab.xlabel("Strandness", fontsize=fontsize)
        try:
            pylab.gcf().set_layout_engine("tight")
        except Exception:
            # older matplotlib versions do not provide set_layout_engine
            pass
        if savefig:  # pragma: no cover
            pylab.savefig(output_filename)
class FeatureCountMerger:
    """Merge several feature counts files"""

    def __init__(self, pattern="*feature.out", fof=[], skiprows=1):
        """.. rubric:: Constructor

        :param str pattern: glob pattern used to find input files (ignored if *fof* is given)
        :param list fof: explicit list of input filenames; takes precedence over *pattern*
        :param int skiprows: number of leading lines to skip (featureCounts
            writes a comment line before the header)
        """
        self.skiprows = skiprows
        # note: the mutable default fof=[] is safe here since it is never mutated
        if len(fof):
            self.filenames = fof
        else:
            self.filenames = glob.glob(pattern)

        if len(self.filenames) == 0:
            logger.critical("No valid files provided")
            sys.exit(1)

        for x in self.filenames:
            if os.path.exists(x) is False:
                # fix: the original f-string had no placeholder ("file x not found")
                logger.critical(f"file {x} not found")
                sys.exit(1)

        # merge all files pairwise on their common columns (e.g. Geneid)
        self.df = pd.read_csv(self.filenames[0], sep="\t", skiprows=self.skiprows)
        for filename in self.filenames[1:]:
            other_df = pd.read_csv(filename, sep="\t", skiprows=self.skiprows)
            self.df = pd.merge(self.df, other_df)

    def to_tsv(self, output_filename="all_features.out"):
        """Save the merged data frame as a TSV file (no index column)"""
        self.df.to_csv(output_filename, sep="\t", index=False)
class FeatureCount:
    r"""Read a featureCounts output file.

    The input file is expected to be generated with featurecounts tool. It
    should be a TSV file such as the following one with the header provided
    herebelow. Of course input BAM files can be named after your samples::

        Geneid   Chr          Start   End    Strand   Length   WT1   WT2   WT3   KO1   KO2   KO3
        gene1    NC_010602.1  141     1466   +        1326     11    20    15    13    17    17
        gene2    NC_010602.1  1713    2831   +        1119     35    54    58    34    53    46
        gene3    NC_010602.1  2831    3934   +        1104     9     16    16    4     18    18

    ::

        from sequana import FeatureCount
        fc = FeatureCount("all_features.out", extra_name_rm=["_S\d+"])
        fc.rnadiff_df.to_csv("fc.csv")

    """

    def __init__(
        self,
        filename,
        clean_sample_names=True,
        extra_name_rm=["_Aligned"],
        drop_loc=True,
        guess_design=False,
    ):
        """.. rubric:: Constructor

        Get the featureCounts output as a pandas DataFrame

        :param filename: a featureCounts output file, or a list of such files
            to be merged on their common columns
        :param bool clean_sample_names: if simplifying the sample names in featureCount output columns
        :param list extra_name_rm: extra list of regex to remove from samples_names (ignored if clean_sample_name is False)
        :param bool drop_loc: if dropping the extra location columns (ie getting only the count matrix)
        :param bool guess_design: if True, also build a draft design table (design_df)
        :raises IOError: if any input file does not exist
        """
        if isinstance(filename, list):
            for ff in filename:
                if not Path(ff).exists():  # pragma: no cover
                    # fix: the original message contained the literal text
                    # "(unknown)" instead of the offending path
                    raise IOError(f"No file found with path: {ff}")
        else:
            if not Path(filename).exists():  # pragma: no cover
                raise IOError(f"No file found with path: {filename}")
            # normalise to a list so _read_data handles both cases uniformly
            filename = [filename]

        self.filename = filename
        self.clean_sample_names = clean_sample_names
        self.extra_name_rm = extra_name_rm
        self.drop_loc = drop_loc

        # populate _raw_df attribute once for all
        self._read_data()

        # Count matrix prepared for rnadiff
        self.rnadiff_df = self._get_rnadiff_df()
        if guess_design:
            self.design_df = self._get_design_df()

    def _get_rnadiff_df(self):
        """Prepare a count matrix from a multi-sample featureCount file.

        Location columns are dropped and sample names cleaned so that the
        result is a pure count matrix indexed by gene identifiers.
        """
        df = self._raw_df.copy()
        # unneeded location columns are dropped
        df.drop(["Chr", "Start", "End", "Strand", "Length"], axis=1, inplace=True)
        # make sure data column names are cleaned
        df.columns = [self._clean_sample_names(x, self.extra_name_rm) for x in df.columns]
        if "Geneid" in df.columns:
            df.set_index("Geneid", inplace=True)
        df.sort_index(axis=1, inplace=True)
        return df

    def _get_design_df(self):
        """Prepare the table with grouping information for rnadiff"""
        df = pd.DataFrame(
            {
                "sample": self.rnadiff_df.columns,
                "label": self.rnadiff_df.columns.str.replace("rawCounts_", ""),
                "condition": None,
            }
        )
        df.set_index(df.columns[0], inplace=True)

        # condition is None since it should be set by the users. Yet in simple
        # cases, we can try to infer the condition names assuming the condition
        # is in the name. eg. WT1, WT2, etc
        try:
            labels = df["label"]
            conditions = self._guess_conditions(labels)
        except Exception as err:
            logger.info("no conditions could be guessed. You will need to edit the design file {}".format(err))
            conditions = None
        finally:
            df["condition"] = conditions
        return df

    def _guess_conditions(self, labels):
        """Find possible conditions based on common prefix.

        Returns a list of one condition per label, or None when no
        condition could be inferred.
        """
        conditions = []
        # we assume that the first characters can determine the conditions
        L = min([len(x) for x in labels])
        candidates = len(labels)
        i = 0
        for i in range(L):
            # NOTE(review): the body uses x[0] regardless of i, so this loop is
            # invariant and effectively computes the first-letter set once;
            # presumably x[0:i+1] was intended — kept as-is to preserve behavior
            candidates = set([x[0] for x in labels])
            if len(candidates) != len(labels):
                break

        for candidate in candidates:
            names = [x for x in labels if x.startswith(candidate)]
            # now for this candidate identify all labels that start with it from
            # which we find the longest prefix
            N = min([len(x) for x in names])
            # the first letter is the same by definition, so we start at 1
            if len(names) == 1:
                conditions.append(names[0])
            else:
                for i in range(1, N):
                    # as soon as there is a difference, we can stop: we found the
                    # common prefix
                    if len(set([x[0:i] for x in names])) != 1:
                        # if different, we need to ignore the last letter hence the
                        # -1 here below
                        conditions.append(names[0][0 : i - 1])
                        break

        # trim trailing _ if any
        conditions = [x.rstrip("_") for x in conditions]

        # map each label back to the condition it starts with
        indconds = []
        for label in labels:
            for cond in conditions:
                if label.startswith(cond):
                    indconds.append(cond)
                    break
        if len(indconds):
            return indconds
        else:
            return None

    def _read_data(self):
        # one file: read it with Geneid as index; several files: merge them
        # on their common columns first, then index on Geneid
        if len(self.filename) > 1:
            df = pd.read_csv(self.filename[0], sep="\t", comment="#", low_memory=False)
            for ff in self.filename[1:]:
                other_df = pd.read_csv(ff, sep="\t", comment="#", low_memory=False)
                df = pd.merge(df, other_df)
            df = df.set_index("Geneid")
        else:
            df = pd.read_csv(self.filename[0], sep="\t", comment="#", index_col=0, low_memory=False)
        self._raw_df = df

    def _get_raw_df(self):
        """Return the raw data, optionally dropping location columns and
        cleaning sample names depending on the constructor options."""
        df = self._raw_df.copy()
        if self.drop_loc:
            df.drop(["Chr", "Start", "End", "Strand", "Length"], axis=1, inplace=True)
        if self.clean_sample_names:
            df.columns = [self._clean_sample_names(x, self.extra_name_rm) for x in df.columns]
        return df

    df = property(_get_raw_df)

    def _clean_sample_names(self, sample_path, extra_name_rm):
        """Clean sample names in feature count tables.

        The BAM path is reduced to its stem, truncated at the first dot, and
        every regex in *extra_name_rm* is removed.
        """
        new_name = str(Path(sample_path).stem)
        new_name = new_name.split(".")[0]
        for pattern in extra_name_rm:
            new_name = re.sub(pattern, "", new_name)
        return new_name