#
# This file is part of Sequana software
#
# Copyright (c) 2016-2022 - Sequana Development Team
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
import colorlog
from sequana import BAM, FastQ
from sequana.lazy import pandas as pd
from sequana.lazy import pylab
logger = colorlog.getLogger(__name__)
[docs]
def find_motif(bamfile, motif="CAGCAG", window=200, savefig=False, local_th=5, global_th=10):
"""
If at least 10 position contains at least 5 instances of the motif, then
this is a hit and the alignment is kept
"""
b1 = BAM(bamfile)
# FIND motif and create pictures
count = 0
found = []
Ss = []
alns = []
for a in b1:
count += 1
if a.query_sequence is None:
continue
seq = a.query_sequence
X1 = [seq[i : i + window].count(motif) for i in range(len(seq))]
S = sum([x > local_th for x in X1])
Ss.append(S)
als.append(a)
if S > global_th:
found.append(True)
off = a.query_alignment_start
pylab.clf()
pylab.plot(range(off + a.reference_start, off + a.reference_start + len(seq)), X1)
if savefig:
pylab.savefig("{}_{}_{}.png".format(a.reference_name, S, a.query_name.replace("/", "_")))
else:
found.append(False)
return alns, found, Ss
[docs]
class FindMotif:
"""
fm = FindMotif("cl10/select1.sorted.bam")
df = fm.find_motif("CAGCAG")
df.query("hit>10")
local threshold should be window length divided by motif length
divided by 2
"""
def __init__(self, local_threshold=5, global_threshold=10, window=200):
self.local_threshold = local_threshold
self.global_threshold = global_threshold
self.window = window
[docs]
def find_motif_from_sequence(self, seq, motif, window=None, local_threshold=None):
if local_threshold is None:
local_threshold = self.local_threshold
if window is None:
window = self.window
# This should be improved with a true sliding window
X1 = [seq[i : i + window].count(motif) for i in range(len(seq))]
# Number of point crossing the threshold in the sequence
# The threshold should be below window/len(motif) if there are no errors
S = sum([x >= local_threshold for x in X1])
return X1, S
[docs]
def find_motif_fasta(self, filename, motif, window=200, local_threshold=None, global_threshold=None):
from sequana import FastA
data = FastA(filename)
N = len(data)
df = {"query_name": [], "hit": [], "length": [], "start": [], "end": []}
for i, item in enumerate(data):
X1, S = self.find_motif_from_sequence(item.sequence, motif, window=window, local_threshold=local_threshold)
if S >= self.global_threshold:
df["query_name"].append(item.name)
df["start"].append(0)
df["end"].append(len(item.sequence))
df["length"].append(len(item.sequence))
df["hit"].append(S)
df = pd.DataFrame(df)
return df
[docs]
def find_motif_bam(
self,
filename,
motif,
window=200,
figure=False,
savefig=False,
local_threshold=None,
global_threshold=None,
):
from sequana import BAM
b1 = BAM(filename)
df = {"query_name": [], "hit": [], "length": [], "start": [], "end": []}
for a in b1:
if a.query_sequence is None:
continue
seq = a.query_sequence
X1, S = self.find_motif_from_sequence(seq, motif, window=window, local_threshold=local_threshold)
df["query_name"].append(a.query_name)
df["start"].append(a.reference_start)
df["end"].append(a.reference_end)
df["length"].append(a.rlen)
df["hit"].append(S)
if S >= self.global_threshold:
off = a.query_alignment_start
# pylab.clf()
if figure:
pylab.plot(
range(off + a.reference_start, off + a.reference_start + len(seq)),
X1,
)
if savefig:
pylab.savefig("{}_{}_{}.png".format(a.reference_name, S, a.query_name.replace("/", "_")))
df = pd.DataFrame(df)
L = len(df.query("hit>5"))
print(L)
return df
[docs]
def plot_specific_alignment(
self,
bamfile,
query_name,
motif,
clf=True,
show_figure=True,
authorized_flags=[0, 16],
windows=[10, 50, 100, 150, 200, 250, 500, 1000],
local_threshold=5,
):
found = None
bam = BAM(bamfile)
for aln in bam:
if aln.query_name == query_name and aln.flag in authorized_flags:
found = aln
break # we may have several entries. let us pick up the first
sizes = []
if found:
# Detection
seq = found.query_sequence
if clf:
pylab.clf()
for window in windows:
X = [seq[i : i + window].count(motif) for i in range(len(seq))]
if show_figure:
pylab.plot(X, label=window)
score = sum([x > local_threshold for x in X])
sizes.append(score - window)
if show_figure:
pylab.legend()
pylab.ylabel("# {} in a given sliding window".format(motif))
pylab.title(query_name)
else:
print("{} Not found in {} file".format(query_name, bamfile))
return sizes
"""def find_length(self, query_name, motif, window=200):
found = None
bam = BAM(self.bamfile)
for aln in bam:
if aln.query_name == query_name:
found = aln
if found:
# Detection
seq = found.query_sequence
if clf:pylab.clf()
for window in windows:
"""
[docs]
def plot_alignment(
self,
bamfile,
motif,
window=200,
global_th=10,
title=None,
legend=True,
legend_fontsize=11,
valid_rnames=[],
valid_flags=[],
):
"""
plot alignments that match the motif.
"""
bam = BAM(bamfile)
print("Found {} hits".format(len(bam)))
pylab.clf()
count = 0
for aln in bam:
if valid_rnames and aln.rname not in valid_rnames:
continue
if valid_flags and aln.flag not in valid_flags:
continue
seq = aln.query_sequence
if seq:
count += 1
X1 = [seq[i : i + window].count(motif) for i in range(len(seq))]
pylab.plot(
range(aln.reference_start, aln.reference_start + len(seq)),
X1,
label=aln.query_name,
)
print("Showing {} entries after filtering".format(count))
max_theo = int(1.2 * window / len(motif))
pylab.ylim([0, max_theo])
if legend and count < 15:
pylab.legend(fontsize=legend_fontsize)
if title:
pylab.title(title, fontsize=16)
# return df