Source code for sequana.sequence

#  This file is part of Sequana software
#
#  Copyright (c) 2018-2022 - Sequana Development Team
#
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#
#  website: https://github.com/sequana/sequana
#  documentation: http://sequana.readthedocs.io
#
##############################################################################
import re
import string
import subprocess
from collections import Counter, deque

import colorlog

from sequana.fasta import FastA
from sequana.lazy import numpy as np
from sequana.lazy import pandas as pd
from sequana.lazy import pylab

logger = colorlog.getLogger(__name__)


__all__ = ["DNA", "RNA", "Repeats", "Sequence"]


[docs]class Sequence(object): """Abstract base classe for other specialised sequences such as DNA. Sequenced is the base class for other classes such as :class:`DNA` and :class:`RNA`. .. doctest:: from sequana import Sequence s = Sequence("ACGT") s.stats() s.get_complement() .. note:: You may use a Fasta file as input (see constructor) """ def __init__(self, sequence, complement_in=b"ACGT", complement_out=b"TGCA", letters="ACGT"): """.. rubric:: Constructor A sequence is just a string stored in the :attr:`sequence` attribute. It has properties related to the type of alphabet authorised. :param str sequence: May be a string of a Fasta File, in which case only the first sequence is used. :param complement_in: :param complement_out: :param letters: authorise letters. Used in :meth:`check` only. .. todo:: use counter only once as a property """ if sequence.endswith(".fa") or sequence.endswith(".fasta"): self.fasta = FastA(sequence) sequence = self.fasta.next().sequence.upper() else: # assume correct string sequence pass self._data = sequence try: self._translate = string.maketrans(complement_in, complement_out) except: self._translate = bytes.maketrans(complement_in, complement_out) self._letters = letters def __iter__(self): return self def __next__(self): self._data = self.fasta.next().sequence.upper() return self._data def _get_sequence(self): return self._data sequence = property(_get_sequence)
[docs] def get_complement(self): """Return complement""" return self._data.translate(self._translate)
[docs] def get_reverse_complement(self): """Return reverse complement""" return self.get_complement()[::-1]
[docs] def get_reverse(self): """Return reverse sequence""" return self._data[::-1]
[docs] def complement(self): """Alias to :meth:`get_complement`""" self._data = self.get_complement()
[docs] def reverse(self): """Alias to :meth:`get_reverse`""" self._data = self.get_reverse()
[docs] def reverse_complement(self): """Alias to get_reverse_complement""" self._data = self.get_reverse_complement()
[docs] def check(self): """Check that all letters are valid""" counter = Counter(self._data).keys() for key in counter: if key not in self._letters: raise ValueError("Found unexpected letter in the sequence (%s)" % key)
def __len__(self): return len(self._data)
[docs] def gc_content(self): """Return mean GC content""" c = Counter(self._data) ratio = (c["G"] + c["C"]) / len(self.sequence) return ratio
[docs] def stats(self): """Return basic stats about the number of letters""" from collections import Counter return Counter(self.sequence)
[docs] def get_occurences(self, pattern, overlap=False): """Return position of the input pattern in the sequence :: >>> from sequana import Sequence >>> s = Sequence('ACGTTTTACGT') >>> s.get_occurences("ACGT") [0, 7] """ if overlap is False: res = [m.start() for m in re.finditer(pattern, self.sequence)] elif overlap is True: res = [m.start() for m in re.finditer("(?=%s)" % pattern, self.sequence)] return res
# reverse find-all without overlaps, you can combine positive and # negative lookahead into an expression like this: # res = [m.start() for m in re.finditer('(?=%s)(?!.{1,%d}%s)' % (search, # len(pattern)-1, pattern), 'ttt')]
[docs]class DNA(Sequence): """Simple DNA class :: >>> from sequana.sequence import DNA >>> d = DNA("ACGTTTT") >>> d.reverse_complement() Some long computations are done when setting the window size:: d.window = 100 The ORF detection has been validated agains a plasmodium 3D7 ORF file found on plasmodb.org across the 14 chromosomes. """ def __init__( self, sequence, codons_stop=["TAA", "TGA", "TAG"], codons_stop_rev=["TTA", "TCA", "CTA"], codons_start=["ATG"], codons_start_rev=["CAT"], ): super(DNA, self).__init__(sequence, complement_in=b"ACGT", complement_out=b"TGCA", letters="ACGTN") self._window = None self._type_window = None self._slide_window = None self._seq_right = None self._dict_nuc = dict_nuc = {"A": 0, "T": 1, "G": 2, "C": 3} self._cumul = None self._Xn = None self._Yn = None self._Zn = None self._ignored_nuc = None self._AT_skew_slide = None self._GC_skew_slide = None self._GC_content_slide = None self._AT_content_slide = None self._template_fft = None self._c_fft = None self._codons_stop = codons_stop self._codons_stop_rev = codons_stop_rev self._codons_start = codons_start self._codons_start_rev = codons_start_rev self._ORF_pos = None self._threshold = None self._type_filter = None def _get_window(self): return self._window def _set_window(self, window): if (window > 0) & (window < 1): self._type_window = "adapted to genome length : %.1f %% of total length" % (window * 100) self._window = int(round(self.__len__() * window)) elif (window >= 1) & (window <= self.__len__()): self._type_window = "fixed window length : %d" % (window) self._window = int(window) else: raise ValueError( "Incorrect value for window: choose either float ]0,1]" + " (fraction of genome) or integer [1,genome_length] (window size)" ) logger.info("Computing GC skew") self._compute_skews() window = property(_get_window, _set_window) def _get_type_window(self): return self._type_window type_window = property(_get_type_window) def _init_sliding_window(self): """slide_window : deque of n first nucleotides (size of window) seq_right : deque of the rest of the genome, with the n-1 nucleotides at the end (circular DNA) """ # split sequence_reference in two : sliding window and seq_right # for circular genome : add begining at the end of genome self._slide_window = deque(self.sequence[0 : self._window]) self._seq_right = deque(self.sequence[self._window : self.__len__()] + self.sequence[0 : (self._window - 1)]) def _init_list_results(self): # init IJ content and IJ skew IJ_content_res = np.empty((1, self.__len__())) IJ_content_res[:] = np.NAN IJ_skew_res = np.empty((1, self.__len__())) IJ_skew_res[:] = np.NAN return IJ_content_res, IJ_skew_res def _init_cumul_nuc(self): """Cumulative of nucleotide count along genome (init from first window)""" # ATGC (index stored in self._dict_nuc) cumul = np.zeros((4, (self.__len__() + self._window))) for j in range(self._window): nuc = self.sequence[j] if nuc in self._dict_nuc: cumul[self._dict_nuc[nuc]][j] += 1 self._cumul = cumul def _compute_skews(self): ### initialisation = Calculating GC skew and AT skew for first window self._init_sliding_window() GC_content_slide, GC_skew_slide = self._init_list_results() AT_content_slide, AT_skew_slide = self._init_list_results() self._init_cumul_nuc() c = Counter(self._slide_window) dict_counts = {"G": c["G"], "C": c["C"], "A": c["A"], "T": c["T"]} i = 0 # GC sumGC = float(dict_counts["G"] + dict_counts["C"]) GC_content_slide[0][i] = sumGC if sumGC > 0: GC_skew_slide[0][i] = (dict_counts["G"] - dict_counts["C"]) / sumGC # AT sumAT = float(dict_counts["A"] + dict_counts["T"]) AT_content_slide[0][i] = sumAT if sumAT > 0: AT_skew_slide[0][i] = (dict_counts["A"] - dict_counts["T"]) / sumAT ### Compute for all genome while self._seq_right: out_nuc = self._slide_window.popleft() in_nuc = self._seq_right.popleft() self._slide_window.append(in_nuc) i += 1 if i % 500000 == 0: # pragma: no cover logger.info("%d / %d" % (i, self.__len__())) # if in and out are the same : do nothing, append same result if out_nuc != in_nuc: # remove out from counters if out_nuc in self._dict_nuc: dict_counts[out_nuc] -= 1 if in_nuc in self._dict_nuc: dict_counts[in_nuc] += 1 sumGC = float(dict_counts["G"] + dict_counts["C"]) sumAT = float(dict_counts["A"] + dict_counts["T"]) # fill results # GC GC_content_slide[0][i] = sumGC if sumGC > 0: GC_skew_slide[0][i] = (dict_counts["G"] - dict_counts["C"]) / sumGC # AT AT_content_slide[0][i] = sumAT if sumAT > 0: AT_skew_slide[0][i] = (dict_counts["A"] - dict_counts["T"]) / sumAT # cumul if in_nuc in self._dict_nuc: self._cumul[self._dict_nuc[in_nuc]][i + self._window - 1] += 1 self._GC_content_slide = GC_content_slide / float(self._window) self._AT_content_slide = AT_content_slide / float(self._window) self._cumul = np.delete(self._cumul, range(self.__len__(), self._cumul.shape[1]), 1) self._cumul = np.cumsum(self._cumul, axis=1) ### save result for Z curve self._Xn = list( (self._cumul[self._dict_nuc["A"]] + self._cumul[self._dict_nuc["G"]]) - (self._cumul[self._dict_nuc["C"]] + self._cumul[self._dict_nuc["T"]]) ) self._Yn = list( (self._cumul[self._dict_nuc["A"]] + self._cumul[self._dict_nuc["C"]]) - (self._cumul[self._dict_nuc["G"]] + self._cumul[self._dict_nuc["T"]]) ) self._Zn = list( (self._cumul[self._dict_nuc["A"]] + self._cumul[self._dict_nuc["T"]]) - (self._cumul[self._dict_nuc["C"]] + self._cumul[self._dict_nuc["G"]]) ) self._AT_skew_slide = AT_skew_slide self._GC_skew_slide = GC_skew_slide ### check proportion of ignored nucleotides GC_content_total = (self._cumul[self._dict_nuc["G"]][-1] + self._cumul[self._dict_nuc["C"]][-1]) / float( self.__len__() ) AT_content_total = (self._cumul[self._dict_nuc["A"]][-1] + self._cumul[self._dict_nuc["T"]][-1]) / float( self.__len__() ) self._ignored_nuc = 1.0 - GC_content_total - AT_content_total def _get_AT_skew(self): if self._AT_skew_slide is None: # pragma: no cover raise AttributeError("Please set a valid window to compute skew") else: return self._AT_skew_slide AT_skew = property(_get_AT_skew) def _get_GC_skew(self): if self._GC_skew_slide is None: # pragma: no cover raise AttributeError("Please set a valid window to compute skew") else: return self._GC_skew_slide GC_skew = property(_get_GC_skew) def _create_template_fft(self, M=1000): # pragma: no cover M_3 = int(M / 3) W = [-0.5] * M_3 + list(np.linspace(-0.5, 0.5, M - 2 * M_3)) + [0.5] * M_3 return list(W * np.hanning(M)) def _myfft_gc_skew(self, M): # pragma: no cover """ x : GC_skew vector (list) param N: length of the GC skew vector param M: length of the template param A: amplitude between positive and negative GC skew vector """ x = self._GC_skew_slide[0] N = len(x) template = self._create_template_fft(M) + [0] * (N - M) template /= pylab.norm(template) c = np.fft.fft(x) c = ( abs(np.fft.ifft(np.fft.fft(x) * pylab.conj(np.fft.fft(template))) ** 2) / pylab.norm(x) / pylab.norm(template) ) # shift the SNR vector by the template length so that the peak is at the END of the template c = np.roll(c, M // 2) self._template_fft = template self._c_fft = c * 2.0 / N
[docs] def plot_all_skews(self, figsize=(10, 12), fontsize=16, alpha=0.5): if self._window is None: # pragma: no cover raise AttributeError("Please set a valid window to compute skew") # create figure fig, axarr = pylab.subplots(9, 1, sharex=True, figsize=figsize) main_title = ( "Window size = %d (%.0f %% of genome )\n\ GC content = %.0f %%, AT content = %.0f %%, ignored = %.0f %%" % ( self._window, self._window * 100 / self.__len__(), self.gc_content() * 100, (1 - self.gc_content()) * 100, self._ignored_nuc * 100, ) ) pylab.suptitle(main_title, fontsize=fontsize) # GC skew axarr[0].set_title("GC skew (blue) - Cumulative sum (red)") axarr[0].plot(list(self._GC_skew_slide[0]), "b-", alpha=alpha) axarr[0].set_ylabel("(G -C) / (G + C)") axarr[1].plot(list(np.cumsum(self._GC_skew_slide[0])), "r-", alpha=alpha) axarr[1].set_ylabel("(G -C) / (G + C)") # AT skew axarr[2].set_title("AT skew (blue) - Cumulative sum (red)") axarr[2].plot(list(self._AT_skew_slide[0]), "b-", alpha=alpha) axarr[2].set_ylabel("(A -T) / (A + T)") axarr[3].plot(list(np.cumsum(self._AT_skew_slide[0])), "r-", alpha=alpha) axarr[3].set_ylabel("(A -T) / (A + T)", rotation=0) # Xn axarr[4].set_title("Cumulative RY skew (Purine - Pyrimidine)") axarr[4].plot(self._Xn, "g-", alpha=alpha) axarr[4].set_ylabel("(A + G) - (C + T)") # Yn axarr[5].set_title("Cumulative MK skew (Amino - Keto)") axarr[5].plot(self._Yn, "g-", alpha=alpha) axarr[5].set_ylabel("(A + C) - (G + T)") # Zn axarr[6].set_title("Cumulative H-bond skew (Weak H-bond - Strong H-bond)") axarr[6].plot(self._Zn, "g-", alpha=alpha) axarr[6].set_ylabel("(A + T) - (G + C)") # GC content axarr[7].set_title("GC content") axarr[7].plot(list(self._GC_content_slide[0]), "k-", alpha=alpha) axarr[7].set_ylabel("GC") # AT content axarr[8].set_title("AT content") axarr[8].plot(list(self._AT_content_slide[0]), "k-", alpha=alpha) axarr[8].set_ylabel("AT") # # FFT # axarr[9].set_title("FFT") # axarr[9].plot(list(self._c_fft),'g-',alpha=alpha) # axarr[9].set_ylabel("FFT") fig.tight_layout() fig.subplots_adjust(top=0.88)
def _update_ORF_frame(self, i, nuc, j, frame, d_vars): d_vars["codon"][j] = d_vars["codon"][j] + nuc if len(d_vars["codon"][j]) == 3: # test for start forward (if none already found) is_start = d_vars["codon"][j] in self._codons_start if is_start & np.isnan(d_vars["pos_ATG_f"][j]): d_vars["pos_ATG_f"][j] = i - 2 # test for stop forward is_stop = d_vars["codon"][j] in self._codons_stop if is_stop: d_vars["end_f"][j] = i # test is length of CDS or ORF found is enough to be in output if self._type_filter == "CDS": len_to_filter = d_vars["end_f"][j] - d_vars["pos_ATG_f"][j] # len_CDS else: len_to_filter = d_vars["end_f"][j] - d_vars["begin_f"][j] # len_ORF if len_to_filter > self._threshold: len_ORF = d_vars["end_f"][j] - d_vars["begin_f"][j] len_CDS = d_vars["end_f"][j] - d_vars["pos_ATG_f"][j] self._ORF_pos.append( [ d_vars["begin_f"][j], d_vars["end_f"][j], frame + 1, d_vars["pos_ATG_f"][j], len_ORF, len_CDS, ] ) d_vars["begin_f"][j] = i + 1 d_vars["pos_ATG_f"][j] = np.nan # test for start reverse is_start_rev = d_vars["codon"][j] in self._codons_start_rev if is_start_rev: d_vars["pos_ATG_r"][j] = i # test for stop reverse is_stop_rev = d_vars["codon"][j] in self._codons_stop_rev if is_stop_rev: d_vars["end_r"][j] = i - 3 # test is length of CDS or ORF found is enough to be in output if self._type_filter == "CDS": len_to_filter = d_vars["pos_ATG_r"][j] - d_vars["begin_r"][j] # len_CDS else: len_to_filter = d_vars["end_r"][j] - d_vars["begin_r"][j] # len_ORF if len_to_filter > self._threshold: len_ORF = d_vars["end_r"][j] - d_vars["begin_r"][j] len_CDS = d_vars["pos_ATG_r"][j] - d_vars["begin_r"][j] self._ORF_pos.append( [ d_vars["begin_r"][j], d_vars["end_r"][j], -(frame + 1), d_vars["pos_ATG_r"][j], len_ORF, len_CDS, ] ) d_vars["begin_r"][j] = i - 3 + 1 d_vars["pos_ATG_r"][j] = np.nan # reset codon d_vars["codon"][j] = "" return d_vars def _find_ORF_CDS(self): """Function for finding ORF and CDS in both strands of DNA""" # init variables d_vars = { "codon": ["", "-", "--"], "begin_f": [0] * 3, "begin_r": [0] * 3, "end_f": [0] * 3, "end_r": [0] * 3, "pos_ATG_f": [np.nan] * 3, "pos_ATG_r": [np.nan] * 3, } print("Finding ORF and CDS") # result list self._ORF_pos = [] if self._threshold is None: self._threshold = 0 if self._type_filter is None: self._type_filter = "ORF" # search ORF and CDS for i, nuc in enumerate(self.sequence): # print(i, nuc) frame = (i + 3 - 2) % 3 # if (i % 200000)==0: # print("%d / %d" %(i, len_genome)) for j in range(3): d_vars = self._update_ORF_frame(i, nuc, j, frame, d_vars) # convert result to dataframe self._ORF_pos = pd.DataFrame(self._ORF_pos) self._ORF_pos.columns = [ "begin_pos", "end_pos", "frame", "pos_ATG", "len_ORF", "len_CDS", ] def _get_ORF_pos(self): if self._ORF_pos is None: self._find_ORF_CDS() return self._ORF_pos ORF_pos = property(_get_ORF_pos) def _get_threshold(self): return self._threshold def _set_threshold(self, value): if value < 0: raise ValueError("Threshold cannot be negative") if (value < self._threshold) | (self._threshold is None): # need to compute again the result self._threshold = value self._find_ORF_CDS() elif value > self._threshold: # do not compute result again : filter existing df self._threshold = value if self._type_filter == "ORF": self._ORF_pos = self._ORF_pos[self._ORF_pos["len_ORF"] > self._threshold] elif self._type_filter == "CDS": self._ORF_pos = self._ORF_pos[self._ORF_pos["len_CDS"] > self._threshold] threshold = property(_get_threshold, _set_threshold) def _get_type_filter(self): return self._type_filter def _set_type_filter(self, value): if value not in ["ORF", "CDS"]: raise ValueError("Please select valid type of filter : ORF (default), CDS") if self._ORF_pos is None: self._type_filter = value elif self._type_filter != value: self._type_filter = value if value == "ORF": # need to compute again the result self._find_ORF_CDS() else: # need to filter the result by CDS length self._ORF_pos = self._ORF_pos[self._ORF_pos["len_CDS"] > self._threshold] type_filter = property(_get_type_filter, _set_type_filter)
[docs] def hist_ORF_CDS_linearscale(self, alpha=0.5, bins=40, xlabel="Length", ylabel="#"): if self._ORF_pos is None: self._find_ORF_CDS() n_ORF = self._ORF_pos["len_ORF"].dropna().shape[0] n_CDS = self._ORF_pos["len_CDS"].dropna().shape[0] # plot for all ORF and CDS pylab.hist( self._ORF_pos["len_ORF"].dropna(), alpha=alpha, label="ORF, N = " + str(n_ORF), bins=bins, ) pylab.hist( self._ORF_pos["len_CDS"].dropna(), alpha=alpha, label="CDS, N = " + str(n_CDS), bins=bins, ) pylab.xlabel(xlabel) pylab.ylabel(ylabel) pylab.legend() pylab.title("Length of ORF and CDS (after filter %s > %d)" % (self._type_filter, self._threshold))
[docs] def hist_ORF_CDS_logscale(self, alpha=0.5, bins=40, xlabel="Length", ylabel="#"): self.hist_ORF_CDS_linearscale(alpha, bins, xlabel, ylabel) pylab.yscale("log")
[docs] def barplot_count_ORF_CDS_by_frame(self, alpha=0.5, bins=40, xlabel="Frame", ylabel="#", bar_width=0.35): if self._ORF_pos is None: self._find_ORF_CDS() # number of ORF and CDS found by frame frames = [-3, -2, -1, 1, 2, 3] nb_res_ORF = [] nb_res_CDS = [] for fr in frames: nb_res_ORF.append(self._ORF_pos[self._ORF_pos["frame"] == fr]["len_ORF"].dropna().shape[0]) nb_res_CDS.append(self._ORF_pos[self._ORF_pos["frame"] == fr]["len_CDS"].dropna().shape[0]) pylab.bar( np.array(frames) - (bar_width / 2), nb_res_ORF, bar_width, alpha=alpha, label="ORF N = %d" % sum(nb_res_ORF), ) pylab.bar( np.array(frames) + (bar_width / 2), nb_res_CDS, bar_width, alpha=alpha, label="CDS N = %d" % sum(nb_res_CDS), ) pylab.xlabel(xlabel) pylab.ylabel(ylabel) pylab.legend(loc=1) pylab.title("Number of ORF and CDS by frame")
[docs] def entropy(self, sequence): # https://www.sciencedirect.com/science/article/pii/S0022519397904938?via%3Dihub pi = [sequence.count(l) / float(len(sequence)) for l in "ACGT"] pi = [x for x in pi if x != 0] return -sum(pi * log(pi))
[docs]class RNA(Sequence): """Simple RNA class >>> from sequana.sequence import RNA >>> d = RNA("ACGUUUU") >>> d.reverse_complement() """ def __init__(self, sequence): super(RNA, self).__init__(sequence, complement_in=b"ACGU", complement_out=b"UGCA", letters="ACGUN")
[docs]class Repeats(object): """Class for finding repeats in DNA or RNA linear sequences. Computation is performed each time the :attr:`threshold` is set to a new value. .. plot:: :include-source: from sequana import sequana_data, Repeats rr = Repeats(sequana_data("measles.fa")) rr.threshold = 4 rr.hist_length_repeats() .. note:: Works with shustring package from Bioconda (April 2017) .. todo:: use a specific sequence (first one by default). Others can be selected by name """ def __init__(self, filename_fasta, merge=False, name=None): """.. rubric:: Constructor Input must be a fasta file with valid DNA or RNA characters :param str filename_fasta: a Fasta file, only the first sequence is used ! :param int threshold: Minimal length of repeat to output :param str name: if name is provided, scan the Fasta file and select the corresponding sequence. if you want to analyse all sequences, you need to use a loop by setting _header for each sequence with the sequence name found in sequence header. .. note:: known problems. Header with a > character (e.g. in the comment) are left strip and only the comments is kept. Another issue is for multi-fasta where one sequence is ignored (last or first ?) """ # used to check everything is fine with the header/name self._fasta = FastA(filename_fasta) # Define the attributes, and set the header if already provided self._threshold = None self._df_shustring = None self._header = None self._length = None self._longest_shustring = None self._begin_end_repeat_position = None self._begin_end_repeat_position_merge = None self._filename_fasta = filename_fasta self._previous_thr = None self._list_len_repeats = None self._contig_names = None if not isinstance(merge, bool): raise TypeError("do_merge must be boolean") self._do_merge = merge if name is not None: self.header = name else: self.header = self._fasta.names[0] def _get_header(self): """get first line of fasta (needed in input shustring) and replace spaces by underscores """ """if self._header is None: # -n is to specify the DNA nature of the sequence first_line = subprocess.check_output(["head", "-n", "1", self._filename_fasta]) first_line = first_line.decode('utf8') first_line = first_line.replace("\n","").replace(" ","_") self._header = first_line""" return self._header def _set_header(self, name): if name not in self._fasta.names: raise ValueError("invalid name. Use one of %s" % self._fasta.names) self._header = name self._df_shustring = None header = property(_get_header, _set_header) def _get_names(self): if self._contig_names is None: self._contig_names = self._fasta.names return self._contig_names names = property(_get_names) def _get_shustrings_length(self): """Return dataframe with shortest unique substring length at each position shortest unique substrings are unique in the sequence and its complement Uses shustring tool""" if self._df_shustring is None: # read fasta task_read = subprocess.Popen(["cat", self._filename_fasta], stdout=subprocess.PIPE) # replace spaces of fasta task_replace_spaces = subprocess.Popen(["sed", "s/ /_/g"], stdin=task_read.stdout, stdout=subprocess.PIPE) # shustring command # the -l option uses a regular expression # if 2 identifier such as >1 and >11 then using regular expression # >1 means both match. Therefore, we use a $ in the next comand line # In Repeats, we set the chrom name therefore, we should add this $ # character task_shus = subprocess.Popen( ["shustring", "-r", "-q", "-l", ">{}$".format(self.header)], stdin=task_replace_spaces.stdout, stdout=subprocess.PIPE, ) # read stdout line by line and append to list list_df = [] for line in task_shus.stdout: list_df.append(line.decode("utf8").replace("\n", "").split("\t")) # df=pd.concat([df, line]) task_shus.wait() # convert to dataframe df = pd.DataFrame(list_df[2 : len(list_df)]) self._df_shustring = df.astype(int) self._df_shustring.columns = ["position", "shustring_length"] # get input sequence length and longest shustring in the first line self._length = int(list_df[0][1]) self._longest_shustring = int(list_df[0][3].split("<=")[2]) return self._df_shustring df_shustring = property(_get_shustrings_length) def _get_genome_length(self): if self._df_shustring is None: self._get_shustrings_length() return self._length length = property(_get_genome_length) def _get_longest_shustring(self): if self._df_shustring is None: self._get_shustrings_length() return self._longest_shustring longest_shustring = property(_get_longest_shustring) def _find_begin_end_repeats(self, force=False): """Returns position of repeats longer than threshold as an ordered list """ if self.df_shustring is None: self._get_shustrings_length() if self._threshold is None: # print("No threshold : please set minimul length of repeats to output") raise ValueError("threshold : please set threshold (minimum length of repeats to output)") # if there is no result yet, or the threshold has changed if (self._begin_end_repeat_position is None) | (self.threshold != self._previous_thr) | force: nb_row = self.df_shustring.shape[0] i = 0 step_repeat_seq = [] be_repeats = [] e = 0 # use list because faster list_len_shus = list(self.df_shustring.loc[:, "shustring_length"]) while i < nb_row: # begining of repeat if list_len_shus[i] > self.threshold: b = i # compute new end of repeat len_repeat = list_len_shus[i] e = b + len_repeat # save (b,e) be_repeats.append((b, e)) # update i i = e - 1 i += 1 self._begin_end_repeat_position = be_repeats self._get_merge_repeats() def _get_be_repeats(self): self._find_begin_end_repeats() return self._begin_end_repeat_position begin_end_repeat_position = property(_get_be_repeats) def _set_threshold(self, value): if value < 0: raise ValueError("Threshold must be positive integer") if value != self._threshold: self._previous_thr = self._threshold self._threshold = value self._find_begin_end_repeats() self._list_len_repeats = [tup[1] - tup[0] for tup in self._begin_end_repeat_position] def _get_threshold(self): return self._threshold threshold = property(_get_threshold, _set_threshold) def _get_list_len_repeats(self): if self._list_len_repeats is None: raise UserWarning("Please set threshold (minimum length of repeats to output)") return self._list_len_repeats list_len_repeats = property(_get_list_len_repeats) def _get_merge_repeats(self): if self._do_merge: # if there are repeats, merge repeats that are fragmented if len(self._begin_end_repeat_position) > 0: prev_tup = self._begin_end_repeat_position[0] b = prev_tup[0] begin_end_repeat_position_merge = [] for i in range(1, len(self._begin_end_repeat_position)): tup = self._begin_end_repeat_position[i] if tup[0] == prev_tup[1]: # concat e = tup[1] prev_tup = tup if i == (len(self._begin_end_repeat_position) - 1): # last tup : append to result begin_end_repeat_position_merge.append((b, e)) else: # real end of repeat : append result and update b, e e = prev_tup[1] begin_end_repeat_position_merge.append((b, e)) prev_tup = tup b = prev_tup[0] if i == (len(self._begin_end_repeat_position) - 1): # last tup : append to result begin_end_repeat_position_merge.append(tup) self._begin_end_repeat_position = begin_end_repeat_position_merge def _get_do_merge(self): return self._do_merge def _set_do_merge(self, do_merge): if not isinstance(do_merge, bool): raise TypeError("do_merge must be boolean") # if different if do_merge != self._do_merge: self._do_merge = do_merge if self._do_merge: # did not merge before, merge now if self._begin_end_repeat_position is None: self._find_begin_end_repeats() else: # data is already merged : need to compute again to un-merge self._find_begin_end_repeats(force=True) do_merge = property(_get_do_merge, _set_do_merge)
[docs] def hist_length_repeats( self, bins=20, alpha=0.5, hold=False, fontsize=12, grid=True, title="Repeat length", xlabel="Repeat length", ylabel="#", logy=True, ): """Plots histogram of the repeat lengths""" # check that user has set a threshold if hold is False: pylab.clf() pylab.hist(self.list_len_repeats, alpha=alpha, bins=bins) pylab.title(title) pylab.xlabel(xlabel, fontsize=fontsize) pylab.ylabel(ylabel, fontsize=fontsize) if grid is True: pylab.grid(True) if logy: pylab.semilogy()
[docs] def plot(self, clf=True): if clf: pylab.clf() M = self.df_shustring.shustring_length.max() print(M) M = int(M / 1000) + 1 for i in range(M): pylab.axhline(i * 1000, ls="--", color="grey") pylab.plot(self.df_shustring.shustring_length) pylab.xlabel("position (bp)") pylab.ylabel("Length of repeats") pylab.ylim(bottom=0)