#
# This file is part of Sequana software
#
# Copyright (c) 2016-2022 - Sequana Development Team
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
import re
from collections import defaultdict
"""
Note: could use pysam most probably to improve the speed.
"""
import colorlog
logger = colorlog.getLogger(__name__)
[docs]
class Cigar:
"""A class to handle CIGAR strings from BAM files.
.. doctest::
>>> from sequana.cigar import Cigar
>>> c = Cigar("2S30M1I")
>>> len(c)
33
>>> c = Cigar("1S1S1S1S")
>>> c.compress()
>>> c.cigarstring
'4S'
Possible CIGAR types are:
- "M" : alignment match
- "I" : insertion to the reference
- "D" : deletion from the reference
- "N" : skipped region from the reference
- "S" : soft clipping (clipped sequence present in seq)
- "H" : hard clipping (sequence NOT present)
- "P" : padding (silent deletion from padded reference)
- "=" : sequence match
- "X" : sequence mismatched
- "B" : back (rare) (could be also NM ?)
!!! BWA MEM get_cigar_stats returns list with 11 items
Last item is
!!! what is the difference between M and = ???
Last item is I + S + X
!!! dans BWA, mismatch (X) not provided... should be deduced from last item - I - S
.. note:: the length of the query sequence based on the CIGAR is calculated
by adding the M, I, S, =, or X and other operations are ignored.
source: https://stackoverflow.com/questions/39710796/infer-the-length-of-a-sequence-using-the-cigar/39812985#39812985
:reference: https://github.com/samtools/htslib/blob/develop/htslib/sam.h
"""
__slots__ = ("_cigarstring",)
pattern = re.compile(r"(\d+)([A-Za-z])?")
types = "MIDNSHP=XB"
def __init__(self, cigarstring: str):
""".. rubric:: Constructor
:param str cigarstring: the CIGAR string.
.. note:: the input CIGAR string validity is not checked.
If an unknown type is found, it is ignored generally.
For instance, the length of 1S100Y is 1 since Y is not correct.
"""
if not isinstance(cigarstring, str):
raise TypeError("Cigar string must be a string.")
#: the CIGAR string attribute
self._cigarstring = cigarstring
@property
def cigarstring(self):
return self._cigarstring
def __str__(self):
return self._cigarstring
def __repr__(self):
return "Cigar( {} )".format(self._cigarstring)
def __len__(self):
return sum([y for x, y in self._decompose() if x in "MIS=X"])
def _decompose(self):
for num, op in self.pattern.findall(self._cigarstring):
if op and op in self.types:
yield op, int(num)
[docs]
def as_sequence(self):
return "".join(op * count for op, count in self._decompose())
[docs]
def as_dict(self):
"""Return cigar types and their count
:return: dictionary
Note that repeated types are added::
>>> c = Cigar('1S2M1S')
>>> c.as_dict()
{"S":2,"M":2}
"""
# !! here, we have to make sure that duplicated letters are summed up
d = defaultdict(int)
for op, count in self._decompose():
d[op] += count
return d
[docs]
def as_tuple(self):
"""Decompose the cigar string into tuples keeping track of repeated types
:return: tuple
.. doctest::
>>> from sequana import Cigar
>>> c = Cigar("1S2M1S")
>>> c.as_tuple()
(('S', 1), ('M', 2), ('S', 1))
"""
return tuple(self._decompose())
[docs]
def compress(self):
"""1S1S should become 2S. inplace modification"""
data = list(self._decompose())
if not data:
return
compressed = [data[0]]
for op, count in data[1:]:
last_op, last_count = compressed[-1]
if op == last_op:
compressed[-1] = (op, last_count + count)
else:
compressed.append((op, count))
self._cigarstring = "".join(f"{count}{op}" for op, count in compressed)
[docs]
def stats(self):
"""Returns number of occurence for each type found in :attr:`types`
::
>>> c = Cigar("1S2M1S")
>>> c.stats()
[2, 0, 0, 0, 2, 0, 0, 0, 0, 0]
"""
counts = [0] * len(self.types)
d = self.as_dict()
for op, val in d.items():
idx = self.types.find(op)
if idx != -1:
counts[idx] = val
return counts
[docs]
def get_query_length(self) -> int:
"""Return length consumed by the query (read)."""
return sum(count for op, count in self._decompose() if op in "MIS=X")
[docs]
def get_reference_length(self) -> int:
"""Return length consumed by the reference."""
return sum(count for op, count in self._decompose() if op in "MDN=X")
[docs]
def fetch_exon(chrom, start, cigar):
chrom_start = start
exon_bound = []
for c, size in cigar:
if c == 0:
exon_bound.append((chrom, chrom_start, chrom_start + size))
chrom_start += size
elif c == 1:
continue
elif c == 2:
chrom_start += size
elif c == 3:
chrom_start += size
elif c == 4: # FIXME do we want to include this in the exon
chrom_start += size
else:
continue
return exon_bound
[docs]
def fetch_intron(chrom, start, cigar):
# equivalence:
# c = 0 -> M
# c = 1 -> I
# c = 2 -> D
# c = 3 -> N gap/intron
# c = 4 -> S soft clipping
chrom_start = start
intron_bound = []
for c, size in cigar:
if c == 0:
chrom_start += size
elif c == 1:
continue
elif c == 2:
chrom_start += size
elif c == 3:
intron_bound.append((chrom, chrom_start, chrom_start + size))
chrom_start += size
elif c == 4: # not including soft clipping in the intron
continue
else:
continue
return intron_bound
[docs]
def fetch_clip(chrom, start, cigar):
chrom_start = start
clip_bound = []
for c, size in cigar:
if c == 0:
chrom_start += size
elif c == 1:
continue
elif c == 2:
chrom_start += size
elif c == 3:
chrom_start += size
elif c == 4:
clip_bound.append((chrom, chrom_start, chrom_start + size))
chrom_start += size
else:
continue
return clip_bound
[docs]
def fetch_deletion(chrom, start, cigar):
chrom_start = start
deletion_bound = []
for c, size in cigar:
if c == 0:
chrom_start += size
elif c == 1:
continue
elif c == 2:
deletion_bound.append((chrom, chrom_start, chrom_start + size))
chrom_start += size
elif c == 3:
chrom_start += size
elif c == 4:
chrom_start += size
else:
continue
return deletion_bound
[docs]
def fetch_insertion(chrom, start, cigar):
# NOTE that the returned insertions are stored as
# chrom, start, size rather than chrom, start, end in other fetchers
# functions
chrom_start = start
insertion_bound = []
for c, size in cigar:
if c == 0:
chrom_start += size
elif c == 1:
# See note above
insertion_bound.append((chrom, chrom_start, size))
continue
elif c == 2:
chrom_start += size
elif c == 3:
chrom_start += size
elif c == 4:
chrom_start += size
else:
continue
return insertion_bound