#
# This file is part of Sequana software
#
# Copyright (c) 2016-2021 - Sequana Development Team
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
"""Utilities to manipulate FastA files"""
import os
import textwrap
from collections import defaultdict
import colorlog
from sequana.lazy import numpy as np
from sequana.lazy import pylab, pysam
from sequana.stats import L50, N50
logger = colorlog.getLogger(__name__)
__all__ = ["FastA"]
class _LazySequences:
"""List-like lazy access to sequences without loading entire FASTA in memory."""
def __init__(self, fasta):
self._fasta_obj = fasta
self._cache = {}
def __len__(self):
return len(self._fasta_obj.names)
def __getitem__(self, idx):
if isinstance(idx, slice):
return [self[i] for i in range(*idx.indices(len(self)))]
name = self._fasta_obj.names[idx]
# cache optional but speeds repeated access
if name not in self._cache:
self._cache[name] = self._fasta_obj._fasta.fetch(name)
return self._cache[name]
def __iter__(self):
for i in range(len(self)):
yield self[i]
def clear_cache(self):
"""Free cached sequences if memory becomes tight."""
self._cache.clear()
def is_fasta(filename):
with open(filename, "r") as fin:
try:
line = fin.readline()
assert line.startswith(">")
line = fin.readline()
return True
except StopIteration: # pragma: no cover
return False
# cannot inherit from FastxFile (no object in the API ?)
[docs]
class FastA:
"""Class to handle FastA files
Works like an iterator::
from sequana import FastA
f = FastA("test.fa")
read = next(f)
names and sequences can be accessed with attributes::
f.names
f.sequences
"""
def __init__(self, filename, verbose=False):
fai = str(filename) + ".fai"
if os.path.exists(fai) and (os.path.getmtime(fai) < os.path.getmtime(filename)):
logger.warning(f"Your .fai file looks old compared to fasta. Please delete the .fai {fai} or update it.")
self._lazy_sequences = _LazySequences(self)
# pysam.FastaFile requires bgzip (not plain gzip) and builds a .fai index.
# Open it lazily so iteration-only workflows (common with gzipped fastq/fasta) still work.
self._fasta_obj = None
self._fastx = pysam.FastxFile(filename)
self.filename = filename
self._N = None
@property
def _fasta(self):
if self._fasta_obj is None:
self._fasta_obj = pysam.FastaFile(self.filename)
return self._fasta_obj
def __iter__(self):
return self
def __next__(self): # python 3
return self.next()
[docs]
def next(self): # python 2
try:
d = next(self._fastx)
return d
except KeyboardInterrupt: # pragma: no cover
# This should allow developers to break a loop that takes too long
# through the reads to run forever
self._fastx.close()
self._fastx = pysam.FastxFile(self.filename)
except StopIteration:
self._fastx.close()
self._fastx = pysam.FastxFile(self.filename)
raise StopIteration
def __getitem__(self, name):
return self._fasta[name]
def __len__(self):
return len(self._fasta)
def _get_names(self):
return self._fasta.references
names = property(_get_names)
@property
def sequences(self):
return self._lazy_sequences
def _get_comment(self):
if not hasattr(self, "_comments"):
self._fastx = pysam.FastxFile(self.filename)
self._comments = [this.comment for this in self._fastx]
return self._comments
comments = property(_get_comment)
[docs]
def clear_sequence_cache(self):
self._lazy_sequences.clear_cache()
[docs]
def get_cumulative_sum(self, mode="mixed", exclude=[]):
"""Compute the cumulative sum of values from a dictionary, sorted by name.
This method returns two lists:
- A list of names sorted according to the specified mode.
- A list of cumulative sums corresponding to the lengths of the sorted names.
Sorting behavior:
- If `mode="mixed"` (default), names containing numbers are sorted naturally,
meaning numerical values are sorted as integers while preserving non-numeric strings.
- If `mode="alphanum"`, all names are treated as strings and sorted lexicographically.
Parameters:
- mode (str): Sorting mode, either `"mixed"` (natural sorting) or `"alphanum"` (lexicographic).
- exclude (list): List of names to exclude from processing.
Returns:
- tuple: (sorted_names, cumulative_sums)
- sorted_names (list): Names sorted based on the selected mode.
- cumulative_sums (list): Cumulative sum of corresponding values.
Example:
If input is `['1', 'maxi', '10', '2']`, mixed mode returns `['1', '2', '10', 'maxi']`,
ensuring `['1', 10, 2, 'maxi']` is correctly ordered as `[1, 2, '10', 'maxi']`.
"""
assert mode in ["mixed", "alphanum"]
if mode == "mixed":
sorted_names = self.sorted_mixed_names
else:
sorted_names = self.sorted_names
from itertools import accumulate
lengths = self.get_lengths_as_dict()
return sorted_names, list(accumulate([lengths[name] for name in sorted_names]))
# FIXME: same get_sorted_mixed_names and get_sorted_names functions ?
def _get_sorted_mixed_names(self):
return sorted(self.names, key=lambda x: (isinstance(x, str) and not x.isdigit(), int(x) if x.isdigit() else x))
sorted_mixed_names = property(_get_sorted_mixed_names)
def _get_sorted_names(self):
return sorted(self.names, key=lambda x: (isinstance(x, str) and not x.isdigit(), int(x) if x.isdigit() else x))
sorted_names = property(_get_sorted_names)
def _get_lengths(self):
return self._fasta.lengths
lengths = property(_get_lengths)
[docs]
def get_lengths_as_dict(self):
"""Return dictionary with sequence names and lengths as keys/values"""
return dict(zip(self.names, self.lengths))
[docs]
def explode(self, outdir="."):
"""extract sequences from one fasta file and save them into individual files"""
with open(self.filename, "r") as fin:
for line in fin:
if line.startswith(">"):
# ignore the comment and > character and use it as the
# filename
name = line.split()[0][1:]
try:
# if a file was already open, let us close it
fout.close()
except NameError:
pass
finally:
fout = open(f"{outdir}/{name}.fasta", "w")
fout.write(line)
else:
fout.write(line)
# need to close the last file
fout.close()
[docs]
def filter(self, output_filename, names_to_keep=None, names_to_exclude=None):
"""save FastA excluding or including specific sequences"""
if names_to_exclude is None and names_to_keep is None: # pragma: no cover
logger.warning("No ids provided")
return
if names_to_exclude:
with open(self.filename) as fin:
with open(output_filename, "w") as fout:
skip = False
# do no use readlines. may be slower but may cause memory
# issue
for line in fin:
if line.startswith(">"):
if line[1:].split()[0] in names_to_exclude:
skip = True
else:
skip = False
if skip is False:
fout.write(line)
elif names_to_keep:
with open(self.filename) as fin:
with open(output_filename, "w") as fout:
# do no use readlines. may be slower but may cause memory
# issue
skip = True
for line in fin:
if line.startswith(">"):
if line[1:].split()[0] in names_to_keep:
skip = False
else:
skip = True
if skip is False:
fout.write(line)
[docs]
def select_random_reads(self, N=None, output_filename="random.fasta"):
"""Select random reads and save in a file
:param int N: number of random unique reads to select
should provide a number but a list can be used as well.
:param str output_filename:
"""
thisN = len(self)
if isinstance(N, int):
if N > thisN:
N = thisN
# create random set of reads to pick up
cherries = list(range(thisN))
np.random.shuffle(cherries)
# cast to set for efficient iteration
cherries = set(cherries[0:N])
elif isinstance(N, set):
cherries = N
elif isinstance(N, list):
cherries = set(N)
comments = self.comments[:]
with open(output_filename, "w") as fh:
for i in cherries:
name = self.names[i]
seq = self._fasta.fetch(self.names[i])
comment = comments[i]
fh.write(f">{name}\t{comment}\n{seq}\n")
return cherries
[docs]
def get_stats(self):
"""Return a dictionary with basic statistics
N the number of contigs, the N50 and L50, the min/max/mean
contig lengths and total length.
"""
stats = {}
stats["N"] = len(self)
stats["mean_length"] = pylab.mean(self.lengths)
stats["total_length"] = sum(self.lengths)
stats["N50"] = N50(self.lengths)
stats["L50"] = L50(self.lengths)
stats["min_length"] = min(self.lengths)
stats["max_length"] = max(self.lengths)
return stats
[docs]
def summary(self, max_contigs=-1):
"""returns summary and print information on the stdout
This method is used when calling sequana standalone ::
sequana summary test.fasta
"""
# used by sequana summary fasta
summary = {"number_of_contigs": len(self)}
summary["total_contigs_length"] = sum(self.lengths)
summary["mean_contig_length"] = pylab.mean(self.lengths)
summary["max_contig_length"] = max(self.lengths)
summary["min_contig_length"] = min(self.lengths)
N = 0
lengths = self.lengths[:]
positions = list(range(len(lengths)))
stats = self.get_stats()
print("#sample_name: {}".format(self.filename))
print("#total length: {}".format(stats["total_length"]))
print("#N50: {}".format(stats["N50"]))
print("#Ncontig: {}".format(stats["N"]))
print("#L50: {}".format(stats["L50"]))
print("#max_contig_length: {}".format(stats["max_length"]))
print("#min_contig_length: {}".format(stats["min_length"]))
print("#mean_contig_length: {}".format(stats["mean_length"]))
print("contig name,length,count A,C,G,T,N")
if max_contigs == -1:
max_contigs = len(lengths) + 1
while lengths and N < max_contigs:
N += 1
index = pylab.argmax(lengths)
length = lengths.pop(index)
position = positions.pop(index)
name = self.names[position]
sequence = self._fasta.fetch(name)
print(
"{},{},{},{},{},{},{}".format(
name,
length,
sequence.count("A"),
sequence.count("C"),
sequence.count("G"),
sequence.count("T"),
sequence.count("N"),
)
)
[docs]
def GC_content_sequence(self, sequence):
"""Return GC content in percentage of a sequence"""
from collections import Counter
c = Counter(sequence.upper())
GC = c["G"] + c["C"]
return GC / len(sequence) * 100
[docs]
def GC_content(self):
"""Return GC content in percentage of all sequences found in the FastA file"""
lengths = sum(self.lengths)
GC = 0
for name in self.names:
seq = self._fasta.fetch(name)
GC += seq.count("G") + seq.count("g")
GC += seq.count("C") + seq.count("c")
return GC / lengths * 100
[docs]
def reverse_and_save(self, filename):
"""Reverse sequences and save in a file"""
with open(filename, "w") as fout:
for read in self:
fout.write(">{}\t{}\n{}\n".format(read.name, read.comment, read.sequence[::-1]))
[docs]
def save_ctg_to_fasta(self, ctgname, outname, max_length=-1):
"""Select a contig and save in a file"""
with open("{}.fa".format(outname), "w") as fout:
if max_length == -1:
fout.write(">{}\n{}".format(outname, self._fasta.fetch(ctgname)))
else:
fout.write(">{}\n{}".format(outname, self._fasta.fetch(ctgname)[0:max_length]))
[docs]
def to_fasta(self, outfile, width=80, sorting="natsort"):
"""Save the input FastA file into a new file
The interest of this method is to wrap the sequence into 80 characters.
This is useful if the input file is not formatted correctly.
"""
if sorting == "natsort":
import natsort
names = natsort.natsorted(self.names)
else:
names = self.names
with open(outfile, "w") as fout:
for name in names:
comment = self.comments[self.names.index(name)]
# fetch sequence and wrap it
seq = self._fasta.fetch(name)
seq = "\n".join(textwrap.wrap(seq, width))
if comment is None:
fout.write(f">{name}\n{seq}\n")
else:
fout.write(f">{name}\t{comment}\n{seq}\n")
[docs]
def to_igv_chrom_size(self, output):
"""Create a IGV file storing chromosomes and their sizes"""
data = self.get_lengths_as_dict()
with open(output, "w") as fout:
for k, v in data.items():
fout.write("{}\t{}\n".format(k, v))
[docs]
def save_collapsed_fasta(self, outfile, ctgname, width=80, comment=None):
"""Concatenate all contigs and save results"""
with open(outfile, "w") as fout:
fout.write(f">{ctgname}\n")
for seq in self.sequences:
seq = "\n".join(textwrap.wrap(seq, width))
if comment is None:
fout.write(f">{ctgname}\n{seq}\n")
else:
fout.write(f">{ctgname}\t{comment}\n{seq}\n")
[docs]
def find_gaps(self):
"""Identify NNNNs in data
returns a dictionary. keys are the chromosomes' names
values is a list. the first item is the number of Ns. the next items are the gaps' positions
"""
results = defaultdict(list)
for i, seq in enumerate(self.sequences):
count = 0
positions = [0]
for pos, x in enumerate(seq):
if x == "N":
count += 1
positions.append(pos)
if count:
name = self.names[i]
results[name].append(count)
for i, pos in enumerate(positions[1:]):
if positions[i] - pos == -1:
pass
else:
results[name].append(pos)
return results
[docs]
def print_sequence_region_for_gff(self):
for name in sorted(self.names):
L = len(self.sequences[self.names.index(name)])
# the 2 # are important e.g. for snpeff
print(f"##sequence-region\t{name}\t{1}\t{L}")