Source code for sequana.repeats.mirror

#!/usr/bin/env python3
"""Detect mirror repeats.

A mirror repeat is two arms that read the same forwards and backwards across a
central axis (the right arm is the **reverse** of the left arm, NOT the reverse
complement — that would be an inverted repeat / cruciform), optionally separated
by a spacer::

    5'- ARM ----- spacer ----- reverse(ARM) -3'

This reproduces the *Mirror_Repeat* output of the non-B gfa / nBMST tool
(``gfa_MR.gff``). The homopurine / homopyrimidine subset of mirror repeats forms
H-DNA (triplex); see :class:`sequana.hdna.HDNA`.

gfa defaults (from ``print_usage.c``): arm (repeat) >= 10, spacer 0..100.

Maximal arms are reported; hits fully contained in a longer hit are kept but
flagged (``subset=1``); ``include_subsets=False`` drops them.

Validated against gfa on a 30+ Mb *Leishmania* assembly: all 22254 gfa mirror
repeats are recovered (100% positional overlap), density within ~2% (22704 vs
22254). The numba kernel processes the whole genome in a few seconds.
"""
import os

import numpy as np
import pandas as pd
from tqdm import tqdm

from sequana import FastA

_A, _C, _G, _T = 65, 67, 71, 84
_MR_COLS = ["seqid", "start", "end", "length", "repeat", "spacer", "subset", "sequence"]


def _scan_python(arr, min_repeat, max_repeat, min_spacer, max_spacer):
    """Pure-numpy fallback mirroring the numba kernel."""
    n = arr.shape[0]
    starts, ends, reps, spacers = [], [], [], []
    for spacer in range(min_spacer, max_spacer + 1):
        limit = n - 1 - spacer
        if limit <= 0:
            break
        for a0 in np.nonzero(arr[:limit] == arr[spacer + 1 : n])[0].tolist():
            b0 = arr[a0]
            if b0 not in (_A, _C, _G, _T):  # N never seeds a mirror
                continue
            rb = a0 + spacer + 1
            k = 1
            while (
                a0 - k >= 0
                and rb + k < n
                and k < max_repeat
                and arr[a0 - k] == arr[rb + k]
                and arr[a0 - k] in (_A, _C, _G, _T)
            ):
                k += 1
            if k >= min_repeat:
                starts.append(a0 - k + 1)
                ends.append(rb + k)
                reps.append(k)
                spacers.append(spacer)
    dt = np.int64
    return np.array(starts, dt), np.array(ends, dt), np.array(reps, dt), np.array(spacers, dt)


try:
    from numba import njit

    @njit(cache=True)
    def _scan_numba(arr, min_repeat, max_repeat, min_spacer, max_spacer):
        n = arr.shape[0]
        starts = np.empty(0, np.int64)
        ends = np.empty(0, np.int64)
        reps = np.empty(0, np.int64)
        spacers = np.empty(0, np.int64)
        for fill in range(2):
            idx = 0
            for spacer in range(min_spacer, max_spacer + 1):
                limit = n - 1 - spacer
                if limit <= 0:
                    break
                for a0 in range(limit):
                    b0 = arr[a0]
                    if b0 != 65 and b0 != 67 and b0 != 71 and b0 != 84:  # N never seeds
                        continue
                    rb = a0 + spacer + 1
                    if arr[rb] != b0:  # mirror seed (same base, not complement)
                        continue
                    k = 1
                    while a0 - k >= 0 and rb + k < n and k < max_repeat:
                        lb = arr[a0 - k]
                        if lb != arr[rb + k] or (lb != 65 and lb != 67 and lb != 71 and lb != 84):
                            break
                        k += 1
                    if k >= min_repeat:
                        if fill == 1:
                            starts[idx] = a0 - k + 1
                            ends[idx] = rb + k
                            reps[idx] = k
                            spacers[idx] = spacer
                        idx += 1
            if fill == 0:
                starts = np.empty(idx, np.int64)
                ends = np.empty(idx, np.int64)
                reps = np.empty(idx, np.int64)
                spacers = np.empty(idx, np.int64)
        return starts, ends, reps, spacers

    _HAS_NUMBA = True
except ImportError:  # pragma: no cover
    _scan_numba = None
    _HAS_NUMBA = False


def _flag_subsets_one(starts, ends):
    """subset=1 for any hit contained in a longer one (single sequence)."""
    order = np.lexsort((-ends, starts))  # start asc, then end desc
    e = ends[order]
    flags = np.zeros(len(e), np.int64)
    max_end = -1
    for i in range(len(e)):
        if e[i] <= max_end:
            flags[i] = 1
        else:
            max_end = e[i]
    return order, flags


def _scan_one(item):
    """Worker (module-level, picklable). Scan one sequence; return a per-seq df
    with subset flags applied (sorted start asc / end desc)."""
    seqid, seq, args = item
    seq_b = seq.encode()
    arr = np.frombuffer(seq_b, dtype=np.uint8)
    scan = _scan_numba if _HAS_NUMBA else _scan_python
    starts, ends, reps, spacers = scan(arr, *args)
    if len(starts) == 0:
        return None
    order, flags = _flag_subsets_one(starts, ends)
    starts, ends, reps, spacers = starts[order], ends[order], reps[order], spacers[order]
    return pd.DataFrame(
        {
            "seqid": seqid,
            "start": starts,
            "end": ends,
            "length": ends - starts,
            "repeat": reps,
            "spacer": spacers,
            "subset": flags,
            "sequence": [seq_b[s:e].decode() for s, e in zip(starts.tolist(), ends.tolist())],
        }
    )


[docs] class MirrorRepeats: """Detect mirror repeats like gfa ``gfa_MR.gff``. :param fasta_file: input FASTA. :param min_repeat: minimum arm length (gfa default 10). :param max_repeat: maximum arm length (cap on arm extension). :param min_spacer: minimum loop length (gfa default 0). :param max_spacer: maximum loop length (gfa default 100). """ def __init__(self, fasta_file, min_repeat=10, max_repeat=200, min_spacer=0, max_spacer=100): self.fasta_file = fasta_file self.min_repeat = min_repeat self.max_repeat = max_repeat self.min_spacer = min_spacer self.max_spacer = max_spacer self.df = pd.DataFrame(columns=["seqid", "start", "end", "length", "repeat", "spacer", "subset", "sequence"])
[docs] def run(self, include_subsets=True, progress=True, processes=None): """Scan every sequence for mirror repeats. Sequences are independent, so the scan is spread over ``processes`` workers (``None`` = all CPUs; ``1`` = serial). """ fa = FastA(self.fasta_file) args = (self.min_repeat, self.max_repeat, self.min_spacer, self.max_spacer) items = [(seqid, fa.sequences[i].upper(), args) for i, seqid in enumerate(fa.names)] if processes is None: processes = os.cpu_count() or 1 use_pool = _HAS_NUMBA and processes > 1 and len(items) > 1 if use_pool: from multiprocessing import Pool with Pool(min(processes, len(items))) as pool: frames = list( tqdm( pool.imap(_scan_one, items), total=len(items), disable=not progress, desc="Mirror repeats", unit="seq", ) ) else: frames = [_scan_one(it) for it in tqdm(items, disable=not progress, desc="Mirror repeats", unit="seq")] frames = [f for f in frames if f is not None] df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame(columns=_MR_COLS) if not include_subsets: df = df[df["subset"] == 0].reset_index(drop=True) self.df = df.reset_index(drop=True)
[docs] def to_bed(self, output_file, mode="w"): if self.df.empty: raise ValueError("No results. Run `.run()` first.") bed = self.df[["seqid", "start", "end"]].copy() bed["name"] = self.df["sequence"] bed["score"] = 0 bed["strand"] = "+" bed.to_csv(output_file, sep="\t", header=False, index=False, mode=mode)
[docs] def to_gff(self, output_file, source="sequana"): """Write a gff3 comparable to gfa ``gfa_MR.gff`` (1-based, inclusive).""" if self.df.empty: raise ValueError("No results. Run `.run()` first.") with open(output_file, "w") as fout: fout.write("##gff-version 3\n") for row in self.df.itertuples(index=False): start = row.start + 1 # gff is 1-based attrs = ( f"ID={row.seqid}_{start}_{row.end}_MR;spacer={row.spacer};" f"repeat={row.repeat};subset={row.subset};sequence={row.sequence.lower()}" ) fout.write(f"{row.seqid}\t{source}\tMirror_Repeat\t{start}\t{row.end}\t.\t+\t.\t{attrs}\n")