Source code for sequana.modules_report.cutadapt

# coding: utf-8
#
#  This file is part of Sequana software
#
#  Copyright (c) 2016 - Sequana Development Team
#
#  File author(s):
#      Thomas Cokelaer <thomas.cokelaer@pasteur.fr>
#      Dimitri Desvillechabrol <dimitri.desvillechabrol@pasteur.fr>,
#          <d.desvillechabrol@gmail.com>
#
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#
#  website: https://github.com/sequana/sequana
#  documentation: http://sequana.readthedocs.io
#
##############################################################################
"""Module to write coverage report"""
import io
import json
import os
from collections import Counter

import colorlog

from sequana.lazy import pandas as pd
from sequana.lazy import pylab
from sequana.modules_report.base_module import SequanaBaseModule
from sequana.utils import config
from sequana.utils.datatables_js import DataTable

logger = colorlog.getLogger(__name__)


[docs] class CutadaptModule(SequanaBaseModule): """Write HTML report of cutadapt analysis""" def __init__(self, cutadapt_log, sample_name, output_filename=None): """ :param input: """ super().__init__() # Expected input data is the cutadapt log file if os.path.exists(cutadapt_log) is False: logger.error("This file {} does not exist".format(cutadapt_log)) self.input_filename = cutadapt_log self.sample_name = sample_name self.jinja = {} self.data = {} atropos_log = cutadapt_log.replace(".txt", ".json") if os.path.exists(atropos_log): self.input_mode = "atropos" self.read_data() # store the rawdata self.parse_atropos(atropos_log) else: self.input_mode = "cutadapt" self.read_data() # store the rawdata self.parse_cutadapt() self._data_histograms = self._get_histogram_data() self.create_report_content() self.create_html(output_filename)
[docs] def create_report_content(self): """Generate the sections list to fill the HTML report.""" self.sections = list() self.add_summary_section() self.add_stat_section() self.add_adapters_section() self.add_histogram_section() self.add_log_section()
[docs] def read_data(self): with open(self.input_filename, "r") as fin: self._rawdata = fin.read() if "Total read pairs processed" in self._rawdata: self.jinja["mode"] = "Paired-end" self.mode = "pe" else: self.jinja["mode"] = "Single-end" self.mode = "se"
def _get_data_tobefound(self): tobefound = [] if self.mode == "se": tobefound.append(("total_reads", "Total reads processed:")) tobefound.append(("reads_with_adapters", "Reads with adapters:")) tobefound.append(("reads_with_adapters", "Reads with adapter:")) tobefound.append(("reads_too_short", "Reads that were too short:")) tobefound.append(("reads_kept", "Reads written (passing filters):")) else: # ! spaces are probably import here below ! tobefound.append(("paired_total_reads", "Total read pairs processed:")) tobefound.append(("paired_reads1_with_adapters", " Read 1 with adapter:")) tobefound.append(("paired_reads2_with_adapters", " Read 2 with adapter:")) tobefound.append(("paired_reads_too_short", "Pairs that were too short")) tobefound.append(("paired_reads_kept", "Pairs written (passing filters):")) return tobefound
[docs] def add_log_section(self): self.sections.append( { "name": "log", "anchor": "log", "content": "<pre>\n" + self._rawdata + "</pre>\n", } )
def _get_stats(self): if self.mode == "pe": prefix = "paired_" else: prefix = "" df = pd.DataFrame({"Number of reads": [], "percent": []}) df.loc["Total paired reads"] = [self.jinja["%stotal_reads" % prefix], "(100%)"] if self.mode == "pe": df.loc["Read1 with adapters"] = [ self.jinja["%sreads1_with_adapters" % prefix], self.jinja["%sreads1_with_adapters_percent" % prefix], ] df.loc["Read2 with adapters"] = [ self.jinja["%sreads2_with_adapters" % prefix], self.jinja["%sreads2_with_adapters_percent" % prefix], ] else: df.loc["Pairs with adapters"] = [ self.jinja["%sreads_with_adapters" % prefix], self.jinja["%sreads_with_adapters_percent" % prefix], ] try: df.loc["Pairs too short"] = [ self.jinja["%sreads_too_short" % prefix], self.jinja["%sreads_too_short_percent" % prefix], ] except: # somehow we should use strings because all others fields are made # of strings and multisummary will then try to merge int and strings # otherwise df.loc["Pairs too short"] = ["0", "0"] df.loc["Pairs kept"] = [ self.jinja["%sreads_kept" % prefix], self.jinja["%sreads_kept_percent" % prefix], ] if self.mode != "pe": df.index = [this.replace("paired", "").replace("Pairs", "Reads") for this in df.index] return df def _get_stat_section(self): datatable = DataTable(self._get_stats(), "cutadapt", index=True) datatable.datatable.datatable_options = { "scrollX": "300px", "pageLength": 30, "scrollCollapse": "true", "dom": "rtpB", "paging": "false", "buttons": ["copy", "csv"], } js = datatable.create_javascript_function() html_tab = datatable.create_datatable(float_format="%.3g") # csv_link = self.create_link('link', self.filename) # vcf_link = self.create_link('here', 'test.vcf') html = ( "Reads statistics after trimming and adapter removal. The " + "A, C, G, T, N columns report the percentage of each bases in " + "the overall sequences" ) html += "<p>{} {}</p>".format(html_tab, js) return html
[docs] def add_stat_section(self): self.sections.append({"name": "Stats", "anchor": "stats", "content": self._get_stat_section()})
[docs] def add_adapters_section(self): # Create a Table with adapters df = pd.DataFrame() df = pd.DataFrame( { "Length": [], "Trimmed": [], "Type": [], "Sequence": [], } ) for count, adapter in enumerate(self.data["adapters"]): name = adapter["name"] info = adapter["info"] df.loc[name] = [ info["Length"], info["Trimmed"], info["Type"], info["Sequence"], ] df.columns = ["Length", "Trimmed", "Type", "Sequence"] try: df["Trimmed"] = df.Trimmed.map(lambda x: int(x.replace("times.", ""))) except: pass try: df["Trimmed"] = df.Trimmed.map(lambda x: int(x.replace("times", ""))) except: pass # df.to_json(self.sample_name + "/cutadapt/cutadapt_stats2.json") df.sort_values(by="Trimmed", ascending=False, inplace=True) datatable = DataTable(df, "adapters", index=True) datatable.datatable.datatable_options = { "scrollX": "true", "pageLength": 15, "scrollCollapse": "true", "dom": "frtipB", "buttons": ["copy", "csv"], } js = datatable.create_javascript_function() html_tab = datatable.create_datatable(float_format="%.3g") self.jinja["adapters"] = "" self.sections.append( { "name": "Adapters", "anchor": "adapters", "content": "<p>{} {}</p>".format(html_tab, js), } )
[docs] def add_summary_section(self): """Coverage section.""" # image = self.create_embedded_png(self.chromosome.plot_coverage, # input_arg="filename") import textwrap command = "\n".join(textwrap.wrap(self.jinja["command"], 80)) command = self.jinja["command"] html = "<p>Data type: {} </p>".format(self.jinja["mode"]) html += '<div style="textwidth:80%">Command: <pre>{}</pre></div>'.format(command) self.sections.append({"name": "Data and command used", "anchor": "cutadapt", "content": html})
[docs] def add_histogram_section(self): """Show only histograms with at least 3 counts""" histograms = self._data_histograms html = "" html += "<div>\n" # get keys and count; Sort by number of adapters removed. # TODO: could have reused the df adapter_names = list(histograms.keys()) count = [histograms[k]["count"].sum() for k in adapter_names] df2 = pd.DataFrame({"key": adapter_names, "count": count}) df2.sort_values(by="count", ascending=False, inplace=True) for count, key in zip(df2["count"], df2["key"]): if len(histograms[key]) <= 3: continue def plotter(filename, key): name = key.replace(" ", "_") pylab.ioff() histograms[key].plot(logy=False, lw=2, marker="o") pylab.title(name + "(%s)" % count) pylab.grid(True) pylab.savefig(filename) pylab.close() # need to close the figure otherwise warnings imagehtml = self.create_embedded_png(plotter, "filename", style="width:45%", key=key) html += imagehtml html += "</div>\n" self.jinja["cutadapt"] = html self.sections.append( { "name": "Histogram", "anchor": "histogram", "content": "<p>Here are the most representative/significant adapters found in the data</p>" + html, } )
[docs] def parse_cutadapt(self): d = {} # output tobefound = self._get_data_tobefound() adapters = [] data = self._rawdata.splitlines() # some metadata to extract for this in tobefound: key, pattern = this found = [line for line in data if line.startswith(pattern)] if len(found) == 0: logger.warning("ReportCutadapt: %s (not found)" % pattern) elif len(found) == 1: text = found[0].split(":", 1)[1].strip() try: this, percent = text.split() self.jinja[key] = this self.jinja[key + "_percent"] = percent except: self.jinja[key] = text self.jinja[key + "_percent"] = "?" dd = {} positions = [] executable = "cutadapt" for pos, this in enumerate(data): if "This is Atropos" in this: executable = "atropos" if "Command line parameters: " in this: cmd = this.split("Command line parameters: ")[1] self.jinja["command"] = executable + " " + cmd if this.startswith("=== ") and "Adapter" in this: name = this.split("=== ")[1].split(" ===")[0].strip() dd["name"] = name continue if this.startswith("Sequence:"): info = this.split("Sequence:", 1)[1].strip() info = info.split(";") dd["info"] = { "Sequence": info[0].strip(), "Type": info[1].split(":", 1)[1].strip(), "Length": info[2].split(":", 1)[1].strip(), "Trimmed": info[3].split(":", 1)[1].strip(), } adapters.append(dd.copy()) self.data["adapters"] = adapters
def _get_histogram_data(self): """In cutadapt logs, an adapter section contains an histogram of matches that starts with a header and ends with a blank line """ header = "length\tcount\texpect\tmax.err\terror counts\n" with open(self.input_filename, "r") as fin: # not too large so let us read everything data = fin.readlines() scanning_histogram = False adapters = [] current_hist = header dfs = {} if "variable 5'/3'" in "\n".join(data): cutadapt_mode = "b" else: cutadapt_mode = "other" for this in data: # while we have not found a new adapter histogram section, # we keep going # !! What about 5' / 3' if this.startswith("==="): if "read: Adapter" in this: # We keep read: Adatpter because it may be the first # or second read so to avoid confusion we keep the full # name for now. name = this.replace("First read: Adapter ", "R1_") name = name.replace("Second read: Adapter ", "R2_") name = name.strip().strip("===") name = name.strip() elif "=== Adapter" in this: name = this.split("=== ")[1].split(" ===")[0] name = name.strip() else: pass if scanning_histogram is False: if this == header: # we found the beginning of an histogram scanning_histogram = True else: # we are somewhere in the log we do not care about pass elif scanning_histogram is True and len(this.strip()) != 0: # accumulate the histogram data current_hist += this elif scanning_histogram is True and len(this.strip()) == 0: # we found the end of the histogram # Could be a 5'/3' case ? if so another histogram is # possible df = pd.read_csv(io.StringIO(current_hist), sep="\t") # cast the 'error counts' that must be coherent for future concatenation df = df.astype({"error counts": str}) # reinitiate the variables if cutadapt_mode != "b": dfs[name] = df.set_index("length") current_hist = header scanning_histogram = False else: # there will be another histogram so keep scanning current_hist = header # If we have already found an histogram, this is # therefore the second here. if name in dfs: if len(df): existing = dfs[name].dropna(axis=1, how="all") new_part = df.set_index("length").dropna(axis=1, how="all") dfs[name] = pd.concat([existing, new_part]) scanning_histogram = False dfs[name] = dfs[name].reset_index().groupby("length").aggregate("sum") else: dfs[name] = df.set_index("length") scanning_histogram = True else: pass return dfs
[docs] def parse_atropos(self, filename): """Parse the atropos report (JSON format)""" with open(filename, "r") as fh: data = json.load(fh) # Is it paired or single-ended ? if data["input"]["input_names"][1] is None: self.jinja["mode"] = "Singled-end" prefix = "" self.mode = "se" else: self.jinja["mode"] = "Paired-end" prefix = "paired_" self.mode = "pe" dfs = {} self.data["adapters"] = [] data_adapters = data["trim"]["modifiers"]["AdapterCutter"]["adapters"] reads = [0] * len(data_adapters[0]) adapters = list(data_adapters[0].keys()) N = data["record_counts"]["0"] try: # Read2 reads.extend([1] * len(data_adapters[1])) adapters.extend(list(data_adapters[1].keys())) except: pass read_tag = {0: "First read: ", 1: "Second read: "} for read, name in zip(reads, adapters): data_adapter = data_adapters[read][name] type_ = data_adapter["where"]["desc"] sequence = data_adapter["sequence"] length = len(sequence) trimmed = data_adapter["total"] max_error = data_adapter["max_error_rate"] # this takes care of the A,B,G mode of cutadapt/atropos d = Counter() for this in ["lengths_front", "lengths_back"]: if this in data_adapter.keys(): d += Counter(data_adapter[this]) count = pd.DataFrame(list(d.values()), list(d.keys()), columns=["count"]) count = count.reset_index().astype(int).sort_values("index", ascending=True) count.set_index("index", inplace=True) count["max err"] = [int(round(x * max_error)) for x in count.index] count.reset_index(inplace=True) count.rename(columns={"index": "length"}, inplace=True) count["expect"] = 0.25 ** count["length"] * N count.set_index("length", inplace=True) count = count[["count", "expect", "max err"]] dfs["R{}_".format(read + 1) + name] = count.copy() # Note that the following text must be kept as it is since # it is then parsed in other methods self.data["adapters"].append( { "info": { "Length": length, "Sequence": sequence, "Trimmed": "{} times.".format(trimmed), "Type": type_, }, "name": read_tag[read] + name, } ) # Store the histograms self._data_histograms = dfs # aliases formatters = data["trim"]["formatters"] filters = data["trim"]["filters"]["too_short"] cutter = data["trim"]["modifiers"]["AdapterCutter"] def _format(value): return "({}%)".format(100 * int(round(value, 3) * 1000) / 1000.0) self.jinja["%stotal_reads" % prefix] = N self.jinja["%sreads1_with_adapters" % prefix] = str(cutter["records_with_adapters"][0]) self.jinja["%sreads1_with_adapters_percent" % prefix] = _format(cutter["fraction_records_with_adapters"][0]) # duplicated reads1 in reads for the single-end cae # This should be clean but is required for now to be compatibl # with the code used with cutadapt self.jinja["%sreads_with_adapters" % prefix] = str(cutter["records_with_adapters"][0]) self.jinja["%sreads_with_adapters_percent" % prefix] = _format(cutter["fraction_records_with_adapters"][0]) if self.mode == "pe": self.jinja["%sreads2_with_adapters" % prefix] = cutter["records_with_adapters"][1] self.jinja["%sreads2_with_adapters_percent" % prefix] = _format(cutter["fraction_records_with_adapters"][1]) self.jinja["%sreads_too_short" % prefix] = filters["records_filtered"] self.jinja["%sreads_too_short_percent" % prefix] = _format(filters["fraction_records_filtered"]) self.jinja["%sreads_kept" % prefix] = formatters["records_written"] self.jinja["%sreads_kept_percent" % prefix] = _format(formatters["fraction_records_written"]) self.jinja["command"] = "{} {} {}".format( "atropos", data["options"]["action"], " ".join(data["options"]["orig_args"]) )