#
# This file is part of Sequana software
#
# Copyright (c) 2018-2022 - Sequana Development Team
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
"IEM class"
import collections
import io
import os
import sys
import colorlog
from sequana.lazy import pandas as pd
logger = colorlog.getLogger(__name__)
__all__ = ["SampleSheet", "IEM"]
[docs]class SampleSheet:
"""Reader and validator of Illumina samplesheets
The Illumina samplesheet reader and validator verifies the correctness of the sections
in the samplesheet, which are not case-sensitive and are enclosed within square brackets.
Following the closing bracket, no additional characters are permitted, except
for commas and the end-of-line marker. For instance [Data]a prevents the [Data] section
from being correctly processed.
The sections then consist of key-value pairs represented as records, with each line consisting
of precisely two fields.
An optional [Settings] section can contain key-value pairs, and
the [Reads] section specifies the number of cycles per read, which
is exclusively required for MiSeq.
The [Data] section, which is a table similar to CSV format, is optional.
However, without [Data] section all reads are sent to a single 'undetermined'
output file. Sample_ID is highly recommended.
Example of typical Data section to be used with bcl2fastq::
[Header]
[Data]
Sample_ID,Sample_Name,I7_Index_ID,index,I5_INdex_ID,index2
A10001,Sample_A,D701,AATACTCG,D501,TATAGCCT
A10002,Sample_B,D702,TCCGGAGA,D501,TATAGCCT
A10003,Sample_C,D703,CGCTCATT,D501,TATAGCCT
A10004,Sample_D,D704,GAGATTCC,D501,TATAGCCT
Important: altough we have upper case names as specified in the Illumina specs, the
bcl2fastq does not care about the upper case. This is not intuitive since IEM produces
keys with upper and lower case names similarly to the specs.
**Sequana Standalone**
The standalone application **sequana** contains a subcommand based on this class::
sequana samplesheet
that can be used to check the correctness of a samplesheet::
sequana samplesheet --check SampleSheet.csv
:references: illumina specifications 970-2017-004.pdf
"""
expected_headers_fields = [
"IEMFileVersion",
"Investigator Name",
"Instrument Type",
"Experiment Name",
"Date",
"Workflow",
"Application",
"Assay",
"Description",
"Chemistry",
"Index Adapters",
]
expected_data_headers = {"SE": [], "PE": []}
def __init__(self, filename):
self.filename = filename
if not os.path.exists(self.filename):
raise IOError(f"{filename} does not exist")
# figures out the sections in the sample sheet.
# we use a try/except so that even in case of failure, we can still use
# quickfix or attributes.
try:
self._scan_sections()
except Exception as err: # pragma: no cover
print(err)
def _line_cleaner(self, line, line_count):
# We can get rid of EOL and spaces
line = line.strip()
# is it an empty line ?
if len(line) == 0:
return line
# if we are dealing with a section title, we can cleanup the
# line. A section must start with '[' and ends with ']' but
# there could be spaces and commands. If it ends with a ; then
# the section will not be found as expected since this is
# sympatomatic of further issues
if line.startswith("["):
# [Header], ,, ,\n becomes [Header]
line = line.strip(", ;") # note the space AND comma
return line
def _scan_sections(self):
# looks for special section Header/Data/Reads/Settings or
# any other matching section thst looks like [XXX]
# sections can be of any types of cases (lower, upper, mixed)
current_section = None
data = {}
with open(self.filename, "r") as fin:
for line_count, line in enumerate(fin.readlines()):
line = self._line_cleaner(line, line_count + 1)
if len(line) == 0:
continue
if line.startswith("[") and line.endswith("]"):
name = line.lstrip("[").rstrip("]")
current_section = name
data[current_section] = [] # create empty list just to create the section
else:
if current_section in data:
data[current_section] += [line]
else:
data[current_section] = [line]
self.sections = data
# if the sample sheet starts with an incorrect name
# e.g section not closed by square brackets or empty lines
# the a None key is present and should be ignored or cast
# into a string:
if None in self.sections:
self.sections["None"] = self.sections[None]
del self.sections[None]
# Some cleanup. Since the sections are case insensitive within
# bcl2fastq, we need to convert everything back to titles
self.sections = {k.title(): v for k, v in self.sections.items()}
def _get_df(self):
if "Data" in self.sections:
if self.sections["Data"]:
# cope with the case of comma or semicolon separators.
df1 = pd.read_csv(io.StringIO("\n".join(self.sections["Data"])), index_col=False, sep=",")
df2 = pd.read_csv(io.StringIO("\n".join(self.sections["Data"])), index_col=False, sep=";")
if len(df1.columns) > len(df2.columns):
# rename all columns to lower case
df1.columns = [x.lower() for x in df1.columns]
return df1
else:
df2.columns = [x.lower() for x in df2.columns]
return df2
else:
return pd.DataFrame()
else: # pragma: no cover
return pd.DataFrame()
df = property(_get_df, doc="Returns the [Data] section")
def _get_samples(self):
try:
return self.df["sample_id"].values
except AttributeError: # pragma: no cover
return "No Sample_ID found in the Data header section"
samples = property(_get_samples, doc="returns the sample identifiers as a list")
def _get_version(self):
try:
return self.header["IEMFileVersion"]
except KeyError:
return None
version = property(_get_version, doc="return the version of the IEM file")
[docs] def checker(self):
from sequana.utils.checker import Checker
checks = Checker()
if "Reads" in self.sections:
pass
else:
checks.results.append({"msg": f"The optional [Reads] section is missing", "status": "Warning"})
if "Header" in self.sections:
pass
else:
checks.results.append({"msg": f"The optional [Header] section is missing", "status": "Warning"})
if "Data" in self.sections:
checks.tryme(self._check_data_section_csv_format)
checks.tryme(self._check_optional_data_column_names)
checks.tryme(self._check_mandatory_data_columns)
checks.tryme(self._check_sample_ID)
checks.tryme(self._check_sample_project)
checks.tryme(self._check_unique_sample_ID)
checks.tryme(self._check_unique_indices)
checks.tryme(self._check_nucleotide_indices)
checks.tryme(self._check_sample_lane_number)
checks.tryme(self._check_alpha_numerical)
if "sample_name" in self.df.columns:
checks.tryme(self._check_sample_names)
checks.tryme(self._check_unique_sample_name)
else:
checks.results.append(
{
"name": "check_sample_names",
"msg": f"Column Sample_Name not found in the header of the [Data] section. Recommended.",
"status": "Warning",
}
)
# if data section is incorrect, self.df is not accessible so the following validation
# will fail.
try:
if "index" in self.df.columns:
checks.tryme(self._check_homogene_I7_length)
if "index2" in self.df.columns:
checks.tryme(self._check_homogene_I5_length)
checks.tryme(self._check_homogene_I5_and_I7_length)
except pd.errors.ParserError: # pragma: no cover
pass
else:
checks.results.append({"msg": f"The [Data] section is missing. ", "status": "Error"})
if "Settings" in self.sections:
checks.tryme(self._check_settings)
else:
checks.results.append({"msg": f"The optional [Settings] section is missing", "status": "Warning"})
checks.tryme(self._check_semi_column_presence)
return checks.results
def _check_settings(self):
for k, v in self.settings.items():
# checks ACGT content (no acgt allowed)
if k.lower() in [
x.lower()
for x in [
"Adapter",
"TrimAdapter",
"AdapterRead2",
"TrimAdapterRead2",
"MaskAdapter",
"MaskAdapterRead2",
]
]:
allowed_chars = set("ACGT")
def is_valid_string(s):
return set(s).issubset(allowed_chars)
if is_valid_string(v) is False:
return {
"name": "check_settings",
"msg": f"Invalid nucleotide sequence found for {k} (v)",
"status": "Error",
}
elif k.lower() in [
x.lower()
for x in ["ReverseComplement", "FindAdaptersWithIndels", "TrimUMI", "CreateFastqForIndexReads"]
]:
if v not in ("true", "false", "t", "f", "yes", "no", "y", "n", "1", "0"):
return {
"name": "check_settings",
"msg": f"Invalid valid for {k} ({v}). Must be set to one of : true, false, t, f, yes, no, y, n, 1, 0",
"status": "Error",
}
elif k.lower() in [
x.lower()
for x in [
"Read1StartFromCycle",
"Read2StartFromCycle",
"Read1EndWithCycle",
"Read2EndWithCycle",
"Read1UMILength",
"Read2UMILength",
"Read1UMIStartFromCycle",
"Read2UMIStartFromCycle",
]
]:
allowed_chars = set("0123456789")
def is_valid_int(s):
return set(s).issubset(allowed_chars)
if is_valid_int(v) is False or int(v) < 0:
return {
"name": "check_settings",
"msg": f"Invalid value for {k}. Must be positive. You provided: {v}",
"status": "Error",
}
elif k.lower() in [x.lower() for x in ["ExcludeTiles", "ExcludeTilesLaneX"]]:
allowed_chars = set("0123456789")
def is_valid_int(s):
return set(s).issubset(allowed_chars)
# valid is 1101+1102+1103-1110 (only +-,numbers)
for item in v.split("+"):
for x in item.split("-"):
if is_valid_int(x) is False:
return {
"name": "check_settings",
"msg": f"Invalid value for {k} (v). Must be made of integers, + and - signs. e.g. 1101+1105-1110 to exclude 1101 and values in [1105-1110].",
"status": "Error",
}
else:
return {"name": "check_settings", "msg": f"The [Settings] section looks good", "status": "Success"}
def _check_sample_ID(self):
if "sample_id" in self.df.columns: # optional
# check that names are not in 'all' or 'undetermined'
if (self.df["sample_id"] == "unknown").sum() or (self.df["sample_id"] == "all").sum():
return {
"name": "check_sample_ID",
"msg": "Sample_ID column contains forbidden name ('all' or 'unknown')",
"status": "Error",
}
else:
return {
"name": "check_sample_ID",
"msg": "Sample_ID column (no unknown/undetermined label). Looks correct",
"status": "Success",
}
else:
return {
"name": "check_sample_ID",
"msg": f"Column Sample_ID not found in the header of the [Data] section. All data will be stored in Undetermined.fastq.gz",
"status": "Warning",
}
def _check_sample_names(self):
# check that names are not in 'all' or 'undetermined'
if (self.df["sample_name"] == "unknown").sum() or (self.df["sample_name"] == "undetermined").sum():
return {
"name": "check_sample_names",
"msg": "Sample_Name column contains forbidden name ('all' or 'undetermined')",
"status": "Error",
}
else:
return {
"name": "check_sample_names",
"msg": "Sample_Name column (no unknown/undetermined label). Looks correct",
"status": "Success",
}
def _check_sample_project(self):
if "sample_project" in self.df.columns: # optional
# check that names are not in 'all' or 'undetermined'
if (self.df["sample_project"] == "all").sum() or (self.df["sample_project"] == "default").sum():
return {
"name": "check_sample_project",
"msg": "Sample_Project column contains forbidden name ('all' or 'default')",
"status": "Error",
}
else:
return {
"name": "check_sample_project",
"msg": "Sample_Project column (no all/default label). Looks correct",
"status": "Success",
}
else:
return {
"name": "check_sample_project",
"msg": f"Column Sample_Project not found in the header of the [Data] section. Recommended.",
"status": "Warning",
}
def _check_mandatory_data_columns(self):
# In fact, all columns are optional except index and Sample_Name is optional
# Sample_Project is optional. If provided, fastq are saved in that sub directory.
for column in ["sample_id", "index"]:
if column not in self.df.columns:
return {
"name": "check_mandatory_data_columns",
"msg": f"Mandatory '{column}' column not found in the header of the [Data] section",
"status": "Error",
}
return {
"name": "check_mandatory_data_columns",
"msg": f"Mandatory columns (index, Sample_ID) found in the header of the [Data] section",
"status": "Success",
}
def _check_unique_sample_name(self):
# check that sample names are unique and that sample Names are unique too
if self.df["sample_name"].isnull().sum() > 0:
return {"name": "check_unique_sample_name", "msg": "Some sample names are empty", "status": "Warning"}
elif len(self.df.sample_name) != len(self.df.sample_name.unique()):
duplicated = self.df.sample_name[self.df.sample_name.duplicated()].index
duplicated = ",".join([str(x + 1) for x in duplicated])
return {
"name": "check_unique_sample_name",
"msg": f"Sample_Name not unique. Duplicated entries on lines: {duplicated}",
"status": "Warning",
}
else:
return {"name": "check_unique_sample_name", "msg": "Sample name uniqueness", "status": "Success"}
def _check_unique_sample_ID(self):
# check that sample names are unique and that sample Names are unique too
if "sample_id" not in self.df.columns:
return {
"name": "check_unique_sample_ID",
"msg": "Sample ID not found in the header of the [Data] section",
"status": "Warning",
}
if len(self.df["sample_id"]) != len(self.df["sample_id"].unique()):
duplicated = self.df.sample_id[self.df.sample_id.duplicated()].index
duplicated = ",".join([str(x + 1) for x in duplicated])
return {
"name": "check_unique_sample_ID",
"msg": f"Sample ID not unique. Duplicated entries on lines: {duplicated}",
"status": "Error",
}
else:
return {"name": "check_unique_sample_ID", "msg": "Sample ID uniqueness", "status": "Success"}
def _check_sample_lane_number(self):
if "sample_lane" in self.df.columns:
# Define the allowed lanes
allowed_chars = set("12345678")
def is_valid_lane(s):
return set(str(s)).issubset(allowed_chars)
# Apply the function to the DataFrame
invalid_lanes = list(self.df[~self.df["sample_lane"].apply(is_valid_lane)].index)
if len(invalid_lanes):
invalid_lanes = [x + 1 for x in invalid_lanes]
return {
"name": "check_sample_lane_number",
"msg": f"Incorrect lane number in these rows: {invalid_lanes}. Must be in the range 1-8",
"status": "Error",
}
return {"name": "check_sample_lane_number", "msg": "Correct lane number range", "status": "Success"}
def _check_nucleotide_indices(self):
# Define the allowed characters
allowed_chars = set("ACGTN")
def is_valid_string(s):
try:
return set(s).issubset(allowed_chars)
except:
return False
# Apply the function to the DataFrame
invalid_rows = list(self.df[~self.df["index"].apply(is_valid_string)].index)
if "index2" in self.df.columns:
invalid_rows += list(self.df[~self.df["index2"].apply(is_valid_string)].index)
if len(invalid_rows):
invalid_rows = [x + 1 for x in invalid_rows]
return {
"name": "check_nucleotide_indices",
"msg": f"these rows have invalid index with wrong nucleotides {invalid_rows}",
"status": "Error",
}
return {"msg": "Indices are made of A, C, G, T, N.", "status": "Success"}
def _check_unique_indices(self):
if "index2" in self.df.columns:
indices = self.df["index"] + "," + self.df["index2"]
msg = "You have duplicated index I7/I5."
elif "index" in self.df.columns:
indices = self.df["index"]
msg = "You have duplicated index I7."
else:
return {
"name": "check_unique_indices",
"msg": f"column 'index' not found in the header of the [Data] section.",
"status": "Error",
}
if indices.duplicated().sum() > 0:
duplicated = indices[indices.duplicated()].values
try:
IDs = self.df[indices.duplicated()].sample_id.values
IDs = ", ".join([str(x) for x in IDs])
IDs = f"related to sample IDs: {IDs}"
except Exception as err: # pragma: no cover
IDs = ""
return {"name": "check_unique_indices", "msg": f"{msg} {duplicated} {IDs}", "status": "Error"}
else:
return {"name": "check_unique_indices", "msg": "Indices are unique.", "status": "Success"}
def _check_optional_data_column_names(self):
msg = ""
warnings = []
# check whether minimal columns are included
for x in [
"i7_index_id",
"sample_project",
"description",
"sample_plate",
"sample_well",
"lane",
"index_plate",
"index_plate_well",
]:
if x not in self.df.columns:
warnings.append(x)
if len(warnings):
warnings = ",".join(warnings)
msg = f"Some columns are missing in the [Data] section: {warnings}"
return {"msg": msg, "status": "Warning"}
else: # pragma: no cover
return {"msg": "Columns of the data section looks good", "status": "Success"}
def _get_data_length(self):
N = len(set([x.count(",") for x in self.sections["Data"]]))
if len(self.sections["Data"]) >= 2 and N == 1:
return True
else:
return False
def _check_homogene_I7_length(self):
if self._get_data_length():
if len(self.df) == 1:
L = len(self.df["index"].values[0].strip())
if L:
return {"msg": f"Only one sample. Index length is {L}", "status": "Success"}
else:
return {"msg": f"Only one sample. Index length is {L}", "status": "Error"}
diff = self.df["index"].apply(lambda x: len(x)).std()
if diff == 0:
return {"msg": "Indices length in I7 have same lengths", "status": "Success"}
else:
return {"msg": "Indices length in I7 have different lengths", "status": "Error"}
else:
return {"msg": "Indices length could not be read.", "status": "Warning"}
def _check_homogene_I5_length(self):
if self._get_data_length():
if len(self.df) == 1:
L = len(self.df["index2"].values[0].strip())
if L:
return {"msg": f"Only one sample. Index length for I5 is {L}", "status": "Success"}
else:
return {"msg": f"Only one sample. Index length for I5 is {L}", "status": "Error"}
diff = self.df["index2"].apply(lambda x: len(x)).std()
if diff == 0:
return {"msg": "Indices length in I5 have same lengths", "status": "Success"}
else:
return {"msg": "Indices length in I5 have different lengths", "status": "Error"}
else:
return {"msg": "Indices length could not be read. ", "status": "Warning"}
def _check_homogene_I5_and_I7_length(self):
# this is a warning only since you may have custom index
lengths = [len(x) for x in self.df["index"]] + [len(x) for x in self.df["index2"]]
lengths = list(set(lengths))
L = lengths[0]
if len(lengths) == 1:
return {"msg": f"I5 and I7 have coherent length of {L}", "status": "Success"}
else:
return {"msg": "I5 and I7 have different lengths", "status": "Warning"}
return {"msg": "Indices length could not be read. ", "status": "Success"}
def _check_data_section_csv_format(self):
N = len(set([x.count(",") for x in self.sections["Data"]]))
if N == 1: # looks correct
if len(self.sections["Data"]) == 1:
return {
"name": "check_data_section_csv_format",
"msg": "The [Data] section CSV format looks empty. Remove if of fill it.",
"status": "Error",
}
else:
return {
"name": "check_data_section_csv_format",
"msg": "The [Data] section CSV format looks correct.",
"status": "Success",
}
elif N == 0:
return {
"name": "check_data_section_csv_format",
"msg": "The [Data] section CSV format looks empty. Remove it of fill it",
"status": "Error",
}
else:
lengths = set([x.count(",") for x in self.sections["Data"]])
return {
"name": "check_data_section_csv_format",
"msg": f"The [Data] section has lines with different number of entries {lengths}. Probably missing or commas in the [Data] section.",
"status": "Error",
}
def _check_semi_column_presence(self):
with open(self.filename, "r") as fp:
line_count = 1
for line in fp.readlines():
if line.rstrip().endswith(";"):
return {
"name": "check_semi_column_presence",
"msg": f"suspicous ; at the end of line ({line_count})",
"status": "Error",
}
line_count += 1
return {"name": "check_semi_column_presence", "msg": "No extra semi column found.", "status": "Success"}
def _check_alpha_numerical(self):
for column in ["sample_id", "sample_name", "sample_project"]:
if column not in self.df.columns:
continue
for i, x in enumerate(self.df[column].values):
status = str(x).replace("-", "").replace("_", "").isalnum()
if status is False:
msg = f"type error: wrong {column} name in [Data] section (line {i+1}). Must be made of alpha numerical characters, _, and - characters only. Found {x}"
return {"msg": msg, "name": "check_alpha_numerical", "status": "Error"}
return {
"name": "check_alpha_numerical",
"msg": "sample names and ID looks correct in the Sample_ID, Sample_Name, and Project column (alpha numerical and - or _ characters)",
"status": "Success",
}
[docs] def validate(self):
"""This method checks whether the sample sheet is correctly formatted
Checks for:
* presence of ; at the end of lines indicated an edition with excel that
wrongly transformed the data into a pure CSV file
* inconsistent numbers of columns in the [DATA] section, which must be
CSV-like section
* Extra lines at the end are ignored
* special characters are forbidden except - and _
* checks for Sample_ID column uniqueness
* checks for index uniqueness (if single index)
* checks for combo of dual indices uniqueness
* checks that sample names are unique
and raise a SystemExit error on the first found error.
"""
# aggregates all checks
checks = self.checker()
# Stop after first error
for check in checks:
if check["status"] == "Error":
sys.exit("\u274C " + str(check["msg"]))
def _get_settings(self):
data = {}
for line in self.sections["Settings"]:
key, value = line.split(",")
data[key] = value
return data
settings = property(_get_settings)
def _get_header(self):
data = {}
for line in self.sections["Header"]:
key, value = line.split(",", 1)
data[key] = value
return data
header = property(_get_header)
def _get_instrument(self):
try:
return self.header["Instrument Type"]
except KeyError:
return None
instrument = property(_get_instrument, doc="returns instrument name")
def _get_adapter_kit(self):
try:
return self.header["Index Adapters"]
except KeyError:
return None
index_adapters = property(_get_adapter_kit, doc="returns index adapters")
[docs] def to_fasta(self, adapter_name=""):
"""Extract adapters from [Adapter] section and print them as a fasta file"""
ar1 = self.settings["Adapter"]
try:
ar2 = self.settings["AdapterRead2"]
except KeyError:
ar2 = ""
for name, index in zip(self.df["i7_index_id"], self.df["index"]):
read = f"{ar1}{index}{ar2}"
frmt = {"adapter": adapter_name, "name": name, "index": index}
print(">{adapter}_index_{name}|name:{name}|seq:{index}".format(**frmt))
print(read)
if "index2" in self.df.columns:
for name, index in zip(self.df["i5_index_id"], self.df["index2"]):
read = f"{ar1}{index}{ar2}"
frmt = {"adapter": adapter_name, "name": name, "index": index}
print(">{adapter}_index_{name}|name:{name}|seq:{index}".format(**frmt))
print(read)
[docs] def quick_fix(self, output_filename):
"""Fix sample sheet
Tyical error is when users save the samplesheet as CSV file in excel.
This may add trailing ; characters at the end of section, which raises error
in bcl2fastq.
"""
found_data = False
with open(self.filename) as fin:
with open(output_filename, "w") as fout:
for line in fin.readlines():
if line.startswith("[Data]"):
found_data = True
if found_data:
line = line.replace(";", ",")
else:
line = line.strip().rstrip(";")
line = line.replace(";", ",")
line = line.strip().rstrip(",")
fout.write(line.strip("\n") + "\n")
class BCLConvert:
"""BCLconvert is a replacement for the bcl2fastq software
The format are quite similar but there are differences as explained in the
official Illumina document.
https://support.illumina.com/sequencing/sequencing_software/bcl-convert/compatibility.html
https://knowledge.illumina.com/software/general/software-general-reference_material-list/000003710
Some differences with bcl2fastq
- sample ID compulsary --> error if absent
- sample Name ignored --> warning if present
- fastq header. filter is set to N and control bit to 0. missing instrument is not supported -->error
- V1 and V2 accepted.
- Note: At least one Sample_ID is required in the Data section of the Sample Sheet. --> error
for barcode-mismatch, this is now in the samplesheet with
BarcodeMismatchIndex1, #
BarcodeMismatchIndex2, #
for trmimming, masking:
Adapter --> Changed to AdapterRead1
AdapterRead2 unchanged
Many options that were on command lines are now in the sample sheet
e.g. --minimum-trimmed-read-length is now MinimumTrimmedReadLength
Supports both [Settings] and [settings]. Neither are required.
Supports only [BCLConvert_Settings]. Required.
"""
def __init__(self):
pass
# obsolet names are
# Adapter,TrimAdapter,MaskAdapter,MaskAdapterRead2,TrimAdapter,Read1StartFromCycle,Read1EndWithCycle,
# Read1UMIStartFromCycle,Read1UMILength,Read1StartFromCycle,,Read2UMIStartFromCycle,Read2UMILength,Read2StartFromCycle
[docs]class IEM(SampleSheet):
def __init__(self, filename):
super().__init__(filename)
logger.warning("IEM class is deprecated. Use SampleShee instead.")