# Source code for sequana.krona
#
# This file is part of Sequana software
#
# Copyright (c) 2016-2022 - Sequana Development Team
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
import collections
import colorlog
from sequana.lazy import pandas as pd
logger = colorlog.getLogger(__name__)
__all__ = ["KronaMerger"]
[docs]
class KronaMerger(collections.Counter):
    """Utility to merge two Krona files

    Imagine those two files (formatted for Krona; first column is a counter)::

        14011   Bacteria    Proteobacteria  species1
        591     Bacteria    Proteobacteria  species4
        184     Bacteria    Proteobacteria  species3
        132     Bacteria    Proteobacteria  species2
        32      Bacteria    Proteobacteria  species1

    You can merge the two files. The first and last lines correspond to the
    same taxon (species1) so we should end up with a new Krona file with 4
    lines only.

    The test files are available within Sequana as test_krona_k1.tsv
    and test_krona_k2.tsv::

        from sequana import KronaMerger, sequana_data
        k1 = KronaMerger(sequana_data("test_krona_k1.tsv"))
        k2 = KronaMerger(sequana_data("test_krona_k2.tsv"))
        k1 += k2
        # Save the results. Note that it must be tabulated for Krona external usage
        k1.to_tsv("new.tsv")

    The instance is a :class:`collections.Counter` whose keys are the labels
    (everything after the first tab of each line, surrounding whitespace
    removed) and whose values are the summed counts.

    .. warning:: the column separator must be a tab character

    """

    def __init__(self, filename):
        """.. rubric:: constructor

        :param str filename: path to the tab-separated Krona text file to load.
            Each non-blank line must be made of an integer count, a tab, and
            the label.
        """
        super(KronaMerger, self).__init__()
        self.filename = filename
        self._read()

    def _read(self):
        """Populate the counter from :attr:`filename`.

        Counts of lines sharing the same label are summed. Blank lines are
        ignored.

        :raises ValueError: if a non-blank line has no tab separator or if
            its first field is not an integer.
        """
        with open(self.filename, "r") as fin:
            # Iterate lazily instead of loading the whole file with readlines().
            for lineno, line in enumerate(fin, start=1):
                if not line.strip():
                    # Tolerate empty lines (typically a trailing one).
                    continue
                count, sep, name = line.partition("\t")
                if not sep:
                    raise ValueError(
                        "{}:{}: expected '<count><TAB><label>', got {!r}".format(
                            self.filename, lineno, line
                        )
                    )
                # Normalise the label here (same strip() as applied in to_tsv)
                # so that a final line lacking its newline is still recognised
                # as the same taxon as an earlier, newline-terminated one.
                self[name.strip()] += int(count)

    def to_tsv(self, output_filename):
        """Save the content into a new file in TSV format

        Rows are sorted by decreasing count. Each row is written as the count,
        a tab, and the label exactly as stored (no quoting), so that the tabs
        inside the label remain column separators.

        :param str output_filename: name of the output file; must end
            with ``.tsv``.
        :return: the sorted :class:`pandas.DataFrame` with the columns
            ``count`` and ``label`` (in that order).
        """
        assert output_filename.endswith(".tsv")

        # Labels are stripped before being emitted; aggregate on the stripped
        # form so that keys differing only by surrounding whitespace end up
        # on a single row instead of producing duplicates.
        merged = collections.Counter()
        for label, count in self.items():
            merged[label.strip()] += count

        df = pd.DataFrame(
            {"count": list(merged.values()), "label": list(merged.keys())},
            columns=["count", "label"],
        )

        try:
            df.sort_values("count", inplace=True, ascending=False)
        except AttributeError:
            # Very old pandas releases only provide DataFrame.sort
            df.sort("count", inplace=True, ascending=False)

        # Do not use DataFrame.to_csv here: labels contain the tab separator
        # and the CSV writer would therefore wrap them in double quotes.
        with open(output_filename, "w", encoding="utf-8") as fout:
            for count, label in zip(df["count"], df["label"]):
                fout.write("{}\t{}\n".format(count, label))
        return df