diff --git a/assets/samplesheet_assembly_coassembly_no_coverage.csv b/assets/samplesheet_assembly_coassembly_no_coverage.csv new file mode 100644 index 0000000..db29939 --- /dev/null +++ b/assets/samplesheet_assembly_coassembly_no_coverage.csv @@ -0,0 +1,2 @@ +sample,fasta,fastq_1,fastq_2,coverage,run_accession,assembler,assembler_version +no_coverage_paired_reads,https://github.com/nf-core/test-datasets/raw/modules/data/genomics/prokaryotes/bacteroides_fragilis/genome/genome.fna.gz,https://github.com/nf-core/test-datasets/raw/modules/data/genomics/prokaryotes/bacteroides_fragilis/illumina/fastq/test1_1.fastq.gz;https://github.com/nf-core/test-datasets/raw/modules/data/genomics/prokaryotes/bacteroides_fragilis/illumina/fastq/test2_1.fastq.gz,https://github.com/nf-core/test-datasets/raw/modules/data/genomics/prokaryotes/bacteroides_fragilis/illumina/fastq/test1_2.fastq.gz;https://github.com/nf-core/test-datasets/raw/modules/data/genomics/prokaryotes/bacteroides_fragilis/illumina/fastq/test2_2.fastq.gz,,SRR3183850;SRR3183951,SPAdes,3.15 diff --git a/assets/samplesheet_assembly_coassembly_with_coverage.csv b/assets/samplesheet_assembly_coassembly_with_coverage.csv new file mode 100644 index 0000000..cf9cab3 --- /dev/null +++ b/assets/samplesheet_assembly_coassembly_with_coverage.csv @@ -0,0 +1,2 @@ +sample,fasta,fastq_1,fastq_2,coverage,run_accession,assembler,assembler_version +coassembly_sample,https://github.com/nf-core/test-datasets/raw/modules/data/genomics/prokaryotes/bacteroides_fragilis/genome/genome.fna.gz,,,45.5,SRR3183850;SRR3183951,SPAdes,3.15 diff --git a/assets/schema_input_assembly.json b/assets/schema_input_assembly.json index 2b5fcea..929589a 100644 --- a/assets/schema_input_assembly.json +++ b/assets/schema_input_assembly.json @@ -25,33 +25,29 @@ "anyOf": [ { "type": "string", - "format": "file-path", - "exists": true, - "pattern": "^\\S+\\.(fq|fastq)(\\.gz)?$" + "pattern": "^\\S+\\.(fq|fastq)(\\.gz)?(;\\S+\\.(fq|fastq)(\\.gz)?)*$" }, { "type": 
"string", "maxLength": 0 } ], - "errorMessage": "FASTQ file must have extension '.fq' or '.fastq' (optionally gzipped)", - "description": "Forward reads if paired-end or single-end reads FASTQ file" + "errorMessage": "FASTQ file(s) must have extension '.fq' or '.fastq' (optionally gzipped); for co-assemblies use semicolon-separated paths", + "description": "Forward reads if paired-end or single-end reads FASTQ file. For co-assemblies, semicolon-separated paths (e.g. run1_R1.fq.gz;run2_R1.fq.gz)" }, "fastq_2": { "anyOf": [ { "type": "string", - "format": "file-path", - "exists": true, - "pattern": "^\\S+\\.(fq|fastq)(\\.gz)?$" + "pattern": "^\\S+\\.(fq|fastq)(\\.gz)?(;\\S+\\.(fq|fastq)(\\.gz)?)*$" }, { "type": "string", "maxLength": 0 } ], - "errorMessage": "FASTQ file for reverse reads must have extension '.fq' or '.fastq' (optionally gzipped)", - "description": "Reverse reads FASTQ file if paired-end. Leave empty for single-end reads" + "errorMessage": "FASTQ file(s) for reverse reads must have extension '.fq' or '.fastq' (optionally gzipped); for co-assemblies use semicolon-separated paths", + "description": "Reverse reads FASTQ file if paired-end. Leave empty for single-end reads. For co-assemblies, semicolon-separated paths (e.g. run1_R2.fq.gz;run2_R2.fq.gz)" }, "coverage": { "anyOf": [ @@ -71,7 +67,7 @@ "type": "string", "pattern": "^\\S+$", "errorMessage": "Accession must be provided and cannot contain spaces", - "description": "Accession of the run used to generate the assembly" + "description": "Accession(s) of the run(s) used to generate the assembly. For co-assemblies, use semicolon-separated values (e.g. 
ERR000001;ERR000002)" }, "assembler": { "type": "string", diff --git a/bin/register_coassembly_sample.py b/bin/register_coassembly_sample.py new file mode 100755 index 0000000..f3cf212 --- /dev/null +++ b/bin/register_coassembly_sample.py @@ -0,0 +1,481 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import csv +import hashlib +import logging +import os +import xml.etree.ElementTree as ET +import re + +import requests +from requests.auth import HTTPBasicAuth +from retry import retry + +logger = logging.getLogger("register_coassembly_sample") + + +RUN_XML_API = "https://www.ebi.ac.uk/ena/browser/api/xml/{accession}" +SAMPLE_XML_API = "https://www.ebi.ac.uk/ena/browser/api/xml/{accession}" +SAMPLE_PORTAL_API = "https://www.ebi.ac.uk/ena/portal/api/search" +CHECKLIST_XML_API = "https://www.ebi.ac.uk/ena/browser/api/xml/ERC000011" + +TEST_SUBMIT_URL = "https://wwwdev.ebi.ac.uk/ena/submit/drop-box/submit/" +PROD_SUBMIT_URL = "https://www.ebi.ac.uk/ena/submit/drop-box/submit/" + +INSDC_BIOSAMPLE_ACCESSION_REGEX = re.compile( + r"SAM[AG]?[0-9]+" +) +EXISTING_ACCESSION_IN_ERROR_REGEX = re.compile( + r'accession:\s*"([A-Z]+[0-9]+)"', + re.IGNORECASE, +) + +DEFAULT_NA_VALUE = "not provided" + + +class CoassemblyRegistrationError(RuntimeError): + """Raised for hard validation/submission errors.""" + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Register co-assembly samples.") + parser.add_argument("--input", required=True, help="Input assembly metadata CSV") + parser.add_argument("--output", required=True, help="Output assembly metadata CSV") + parser.add_argument( + "--test", + action="store_true", + help="Register sample using the test submission endpoint", + ) + parser.add_argument( + "--debug", + action="store_true", + help="Enable verbose debug logging", + ) + parser.add_argument("--default-country", type=str, help="Default country to use if values differ or are missing.") + 
parser.add_argument("--default-date", type=str, help="Default collection date to use if values differ or are missing.") + parser.add_argument("--default-taxid", type=str, help="Default taxon ID to use if values differ or are missing.") + parser.add_argument("--default-tax-name", type=str, help="Default taxon name to use if values differ or are missing.") + return parser.parse_args() + + +def setup_logging(debug: bool) -> None: + """Configure logging level and format based on CLI flags.""" + level = logging.DEBUG if debug else logging.INFO + logging.basicConfig(level=level, format="%(levelname)s: %(message)s", force=True) + if debug: + logging.getLogger("requests").setLevel(logging.INFO) + logging.getLogger("urllib3").setLevel(logging.INFO) + logging.getLogger("requests.packages.urllib3").setLevel(logging.INFO) + logger.debug("Debug logging enabled") + + +def get_credentials() -> tuple[str, str]: + username = os.environ.get("ENA_WEBIN", "").strip() + password = os.environ.get("ENA_WEBIN_PASSWORD", "").strip() + if not username or not password: + raise CoassemblyRegistrationError( + "Missing credentials. Set ENA_WEBIN/ENA_WEBIN_PASSWORD." + ) + return username, password + + +@retry(tries=3, delay=2, backoff=2) +def get_xml(url: str) -> ET.Element: + logger.debug(f"Requesting XML URL: {url}") + response = requests.get(url, timeout=60) + response.raise_for_status() + return ET.fromstring(response.content) + +@retry(tries=3, delay=2, backoff=2) +def get_json(url: str, params: dict[str, str]) -> list[dict[str, str]]: + prepared = requests.Request("GET", url, params=params).prepare() + logger.debug(f"Requesting JSON URL: {prepared.url}") + response = requests.get(url, params=params, timeout=60) + response.raise_for_status() + return response.json() + +def to_pretty_xml(root: ET.Element) -> str: + """Serialize an XML element to a pretty-printed UTF-8 XML string.""" + # Re-parse to avoid side effects from in-place indentation on caller-owned trees. 
+ normalized_root = ET.fromstring(ET.tostring(root, encoding="utf-8")) + ET.indent(normalized_root, space=" ") + return ET.tostring(normalized_root, encoding="utf-8", xml_declaration=True).decode("utf-8") + +@retry(tries=3, delay=2, backoff=2) +def submit_sample_xml(auth: HTTPBasicAuth, submit_url: str, sample_xml: str) -> ET.Element: + submission_xml = """<?xml version="1.0" encoding="UTF-8"?>\n<SUBMISSION><ACTIONS><ACTION><ADD/></ACTION></ACTIONS></SUBMISSION>\n""" + files = { + "SUBMISSION": ("submission.xml", submission_xml, "application/xml"), + "SAMPLE": ("sample.xml", sample_xml, "application/xml"), + } + response = requests.post(submit_url, files=files, auth=auth, timeout=120) + response.raise_for_status() + return ET.fromstring(response.content) + +def merge_or_not_provided(values: list[str], default_value=None) -> str: + """Merge values or return default_value or DEFAULT_NA_VALUE.""" + unique_values = sorted(set(values)) + logger.debug(f"Merging values: {unique_values}") + if len(unique_values) == 1 and unique_values[0]: + logger.info(f"Merge decision: using unique value '{unique_values[0]}'") + return unique_values[0] + elif default_value: + logger.info(f"Merge decision: using default value '{default_value}' due to multiple/conflicting values or missing data") + return default_value + else: + logger.info(f"Merge decision: '{DEFAULT_NA_VALUE}' (multiple conflicting values or some missing)") + return DEFAULT_NA_VALUE + +def get_run_sample_from_xml(run_accession: str) -> str: + """ + Retrieve the sample accession from the RUN XML for a given run accession. + + Args: + run_accession (str): The run accession to query. + + Returns: + str: The sample accession linked to the run. + + Raises: + CoassemblyRegistrationError: If the RUN XML does not contain a sample accession. 
+ """ + run_root = get_xml(RUN_XML_API.format(accession=run_accession)) + sample_ref = run_root.find(".//RUN_LINKS/RUN_LINK/XREF_LINK[DB='ENA-SAMPLE']/ID") + + if sample_ref is not None and sample_ref.text: + sample_accession = sample_ref.text.strip() + else: + raise CoassemblyRegistrationError( + f"Run {run_accession} has no sample accession in RUN XML (ENA-SAMPLE link)" + ) + + return sample_accession + + +def get_sample_metadata(sample_accession: str) -> tuple[str, str, str, str]: + """ + Retrieve metadata for a given sample accession from the ENA portal. + + Args: + sample_accession (str): The sample accession to query. + + Returns: + tuple[str, str, str, str]: A tuple containing tax_id, scientific_name, country, and collection_date. + """ + if INSDC_BIOSAMPLE_ACCESSION_REGEX.match(sample_accession): + query = f'"sample_accession={sample_accession}"' + else: + query = f'"secondary_sample_accession={sample_accession}"' + + rows = get_json( + SAMPLE_PORTAL_API, + params={ + "result": "sample", + "query": query, + "fields": "country,collection_date,tax_id,scientific_name", + "format": "json", + }, + ) + + if not rows: + logger.warning(f"No portal metadata found for sample {sample_accession}") + return "", "", "", "" + + row = rows[0] + return ( + row.get("tax_id"), + row.get("scientific_name"), + row.get("country"), + row.get("collection_date"), + ) + +def get_allowed_countries() -> set[str]: + """ + Retrieve the set of allowed countries from the ENA checklist XML. + + Returns: + set[str]: A set of allowed country/sea values. 
+ """ + root = get_xml(CHECKLIST_XML_API) + values = set() + field_nodes = root.findall(".//FIELD") + for field in field_nodes: + label = field.findtext("LABEL") + name = field.findtext("NAME") + if label == "geographic location (country and/or sea)" or name == "geographic_location_country_andor_sea": + for entry in field.findall(".//TEXT_VALUE/VALUE"): + if entry.text: + values.add(entry.text.strip()) + break + return values + +def build_sample_xml( + alias: str, + title: str, + description: str, + taxon_id: str, + scientific_name: str, + composed_of: str, + collection_date: str, + geographic_location: str, +) -> str: + """ + Build the XML representation of a sample for ENA submission. + + Args: + alias (str): The sample alias. + title (str): The sample title. + description (str): The sample description. + taxon_id (str): The taxon ID of the sample. + scientific_name (str): The scientific name of the sample. + composed_of (str): The list of source samples. + collection_date (str): The collection date of the sample. + geographic_location (str): The geographic location of the sample. + + Returns: + str: The XML string representation of the sample. 
+ """ + root = ET.Element("SAMPLE_SET") + sample = ET.SubElement(root, "SAMPLE", {"alias": alias}) + + title_node = ET.SubElement(sample, "TITLE") + title_node.text = title + + sample_name = ET.SubElement(sample, "SAMPLE_NAME") + taxon = ET.SubElement(sample_name, "TAXON_ID") + taxon.text = taxon_id + sci = ET.SubElement(sample_name, "SCIENTIFIC_NAME") + sci.text = scientific_name + + description_node = ET.SubElement(sample, "DESCRIPTION") + description_node.text = description + + attrs = ET.SubElement(sample, "SAMPLE_ATTRIBUTES") + + def add_attr(tag: str, value: str) -> None: + item = ET.SubElement(attrs, "SAMPLE_ATTRIBUTE") + tag_node = ET.SubElement(item, "TAG") + tag_node.text = tag + value_node = ET.SubElement(item, "VALUE") + value_node.text = value + + add_attr("ENA-CHECKLIST", "ERC000011") + add_attr("organism", scientific_name) + add_attr("collection date", collection_date) + add_attr("composed of", composed_of) + add_attr("geographic location (country and/or sea)", geographic_location) + add_attr("scientific_name", scientific_name) + + return to_pretty_xml(root) + + +def build_virtual_sample_alias(source_samples: list[str], max_length: int = 50) -> str: + """Build deterministic alias from source samples with an md5 suffix.""" + sorted_samples = sorted(source_samples) + hash8 = hashlib.md5(",".join(sorted_samples).encode("utf-8")).hexdigest()[:8] + + first_two = sorted_samples[:2] + remaining = len(sorted_samples) - len(first_two) + + alias_core = f"coassembly_{'_'.join(first_two)}" + if remaining > 0: + alias_core = f"{alias_core}_{remaining}_others" + + alias = f"{alias_core}_{hash8}" + if len(alias) <= max_length: + return alias + + raise CoassemblyRegistrationError( + f"Hash suffix '{alias}' exceeds maximum alias length of {max_length}" + ) + + +def extract_accession_from_receipt(receipt: ET.Element) -> str: + """ + Extract the sample accession from the ENA submission receipt XML. + + Args: + receipt (ET.Element): The root element of the receipt XML. 
+ + Returns: + str: The sample accession. + + Raises: + CoassemblyRegistrationError: If the receipt does not contain a sample accession. + """ + success = receipt.attrib.get("success", "false").lower() == "true" + if not success: + messages = [m.text for m in receipt.findall(".//MESSAGES/*") if m.text] + message_text = "; ".join(messages) if messages else "no error details in receipt" + + # If this is a duplicate submission, ENA returns the existing accession. + existing_match = EXISTING_ACCESSION_IN_ERROR_REGEX.search(message_text) + if existing_match and "already exists" in message_text.lower(): + existing_accession = existing_match.group(1) + logger.info(f"Virtual sample already exists; reusing accession: {existing_accession}") + return existing_accession + + raise CoassemblyRegistrationError( + f"ENA sample submission failed: {message_text}" + ) + + sample_node = receipt.find(".//SAMPLE") + if sample_node is not None: + ena_accession = sample_node.get("accession", "") + if ena_accession: + return ena_accession + + raise CoassemblyRegistrationError("ENA sample submission succeeded but no SAMPLE accession found in receipt XML") + + +def register_virtual_sample(run_accessions: list[str], test: bool, default_country=None, default_date=None, default_taxid=None, default_tax_name=None) -> str: + """ + Register a virtual sample for co-assemblies based on run accessions. + + Args: + run_accessions (list[str]): A list of run accessions for the co-assembly. + test (bool): Whether to use the test submission endpoint. + + Returns: + str: The sample accession of the registered virtual sample. + + Raises: + CoassemblyRegistrationError: If validation or submission fails. 
+ """ + if test: + submit_url = TEST_SUBMIT_URL + else: + submit_url = PROD_SUBMIT_URL + + sample_accessions = [] + taxon_ids = [] + scientific_names = [] + countries = [] + collection_dates = [] + + for run in run_accessions: + sample_acc = get_run_sample_from_xml(run) + logger.debug(f"Run {run} is linked to sample {sample_acc}") + sample_accessions.append(sample_acc) + + taxon_id, scientific_name, country, collection_date = get_sample_metadata(sample_acc) + logger.debug(f"Metadata for sample {sample_acc}: tax_id={taxon_id}, scientific_name={scientific_name}, country={country}, collection_date={collection_date}") + if not taxon_id: + raise CoassemblyRegistrationError(f"Sample {sample_acc} has no tax_id in ENA portal response") + if not scientific_name: + raise CoassemblyRegistrationError(f"Sample {sample_acc} has no scientific_name in ENA portal response") + taxon_ids.append(taxon_id) + scientific_names.append(scientific_name) + countries.append(country) + collection_dates.append(collection_date) + + unique_samples = sorted(set(sample_accessions)) + if len(unique_samples) == 1: + logger.info(f"All source runs belong to one sample ({unique_samples[0]}); keeping original row unchanged") + return "" + + merged_collection_date = merge_or_not_provided(collection_dates, default_date) + merged_country = merge_or_not_provided(countries, default_country) + merged_taxid = merge_or_not_provided(taxon_ids, default_taxid) + merged_tax_name = merge_or_not_provided(scientific_names, default_tax_name) + + if merged_taxid == DEFAULT_NA_VALUE: + raise CoassemblyRegistrationError( + f"Co-assembly source samples have mixed taxa: {', '.join(taxon_ids)}" + ) + + if merged_tax_name == DEFAULT_NA_VALUE: + raise CoassemblyRegistrationError( + f"Co-assembly source samples have mixed scientific names: {', '.join(scientific_names)}" + ) + + allowed_countries = get_allowed_countries() + if merged_country != DEFAULT_NA_VALUE and merged_country not in allowed_countries: + logger.warning( + 
f"Country '{merged_country}' is not in ERC000011 allowed country/sea values; setting to '{DEFAULT_NA_VALUE}'" + ) + merged_country = DEFAULT_NA_VALUE + + sample_list = ",".join(unique_samples) + title = f"Combined sample from {sample_list}" + description = ( + "This sample is a virtual sample of co-assembled raw reads from multiple samples " + f"of {merged_tax_name}. Co-assembly was performed from runs of the samples {sample_list}" + ) + alias = build_virtual_sample_alias(unique_samples) + logger.info(f"Registering virtual sample with alias '{alias}' for co-assembly of samples: {sample_list}") + + sample_xml = build_sample_xml( + alias=alias, + title=title, + description=description, + taxon_id=merged_taxid, + scientific_name=merged_tax_name, + composed_of=sample_list, + collection_date=merged_collection_date, + geographic_location=merged_country, + ) + + with open(f"{alias}.xml", "w", encoding="utf-8") as sample_file: + sample_file.write(sample_xml) + + username, password = get_credentials() + receipt = submit_sample_xml(HTTPBasicAuth(username, password), submit_url, sample_xml) + logger.debug(f"ENA receipt XML:\n{ET.tostring(receipt, encoding='unicode')}") + virtual_sample = extract_accession_from_receipt(receipt) + logger.info(f"Registered virtual co-assembly sample: {virtual_sample}") + return virtual_sample + + +def main() -> int: + args = parse_args() + setup_logging(args.debug) + + logger.debug(f"Starting coassembly registration script: input={args.input}, output={args.output}, test={args.test}") + + with open(args.input, newline="", encoding="utf-8") as handle: + reader = csv.DictReader(handle) + fieldnames = reader.fieldnames or [] + if "Runs" not in fieldnames: + raise CoassemblyRegistrationError("Input CSV must have a 'Runs' column with ENA run accessions") + rows = list(reader) + + # The script may add/update the Sample value; ensure the output schema includes it + if "Sample" not in fieldnames: + fieldnames.append("Sample") + + updated_rows: 
list[dict[str, str]] = [] + for row in rows: + run_accessions = row["Runs"].split(",") + + logger.info(f"Processing assembly with runs: {run_accessions}") + + # Skip non-coassembly rows by design + if len(run_accessions) <= 1: + logger.info("Single run found; skipping coassembly sample registration for this row") + updated_rows.append(row) + continue + + virtual_sample = register_virtual_sample( + run_accessions=run_accessions, + test=args.test, + default_country=args.default_country, + default_date=args.default_date, + default_taxid=args.default_taxid, + default_tax_name=args.default_tax_name + ) + if virtual_sample or virtual_sample == "": + row["Sample"] = virtual_sample + updated_rows.append(row) + + logger.info(f"Writing updated assembly metadata to: {args.output}") + with open(args.output, "w", newline="", encoding="utf-8") as out_handle: + writer = csv.DictWriter(out_handle, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(updated_rows) + + +if __name__ == "__main__": + main() diff --git a/conf/modules.config b/conf/modules.config index 931f57e..3a16be0 100644 --- a/conf/modules.config +++ b/conf/modules.config @@ -176,7 +176,7 @@ process { ] } - withName: 'REGISTERSTUDY|GENERATE_ASSEMBLY_MANIFEST' { + withName: 'REGISTERSTUDY|GENERATE_ASSEMBLY_MANIFEST|REGISTER_COASSEMBLY_SAMPLE' { publishDir = [ enabled: false ] diff --git a/conf/test_assembly_coassembly_no_coverage.config b/conf/test_assembly_coassembly_no_coverage.config new file mode 100644 index 0000000..eda3ca5 --- /dev/null +++ b/conf/test_assembly_coassembly_no_coverage.config @@ -0,0 +1,34 @@ +/* +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + Nextflow config file for running minimal tests +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + Defines input files and everything required to run a fast and simple pipeline test. 
+ + Use as follows: + nextflow run nf-core/seqsubmit -profile test_assembly_coassembly_no_coverage, --outdir + +---------------------------------------------------------------------------------------- +*/ + +process { + resourceLimits = [ + cpus: 2, + memory: '8.GB', + time: '1.h' + ] +} + +params { + config_profile_name = 'Test --mode metagenomic_assemblies co-assembly without coverage profile' + config_profile_description = 'Co-assembly test with missing coverage and paired-end reads from two runs' + + // Input data + input = "${projectDir}/assets/samplesheet_assembly_coassembly_no_coverage.csv" + outdir = 'test_output' + + mode = "metagenomic_assemblies" + submission_study = "PRJEB98843" + centre_name = "TEST_CENTER" + + test_upload = true +} diff --git a/conf/test_assembly_coassembly_with_coverage.config b/conf/test_assembly_coassembly_with_coverage.config new file mode 100644 index 0000000..ba3d683 --- /dev/null +++ b/conf/test_assembly_coassembly_with_coverage.config @@ -0,0 +1,34 @@ +/* +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + Nextflow config file for running minimal tests +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + Defines input files and everything required to run a fast and simple pipeline test. 
+ + Use as follows: + nextflow run nf-core/seqsubmit -profile test_assembly_coassembly_with_coverage, --outdir + +---------------------------------------------------------------------------------------- +*/ + +process { + resourceLimits = [ + cpus: 2, + memory: '8.GB', + time: '1.h' + ] +} + +params { + config_profile_name = 'Test --mode metagenomic_assemblies co-assembly with coverage profile' + config_profile_description = 'Co-assembly test with pre-provided coverage and multiple run accessions' + + // Input data + input = "${projectDir}/assets/samplesheet_assembly_coassembly_with_coverage.csv" + outdir = 'test_output' + + mode = "metagenomic_assemblies" + submission_study = "PRJEB98843" + centre_name = "TEST_CENTER" + + test_upload = true +} diff --git a/conf/test_assembly_complete_metadata.config b/conf/test_assembly_complete_metadata.config index 16e4ccf..ce366be 100644 --- a/conf/test_assembly_complete_metadata.config +++ b/conf/test_assembly_complete_metadata.config @@ -30,4 +30,5 @@ params { submission_study = "PRJEB98843" centre_name = "TEST_CENTER" + test_upload = true } diff --git a/conf/test_assembly_no_coverage_paired_reads.config b/conf/test_assembly_no_coverage_paired_reads.config index 65c73e4..4571a1c 100644 --- a/conf/test_assembly_no_coverage_paired_reads.config +++ b/conf/test_assembly_no_coverage_paired_reads.config @@ -30,4 +30,5 @@ params { submission_study = "PRJEB98843" centre_name = "TEST_CENTER" + test_upload = true } diff --git a/conf/test_assembly_no_coverage_single_reads.config b/conf/test_assembly_no_coverage_single_reads.config index 814de31..ee269e9 100644 --- a/conf/test_assembly_no_coverage_single_reads.config +++ b/conf/test_assembly_no_coverage_single_reads.config @@ -30,4 +30,5 @@ params { submission_study = "PRJEB98843" centre_name = "TEST_CENTER" + test_upload = true } diff --git a/conf/test_assembly_no_study_complete_metadata.config b/conf/test_assembly_no_study_complete_metadata.config index b1c96d7..6831007 100644 --- 
a/conf/test_assembly_no_study_complete_metadata.config +++ b/conf/test_assembly_no_study_complete_metadata.config @@ -31,5 +31,4 @@ params { centre_name = "TEST_CENTER" test_upload = true - } diff --git a/conf/test_assembly_one_contig.config b/conf/test_assembly_one_contig.config index b27784e..ce2e86c 100644 --- a/conf/test_assembly_one_contig.config +++ b/conf/test_assembly_one_contig.config @@ -30,4 +30,5 @@ params { submission_study = "PRJEB98843" centre_name = "TEST_CENTER" + test_upload = true } diff --git a/modules/local/create_assembly_metadata_csv/main.nf b/modules/local/create_assembly_metadata_csv/main.nf index b7d7151..576f781 100644 --- a/modules/local/create_assembly_metadata_csv/main.nf +++ b/modules/local/create_assembly_metadata_csv/main.nf @@ -19,13 +19,16 @@ process CREATE_ASSEMBLY_METADATA_CSV { script: def header = 'Runs,Coverage,Assembler,Version,Filepath,Sample' + // Format run accessions: wrap in quotes if it's a list (co-assembly) or single value + def runs_joined = meta.run_accession instanceof List ? 
meta.run_accession.join(',') : meta.run_accession + def runs = "\"${runs_joined}\"" def row = [ - meta.run_accession, + runs, meta.coverage, meta.assembler, meta.assembler_version, fasta.name, - '' // Sample column left empty because co-assemblies are not supported + '' // Sample column is filled later for co-assemblies that require virtual sample registration ].join(',') """ cat <<-END_CSV > ${meta.id}_assembly_metadata.csv diff --git a/modules/local/register_coassembly_sample/environment.yml b/modules/local/register_coassembly_sample/environment.yml new file mode 100644 index 0000000..66d3d46 --- /dev/null +++ b/modules/local/register_coassembly_sample/environment.yml @@ -0,0 +1,8 @@ +# yaml-language-server: $schema=https://raw.githubusercontent.com/nf-core/modules/master/modules/environment-schema.json +channels: + - conda-forge + - bioconda +dependencies: + - conda-forge::python=3.12.11 + - conda-forge::requests=2.32.5 + - conda-forge::retry=0.9.2 diff --git a/modules/local/register_coassembly_sample/main.nf b/modules/local/register_coassembly_sample/main.nf new file mode 100644 index 0000000..e0d3d8e --- /dev/null +++ b/modules/local/register_coassembly_sample/main.nf @@ -0,0 +1,52 @@ +process REGISTER_COASSEMBLY_SAMPLE { + tag "${meta.id}" + label 'process_single' + + conda "${moduleDir}/environment.yml" + container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ? + 'oras://community.wave.seqera.io/library/pip_requests_retry:d07e257656d938ab' : + 'community.wave.seqera.io/library/pip_requests_retry:eb16563fc2cb641f' }" + + input: + tuple val(meta), path(csv) + val(test) + + output: + tuple val(meta), path("*_updated.csv"), emit: csv + path "versions.yml", emit: versions + + when: + task.ext.when == null || task.ext.when + + script: + def args = task.ext.args ?: '' + def prefix = task.ext.prefix ?: "${meta.id}" + def test_arg = test ? 
"--test" : "" + """ + register_coassembly_sample.py \\ + --input ${csv} \\ + --output ${prefix}_updated.csv \\ + ${test_arg} \\ + ${args} + + cat <<-END_VERSIONS > versions.yml + "${task.process}": + python: \$(python --version | sed 's/Python //') + requests: \$(python -c "import requests; print(requests.__version__)") + retry: \$(python -c "import importlib.metadata as m; print(m.version('retry'))") + END_VERSIONS + """ + + stub: + def prefix = task.ext.prefix ?: "${meta.id}" + """ + cp ${csv} ${prefix}_updated.csv + + cat <<-END_VERSIONS > versions.yml + "${task.process}": + python: \$(python --version | sed 's/Python //') + requests: \$(python -c "import requests; print(requests.__version__)") + retry: \$(python -c "import importlib.metadata as m; print(m.version('retry'))") + END_VERSIONS + """ +} diff --git a/modules/local/register_coassembly_sample/meta.yml b/modules/local/register_coassembly_sample/meta.yml new file mode 100644 index 0000000..977b239 --- /dev/null +++ b/modules/local/register_coassembly_sample/meta.yml @@ -0,0 +1,44 @@ +# yaml-language-server: $schema=https://raw.githubusercontent.com/nf-core/modules/master/modules/meta-schema.json +name: "register_coassembly_sample" +description: Register a virtual ENA sample for co-assemblies and write sample accession into assembly metadata CSV +keywords: + - ena + - sample + - metadata + - co-assembly +tools: + - python: + description: Python + homepage: https://www.python.org/ + - retry: + description: Retry decorator for resilient network calls + homepage: https://pypi.org/project/retry/ +input: + - - meta: + type: map + description: Groovy map containing sample metadata + - csv: + type: file + description: Assembly metadata CSV input for assembly_uploader tool + pattern: "*.csv" + - test: + type: string + description: Whether to submit to the ENA test server (`true`) instead of production. 
+output: + - csv: + - meta: + type: map + description: Same metadata map as input + - "*.csv": + type: file + description: Updated assembly metadata CSV with "Sample" column filled for co-assemblies + pattern: "*.csv" + - versions: + - versions.yml: + type: file + description: File containing software versions + pattern: "versions.yml" +authors: + - "@ochkalova" +maintainers: + - "@ochkalova" diff --git a/modules/local/register_coassembly_sample/tests/coassembly_metadata.csv b/modules/local/register_coassembly_sample/tests/coassembly_metadata.csv new file mode 100644 index 0000000..689d97b --- /dev/null +++ b/modules/local/register_coassembly_sample/tests/coassembly_metadata.csv @@ -0,0 +1,11 @@ +Runs,Coverage,Assembler,Version,Filepath +"SRR3183853,SRR3183854,SRR3183855,SRR3183856,SRR3183857,SRR3183858,SRR3183859,SRR3183860,SRR3183861,SRR3183952,SRR3183953,SRR3183954,SRR3183955,SRR3183956,SRR3183957,SRR3183958,SRR3183959,SRR3183960",10.0,MEGAHIT,1.2.9,contigs1.fasta.gz +"SRR3183850,SRR3183851,SRR3183852,SRR3183949,SRR3183950,SRR3183951",12.0,MEGAHIT,1.2.9,contigs2.fasta.gz +"SRR1631070,SRR1631071,SRR1631072,SRR1631073,SRR1631074,SRR1631075,SRR1631076,SRR1631077,SRR1631078,SRR1631079,SRR1631080,SRR1631081,SRR1631082,SRR1631083,SRR3187740,SRR3187741,SRR3187742,SRR3187743,SRR3187744,SRR3187745,SRR3187746,SRR3187747,SRR3187748,SRR3188145,SRR3188146,SRR3188147,SRR3188148,SRR3188149,SRR3188150,SRR3188151,SRR3188152,SRR3188153",15.0,MEGAHIT,1.2.9,contigs3.fasta.gz +"SRR1622826,SRR3187094,SRR3187149",8.0,MEGAHIT,1.2.9,contigs4.fasta.gz +"SRR1622910,SRR1622911",7.0,MEGAHIT,1.2.9,contigs5.fasta.gz +"SRR1624846,SRR1624847,SRR1624848,SRR1624849,SRR1624850,SRR1624851,SRR1624852,SRR1624853,SRR1624854",9.0,MEGAHIT,1.2.9,contigs6.fasta.gz +"SRR1631360,SRR1631361,SRR1631362,SRR1631363,SRR1631364,SRR1631365",6.0,MEGAHIT,1.2.9,contigs9.fasta.gz +"SRR3183597,SRR3183598,SRR3183599,SRR3183600",3.0,MEGAHIT,1.2.9,contigs12.fasta.gz 
+"SRR3183638,SRR3183639,SRR3183640,SRR3183641,SRR3183642,SRR3183643,SRR3183644,SRR3183645,SRR3183646,SRR3183647",2.0,MEGAHIT,1.2.9,contigs13.fasta.gz +"SRR3183844,SRR3183845,SRR3183846",1.0,MEGAHIT,1.2.9,contigs14.fasta.gz diff --git a/modules/local/register_coassembly_sample/tests/main.nf.test b/modules/local/register_coassembly_sample/tests/main.nf.test new file mode 100644 index 0000000..1a93313 --- /dev/null +++ b/modules/local/register_coassembly_sample/tests/main.nf.test @@ -0,0 +1,36 @@ +nextflow_process { + name "Test Process REGISTER_COASSEMBLY_SAMPLE" + script "../main.nf" + config "./nextflow.config" + process "REGISTER_COASSEMBLY_SAMPLE" + + tag "modules" + tag "register_coassembly_sample" + + test("REGISTER_COASSEMBLY_SAMPLE completes with expected outputs") { + when { + process { + """ + input[0] = [ + [ id:'test' ], + file("${moduleDir}/tests/coassembly_metadata.csv", checkIfExists: true) + ] + input[1] = true + """ + } + } + + then { + assert process.success + assertAll( + { assert process.out.csv.size() == 1 }, + { assert process.out.csv[0][1].toString().endsWith("_updated.csv") }, + { + def csvContent = path(process.out.csv[0][1]).text + assert csvContent.contains("Sample") + assert csvContent.contains("SRR3183853") + } + ) + } + } +} diff --git a/modules/local/register_coassembly_sample/tests/nextflow.config b/modules/local/register_coassembly_sample/tests/nextflow.config new file mode 100644 index 0000000..ea49989 --- /dev/null +++ b/modules/local/register_coassembly_sample/tests/nextflow.config @@ -0,0 +1,5 @@ +process { + withName: 'REGISTER_COASSEMBLY_SAMPLE' { + ext.args = '--debug' + } +} diff --git a/nextflow.config b/nextflow.config index 4f9e21a..5482eed 100644 --- a/nextflow.config +++ b/nextflow.config @@ -199,6 +199,8 @@ profiles { test_assembly_no_coverage_single_reads { includeConfig 'conf/test_assembly_no_coverage_single_reads.config' } test_assembly_no_coverage_paired_reads { includeConfig 
'conf/test_assembly_no_coverage_paired_reads.config' } test_assembly_one_contig { includeConfig 'conf/test_assembly_one_contig.config' } + test_assembly_coassembly_with_coverage { includeConfig 'conf/test_assembly_coassembly_with_coverage.config' } + test_assembly_coassembly_no_coverage { includeConfig 'conf/test_assembly_coassembly_no_coverage.config' } } // Load nf-core custom profiles from different institutions diff --git a/tests/assembly_coassembly_no_coverage.nf.test b/tests/assembly_coassembly_no_coverage.nf.test new file mode 100644 index 0000000..a4eefd8 --- /dev/null +++ b/tests/assembly_coassembly_no_coverage.nf.test @@ -0,0 +1,39 @@ +nextflow_pipeline { + + name "Test assembly submission workflow - coassembly_no_coverage" + script "../main.nf" + tag "pipeline" + tag "mode_assembly" + tag "test_assembly_coassembly_no_coverage" + profile "test_assembly_coassembly_no_coverage" + + test("-profile test_assembly_coassembly_no_coverage") { + + when { + params { + outdir = "$outputDir" + } + } + + then { + // stable_name: All files + folders in ${params.outdir}/ with a stable name + def stable_name = getAllFilesFromDir(params.outdir, relative: true, includeDir: true, ignore: ['pipeline_info/*.{html,json,txt}']) + // stable_path: All files in ${params.outdir}/ with stable content + def stable_path = getAllFilesFromDir(params.outdir, ignoreFile: 'tests/.nftignore') + // Early failure no need to test the rest of snapshots + assert workflow.success + assertAll( + { assert snapshot( + // Number of successful tasks + workflow.trace.succeeded().size(), + // pipeline versions.yml file for multiqc from which Nextflow version is removed because we test pipelines on multiple Nextflow versions + removeNextflowVersion("$outputDir/pipeline_info/nf_core_seqsubmit_software_mqc_versions.yml"), + // All stable path name, with a relative path + stable_name, + // All files with stable contents + stable_path + ).match() } + ) + } + } +} diff --git 
a/tests/assembly_coassembly_with_coverage.nf.test b/tests/assembly_coassembly_with_coverage.nf.test new file mode 100644 index 0000000..60f15e9 --- /dev/null +++ b/tests/assembly_coassembly_with_coverage.nf.test @@ -0,0 +1,39 @@ +nextflow_pipeline { + + name "Test assembly submission workflow - coassembly_with_coverage" + script "../main.nf" + tag "pipeline" + tag "mode_assembly" + tag "test_assembly_coassembly_with_coverage" + profile "test_assembly_coassembly_with_coverage" + + test("-profile test_assembly_coassembly_with_coverage") { + + when { + params { + outdir = "$outputDir" + } + } + + then { + // stable_name: All files + folders in ${params.outdir}/ with a stable name + def stable_name = getAllFilesFromDir(params.outdir, relative: true, includeDir: true, ignore: ['pipeline_info/*.{html,json,txt}']) + // stable_path: All files in ${params.outdir}/ with stable content + def stable_path = getAllFilesFromDir(params.outdir, ignoreFile: 'tests/.nftignore') + // Early failure no need to test the rest of snapshots + assert workflow.success + assertAll( + { assert snapshot( + // Number of successful tasks + workflow.trace.succeeded().size(), + // pipeline versions.yml file for multiqc from which Nextflow version is removed because we test pipelines on multiple Nextflow versions + removeNextflowVersion("$outputDir/pipeline_info/nf_core_seqsubmit_software_mqc_versions.yml"), + // All stable path name, with a relative path + stable_name, + // All files with stable contents + stable_path + ).match() } + ) + } + } +} diff --git a/workflows/assemblysubmit.nf b/workflows/assemblysubmit.nf index 1f4dc7b..97d19d8 100644 --- a/workflows/assemblysubmit.nf +++ b/workflows/assemblysubmit.nf @@ -7,6 +7,7 @@ include { COVERM_CONTIG } from '../modules/nf-core/coverm/contig/main' include { FASTAVALIDATOR } from '../modules/nf-core/fastavalidator/main' include { CREATE_ASSEMBLY_METADATA_CSV } from '../modules/local/create_assembly_metadata_csv/main' +include { 
REGISTER_COASSEMBLY_SAMPLE } from '../modules/local/register_coassembly_sample/main' include { GENERATE_ASSEMBLY_MANIFEST } from '../modules/local/generate_assembly_manifest/main' include { REGISTERSTUDY } from '../modules/local/registerstudy/main' include { ENA_WEBIN_CLI_WRAPPER as SUBMIT } from '../modules/local/ena_webin_cli_wrapper' @@ -49,11 +50,13 @@ workflow ASSEMBLYSUBMIT { // Create assembly channel with proper metadata structure assembly_fasta = ch_samplesheet .map { row -> + // support semicolon-separated values for co-assemblies (e.g. ERR000001;ERR000002) + def run_accessions = row[5].split(';').toList() def meta = [ id: row[0].id, single_end: row[3] ? false : true, coverage: row[4] ?: null, - run_accession: row[5], + run_accession: run_accessions.size() == 1 ? run_accessions[0] : run_accessions, assembler: row[6], assembler_version: row[7] ] @@ -63,21 +66,36 @@ workflow ASSEMBLYSUBMIT { reads_fastq = ch_samplesheet .filter { row -> row[2] && row[2] != "" } // Check if fastq_1 exists and is not empty .map { row -> + def run_accessions = row[5].split(';').toList() def meta = [ id: row[0].id, single_end: row[3] ? false : true, coverage: row[4] ?: null, - run_accession: row[5], + run_accession: run_accessions.size() == 1 ? run_accessions[0] : run_accessions, assembler: row[6], assembler_version: row[7] ] - if (row[3] && row[3] != "") { + def fastq1_list = row[2].split(';').toList() + def fastq2_list = (row[3] && row[3] != "") ? 
row[3].split(';').toList() : [] + // Validation: Check that number of read files matches number of accessions + if (fastq1_list.size() != run_accessions.size()) { + error "Sample ${meta.id}: Number of forward read files (${fastq1_list.size()}) does not match number of run accessions (${run_accessions.size()})" + } + + if (fastq2_list && fastq2_list.size() != run_accessions.size()) { + error "Sample ${meta.id}: Number of reverse read files (${fastq2_list.size()}) does not match number of run accessions (${run_accessions.size()})" + } + + // Convert paths to file objects + def fastq1_paths = fastq1_list.collect { path -> file(path) } + def fastq2_paths = fastq2_list ? fastq2_list.collect { path -> file(path) } : [] + if (fastq2_paths) { // If paired end reads - [meta, [file(row[2]), file(row[3])]] + [meta, fastq1_paths, fastq2_paths] } else { // If single end - [meta, file(row[2])] + [meta, fastq1_paths] } } @@ -94,11 +112,36 @@ workflow ASSEMBLYSUBMIT { } // For assemblies without coverage, calculate coverage with CoverM + // Transform reads into the format CoverM expects: list of all read files validated_fastas.filter { meta, _fasta -> meta.coverage == null } .join(reads_fastq) - .multiMap { meta, fasta, fastq -> + .map { tuple -> + def meta = tuple[0] + def fasta = tuple[1] + def reads_data = tuple[2..-1] + + // Transform reads into flat list for CoverM + def all_reads = [] + if (meta.single_end) { + // Single-end: just flatten the R1 list + def fastq1_list = reads_data[0] + all_reads = fastq1_list + } else { + // Paired-end: interleave R1 and R2 files + // Input format: [meta, [R1_1, R1_2, ...], [R2_1, R2_2, ...]] + def fastq1_list = reads_data[0] + def fastq2_list = reads_data[1] + + // Create list as: R1_1, R2_1, R1_2, R2_2, ... 
+ // Use collectMany to flatten the pairs + all_reads = [fastq1_list, fastq2_list].transpose().collectMany { pair -> pair } + } + + [meta, fasta, all_reads] + } + .multiMap { meta, fasta, reads -> assembly: [ meta, fasta ] - reads: [ meta, fastq ] + reads: [ meta, reads ] } .set { coverm_input } @@ -145,9 +188,26 @@ workflow ASSEMBLYSUBMIT { ) ch_versions = ch_versions.mix(CREATE_ASSEMBLY_METADATA_CSV.out.versions) + // For co-assemblies (multiple run accessions), register a virtual ENA sample and fill Sample column. + // Rows with a single run accession bypass this step unchanged. + assembly_metadata_by_type = CREATE_ASSEMBLY_METADATA_CSV.out.csv + .branch { meta, _csv -> + coassembly: meta.run_accession instanceof List && meta.run_accession.size() > 1 + standard: true + } + + REGISTER_COASSEMBLY_SAMPLE( + assembly_metadata_by_type.coassembly, + test_upload + ) + ch_versions = ch_versions.mix(REGISTER_COASSEMBLY_SAMPLE.out.versions) + + assembly_metadata_with_sample = assembly_metadata_by_type.standard + .mix(REGISTER_COASSEMBLY_SAMPLE.out.csv) + // Concatenate assembly metadata CSVs into single file to publish CONCAT_METADATA ( - CREATE_ASSEMBLY_METADATA_CSV.out.csv.map { _meta, file -> file }.collect().map { files -> [ [id: "assemblies_metadata"], files ] }, + assembly_metadata_with_sample.map { _meta, file -> file }.collect().map { files -> [ [id: "assemblies_metadata"], files ] }, 'true' // skip_header - we want to keep the header from the first file and skip it for the rest ) @@ -171,7 +231,7 @@ workflow ASSEMBLYSUBMIT { // Generate assembly manifest files and submit them to ENA GENERATE_ASSEMBLY_MANIFEST( - assemblies_with_coverage.join(CREATE_ASSEMBLY_METADATA_CSV.out.csv), + assemblies_with_coverage.join(assembly_metadata_with_sample), study_accession_ch.first(), upload_tpa, test_upload @@ -232,8 +292,8 @@ workflow ASSEMBLYSUBMIT { : file("${projectDir}/assets/methods_description_template.yml", checkIfExists: true) def ch_methods_description = 
channel.value(methodsDescriptionText(ch_multiqc_custom_methods_description)) ch_multiqc_files = ch_multiqc_files.mix(ch_methods_description.collectFile(name: 'methods_description_mqc.yaml', sort: true)) - ch_multiqc_files = ch_multiqc_files.mix(CONCAT_ACCESSIONS.out.file_out.map{meta, file -> file}) - ch_multiqc_files = ch_multiqc_files.mix(CONCAT_METADATA.out.file_out.map{meta, file -> file}) + ch_multiqc_files = ch_multiqc_files.mix(CONCAT_ACCESSIONS.out.file_out.map{_meta, file -> file}) + ch_multiqc_files = ch_multiqc_files.mix(CONCAT_METADATA.out.file_out.map{_meta, file -> file}) MULTIQC( ch_multiqc_files.flatten().collect().map { files -> [