#
# This file is part of Sequana software
#
# Copyright (c) 2016-2023 - Sequana Development Team
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
"""Tools to manipulate output of mpileup software"""
import os
from tqdm import tqdm
from sequana import FastA
from sequana.lazy import numpy as np
from sequana.lazy import pandas as pd
from sequana.lazy import pylab, pysam
from sequana.viz import Imshow
[docs]
class MPileup:
"""
http://www.htslib.org/doc/samtools-mpileup.html
"""
def __init__(self, filename, reference, progress=True):
"""
reference must be a string of nucleotides.
"""
if os.path.exists(filename) is False:
raise IOError(f"filename {filename} does not exists")
else:
self.filename = filename
try:
f = FastA(reference)
self.reference = f.sequences[0]
except OSError: # pragma: no cover
self.reference = reference
self.progress = progress
self._df = None
def _get_df(self):
if self._df is None:
self._df = self.get_mutation()
return self._df
df = property(_get_df)
[docs]
def get_mutation(self):
# Dictionary to store base counts at each position
counts = []
# Open the pileup file generated by samtools mpileup
with open(self.filename, "r") as pileup_file:
# . means a match
# -A means a deletion of A
# The "^]" signifies a base that was at the start of a read
# and the "$" signifies a base that was at the end of a read.
for k, line in enumerate(tqdm(pileup_file, disable=not self.progress)):
base_counts = {"A": 0, "C": 0, "T": 0, "G": 0, "N": 0, "*": 0}
parts = line.strip().split()
position = int(parts[1]) # Position in the reference genome
coverage = int(parts[3])
bases = parts[4] # String representing bases at this position
ref_base = parts[2]
# Count the occurrences of each base (A, C, G, T) at this position
# if we encouter a -, we need to scan the next N bases
base_counts["coverage"] = coverage
base_counts
if position == 1:
base_counts[ref_base] = coverage
else:
i = 0
length_bases = len(bases)
while i < length_bases:
x = bases[i]
if x in "+-":
i += 1
x = bases[i]
shift = ""
while x.isdigit():
shift += x
i += 1
x = bases[i]
j = 0
shift = int(shift)
while j < int(shift):
x = bases[i]
j += 1
i += 1
elif x in "NnACGTacgt":
base_counts[x.upper()] += 1
i += 1
elif x in ",.":
base_counts[ref_base] += 1
i += 1
elif x in "^":
# ^ sign is followed by an alignment mapping qaulity ascii character
# it could be the special characters * or $
# that must appear later in the elfi branches
i += 2
elif x == "*":
# a deletion
base_counts["*"] += 1
i += 1
elif x == "$":
i += 1
else: # pragma: no cover
raise NotImplementedError
counts.append(base_counts)
# Print base counts at each position
data = []
for position, count in enumerate(counts):
S = sum(count.values()) - count["coverage"] + count["*"]
# * characters are for deletions but could also be used when coverage is zero
if count["coverage"] == 0:
S -= count["*"]
# le nucleotide correspondant a la ref n'a pas d'erreur
count[self.reference[position]] = 0
data.append(
[
position + 1,
self.reference[position],
count["coverage"],
S,
count["A"] / S * 100,
count["C"] / S * 100,
count["G"] / S * 100,
count["T"] / S * 100,
count["N"] / S * 100,
count["*"] / S * 100,
]
)
df = pd.DataFrame(data)
df.columns = ["position", "ref", "coverage", "S", "A", "C", "G", "T", "N", "D"]
return df
[docs]
def plot_stack_bars(self, include_deletions=False, ec="k"):
"""Plot error on A, C, G, T as a bar stacked plot."""
A = self.df["A"].values
C = self.df["C"].values
G = self.df["G"].values
T = self.df["T"].values
D = self.df["D"].values
X1, X2 = 1, len(self.reference) + 1
pylab.clf()
pylab.bar(range(X1 + 1, X2 + 1), A, width=1, ec=ec, label="A")
pylab.bar(range(X1 + 1, X2 + 1), C, bottom=A, width=1, ec=ec, label="C")
pylab.bar(range(X1 + 1, X2 + 1), G, bottom=A + C, width=1, ec=ec, label="G")
pylab.bar(range(X1 + 1, X2 + 1), T, bottom=A + C + G, width=1, ec=ec, label="T")
if include_deletions:
pylab.bar(
range(X1 + 1, X2 + 1),
D,
bottom=A + C + G + T,
width=1,
ec=ec,
label="D",
)
pylab.legend()
pylab.xlabel("Position", fontsize=16)
pylab.ylabel("Error rate (percentage)", fontsize=16)
[docs]
def set_GC_to_AT_mutations(self):
"""Compute the mutation errors GC->AT
Here, we only consider mutation G->A, G->T, C->A, C->T.
We sum the ->A an ->T errors. So, if the reference is A or T the errors is zeros.
"""
AT = self.df[["A", "T"]].sum(axis=1).values
self.df["GC_to_AT"] = [AT[i] if x["ref"] in "GC" else 0 for i, x in self.df.iterrows()]
[docs]
def get_total_errors(self, nucs=["A", "C", "G", "T"], include_deletions=False, include_Ns=False):
if include_Ns:
nucs += ["N"]
if include_deletions:
nucs += ["D"]
return self.df[nucs].sum(axis=1)
[docs]
def plot_total_errors(self, nucs, include_deletions=False, include_Ns=False):
errors = self.get_total_errors(nucs, include_deletions=include_deletions, include_Ns=include_Ns)
N = len(self.df)
pylab.plot(range(1, N + 1), errors)
pylab.axhline(pylab.mean(errors), ls="--", color="gray")
pylab.xlabel("Position", fontsize=16)
pylab.ylabel("Error rate (%)", fontsize=16)
pylab.xlim([0, N + 1])
return errors
# Error rate are the sum of the illumina errors and biological errors
# WT is suppose to be compose of illumina error and the WT errors
# we can suctract the WT error
[docs]
def plot_mutation_matrix(self, x1=0, x2=None, vmin=0, vmax=None, cmap="hot_r"):
matmut = np.zeros((4, 4))
if x2 == None:
x2 = len(self.reference)
pylab.clf()
for i, xref in enumerate("ACGT"):
for j, y in enumerate("ACGT"):
if xref == y:
matmut[i, j] = 0
else:
datum = self.df.query("ref == @xref and (position >= @x1 and position <=@x2)")[y].mean()
matmut[i, j] = datum
matmut = pd.DataFrame(matmut)
matmut.index = ["A", "C", "G", "T"]
matmut.columns = ["A", "C", "G", "T"]
Imshow(matmut).plot(cmap=cmap, vmin=vmin, vmax=vmax)
pylab.ylabel("Reference", fontsize=16)
pylab.xlabel("Mutation", fontsize=16)
pylab.xticks([0, 1, 2, 3], ["A", "C", "G", "T"], rotation=0)
pylab.ylim([-0.5, 3.5])
try: # pragma: no cover
pylab.gcf().set_layout_engine("tight")
except: # pragma: no cover
pass
return matmut