Source code for sequana.bed

#  This file is part of Sequana software
#  Copyright (c) 2016-2022 - Sequana Development Team
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#  website:
#  documentation:
import colorlog

logger = colorlog.getLogger(__name__)

__all__ = ["BED"]

[docs]class BED(object): """a structure to read and manipulate BED files (12-column file) columns are defined as chromosome name, start and end, gene_name, score, strand, CDS start and end, blcok count, block sizes, block starts: """ def __init__(self, filename): self.filename = filename def _get_line(self, line): try: if line.startswith("#") or line.startswith("track") or line.startswith("browser"): return {} else: fields = line.rstrip("\r\n").split() chrom_start = int(fields[1]) return { "chrom": fields[0], "start": chrom_start, "end": int(fields[2]), "gene_name": fields[3], "score": fields[4], "strand": fields[5], "cds_start": int(fields[6]), "cds_end": int(fields[7]), "block_count": int(fields[9]), "block_sizes": [int(x) for x in fields[10].strip(",").split(",")], "block_starts": [int(x) + chrom_start for x in fields[11].strip(",").split(",")], } except Exception as err: print(err) print("Input bed must be 12-column] skipped line {}".format(line)) return {} def __len__(self): count = 0 with open(self.filename, "r") as fin: for line in fin: if line.startswith(("#", "track", "browser")): continue count += 1 return count
[docs] def get_exons(self): """Extract exon regions from input BED file. Uses the first (chromosome name), second (chromosome start), 11th and 12th columns (exon start and size) of a 12-columns BED file. :: from sequana import sequana_data from sequana import BED b = BED(sequana_data("hg38_chr18.bed")) b.get_exons() """ exons = [] with open(self.filename, "r") as fin: for line in fin: if line.startswith("#"): continue if line.startswith("track"): continue if line.startswith("browser"): continue fields = line.rstrip("\r\n").split() assert len(fields) == 12 chrom_start = int(fields[1]) chrom_name = fields[0] block_sizes = (int(x) for x in fields[10].strip(",").split(",")) block_starts = (int(x) + chrom_start for x in fields[11].strip(",").split(",")) for start, size in zip(block_starts, block_sizes): exons.append((chrom_name, start, start + size)) return exons
[docs] def get_transcript_ranges(self): """Extract transcript from input BED file.""" with open(self.filename, "r") as fin: for line in fin: if line.startswith("#"): continue if line.startswith("track"): continue if line.startswith("browser"): continue fields = line.rstrip("\r\n").split() assert len(fields) == 12 chrom = fields[0] chrom_start = int(fields[1]) chrom_end = int(fields[2]) strand = fields[5] gene_name = fields[3] yield ( [ chrom, chrom_start, chrom_end, strand, "{}:{}:{}-{}".format(gene_name, chrom, chrom_start, chrom_end), ] )
[docs] def get_CDS_exons(self): """Extract CDS from input BED file.""" results = [] with open(self.filename, "r") as fin: for line in fin: row = self._get_line(line) if row: cds_start = row["cds_start"] cds_end = row["cds_end"] for start, size in zip(row["block_starts"], row["block_sizes"]): if (start + size) < cds_start: continue if start > cds_end: continue exon_start = max(start, cds_start) exon_end = min(start + size, cds_end) results.append([row["chrom"], exon_start, exon_end]) return results