Source code for sequana.fasta

#
#  This file is part of Sequana software
#
#  Copyright (c) 2016-2021 - Sequana Development Team
#
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#
#  website: https://github.com/sequana/sequana
#  documentation: http://sequana.readthedocs.io
#
##############################################################################
"""Utilities to manipulate FastA files"""
import os
import textwrap

import colorlog
import tqdm

from sequana.lazy import numpy as np
from sequana.lazy import pylab, pysam
from sequana.stats import L50, N50

logger = colorlog.getLogger(__name__)


__all__ = ["FastA"]


def is_fasta(filename):
    with open(filename, "r") as fin:
        try:
            line = fin.readline()
            assert line.startswith(">")
            line = fin.readline()
            return True
        except:  # pragma: no cover
            return False


# cannot inherit from FastxFile (no object in the API ?)
[docs]class FastA: """Class to handle FastA files Works like an iterator:: from sequana import FastA f = FastA("test.fa") read = next(f) names and sequences can be accessed with attributes:: f.names f.sequences """ def __init__(self, filename, verbose=False): self._fasta = pysam.FastxFile(filename) self.filename = filename self._N = None def __iter__(self): return self def __next__(self): # python 3 return self.next()
[docs] def next(self): # python 2 # reads 4 lines try: d = next(self._fasta) return d except KeyboardInterrupt: # pragma: no cover # This should allow developers to break a loop that takes too long # through the reads to run forever self._fasta.close() self._fasta = pysam.FastxFile(self._fasta.filename) except: self._fasta.close() self._fasta = pysam.FastxFile(self._fasta.filename) raise StopIteration
def __len__(self): if self._N is None: logger.debug("Reading input fasta file...please wait") self._N = sum(1 for x in pysam.FastxFile(self.filename)) return self._N def _get_names(self): _fasta = pysam.FastxFile(self.filename) return [this.name for this in _fasta] names = property(_get_names) def _get_sequences(self): _fasta = pysam.FastxFile(self.filename) return [this.sequence for this in _fasta] sequences = property(_get_sequences) def _get_comment(self): _fasta = pysam.FastxFile(self.filename) return [this.comment for this in _fasta] comments = property(_get_comment) def _get_lengths(self): _fasta = pysam.FastxFile(self.filename) return [len(this.sequence) for this in _fasta] lengths = property(_get_lengths)
[docs] def get_lengths_as_dict(self): """Return dictionary with sequence names and lengths as keys/values""" return dict(zip(self.names, self.lengths))
[docs] def explode(self, outdir="."): """extract sequences from one fasta file and save them into individual files""" with open(self.filename, "r") as fin: for line in fin.readlines(): if line.startswith(">"): # ignore the comment and > character and use it as the # filename name = line.split()[0][1:] try: # if a file was already open, let us close it fout.close() except NameError: pass finally: fout = open(f"{outdir}/{name}.fasta", "w") fout.write(line) else: fout.write(line) # need to close the last file fout.close()
[docs] def format_contigs_denovo(self, output_file, len_min=500): """Remove contigs with sequence length below specific threshold. :param str output_file: output file name. :param int len_min: minimal length of contigs. Example:: from sequana import FastA contigs = FastA("denovo_assembly.fasta") contigs.format_contigs_denovo("output.fasta", len_min=500) """ # catch basename of file without extension project = os.path.basename(output_file).split(".")[0] # check if directory exist output_dir = os.path.dirname(output_file) try: if not os.path.exists(output_dir): # pragma: no cover os.makedirs(output_dir) except FileNotFoundError: # pragma: no cover pass n = 1 with open(output_file, "w") as fp: for contigs in self: if len(contigs.sequence) < len_min: break name = ">{}_{} {}\n".format(project, n, contigs.name) sequence = ( "\n".join( [ contigs.sequence[i : min(i + 80, len(contigs.sequence))] for i in range(0, len(contigs.sequence), 80) ] ) + "\n" ) fp.write(name + sequence) n += 1
[docs] def filter(self, output_filename, names_to_keep=None, names_to_exclude=None): """save FastA excluding or including specific sequences""" if names_to_exclude is None and names_to_keep is None: # pragma: no cover logger.warning("No ids provided") return if names_to_exclude: with open(self.filename) as fin: with open(output_filename, "w") as fout: skip = False # do no use readlines. may be slower but may cause memory # issue for line in fin: if line.startswith(">"): if line[1:].split()[0] in names_to_exclude: skip = True else: skip = False if skip is False: fout.write(line) elif names_to_keep: with open(self.filename) as fin: with open(output_filename, "w") as fout: # do no use readlines. may be slower but may cause memory # issue skip = True for line in fin: if line.startswith(">"): if line[1:].split()[0] in names_to_keep: skip = False else: skip = True if skip is False: fout.write(line)
[docs] def select_random_reads(self, N=None, output_filename="random.fasta"): """Select random reads and save in a file :param int N: number of random unique reads to select should provide a number but a list can be used as well. :param str output_filename: """ thisN = len(self) if isinstance(N, int): if N > thisN: N = thisN # create random set of reads to pick up cherries = list(range(thisN)) np.random.shuffle(cherries) # cast to set for efficient iteration cherries = set(cherries[0:N]) elif isinstance(N, set): cherries = N elif isinstance(N, list): cherries = set(N) fasta = pysam.FastxFile(self.filename) with open(output_filename, "w") as fh: for i, read in enumerate(tqdm.tqdm(fasta)): if i in cherries: fh.write(read.__str__() + "\n") else: pass return cherries
[docs] def get_stats(self): """Return a dictionary with basic statistics N the number of contigs, the N50 and L50, the min/max/mean contig lengths and total length. """ stats = {} stats["N"] = len(self.sequences) stats["mean_length"] = pylab.mean(self.lengths) stats["total_length"] = sum(self.lengths) stats["N50"] = N50(self.lengths) stats["L50"] = L50(self.lengths) stats["min_length"] = min(self.lengths) stats["max_length"] = max(self.lengths) return stats
[docs] def summary(self, max_contigs=-1): """returns summary and print information on the stdout This method is used when calling sequana standalone :: sequana summary test.fasta """ # used by sequana summary fasta summary = {"number_of_contigs": len(self.sequences)} summary["total_contigs_length"] = sum(self.lengths) summary["mean_contig_length"] = pylab.mean(self.lengths) summary["max_contig_length"] = max(self.lengths) summary["min_contig_length"] = min(self.lengths) N = 0 lengths = self.lengths[:] positions = list(range(len(lengths))) stats = self.get_stats() print("#sample_name: {}".format(self.filename)) print("#total length: {}".format(stats["total_length"])) print("#N50: {}".format(stats["N50"])) print("#Ncontig: {}".format(stats["N"])) print("#L50: {}".format(stats["L50"])) print("#max_contig_length: {}".format(stats["max_length"])) print("#min_contig_length: {}".format(stats["min_length"])) print("#mean_contig_length: {}".format(stats["mean_length"])) print("contig name,length,count A,C,G,T,N") if max_contigs == -1: max_contigs = len(lengths) + 1 while lengths and N < max_contigs: N += 1 index = pylab.argmax(lengths) length = lengths.pop(index) position = positions.pop(index) sequence = self.sequences[position] name = self.names[position] print( "{},{},{},{},{},{},{}".format( name, length, sequence.count("A"), sequence.count("C"), sequence.count("G"), sequence.count("T"), sequence.count("N"), ) )
[docs] def GC_content_sequence(self, sequence): """Return GC content in percentage of a sequence""" GC = sequence.count("G") + sequence.count("g") GC += sequence.count("C") + sequence.count("c") return GC / len(sequence) * 100
[docs] def GC_content(self): """Return GC content in percentage of all sequences found in the FastA file""" lengths = sum(self.lengths) GC = 0 for seq in self.sequences: GC += seq.count("G") + seq.count("g") GC += seq.count("C") + seq.count("c") return GC / lengths * 100
[docs] def reverse_and_save(self, filename): """Reverse sequences and save in a file""" with open(filename, "w") as fout: for read in self: fout.write(">{}\t{}\n{}\n".format(read.name, read.comment, read.sequence[::-1]))
[docs] def save_ctg_to_fasta(self, ctgname, outname, max_length=-1): """Select a contig and save in a file""" index = self.names.index(ctgname) with open("{}.fa".format(outname), "w") as fout: if max_length == -1: fout.write(">{}\n{}".format(outname, self.sequences[index])) else: fout.write(">{}\n{}".format(outname, self.sequences[index][0:max_length]))
[docs] def to_fasta(self, outfile, width=80): """Save the input FastA file into a new file The interest of this method is to wrap the sequence into 80 characters. This is useful if the input file is not formatted correctly. """ with open(outfile, "w") as fout: for name, comment, seq in zip(self.names, self.comments, self.sequences): seq = "\n".join(textwrap.wrap(seq, width)) if comment is None: fout.write(f">{name}\n{seq}\n") else: fout.write(f">{name}\t{comment}\n{seq}\n")
[docs] def to_igv_chrom_size(self, output): """Create a IGV file storing chromosomes and their sizes""" data = self.get_lengths_as_dict() with open(output, "w") as fout: for k, v in data.items(): fout.write("{}\t{}\n".format(k, v))
[docs] def save_collapsed_fasta(self, outfile, ctgname, width=80, comment=None): """Concatenate all contigs and save results""" with open(outfile, "w") as fout: data = "".join(self.sequences) seq = "\n".join(textwrap.wrap(data, width)) if comment is None: fout.write(f">{ctgname}\n{seq}\n") else: fout.write(f">{ctgname}\t{comment}\n{seq}\n")