Source code for sequana.ribodesigner

#  This file is part of Sequana software
#
#  Copyright (c) 2016-2020 - Sequana Development Team
#
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#
#  website: https://github.com/sequana/sequana
#  documentation: http://sequana.readthedocs.io
#
##############################################################################
"""Ribodesigner module"""
import datetime
import json
import shutil
import subprocess
import sys
from itertools import product
from pathlib import Path

from sequana import logger
from sequana.fasta import FastA
from sequana.lazy import numpy as np
from sequana.lazy import pandas as pd
from sequana.lazy import pylab, pysam
from sequana.tools import reverse_complement

logger.setLevel("INFO")


[docs]class RiboDesigner(object): """Design probes for ribosomes depletion. From a complete genome assembly FASTA file and a GFF annotation file: - Extract genomic sequences corresponding to the selected ``seq_type``. - For these selected sequences, design probes computing probe length and inter probe space according to the length of the ribosomale sequence. - Detect the highest cd-hit-est identity threshold where the number of probes is inferior or equal to ``max_n_probes``. - Report the list of probes in BED and CSV files. In the CSV, the oligo names are in column 1 and the oligo sequences in column 2. :param fasta: The FASTA file with complete genome assembly to extract ribosome sequences from. :param gff: GFF annotation file of the genome assembly. :param output_directory: The path to the output directory. :param seq_type: string describing sequence annotation type (column 3 in GFF) to select rRNA from. :param max_n_probes: Max number of probes to design :param force: If the `output_directory` already exists, overwrite it. :param threads: Number of threads to use in cd-hit clustering. :param float identity_step: step to scan the sequence identity (between 0 and 1) defaults to 0.01. """ def __init__( self, fasta, gff, output_directory, seq_type="rRNA", max_n_probes=384, force=False, threads=4, identity_step=0.01, **kwargs, ): # Input self.fasta = fasta self.gff = gff self.seq_type = seq_type self.max_n_probes = max_n_probes self.threads = threads self.outdir = Path(output_directory) self.identity_step = identity_step if force: self.outdir.mkdir(exist_ok=True) else: try: self.outdir.mkdir() except FileExistsError as err: # pragma: no cover logger.error(f"Output directory {output_directory} exists. Use --force or set force=True") sys.exit(1) # Output self.filtered_gff = self.outdir / "ribosome_filtered.gff" self.ribo_sequences_fasta = self.outdir / "ribosome_sequences.fas" self.probes_fasta = self.outdir / "probes_sequences.fas" self.clustered_probes_fasta = self.outdir / "clustered_probes.fas" self.clustered_probes_csv = self.outdir / "clustered_probes.csv" self.clustered_probes_bed = self.outdir / "clustered_probes.bed" self.output_json = self.outdir / "ribodesigner.json" self.json = { "max_n_probes": max_n_probes, "identity_step": identity_step, "feature": seq_type, }
[docs] def get_rna_pos_from_gff(self): """Convert a GFF file into a pandas DataFrame filtered according to the self.seq_type. """ gff = pd.read_csv( self.gff, sep="\t", comment="#", names=[ "seqid", "source", "seq_type", "start", "end", "score", "strand", "phase", "attributes", ], ) filtered_gff = gff.query("seq_type == @self.seq_type") with pysam.Fastafile(self.fasta) as fas: with open(self.ribo_sequences_fasta, "w") as fas_out: for row in filtered_gff.itertuples(): region = f"{row.seqid}:{row.start}-{row.end}" seq_record = f">{region}\n{fas.fetch(region=region)}\n" fas_out.write(seq_record) seq_types = gff.seq_type.unique().tolist() logger.info(f"Genetic types found in gff: {','.join(seq_types)}") logger.info(f"Found {filtered_gff.shape[0]} '{self.seq_type}' entries in the annotation file.") logger.debug(f"\t" + filtered_gff.to_string().replace("\n", "\n\t")) filtered_gff.to_csv(self.filtered_gff)
def _get_probe_and_step_len_greedy(self, seq): """Modified version of _get_probe_and_step_len""" seq_len = len(seq.sequence) # sequences below 92 base fail when scanning the parameter space. # in such case, one probe or two overlapping probes should do it. if seq_len < 50: return 50, 15 elif seq_len < 100: return 40, 10 probe_lens = range(60, 40, -1) inter_probe_space = range(20, 10, -1) for probe_len, inter_probe_space in product(probe_lens, inter_probe_space): if ((seq_len + inter_probe_space) / (probe_len + inter_probe_space)).is_integer(): return probe_len, inter_probe_space # 4% of sequence length are not found in the parameter space [60-40] x [10-20] # Using 70-40 x 30-10 gives 0 fails for sequences up to 200,000 bases probe_lens = range(70, 40, -1) inter_probe_space = range(30, 10, -1) for probe_len, inter_probe_space in product(probe_lens, inter_probe_space): X = (seq_len + inter_probe_space) / (probe_len + inter_probe_space) if X.is_integer(): return probe_len, inter_probe_space # 34 of sequence length are not found in the parameter space [60-40] x [10-20] # Using 80-40 x 35-10 gives 0 fails for sequences up to 200,000 bases probe_lens = range(80, 40, -1) inter_probe_space = range(35, 10, -1) for probe_len, inter_probe_space in product(probe_lens, inter_probe_space): X = (seq_len + inter_probe_space) / (probe_len + inter_probe_space) if X.is_integer(): return probe_len, inter_probe_space raise ValueError( f"No correct probe length/inter probe space combination was found for {seq.name}" ) # pragma: no cover def _get_probe_and_step_len(self, seq): """Calculates the probe_len and inter_probe_space for a ribosomal sequence. ribo_len = probe_len * n + (inter_probe_space * (n - 1)) <=> n = (ribo_len + inter_probe_space) / (prob_len + inter_probe_space) """ seq_len = len(seq.sequence) probe_lens = range(60, 40, -1) inter_probe_space = range(20, 10, -1) for probe_len, inter_probe_space in product(probe_lens, inter_probe_space): if ((seq_len + inter_probe_space) / (probe_len + inter_probe_space)).is_integer(): return probe_len, inter_probe_space raise ValueError( f"No correct probe length/inter probe space combination was found for {seq.name}" ) # pragma: no cover def _get_probe_and_step_len_simple(self, seq): seq_len = len(seq.sequence) if seq_len < 50: return 50, 15 elif seq_len < 100: return 40, 10 probe_len = 50 inter_probe_space = 15 # starts = arange(0, seq_len, probe_len+inter_probe_space) return 50, 15 def _get_probe_and_step_len_spiral(self, seq): # much slower than original and greedy but ensure that probes are closer to the # expected value seq_len = len(seq.sequence) if seq_len < 50: return 50, 15 elif seq_len < 100: return 40, 10 def spiral(X, Y, x0, y0): items = [] x = y = 0 dx = 0 dy = -1 for i in range(max(X, Y) ** 2): if (-X / 2 - 1 < x <= X / 2) and (-Y / 2 - 1 < y <= Y / 2): items.append((x, y)) if x == y or (x < 0 and x == -y) or (x > 0 and x == 1 - y): dx, dy = -dy, dx x, y = x + dx, y + dy items = [(x + X / 2 + x0, y + Y / 2 + y0) for x, y in items] return items # set of points from 40 to 60 (40+21) and from 10 to 20 (10+11) positions = spiral(40, 10, 40, 10) for probe_len, inter_probe_space in positions: if ((seq_len + inter_probe_space) / (probe_len + inter_probe_space)).is_integer(): return int(probe_len), int(inter_probe_space) raise ValueError( f"No correct probe length/inter probe space combination was found for {seq.name}" ) # pragma: no cover def _get_probes_df(self, seq, probe_len, step_len, mode="generic"): """Generate the Dataframe with probes information. Design probes to have end-to-end coverage on the + strand and fill the inter_probe_space present on the + strand with probes designed on the - strand. :param seq: A pysam sequence object. :param prob_len: The length of the probes calculated by self._get_probe_and_step_len. :param step_len: The length of the inter-probe space calculated by self._get_probe_and_step_len. :param strand: The strand on which probes are designed. """ # + strand probes starts = [start for start in range(0, len(seq.sequence) - probe_len + 1, probe_len + step_len)] stops = [start + probe_len for start in starts] df = pd.DataFrame( { "name": seq.name, "start": starts, "stop": stops, "strand": "+", "score": 0, } ) df["sequence"] = [seq.sequence[row.start : row.stop] for row in df.itertuples()] df["seq_id"] = df["name"] + f"_+_" + df["start"].astype(str) + "_" + df["stop"].astype(str) # - strand probes sequence = reverse_complement(seq.sequence) # Starts reverse probes to be centered on inter_probe_space of the forward probes rev_starts = [int((starts[i + 1] + starts[i]) / 2) for i in range(0, len(starts) - 1)] rev_stops = [start + probe_len for start in rev_starts] if mode == "simple": rev_starts = [x for x in starts] rev_stops = [start + probe_len for start in rev_starts] df_rev = pd.DataFrame( { "name": seq.name, "start": rev_starts, "stop": rev_stops, "strand": "-", "score": 0, } ) df_rev["sequence"] = [sequence[row.start : row.stop] for row in df_rev.itertuples()] df_rev["seq_id"] = df_rev["name"] + f"_-_" + df_rev["start"].astype(str) + "_" + df_rev["stop"].astype(str) # Transform to bed coordinates for the reverse_complement df_rev["start"] = len(sequence) - df_rev["start"] df_rev["stop"] = len(sequence) - df_rev["stop"] df_rev.rename(columns={"start": "stop", "stop": "start"}, inplace=True) return pd.concat([df, df_rev])
[docs] def get_all_probes(self, method="original"): """Run all probe design and concatenate results in a single DataFrame.""" probes_dfs = [] with pysam.FastxFile(self.ribo_sequences_fasta) as fas: for seq in fas: if method == "greedy": probe_len, step_len = self._get_probe_and_step_len_greedy(seq) df = self._get_probes_df(seq, probe_len, step_len) elif method == "original": probe_len, step_len = self._get_probe_and_step_len(seq) df = self._get_probes_df(seq, probe_len, step_len) elif method == "spiral": probe_len, step_len = self._get_probe_and_step_len_spiral(seq) df = self._get_probes_df(seq, probe_len, step_len) elif method == "simple": probe_len, step_len = self._get_probe_and_step_len_simple(seq) df = self._get_probes_df(seq, probe_len, step_len, mode="simple") probes_dfs.append(df) self.probes_df = pd.concat(probes_dfs) self.probes_df["kept_after_clustering"] = True self.probes_df["bed_color"] = self.probes_df.kept_after_clustering.map({True: "21,128,0", False: "128,64,0"})
[docs] def export_to_fasta(self): """From the self.probes_df, export to FASTA and CSV files.""" with open(self.probes_fasta, "w") as fas: for row in self.probes_df.itertuples(): fas.write(f">{row.seq_id}\n{row.sequence}\n")
[docs] def clustering_needed(self, force=False): """Checks if a clustering is needed. :param force: force clustering even if unecessary. """ # Do not cluster if number of probes already inferior to defined threshold if not force and self.probes_df.shape[0] <= self.max_n_probes: logger.info( f"Number of probes {self.probes_df.shape[0]} already inferior to {self.max_n_probes}. No clustering will be performed." ) return False else: return True
[docs] def cluster_probes(self): """Use cd-hit-est to cluster highly similar probes.""" outdir = ( Path(self.clustered_probes_fasta).parent / f"cd-hit-est-{datetime.datetime.today().isoformat(timespec='seconds', sep='_')}" ) outdir.mkdir() log_file = outdir / "cd-hit.log" res_dict = {"seq_id_thres": [], "n_probes": []} for seq_id_thres in np.arange(0.8, 1, self.identity_step).round(3): tmp_fas = outdir / f"clustered_{seq_id_thres}.fas" cmd = f"cd-hit-est -i {self.probes_fasta} -o {tmp_fas} -c {seq_id_thres} -n {self.threads}" logger.debug(f"Clustering probes with command: {cmd} (log in '{log_file}').") with open(log_file, "a") as f: subprocess.run(cmd, shell=True, check=True, stdout=f) res_dict["seq_id_thres"].append(seq_id_thres) res_dict["n_probes"].append(len(FastA(tmp_fas))) # Add number of probes without clustering res_dict["seq_id_thres"].append(1) res_dict["n_probes"].append(self.probes_df.shape[0]) self.json["results"] = res_dict # Dataframe with number of probes for each cdhit identity threshold pylab.clf() df = pd.DataFrame(res_dict) import seaborn as sns # local import to speed up imports p = sns.lineplot(data=df, x="seq_id_thres", y="n_probes", markers=["o"]) p.axhline( self.max_n_probes, alpha=0.8, linestyle="--", color="red", label="max number of probes requested", ) pylab.xlabel("Sequence identity", fontsize=16) pylab.ylabel("Number of probes", fontsize=16) # Extract the best identity threshold best_thres = df.query("n_probes <= @self.max_n_probes").seq_id_thres.max() if not np.isnan(best_thres): n_probes = df.query("seq_id_thres == @best_thres").loc[:, "n_probes"].values[0] self.json["n_probes"] = int(n_probes) self.json["best_thres"] = best_thres logger.info(f"Best clustering threshold: {best_thres}, with {n_probes} probes.") shutil.copy(outdir / f"clustered_{best_thres}.fas", self.clustered_probes_fasta) kept_probes = [seq.name for seq in FastA(outdir / f"clustered_{best_thres}.fas")] self.probes_df["kept_after_clustering"] = self.probes_df.seq_id.isin(kept_probes) self.probes_df["bed_color"] = self.probes_df.kept_after_clustering.map( {True: "21,128,0", False: "128,64,0"} ) self.probes_df["clustering_thres"] = best_thres pylab.plot(best_thres, n_probes, "o", label="Final number of probes") pylab.legend() else: logger.warning( f"No identity threshold was found to have as few as {self.max_n_probes} probes. Keep all probes. Set a valid value with --max-n-probes between {df.n_probes.min()} (min) and {df.n_probes.max()} (max)" ) self.clustering_df = df.sort_values("seq_id_thres") return self.probes_df.query("kept_after_clustering == True")
[docs] def export_to_csv_bed(self): """Export final results to CSV and BED files""" if self.clustering_needed(): df = self.cluster_probes() else: df = self.probes_df df.to_csv(self.clustered_probes_csv, index=False, columns=["seq_id", "sequence"]) self.probes_df.to_csv( self.clustered_probes_bed, sep="\t", index=False, header=None, columns=[ "name", "start", "stop", "sequence", "score", "strand", "start", "stop", "bed_color", ], )
[docs] def export_to_json(self): with open(self.output_json, "w") as fout: json.dump(self.json, fout, indent=4, sort_keys=True)
[docs] def run(self, method="greedy"): if self.gff: self.get_rna_pos_from_gff() else: shutil.copy(self.fasta, self.ribo_sequences_fasta) self.get_all_probes(method=method) self.export_to_fasta() self.export_to_csv_bed() self.export_to_json()