Source code for sequana.repeats.shustring

#  This file is part of Sequana software
#
#  Copyright (c) 2018-2022 - Sequana Development Team
#
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#
#  website: https://github.com/sequana/sequana
#  documentation: http://sequana.readthedocs.io
#
##############################################################################
"""Find exact repeats in a genome using the *shustring* tool."""
import subprocess

import colorlog

from sequana.fasta import FastA
from sequana.lazy import pandas as pd
from sequana.lazy import pylab

logger = colorlog.getLogger(__name__)


__all__ = ["Repeats"]


[docs] class Repeats: """Class for finding repeats in DNA or RNA linear sequences. Computation is performed each time the :attr:`threshold` is set to a new value. .. plot:: :include-source: from sequana import sequana_data, Repeats rr = Repeats(sequana_data("measles.fa")) rr.threshold = 4 rr.hist_length_repeats() .. note:: Works with shustring package from Bioconda (April 2017) .. todo:: use a specific sequence (first one by default). Others can be selected by name """ def __init__(self, filename_fasta, merge=False, name=None): """.. rubric:: Constructor Input must be a fasta file with valid DNA or RNA characters :param str filename_fasta: a Fasta file, only the first sequence is used ! :param int threshold: Minimal length of repeat to output :param str name: if name is provided, scan the Fasta file and select the corresponding sequence. if you want to analyse all sequences, you need to use a loop by setting _header for each sequence with the sequence name found in sequence header. .. note:: known problems. Header with a > character (e.g. in the comment) are left strip and only the comments is kept. Another issue is for multi-fasta where one sequence is ignored (last or first ?) """ # used to check everything is fine with the header/name self._fasta = FastA(filename_fasta) # Define the attributes, and set the header if already provided self._threshold = None self._df_shustring = None self._header = None self._length = None self._longest_shustring = None self._begin_end_repeat_position = None self._begin_end_repeat_position_merge = None self._filename_fasta = filename_fasta self._previous_thr = None self._list_len_repeats = None self._contig_names = None if not isinstance(merge, bool): raise TypeError("do_merge must be boolean") self._do_merge = merge if name is not None: self.header = name else: self.header = self._fasta.names[0] def _get_header(self): return self._header def _set_header(self, name): if name not in self._fasta.names: raise ValueError("invalid name. Use one of %s" % self._fasta.names) self._header = name self._df_shustring = None header = property(_get_header, _set_header) def _get_names(self): if self._contig_names is None: self._contig_names = self._fasta.names return self._contig_names names = property(_get_names) def _get_shustrings_length(self): """Return dataframe with shortest unique substring length at each position shortest unique substrings are unique in the sequence and its complement Uses shustring tool""" if self._df_shustring is None: # read fasta task_read = subprocess.Popen(["cat", self._filename_fasta], stdout=subprocess.PIPE) # shustring command # the -l option uses a regular expression task_shus = subprocess.Popen( # ["shustring", "-r", "-q", "-l", ">{}[\s,\n]*?".format(self.header)], ["shustring", "-r", "-q", "-l", r">{}($|\s+)".format(self.header)], stdin=task_read.stdout, stdout=subprocess.PIPE, ) # read stdout line by line and append to list list_df = [] for line in task_shus.stdout: list_df.append(line.decode("utf8").replace("\n", "").split("\t")) # df=pd.concat([df, line]) task_shus.wait() # convert to dataframe df = pd.DataFrame(list_df[2 : len(list_df)]) self._df_shustring = df.astype(int) self._df_shustring.columns = ["position", "repeat_length"] # get input sequence length and longest shustring in the first line self._length = int(list_df[0][1]) self._longest_shustring = int(list_df[0][3].split("<=")[2]) return self._df_shustring df_shustring = property(_get_shustrings_length) def _get_genome_length(self): if self._df_shustring is None: self._get_shustrings_length() return self._length length = property(_get_genome_length) def _get_longest_shustring(self): if self._df_shustring is None: self._get_shustrings_length() return self._longest_shustring longest_shustring = property(_get_longest_shustring) def _find_begin_end_repeats(self, force=False): """Returns position of repeats longer than threshold as an ordered list """ if self.df_shustring is None: self._get_shustrings_length() if self._threshold is None: # print("No threshold : please set minimul length of repeats to output") raise ValueError("threshold : please set threshold (minimum length of repeats to output)") # if there is no result yet, or the threshold has changed if (self._begin_end_repeat_position is None) | (self.threshold != self._previous_thr) | force: nb_row = self.df_shustring.shape[0] i = 0 step_repeat_seq = [] be_repeats = [] e = 0 # use list because faster list_len_shus = list(self.df_shustring.loc[:, "repeat_length"]) while i < nb_row: # begining of repeat if list_len_shus[i] > self.threshold: b = i # compute new end of repeat len_repeat = list_len_shus[i] e = b + len_repeat # save (b,e) be_repeats.append((b, e)) # update i i = e - 1 i += 1 self._begin_end_repeat_position = be_repeats self._get_merge_repeats() def _get_be_repeats(self): self._find_begin_end_repeats() return self._begin_end_repeat_position begin_end_repeat_position = property(_get_be_repeats) def _set_threshold(self, value): if value < 0: raise ValueError("Threshold must be positive integer") if value != self._threshold: self._previous_thr = self._threshold self._threshold = value self._find_begin_end_repeats() self._list_len_repeats = [tup[1] - tup[0] for tup in self._begin_end_repeat_position] def _get_threshold(self): return self._threshold threshold = property(_get_threshold, _set_threshold) def _get_list_len_repeats(self): if self._list_len_repeats is None: raise UserWarning("Please set threshold (minimum length of repeats to output)") return self._list_len_repeats list_len_repeats = property(_get_list_len_repeats) def _get_merge_repeats(self): if self._do_merge: # if there are repeats, merge repeats that are fragmented if len(self._begin_end_repeat_position) > 0: prev_tup = self._begin_end_repeat_position[0] b = prev_tup[0] begin_end_repeat_position_merge = [] for i in range(1, len(self._begin_end_repeat_position)): tup = self._begin_end_repeat_position[i] if tup[0] == prev_tup[1]: # concat e = tup[1] prev_tup = tup if i == (len(self._begin_end_repeat_position) - 1): # last tup : append to result begin_end_repeat_position_merge.append((b, e)) else: # real end of repeat : append result and update b, e e = prev_tup[1] begin_end_repeat_position_merge.append((b, e)) prev_tup = tup b = prev_tup[0] if i == (len(self._begin_end_repeat_position) - 1): # last tup : append to result begin_end_repeat_position_merge.append(tup) self._begin_end_repeat_position = begin_end_repeat_position_merge def _get_do_merge(self): return self._do_merge def _set_do_merge(self, do_merge): if not isinstance(do_merge, bool): raise TypeError("do_merge must be boolean") # if different if do_merge != self._do_merge: self._do_merge = do_merge if self._do_merge: # did not merge before, merge now if self._begin_end_repeat_position is None: self._find_begin_end_repeats() else: # data is already merged : need to compute again to un-merge self._find_begin_end_repeats(force=True) do_merge = property(_get_do_merge, _set_do_merge)
[docs] def hist_length_repeats( self, bins=20, alpha=0.5, hold=False, fontsize=12, grid=True, title="Repeat length", xlabel="Repeat length", ylabel="#", logy=True, ): """Plots histogram of the repeat lengths""" # check that user has set a threshold if hold is False: pylab.clf() pylab.hist(self.list_len_repeats, alpha=alpha, bins=bins) pylab.title(title) pylab.xlabel(xlabel, fontsize=fontsize) pylab.ylabel(ylabel, fontsize=fontsize) if grid is True: pylab.grid(True) if logy: pylab.semilogy()
[docs] def plot(self, clf=True, fontsize=12): if clf: pylab.clf() pylab.grid(True) pylab.plot(self.df_shustring.repeat_length) pylab.xlabel("Position (bp)", fontsize=fontsize) pylab.ylabel("Repeat lengths (bp)", fontsize=fontsize) pylab.ylim(bottom=0)
[docs] def to_wig(self, filename, step=1000): """export repeats into WIG format to import in IGV""" assert self.threshold and self.df_shustring is not None N = len(self.df_shustring) with open(filename, "w") as fout: fout.write( f'track type=wiggle_0 name="Repeat Density" visibility=full fixedStep chrom=1 start=0 step={step} span={step}\n' ) # min_period = 1 to prevent NAN at first position rolling_max = self.df_shustring.rolling(step, step=step).max() for _, row in rolling_max.iterrows(): if row.name == 0: continue M = row.repeat_length start = row.name - step stop = row.name fout.write(f"{self.header}\t{start}\t{stop}\t{row.repeat_length}\n")
[docs] def get_peak_position_and_length(self, THRESHOLD=3000): df = self.df_shustring.copy() # Filter for repeats above threshold df = df[df["repeat_length"] > THRESHOLD].copy() # Sort by genomic position df = df.sort_values(by="position").reset_index(drop=True) # Identify local maxima: where the value is greater than the left and right neighbors df["prev"] = df["repeat_length"].shift(1, fill_value=0) df["next"] = df["repeat_length"].shift(-1, fill_value=0) # Keep only local maxima peaks = df[(df["repeat_length"] > df["prev"]) & (df["repeat_length"] > df["next"])] # Select only relevant columns final_df = peaks[["position", "repeat_length"]] # Print results print(f"Final number of peaks after filtering: {len(final_df)}") return final_df