# This file is part of Sequana software
#
# Copyright (c) 2016-2020 - Sequana Development Team
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
import gzip
import io
import os
import subprocess
import sys
import tempfile
from pathlib import Path
from tqdm import tqdm
from sequana import logger
from sequana.lazy import numpy as np
from sequana.lazy import pandas as pd
from sequana.lazy import pylab, pysam
__all__ = ["SomyScore"]
class ClusterModels:
"""Cluster GMM to simplify models
Sometimes, a model may be split in 2 if we provide a large K.
Instead of using AIC criteria, we can cluster GMM that mean is close
regarding their variability. This could be useful in some cases
when we know that models should be clearly separater e.g. in somy studies.
"""
def __init__(self, mus, sigmas, pis):
self.mus = mus
self.sigmas = sigmas
self.pis = pis
def __len__(self):
return len(self.mus)
def __getitem__(self, index):
return self.mus[index], self.sigmas[index], self.pis[index]
def cluster(self):
# find the most significant
index = np.argmax(self.pis)
to_cluster = [index]
mu, sigma, pi = self[index]
for i in range(len(self)):
if i == index:
pass
else:
mu2, sigma2, pi2 = self[i]
d = abs(mu - mu2)
if d < 1.2 * sigma + 1.2 * sigma2:
to_cluster.append(i)
print(self.mus)
print(to_cluster)
newmu = [self.mus[i] * self.pis[i] for i in to_cluster]
pis = [self.pis[i] for i in to_cluster]
newmu = sum(newmu) / sum(pis)
print(newmu)
return newmu
def get_std(self):
return sum([pi * sigma for pi, sigma in zip(self.pis, self.sigmas)]) / sum(self.pis)
[docs]
class SomyScore:
"""
from sequana.somy import SomyScore
ss = SomyScore("file.bam")
ss.run()
# filters
ss.remove_outliers()
ss.remove_flanking_regions()
ss.boxplot()
# store df for later
df = ss.df.copy()
ss2.run(mapq=35)
# filters
ss.remove_outliers()
ss.remove_flanking_regions()
ss.boxplot()
ss.df = pd.merge(ss.df, df)
ss.boxplot()
"""
def __init__(self, filename=None, window_size=1000):
self.filename = filename
self.window_size = window_size
if self.filename:
align = pysam.AlignmentFile(self.filename)
if not align.has_index():
logger.error(
f"Your input file must have an index. Please build one using 'samtools index {filename}'. You can install samtools using 'damona install samtools'"
)
sys.exit(1)
self._df = None
self.save_em = True
self.save_em_filename = "sequana_somy_em.png"
self.info = {}
self._filter_stats = {}
def _get_df(self):
return self._df
df = property(_get_df)
def _get_coverage(
self, use_median=True, chrom=None, flag=1796, threads=4, fast=True, window=1000, mapq=40, tag="default"
):
# TODO: include -F 256 to remove secondary ?
options = "-x" if fast else ""
options += f" -t {threads} "
options += f" --flag {flag}"
options += " -m " if use_median else ""
options += f" -c {chrom}" if chrom else ""
# -n do not output per base depth. faster
options += " -n "
with tempfile.TemporaryDirectory() as TEMPDIR:
cmd = f"mosdepth {options} -b {window} -Q {mapq} {TEMPDIR}/mos {self.filename}"
logger.debug(cmd)
status = subprocess.run(cmd.split())
assert status.returncode == 0
with gzip.GzipFile(f"{TEMPDIR}/mos.regions.bed.gz", "r") as zip_ref:
data = zip_ref.read()
df = pd.read_csv(io.StringIO(data.decode()), sep="\t", header=None)
df.columns = ["chr", "start", "stop", "depth"]
# no need to keep both start/stop
del df["stop"]
# add a convenient distance to the border for future filtering
N = len(df)
X = list(range(N))
df["dist"] = [min(x, abs(x - N + 1)) for x in range(len(X))]
df["tag"] = tag
return df
[docs]
def compute_coverage(
self,
use_median=True,
fast=True,
mapq=0,
flag=1796,
chromosomes=None,
tag="default",
threads=4,
exclude_chromosomes=[],
):
# 3844 also exclude supplementary
align = pysam.AlignmentFile(self.filename)
contig_names = [x.contig for x in align.get_index_statistics()]
dfs = []
if chromosomes is None:
chromosomes = contig_names
else:
for chrom in chromosomes:
if chrom not in contig_names:
logger.error(f"chromosome/contig {chrom} not found in the list: {contig_names}")
sys.exit()
import multiprocessing
arguments = []
for chrom in chromosomes:
if chrom not in exclude_chromosomes:
arguments.append(
{
"chrom": chrom,
"use_median": use_median,
"fast": fast,
"flag": flag,
"mapq": mapq,
"window": self.window_size,
}
)
with multiprocessing.Pool(processes=threads) as pool:
with tqdm(total=len(arguments)) as pbar:
def update(*_):
pbar.update()
results = []
for args in arguments:
result = pool.apply_async(self._get_coverage, kwds=args, callback=update)
results.append(result)
# collect the results
results = [result.get() for result in results]
# build df, or accumulate them with previous runs.
if self._df is not None:
self._df = pd.concat([self._df] + results)
else:
self._df = pd.concat(results)
self._filter_stats["initial"] = len(self._df)
def _estimate_diploid_mean_depth_em(self, k=None, bins=100):
# we can estimate the mean by brute force
mu = self.df["depth"].median()
# however, this may be biased if lots of different somy. We can also try an
# estimate based on mixture model assuming that diploid is the main population
from sequana.mixture import EM
em = EM(self.df["depth"])
sigma = mu / 3.0
best_k = 2
best_LL = 0
results = {}
if k is None:
kmin = 2
kmax = 6
else:
kmin = k
kmax = k + 1
for ktry in range(kmin, kmax):
em.estimate(k=ktry)
LL = em.results["log_likelihood"]
results[ktry] = em.results
if LL > best_LL:
best_LL = em.results["log_likelihood"]
best_k = ktry
print(ktry, LL)
# identifies main peak (diploid)
import numpy as np
best_results = results[best_k]
print(best_k)
print(best_results)
estimate_em_mu = best_results["mus"][np.argmax(best_results["pis"])]
# need to check somy here. Are they close to 1, 1.5, 2, 3
# or 0.5,0.75,1,1.5 ? in the second case, it means the estimated mu was not correct
if self.save_em:
pylab.clf()
em.estimate(k=best_k)
em.plot(bins=bins)
pylab.savefig(self.save_em_filename)
self.info["mu"] = self.df["depth"].median()
self.info["median"] = self.df["depth"].median()
self.info["estimate_mu_diploid"] = estimate_em_mu
self.info["em_results"] = best_results
# Given the mus, we need to figure out what is the best mus for diploid.
# Taken the minimum is not robust enough since, you may have distribution below.
# ClusterModels, can help merging.clustering closeby values. the diploid case is suppose
# to be the most important one but if we test only a subset of chromosomes, this may not work
# or in extreme case, triploi+tetra are as important.
# For now, find the main gaussian and consider this is the chromosomes with somy of 2
best_results["pis"]
index = np.argmax(best_results["pis"])
muhat = best_results["mus"][index]
# if k>=2:
# cm = ClusterModels(best_results['mus'], best_results['sigmas'], best_results['pis'])
# muhat = cm.cluster()
# else:
# muhat = estimate_em_mu
# k may be too large and main diploid model split in 2 or 3
# difficuly and not robust to use AIC
# we can assume that distance between k is large and so merge models that are closeby
print(mu, estimate_em_mu, muhat)
return muhat
[docs]
def remove_outliers(self, percentile=0.05):
before = len(self.df)
if before == 0:
logger.warning("Dataframe is empty, skipping outlier removal")
return
# Filter per-chromosome to preserve chr column
filtered_dfs = []
for chr_name in self.df["chr"].unique():
group = self.df[self.df["chr"] == chr_name]
lower = group["depth"].quantile(0.05)
upper = group["depth"].quantile(0.95)
filtered_group = group[(group["depth"] >= lower) & (group["depth"] <= upper)]
filtered_dfs.append(filtered_group)
self._df = pd.concat(filtered_dfs, ignore_index=True)
self._filter_stats["after_outliers"] = len(self._df)
removed = before - len(self._df)
if before > 0:
logger.info(f"Removed {removed} windows as outliers ({100*removed/before:.1f}%)")
else:
logger.info(f"Removed {removed} windows as outliers")
[docs]
def remove_flanks(self, remove_flanking_regions_kb=10):
# filter out data to remove telomeric regions based on distance (number of windows)
# e.g. 10 kb means 10 regions of 1kb to remove on both sides
from math import ceil
before = len(self.df)
if before == 0:
logger.warning("Dataframe is empty, skipping flank removal")
return
N = ceil(remove_flanking_regions_kb * 1000.0 / self.window_size)
self._df = self._df.query("dist>@N")
self._filter_stats["after_flanks"] = len(self._df)
removed = before - len(self._df)
if before > 0:
logger.info(f"Removed {removed} windows in flanking regions ({100*removed/before:.1f}%)")
else:
logger.info(f"Removed {removed} windows in flanking regions")
[docs]
def remove_low_depth(self, threshold=10):
before = len(self.df)
if before == 0:
logger.warning("Dataframe is empty, skipping low depth removal")
return
self._df = self.df.query("depth>@threshold")
self._filter_stats["after_low_depth"] = len(self._df)
removed = before - len(self._df)
if before > 0:
logger.info(f"Removed {removed} windows with low depth (<{threshold}) ({100*removed/before:.1f}%)")
else:
logger.info(f"Removed {removed} windows with low depth (<{threshold})")
def _estimate_diploid_median_depth(self):
# iterative estimate
# first, we take everything
muhat = self.df["depth"].median()
# this may be biased by tetraploid chromosomes
return muhat
[docs]
def plot_filter_stats(self, filename="sequana_filter_stats.png"):
stages = ["initial"]
counts = [self._filter_stats.get("initial", 0)]
if "after_outliers" in self._filter_stats:
stages.append("after outliers\nremoval")
counts.append(self._filter_stats["after_outliers"])
if "after_low_depth" in self._filter_stats:
stages.append("after low depth\nremoval")
counts.append(self._filter_stats["after_low_depth"])
if "after_flanks" in self._filter_stats:
stages.append("after flanking\nregion removal")
counts.append(self._filter_stats["after_flanks"])
pylab.clf()
pylab.figure(figsize=(10, 6))
colors = ["#2ecc71" if i == 0 else "#e74c3c" for i in range(len(stages))]
bars = pylab.bar(stages, counts, color=colors, alpha=0.7, edgecolor="black")
for i, (stage, count) in enumerate(zip(stages, counts)):
if i == 0:
removed = 0
else:
removed = counts[i - 1] - count
pct = 100 * removed / counts[i - 1]
pylab.text(
i, count / 2, f"-{removed}\n({pct:.1f}%)", ha="center", va="center", fontweight="bold", fontsize=10
)
pylab.text(i, count + max(counts) * 0.02, str(count), ha="center", va="bottom", fontweight="bold")
pylab.ylabel("Number of windows", fontsize=12)
pylab.xlabel("Filtering stage", fontsize=12)
pylab.title("Filtering pipeline: windows retained at each step", fontsize=14, fontweight="bold")
pylab.tight_layout()
pylab.savefig(filename, dpi=200)
logger.info(f"Filtering statistics plot saved to {filename}")
[docs]
def boxplot(
self,
legend=False,
hlines=[1, 1.5, 2, 2.5],
normalise=True,
k=None,
muhat=None,
hybrid=False,
method="em",
palette=None,
):
"""
:param bool legend: set to 'auto' to add a legend
"""
import seaborn as sns
data = self._df.copy()
if len(data) == 0:
logger.error("Dataframe is empty after filtering. No windows remain for analysis.")
logger.error("Filtering statistics:")
for stage, count in self._filter_stats.items():
logger.error(f" {stage}: {count} windows")
self.plot_filter_stats("sequana_filter_stats.png")
raise ValueError(
f"No data remaining after filtering. Check sequana_filter_stats.png for details. "
f"Consider using less stringent filters (e.g., increase --minimum-depth, decrease --telomeric-span)"
)
if muhat:
if normalise:
data["depth"] /= muhat
muhat = 1
data["depth"] *= 2
else:
muhat = known_diploid_coverage
elif normalise is True:
if method == "em":
try:
muhat = self._estimate_diploid_mean_depth_em(k=k)
print(f"Estimate mu: {muhat}")
except Exception as err:
print(err)
logger.warning("Could not estimate mixture model. Rolling back to simple median estimator")
muhat = self.df["depth"].median()
data["depth"] /= muhat
elif method == "median":
muhat = self.df["depth"].median()
print(f"Estimate mu: {muhat}")
data["depth"] /= muhat
elif method == "mean":
muhat = self.df["depth"].mean()
print(f"Estimate mu: {muhat}")
data["depth"] /= muhat
muhat = 1
data["depth"] *= 2 # (diploid)
elif method == "bootstrap":
pass
else:
muhat = self._estimate_diploid_median_depth()
print(f"Estimate mu: {muhat}")
if hybrid:
# data['depth'] /=2
pass
# self.df.groupby(["chr", "tag"])['depth'].to_csv("somies.csv")
estimated_somies = []
measured_somies = []
chromosomes = []
tags = []
for datum in data.groupby(["chr", "tag"])["depth"]:
tag = datum[0][1]
x = datum[1].median()
if abs(1 - x) < abs(2 - x):
somy = 1
elif abs(2 - x) < abs(3 - x):
somy = 2
elif abs(x - 3) < abs(4 - x):
somy = 3
elif abs(x - 4) < abs(x - 5):
somy = 4
else:
somy = 5
estimated_somies.append(somy)
measured_somies.append(round(x, 3))
chromosomes.append(datum[0][0])
tags.append(tag)
somies = pd.DataFrame(
{
"chrom": chromosomes,
"tag": tags,
"estimated_somies": estimated_somies,
"measured_somies": measured_somies,
}
)
# somies.to_csv(f"{outdir}/somies.csv", index=None)
# plot somies
import seaborn as sns
pylab.clf()
ax = sns.regplot(
x="estimated_somies", y="measured_somies", data=somies, x_jitter=0.0, scatter_kws={"alpha": 0.3}
)
ax.set(xlabel="Estimated somies", ylabel="Measured somies")
m, M = somies["estimated_somies"].min(), somies["estimated_somies"].max()
pylab.plot([m, M], [m, M], color="r", ls="--", lw=1)
# pylab.savefig(f"{outdir}/sequana_somies_means_vs_estim.png")
self.somies = somies
error = ((somies["estimated_somies"] - somies["measured_somies"]) ** 2).sum() / len(somies)
print(f"Error: {error}")
self.info["error"] = error
pylab.clf()
sns.boxplot(data=data, x="chr", y="depth", showfliers=False, hue="tag", legend=legend, palette=palette)
pylab.xticks(rotation=90)
for h in hlines:
if h == 1:
pylab.axhline(2 * muhat * h, color="r", lw=1, ls="--", label=f"diploid")
elif h == 1.5:
pylab.axhline(2 * muhat * h, color="r", lw=1, ls="--", label=f"triploid")
elif h == 2:
pylab.axhline(2 * muhat * h, color="r", lw=1, ls="--", label=f"tetraploid")
elif h == 2.5:
pylab.axhline(2 * muhat * h, color="r", lw=1, ls="--", label=f"pentaploid")
if normalise:
pylab.ylabel("Somy", fontsize=16)
if hybrid:
from pylab import fill_betweenx, xlim, ylim
Ymax = ylim()[1]
Nchrom = len(self.df["chr"].unique())
for x in range(int(Nchrom / 2)):
fill_betweenx([0, Ymax], -0.5 + x * 2, 0.5 + x * 2, color="grey", alpha=0.5)
ylim([0, Ymax])
xlim([-0.5, Nchrom - 0.5])
def plot_sliding_window_boxplot(data, window_size, step=1000, overlap_percentage=50, facecolor="lightblue"):
"""
Plot a boxplot with sliding windows from a specified column in a DataFrame.
Parameters:
data (pd.DataFrame): The input DataFrame.
column (str): The column of interest.
window_size (int): The size of each sliding window.
overlap_percentage (float): The percentage overlap between windows (default 50).
Example::
ss = SomyScore(input_bam)
ss.compute_coverage()
chrom_name = 1
data = ss.df.query("chr==@chrom_name")['depth']
overlap = 50 #percentage
Nwin = 20 # number of windows of size (Step) to use per boxplot
step=ss.window_size # value used in the SomyScore
plot_sliding_window_boxplot(data, Nwin, step, overlap)
"""
step_size = int(window_size * (1 - overlap_percentage / 100)) # Calculate step size
print(f"step is {step_size*step}; step_size={step_size}")
# Create sliding windows
windows = [data[i : i + window_size].values for i in range(0, len(data) - window_size + 1, step_size)]
print(len(data) - window_size + 1, step_size)
print(f"Number of windows = {len(windows)}")
if not windows:
raise ValueError("Window size too large for the given data.")
# Plot boxplot
# pylab.clf()
pylab.boxplot(windows, patch_artist=True, boxprops=dict(facecolor=facecolor))
pylab.xlabel("Sliding Window")
pylab.ylabel("Values")
return windows