Source code for sequana.mpileup

#
#  This file is part of Sequana software
#
#  Copyright (c) 2016-2023 - Sequana Development Team
#
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#
#  website: https://github.com/sequana/sequana
#  documentation: http://sequana.readthedocs.io
#
##############################################################################
"""Tools to manipulate output of mpileup software"""

import os

from tqdm import tqdm

from sequana import FastA
from sequana.lazy import numpy as np
from sequana.lazy import pandas as pd
from sequana.lazy import pylab, pysam
from sequana.viz import Imshow


[docs] class MPileup: """ http://www.htslib.org/doc/samtools-mpileup.html """ def __init__(self, filename, reference, progress=True): """ reference must be a string of nucleotides. """ if os.path.exists(filename) is False: raise IOError(f"filename {filename} does not exists") else: self.filename = filename try: f = FastA(reference) self.reference = f.sequences[0] except OSError: # pragma: no cover self.reference = reference self.progress = progress self._df = None def _get_df(self): if self._df is None: self._df = self.get_mutation() return self._df df = property(_get_df)
[docs] def get_mutation(self): # Dictionary to store base counts at each position counts = [] # Open the pileup file generated by samtools mpileup with open(self.filename, "r") as pileup_file: # . means a match # -A means a deletion of A # The "^]" signifies a base that was at the start of a read # and the "$" signifies a base that was at the end of a read. for k, line in enumerate(tqdm(pileup_file, disable=not self.progress)): base_counts = {"A": 0, "C": 0, "T": 0, "G": 0, "N": 0, "*": 0} parts = line.strip().split() position = int(parts[1]) # Position in the reference genome coverage = int(parts[3]) bases = parts[4] # String representing bases at this position ref_base = parts[2] # Count the occurrences of each base (A, C, G, T) at this position # if we encouter a -, we need to scan the next N bases base_counts["coverage"] = coverage base_counts if position == 1: base_counts[ref_base] = coverage else: i = 0 length_bases = len(bases) while i < length_bases: x = bases[i] if x in "+-": i += 1 x = bases[i] shift = "" while x.isdigit(): shift += x i += 1 x = bases[i] j = 0 shift = int(shift) while j < int(shift): x = bases[i] j += 1 i += 1 elif x in "NnACGTacgt": base_counts[x.upper()] += 1 i += 1 elif x in ",.": base_counts[ref_base] += 1 i += 1 elif x in "^": # ^ sign is followed by an alignment mapping qaulity ascii character # it could be the special characters * or $ # that must appear later in the elfi branches i += 2 elif x == "*": # a deletion base_counts["*"] += 1 i += 1 elif x == "$": i += 1 else: # pragma: no cover raise NotImplementedError counts.append(base_counts) # Print base counts at each position data = [] for position, count in enumerate(counts): S = sum(count.values()) - count["coverage"] + count["*"] # * characters are for deletions but could also be used when coverage is zero if count["coverage"] == 0: S -= count["*"] # le nucleotide correspondant a la ref n'a pas d'erreur count[self.reference[position]] = 0 data.append( [ position + 1, self.reference[position], count["coverage"], S, count["A"] / S * 100, count["C"] / S * 100, count["G"] / S * 100, count["T"] / S * 100, count["N"] / S * 100, count["*"] / S * 100, ] ) df = pd.DataFrame(data) df.columns = ["position", "ref", "coverage", "S", "A", "C", "G", "T", "N", "D"] return df
[docs] def plot_stack_bars(self, include_deletions=False, ec="k"): """Plot error on A, C, G, T as a bar stacked plot.""" A = self.df["A"].values C = self.df["C"].values G = self.df["G"].values T = self.df["T"].values D = self.df["D"].values X1, X2 = 1, len(self.reference) + 1 pylab.clf() pylab.bar(range(X1 + 1, X2 + 1), A, width=1, ec=ec, label="A") pylab.bar(range(X1 + 1, X2 + 1), C, bottom=A, width=1, ec=ec, label="C") pylab.bar(range(X1 + 1, X2 + 1), G, bottom=A + C, width=1, ec=ec, label="G") pylab.bar(range(X1 + 1, X2 + 1), T, bottom=A + C + G, width=1, ec=ec, label="T") if include_deletions: pylab.bar( range(X1 + 1, X2 + 1), D, bottom=A + C + G + T, width=1, ec=ec, label="D", ) pylab.legend() pylab.xlabel("Position", fontsize=16) pylab.ylabel("Error rate (percentage)", fontsize=16)
[docs] def set_GC_to_AT_mutations(self): """Compute the mutation errors GC->AT Here, we only consider mutation G->A, G->T, C->A, C->T. We sum the ->A an ->T errors. So, if the reference is A or T the errors is zeros. """ AT = self.df[["A", "T"]].sum(axis=1).values self.df["GC_to_AT"] = [AT[i] if x["ref"] in "GC" else 0 for i, x in self.df.iterrows()]
[docs] def get_total_errors(self, nucs=["A", "C", "G", "T"], include_deletions=False, include_Ns=False): if include_Ns: nucs += ["N"] if include_deletions: nucs += ["D"] return self.df[nucs].sum(axis=1)
[docs] def plot_total_errors(self, nucs, include_deletions=False, include_Ns=False): errors = self.get_total_errors(nucs, include_deletions=include_deletions, include_Ns=include_Ns) N = len(self.df) pylab.plot(range(1, N + 1), errors) pylab.axhline(pylab.mean(errors), ls="--", color="gray") pylab.xlabel("Position", fontsize=16) pylab.ylabel("Error rate (%)", fontsize=16) pylab.xlim([0, N + 1]) return errors
# Error rate are the sum of the illumina errors and biological errors # WT is suppose to be compose of illumina error and the WT errors # we can suctract the WT error
[docs] def plot_mutation_matrix(self, x1=0, x2=None, vmin=0, vmax=None, cmap="hot_r"): matmut = np.zeros((4, 4)) if x2 == None: x2 = len(self.reference) pylab.clf() for i, xref in enumerate("ACGT"): for j, y in enumerate("ACGT"): if xref == y: matmut[i, j] = 0 else: datum = self.df.query("ref == @xref and (position >= @x1 and position <=@x2)")[y].mean() matmut[i, j] = datum matmut = pd.DataFrame(matmut) matmut.index = ["A", "C", "G", "T"] matmut.columns = ["A", "C", "G", "T"] Imshow(matmut).plot(cmap=cmap, vmin=vmin, vmax=vmax) pylab.ylabel("Reference", fontsize=16) pylab.xlabel("Mutation", fontsize=16) pylab.xticks([0, 1, 2, 3], ["A", "C", "G", "T"], rotation=0) pylab.ylim([-0.5, 3.5]) try: # pragma: no cover pylab.gcf().set_layout_engine("tight") except: # pragma: no cover pass return matmut