Source code for sequana.fasta_gff_correction

#
#  This file is part of Sequana software
#
#  Copyright (c) 2016-2022 - Sequana Development Team
#
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#
#  website: https://github.com/sequana/sequana
#  documentation: http://sequana.readthedocs.io
#
##############################################################################
"""Utilities to manipulate FastA files"""
import textwrap
from collections import defaultdict

import colorlog

from sequana import GFF3, Codon, FastA, VariantFile
from sequana.lazy import pandas as pd
from sequana.lazy import pysam

logger = colorlog.getLogger(__name__)


__all__ = ["FastaGFFCorrection"]


[docs] class FastaGFFCorrection: """Class to correct FastA reference and its GFF given a set of variants :: f = FastaGFFCorrection("input.fa", "input.vcf") f.fix_and_save_fasta("new.fa") f.fix_and_save_gff("new.fa", "input.gff3", "new.gff3") You must call :meth:`save_fasta` before :meth:`save_gff` to compute the shifts of positions found in the VCF file. The GFF corrections are made on the start and stop positions of all features found in the GFF. In addition, gene and CDS start and stop positions are also corrected for the change of start/stop codon implied by the variants. """ def __init__(self, fasta_file, vcf_file): self._fasta_file = fasta_file self.vcf = VariantFile(vcf_file) self.shifts = None self.positions = None self.codon = Codon() # parameter used in find_start_codon_position method. self._start_codon_max_shift = 10000
[docs] def fix_and_save_fasta(self, outfile): """ We consider that the VCF file contains variants that have already been filtered. .. todo:: handle multiple-variants at a given position """ # initiate the fasta file self._fasta = pysam.FastxFile(self._fasta_file) # read the variants once for all variants = self.vcf.variants # and build a convenient dataframe df_vcf = self.vcf.df # let us make sure the positions are integers df_vcf["position"] = [int(x) for x in df_vcf.position] # what chrom names do we have ? chrom_names = df_vcf["chr"].unique() # let us open the output FastA file handler with open(outfile, "w") as fout: # we will keep track of shifts due to INDELs and their positions # for each chromosome. This information will be required if we also # want to correct the GFF file. shifts = {k: [] for k in chrom_names} positions = {k: [] for k in chrom_names} # let us scan each input sequence in the input FastA file for entry in self._fasta: # retrieve name/comment/sequence sequence, chrname, comment = entry.sequence, entry.name, entry.comment # initiate the new sequence with variants new_sequence = "" # we will use a cursor to scan the original sequence cursor = 0 diff = 0 sub = df_vcf.query("chr == @chrname") if sub.empty: new_sequence = sequence else: for row in sub[["position", "reference", "alternative"]].itertuples(index=False): pos, ref, alt = row.position, row.reference, row.alternative # accumulate bases before the variant and add the alternate new_sequence += sequence[cursor : pos - 1] + alt # we update the cursor to scan the original sequence cursor = pos - 1 + len(ref) assert sequence[pos - 1 : pos - 1 + len(ref)] == ref if len(ref) != len(alt): diff += len(alt) - len(ref) shifts[chrname].append(diff) positions[chrname].append(pos) # do not forget the final slice after last variant new_sequence += sequence[cursor:] fout.write(f">{chrname}\t{comment}\n") for line in textwrap.wrap(new_sequence, width=70): fout.write(line + "\n") self.shifts = shifts self.positions = positions return {"shifts": shifts, "positions": positions}
def _get_shift(self, position, chrom_name): """Utility method to get the shift created by INDELs at a given position This functions returns the sum of Insertion minus Deletions on the LHS of a given position """ shifts = self.shifts[chrom_name] positions = self.positions[chrom_name] if position < positions[0]: return 0 else: # identify the closest INDELs just before # got forward and stop once we are before the position for pos, shift in zip(positions[::-1], shifts[::-1]): if pos <= position: return shift
[docs] def fix_and_save_gff(self, gff_infile, fasta_corrected, gff_outfile, maxshift=8000): """Corrects the start/stop codons of input GFF given corrected fasta You must call :meth:`fix_and_save_fasta` first. .. todo:: multiple contig is not yet implemented""" # initiate the original fasta file fasta = pysam.FastxFile(self._fasta_file) contig = next(fasta) L = len(contig.sequence) # reads the corrected fasta corfasta = pysam.FastxFile(fasta_corrected) contig = next(corfasta) chrom_name = contig.name corseq = contig.sequence newL = len(corseq) with open(gff_infile, "r") as fin, open(gff_outfile, "w") as fout: for line in fin: # we can skip empty lines if not line.strip(): continue if line.startswith("#"): # first commented lines usually contains the sequence # length. may also contain the sequence name line = line.replace(str(L), str(newL)) fout.write(f"{line}") else: # process line, splitting items items = line.split("\t") # regions just need to update the length of the region if items[2] == "region": items[4] = str(newL) line = "\t".join(items) fout.write(f"{line}") else: # we update the starting and ending positions of all other features start, end = int(items[3]), int(items[4]) S1 = self._get_shift(start, chrom_name) S2 = self._get_shift(end, chrom_name) start += S1 end += S2 # We could stop here but gene/CDS need to be check carefully since # their starting/ending codon may not be correct anymore due # to the shift (not a modulo 3 anymore), or variant that may happen # exactly at the start/stop codon position. In principle the start codon # should be correct (except if a variant occured) but stop codon will differ # if an INDEL exists in between if items[2] in ["gene", "CDS"]: strand = items[6] if strand == "+": # given a gene/CDS, first retrieve the start codon position # on the corrected sequence at start - 1 (-1 because python # uses 0-base convetion. start = self.find_start_codon_position(corseq, start - 1, strand="+") + 1 assert corseq[start - 1 : start - 1 + 3] in self.codon.codons["start"]["+"] # We now scan the CDS/gene from start to the end searching for stop codon # until we reach the end position. stop_codon_position = 0 for cursor in range(start, end + 1, 3): codon = corseq[cursor - 1 : cursor - 1 + 3] if codon in self.codon.codons["stop"]["+"]: stop_codon_position = cursor # we now have the closest stop codon to the end position # we keep the delta with respect to the current known end position delta = end - stop_codon_position # and see if we can get a closer one after the end position for cursor in range(end + 1, end + maxshift, 3): codon = corseq[cursor - 1 : cursor - 1 + 3] if codon in self.codon.codons["stop"]["+"]: stop_codon_position = cursor break if cursor - end > delta and stop_codon_position != 0: break # we need to check whether we found something if stop_codon_position == 0: raise Exception("stop_codon_position no found") items[3] = str(start) items[4] = str(stop_codon_position + 2) elif strand == "-": # given a gene/CDS, first retrieve the start codon position # on the corrected sequence at start - 1 (-1 because python # uses 0-base convetion. end = self.find_start_codon_position(corseq, end - 3 - 1, strand="-") end += 3 + 1 # 3 for the end of the codon +1 for 1-base convention assert corseq[end - 3 - 1 : end - 1] in self.codon.codons["start"]["-"] # We now scan the CDS/gene from start to the end searching for stop codon # until we reach the end position. stop_codon_position = 0 for cursor in range(end, start - 1, -3): codon = corseq[cursor - 3 - 1 : cursor - 1] if codon in self.codon.codons["stop"]["-"]: stop_codon_position = cursor - 3 # we now have the closest stop codon to the end position # we keep the delta with respect to the current known end position delta = stop_codon_position - start - 1 # and see if we can get a closer one after the end position for cursor in range(start, start - maxshift - 1, -3): codon = corseq[cursor - 3 - 1 : cursor - 1] if codon in self.codon.codons["stop"]["-"]: stop_codon_position = cursor - 3 break if start - 1 - cursor > delta and stop_codon_position != 0: break # we need to check whether we found something if stop_codon_position == 0: raise Exception("stop_codon_position not found") items[3] = str(stop_codon_position) items[4] = str(end - 1) else: raise ValueError(f"strand must be + or -. Found {strand}") line = "\t".join(items) fout.write(f"{line}")
[docs] def find_start_codon_position(self, sequence, position, strand): """return position of start codon""" try: position, codon = self.codon.find_start_codon_position( sequence, position, strand, max_shift=self._start_codon_max_shift ) return position except TypeError: logger.warning(f"Could not find start codon at position {position}. keep given position") return position
[docs] def get_all_start_codons(self, fasta, gff, chr_name, strand, feature="CDS"): """ On strand +, start codon are ATG, TTG, GTG On strand -, start codon (from 3-5 prime) are CAT, CAA, CAC (reverse complement of start codon on strand+). """ gff = GFF3(gff) fasta = FastA(fasta) for ctg in fasta: if ctg.name == chr_name: seq = ctg.sequence break d = defaultdict(int) if strand == "+": for start in gff.df.query("seqid==@ctg.name and genetic_type==@feature and strand=='+'").start: d[ctg.sequence[start - 1 : start + 3 - 1]] += 1 else: # for start in ...stop is not a mistake for start in gff.df.query("seqid==@ctg.name and genetic_type==@feature and strand=='-'").stop: d[ctg.sequence[start - 3 : start]] += 1 return d
[docs] def get_all_stop_codons(self, fasta, gff, chr_name, strand, feature="CDS"): """ On strand +, stop codons are TAG, TGA, TAA On strand -, stop codons (from 3-5 prime) are TTA, TCA, CTA (reverse complement of start codon on strand+). """ gff = GFF3(gff) fasta = FastA(fasta) for ctg in fasta: if ctg.name == chr_name: seq = ctg.sequence break d = defaultdict(int) if strand == "+": for stop in gff.df.query("seqid==@ctg.name and genetic_type==@feature and strand=='+'").stop: d[ctg.sequence[stop - 3 : stop]] += 1 else: # for stop in ...start is not a mistake for stop in gff.df.query("seqid==@ctg.name and genetic_type==@feature and strand=='-'").start: d[ctg.sequence[stop - 1 : stop - 1 + 3]] += 1 return d