Source code for sequana.telomere

from collections import Counter, defaultdict

from tqdm import tqdm

from sequana import FastA, FastQ, logger
from sequana.kmer import get_kmer
from sequana.lazy import numpy as np
from sequana.lazy import pandas as pd
from sequana.lazy import pylab, pysam, scipy
from sequana.tools import reverse_complement

# Thresholds used in find_LHS/RHS_telomere gap warnings
_GAP_THRESHOLD = 1000


[docs] def factorize_sequences(sequences): def generate_circular_shifts(sequence): length = len(sequence) return {sequence[i:] + sequence[:i] for i in range(length)} # Dictionary to store factorized groups groups = defaultdict(list) for sequence in sequences: # Generate all circular shifts for the current sequence shifts = generate_circular_shifts(sequence) # Find if any shift is already in groups found = False for rep in groups: if rep in shifts: groups[rep].append(sequence) found = True break # If no match, add as a new representative if not found: groups[sequence].append(sequence) # Convert to a dictionary with representative -> full group return {rep: members for rep, members in groups.items()}
[docs] def circular_shifts(sequence): """Return all circular shifts of *sequence* as an ordered list. For example, ``circular_shifts("AACCCT")`` returns all 6 rotations of the string. Useful for kmer-based analyses where all phases of a repeat unit must be considered. """ length = len(sequence) return [sequence[i:] + sequence[:i] for i in range(length)]
[docs] class Telomere: """Scan a FASTA file and identify the extent of telomeric regions. **Basic usage**:: from sequana.telomere import Telomere telo = Telomere("ref.fa") df = telo.run(tag="myrun") ``run()`` processes every contig, writes per-contig PNG plots and a CSV summary, and returns a DataFrame with one row per contig. **Identifying the right k-mers** By default the scanner uses all six rotations of the canonical vertebrate telomere repeat ``AACCCT`` plus additional 6-mers that improve signal-to-noise. To discover organism-specific k-mers:: contig = telo.fasta.sequences[0] candidates = telo.find_representative_kmers(contig, kmers=6) telo.kmers = [k for k, _ in candidates] **Sliding-window counts** The two core signals used for peak detection:: XX = telo.get_sliding_kmer_count_five_to_three_prime(seq) YY = telo.get_sliding_kmer_count_three_to_five_prime(seq) A telomere on a correctly oriented contig will appear as a peak at the left of ``XX`` (LHS) and a peak at the right of ``YY`` (RHS). Any peak at the *opposite* end indicates a reversed/mis-oriented contig. **Plotting** Two plot styles are available through the *plot_style* argument of :meth:`run`: ``'annotated'`` (default) *Per-contig*: :meth:`plot_contig` — single panel, both strand signals overlaid with colour-coded shaded telomere regions and a status badge. Reversed telomeres are highlighted in red. *Summary*: :meth:`plot_summary` — horizontal chromosome map with each contig drawn proportionally to its length, telomere blocks coloured by orientation, and per-row status badges. ``'legacy'`` Original two-subplot raw signals (per-contig) and two heatmaps (binary presence + length, summary). **Telomere categories** Each contig in the output DataFrame is assigned a ``telomere`` column: ============ ============================================================ ``complete`` Both LHS (forward) and RHS (reverse-complement) detected. ``LHS_only`` Only the left-hand telomere detected. ``RHS_only`` Only the right-hand telomere detected. ``none`` No telomeric signal above threshold. ============ ============================================================ Reversed peaks (signal on the unexpected end) are flagged by :meth:`run` via :mod:`logging` warnings and highlighted in red in the annotated plots. """ def __init__(self, reference_file=None, peak_height=20, peak_width=50): """Initialize the Telomere scanner. :param reference_file: path to a FASTA file. :param peak_height: minimum peak height for telomere peak detection. :param peak_width: minimum peak width for telomere peak detection. """ self.filename = reference_file if reference_file: self.fasta = FastA(self.filename) self.kmers = ["AACCCT", "ACCCTA", "CCCTAA", "CCTAAC", "CTAACC", "TAACCC"] # based on representative kmers, these lists also increase the # separation between telomeric regions and other regions self.kmers += ["GTACAC", "TACACC", "AGTACA"] # same 6-mer permutations self.kmers += ["ACCAGT", "CAGTAC", "CCAGTA"] self.peak_height = peak_height self.peak_width = peak_width
[docs] def find_representative_kmers(self, seq, N=100000, pocc=0.005, n_sigma=5, kmers=6): """Identify over-represented k-mers in the first *N* bases of *seq*. The probability of occurrence threshold *pocc* works well for ~100,000 bases. For shorter sequences the threshold may need to be adjusted. .. note:: The fitted beta distribution used for plotting is estimated from a Leishmania genome and may not be appropriate for other organisms. """ N = min([N, len(seq)]) counts = Counter(get_kmer(seq[0:N], kmers)) normed_counts = [x / float(N) for x in counts.values()] self._normed_counts = normed_counts mu = np.mean(normed_counts) sigma = np.std(normed_counts) # do we have outliers? candidates = [(k, v) for k, v in counts.items() if v / N > pocc] pylab.clf() pylab.hist(normed_counts, density=True, bins=100) # NOTE: beta distribution params estimated from a Leishmania genome # using Fitter; may not generalise to other organisms. X = np.linspace(0, 0.001, 1000) params = ( np.float64(1.7243971943954688), np.float64(159041953024.94153), np.float64(6.702034616135221e-06), np.float64(21897590.42432911), ) Y = scipy.stats.beta.pdf(X, *params) pylab.plot(X, Y, ls="--", color="k") pylab.axvline(mu, color="r", label="mean") pylab.axvline(pocc, color="r", ls="--", label="probability threshold") pylab.semilogy() pylab.xlabel(f"{kmers}-mer occurrences") pylab.legend(loc="upper center") return candidates
def _get_sliding_kmer_count(self, seq, kmers, W=100): """Shared sliding-window kmer counter used by both strand directions. Uses an incremental update (O(L) instead of O(L*W)) by only adding the kmer entering the right edge of the window and removing the one leaving the left edge. """ assert len(set([len(x) for x in kmers])) == 1 X = np.zeros(len(seq)) Wby2 = int(W / 2.0) kmer_len = len(kmers[0]) kmer_set = set(kmers) # Initialise position Wby2 with a full-window count i = Wby2 for kmer in kmers: X[i] += seq[i - Wby2 : i + Wby2].count(kmer) for i in range(Wby2 + 1, len(seq) - Wby2): X[i] = X[i - 1] newseq = seq[i - Wby2 : i + Wby2] if newseq[-kmer_len:] in kmer_set: X[i] += 1 oldseq = seq[i - Wby2 - 1 : i + Wby2 - 1] if oldseq[0:kmer_len] in kmer_set: X[i] -= 1 # Handle borders for i in range(0, Wby2): for kmer in kmers: X[i] += seq[0 : i + Wby2].count(kmer) X[len(X) - i - 1] += seq[len(X) - i - Wby2 :].count(kmer) return np.array(X)
[docs] def get_sliding_kmer_count_five_to_three_prime(self, seq, W=100): """Return sliding kmer counts in the 5' → 3' direction.""" return self._get_sliding_kmer_count(seq, self.kmers, W)
[docs] def get_sliding_kmer_count_three_to_five_prime(self, seq, W=100): """Return sliding kmer counts in the 3' → 5' direction.""" kmers = [reverse_complement(x) for x in self.kmers] return self._get_sliding_kmer_count(seq, kmers, W)
[docs] def is_telomeric(self, seq, W=100): slide_5to3 = self.get_sliding_kmer_count_five_to_three_prime(seq, W=W) slide_3to5 = self.get_sliding_kmer_count_three_to_five_prime(seq, W=W) return max(sum(slide_3to5), sum(slide_5to3)) / len(seq)
[docs] def find_RHS_telomere(self, XX, plotting=False): height, width = self.peak_height, self.peak_width # first, we assume that XX is 5' to 3' nby2 = int(len(XX) / 2) N = len(XX) peaks, others = scipy.signal.find_peaks(XX[nby2:], height=height, width=width) self.peaks = peaks self.others = others if len(peaks) == 0: return 0, 0, 0, 0, 0 else: offset = np.max(sorted(others["right_bases"])) + nby2 start = np.min(sorted(others["left_bases"])) + nby2 + width / 2 extend = offset - start + 1 if abs(N - offset) > _GAP_THRESHOLD: logger.warning(f"offset={offset}, start={start}, extend={extend}") logger.warning(f"ending is not close to zero: {offset}") RHS = len(XX) - start # adjust for uncertainty of the window if plotting: pylab.hlines( y=others["peak_heights"], xmin=others["left_bases"] + nby2 + width / 2, xmax=others["right_bases"] + nby2, color="r", ) pylab.axvline(start, color="k") pylab.fill_between(x=[start, start + extend], y1=100, color="yellow", alpha=0.5) if extend < len(XX) - offset: logger.warning("Gap between telomere and start/end of contig/chromosome") return RHS, extend, len(XX) - offset, peaks, others
[docs] def find_LHS_telomere(self, XX, plotting=False): height, width = self.peak_height, self.peak_width # first, we assume that XX is 5' to 3' nby2 = int(len(XX) / 2) peaks, others = scipy.signal.find_peaks(XX[0:nby2], height=height, width=width) self.peaks = peaks self.others = others if len(peaks) == 0: return 0, 0, 0, 0, 0 else: offset = np.min(sorted(others["left_bases"])) stop = np.max(sorted(others["right_bases"])) extend = stop - offset + 1 if offset > _GAP_THRESHOLD: logger.warning(f"offset={offset}, stop={stop}, extend={extend}") logger.warning(f"starting is not close to zero: {offset}") LHS = stop - width / 2 # adjust for uncertainty of the window if plotting: pylab.hlines(y=others["peak_heights"], xmin=others["left_bases"], xmax=others["right_bases"], color="r") pylab.axvline(LHS, color="k") pylab.fill_between(x=[offset, stop], y1=100, color="yellow", alpha=0.5) return LHS, extend, offset, peaks, others
[docs] def plot_contig( self, XX, YY, chrom, total_length, midpoint, lhs1, rhs1, lhs1_extend, rhs1_extend, lhs2, rhs2, lhs2_extend, rhs2_extend, ): """Produce an annotated per-contig telomere figure. Both sliding-kmer-count signals are shown overlaid in a single panel: - **Blue** line / shading: 5'→3' signal; telomere expected at the left (LHS). A signal at the right (RHS) indicates a reversed orientation and is highlighted in **red**. - **Orange** line / shading: 3'→5' signal; telomere expected at the right (RHS). A signal at the left (LHS) is reversed and shown in **red**. A vertical grey bar marks the midpoint when the sequence was trimmed to *Nmax*. The title carries a status badge (COMPLETE / LHS only / RHS only / NONE). :param XX: 5'→3' sliding kmer count array. :param YY: 3'→5' sliding kmer count array. :param chrom: contig/chromosome name. :param total_length: original full sequence length in bp. :param midpoint: index of the midpoint cut (0 means no trimming). :param lhs1: LHS telomere boundary position from XX. :param rhs1: RHS telomere boundary position from XX. :param lhs1_extend: LHS telomere length from XX. :param rhs1_extend: RHS telomere length from XX. :param lhs2: LHS telomere boundary position from YY. :param rhs2: RHS telomere boundary position from YY. :param lhs2_extend: LHS telomere length from YY. :param rhs2_extend: RHS telomere length from YY. """ fig, ax = pylab.subplots(figsize=(14, 4)) L = len(XX) x = np.arange(L) # --- signals --- ax.plot(x, XX, color="steelblue", lw=1.2, label="5'→3' (forward)") ax.plot(x, YY, color="darkorange", lw=1.2, label="3'→5' (reverse complement)") y_max = max(XX.max(), YY.max()) * 1.15 or 10 def _shade(start, length, color, label=None): """Draw a filled region and annotate with its length.""" if length <= 0: return end = start + length ax.axvspan(start, min(end, L - 1), alpha=0.25, color=color, label=label) ax.annotate( f"{int(length)} bp", xy=((start + min(end, L - 1)) / 2, y_max * 0.88), ha="center", va="top", fontsize=8, color="white", bbox=dict(boxstyle="round,pad=0.2", fc=color, alpha=0.8, lw=0), ) # --- normal telomeres --- # LHS from forward strand (expected) if lhs1 and lhs1_extend: _shade(lhs1 - lhs1_extend, lhs1_extend, "steelblue", label="LHS telomere (5'→3')") # RHS from reverse-complement strand (expected) if rhs2 and rhs2_extend: rhs2_start = L - rhs2 _shade(rhs2_start, rhs2_extend, "darkorange", label="RHS telomere (3'→5')") # --- reversed / unexpected telomeres (highlighted in red) --- # RHS signal on forward strand → orientation issue if rhs1 and rhs1_extend: rhs1_start = L - rhs1 _shade(rhs1_start, rhs1_extend, "crimson", label="⚠ Reversed RHS (5'→3')") # LHS signal on reverse-complement strand → orientation issue if lhs2 and lhs2_extend: _shade(lhs2 - lhs2_extend, lhs2_extend, "crimson", label="⚠ Reversed LHS (3'→5')") # --- midpoint separator when sequence was trimmed --- if midpoint and midpoint < L: ax.axvline(midpoint, color="#555555", lw=8, alpha=0.35, label="sequence join (Nmax trim)") # --- x-axis: explicit limits, pinned boundary ticks, auto interior --- from matplotlib.ticker import FixedFormatter, FixedLocator, MaxNLocator ax.set_xlim(0, L - 1) # Get nice interior positions from MaxNLocator, then pin 0 and L-1 locator = MaxNLocator(nbins=5, integer=True) interior = [int(t) for t in locator.tick_values(0, L - 1) if 0 < t < L - 1] tick_positions = sorted({0, *interior, L - 1}) if midpoint and midpoint < L: # Two disjoint segments: # indices [0..midpoint] → genome [0..midpoint] bp # indices [midpoint..L-1] → genome [total_length-midpoint..total_length] bp def _genome_label(idx): if idx >= L - 1: return f"{total_length:,}" if idx <= midpoint: return f"{idx:,}" return f"{total_length - (L - 1 - idx):,}" tick_labels = [_genome_label(t) for t in tick_positions] else: def _genome_label(idx): # noqa: F811 return f"{total_length:,}" if idx >= L - 1 else f"{idx:,}" tick_labels = [_genome_label(t) for t in tick_positions] ax.xaxis.set_major_locator(FixedLocator(tick_positions)) ax.xaxis.set_major_formatter(FixedFormatter(tick_labels)) ax.set_xlabel("Position (bp)") ax.set_ylabel("k-mer count per window") ax.set_ylim(0, y_max) # --- deduplicated legend, placed below the axes to avoid overlap --- handles, labels = ax.get_legend_handles_labels() seen = {} for h, l in zip(handles, labels): if l not in seen: seen[l] = h n_items = len(seen) ax.legend( seen.values(), seen.keys(), loc="upper center", bbox_to_anchor=(0.5, -0.13), ncol=min(n_items, 4), fontsize=8, framealpha=0.9, ) # --- status badge in title --- has_lhs = bool(lhs1 and lhs1_extend) has_rhs = bool(rhs2 and rhs2_extend) has_reversed = bool((rhs1 and rhs1_extend) or (lhs2 and lhs2_extend)) if has_lhs and has_rhs: status, badge_color = "COMPLETE", "#2ecc71" elif has_lhs: status, badge_color = "LHS only", "#3498db" elif has_rhs: status, badge_color = "RHS only", "#e67e22" else: status, badge_color = "NONE", "#95a5a6" if has_reversed: status += " \u26a0 REVERSED" badge_color = "#e74c3c" ax.set_title( f"Contig {chrom} [{total_length:,} bp] ", loc="left", fontsize=10, ) ax.text( 1.0, 1.01, f" {status} ", transform=ax.transAxes, ha="right", va="bottom", fontsize=9, fontweight="bold", color="white", bbox=dict(boxstyle="round,pad=0.3", fc=badge_color, lw=0), ) fig.subplots_adjust(bottom=0.28) return fig
[docs] def plot_summary(self, df): """Produce an annotated genome-wide telomere summary figure. Each contig is drawn as a horizontal bar scaled to its length (so longer chromosomes appear wider). Telomeric blocks are overlaid at the appropriate ends: - **Blue**: LHS telomere (5'→3', expected orientation) - **Orange**: RHS telomere (3'→5', expected orientation) - **Red**: Reversed telomere (unexpected end) A coloured status badge on the right of each row indicates: ``COMPLETE`` (green), ``LHS only`` (blue), ``RHS only`` (orange), ``NONE`` (grey), or ``⚠ REVERSED`` (red). Contigs are sorted by descending length so the largest chromosomes appear at the top. :param df: DataFrame returned by :meth:`run`. :returns: matplotlib Figure. """ df = df.sort_values("length", ascending=True).reset_index(drop=True) n = len(df) max_len = df["length"].max() # Dynamic figure height: 0.35 in per row, at least 4 in fig_h = max(4, n * 0.35) fig, ax = pylab.subplots(figsize=(14, fig_h)) # Color palette _C = { "backbone": "#d0d0d0", "lhs": "steelblue", "rhs": "darkorange", "rev": "crimson", } status_colors = { "complete": "#2ecc71", "LHS_only": "#3498db", "RHS_only": "#e67e22", "none": "#95a5a6", } bar_h = 0.6 # bar height in axis units for i, row in df.iterrows(): L = row["length"] scale = L / max_len # relative width in [0, 1] # --- backbone bar --- ax.barh(i, scale, height=bar_h, color=_C["backbone"], left=0, align="center", zorder=2) def _block(pos, length, color): """Draw a telomere block; pos and length are in bp.""" if not (pos and length): return x_start = (pos - length) / max_len x_width = length / max_len ax.barh(i, x_width, height=bar_h, left=x_start, color=color, align="center", zorder=3, alpha=0.85) # LHS from forward strand (normal) _block(row["5to3_LHS_position"], row["5to3_LHS_length"], _C["lhs"]) # RHS from reverse-complement strand (normal) — sits at the right end if row["3to5_RHS_position"] and row["3to5_RHS_length"]: rhs_start_bp = L - row["3to5_RHS_position"] _block(rhs_start_bp + row["3to5_RHS_length"], row["3to5_RHS_length"], _C["rhs"]) # Reversed telomeres if row["5to3_RHS_position"] and row["5to3_RHS_length"]: rhs_rev_start = L - row["5to3_RHS_position"] _block(rhs_rev_start + row["5to3_RHS_length"], row["5to3_RHS_length"], _C["rev"]) if row["3to5_LHS_position"] and row["3to5_LHS_length"]: _block(row["3to5_LHS_position"], row["3to5_LHS_length"], _C["rev"]) # --- length annotation inside/beside bar --- ax.text(scale + 0.005, i, f"{int(L):,} bp", va="center", ha="left", fontsize=7, color="#444444") # --- status badge --- status = row.get("telomere", "unknown") has_rev = row["5to3_RHS_position"] or row["3to5_LHS_position"] badge_txt = status.replace("_", " ") badge_col = status_colors.get(status, "#aaaaaa") if has_rev: badge_txt += " ⚠" badge_col = _C["rev"] ax.text( -0.01, i, badge_txt, va="center", ha="right", fontsize=7, color="white", fontweight="bold", bbox=dict(boxstyle="round,pad=0.25", fc=badge_col, lw=0), ) # --- legend --- from matplotlib.patches import Patch legend_elements = [ Patch(facecolor=_C["lhs"], label="LHS telomere (5'→3')"), Patch(facecolor=_C["rhs"], label="RHS telomere (3'→5')"), Patch(facecolor=_C["rev"], label="Reversed / unexpected"), Patch(facecolor=_C["backbone"], label="Contig (to scale)"), ] ax.legend( handles=legend_elements, loc="upper center", bbox_to_anchor=(0.5, -0.04), ncol=4, fontsize=8, framealpha=0.9 ) # --- axes cosmetics --- ax.set_yticks(range(n)) ax.set_yticklabels(df["name"].values, fontsize=8) ax.set_xlim(-0.18, 1.25) ax.set_ylim(-0.8, n - 0.2) ax.set_xlabel("Relative contig length (contigs scaled to longest)", fontsize=9) ax.set_xticks([]) ax.spines[["top", "right", "bottom"]].set_visible(False) ax.set_title( f"Telomere summary — {n} contigs", fontsize=11, loc="left", ) fig.subplots_adjust(left=0.22, right=0.88, bottom=0.10, top=0.95) return fig
def _write_log(self, df, tag): """Write a summary log file for the telomere run.""" with open(f"sequana.telomark.{tag}.log", "w") as fout: msg = f"Total Number of contigs {len(df)}" fout.write(msg + "\n") logger.info(msg) for category, label in [ ("complete", "both telomeres"), ("LHS_only", "telomere on LHS only"), ("RHS_only", "telomere on RHS only"), ("none", "no telomeres"), ]: N = len(df.query(f"telomere == '{category}'")) msg = f"Number of contigs with {label}: {N}" fout.write(msg + "\n") logger.info(msg)
[docs] def run(self, tag, names=None, W=100, Nmax=100000, plot_style="annotated"): """Run telomere detection across all (or selected) contigs. :param tag: output filename prefix (``None`` suppresses file output). :param names: list of chromosome/contig names to process; defaults to all. :param W: sliding window half-width in bp. :param Nmax: maximum bp to examine at each end of the contig. :param plot_style: ``'annotated'`` (default) uses :meth:`plot_contig` which overlays both strand signals with colour-coded telomeric regions and a status badge; ``'legacy'`` reproduces the original two-subplot raw output. """ if names is None: names = self.fasta.names results = { "5to3_LHS_position": [], "5to3_LHS_length": [], "5to3_LHS_offset": [], "5to3_RHS_position": [], "5to3_RHS_length": [], "5to3_RHS_offset": [], "3to5_LHS_position": [], "3to5_LHS_length": [], "3to5_LHS_offset": [], "3to5_RHS_position": [], "3to5_RHS_length": [], "3to5_RHS_offset": [], "name": [], "length": [], } for chrom in tqdm(names, colour="#00dd55"): seq = self.fasta.sequences[self.fasta.names.index(str(chrom))] N = len(seq) # reduce number of points to look at midpoint = int(min([len(seq) / 2, Nmax / 2])) if len(seq) > Nmax: seq = seq[0:midpoint] + seq[-midpoint:] self._XX = self.get_sliding_kmer_count_five_to_three_prime(seq, W=W) self._YY = self.get_sliding_kmer_count_three_to_five_prime(seq, W=W) LHS1, lhs1_extend, offset, _, _ = self.find_LHS_telomere(self._XX) results["5to3_LHS_position"].append(LHS1) results["5to3_LHS_length"].append(lhs1_extend) results["5to3_LHS_offset"].append(offset) RHS1, rhs1_extend, offset, _, _ = self.find_RHS_telomere(self._XX) results["5to3_RHS_position"].append(RHS1) results["5to3_RHS_length"].append(rhs1_extend) results["5to3_RHS_offset"].append(offset) if RHS1 != 0: logger.warning(f"RHS: {RHS1} -- expecting none") LHS2, lhs2_extend, offset, _, _ = self.find_LHS_telomere(self._YY) results["3to5_LHS_position"].append(LHS2) results["3to5_LHS_length"].append(lhs2_extend) results["3to5_LHS_offset"].append(offset) RHS2, rhs2_extend, offset, _, _ = self.find_RHS_telomere(self._YY) results["3to5_RHS_position"].append(RHS2) results["3to5_RHS_length"].append(rhs2_extend) results["3to5_RHS_offset"].append(offset) if LHS2 != 0: logger.warning(f"LHS: {LHS2} -- expecting none") results["name"].append(chrom) results["length"].append(N) if plot_style == "annotated": fig = self.plot_contig( self._XX, self._YY, chrom, N, midpoint if midpoint < len(seq) else 0, LHS1, RHS1, lhs1_extend, rhs1_extend, LHS2, RHS2, lhs2_extend, rhs2_extend, ) if tag: # pragma: no cover fig.savefig(f"sequana.telomark.{tag}.{chrom}.png", dpi=150) pylab.close(fig) else: # legacy: two raw subplots pylab.clf() ax1 = pylab.subplot(2, 1, 1) ax1.plot(self._XX, color="steelblue") ax1.set_ylabel("5'→3' k-mer count") ax1.set_title(f"contig {chrom} [{N:,} bp] — [{LHS1} / {RHS1}] [{LHS2} / {RHS2}]") if midpoint < len(seq): ax1.axvline(midpoint, color="black", lw=10, alpha=0.5) ax2 = pylab.subplot(2, 1, 2, sharex=ax1) ax2.plot(self._YY, color="darkorange") ax2.set_ylabel("3'→5' k-mer count") ax2.set_xlabel("Position (bp)") if midpoint < len(seq): ax2.axvline(midpoint, color="black", lw=10, alpha=0.5) pylab.tight_layout() if tag: # pragma: no cover pylab.savefig(f"sequana.telomark.{tag}.{chrom}.png", dpi=150) pylab.close("all") df = pd.DataFrame(results) df["length_wo_telomere"] = ( df["length"] - df["5to3_LHS_length"] - df["5to3_RHS_length"] - df["3to5_LHS_length"] - df["3to5_RHS_length"] ) # is the chromosome complete? complete = np.logical_and(df["5to3_LHS_length"], df["3to5_RHS_length"]) LHS_only = np.logical_and(df["5to3_LHS_length"], ~complete) RHS_only = np.logical_and(df["3to5_RHS_length"], ~complete) no_telomere = ~np.logical_or(df["5to3_LHS_length"], df["3to5_RHS_length"]) df["telomere"] = "unknown" df.loc[complete, "telomere"] = "complete" df.loc[LHS_only, "telomere"] = "LHS_only" df.loc[RHS_only, "telomere"] = "RHS_only" df.loc[no_telomere, "telomere"] = "none" self._write_log(df, tag) return df
[docs] class TelomerFilter: """Filter reads based on telomeric repeat content. :param str filename: Input FastQ file (can be .gz) :param str pattern: Telomeric repeat unit (default: "AACCCT") :param float threshold: Fraction of the read that must be telomeric (default: 0.8) """ def __init__(self, filename, pattern="AACCCT", threshold=0.8): self.filename = filename self.pattern = pattern self.threshold = threshold self.fastq = FastQ(filename) # Build kmers (all circular shifts of pattern and its reverse complement) self.kmers = circular_shifts(pattern) + circular_shifts(reverse_complement(pattern))
[docs] def save_reads(self, telomeric_output=None, non_telomeric_output=None, progress=True): """Identify and save reads list on-the-fly for maximum speed. :param str telomeric_output: File to save telomeric reads (optional) :param str non_telomeric_output: File to save non-telomeric reads (optional) """ fastq = pysam.FastxFile(self.filename) f_telo = None f_non_telo = None tozip_telo = False tozip_non_telo = False if telomeric_output: telomeric_output, tozip_telo = self.fastq._istozip(telomeric_output) f_telo = open(telomeric_output, "w") if non_telomeric_output: non_telomeric_output, tozip_non_telo = self.fastq._istozip(non_telomeric_output) f_non_telo = open(non_telomeric_output, "w") try: for read in tqdm(fastq, disable=not progress, desc="Filtering telomeric reads"): sequence = read.sequence count = sum(sequence.count(k) for k in self.kmers) ratio = (count * len(self.pattern)) / len(sequence) if ratio >= self.threshold: if f_telo: f_telo.write(read.__str__() + "\n") else: if f_non_telo: f_non_telo.write(read.__str__() + "\n") finally: if f_telo is not None: f_telo.close() if f_non_telo is not None: f_non_telo.close() if tozip_telo: self.fastq._gzip(telomeric_output) if tozip_non_telo: self.fastq._gzip(non_telomeric_output)
[docs] def save_telomeric_reads(self, output_filename="telomeric.fastq", progress=True): """Save telomeric reads to a file.""" self.save_reads(telomeric_output=output_filename, progress=progress)
[docs] def save_non_telomeric_reads(self, output_filename="non_telomeric.fastq", progress=True): """Save non-telomeric reads to a file.""" self.save_reads(non_telomeric_output=output_filename, progress=progress)