#
# This file is part of Sequana software
#
# Copyright (c) 2016-2021 - Sequana Development Team
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
"""Utilities to manipulate FastA files"""
import os
import textwrap
import colorlog
import tqdm
from sequana.lazy import numpy as np
from sequana.lazy import pylab, pysam
from sequana.stats import L50, N50
logger = colorlog.getLogger(__name__)
__all__ = ["FastA"]
def is_fasta(filename):
with open(filename, "r") as fin:
try:
line = fin.readline()
assert line.startswith(">")
line = fin.readline()
return True
except: # pragma: no cover
return False
# cannot inherit from FastxFile (no object in the API ?)
[docs]class FastA:
"""Class to handle FastA files
Works like an iterator::
from sequana import FastA
f = FastA("test.fa")
read = next(f)
names and sequences can be accessed with attributes::
f.names
f.sequences
"""
def __init__(self, filename, verbose=False):
self._fasta = pysam.FastxFile(filename)
self.filename = filename
self._N = None
def __iter__(self):
return self
def __next__(self): # python 3
return self.next()
[docs] def next(self): # python 2
# reads 4 lines
try:
d = next(self._fasta)
return d
except KeyboardInterrupt: # pragma: no cover
# This should allow developers to break a loop that takes too long
# through the reads to run forever
self._fasta.close()
self._fasta = pysam.FastxFile(self._fasta.filename)
except:
self._fasta.close()
self._fasta = pysam.FastxFile(self._fasta.filename)
raise StopIteration
def __len__(self):
if self._N is None:
logger.debug("Reading input fasta file...please wait")
self._N = sum(1 for x in pysam.FastxFile(self.filename))
return self._N
def _get_names(self):
_fasta = pysam.FastxFile(self.filename)
return [this.name for this in _fasta]
names = property(_get_names)
def _get_sequences(self):
_fasta = pysam.FastxFile(self.filename)
return [this.sequence for this in _fasta]
sequences = property(_get_sequences)
def _get_comment(self):
_fasta = pysam.FastxFile(self.filename)
return [this.comment for this in _fasta]
comments = property(_get_comment)
def _get_lengths(self):
_fasta = pysam.FastxFile(self.filename)
return [len(this.sequence) for this in _fasta]
lengths = property(_get_lengths)
[docs] def get_lengths_as_dict(self):
"""Return dictionary with sequence names and lengths as keys/values"""
return dict(zip(self.names, self.lengths))
[docs] def explode(self, outdir="."):
"""extract sequences from one fasta file and save them into individual files"""
with open(self.filename, "r") as fin:
for line in fin.readlines():
if line.startswith(">"):
# ignore the comment and > character and use it as the
# filename
name = line.split()[0][1:]
try:
# if a file was already open, let us close it
fout.close()
except NameError:
pass
finally:
fout = open(f"{outdir}/{name}.fasta", "w")
fout.write(line)
else:
fout.write(line)
# need to close the last file
fout.close()
[docs] def filter(self, output_filename, names_to_keep=None, names_to_exclude=None):
"""save FastA excluding or including specific sequences"""
if names_to_exclude is None and names_to_keep is None: # pragma: no cover
logger.warning("No ids provided")
return
if names_to_exclude:
with open(self.filename) as fin:
with open(output_filename, "w") as fout:
skip = False
# do no use readlines. may be slower but may cause memory
# issue
for line in fin:
if line.startswith(">"):
if line[1:].split()[0] in names_to_exclude:
skip = True
else:
skip = False
if skip is False:
fout.write(line)
elif names_to_keep:
with open(self.filename) as fin:
with open(output_filename, "w") as fout:
# do no use readlines. may be slower but may cause memory
# issue
skip = True
for line in fin:
if line.startswith(">"):
if line[1:].split()[0] in names_to_keep:
skip = False
else:
skip = True
if skip is False:
fout.write(line)
[docs] def select_random_reads(self, N=None, output_filename="random.fasta"):
"""Select random reads and save in a file
:param int N: number of random unique reads to select
should provide a number but a list can be used as well.
:param str output_filename:
"""
thisN = len(self)
if isinstance(N, int):
if N > thisN:
N = thisN
# create random set of reads to pick up
cherries = list(range(thisN))
np.random.shuffle(cherries)
# cast to set for efficient iteration
cherries = set(cherries[0:N])
elif isinstance(N, set):
cherries = N
elif isinstance(N, list):
cherries = set(N)
fasta = pysam.FastxFile(self.filename)
with open(output_filename, "w") as fh:
for i, read in enumerate(tqdm.tqdm(fasta)):
if i in cherries:
fh.write(read.__str__() + "\n")
else:
pass
return cherries
[docs] def get_stats(self):
"""Return a dictionary with basic statistics
N the number of contigs, the N50 and L50, the min/max/mean
contig lengths and total length.
"""
stats = {}
stats["N"] = len(self.sequences)
stats["mean_length"] = pylab.mean(self.lengths)
stats["total_length"] = sum(self.lengths)
stats["N50"] = N50(self.lengths)
stats["L50"] = L50(self.lengths)
stats["min_length"] = min(self.lengths)
stats["max_length"] = max(self.lengths)
return stats
[docs] def summary(self, max_contigs=-1):
"""returns summary and print information on the stdout
This method is used when calling sequana standalone ::
sequana summary test.fasta
"""
# used by sequana summary fasta
summary = {"number_of_contigs": len(self.sequences)}
summary["total_contigs_length"] = sum(self.lengths)
summary["mean_contig_length"] = pylab.mean(self.lengths)
summary["max_contig_length"] = max(self.lengths)
summary["min_contig_length"] = min(self.lengths)
N = 0
lengths = self.lengths[:]
positions = list(range(len(lengths)))
stats = self.get_stats()
print("#sample_name: {}".format(self.filename))
print("#total length: {}".format(stats["total_length"]))
print("#N50: {}".format(stats["N50"]))
print("#Ncontig: {}".format(stats["N"]))
print("#L50: {}".format(stats["L50"]))
print("#max_contig_length: {}".format(stats["max_length"]))
print("#min_contig_length: {}".format(stats["min_length"]))
print("#mean_contig_length: {}".format(stats["mean_length"]))
print("contig name,length,count A,C,G,T,N")
if max_contigs == -1:
max_contigs = len(lengths) + 1
while lengths and N < max_contigs:
N += 1
index = pylab.argmax(lengths)
length = lengths.pop(index)
position = positions.pop(index)
sequence = self.sequences[position]
name = self.names[position]
print(
"{},{},{},{},{},{},{}".format(
name,
length,
sequence.count("A"),
sequence.count("C"),
sequence.count("G"),
sequence.count("T"),
sequence.count("N"),
)
)
[docs] def GC_content_sequence(self, sequence):
"""Return GC content in percentage of a sequence"""
GC = sequence.count("G") + sequence.count("g")
GC += sequence.count("C") + sequence.count("c")
return GC / len(sequence) * 100
[docs] def GC_content(self):
"""Return GC content in percentage of all sequences found in the FastA file"""
lengths = sum(self.lengths)
GC = 0
for seq in self.sequences:
GC += seq.count("G") + seq.count("g")
GC += seq.count("C") + seq.count("c")
return GC / lengths * 100
[docs] def reverse_and_save(self, filename):
"""Reverse sequences and save in a file"""
with open(filename, "w") as fout:
for read in self:
fout.write(">{}\t{}\n{}\n".format(read.name, read.comment, read.sequence[::-1]))
[docs] def save_ctg_to_fasta(self, ctgname, outname, max_length=-1):
"""Select a contig and save in a file"""
index = self.names.index(ctgname)
with open("{}.fa".format(outname), "w") as fout:
if max_length == -1:
fout.write(">{}\n{}".format(outname, self.sequences[index]))
else:
fout.write(">{}\n{}".format(outname, self.sequences[index][0:max_length]))
[docs] def to_fasta(self, outfile, width=80):
"""Save the input FastA file into a new file
The interest of this method is to wrap the sequence into 80 characters.
This is useful if the input file is not formatted correctly.
"""
with open(outfile, "w") as fout:
for name, comment, seq in zip(self.names, self.comments, self.sequences):
seq = "\n".join(textwrap.wrap(seq, width))
if comment is None:
fout.write(f">{name}\n{seq}\n")
else:
fout.write(f">{name}\t{comment}\n{seq}\n")
[docs] def to_igv_chrom_size(self, output):
"""Create a IGV file storing chromosomes and their sizes"""
data = self.get_lengths_as_dict()
with open(output, "w") as fout:
for k, v in data.items():
fout.write("{}\t{}\n".format(k, v))
[docs] def save_collapsed_fasta(self, outfile, ctgname, width=80, comment=None):
"""Concatenate all contigs and save results"""
with open(outfile, "w") as fout:
data = "".join(self.sequences)
seq = "\n".join(textwrap.wrap(data, width))
if comment is None:
fout.write(f">{ctgname}\n{seq}\n")
else:
fout.write(f">{ctgname}\t{comment}\n{seq}\n")