Source code for sequana.repeats.aphased

#!/usr/bin/env python3
"""Detect A-phased repeats (intrinsically bent DNA).

A-phased repeats are clusters of short A-tracts (runs of A/T) repeated in phase
with the ~10.5 bp helical period, which bends the double helix. This reproduces
the *A_Phased_Repeat* output of the non-B gfa / nBMST tool (``gfa_APR.gff``),
ported from its ``findAPR.c`` / ``getAtracts`` routines.

Algorithm (gfa defaults shown):

1. **A-tracts** — every maximal run of A/T whose length is in
   ``[min_tract_len, max_tract_len]`` = ``[3, 9]``. A run is a valid A-tract
   when, on the forward **or** reverse-complement strand, the longest A / AnTn
   pattern minus the longest pure-T run is ``>= min_tract_len``. Each tract is
   summarised by the centre of its longest A-run.
2. **Phasing** — consecutive tract centres separated by ``[min_sep, max_sep]``
   (gfa uses 9.9..11.1, i.e. ~10 bp) are chained; a chain of at least
   ``min_tracts`` (3) tracts is reported as one A-phased repeat.

Validated against gfa on a 30+ Mb *Leishmania* assembly: identical count (889)
and 100% reciprocal positional overlap with ``gfa_APR.gff``.
"""
import re

import pandas as pd
from tqdm import tqdm

from sequana import FastA
from sequana.tools import reverse_complement

_ATRUN = re.compile(r"[AT]+")


def _metric(ctx):
    """Longest A/AnTn pattern and longest pure-T run within ``ctx[1:]``.

    ``ctx[0]`` is the preceding-base context. Mirrors gfa's ``getAtracts`` inner
    loop. Returns (max_AT_len, max_T_len, end_index_of_longest_AT_within_run).
    """
    a_len = t_len = at_len = ta_len = 0
    max_at = max_t = 0
    max_at_end = -1
    for j in range(1, len(ctx)):
        ch = ctx[j]
        if ch == "A":
            t_len = ta_len = 0
            if ctx[j - 1] == "T":
                a_len = at_len = 0
            else:
                a_len += 1
                at_len += 1
        elif ch == "T":
            if ta_len < a_len:  # T following an A-run extends the AnTn pattern
                ta_len += 1
                at_len += 1
            else:  # pure T
                t_len += 1
                ta_len = at_len = a_len = 0
        if max_at < at_len:
            max_at = at_len
            max_at_end = j - 1
        if max_t < t_len:
            max_t = t_len
    return max_at, max_t, max_at_end


def _tract_center(seq, start, length, min_tract_len):
    """Return the A-run centre (float, forward coords) of a valid A-tract, else None."""
    run = seq[start : start + length]
    prev = seq[start - 1] if start > 0 else "N"
    f_at, f_t, f_end = _metric(prev + run)
    nxt = seq[start + length] if start + length < len(seq) else "N"
    r_at, r_t, r_end = _metric(reverse_complement(run + nxt))
    f_diff, r_diff = f_at - f_t, r_at - r_t
    if max(f_diff, r_diff) < min_tract_len:
        return None
    if f_diff >= r_diff:
        return start + (f_end - (f_at - 1) / 2.0)
    # reverse-complement branch: map the centre back to forward coordinates
    return start + (length - 1 - r_end) + (r_at - 1) / 2.0


[docs] class APhasedRepeats: """Detect A-phased repeats (bent DNA) like gfa ``gfa_APR.gff``. :param fasta_file: input FASTA. :param min_tract_len: minimum A-tract (A/T run) length (gfa default 3). :param max_tract_len: maximum A-tract length (gfa default 9). :param min_tracts: minimum number of phased tracts (gfa default 3). :param min_sep: minimum centre-to-centre tract separation (gfa 9.9). :param max_sep: maximum centre-to-centre tract separation (gfa 11.1). """ def __init__(self, fasta_file, min_tract_len=3, max_tract_len=9, min_tracts=3, min_sep=9.9, max_sep=11.1): self.fasta_file = fasta_file self.min_tract_len = min_tract_len self.max_tract_len = max_tract_len self.min_tracts = min_tracts self.min_sep = min_sep self.max_sep = max_sep self.df = pd.DataFrame(columns=["seqid", "start", "end", "length", "tracts", "sequence"]) def _find(self, seq): # collect valid A-tracts: (start, end, centre) tracts = [] for m in _ATRUN.finditer(seq): length = m.end() - m.start() if not (self.min_tract_len <= length <= self.max_tract_len): continue center = _tract_center(seq, m.start(), length, self.min_tract_len) if center is not None: tracts.append((m.start(), m.end(), center)) tracts.sort(key=lambda x: x[2]) # chain tracts whose centres are one helical turn apart hits = [] if not tracts: return hits chain = [tracts[0]] for t in tracts[1:]: if self.min_sep <= t[2] - chain[-1][2] <= self.max_sep: chain.append(t) else: if len(chain) >= self.min_tracts: hits.append((chain[0][0], chain[-1][1], len(chain))) chain = [t] if len(chain) >= self.min_tracts: hits.append((chain[0][0], chain[-1][1], len(chain))) return hits
[docs] def run(self, progress=True): fa = FastA(self.fasta_file) frames = [] for seqid in tqdm(fa.names, disable=not progress, desc="A-phased repeats", unit="seq"): seq = fa.sequences[fa.names.index(seqid)].upper() hits = self._find(seq) if not hits: continue frames.append( pd.DataFrame( { "seqid": seqid, "start": [h[0] for h in hits], "end": [h[1] for h in hits], "length": [h[1] - h[0] for h in hits], "tracts": [h[2] for h in hits], "sequence": [seq[h[0] : h[1]] for h in hits], } ) ) cols = ["seqid", "start", "end", "length", "tracts", "sequence"] self.df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame(columns=cols)
[docs] def to_bed(self, output_file, mode="w"): if self.df.empty: raise ValueError("No results. Run `.run()` first.") bed = self.df[["seqid", "start", "end"]].copy() bed["name"] = self.df["sequence"] bed["score"] = 0 bed["strand"] = "+" bed.to_csv(output_file, sep="\t", header=False, index=False, mode=mode)
[docs] def to_gff(self, output_file, source="sequana"): """Write a gff3 comparable to gfa ``gfa_APR.gff`` (1-based, inclusive).""" if self.df.empty: raise ValueError("No results. Run `.run()` first.") with open(output_file, "w") as fout: fout.write("##gff-version 3\n") for row in self.df.itertuples(index=False): start = row.start + 1 # gff is 1-based attrs = f"ID={row.seqid}_{start}_{row.end}_APR;tracts={row.tracts};sequence={row.sequence.lower()}" fout.write(f"{row.seqid}\t{source}\tA_Phased_Repeat\t{start}\t{row.end}\t.\t+\t.\t{attrs}\n")