Source code for sequana.fastq_splitter

#
#  This file is part of Sequana software
#
#  Copyright (c) 2016-2022 - Sequana Development Team
#
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#
#  website: https://github.com/sequana/sequana
#  documentation: http://sequana.readthedocs.io
#
##############################################################################
# sequana/fastq_tools/splitter.py
import gzip
import os
from pathlib import Path
from typing import Iterator, Union

import colorlog
from tqdm import tqdm

logger = colorlog.getLogger(__name__)


[docs] class FastqSplitter: def __init__(self, filename: Union[str, Path]): self.filename = Path(filename) self.is_gz = self.filename.suffix == ".gz" def _open(self): if self.is_gz: return gzip.open(self.filename, "rt") return open(self.filename, "r") def _read_fastq(self) -> Iterator[list[str]]: """Yield one FASTQ record (4 lines).""" with self._open() as fh: while True: record = [fh.readline() for _ in range(4)] if not record[0]: break yield record def _open_outfile(self, pattern: str, part_num: int, gzip_output: bool): out_name = pattern.replace("#", str(part_num)) out_path = Path(out_name) out_path.parent.mkdir(parents=True, exist_ok=True) mode = "wt" if gzip_output and not out_path.suffix == ".gz": out_path = out_path.with_suffix(out_path.suffix + ".gz") if gzip_output: return gzip.open(out_path, mode, compresslevel=5) return open(out_path, mode)
[docs] def get_nreads(self): logger.info("Computing read counts") from sequana.tools import GZLineCounter count = GZLineCounter(self.filename) count = count._use_gzip() / 4 print(count) return count
[docs] def by_size(self, reads_per_chunk: int, pattern: str, gzip_output: bool = False, buffer_size: int = 10000): """Split FASTQ into chunks of N reads with file-based progress.""" # rough estimate of total reads nreads = self.get_nreads() est_total_files = max(1, nreads // reads_per_chunk) chunk_id = 1 out_fh = self._open_outfile(pattern, chunk_id, gzip_output) buffer = [] reads_written = 0 progress = tqdm(total=est_total_files, desc="Splitting FASTQ", unit="file") for record in self._read_fastq(): buffer.extend(record) reads_written += 1 if len(buffer) >= 4 * buffer_size: out_fh.writelines(buffer) buffer.clear() if reads_written >= reads_per_chunk: if buffer: out_fh.writelines(buffer) buffer.clear() out_fh.close() chunk_id += 1 out_fh = self._open_outfile(pattern, chunk_id, gzip_output) reads_written = 0 progress.update(1) if buffer: out_fh.writelines(buffer) out_fh.close() progress.update(1) # last file progress.close()
[docs] def by_part(self, n_parts: int, pattern: str, gzip_output: bool = False, buffer_size: int = 10000): """Split FASTQ into N parts with approximate read counting.""" nreads = self.get_nreads() reads_per_part = max(1, nreads // n_parts) chunk_id = 1 out_fh = self._open_outfile(pattern, chunk_id, gzip_output) buffer = [] reads_written = 0 progress = tqdm(total=n_parts, desc="Splitting FASTQ", unit="file") for record in self._read_fastq(): buffer.extend(record) reads_written += 1 # write buffer for efficiency if len(buffer) >= 4 * buffer_size: out_fh.writelines(buffer) buffer.clear() # switch file when reads_per_part is reached, except for last chunk if reads_written >= reads_per_part and chunk_id < n_parts: if buffer: out_fh.writelines(buffer) buffer.clear() out_fh.close() chunk_id += 1 out_fh = self._open_outfile(pattern, chunk_id, gzip_output) reads_written = 0 progress.update(1) # write remaining reads to the last chunk if buffer: out_fh.writelines(buffer) out_fh.close() progress.update(1) progress.close()