Source code for sequana.snpeff

#
#  This file is part of Sequana software
#
#  Copyright (c) 2016-2021 - Sequana Development Team
#
#
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#
#  website: https://github.com/sequana/sequana
#  documentation: http://sequana.readthedocs.io
#
##############################################################################
""" Tools to launch snpEff."""
import os
import re
import shutil
import subprocess as sp
import sys

import colorlog

from sequana import FastA, sequana_data
from sequana.resources import snpeff

logger = colorlog.getLogger(__name__)


[docs]class SnpEff(object): """SnpEff is a tool dedicated to annotate detected variants in a VCF file. This wrapper eases the annotation with a genbank file. It create automatically the custom database. Then, run snpEff with a subprocess. Caution, the locus name (or chromosome name) in genbank file and the sequence name in VCF file must be the same. Otherwise, snpEff is not able to bind informations. Example: :: snpeff = SnpEff('file.gbk') snpeff.launch_snpeff('variants.vcf', 'variant.ann.vcf') If your input is in GFF format, you must also provide the fasta reference file. Will save relevant snpeff data into ./data directory (or snpeff_datadir). """ def __init__( self, annotation, log=None, snpeff_datadir="data", fastafile=None, build_options="", ): """.. rubric:: Constructor :param annotation: annotation reference. :param file_format: format of your file. ('only genbank actually') :param log: log file :param snpeff_datadir: default to data. :param fastafile: if a GFF is used, you must provide the FASTA input file as well """ # Check if the input file exist if os.path.isfile(annotation): self.annotation = annotation self.fastafile = fastafile self.ref_name = os.path.basename(annotation).split(".")[0] if self.annotation.endswith(".genbank") or self.annotation.endswith(".gbk"): self.format = "gbk" elif self.annotation.endswith(".gff3") or self.annotation.endswith(".gff"): self.format = "gff3" else: logger.error("Format must be genbank or gff3") sys.exit(1) else: logger.error("FileNotFoundError: The file " + annotation + " does not exist") sys.exit(1) # Keep data directory where everything will be saved # cast to string in case we ahave a localpath instance self.snpeff_datadir = str(snpeff_datadir) # Set the log file self.log_file = log if log is not None: if os.path.isfile(log): # pragma: no cover os.remove(log) # Check if snpEff.config is present self.configfile = f"{self.snpeff_datadir}/snpEff.config" if not os.path.exists(self.configfile): logger.info("snpEff.config file not found, creating one") self._copy_snpeff_config() else: # pragma: no cover logger.info(f"Using existing config file: {self.configfile}.") # Create custom database self.build_options = build_options if not os.path.exists(os.sep.join([self.snpeff_datadir, self.ref_name, "snpEffectPredictor.bin"])): self._add_custom_db() else: # pragma: no cover logger.info("DB already added in your config and database") def _copy_snpeff_config(self): """Copy and unzip the snpEff.config file.""" CONFIG = sequana_data("snpEff.config", "snpeff") os.makedirs(self.snpeff_datadir, exist_ok=True) shutil.copyfile(CONFIG, self.configfile) def _add_custom_db(self): """Add your custom file in the local snpEff database.""" # create directory and copy annotation file logger.info("adding custom DB using your input file(s)") logger.info(f" - {self.annotation}") if self.fastafile: logger.info(f" - {self.fastafile}") genome_dir = os.path.sep.join([self.snpeff_datadir, self.ref_name]) os.makedirs(genome_dir, exist_ok=True) # add new annotation file in config file self._add_db_in_config() if self.format == "gbk": shutil.copyfile(self.annotation, os.sep.join([genome_dir, "genes.gbk"])) snpeff_build_line = ["snpEff", "build", "-genbank", "-v"] snpeff_build_line += [self.ref_name] elif self.format == "gff3": shutil.copyfile(self.annotation, os.sep.join([genome_dir, "genes.gff"])) if self.fastafile is None or not os.path.exists(self.fastafile): logger.error(f"Input file {self.fastafile} does not exist") sys.exit(1) shutil.copyfile(self.fastafile, os.sep.join([genome_dir, "sequences.fa"])) snpeff_build_line = ["snpEff", "build", "-gff3", "-v"] snpeff_build_line += [self.ref_name] # set config path, which has been saved in the datadir directory snpeff_build_line += ["-c", self.configfile] # add any extra build options for 'snpeff build' command snpeff_build_line += self.build_options.split() if self.log_file: with open(self.log_file, "ab") as fl: snp_build = sp.Popen(snpeff_build_line, stderr=fl, stdout=fl) else: snp_build = sp.Popen(snpeff_build_line) snp_build.wait() rc = snp_build.returncode if rc != 0: # pragma: no cover logger.error(f"snpEff build return a non-zero code. See {self.log_file} for details") sys.exit(rc) def _add_db_in_config(self): """Add new annotation at the end of snpEff.config file.""" logger.info(f"Updating configuration file in {self.configfile}") with open(self.configfile, "a") as fp: print(self.ref_name + ".genome : " + self.ref_name, file=fp)
[docs] def launch_snpeff(self, vcf_filename, output, html_output=None, options=""): """Launch snpEff with the custom genbank file. :param str vcf_filename: input VCF filename. :param str output: output VCF filename. :param str html_output: filename of the HTML creates by snpEff. :param str options: any options recognised by snpEff. """ # Create command line for Popen args_ann = ["snpEff", "-formatEff"] # cast in case we have a Path instance (e.g. in testing) if html_output is not None: args_ann += ["-s", str(html_output)] args_ann += options.split() args_ann += ["-v", self.ref_name, vcf_filename] # specify the config file args_ann += ["-c", self.configfile] logger.info(" ".join(args_ann)) # Launch snpEff if self.log_file: with open(self.log_file, "ab") as fl, open(output, "wb") as fp: proc_ann = sp.Popen(args_ann, stdout=fp, stderr=fl) proc_ann.wait() else: with open(output, "wb") as fp: proc_ann = sp.Popen(args_ann, stdout=fp) proc_ann.wait()
def _get_seq_ids(self): if self.format == "gbk": regex = re.compile(r"^LOCUS\s+([\w\.\-]+)") chrom_regex = re.compile(r'\\chromosome="([\w\.\-]+)"') with open(self.annotation, "r") as fp: line = fp.readline() seq = regex.findall(line) for line in fp: if line.strip().startswith( ( "gene", "CDS", ) ): break chrom = chrom_regex.search(line) if chrom: # pragma: no cover seq = [chrom.group(1)] regex = chrom_regex seq += [regex.search(line).group(1) for line in fp if regex.search(line)] return seq else: # pragma: no cover regex = re.compile(r"^##sequence-region\s+([\w\.\-]+)") with open(self.annotation, "r") as fp: line = fp.readline() seq = regex.findall(line) for line in fp: chrom = regex.findall(line) if chrom: seq += chrom return seq
[docs] def add_locus_in_fasta(self, fasta, output_file): """Add locus of annotation file in description line of fasta file. If fasta file and genbank file do not have the same names. :param str fasta: input fasta file where you want to add locus. :param str output_file: output file. FIXME: fasta is already known if provided in the init """ fasta_record = FastA(fasta) ids_list = self._get_seq_ids() # check if both files have same number of contigs if len(fasta_record) != len(ids_list): # pragma: no cover print( "fasta and annotation files don't have the same number of " "contigs. Found {} and {}".format(len(fasta_record), len(ids_list)) ) sys.exit(1) # check if directory exist output_dir = os.path.dirname(output_file) os.makedirs(output_dir, exist_ok=True) if sorted(fasta_record.names) == sorted(ids_list): logger.info("Files have same sequence id.") if os.path.isfile(output_file): # pragma: no cover os.remove(output_file) os.symlink(os.path.realpath(fasta), output_file) return else: logger.info( "fasta and GFF seem to have different IDs. Creating a" "new coherent fasta file assuming the chromsome names appear " "in the same order in the fasta and gff" ) with open(output_file, "w") as fp: # write fasta with seqid of annotation file for n in range(len(fasta_record)): seq_id = ">{0} {1}\n".format(ids_list[n], fasta_record.names[n]) seq = fasta_record.sequences[n] sequence = "\n".join([seq[i : min(i + 80, len(seq))] for i in range(0, len(seq), 80)]) + "\n" contigs = seq_id + sequence fp.write(contigs)
[docs]def download_fasta_and_genbank(identifier, tag, genbank=True, fasta=True, outdir="."): """ :param identifier: valid identifier to retrieve from NCBI (genbank) and ENA (fasta) :param tag: name of the filename for the genbank and fasta files. """ if genbank: from bioservices import EUtils eu = EUtils() data = eu.EFetch(db="nuccore", id=identifier, rettype="gbwithparts", retmode="text") if isinstance(data, int) and data == 400: # pragma: no cover raise ValueError(f"{identifier} not found on NCBI") else: with open(f"{outdir}/{tag}.gbk", "w") as fout: fout.write(data.decode()) if fasta: from bioservices import ENA ena = ENA() data = ena.get_data(identifier, "fasta") if isinstance(data, int) and data == 400: # pragma: no cover raise ValueError("{} not found on ENA".format(identifier)) else: with open(f"{outdir}/{tag}.fa", "w") as fout: try: # change in API in v1.7.8 fout.write(data) except: # pragma: no cover fout.write(data.decode())