Source code for sequana.repeats.directrepeat

#!/usr/bin/env python3
"""Detect direct (and slipped / tandem) repeats.

A direct repeat is two identical arms in the same orientation, optionally
separated by a spacer::

    5'- ARM ----- spacer ----- ARM -3'

When the spacer is zero the arm is extended into a big tandem repeat (BTR).
This reproduces the *Direct_Repeat* output of the non-B gfa / nBMST tool
(``gfa_DR.gff``), ported faithfully from its ``findDR.c``.

gfa defaults (from ``gfa.c``): min arm 10, max arm 300, spacer 0..10 (the usage
text saying spacer 100 is stale; ``gfa.c`` uses 10). The search is greedy: for
each start it tries the largest arm first and the smallest qualifying spacer,
records the first full match, and skips ahead so successive repeats must extend
beyond the previous one (the ``end`` escape variable in gfa).

A motif is flagged ``subset=1`` (gfa's "slipped" set) when its spacer is 0.

Validated against the gfa binary compiled from source on a 30+ Mb *Leishmania*
assembly: identical coordinates and attributes.
"""
import os

import numpy as np
import pandas as pd
from tqdm import tqdm

from sequana import FastA

_N = 78  # uppercase 'N'


def _scan_python(arr, mindir, maxdir, dspacer):
    n = arr.shape[0]
    starts, ends, sizes, spacers, nums, rems = [], [], [], [], [], []
    lasti = n - mindir * 2
    end = 0
    strti = 0
    while strti <= lasti:
        while strti < n and arr[strti] == _N:
            strti += 1
        if strti >= lasti:
            break
        size = maxdir
        while size >= mindir:
            if (size * 2 + dspacer) <= (end - strti):
                break  # even the largest config cannot escape previous DR
            sp_min = max(0, (end - strti) - (size * 2) + 2)
            sp_max = min(dspacer, lasti - strti)
            found = False
            sp = sp_min
            while sp <= sp_max:
                j = strti + size + sp
                i = strti
                k = 0
                while j < n and arr[i] == arr[j] and k < size and arr[i] != _N:
                    k += 1
                    j += 1
                    i += 1
                if k == size:
                    totlen = k
                    if sp == 0:
                        while j < n and arr[i] == arr[j]:
                            totlen += 1
                            j += 1
                            i += 1
                    starts.append(strti)
                    ends.append(j)
                    sizes.append(size)
                    spacers.append(sp)
                    nums.append(totlen // size)
                    rems.append(totlen % size)
                    end = j - 1
                    found = True
                    break
                sp += 1
            if found:
                break
            size -= 1
        strti += 1
    dt = np.int64
    return tuple(np.array(x, dt) for x in (starts, ends, sizes, spacers, nums, rems))


try:
    from numba import njit

    @njit(cache=True)
    def _scan_numba(arr, mindir, maxdir, dspacer):
        n = arr.shape[0]
        lasti = n - mindir * 2
        starts = np.empty(0, np.int64)
        ends = np.empty(0, np.int64)
        sizes = np.empty(0, np.int64)
        spacers = np.empty(0, np.int64)
        nums = np.empty(0, np.int64)
        rems = np.empty(0, np.int64)
        for fill in range(2):
            ndx = 0
            end = 0
            strti = 0
            while strti <= lasti:
                while strti < n and arr[strti] == _N:
                    strti += 1
                if strti >= lasti:
                    break
                # common path: no active escape (end behind), so sp_min is always 0
                # and the size-2*size break never fires -> hoist the arithmetic out
                escape = end > strti
                sp_max0 = dspacer
                if lasti - strti < sp_max0:
                    sp_max0 = lasti - strti
                size = maxdir
                while size >= mindir:
                    if escape:
                        if (size * 2 + dspacer) <= (end - strti):
                            break
                        sp_min = (end - strti) - (size * 2) + 2
                        if sp_min < 0:
                            sp_min = 0
                    else:
                        sp_min = 0
                    found = False
                    sp = sp_min
                    while sp <= sp_max0:
                        j = strti + size + sp
                        i = strti
                        k = 0
                        while j < n and arr[i] == arr[j] and k < size and arr[i] != _N:
                            k += 1
                            j += 1
                            i += 1
                        if k == size:
                            totlen = k
                            if sp == 0:
                                while j < n and arr[i] == arr[j]:
                                    totlen += 1
                                    j += 1
                                    i += 1
                            if fill == 1:
                                starts[ndx] = strti
                                ends[ndx] = j
                                sizes[ndx] = size
                                spacers[ndx] = sp
                                nums[ndx] = totlen // size
                                rems[ndx] = totlen % size
                            ndx += 1
                            end = j - 1
                            found = True
                            break
                        sp += 1
                    if found:
                        break
                    size -= 1
                strti += 1
            if fill == 0:
                starts = np.empty(ndx, np.int64)
                ends = np.empty(ndx, np.int64)
                sizes = np.empty(ndx, np.int64)
                spacers = np.empty(ndx, np.int64)
                nums = np.empty(ndx, np.int64)
                rems = np.empty(ndx, np.int64)
        return starts, ends, sizes, spacers, nums, rems

    _HAS_NUMBA = True
except ImportError:  # pragma: no cover
    _scan_numba = None
    _HAS_NUMBA = False


def _scan_one(item):
    """Worker (module-level so it can be pickled for multiprocessing).

    ``item`` = (seqid, upper-cased sequence string, (min_repeat, max_repeat,
    max_spacer)). Returns a per-sequence dict ready to become a DataFrame.
    """
    seqid, seq, args = item
    seq_b = seq.encode()
    arr = np.frombuffer(seq_b, dtype=np.uint8)
    scan = _scan_numba if _HAS_NUMBA else _scan_python
    starts, ends, sizes, spacers, nums, rems = scan(arr, *args)
    seqs = [seq_b[s:e].decode() for s, e in zip(starts.tolist(), ends.tolist())]
    return seqid, starts, ends, sizes, spacers, nums, rems, seqs


[docs] class DirectRepeats: """Detect direct/slipped repeats like gfa ``gfa_DR.gff``. :param fasta_file: input FASTA. :param min_repeat: minimum arm length (gfa default 10). :param max_repeat: maximum arm length (gfa default 300). :param max_spacer: maximum spacer between arms (gfa default 10). """ def __init__(self, fasta_file, min_repeat=10, max_repeat=300, max_spacer=10): self.fasta_file = fasta_file self.min_repeat = min_repeat self.max_repeat = max_repeat self.max_spacer = max_spacer self.df = pd.DataFrame( columns=["seqid", "start", "end", "repeat", "spacer", "num", "remainder", "subset", "sequence"] )
[docs] def run(self, progress=True, processes=None): """Scan every sequence for direct repeats. :param processes: number of worker processes. ``None`` (default) uses all CPUs; sequences are independent so this is an exact speed-up. Pass ``1`` to force a single in-process (serial) scan. """ fa = FastA(self.fasta_file) args = (self.min_repeat, self.max_repeat, self.max_spacer) items = [(seqid, fa.sequences[i].upper(), args) for i, seqid in enumerate(fa.names)] if processes is None: processes = os.cpu_count() or 1 # multiprocessing only pays off with numba and several sequences; a single # in-process scan keeps the numba/fallback monkeypatch honest for tests use_pool = _HAS_NUMBA and processes > 1 and len(items) > 1 if use_pool: from multiprocessing import Pool with Pool(min(processes, len(items))) as pool: results = list( tqdm( pool.imap(_scan_one, items), total=len(items), disable=not progress, desc="Direct repeats", unit="seq", ) ) else: results = [_scan_one(it) for it in tqdm(items, disable=not progress, desc="Direct repeats", unit="seq")] frames = [] for seqid, starts, ends, sizes, spacers, nums, rems, seqs in results: if len(starts) == 0: continue frames.append( pd.DataFrame( { "seqid": seqid, "start": starts, "end": ends, "repeat": sizes, "spacer": spacers, "num": nums, "remainder": rems, "subset": (spacers == 0).astype(np.int64), "sequence": seqs, } ) ) cols = ["seqid", "start", "end", "repeat", "spacer", "num", "remainder", "subset", "sequence"] self.df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame(columns=cols)
[docs] def to_bed(self, output_file, mode="w"): if self.df.empty: raise ValueError("No results. Run `.run()` first.") bed = self.df[["seqid", "start", "end"]].copy() bed["name"] = self.df["sequence"] bed["score"] = 0 bed["strand"] = "+" bed.to_csv(output_file, sep="\t", header=False, index=False, mode=mode)
[docs] def to_gff(self, output_file, source="sequana"): """Write a gff3 comparable to gfa ``gfa_DR.gff`` (1-based, inclusive). Like gfa, ``composition`` counts only the first arm (``repeat`` bases) while ``sequence`` is the full motif span. """ if self.df.empty: raise ValueError("No results. Run `.run()` first.") with open(output_file, "w") as fout: fout.write("##gff-version 3\n") for row in self.df.itertuples(index=False): start = row.start + 1 # gff is 1-based s = row.sequence.lower() arm = s[: row.repeat] comp = f"{arm.count('a')}A/{arm.count('c')}C/{arm.count('g')}G/{arm.count('t')}T" attrs = ( f"ID={row.seqid}_{start}_{row.end}_DR;spacer={row.spacer};" f"repeat={row.repeat};x{row.num}+{row.remainder};" f"composition={comp};sequence={s};subset={row.subset}" ) fout.write(f"{row.seqid}\t{source}\tDirect_Repeat\t{start}\t{row.end}\t.\t+\t.\t{attrs}\n")