Commit d69651af authored by Chris Cheshire's avatar Chris Cheshire
Browse files

Restored check samplesheet

parent 4fe2ab19
Loading
Loading
Loading
Loading
+27 −110
Original line number Diff line number Diff line
@@ -5,11 +5,13 @@

import os
import sys
from collections import Counter
from pathlib import Path
import errno
import argparse


logger = logging.getLogger()
def parse_args(args=None):
    Description = "Reformat nf-core/cutandrun samplesheet file and check its contents."
    Epilog = "Example usage: python check_samplesheet.py <FILE_IN> <FILE_OUT>"

    parser = argparse.ArgumentParser(description=Description, epilog=Epilog)
    parser.add_argument("FILE_IN", help="Input samplesheet file.")
@@ -17,108 +19,29 @@ logger = logging.getLogger()
    parser.add_argument("IGG", help="Boolean for whether or not igg is given")
    return parser.parse_args(args)

class RowChecker:
    """
    Define a service that can validate and transform each given row.

    Attributes:
        modified (list): A list of dicts, where each dict corresponds to a previously
            validated and transformed row. The order of rows is maintained.

    """

    VALID_FORMATS = (
        ".fq.gz",
        ".fastq.gz",
    )

    def __init__(
        self,
        sample_col="sample",
        first_col="fastq_1",
        second_col="fastq_2",
        single_col="single_end",
        **kwargs,
    ):
        """
        Initialize the row checker with the expected column names.

        Args:
            sample_col (str): The name of the column that contains the sample name
                (default "sample").
            first_col (str): The name of the column that contains the first (or only)
                FASTQ file path (default "fastq_1").
            second_col (str): The name of the column that contains the second (if any)
                FASTQ file path (default "fastq_2").
            single_col (str): The name of the new column that will be inserted and
                records whether the sample contains single- or paired-end sequencing
                reads (default "single_end").

        """
        super().__init__(**kwargs)
        self._sample_col = sample_col
        self._first_col = first_col
        self._second_col = second_col
        self._single_col = single_col
        self._seen = set()
        self.modified = []

    def validate_and_transform(self, row):
        """
        Perform all validations on the given row and insert the read pairing status.

        Args:
            row (dict): A mapping from column headers (keys) to elements of that row
                (values).
def make_dir(path):
    if len(path) > 0:
        try:
            os.makedirs(path)
        except OSError as exception:
            if exception.errno != errno.EEXIST:
                raise exception

        """
        self._validate_sample(row)
        self._validate_first(row)
        self._validate_second(row)
        self._validate_pair(row)
        self._seen.add((row[self._sample_col], row[self._first_col]))
        self.modified.append(row)

    def _validate_sample(self, row):
        """Assert that the sample name exists and convert spaces to underscores."""
        assert len(row[self._sample_col]) > 0, "Sample input is required."
        # Sanitize samples slightly.
        row[self._sample_col] = row[self._sample_col].replace(" ", "_")

    def _validate_first(self, row):
        """Assert that the first FASTQ entry is non-empty and has the right format."""
        assert len(row[self._first_col]) > 0, "At least the first FASTQ file is required."
        self._validate_fastq_format(row[self._first_col])

    def _validate_second(self, row):
        """Assert that the second FASTQ entry has the right format if it exists."""
        if len(row[self._second_col]) > 0:
            self._validate_fastq_format(row[self._second_col])

    def _validate_pair(self, row):
        """Assert that read pairs have the same file extension. Report pair status."""
        if row[self._first_col] and row[self._second_col]:
            row[self._single_col] = False
            assert (
                Path(row[self._first_col]).suffixes == Path(row[self._second_col]).suffixes
            ), "FASTQ pairs must have the same file extensions."
        else:
            row[self._single_col] = True

    def _validate_fastq_format(self, filename):
        """Assert that a given filename has one of the expected FASTQ extensions."""
        assert any(filename.endswith(extension) for extension in self.VALID_FORMATS), (
            f"The FASTQ file has an unrecognized extension: {filename}\n"
            f"It should be one of: {', '.join(self.VALID_FORMATS)}"
def print_error(error, context="Line", context_str=""):
    error_str = "ERROR: Please check samplesheet -> {}".format(error)
    if context != "" and context_str != "":
        error_str = "ERROR: Please check samplesheet -> {}\n{}: '{}'".format(
            error, context.strip(), context_str.strip()
        )
    print(error_str)
    sys.exit(1)

    def validate_unique_samples(self):
        """
        Assert that the combination of sample name and FASTQ filename is unique.

def check_samplesheet(file_in, file_out, igg_control):
    """
    Detect the tabular format.
    This function checks that the samplesheet follows the following structure:

    group,replicate,control_group,fastq_1,fastq_2
    WT,1,1,WT_LIB1_REP1_1.fastq.gz,WT_LIB1_REP1_2.fastq.gz
@@ -128,14 +51,6 @@ def check_samplesheet(file_in, file_out, igg_control):
    IGG,1,1,KO_LIB1_REP1_1.fastq.gz,IGG_LIB1_REP1_2.fastq.gz
    IGG,2,2,KO_LIB1_REP1_1.fastq.gz,IGG_LIB1_REP1_2.fastq.gz
    """
    peek = handle.read(2048)
    sniffer = csv.Sniffer()
    if not sniffer.has_header(peek):
        logger.critical(f"The given sample sheet does not appear to contain a header.")
        sys.exit(1)
    dialect = sniffer.sniff(peek)
    handle.seek(0)
    return dialect

    igg_present = False

@@ -150,8 +65,9 @@ def check_samplesheet(file_in, file_out, igg_control):
            print("ERROR: Please check samplesheet header -> {} != {}".format(",".join(header), ",".join(HEADER)))
            sys.exit(1)

    Validate the general shape of the table, expected columns, and each row. Also add
    an additional column which records whether one or two FASTQ reads were found.
        ## Check sample entries
        for line in fin:
            lspl = [x.strip().strip('"') for x in line.strip().split(",")]

            if not igg_present:
                if 'igg' in lspl:
@@ -216,7 +132,7 @@ def check_samplesheet(file_in, file_out, igg_control):
                else:
                    sample_run_dict[sample][replicate].append(sample_info)

    ## Check igg_control parameter is consistent with input groups
    ## Check igg_control parameter is consistent with input groups
    if (igg_control == 'true' and not igg_present):
        print("ERROR: No 'igg' group was found in " + str(file_in) + " If you are not supplying an IgG control, please specify --igg_control 'false' on command line.")
        sys.exit(1)
@@ -225,7 +141,7 @@ def check_samplesheet(file_in, file_out, igg_control):
        print("ERROR: Parameter --igg_control was set to false, but an 'igg' group was found in " + str(file_in) + ".")
        sys.exit(1)

    ## Check control groups have unique ids that are the same as their replicate ids
    ## Check control groups have unique ids that are the same as their replicate ids
    control_group_ids = []
    if igg_present:
        for key, data in sample_run_dict["igg"].items():
@@ -293,5 +209,6 @@ def main(args=None):
    args = parse_args(args)
    check_samplesheet(args.FILE_IN, args.FILE_OUT, args.IGG)


if __name__ == "__main__":
    sys.exit(main())
 No newline at end of file