Source code for sequana.sequence

#  This file is part of Sequana software
#
#  Copyright (c) 2018-2022 - Sequana Development Team
#
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#
#  website: https://github.com/sequana/sequana
#  documentation: http://sequana.readthedocs.io
#
##############################################################################
import re
import string
import subprocess
from collections import Counter, deque

import colorlog

from sequana.fasta import FastA
from sequana.lazy import numpy as np
from sequana.lazy import pandas as pd
from sequana.lazy import pylab

logger = colorlog.getLogger(__name__)


__all__ = ["DNA", "RNA", "Repeats", "Sequence"]


from sequana.iuapc import codons  # your existing dictionary


def translate(dna: str, to_stop=True) -> str:
    """Translate a DNA sequence to a protein using sequana.iupac.codons.

    Args:
        dna (str): DNA sequence (A/T/C/G)
        to_stop (bool): If True, stop translation at the first stop codon

    Returns:
        str: Translated protein sequence
    """
    dna = dna.upper().replace(" ", "").replace("\n", "")
    rna = dna.replace("T", "U")
    protein = []

    for i in range(0, len(rna) - 2, 3):
        codon = rna[i : i + 3]
        if len(codon) < 3:
            break
        aa = codons.get(codon, "X")  # 'X' for unknown codons
        if aa == "*" and to_stop:
            break
        protein.append(aa)
    return "".join(protein)


[docs] class Sequence(object): """Abstract base classe for other specialised sequences such as DNA. Sequenced is the base class for other classes such as :class:`DNA` and :class:`RNA`. .. doctest:: from sequana import Sequence s = Sequence("ACGT") s.stats() s.get_complement() .. note:: You may use a Fasta file as input (see constructor) """ def __init__(self, sequence, complement_in=b"ACGT", complement_out=b"TGCA", letters="ACGT"): """.. rubric:: Constructor A sequence is just a string stored in the :attr:`sequence` attribute. It has properties related to the type of alphabet authorised. :param str sequence: May be a string of a Fasta File, in which case only the first sequence is used. :param complement_in: :param complement_out: :param letters: authorise letters. Used in :meth:`check` only. .. todo:: use counter only once as a property """ if sequence.endswith(".fa") or sequence.endswith(".fasta"): self.fasta = FastA(sequence) sequence = self.fasta.next().sequence.upper() else: # assume correct string sequence pass self._data = sequence try: self._translate = string.maketrans(complement_in, complement_out) except: self._translate = bytes.maketrans(complement_in, complement_out) self._letters = letters def __iter__(self): return self def __next__(self): self._data = self.fasta.next().sequence.upper() return self._data def _get_sequence(self): return self._data sequence = property(_get_sequence)
[docs] def get_complement(self): """Return complement""" return self._data.translate(self._translate)
[docs] def get_reverse_complement(self): """Return reverse complement""" return self.get_complement()[::-1]
[docs] def get_reverse(self): """Return reverse sequence""" return self._data[::-1]
[docs] def complement(self): """Alias to :meth:`get_complement`""" self._data = self.get_complement()
[docs] def reverse(self): """Alias to :meth:`get_reverse`""" self._data = self.get_reverse()
[docs] def reverse_complement(self): """Alias to get_reverse_complement""" self._data = self.get_reverse_complement()
[docs] def check(self): """Check that all letters are valid""" counter = Counter(self._data).keys() for key in counter: if key not in self._letters: raise ValueError("Found unexpected letter in the sequence (%s)" % key)
def __len__(self): return len(self._data)
[docs] def gc_content(self): """Return mean GC content""" c = Counter(self._data) ratio = (c["G"] + c["C"]) / len(self.sequence) return ratio
[docs] def stats(self): """Return basic stats about the number of letters""" from collections import Counter return Counter(self.sequence)
[docs] def get_statistics(self): from collections import Counter counter = Counter(self.sequence.upper()) N = sum(list(counter.values())) counter["GC"] = counter["G"] + counter["C"] names = ["A", "C", "G", "T", "GC", "N"] basic_stats = [counter.get(x, 0) for x in ["A", "C", "G", "T", "GC", "N"]] + [N] basic_stats_pct = [float(x) / N * 100 for x in basic_stats] basic_stats = pd.DataFrame( {"count": basic_stats, "percentage": basic_stats_pct}, index=["A", "C", "G", "T", "GC", "N", "all"] ) return {"single": basic_stats}
[docs] def get_occurences(self, pattern, overlap=False): """Return position of the input pattern in the sequence :: >>> from sequana import Sequence >>> s = Sequence('ACGTTTTACGT') >>> s.get_occurences("ACGT") [0, 7] """ if overlap is False: res = [m.start() for m in re.finditer(pattern, self.sequence)] elif overlap is True: res = [m.start() for m in re.finditer("(?=%s)" % pattern, self.sequence)] return res
# reverse find-all without overlaps, you can combine positive and # negative lookahead into an expression like this: # res = [m.start() for m in re.finditer('(?=%s)(?!.{1,%d}%s)' % (search, # len(pattern)-1, pattern), 'ttt')]
[docs] class DNA(Sequence): """Simple DNA class :: >>> from sequana.sequence import DNA >>> d = DNA("ACGTTTT") >>> d.reverse_complement() Some long computations are done when setting the window size:: d.window = 100 The ORF detection has been validated agains a plasmodium 3D7 ORF file found on plasmodb.org across the 14 chromosomes. """ def __init__( self, sequence, codons_stop=["TAA", "TGA", "TAG"], codons_stop_rev=["TTA", "TCA", "CTA"], codons_start=["ATG"], codons_start_rev=["CAT"], ): super(DNA, self).__init__(sequence, complement_in=b"ACGT", complement_out=b"TGCA", letters="ACGTN") self._window = None self._type_window = None self._slide_window = None self._seq_right = None self._dict_nuc = dict_nuc = {"A": 0, "T": 1, "G": 2, "C": 3} self._cumul = None self._Xn = None self._Yn = None self._Zn = None self._ignored_nuc = None self._AT_skew_slide = None self._GC_skew_slide = None self._GC_content_slide = None self._AT_content_slide = None self._template_fft = None self._c_fft = None self._codons_stop = codons_stop self._codons_stop_rev = codons_stop_rev self._codons_start = codons_start self._codons_start_rev = codons_start_rev self._ORF_pos = None self._threshold = None self._type_filter = None def _get_window(self): return self._window def _set_window(self, window): if (window > 0) & (window < 1): self._type_window = "adapted to genome length : %.1f %% of total length" % (window * 100) self._window = int(round(self.__len__() * window)) elif (window >= 1) & (window <= self.__len__()): self._type_window = "fixed window length : %d" % (window) self._window = int(window) else: raise ValueError( "Incorrect value for window: choose either float ]0,1]" + " (fraction of genome) or integer [1,genome_length] (window size)" ) logger.info("Computing GC skew") self._compute_skews() window = property(_get_window, _set_window) def _get_type_window(self): return self._type_window type_window = property(_get_type_window) def _init_sliding_window(self): """slide_window : deque of n first nucleotides (size of window) seq_right : deque of the rest of the genome, with the n-1 nucleotides at the end (circular DNA) """ # split sequence_reference in two : sliding window and seq_right # for circular genome : add begining at the end of genome self._slide_window = deque(self.sequence[0 : self._window]) self._seq_right = deque(self.sequence[self._window : self.__len__()] + self.sequence[0 : (self._window - 1)]) def _init_list_results(self): # init IJ content and IJ skew IJ_content_res = np.empty((1, self.__len__())) IJ_content_res[:] = np.nan IJ_skew_res = np.empty((1, self.__len__())) IJ_skew_res[:] = np.nan return IJ_content_res, IJ_skew_res def _init_cumul_nuc(self): """Cumulative of nucleotide count along genome (init from first window)""" # ATGC (index stored in self._dict_nuc) cumul = np.zeros((4, (self.__len__() + self._window))) for j in range(self._window): nuc = self.sequence[j] if nuc in self._dict_nuc: cumul[self._dict_nuc[nuc]][j] += 1 self._cumul = cumul def _compute_skews(self): ### initialisation = Calculating GC skew and AT skew for first window self._init_sliding_window() GC_content_slide, GC_skew_slide = self._init_list_results() AT_content_slide, AT_skew_slide = self._init_list_results() karlin_content_slide, karlin_skew_slide = self._init_list_results() self._init_cumul_nuc() c = Counter(self._slide_window) dict_counts = {"G": c["G"], "C": c["C"], "A": c["A"], "T": c["T"]} i = 0 # GC sumGC = float(dict_counts["G"] + dict_counts["C"]) GC_content_slide[0][i] = sumGC if sumGC > 0: GC_skew_slide[0][i] = (dict_counts["G"] - dict_counts["C"]) / sumGC # AT sumAT = float(dict_counts["A"] + dict_counts["T"]) AT_content_slide[0][i] = sumAT if sumAT > 0: AT_skew_slide[0][i] = (dict_counts["A"] - dict_counts["T"]) / sumAT # karlin difference measures excess of purines over pyrimidines as G - C + A - T sumAG = float(dict_counts["A"] + dict_counts["G"]) sumCT = float(dict_counts["C"] + dict_counts["T"]) karlin_content_slide[0][i] = sumAG - sumCT ### Compute for all genome while self._seq_right: out_nuc = self._slide_window.popleft() in_nuc = self._seq_right.popleft() self._slide_window.append(in_nuc) i += 1 if i % 500000 == 0: # pragma: no cover logger.info("%d / %d" % (i, self.__len__())) # if in and out are the same : do nothing, append same result if out_nuc != in_nuc: # remove out from counters if out_nuc in self._dict_nuc: dict_counts[out_nuc] -= 1 if in_nuc in self._dict_nuc: dict_counts[in_nuc] += 1 sumGC = float(dict_counts["G"] + dict_counts["C"]) sumAT = float(dict_counts["A"] + dict_counts["T"]) sumAG = float(dict_counts["A"] + dict_counts["G"]) sumCT = float(dict_counts["C"] + dict_counts["T"]) # fill results # GC GC_content_slide[0][i] = sumGC if sumGC > 0: GC_skew_slide[0][i] = (dict_counts["G"] - dict_counts["C"]) / sumGC # AT AT_content_slide[0][i] = sumAT if sumAT > 0: AT_skew_slide[0][i] = (dict_counts["A"] - dict_counts["T"]) / sumAT # cumul if in_nuc in self._dict_nuc: self._cumul[self._dict_nuc[in_nuc]][i + self._window - 1] += 1 karlin_content_slide[0][i] = sumAG - sumCT self._GC_content_slide = GC_content_slide / float(self._window) self._AT_content_slide = AT_content_slide / float(self._window) self._karlin_content_slide = karlin_content_slide / float(self._window) self._cumul = np.delete(self._cumul, range(self.__len__(), self._cumul.shape[1]), 1) self._cumul = np.cumsum(self._cumul, axis=1) ### save result for Z curve self._Xn = list( (self._cumul[self._dict_nuc["A"]] + self._cumul[self._dict_nuc["G"]]) - (self._cumul[self._dict_nuc["C"]] + self._cumul[self._dict_nuc["T"]]) ) self._Yn = list( (self._cumul[self._dict_nuc["A"]] + self._cumul[self._dict_nuc["C"]]) - (self._cumul[self._dict_nuc["G"]] + self._cumul[self._dict_nuc["T"]]) ) self._Zn = list( (self._cumul[self._dict_nuc["A"]] + self._cumul[self._dict_nuc["T"]]) - (self._cumul[self._dict_nuc["C"]] + self._cumul[self._dict_nuc["G"]]) ) self._AT_skew_slide = AT_skew_slide self._GC_skew_slide = GC_skew_slide ### check proportion of ignored nucleotides GC_content_total = (self._cumul[self._dict_nuc["G"]][-1] + self._cumul[self._dict_nuc["C"]][-1]) / float( self.__len__() ) AT_content_total = (self._cumul[self._dict_nuc["A"]][-1] + self._cumul[self._dict_nuc["T"]][-1]) / float( self.__len__() ) self._ignored_nuc = 1.0 - GC_content_total - AT_content_total def _get_AT_skew(self): if self._AT_skew_slide is None: # pragma: no cover raise AttributeError("Please set a valid window to compute skew") else: return self._AT_skew_slide AT_skew = property(_get_AT_skew) def _get_GC_skew(self): if self._GC_skew_slide is None: # pragma: no cover raise AttributeError("Please set a valid window to compute skew") else: return self._GC_skew_slide GC_skew = property(_get_GC_skew) def _create_template_fft(self, M=1000): # pragma: no cover M_3 = int(M / 3) W = [-0.5] * M_3 + list(np.linspace(-0.5, 0.5, M - 2 * M_3)) + [0.5] * M_3 return list(W * np.hanning(M)) def _myfft_gc_skew(self, M): # pragma: no cover """ x : GC_skew vector (list) param N: length of the GC skew vector param M: length of the template param A: amplitude between positive and negative GC skew vector """ x = self._GC_skew_slide[0] N = len(x) template = self._create_template_fft(M) + [0] * (N - M) template /= pylab.norm(template) c = np.fft.fft(x) c = ( abs(np.fft.ifft(np.fft.fft(x) * pylab.conj(np.fft.fft(template))) ** 2) / pylab.norm(x) / pylab.norm(template) ) # shift the SNR vector by the template length so that the peak is at the END of the template c = np.roll(c, M // 2) self._template_fft = template self._c_fft = c * 2.0 / N
[docs] def plot_all_skews(self, figsize=(10, 12), fontsize=16, alpha=0.5): if self._window is None: # pragma: no cover raise AttributeError("Please set a valid window to compute skew") # create figure fig, axarr = pylab.subplots(9, 1, sharex=True, figsize=figsize) main_title = ( "Window size = %d (%.0f %% of genome )\n\ GC content = %.0f %%, AT content = %.0f %%, ignored = %.0f %%" % ( self._window, self._window * 100 / self.__len__(), self.gc_content() * 100, (1 - self.gc_content()) * 100, self._ignored_nuc * 100, ) ) pylab.suptitle(main_title, fontsize=fontsize) # GC skew axarr[0].set_title("GC skew (blue) - Cumulative sum (red)") axarr[0].plot(list(self._GC_skew_slide[0]), "b-", alpha=alpha) axarr[0].set_ylabel("(G -C) / (G + C)") axarr[1].plot(list(np.cumsum(self._GC_skew_slide[0])), "r-", alpha=alpha) axarr[1].set_ylabel("(G -C) / (G + C)") # AT skew axarr[2].set_title("AT skew (blue) - Cumulative sum (red)") axarr[2].plot(list(self._AT_skew_slide[0]), "b-", alpha=alpha) axarr[2].set_ylabel("(A -T) / (A + T)") axarr[3].plot(list(np.cumsum(self._AT_skew_slide[0])), "r-", alpha=alpha) axarr[3].set_ylabel("(A -T) / (A + T)", rotation=0) # Xn axarr[4].set_title("Cumulative RY skew (Purine - Pyrimidine)") axarr[4].plot(self._Xn, "g-", alpha=alpha) axarr[4].set_ylabel("(A + G) - (C + T)") # Yn axarr[5].set_title("Cumulative MK skew (Amino - Keto)") axarr[5].plot(self._Yn, "g-", alpha=alpha) axarr[5].set_ylabel("(A + C) - (G + T)") # Zn axarr[6].set_title("Cumulative H-bond skew (Weak H-bond - Strong H-bond)") axarr[6].plot(self._Zn, "g-", alpha=alpha) axarr[6].set_ylabel("(A + T) - (G + C)") # GC content axarr[7].set_title("GC content") axarr[7].plot(list(self._GC_content_slide[0]), "k-", alpha=alpha) axarr[7].set_ylabel("GC") # AT content axarr[8].set_title("AT content") axarr[8].plot(list(self._AT_content_slide[0]), "k-", alpha=alpha) axarr[8].set_ylabel("AT") # # FFT # axarr[9].set_title("FFT") # axarr[9].plot(list(self._c_fft),'g-',alpha=alpha) # axarr[9].set_ylabel("FFT") fig.tight_layout() fig.subplots_adjust(top=0.88)
def _update_ORF_frame(self, i, nuc, j, frame, d_vars): d_vars["codon"][j] = d_vars["codon"][j] + nuc if len(d_vars["codon"][j]) == 3: # test for start forward (if none already found) is_start = d_vars["codon"][j] in self._codons_start if is_start & np.isnan(d_vars["pos_ATG_f"][j]): d_vars["pos_ATG_f"][j] = i - 2 # test for stop forward is_stop = d_vars["codon"][j] in self._codons_stop if is_stop: d_vars["end_f"][j] = i # test is length of CDS or ORF found is enough to be in output if self._type_filter == "CDS": len_to_filter = d_vars["end_f"][j] - d_vars["pos_ATG_f"][j] # len_CDS else: len_to_filter = d_vars["end_f"][j] - d_vars["begin_f"][j] # len_ORF if len_to_filter > self._threshold: len_ORF = d_vars["end_f"][j] - d_vars["begin_f"][j] len_CDS = d_vars["end_f"][j] - d_vars["pos_ATG_f"][j] self._ORF_pos.append( [ d_vars["begin_f"][j], d_vars["end_f"][j], frame + 1, d_vars["pos_ATG_f"][j], len_ORF, len_CDS, ] ) d_vars["begin_f"][j] = i + 1 d_vars["pos_ATG_f"][j] = np.nan # test for start reverse is_start_rev = d_vars["codon"][j] in self._codons_start_rev if is_start_rev: d_vars["pos_ATG_r"][j] = i # test for stop reverse is_stop_rev = d_vars["codon"][j] in self._codons_stop_rev if is_stop_rev: d_vars["end_r"][j] = i - 3 # test is length of CDS or ORF found is enough to be in output if self._type_filter == "CDS": len_to_filter = d_vars["pos_ATG_r"][j] - d_vars["begin_r"][j] # len_CDS else: len_to_filter = d_vars["end_r"][j] - d_vars["begin_r"][j] # len_ORF if len_to_filter > self._threshold: len_ORF = d_vars["end_r"][j] - d_vars["begin_r"][j] len_CDS = d_vars["pos_ATG_r"][j] - d_vars["begin_r"][j] self._ORF_pos.append( [ d_vars["begin_r"][j], d_vars["end_r"][j], -(frame + 1), d_vars["pos_ATG_r"][j], len_ORF, len_CDS, ] ) d_vars["begin_r"][j] = i - 3 + 1 d_vars["pos_ATG_r"][j] = np.nan # reset codon d_vars["codon"][j] = "" return d_vars def _find_ORF_CDS(self): """Function for finding ORF and CDS in both strands of DNA""" # init variables d_vars = { "codon": ["", "-", "--"], "begin_f": [0] * 3, "begin_r": [0] * 3, "end_f": [0] * 3, "end_r": [0] * 3, "pos_ATG_f": [np.nan] * 3, "pos_ATG_r": [np.nan] * 3, } print("Finding ORF and CDS") # result list self._ORF_pos = [] if self._threshold is None: self._threshold = 0 if self._type_filter is None: self._type_filter = "ORF" # search ORF and CDS for i, nuc in enumerate(self.sequence): # print(i, nuc) frame = (i + 3 - 2) % 3 # if (i % 200000)==0: # print("%d / %d" %(i, len_genome)) for j in range(3): d_vars = self._update_ORF_frame(i, nuc, j, frame, d_vars) # convert result to dataframe self._ORF_pos = pd.DataFrame(self._ORF_pos) self._ORF_pos.columns = [ "begin_pos", "end_pos", "frame", "pos_ATG", "len_ORF", "len_CDS", ] def _get_ORF_pos(self): if self._ORF_pos is None: self._find_ORF_CDS() return self._ORF_pos ORF_pos = property(_get_ORF_pos) def _get_threshold(self): return self._threshold def _set_threshold(self, value): if value < 0: raise ValueError("Threshold cannot be negative") if (value < self._threshold) | (self._threshold is None): # need to compute again the result self._threshold = value self._find_ORF_CDS() elif value > self._threshold: # do not compute result again : filter existing df self._threshold = value if self._type_filter == "ORF": self._ORF_pos = self._ORF_pos[self._ORF_pos["len_ORF"] > self._threshold] elif self._type_filter == "CDS": self._ORF_pos = self._ORF_pos[self._ORF_pos["len_CDS"] > self._threshold] threshold = property(_get_threshold, _set_threshold) def _get_type_filter(self): return self._type_filter def _set_type_filter(self, value): if value not in ["ORF", "CDS"]: raise ValueError("Please select valid type of filter : ORF (default), CDS") if self._ORF_pos is None: self._type_filter = value elif self._type_filter != value: self._type_filter = value if value == "ORF": # need to compute again the result self._find_ORF_CDS() else: # need to filter the result by CDS length self._ORF_pos = self._ORF_pos[self._ORF_pos["len_CDS"] > self._threshold] type_filter = property(_get_type_filter, _set_type_filter)
[docs] def hist_ORF_CDS_linearscale(self, alpha=0.5, bins=40, xlabel="Length", ylabel="#"): if self._ORF_pos is None: self._find_ORF_CDS() n_ORF = self._ORF_pos["len_ORF"].dropna().shape[0] n_CDS = self._ORF_pos["len_CDS"].dropna().shape[0] # plot for all ORF and CDS pylab.hist( self._ORF_pos["len_ORF"].dropna(), alpha=alpha, label="ORF, N = " + str(n_ORF), bins=bins, ) pylab.hist( self._ORF_pos["len_CDS"].dropna(), alpha=alpha, label="CDS, N = " + str(n_CDS), bins=bins, ) pylab.xlabel(xlabel) pylab.ylabel(ylabel) pylab.legend() pylab.title("Length of ORF and CDS (after filter %s > %d)" % (self._type_filter, self._threshold))
[docs] def hist_ORF_CDS_logscale(self, alpha=0.5, bins=40, xlabel="Length", ylabel="#"): self.hist_ORF_CDS_linearscale(alpha, bins, xlabel, ylabel) pylab.yscale("log")
[docs] def barplot_count_ORF_CDS_by_frame(self, alpha=0.5, bins=40, xlabel="Frame", ylabel="#", bar_width=0.35): if self._ORF_pos is None: # pragma: no cover self._find_ORF_CDS() # number of ORF and CDS found by frame frames = [-3, -2, -1, 1, 2, 3] nb_res_ORF = [] nb_res_CDS = [] for fr in frames: nb_res_ORF.append(self._ORF_pos[self._ORF_pos["frame"] == fr]["len_ORF"].dropna().shape[0]) nb_res_CDS.append(self._ORF_pos[self._ORF_pos["frame"] == fr]["len_CDS"].dropna().shape[0]) pylab.bar( np.array(frames) - (bar_width / 2), nb_res_ORF, bar_width, alpha=alpha, label="ORF N = %d" % sum(nb_res_ORF), ) pylab.bar( np.array(frames) + (bar_width / 2), nb_res_CDS, bar_width, alpha=alpha, label="CDS N = %d" % sum(nb_res_CDS), ) pylab.xlabel(xlabel) pylab.ylabel(ylabel) pylab.legend(loc=1) pylab.title("Number of ORF and CDS by frame")
[docs] def entropy(self, sequence): # https://www.sciencedirect.com/science/article/pii/S0022519397904938?via%3Dihub from pylab import log pi = [sequence.count(l) / float(len(sequence)) for l in "ACGT"] pi = [x for x in pi if x != 0] return -sum(pi * log(pi))
[docs] def get_dna_flexibility(self, window=100, step=1, threshold=13.7): # flexibility angle # Source: Drew and Travers (1984), based on DNase I digestion experiments. # if N is present, a minimum value is used ( 7.2 with GC, 7.6 with AT) flex_angles = { "AA": 7.6, "CA": 10.9, "AC": 14.6, "CC": 7.2, "AG": 8.2, "CG": 8.9, "AT": 25, "CT": 8.2, "GA": 8.8, "TA": 12.5, "GC": 11.1, "TC": 8.8, "GG": 7.2, "TG": 10.9, "GT": 14.6, "TT": 7.6, "CN": 7.2, "NC": 7.2, "GN": 7.2, "NG": 7.2, "NN": 7.2, "AN": 7.6, "NA": 7.6, "TN": 7.6, "NT": 7.6, } N = len(self.sequence) wby2 = int(window / 2) flex = np.zeros(N) from collections import deque slide = deque() # init first chunk for i in range(0, window): slide.append(flex_angles[self.sequence[i : i + 2]]) flex[wby2] = sum(slide) for i in range(wby2, N - wby2): slide.append(flex_angles[self.sequence[i : i + 2]]) slide.popleft() flex[i] = sum(slide) # handles the sides for i in range(0, wby2): flex[i] = flex[wby2] flex[N - i - 1] = flex[N - wby2 - 1] return flex / (window - 1)
[docs] def get_entropy(self, window): step = 1 N = len(self.sequence) wby2 = int(window / 2) entropy = np.zeros(N) import math from collections import Counter def calculate_entropy(counter, size): entropy = 0 for count in counter.values(): if count > 0: p = count / size entropy -= p * math.log2(p) return entropy # init first chunk counter = Counter(self.sequence[0:window]) entropy[wby2] = calculate_entropy(counter, window) for i in range(wby2, N - wby2): outgoing = self.sequence[i - wby2] ingoing = self.sequence[i + wby2] counter[ingoing] += 1 counter[outgoing] -= 1 entropy[i] = calculate_entropy(counter, window) # handles the sides for i in range(0, wby2): entropy[i] = entropy[wby2] entropy[N - i - 1] = entropy[N - wby2 - 1] return entropy
[docs] def get_informational_entropy(self, window=500, poly=3): # Informational Entropy calculated overlapping DNA triplet frequencies within a window. # overlapping triplets smooths the frame effect. IE = -p*log10(p)/log10(2) # rational of log10(2) ? reference ? based on UGENE N = len(self.sequence) wby2 = int(window / 2) entropy = np.zeros(N) import math from collections import defaultdict # Based on def calculate_entropy(counter, size): entropy = 0 log10_2 = 0.301029995 # could use math.log10(2). slower for count in counter.values(): if count > 0: p = count / size entropy -= p * math.log10(p) / log10_2 return entropy # init first chunk counter = defaultdict(int) for i in range(window): counter[self.sequence[i : i + 3]] += 1 entropy[wby2] = calculate_entropy(counter, window) for i in range(wby2, N - wby2): outgoing = self.sequence[i - wby2 : i - wby2 + 3] ingoing = self.sequence[i + wby2 : i + wby2 + 3] counter[ingoing] += 1 counter[outgoing] -= 1 entropy[i] = calculate_entropy(counter, window) # handles the sides for i in range(0, wby2): entropy[i] = entropy[wby2] entropy[N - i - 1] = entropy[N - wby2 - 1] return entropy
[docs] def get_dinucleotide_count(self, window=100): res = [ sum([self.sequence[i : i + window].count(x) for x in ["AA", "CC", "GG", "TT"]]) for i in range(0, len(self.sequence)) ] return res
[docs] def get_trinucleotide_count(self, window=100): res = [ sum([self.sequence[i : i + window].count(x) for x in ["AAA", "CCC", "GGG", "TTT"]]) for i in range(0, len(self.sequence)) ] return res
[docs] def get_homopolymers(self, N, window=100): res = [ sum([self.sequence[i : i + window].count(x) for x in ["A" * N, "C" * N, "G" * N, "T" * N]]) for i in range(0, len(self.sequence)) ] return res
[docs] def get_karlin_signature_difference(self, window=500, dinucleotide_only=False): # Karlin Signature Difference is dinucleotide absolute relative abundance difference # between the whole sequence and a sliding window. # f(XY) = frequency of the dinucleotide XY # f(X) = frequency of the nucleotide X # p(XY) = f(XY) / (f(X) * f(Y)) # p_seq(XY) = p(XY) for the whole sequence # p_win(XY) = p(XY) for a window # final metric is sum(p_seq(XY) - p_win(XY)) / 16 import math from collections import defaultdict N = len(self.sequence) wby2 = int(window / 2) karling_window = np.zeros(N) def calculate_karlin_difference(counter, global_counter, window): pXY = defaultdict(int) deltaXY = 0 for x in "ACGT": for y in "ACGT": if dinucleotide_only and x != y: continue xy = f"{x}{y}" pXY[xy] = ( counter[xy] / (2 * (window - 1)) / max([0.000000001, counter[x] / (2 * window) * counter[y] / (2 * window)]) ) deltaXY += abs(pXY[xy] - global_counter[xy]) return deltaXY / 16.0 counter_window = defaultdict(int) counter_seq = defaultdict(int) # init first chunk both local and global for i in range(window): # nucleotide counter_window[self.sequence[i]] += 1 counter_seq[self.sequence[i]] += 1 # dinucleotide counter_window[self.sequence[i : i + 2]] += 1 counter_seq[self.sequence[i : i + 2]] += 1 # need to compute the global pXY first (on the entire sequence) for i in range(wby2, N - 2): # nucleotide counter_seq[self.sequence[i]] += 1 # dinucleotide update is more complicated so we start from stract counter_seq[self.sequence[i : i + 2]] += 1 # compute final pXY for x in "ACGT": for y in "ACGT": xy = f"{x}{y}" N = len(self.sequence) counter_seq[xy] = ( counter_seq[xy] / (2 * (N - 1)) / max([0.000000001, counter_seq[x] / (2 * N) * counter_seq[y] / (2 * N)]) ) karling_window[wby2] = calculate_karlin_difference(counter_window, counter_seq, window) for i in range(wby2, N - wby2): outgoing = self.sequence[i - wby2] ingoing = self.sequence[i + wby2] # nucleotide update counter_window[ingoing] += 1 counter_window[outgoing] -= 1 # dinucleotide update is more complicated so we start from stract counter_window[self.sequence[i + wby2 - 1 : i + wby2 + 1]] += 1 counter_window[self.sequence[i - wby2 : i - wby2 + 2]] -= 1 karling_window[i] = calculate_karlin_difference(counter_window, counter_seq, window) # handles the sides for i in range(0, wby2): karling_window[i] = karling_window[wby2] karling_window[N - i - 1] = karling_window[N - wby2 - 1] return karling_window
[docs] class RNA(Sequence): """Simple RNA class >>> from sequana.sequence import RNA >>> d = RNA("ACGUUUU") >>> d.reverse_complement() """ def __init__(self, sequence): super(RNA, self).__init__(sequence, complement_in=b"ACGU", complement_out=b"UGCA", letters="ACGUN")
# ``Repeats`` moved to :mod:`sequana.repeats.shustring`. Re-exported here for # backward compatibility (``from sequana.sequence import Repeats``). from sequana.repeats.shustring import Repeats # noqa: F401,E402