# This file is part of Sequana software
#
# Copyright (c) 2016-2021 - Sequana Development Team
#
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
import os
import shutil
import sys
from pathlib import Path, PosixPath
import colorlog
from easydev import TempFile, md5
from snakemake import shell
from sequana import sequana_config_path
from sequana.kraken.analysis import KrakenAnalysis, KrakenDB, KrakenResults
from sequana.lazy import numpy as np
from sequana.lazy import pandas as pd
from sequana.lazy import pylab
from sequana.misc import wget
logger = colorlog.getLogger(__name__)
__all__ = [
"KrakenConsensus",
]
[docs]
class KrakenConsensus(object):
"""Kraken Sequential Analysis
This runs Kraken on a FastQ file with multiple k-mer databases in a
sequencial way way. Unclassified sequences with the first database are input
for the second, and so on.
The input may be a single FastQ file or paired, gzipped or not. FastA are
also accepted.
"""
def __init__(
self,
filename_fastq,
fof_databases,
threads=1,
output_directory="./kraken_sequential/",
keep_temp_files=False,
output_filename_unclassified=None,
output_filename_classified=None,
force=False,
confidence=0,
):
""".. rubric:: **constructor**
:param filename_fastq: FastQ file to analyse
:param fof_databases: file that contains a list of databases paths
(one per line). The order is important. Note that you may also
provide a list of datab ase paths.
:param threads: number of threads to be used by Kraken
:param output_directory: name of the output directory
:param keep_temp_files: bool, if True, will keep intermediate files
from each Kraken analysis, and save html report at each step
:param bool force: if the output directory already exists, the
instanciation fails so that the existing data is not overrwritten.
If you wish to overwrite the existing directory, set this
parameter to iTrue.
"""
self.filename_fastq = filename_fastq
self.confidence = confidence
# input databases may be stored in a file
if isinstance(fof_databases, str) and os.path.exists(fof_databases):
with open(fof_databases, "r") as fof:
self.databases = [absolute_path.split("\n")[0] for absolute_path in fof.readlines()]
# or simply provided as a list
elif isinstance(fof_databases, (list, tuple)):
self.databases = fof_databases[:]
else:
raise TypeError(
"input databases must be a list of valid kraken2 " "databases or a file (see documentation)"
)
self.databases = [KrakenDB(x) for x in self.databases]
for d in self.databases:
if d.version != "kraken2":
logger.error(f"input database {d} is not valid kraken2 ")
sys.exit(1)
self.threads = threads
self.output_directory = output_directory
self.keep_temp_files = keep_temp_files
# check if the output directory already exist
try:
os.mkdir(output_directory)
except OSError:
if os.path.isdir(output_directory) and force is False:
logger.error("Output directory %s already exists" % output_directory)
raise Exception
elif force is True:
logger.warning(
"Output directory %s already exists. You may " "overwrite existing results" % output_directory
)
# list of input fastq files
if isinstance(filename_fastq, list) and len(filename_fastq) in [1, 2]:
self.inputs = filename_fastq[:]
elif isinstance(filename_fastq, str):
self.inputs = [filename_fastq]
else:
msg = "input file must be a string or list of 2 filenames"
msg += "\nYou provided {}".format(filename_fastq)
raise TypeError(msg)
if len(self.inputs) == 1:
self.paired = False
elif len(self.inputs) == 2:
self.paired = True
self.unclassified_output = output_filename_unclassified
self.classified_output = output_filename_classified
def _run_one_analysis(self, iteration):
"""Run one analysis"""
db = self.databases[iteration]
logger.info("Analysing data using database {}".format(db))
# a convenient alias
_pathto = lambda x: self.output_directory / x
inputs = self.inputs
file_kraken_out = self.output_directory / f"kraken_{iteration}.out"
# The analysis itself
analysis = KrakenAnalysis(inputs, db, self.threads, confidence=self.confidence)
analysis.run(
output_filename=file_kraken_out,
# output_filename_unclassified=output_filename_unclassified,
only_classified_output=False,
)
# save input/output files.
self._list_kraken_output.append(file_kraken_out)
[docs]
def run(self, dbname="multiple", output_prefix="kraken_final"):
"""Run the sequential analysis
:param dbname:
:param output_prefix:
:return: dictionary summarizing the databases names and
classified/unclassied
This method does not return anything creates a set of files:
- kraken_final.out
- krona_final.html
- kraken.png (pie plot of the classified/unclassified reads)
.. note:: the databases are run in the order provided in the constructor.
"""
# list of all output to merge at the end
self._list_kraken_output = []
self._list_kraken_input = []
# Iteration over the databases
for iteration in range(len(self.databases)):
# The analysis itself
status = self._run_one_analysis(iteration)
# build consensus from the different output files
build_consensus(self._list_kraken_output, self.output_directory / "kraken.out")
file_output_final = self.output_directory / "kraken.out"
logger.info("Analysing final results")
result = KrakenResults(file_output_final, verbose=False)
"""try:
result.histo_classified_vs_read_length()
pylab.savefig(self.output_directory / "hist_read_length.png")
except Exception as err:
logger.warning("hist read length could not be computed")
try:
result.boxplot_classified_vs_read_length()
pylab.savefig(self.output_directory / "boxplot_read_length.png")
except Exception as err:
logger.warning("hist read length could not be computed")
"""
# TODO: this looks similar to the code in KrakenPipeline. could be factorised
result.to_js("%s.html" % (self.output_directory / output_prefix))
try:
result.plot2(kind="pie")
except Exception as err:
logger.warning(err)
result.plot(kind="pie")
pylab.savefig(self.output_directory / "kraken.png")
prefix = self.output_directory
result.kraken_to_json(prefix / "kraken.json", dbname)
result.kraken_to_csv(prefix / "kraken.csv", dbname)
# remove kraken intermediate files (including unclassified files)
if self.unclassified_output:
# Just cp the last unclassified file
try:
# single-end data (one file)
shutil.copy2(self._list_kraken_input[-1], self.unclassified_output)
except:
for i, x in enumerate(self._list_kraken_input[-1]):
shutil.copy2(x, self.unclassified_output.replace("#", str(i + 1)))
if self.classified_output:
# Just cp the last classified file
shutil.copy2(self._list_kraken_input[-1], self.classified_output)
summary = {"databases": [x.name for x in self.databases]}
total = 0
classified = 0
for f_temp, db in zip(self._list_kraken_output, self.databases):
# In theory, the first N-1 DB returns only classified (C) read
# and the last one contains both
try:
df = pd.read_csv(f_temp, sep="\t", header=None, usecols=[0])
C = sum(df[0] == "C")
U = sum(df[0] == "U")
except pd.errors.EmptyDataError:
# if no read classified,
C = 0
U = 0
total += U
total += C
classified += C
summary[db.name] = {"C": C}
if U != 0: # the last one
summary["unclassified"] = U
summary["total"] = total
summary["classified"] = classified
return summary
from functools import lru_cache
from easydev import do_profile
# @lru_cache(maxsize=32)
def searchLCA(taxids, taxonomy_file, buffer={}):
if taxids in buffer:
return buffer[taxids]
obsolets = {1513193: 2748961, 35306: 3052441}
# taxonomy_file.update(obsolets)
IDs = taxids
taxids = [int(x) for x in taxids]
# TODO. handle unknown IDs
try:
parents = taxonomy_file.loc[taxids].parent.values
except:
parents = []
if len(parents) == 0:
logger.warning(f"the taxids {taxids} were not found")
buffer[IDs] = -1
return -1
elif len(parents) == 1:
buffer[IDs] = parents.index[0]
return parents.index[0]
else:
taxid = "1"
LCA_found = False
while not LCA_found:
if len(set(parents)) == 1:
LCA_found = True
taxid = list(parents)[0]
break
else:
parents = taxonomy_file.loc[parents].parent.values
if taxid == 1:
# print("no LCA found for these taxids:", taxids)
buffer[IDs] = 1
return 1
else:
buffer[IDs] = taxid
return taxid
def build_consensus(inputs, output):
import glob
from collections import defaultdict
from sequana.taxonomy import Taxonomy
tax = Taxonomy(verbose=True)
tax.load_records()
def find_best_hit(kmer_totals):
"""Find the k-mer key with the highest total value."""
best_hit_key = 0
best_hit_value = 0
for key, value in kmer_totals.items():
# 'if key' means that key==0 are ignored.
if key not in ["0", 0] and value > best_hit_value:
best_hit_key = key
best_hit_value = value
return best_hit_key, best_hit_value
def parse_kmer_info(kmer_info):
"""Parse the k-mer information string into a dictionary."""
kmer_dict = {}
parts = kmer_info.split()
for part in parts:
if part != "|:|":
key, value = part.split(":")
kmer_dict[key] = int(value)
return kmer_dict
def scanner(files, output):
buffer = {}
fout = open(output, "w")
streams = [open(x, "r") for x in files]
count = 0
while True:
try:
count += 1
if count % 10000 == 0:
logger.info(f"Processed {count} reads")
lines = [stream.readline() for stream in streams]
kmer_totals = defaultdict(int)
taxids = []
classifications = []
kmers = []
for line in lines:
parts = line.split("\t")
# Parse the k-mer info and update totals
# 50% here
# kmer_dict = parse_kmer_info(parts[4].strip())
# for key, value in kmer_dict.items():
# print("=", key, value)
# kmer_totals[key] += value
# if parts[1] == "HISEQ:426:C5T65ACXX:5:2302:18333:2293":
# print(line)
# print(kmer_totals)
if parts[0] == "C":
taxids.append(parts[2])
classifications.extend(parts[4])
kmers.append(parts[4])
taxids = list(set(taxids))
if len(taxids) == 0:
classified = "U"
best_taxid = "0"
elif len(taxids) == 1:
classified = "C"
best_taxid = taxids[0]
else:
best_taxid = searchLCA(tuple(taxids), tax.records, buffer)
classified = "C"
# TODO
# save the kmers for saving later
# buffer_kmers = kmer_totals.copy()
# let us just ignore the 'A' and taxid 0
# kmer_totals['A'] = -1
# kmer_totals['0'] = -1
# 6% here
# best_taxid = find_best_hit(kmer_totals)
# print(kmer_totals, best_taxid)
# count+=1
# if count>10:
# break
# kmers = " ".join([f"{k}:{v}" for k,v in kmer_totals.items()])
kmer_left = " ".join([x.split("|:|")[0] for x in kmers])
try:
kmer_right = " ".join([x.split("|:|")[1] for x in kmers])
kmers = kmer_left + " |:| " + kmer_right
except:
kmers = kmer_left
fout.write("\t".join([classified, parts[1], str(best_taxid), parts[3], kmers]))
except Exception as err:
logger.error(str(err))
break
fout.close()
for stream in streams:
stream.close()
# Get the aggregated k-mer information
kmer_totals = scanner(inputs, output)