# coding: utf-8
#
# This file is part of Sequana software
#
# Copyright (c) 2016 - Sequana Development Team
#
# File author(s):
# Dimitri Desvillechabrol <dimitri.desvillechabrol@pasteur.fr>,
# <d.desvillechabrol@gmail.com>
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
"""Module to write variant calling report"""
import ast
from sequana.lazy import pandas as pd
from sequana.modules_report.base_module import SequanaBaseModule
from sequana.utils.datatables_js import DataTable
[docs]
class VariantCallingModule(SequanaBaseModule):
"""Write HTML report of variant calling. This class takes a csv file
generated by sequana_variant_filter.
"""
def __init__(self, data):
""".. rubric:: constructor
:param data: a CSV filename created by ``sequana.freebayes_vcf_filter``
or a :class:`freebayes_vcf_filter.Filtered_freebayes` object.
"""
super().__init__()
self.title = "Variant Calling Report"
try:
with open(data, "r") as fp:
self.filename = data
line = fp.readline()
if line.startswith("# sequana_variant_calling"):
string_dict = line.split(";")[-1].strip()
try:
self.filter_dict = ast.literal_eval(string_dict)
except SyntaxError:
self.filter_dict = None
self.df = pd.read_csv(fp)
except FileNotFoundError:
msg = "The csv file is not present. Please, check if your" " file is present."
raise FileNotFoundError(msg)
except TypeError:
self.df = data.df
self.filter_dict = data.vcf.filters_params
self.create_report_content()
self.create_html("variant_calling.html")
[docs]
def create_report_content(self):
self.sections = list()
try:
self.add_intro_download()
except:
pass
if self.filter_dict:
self.filters_information()
try:
self.add_overview()
except:
pass
try:
self.create_histogram_frequencies()
except:
pass
try:
self.create_histogram_frequencies2()
except:
pass
try:
self.variant_calling()
except:
pass
[docs]
def add_intro_download(self):
self.sections.append(
{
"name": "Data",
"anchor": "intro_download",
"content": f'You can download the <a href="./freebayes_vcf_filter/temp.filter.vcf">VCF here</a>. This is the filtered VCF (see parameters used in the section below). Raw VCF file can be found in the sequana structure directory (freebayes/YOUR_SAMPLE.raw.vcf)',
}
)
[docs]
def add_overview(self):
import os
import tempfile
import warnings
from collections import defaultdict
import plotly.express as px
with warnings.catch_warnings():
warnings.simplefilter("ignore")
# Count variant types
variants = defaultdict(int)
for typ in sorted(self.df["type"].unique()):
variants[typ] = sum(self.df["type"] == typ)
# Prepare data
labels = sorted(variants.keys())
counts = [variants[x] for x in labels]
labels_display = [f"{x.upper()} ({variants[x]})" for x in labels]
# Create interactive pie chart
fig = px.pie(
names=labels_display,
values=counts,
title="Variant Type Distribution",
color_discrete_sequence=px.colors.qualitative.Set3,
)
# Reduce margins, adjust layout
fig.update_layout(
title_x=0.5,
margin=dict(t=50, b=10, l=10, r=10),
width=700,
height=700,
)
tmpfile = tempfile.NamedTemporaryFile(suffix=".html", delete=False)
fig.write_html(tmpfile.name, include_plotlyjs=False, full_html=False)
# Read the HTML fragment and return it as a string
with open(tmpfile.name, "r") as f:
html_fragment = f.read()
os.unlink(tmpfile.name)
# Integration in your HTML report
self.sections.append(
{
"name": "Overview",
"anchor": "overview",
"content": "Here below is a pie plot showing the distribution of various variants found in the sample. "
+ html_fragment,
}
)
[docs]
def create_histogram_frequencies(self):
import plotly.express as px
import plotly.io as pio
df = self.df.copy()
df = df.query("type=='snp'").copy()
df["freq"] = [float(x) for x in df["frequency"]]
fig = px.histogram(
df,
x="freq",
color="chr",
facet_col="chr",
facet_col_wrap=6,
nbins=40,
title="SNP Frequency Distribution per Chromosome",
labels={"AF": "Allele frequency"},
)
# fig.update_xaxes(range=[0, 1])
fig.update_layout(height=1000, width=1000)
fig.for_each_annotation(lambda a: a.update(text=a.text.split("=")[-1]))
fig_html = pio.to_html(fig, full_html=False, include_plotlyjs=False)
self.sections.append(
{
"name": "Histogram (frequency) per contig",
"anchor": "histo",
"content": "Here is the frequency distribution of SNPs for each contig. Contigs with no SNPs are not shown."
+ f"{fig_html}",
}
)
[docs]
def create_histogram_frequencies2(self):
import pandas as pd
import plotly.graph_objects as go
import plotly.io as pio
# --- PARSE VCF ---
df = self.df.copy()
df = df.query("type in ['snp', 'del', 'ins']").copy()
df["freq"] = [float(x) for x in df["frequency"]]
# --- BUILD FIGURE TRACES ---
traces = []
variant_types = ["snp", "del", "ins"]
for vtype in variant_types:
sub = df[df["type"] == vtype]
if sub.empty:
continue
trace = go.Histogram(
x=sub["freq"], nbinsx=50, name=vtype, opacity=0.75, visible=True if vtype in ["snp"] else False
)
traces.append(trace)
# --- CREATE DROPDOWN MENU ---
buttons = []
for i, vtype in enumerate(variant_types):
visible = [False] * len(traces)
if i < len(traces):
visible[i] = True
buttons.append(
dict(
label=vtype,
method="update",
args=[{"visible": visible}, {"title": f"Allele Frequency Distribution — {vtype}"}],
)
)
# --- FIGURE LAYOUT ---
fig = go.Figure(data=traces)
fig.update_layout(
title="Allele Frequency Distribution — SNP",
xaxis_title="Allele Frequency",
yaxis_title="Count",
updatemenus=[dict(buttons=buttons, direction="down", x=0.45, y=1.15, showactive=True)],
bargap=0.1,
template="plotly_white",
)
# --- EXPORT TO HTML ---
html_code = pio.to_html(fig, full_html=False, include_plotlyjs=False)
self.sections.append(
{
"name": "histogram (all contigs)",
"anchor": "histo2",
"content": "Here are the distribution across all contigs of some non-complex variants. variants that are combination such as SNP,SNP are not shown. "
+ f"{html_code}",
}
)
[docs]
def variant_calling(self):
"""Variants detected section."""
datatable = DataTable(self.df, "vc")
# set options
datatable.datatable.datatable_options = {
"scrollX": "true",
"pageLength": 30,
"scrollCollapse": "true",
"dom": "Bfrtip",
"buttons": ["copy", "csv"],
}
js = datatable.create_javascript_function()
html_tab = datatable.create_datatable(float_format="%.3f")
self.sections.append(
{
"name": "Variants Detected",
"anchor": "basic_stats",
"content": "<p>This table gives variants detected by freebayes after "
"filtering. The important metrics are the depth (if not enough reads supports "
"the variant it should be ignored); the strand_balance (forward and reverse number "
" of reads supporting the variants should be similar e.g balance of 0.5); the fisher "
"pvalue (variants with pvalue<0.05 should be rejected since the strand balance of"
"alternate and reference are different).</p><p>Note: the freebayes score can be"
" understood as 1 - P(locus is homozygous given the data)</p> {0}\n{1}\n".format(js, html_tab),
}
)