Source code for sequana.repeats.gquad

#!/usr/bin/env python3
"""Detect G-quadruplex forming motifs.

A G-quadruplex (G4) forms where four or more runs of guanines (*G-islands*,
each at least ``min_rep`` bp) sit close together (separated by loops of at most
``max_spacer`` bp). This reproduces the *G_Quadruplex_Motif* output of the
non-B gfa / nBMST tool (``gfa_GQ.gff``), ported faithfully from its
``findGQ.c`` / ``getGislands``.

Both strands are scanned: runs of ``G`` give plus-strand motifs, runs of ``C``
give minus-strand motifs (the reported sequence is then the reverse complement,
i.e. G-rich). For each motif gfa reports:

- ``islands`` : number of merged G-islands (``conIls``),
- ``runs``    : how many minimal (``min_rep``) runs fit across the islands
  (``npos``, must be >= 4),
- ``max``     : the largest run size for which at least 4 runs still fit.

gfa defaults (from ``gfa.c``): min_rep = 3, max_spacer = 7.

Validated against gfa on a 30+ Mb *Leishmania* assembly: identical coordinates,
strand, islands/runs/max attributes. As for Z-DNA, the only field that differs
is ``composition`` because gfa's printer counts bases over ``[start, end-1)``
(an off-by-one dropping the last base); this module counts the full motif, so
its A/C/G/T counts are correct and match the reported ``sequence``.
"""
import numpy as np
import pandas as pd
from tqdm import tqdm

from sequana import FastA

_G, _C = 71, 67  # uppercase byte codes
_COMP = bytes.maketrans(b"ACGTN", b"TGCAN")


def _islands_py(seq, target, min_rep):
    """Return (start_1based, length) for every run of ``target`` >= min_rep."""
    strt, lng = [], []
    n = len(seq)
    run = 0
    for i in range(n + 1):
        c = seq[i] if i < n else None
        if c == target:
            run += 1
        else:
            if run >= min_rep:
                strt.append(i - run + 1)  # 1-based, as in gfa
                lng.append(run)
            run = 0
    return strt, lng


def _findgq_py(strt, lng, min_rep, max_spacer):
    """Faithful port of findGQ for one strand; islands are 1-based starts."""
    nIls = len(strt)
    starts, ends, nums, subs, lens = [], [], [], [], []
    i = 0
    while i < nIls:
        conIls = 1
        npos = (lng[i] + 1) // (min_rep + 1)
        i2 = i + 1
        while i2 < nIls and (strt[i2] - (strt[i2 - 1] + lng[i2 - 1])) <= max_spacer:
            conIls += 1
            npos += (lng[i2] + 1) // (min_rep + 1)
            i2 += 1
        if npos >= 4:
            maxGQ = min_rep
            for j in range(i, i2):
                k = lng[j]
                while k > maxGQ:
                    nposMax = (lng[j] + 1) // (k + 1)
                    m = j + 1
                    while m < i2:
                        nposMax += (lng[m] + 1) // (k + 1)
                        if nposMax >= 4:
                            maxGQ = k
                            break
                        if (lng[m] + 1) // (k + 1) == 0:
                            sm1 = strt[m + 1] if m + 1 < nIls else 0
                            if sm1 > (strt[m - 1] + lng[m - 1] + max_spacer):
                                break
                        m += 1
                    k -= 1
            starts.append(strt[i])
            ends.append(strt[i2 - 1] + lng[i2 - 1] - 1)
            nums.append(npos)
            subs.append(conIls)
            lens.append(maxGQ)
        i = i + conIls  # skip the islands merged into this motif
    return starts, ends, nums, subs, lens


try:
    from numba import njit

    @njit(cache=True)
    def _islands_numba(arr, target, min_rep, strt, lng, fill):
        n = arr.shape[0]
        ndx = 0
        run = 0
        for i in range(n + 1):
            c = arr[i] if i < n else 0
            if c == target:
                run += 1
            else:
                if run >= min_rep:
                    if fill:
                        strt[ndx] = i - run + 1
                        lng[ndx] = run
                    ndx += 1
                run = 0
        return ndx

    @njit(cache=True)
    def _findgq_numba(strt, lng, nIls, min_rep, max_spacer, ostart, oend, onum, osub, olen, fill):
        ndx = 0
        i = 0
        while i < nIls:
            conIls = 1
            npos = (lng[i] + 1) // (min_rep + 1)
            i2 = i + 1
            while i2 < nIls and (strt[i2] - (strt[i2 - 1] + lng[i2 - 1])) <= max_spacer:
                conIls += 1
                npos += (lng[i2] + 1) // (min_rep + 1)
                i2 += 1
            if npos >= 4:
                maxGQ = min_rep
                for j in range(i, i2):
                    k = lng[j]
                    while k > maxGQ:
                        nposMax = (lng[j] + 1) // (k + 1)
                        m = j + 1
                        while m < i2:
                            nposMax += (lng[m] + 1) // (k + 1)
                            if nposMax >= 4:
                                maxGQ = k
                                break
                            if (lng[m] + 1) // (k + 1) == 0:
                                sm1 = strt[m + 1] if m + 1 < nIls else 0
                                if sm1 > (strt[m - 1] + lng[m - 1] + max_spacer):
                                    break
                            m += 1
                        k -= 1
                if fill:
                    ostart[ndx] = strt[i]
                    oend[ndx] = strt[i2 - 1] + lng[i2 - 1] - 1
                    onum[ndx] = npos
                    osub[ndx] = conIls
                    olen[ndx] = maxGQ
                ndx += 1
            i = i + conIls
        return ndx

    _HAS_NUMBA = True
except ImportError:  # pragma: no cover
    _islands_numba = None
    _findgq_numba = None
    _HAS_NUMBA = False


def _scan_strand_numba(arr, target, min_rep, max_spacer):
    n = arr.shape[0]
    cap = n // (min_rep + 1) + 1
    strt = np.empty(cap, np.int64)
    lng = np.empty(cap, np.int64)
    nIls = _islands_numba(arr, target, min_rep, strt, lng, 1)
    strt = strt[:nIls]
    lng = lng[:nIls]
    z = np.empty(0, np.int64)
    count = _findgq_numba(strt, lng, nIls, min_rep, max_spacer, z, z, z, z, z, 0)
    ostart = np.empty(count, np.int64)
    oend = np.empty(count, np.int64)
    onum = np.empty(count, np.int64)
    osub = np.empty(count, np.int64)
    olen = np.empty(count, np.int64)
    _findgq_numba(strt, lng, nIls, min_rep, max_spacer, ostart, oend, onum, osub, olen, 1)
    return ostart, oend, onum, osub, olen


def _scan_strand_python(seq, target, min_rep, max_spacer):
    strt, lng = _islands_py(seq, target, min_rep)
    s, e, num, sub, ln = _findgq_py(strt, lng, min_rep, max_spacer)
    dt = np.int64
    return tuple(np.array(x, dt) for x in (s, e, num, sub, ln))


[docs] class GQuadruplex: """Detect G-quadruplex forming motifs like gfa ``gfa_GQ.gff``. :param fasta_file: input FASTA. :param min_rep: minimum guanine run length / G-island size (gfa default 3). :param max_spacer: maximum loop length between islands (gfa default 7). """ def __init__(self, fasta_file, min_rep=3, max_spacer=7): self.fasta_file = fasta_file self.min_rep = min_rep self.max_spacer = max_spacer self.df = pd.DataFrame(columns=["seqid", "start", "end", "strand", "islands", "runs", "max", "sequence"])
[docs] def run(self, progress=True): fa = FastA(self.fasta_file) frames = [] for seqid in tqdm(fa.names, disable=not progress, desc="G-quadruplex", unit="seq"): seq_b = fa.sequences[fa.names.index(seqid)].upper().encode() # plus strand (G runs) then minus strand (C runs), matching gfa order for target, strand in ((_G, "+"), (_C, "-")): if _HAS_NUMBA: arr = np.frombuffer(seq_b, dtype=np.uint8) s, e, num, sub, ln = _scan_strand_numba(arr, target, self.min_rep, self.max_spacer) else: s, e, num, sub, ln = _scan_strand_python(seq_b, target, self.min_rep, self.max_spacer) if len(s) == 0: continue seqs = [] for gs, ge in zip(s.tolist(), e.tolist()): fwd = seq_b[gs - 1 : ge] # gs is 1-based start, ge 1-based inclusive if strand == "-": fwd = fwd.translate(_COMP)[::-1] seqs.append(fwd.decode()) frames.append( pd.DataFrame( { "seqid": seqid, "start": s - 1, # 0-based "end": e, # 0-based exclusive (== 1-based inclusive) "strand": strand, "islands": sub, "runs": num, "max": ln, "sequence": seqs, } ) ) cols = ["seqid", "start", "end", "strand", "islands", "runs", "max", "sequence"] self.df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame(columns=cols)
[docs] def to_bed(self, output_file, mode="w"): if self.df.empty: raise ValueError("No results. Run `.run()` first.") bed = self.df[["seqid", "start", "end"]].copy() bed["name"] = self.df["sequence"] bed["score"] = 0 bed["strand"] = self.df["strand"] bed.to_csv(output_file, sep="\t", header=False, index=False, mode=mode)
[docs] def to_gff(self, output_file, source="sequana"): """Write a gff3 comparable to gfa ``gfa_GQ.gff`` (1-based, inclusive).""" if self.df.empty: raise ValueError("No results. Run `.run()` first.") with open(output_file, "w") as fout: fout.write("##gff-version 3\n") for row in self.df.itertuples(index=False): start = row.start + 1 # gff is 1-based s = row.sequence.lower() comp = f"{s.count('a')}A/{s.count('c')}C/{s.count('g')}G/{s.count('t')}T" attrs = ( f"ID={row.seqid}_{start}_{row.end}_GQ;islands={row.islands};" f"runs={row.runs};max={row.max};composition={comp};sequence={s}" ) fout.write( f"{row.seqid}\t{source}\tG_Quadruplex_Motif\t{start}\t{row.end}\t.\t{row.strand}\t.\t{attrs}\n" )