#
# This file is part of Sequana software
#
# Copyright (c) 2016-2022 - Sequana Development Team
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
import colorlog
from sequana import tools
from sequana.fasta import FastA
from sequana.lazy import numpy as np
from sequana.lazy import pandas as pd
from sequana.lazy import pylab
logger = colorlog.getLogger(__name__)
__all__ = ["ContigsBase", "Contigs"]
[docs]class ContigsBase(object):
"""Parent class for contigs data"""
def __init__(self, filename):
""".. rubric:: Constructor
:param filename: input file name
"""
self.filename = filename
self.fasta = FastA(filename)
[docs] def get_gc(self, window=100):
"""Return GC content for each contig"""
data = tools._base_content(self.filename, window, "GC")
names = self.fasta.names
lengths = self.fasta.lengths
GC = [100 * np.nanmean(data[name]) for name in names]
return GC
[docs] def plot_contig_length_vs_GC(self, alpha=0.5):
"""Plot contig GC content versus contig length
.. plot::
from sequana.contigs import Contigs
from sequana import sequana_data
filename = sequana_data("test_contigs_spades.fasta")
ctg = Contigs(filename)
ctg.plot_contig_length_vs_GC()
"""
pylab.plot(self.df["length"], self.df["GC"], "o", alpha=alpha)
pylab.xlabel("contig length (bp)")
pylab.ylabel("GC (%)")
pylab.grid(True)
pylab.ylim([0, 100])
pylab.xlim(0, self.df["length"].max() + 10)
[docs] def scatter_length_cov_gc(self, min_length=200, min_cov=10, grid=True, logy=False, logx=True):
"""Plot scatter length versus GC content
:param min_length: add vertical line to indicate possible
contig length cutoff
:param min_cov: add horizontal line to indicate possible
coverage contig cutff
:param grid: add grid to the plot
:param logy: set y-axis log scale
:param logx: set x-axis log scale
.. plot::
from sequana import Contigs, sequana_data
filename = sequana_data("test_contigs_spades.fasta")
ctg = Contigs(filename)
ctg.scatter_length_cov_gc()
"""
if "cov" not in self.df.columns:
logger.warning("scatter_length_cov_gc required 'cov' coverage column information")
return
pylab.clf()
pylab.scatter(self.df.length, self.df["cov"], c=self.df.GC)
if logx:
pylab.semilogx()
if logy:
pylab.semilogy()
pylab.axvline(min_length, lw=2, c="r", ls="--")
pylab.axhline(min_cov, lw=2, c="r", ls="--")
pylab.xlabel("contig length")
pylab.ylabel("contig coverage")
pylab.colorbar(label="GC")
if grid:
pylab.grid(True)
[docs]class Contigs(ContigsBase):
"""Utilities for summarising or plotting contig information
Depending on how the FastA file was created, different types of plots can be
are available. For instance, if the FastA was created with Canu,
*nreads* and *covStat* information can be extracted. Therefore,
plots such as :meth:`plot_scatter_contig_length_vs_nreads_cov`
and :meth:`plot_contig_length_vs_nreads` can be used.
"""
def __init__(self, filename, mode="canu"):
""".. rubric:: **Constructor**
:param filename: input FastA file
:param canu: tool that created the output file.
"""
super(Contigs, self).__init__(filename)
self.mode = mode
self._df = None
[docs] def hist_plot_contig_length(self, bins=40, fontsize=16, lw=1):
"""Plot distribution of contig lengths
:param bin: number of bins for the histogram
:param fontsize: fontsize for xy-labels
:param lw: width of bar contour edges
:param ec: color of bar contours
.. plot::
from sequana import Contigs, sequana_data
filename = sequana_data("test_contigs_spades.fasta")
c = Contigs(filename)
c.hist_plot_contig_length()
"""
L = len(self.fasta.sequences)
pylab.clf()
pylab.hist(self.fasta.lengths, lw=lw, ec="k", bins=bins)
pylab.grid()
pylab.xlabel("Contig length", fontsize=fontsize)
pylab.ylabel("#", fontsize=fontsize)
pylab.title("Distribution {} contigs".format(L))
def _get_df(self):
if self._df is None:
try:
self._compute_spades_df()
except ValueError:
self._compute_df()
return self._df
df = property(_get_df)
def _compute_spades_df(self):
lengths = []
names = []
covs = []
for name in self.fasta.names:
_, ID, _, length, _, cov = name.split("_")
lengths.append(length)
names.append(ID)
covs.append(cov)
self._df = pd.DataFrame({"cov": covs, "length": lengths, "name": names})
self._df = self._df.astype({"length": int, "cov": float})
self._df = self._df[["name", "length", "cov"]]
self._df["GC"] = self.get_gc()
def _compute_df(self, window=100):
data = tools._base_content(self.filename, window, "GC")
names = self.fasta.names
lengths = self.fasta.lengths
GC = [np.nanmean(data[name]) for name in names]
nreads = [0] * len(GC)
covStats = [0] * len(GC)
if self.mode == "canu":
for i, comment in enumerate(self.fasta.comments):
read = [x for x in comment.split() if x.startswith("reads")][0]
covStat = [x for x in comment.split() if x.startswith("covStat")][0]
read = read.split("=")[1]
covStat = covStat.split("=")[1]
nreads[i] = int(read)
covStats[i] = float(covStat)
df = pd.DataFrame(
{
"GC": list(GC),
"length": lengths,
"name": names,
"nread": nreads,
"covStat": covStats,
}
)
self._df = df.copy()
return df
[docs] def plot_contig_length_vs_nreads(
self,
fontsize=16,
min_length=5000,
min_nread=10,
grid=True,
logx=True,
logy=True,
):
"""Plot contig length versus nreads
In canu, contigs have the number of reads that support them.
Here, we can see whether contigs have lots of reads supported them or not.
.. note:: For Canu output only
.. plot::
from sequana import Contigs, sequana_data
filename = sequana_data("test_contigs_ex1.fasta")
c = Contigs(filename)
c.plot_contig_length_vs_nreads(logx=False)
"""
# same as plot_scatter_contig_length_nread_cov but no covStats information
if not "nread" in self.df.columns:
logger.warning("plot_scatter_contig_length_nread_cov required 'nread' column information (Canu output)")
return
pylab.clf()
m1 = self.df.length.min()
M1 = self.df.length.max()
pylab.plot(self.df.length, self.df.nread, "o")
pylab.xlabel("Contig length", fontsize=fontsize)
pylab.ylabel("Contig N reads", fontsize=fontsize)
if grid:
pylab.grid()
if logx:
pylab.semilogx()
if logy:
pylab.semilogy
query = "nread>@min_nread and length>@min_length"
X = self.df.query(query)["length"]
Y = self.df.query(query)["nread"]
try: # pragma: no cover
A = np.vstack([X, np.ones(len(X))]).T
m, c = np.linalg.lstsq(A, Y.as_matrix())[0]
x = np.array([m1, M1])
pylab.plot(x, m * x + c, "o-r")
except AttributeError:
pass
pylab.xlabel("Contig length", fontsize=16)
pylab.ylabel("nread support", fontsize=16)
pylab.gcf().set_layout_engine("tight")
[docs] def plot_scatter_contig_length_vs_nreads_cov(
self,
fontsize=16,
vmin=0,
vmax=50,
min_nreads=20,
min_length=5000,
grid=True,
logx=True,
logy=True,
):
"""Scatter plot showing number of support reads and contig lengths
.. note:: only for Canu output.
.. plot::
from sequana import Contigs, sequana_data
filename = sequana_data("test_contigs_ex1.fasta")
c = Contigs(filename)
c.plot_scatter_contig_length_vs_nreads_cov()
"""
if not "covStat" in self.df.columns:
logger.warning(
"plot_scatter_contig_length_nread_cov required 'covStat' coverage column information (Canu output). You may use plot_contig_length_vs_nreads method instead"
)
return
if not "nread" in self.df.columns: # pragma: no cover
logger.warning("plot_scatter_contig_length_nread_cov required 'nread' column information (Canu output)")
return
m1 = self.df.length.min()
M1 = self.df.length.max()
# selection
query = "nread>@min_nreads and length>@min_length"
X = self.df.query(query)["length"]
Y = self.df.query(query)["nread"]
Z = self.df.query(query)["covStat"]
if len(X) == 0:
logger.warning("No contig after filtering. Set min_reads and min_length")
return
pylab.clf()
pylab.scatter(X, Y, c=Z, vmin=vmin, vmax=vmax)
pylab.colorbar()
pylab.xlabel("Contig length", fontsize=fontsize)
pylab.ylabel("Contig reads", fontsize=fontsize)
try: # pragma: no cover
A = np.vstack([X, np.ones(len(X))]).T
m, c = np.linalg.lstsq(A, Y.as_matrix())[0]
x = np.array([m1, M1])
pylab.plot(x, m * x + c, "o-r")
except AttributeError:
pass
if grid:
pylab.grid()
if logx:
pylab.semilogx()
if logy:
pylab.semilogy()
pylab.gcf().set_layout_engine("tight")
"""def get_contig_per_chromosome(self):
if self.bam is None:
print("no bam file found")
return
self.df = self.bam.get_df()
df = self.df.query("flag in [0,16]")
alldata = {}
for chrom in sorted(df.rname.unique()):
data = df.query("rname == @chrom").sort_values(by="rstart")[["qname", "qlen", "rstart", "rend"]]
alldata[chrom] = data
return alldata
"""
[docs] def stats(self):
"""Return N50, L50 and total cumulated length"""
from sequana.stats import L50, N50
length = self.df["length"]
return {"N50": N50(length), "total_length": sum(length), "L50": L50(length)}