#
# This file is part of Sequana software
#
# Copyright (c) 2016-2024 - Sequana Development Team
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
from collections import defaultdict
import colorlog
from tqdm import tqdm
from sequana.lazy import pandas as pd
from sequana.lazy import pylab, pysam
from sequana.vcftools import (
compute_fisher_strand_filter,
compute_frequency,
compute_strand_balance,
strand_ratio,
)
logger = colorlog.getLogger(__name__)
[docs]
class VariantFile:
"""
Reads a BCF or VCF or GZIPPED VCF
::
v = VariantFile()
You can access to a list of all variants (unchanged from pysam) using::
vlist = v.variants
you may transform a variant into a dictionary that contains a subset of most important information
including SNPeff annotation (if available) using::
v._variant_to_dict(vlist[0])
A dataframe of all variants processed with this _variant_to_dict function is provided as an attribute::
v.df
Samples and contigs/chromosomes are available also as attributes::
v.samples
v.contigs
v.chromosomes # a synomym of v.contigs
There is a set of parameter to filter variants. To apply it, use the filter_vcf function.
fvcf = v.filter_vcf()
"""
def __init__(self, filename, progress=False, keep_polymorphic=True):
self._iterator_index = 0
self.filename = filename
self.filters_params = {
"freebayes_score": 0,
"frequency": 0,
"min_depth": 0,
"forward_depth": 0,
"reverse_depth": 0,
"strand_ratio": 0,
"keep_polymorphic": keep_polymorphic,
}
# just to get the dataframe
self.progress = progress
self._chromosomes = None
self._variants = None
self._samples = None
self._contigs = None
self._df = None
self._is_joint = None
self._snpeff = False
# FIXME what is happening with an empty file?
# store the header to save the file back
_vcf = pysam.VariantFile(self.filename)
self._header = _vcf.header
# do we used snpeff ?
variant = next(_vcf)
variant_dict = self._variant_to_dict(variant)
if "effect_type" in variant_dict:
self._snpeff = True
def _is_joint(self):
if self._is_joint is None:
if len(self.samples) > 1:
self._is_joint = True
else:
self._is_joint = False
return self._is_joint
is_joint = property(_is_joint)
def _get_variants(self):
if self._variants is None:
vcf_reader = pysam.VariantFile(self.filename)
self._variants = [v for v in vcf_reader]
for i, v in enumerate(self._variants):
v.id = str(i + 1)
return self._variants
variants = property(_get_variants)
def __len__(self):
return len(self.variants)
def __iter__(self):
return self
def __next__(self):
if self._iterator_index >= len(self.variants):
self._iterator_index = 0
raise StopIteration # Stop when the list is exhausted
value = self.variants[self._iterator_index]
self._iterator_index += 1
return value
def _get_samples(self):
if self._samples is None:
vcf_reader = pysam.VariantFile(self.filename)
self._samples = list(vcf_reader.header.samples)
return self._samples
samples = property(_get_samples)
def _get_contigs(self):
if self._contigs is None:
vcf_reader = pysam.VariantFile(self.filename)
self._contigs = dict([(x.name, x.length) for x in vcf_reader.header.contigs.values()])
return self._contigs
contigs = property(_get_contigs)
def _get_chromosomes(self):
if not self._chromosomes:
self._chromosomes = sorted(self.contigs.keys())
return self._chromosomes
chromosomes = property(_get_chromosomes)
[docs]
def filter_vcf(self, filter_dict=None):
"""Filter variants in the VCF file.
:param dict filter_dict: dictionary of filters. It updates the
attribute :attr:`VCF_freebayes.filter_params`
Return Filtered_freebayes object.
"""
if filter_dict:
self.filters_params = filter_dict
variants = [v for v in self.variants if self._filter_line(v)]
return FilteredVariantFile(variants, self)
def _filter_line(self, variant):
"""Filter variant with parameter set in :attr:`VCF_freebayes.filters`.
:param vcf.model._Record vcf_line:
:return: line if all filters are passed.
"""
if variant.qual < self.filters_params["freebayes_score"]:
return False
if variant.info["DP"] <= self.filters_params["min_depth"]:
return False
if self.is_joint:
return True
forward_depth = variant.info["SRF"] + sum(variant.info["SAF"])
if forward_depth <= self.filters_params["forward_depth"]:
return False
reverse_depth = variant.info["SRR"] + sum(variant.info["SAR"])
if reverse_depth <= self.filters_params["reverse_depth"]:
return False
alt_freq = compute_frequency(variant)
if alt_freq[0] < self.filters_params["frequency"]:
return False
strand_bal = compute_strand_balance(variant)
if strand_bal[0] < self.filters_params["strand_ratio"]:
return False
if self.filters_params["keep_polymorphic"] is False and len(variant.alts) > 1:
return False
return True
[docs]
def get_variant_type(self):
variants = defaultdict(int)
for variant in self.variants:
if "TYPE" in variant.info:
for typ in variant.info["TYPE"]:
variants[typ] += 1
elif "SVTYPE" in variant.info:
variants[variant.info["SVTYPE"]] += 1
return variants
[docs]
def barplot(self):
variants = self.get_variant_type()
pylab.clf()
keys = sorted(variants.keys())
pylab.bar(x=list(range(len(keys))), height=[variants[k] for k in keys])
pylab.xticks(range(len(keys)), keys)
pylab.ylabel("# Number of variants per type")
pylab.gcf().set_layout_engine("tight")
[docs]
def pieplot(self):
variants = self.get_variant_type()
pylab.clf()
labels = sorted(variants.keys())
data = [variants[x] for x in labels]
labels = [f"{x.upper()} ({variants[x]})" for x in labels]
pylab.pie(data, labels=labels)
[docs]
def manhattan_plot(self, chrom_name=None, bins=200, types=["ins", "del", "mnp", "snp", "complex", "INS", "DEL"]):
positions = defaultdict(list)
for variant in pysam.VariantFile(self.filename):
# keeps only the requested type of variants
if "TYPE" in variant.info:
for vkind in variant.info["TYPE"]:
if vkind in types:
# keep track of the chrom name
chrom = variant.chrom
if chrom_name and chrom == chrom_name:
positions[chrom].append(variant.pos)
elif chrom_name is None:
positions[chrom].append(variant.pos)
elif "SVTYPE" in variant.info:
vkind = variant.info["SVTYPE"]
if vkind in types:
# keep track of the chrom name
chrom = variant.chrom
if chrom_name and chrom == chrom_name:
positions[chrom].append(variant.pos)
elif chrom_name is None:
positions[chrom].append(variant.pos)
sorted_contigs = dict(sorted(self.contigs.items(), key=lambda item: item[1]))
concatenated_pos = []
S = 0
Ss = []
chroms = []
for chrom, length in sorted_contigs.items():
if chrom in positions:
concatenated_pos.extend([x + S for x in positions[chrom]])
Ss.append(S)
S += length
chroms.append(chrom)
pylab.clf()
pylab.hist(concatenated_pos, bins=bins, log=True)
for x in Ss:
pylab.axvline(x, color="k")
pylab.xticks(Ss, chroms, rotation=45)
[docs]
def hist_score(self, bins=200, min_score=1):
"""Histogram of Quality score"""
variants = self.variants
pylab.hist([x.qual for x in variants if x.qual >= min_score], bins=bins)
[docs]
def plot_frequency(self):
sorted_contigs = dict(sorted(self.contigs.items(), key=lambda item: item[1]))
for chrom in sorted_contigs.keys():
pass
def _variant_to_dict(self, variant):
alt_freq = compute_frequency(variant)
strand_bal = compute_strand_balance(variant)
fisher = compute_fisher_strand_filter(variant)
variant_dict = {
"chr": variant.chrom,
"position": variant.pos,
"depth": variant.info["DP"] if "DP" in variant.info else variant.info["COVERAGE"][2],
"reference": variant.ref,
"alternative": ";".join(str(x) for x in variant.alts),
"freebayes_score": variant.qual,
"strand_balance": ";".join("{0:.3f}".format(x) for x in strand_bal),
"fisher_pvalue": "; ".join(f"{x}" for x in fisher),
"ID": variant.id,
}
if "TYPE" in variant.info:
variant_dict["type"] = ";".join(x for x in variant.info["TYPE"])
elif "SVTYPE" in variant.info:
variant_dict["type"] = ";".join([variant.info["SVTYPE"]])
if len(self.samples) == 1:
variant_dict["frequency"] = "; ".join("{0:.3f}".format(x) for x in alt_freq)
else:
# AO is the Alternate allele observation count. It indicates the number of reads
# supporting the alternate (variant) allele. I
# DP is the depth of coverage on the variant position
# GT is the genotype. For instance (0,0) indicates a heterozygous variant where
# the individual carries one copy of the variant allele and one copy of the reference allele.
# GL is the genotype likelihoods. These are the log-scaled likelihoods of each possible
# genotype (homozygous reference, heterozygous, and homozygous alternate).
# There are others such as AD (allelic depths on ref and alternate),
# RO (reference allele count), QR, QA (Q for; sum of quality on ref or alternate allele)
for i, sample in enumerate(variant.samples.keys()):
# for each sample, we store the frequency of the variant
data = variant.samples[sample]
if data["DP"]:
try:
freq = "; ".join("{0:.3f}".format(alt / data["DP"]) for alt in data["AO"])
except TypeError:
freq = "{0:.3f}".format(data["AO"] / data["DP"])
variant_dict[sample] = freq
# and genotyping information
if "GL" in data:
variant_dict["info_{0}".format(i)] = f"{data['GT']}:{data['DP']}:{data['GL']}"
else:
variant_dict["info_{0}".format(i)] = f"{data['GT']}:{data['DP']}:"
else:
variant_dict[sample] = 0
variant_dict["info_{0}".format(i)] = "::"
# If vcf is annotated by snpEff
if "EFF" in variant.info:
annotation = variant.info["EFF"][0].split("|")
effect_type, effect_lvl = annotation[0].split("(")
try:
prot_effect, cds_effect = annotation[3].split("/")
except ValueError:
cds_effect = annotation[3]
prot_effect = ""
ann_dict = {
"CDS_position": cds_effect[2:],
"effect_type": effect_type,
"codon_change": annotation[2],
"gene_name": annotation[5],
"mutation_type": annotation[1],
"prot_effect": prot_effect[2:],
"prot_size": annotation[4],
"effect_impact": effect_lvl,
}
variant_dict = dict(variant_dict, **ann_dict)
return variant_dict
def _get_df(self):
if self._df is None:
data = []
if self.progress:
for variant in tqdm(self.variants):
data.append(self._variant_to_dict(variant))
else:
data = [self._variant_to_dict(variant) for variant in self.variants]
self._df = pd.DataFrame(data)
return self._df
df = property(_get_df)
[docs]
def to_vcf(self, output_filename):
"""Write VCF file in VCF format.
:params str output_filename: output VCF filename.
"""
vcf_writer = pysam.VariantFile(output_filename, mode="w", header=self._header)
IDs = set(self.df.ID)
for i, variant in enumerate(self.variants):
if str(i + 1) in IDs:
vcf_writer.write(variant)
vcf_writer.close()
[docs]
class FilteredVariantFile:
"""Variants filtered with VCF_freebayes.
Class kept for back compatiblity in wrappers and variant_calling pipeline
"""
def __init__(self, variants, fb_vcf):
""".. rubric:: constructor
:param list variants: list of variants record.
:param VCF_freebayes fb_vcf: class parent.
"""
self._variants = variants
self._vcf = fb_vcf
self._df = self._vcf_to_df()
@property
def variants(self):
"""Get the variant list."""
return self._variants
@property
def df(self):
"""Get the data frame."""
return self._df
@property
def vcf(self):
"""Get the VCF_freebayes object."""
return self._vcf
def _vcf_to_df(self):
"""Create a data frame with the most important information contained
in the VCF file.
"""
return pd.DataFrame([self.vcf._variant_to_dict(v) for v in self.variants])
[docs]
def to_csv(self, output_filename, info_field=False):
"""Write DataFrame in CSV format.
:params str output_filename: output CSV filename.
"""
with open(output_filename, "w") as fp:
print("# sequana_variant_calling;{0}".format(self.vcf.filters_params), file=fp)
if self.df.empty:
print(",".join(self.df.columns), file=fp)
else:
if info_field:
self.df.to_csv(fp, index=False)
else:
self.df.to_csv(fp, index=False, columns=self.df.columns[: -len(self.vcf.samples)])
[docs]
def to_vcf(self, output_filename):
"""Write VCF file in VCF format.
:params str output_filename: output VCF filename.
"""
vcf_writer = pysam.VariantFile(output_filename, mode="w", header=self._vcf._header)
for variant in self.variants:
vcf_writer.write(variant)
vcf_writer.close()
[docs]
def apply_variants(fasta_path, vcf_path, output_fasta):
logger.info("TO BE CHECKED / TESTED")
# Load reference sequence
fasta = pysam.FastaFile(fasta_path)
# Read VCF
try:
vcf = pysam.VariantFile(vcf_path)
except AttributeError:
vcf = vcf_path
# Store modifications
sequences = {seq: list(fasta.fetch(seq)) for seq in fasta.references}
print(sequences.keys())
# Apply variants
for record in tqdm(vcf):
chrom = record.chrom
pos = record.pos - 1 # VCF is 1-based, Python lists are 0-based
ref = record.ref
alts = record.alts # Tuple of alternative alleles
if chrom not in sequences:
continue
seq = sequences[chrom]
seq1 = sequences[chrom][:]
# Assume first ALT is the desired one (modify if needed)
alt = alts[0] if alts else ref
if len(ref) == len(alt): # SNP
print(pos, ref, alt, len(ref))
seq[pos : pos + len(ref)] = list(alt)
elif len(ref) > len(alt): # Deletion
seq[pos : pos + len(ref)] = list(alt) # alt is usually empty
elif len(ref) < len(alt): # Insertion
seq[pos : pos + 1] = list(alt) # Insert extra bases
# Write corrected FASTA
with open(output_fasta, "w") as out_fasta:
for chrom, seq in sequences.items():
out_fasta.write(f">{chrom}\n")
out_fasta.write("".join(seq) + "\n")