Source code for sequana.tools

##############################################################################
#  This file is part of Sequana software
#
#  Copyright (c) 2016-2022 - Sequana Development Team
#
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#
#  website: https://github.com/sequana/sequana
#  documentation: http://sequana.readthedocs.io
#
##############################################################################
"""General tools"""
import gzip
import io
import json
import os
import re
import shutil
import string
import subprocess
from collections import Counter, defaultdict

import colorlog
from tqdm import tqdm

from sequana.bamtools import BAM
from sequana.lazy import numpy as np
from sequana.lazy import pandas as pd
from sequana.lazy import pysam

logger = colorlog.getLogger(__name__)


__all__ = ["StatsBAM2Mapped", "bam_to_mapped_unmapped_fastq", "GZLineCounter"]


_translate = bytes.maketrans(b"ACGTacgt", b"TGCAtgca")


def reverse_complement(seq):
    # the byte translation table above handles both upper and lower cases
    return seq.translate(_translate)[::-1]


def reverse(seq):
    return seq[::-1]
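
# Quick illustrative check of the two helpers above:
#
#     >>> reverse_complement("ACGTacgt")
#     'acgtACGT'
#     >>> reverse("ACGT")
#     'TGCA'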


class StatsBAM2Mapped:
    def __init__(self, bamfile=None, wkdir=None, verbose=True):
        self.wkdir = wkdir
        if bamfile.endswith(".bam"):
            self.data = bam_to_mapped_unmapped_fastq(bamfile, wkdir, verbose)
        elif bamfile.endswith(".json"):
            self.data = json.load(open(bamfile, "r"))

    def to_json(self, filename):
        json.dump(self.data, open(filename, "w"))

    def to_html(self):
        data = self.data
        html = "Reads with Phix: %s %%<br>" % round(data["contamination"], 3)
        # add HTML table
        if "R2_mapped" in data.keys():
            df = pd.DataFrame(
                {
                    "R1": [data["R1_mapped"], data["R1_unmapped"]],
                    "R2": [data["R2_mapped"], data["R2_unmapped"]],
                }
            )
        else:
            df = pd.DataFrame({"R1": [data["R1_mapped"], data["R1_unmapped"]]})
        df.index = ["mapped", "unmapped"]
        html += df.to_html()
        html += "Unpaired: %s <br>" % data["unpaired"]
        html += "duplicated: %s <br>" % data["duplicated"]
        return html
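

# Usage sketch for StatsBAM2Mapped (the file names are hypothetical). The
# statistics can be computed once from a BAM file, cached as JSON, and
# reloaded later:
#
#     stats = StatsBAM2Mapped("sample.bam", wkdir="analysis")
#     stats.to_json("sample_stats.json")
#     report = StatsBAM2Mapped("sample_stats.json")
#     print(report.to_html())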


def bam_to_mapped_unmapped_fastq(filename, output_directory=None, progress=True):
    """Create mapped and unmapped fastq files from a BAM file

    :context: given a reference, one or two FastQ files are mapped onto the
        reference to generate a BAM file. This BAM file is a compressed version
        of a SAM file, whose interpretation this function aims to ease.

    :param filename: input BAM file
    :param output_directory: where to save the mapped and unmapped files
    :return: dictionary with the number of reads for each file (mapped/unmapped
        for R1/R2) as well as the mode (paired or not), the number of unpaired
        reads, and the number of duplicated reads. The number of unpaired reads
        should be zero (sanity check).

    Given a BAM file, create FASTQ files with the R1/R2 reads mapped and
    unmapped. In the paired-end case, 4 files are created.

    Note that this function is efficient in that it does not create
    intermediate files, limiting IO in the process. As compared to standard
    tools such as bedtools bamtofastq, it is 1.5 to 2X slower, but it creates
    both the mapped AND unmapped reads.

    :Details: Secondary alignments (flag 256) are dropped so as to remove any
        ambiguous alignments. The output dictionary stores a "secondary" key to
        keep track of the total number of secondary reads that are dropped. If
        the flag is 256 and the read is unpaired, the key *unpaired* is also
        incremented.

        If the flag is not equal to 256, we first reverse complement reads that
        are tagged as *reverse* in the BAM file. Then, if the read **or** its
        mate is mapped, the read is considered mapped; only if both the read
        and its mate are unmapped is the read written to the unmapped file.

    .. note:: about chimeric alignments: one is the representative and the
        other is the supplementary. This flag is not used in this function.
        Note also that chimeric alignments have the same QNAME and flags 4 and 8.

    .. note:: the contamination reported is based on R1 only.

    .. todo:: comments are missing since they are not stored in the BAM file.

    .. note:: the mapped reads may not be synchronized because we also include
        the chimeric alignments (cf samtools documentation). However,
        total reads = unmapped reads + R1 mapped + R2 mapped - supplementary
        reads (those with flag 2048).
    """
    bam = BAM(filename)

    newname, ext = os.path.splitext(filename)

    stats = defaultdict(int)
    stats["R1_unmapped"] = 0
    stats["R1_mapped"] = 0

    # figure out where to save the files
    if output_directory is not None:
        assert isinstance(filename, str)
        from sequana_pipetools.snaketools import FileFactory

        ff = FileFactory(filename)
        newname = output_directory + os.sep + ff.filenames[0]

    rt1 = "_R1_"
    rt2 = "_R2_"

    R1_mapped = open(newname + "{}.mapped.fastq".format(rt1), "wb")
    R1_unmapped = open(newname + "{}.unmapped.fastq".format(rt1), "wb")
    stats["duplicated"] = 0
    stats["unpaired"] = 0

    # if paired, open the R2 files as well
    if bam.is_paired:
        stats["mode"] = "pe"
        stats["R2_unmapped"] = 0
        stats["R2_mapped"] = 0
        R2_mapped = open(newname + "{}.mapped.fastq".format(rt2), "wb")
        R2_unmapped = open(newname + "{}.unmapped.fastq".format(rt2), "wb")
    else:
        stats["mode"] = "se"

    # loop through the BAM (make sure it is rewinded)
    bam.reset()

    for this in tqdm(bam, disable=not progress):
        if this.flag & 256:
            # A secondary alignment occurs when a given read could align
            # reasonably well to more than one place. One of the possible
            # reported alignments is termed "primary" and the others are
            # marked as "secondary". We drop the secondary alignments here to
            # avoid ambiguity, but keep track of their number.
            #
            # Unmapped reads, on the other hand, are in the BAM file but have
            # no valid assigned position (N.B., they may have an assigned
            # position, but it should be ignored). It is typically the case
            # that a number of reads cannot be aligned, due to sequencing
            # errors, imperfect matches between the sequenced DNA and the
            # reference, random E. coli or other contamination, etc.
            stats["secondary"] += 1
            if this.is_paired is False:
                stats["unpaired"] += 1
        else:
            # pick the suffix according to the read (R1 or R2)
            if this.is_read1:
                suffix = b"/1"
            else:
                suffix = b"/2"

            # in pysam, seq is a string and qual a bytes....
            if this.is_reverse is True:
                txt = b"@" + bytes(this.qname, "utf-8") + suffix + b"\n"
                revcomp = reverse_complement(this.seq)
                txt += bytes(revcomp, "utf-8") + b"\n"
                txt += b"+\n"
                txt += bytes(this.qual[::-1], "utf-8") + b"\n"
            else:
                txt = b"@" + bytes(this.qname, "utf-8") + suffix + b"\n"
                txt += bytes(this.seq, "utf-8") + b"\n"
                txt += b"+\n"
                txt += bytes(this.qual, "utf-8") + b"\n"

            # Here, we must be careful to keep the pairs. So if R1 is mapped
            # but R2 is unmapped (or the converse), then the pair is mapped.
            if this.is_read1:
                if this.is_unmapped and this.mate_is_unmapped:
                    R1_unmapped.write(txt)
                    stats["R1_unmapped"] += 1
                else:
                    R1_mapped.write(txt)
                    stats["R1_mapped"] += 1
            elif this.is_read2:
                if this.is_unmapped and this.mate_is_unmapped:
                    R2_unmapped.write(txt)
                    stats["R2_unmapped"] += 1
                else:
                    R2_mapped.write(txt)
                    stats["R2_mapped"] += 1
            else:  # pragma: no cover
                # This should be a single (unpaired) read
                stats["unpaired"] += 1
                if this.is_unmapped:
                    R1_unmapped.write(txt)
                    stats["R1_unmapped"] += 1
                else:
                    R1_mapped.write(txt)
                    stats["R1_mapped"] += 1

            if this.is_duplicate:
                stats["duplicated"] += 1

    if bam.is_paired:
        R2_mapped.close()
        R2_unmapped.close()
    R1_mapped.close()
    R1_unmapped.close()

    _x = stats["R1_mapped"]
    _y = stats["R1_unmapped"]
    stats["contamination"] = _x / float(_x + _y) * 100

    return stats
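

# Usage sketch ("sample.bam" is a hypothetical paired-end input). Four FastQ
# files (R1/R2, mapped/unmapped) are written next to the input, or in
# *output_directory* if provided:
#
#     stats = bam_to_mapped_unmapped_fastq("sample.bam", output_directory="fastq")
#     print(stats["mode"])                            # "pe" or "se"
#     print(stats["R1_mapped"], stats["R1_unmapped"])
#     print(stats["contamination"])                   # % of mapped R1 reads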


def bam_get_paired_distance(filename):
    """Return distances between paired reads

    :return: list of tuples where each tuple contains the start position and
        end position of a mapped pair of reads, plus the mode. mode = 1 means
        the fragment is reversed. mode = 2 means the mate is reversed.
        mode = 3 means neither is reversed.

    ::

        distances = bam_get_paired_distance(bamfile)
        hist([x[1]-x[0] for x in distances])

    .. warning:: experimental
    """
    b = BAM(filename)
    distances = []

    for fragment in b:
        if not fragment.is_unmapped and not fragment.mate_is_unmapped and fragment.is_read1:
            # get the mate:
            mate = next(b)
            if fragment.is_reverse:
                position2 = fragment.reference_end
                position1 = mate.reference_start
                mode = 1
            elif mate.is_reverse:
                position1 = fragment.reference_start
                position2 = mate.reference_end
                mode = 2
            else:
                # if neither read is reversed, what does that mean?
                # On Hm2, this is the case for 4 pairs out of 1622.
                # This seems to be a special case where the fragment ends
                # exactly at the end of the reference and the mate starts
                # exactly at the beginning, with a length less than 100.
                print(fragment.reference_start, fragment.reference_end)
                print(mate.reference_start, mate.reference_end)
                position1 = -1
                position2 = -1
                mode = 3
            distances.append((position1, position2, mode))

    return distances


def fast_gc_content(chrom):
    """Return the mean GC content of a sequence

    Copes with lower and upper cases. If x or X characters are present, they
    are ignored in the final GC percentage.
    """
    L = len(chrom)
    GC = chrom.count("G") + chrom.count("C") + chrom.count("c") + chrom.count("g")
    xX = chrom.count("X") + chrom.count("x")
    return GC / (L - xX)


def _base_content(filename, window_size, letters, circular=False):
    # DOC: see gc_content
    fasta = pysam.FastxFile(filename)
    checker = set(letters)
    chrom_gc_content = dict()
    for chrom in fasta:
        mid = int(window_size / 2)
        # Create the gc_content array, filled with NaN by default
        gc_content = np.empty(len(chrom.sequence))
        gc_content[:] = np.nan
        if circular:
            chrom.sequence = chrom.sequence[-mid:] + chrom.sequence + chrom.sequence[:mid]
            # Does not shift the index of the array
            mid = 0
        # Count the content of the first window
        counter = Counter(chrom.sequence[0:window_size])
        gc_count = 0
        for letter in letters:
            gc_count += counter[letter]

        gc_content[mid] = gc_count
        # Slide the window: subtract the letter leaving on the left, add the
        # letter entering on the right
        for i in range(1, len(chrom.sequence) - window_size + 1):
            if chrom.sequence[i - 1] in checker:
                gc_count -= 1
            if chrom.sequence[i + window_size - 1] in checker:
                gc_count += 1
            gc_content[i + mid] = gc_count
        chrom_gc_content[chrom.name] = gc_content / window_size
    return chrom_gc_content


def gc_content(filename, window_size, circular=False, letters=["G", "C", "c", "g"]):
    """Return GC content for the different sequences found in a FASTA file

    :param filename: fasta formatted file
    :param window_size: window length used to compute the GC content
    :param circular: set to True if sequences are circular.
    :return: dictionary with fasta names as keys and GC content vectors as values

    .. todo:: case when the genome is not circular -> add NaN at the start and
        end of the returned arrays
    """
    return _base_content(filename, window_size, letters, circular=circular)
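

# Usage sketch for gc_content ("genome.fasta" is a hypothetical input). Each
# value is a numpy array of the GC fraction per sliding-window position:
#
#     gc = gc_content("genome.fasta", window_size=101, circular=True)
#     for name, values in gc.items():
#         print(name, np.nanmean(values))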


def genbank_features_parser(input_filename):
    """Return dictionary with the features contained in a genbank file.

    :param str input_filename: genbank formatted file
    """
    logger.warning("deprecated. please use GenBank.genbank_features_parser instead")

    new_feature = {}
    records = {}
    feature_list = []
    feature_field = False

    with open(input_filename, "r") as fp:
        for line in fp:
            # skip header and sequence fields
            if not feature_field:
                # get contig/chrom name
                if line.startswith("LOCUS"):
                    name = line.split()[1]
                elif line.startswith("FEATURE"):
                    feature_field = True
            else:
                # the feature field is finished
                if line.startswith("ORIGIN"):
                    feature_field = False
                    # flush the last pending feature before closing the record
                    if new_feature:
                        feature_list.append(new_feature)
                    records[name] = feature_list
                    feature_list = []
                    new_feature = {}
                    continue

                # if there is a word in the qualifier indent, this is a new
                # feature type (maybe we need to infer the size of the
                # indentation?)
                if line[0:20].split():
                    if new_feature:
                        feature_list.append(new_feature)
                    split_line = line.split()
                    t = split_line[0]
                    # Handle positions spread over several lines:
                    # complement(order(1449596..1449640,1449647..1450684,
                    # 1450695..1450700))
                    positions = split_line[1]
                    if positions[0].isalpha():
                        while not line[:-1].endswith(")"):
                            line = next(fp)
                            positions += line
                    pos = [int(n) for n in re.findall(r"\d+", positions)]
                    # Handle complement(join(3773333..3774355,3774357..3774431))
                    start = pos[0]
                    end = pos[-1]
                    strand = "-" if split_line[1].startswith("c") else "+"
                    new_feature = {
                        "type": t,
                        "gene_start": start,
                        "gene_end": end,
                        "strand": strand,
                    }
                # recover the qualifiers bound to the feature
                else:
                    quali_line = line.strip().replace('"', "")
                    if quali_line.startswith("/") and "=" in quali_line:
                        qualifier = quali_line.split("=")
                        key = qualifier[0][1:]
                        new_feature[key] = qualifier[1]
                    else:
                        if key == "translation":
                            new_feature[key] += quali_line
                        else:
                            new_feature[key] += " " + quali_line
    return records
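

# The returned structure maps each LOCUS name to a list of feature
# dictionaries, e.g. (illustrative values only):
#
#     {"chr1": [{"type": "CDS", "gene_start": 10, "gene_end": 250,
#                "strand": "+", "product": "hypothetical protein"}, ...]}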


class GZLineCounter(object):
    """Fast GZipped line counter

    Uses zcat if possible, otherwise the gzip library (twice as slow).

    .. doctest::

        >>> from sequana import sequana_data
        >>> from sequana.tools import GZLineCounter
        >>> gz = GZLineCounter(sequana_data("test.fastq.gz"))
        >>> len(gz)
        1000
    """

    def __init__(self, filename):
        self.filename = filename
        if shutil.which("zcat"):
            self.use_zcat = True
        else:  # pragma: no cover
            self.use_zcat = False

    def __len__(self):
        if self.use_zcat:
            return self._use_zcat()
        else:
            return self._use_gzip()

    def _use_zcat(self):
        i = 0
        p = subprocess.Popen(["zcat", self.filename], stdout=subprocess.PIPE)
        for line in p.stdout:
            i += 1
        return i

    # An alternative that is about twice as fast as the method below but has
    # a large memory footprint: read the whole file at once with
    # gz_file.read() and count with len(data.splitlines()).
    def _use_gzip(self):
        i = 0
        with gzip.open(self.filename) as gz_file:
            with io.BufferedReader(gz_file) as f:
                for line in f:
                    i += 1
        return i


def entropy(x):
    """Return the Shannon entropy (in nats) of the letters found in x"""
    letters = set(x)
    pi = np.array([x.count(letter) / float(len(x)) for letter in letters])
    pi = pi[pi != 0]
    return -sum(pi * np.log(pi))


class PairedFastQ(object):
    """Check that two FastQ files are synchronised (same read names, same order)"""

    def __init__(self, fq1, fq2):
        self.fq1 = fq1
        self.fq2 = fq2

    def is_synchronised(self):
        from sequana import FastQ

        N = 0
        for a, b in zip(FastQ(self.fq1), FastQ(self.fq2)):
            a = a["identifier"].decode()
            b = b["identifier"].decode()
            # keep only the read name, dropping comments and /1 or /2 suffixes
            a = a.split()[0]
            b = b.split()[0]
            if a.endswith("/1"):
                id1 = a.rsplit("/1")[0]
            elif a.endswith("/2"):
                id1 = a.rsplit("/2")[0]
            else:
                id1 = a
            if b.endswith("/1"):
                id2 = b.rsplit("/1")[0]
            elif b.endswith("/2"):
                id2 = b.rsplit("/2")[0]
            else:
                id2 = b
            if id1 != id2:
                print("%s differs from %s" % (id1, id2))
                print(a)
                print(b)
                return False
            N += 1
        print("%s pairs checked" % N)
        return True
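

# Usage sketches (file names are hypothetical):
#
#     entropy("AACC")   # -> ~0.693, i.e. log(2): two equiprobable letters
#
#     pfq = PairedFastQ("sample_R1.fastq", "sample_R2.fastq")
#     pfq.is_synchronised()   # True if read names match pairwise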