From 1263e44fd13b15d2afc95116821c0b6d4b763fc8 Mon Sep 17 00:00:00 2001 From: Matt Parker Date: Tue, 16 May 2023 18:39:00 +0000 Subject: [PATCH] aws demo --- .gitignore | 2 + .pre-commit-config.yaml | 10 +- CHANGELOG.md | 8 +- LICENSE | 200 +++---- README.md | 12 + bin/check_sample_sheet.py | 40 -- bin/ping.py | 65 --- bin/workflow-glue | 7 + bin/workflow_glue/__init__.py | 62 ++ bin/workflow_glue/check_sample_sheet.py | 93 +++ bin/{ => workflow_glue}/report.py | 18 +- bin/workflow_glue/tests/__init__.py | 1 + bin/workflow_glue/tests/test_test.py | 10 + bin/workflow_glue/util.py | 52 ++ data/.DS_Store | Bin 6148 -> 0 bytes lib/Pinguscript.groovy | 4 +- lib/fastqingress.nf | 725 +++++++++++++----------- lib/ping.nf | 36 -- main.nf | 28 +- nextflow.config | 8 +- nextflow_schema.json | 6 +- 21 files changed, 728 insertions(+), 659 deletions(-) delete mode 100755 bin/check_sample_sheet.py delete mode 100755 bin/ping.py create mode 100755 bin/workflow-glue create mode 100755 bin/workflow_glue/__init__.py create mode 100755 bin/workflow_glue/check_sample_sheet.py rename bin/{ => workflow_glue}/report.py (98%) create mode 100755 bin/workflow_glue/tests/__init__.py create mode 100755 bin/workflow_glue/tests/test_test.py create mode 100755 bin/workflow_glue/util.py delete mode 100644 data/.DS_Store delete mode 100644 lib/ping.nf diff --git a/.gitignore b/.gitignore index e5d8ce0..be27172 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ nextflow template-workflow .*.swp .*.swo +*.pyc +*.pyo diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1a330bb..1d1d82d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -18,9 +18,10 @@ repos: additional_dependencies: - epi2melabs - repo: https://github.com/pycqa/flake8 - rev: 3.7.9 + rev: 5.0.4 hooks: - id: flake8 + pass_filenames: false additional_dependencies: - flake8-rst-docstrings - flake8-docstrings @@ -31,4 +32,9 @@ repos: - flake8-builtins - flake8-absolute-import - flake8-print - entry: flake8 bin --import-order-style google --statistics + args: [ + "bin", + "--import-order-style=google", + "--statistics", + "--max-line-length=88", + ] diff --git a/CHANGELOG.md b/CHANGELOG.md index e0361ad..3f23bb6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,15 @@ # Changelog All notable changes to this project will be documented in this file. -The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [v0.0.7] +### Changed +- Updated whole workflow to bring up-to-date with recent template changes +### Added +- Configuration for running demo data in AWS + ## [v0.0.6] ### Changed - Updated description in manifest diff --git a/LICENSE b/LICENSE index 3ee2c24..bd5e2de 100644 --- a/LICENSE +++ b/LICENSE @@ -1,12 +1,5 @@ -This Source Code Form is subject to the terms of the Mozilla Public -License, v. 2.0. If a copy of the MPL was not distributed with this -file, You can obtain one at http://mozilla.org/MPL/2.0/. - -(c) 2021- Oxford Nanopore Technologies Ltd. - - -Mozilla Public License Version 2.0 -================================== +Oxford Nanopore Technologies PLC. Public License Version 1.0 +============================================================= 1. Definitions -------------- @@ -17,7 +10,7 @@ Mozilla Public License Version 2.0 1.2. 
"Contributor Version" means the combination of the Contributions of others (if any) used - by a Contributor and that particular Contributor's Contribution. + by a Contributor and that particular Contributor’s Contribution. 1.3. "Contribution" means Covered Software of a particular Contributor. @@ -28,59 +21,46 @@ Mozilla Public License Version 2.0 Form, and Modifications of such Source Code Form, in each case including portions thereof. -1.5. "Incompatible With Secondary Licenses" - means - - (a) that the initial Contributor has attached the notice described - in Exhibit B to the Covered Software; or - - (b) that the Covered Software was made available under the terms of - version 1.1 or earlier of the License, but not also under the - terms of a Secondary License. - -1.6. "Executable Form" +1.5. "Executable Form" means any form of the work other than Source Code Form. -1.7. "Larger Work" +1.6. "Larger Work" means a work that combines Covered Software with other material, in a separate file or files, that is not Covered Software. -1.8. "License" +1.7. "License" means this document. -1.9. "Licensable" +1.8. "Licensable" means having the right to grant, to the maximum extent possible, whether at the time of the initial grant or subsequently, any and all of the rights conveyed by this License. -1.10. "Modifications" +1.9. "Modifications" means any of the following: - (a) any file in Source Code Form that results from an addition to, - deletion from, or modification of the contents of Covered - Software; or - - (b) any new file in Source Code Form that contains any Covered - Software. + (a) any file in Source Code Form that results from an addition to, + deletion from, or modification of the contents of Covered + Software; or + (b) any new file in Source Code Form that contains any Covered + Software. -1.11. "Patent Claims" of a Contributor - means any patent claim(s), including without limitation, method, - process, and apparatus claims, in any patent Licensable by such - Contributor that would be infringed, but for the grant of the - License, by the making, using, selling, offering for sale, having - made, import, or transfer of either its Contributions or its - Contributor Version. +1.10. "Research Purposes" + means use for internal research and not intended for or directed + towards commercial advantages or monetary compensation; provided, + however, that monetary compensation does not include sponsored + research of research funded by grants. -1.12. "Secondary License" +1.11 "Secondary License" means either the GNU General Public License, Version 2.0, the GNU Lesser General Public License, Version 2.1, the GNU Affero General Public License, Version 3.0, or any later versions of those licenses. -1.13. "Source Code Form" +1.12. "Source Code Form" means the form of the work preferred for making modifications. -1.14. "You" (or "Your") +1.13. "You" (or "Your") means an individual or a legal entity exercising rights under this License. For legal entities, "You" includes any entity that controls, is controlled by, or is under common control with You. For @@ -96,53 +76,47 @@ Mozilla Public License Version 2.0 2.1. 
Grants Each Contributor hereby grants You a world-wide, royalty-free, -non-exclusive license: - -(a) under intellectual property rights (other than patent or trademark) - Licensable by such Contributor to use, reproduce, make available, - modify, display, perform, distribute, and otherwise exploit its - Contributions, either on an unmodified basis, with Modifications, or - as part of a Larger Work; and - -(b) under Patent Claims of such Contributor to make, use, sell, offer - for sale, have made, import, and otherwise transfer either its - Contributions or its Contributor Version. +non-exclusive license under Contributor copyrights Licensable by such +Contributor to use, reproduce, make available, modify, display, +perform, distribute, and otherwise exploit solely for Research Purposes +its Contributions, either on an unmodified basis, with Modifications, +or as part of a Larger Work. 2.2. Effective Date The licenses granted in Section 2.1 with respect to any Contribution -become effective for each Contribution on the date the Contributor first -distributes such Contribution. +become effective for each Contribution on the date the Contributor +first distributes such Contribution. 2.3. Limitations on Grant Scope The licenses granted in this Section 2 are the only rights granted under this License. No additional rights or licenses will be implied from the -distribution or licensing of Covered Software under this License. -Notwithstanding Section 2.1(b) above, no patent license is granted by a -Contributor: +distribution or licensing of Covered Software under this License. The +License is incompatible with Secondary Licenses. Notwithstanding +Section 2.1 above, no copyright license is granted: (a) for any code that a Contributor has removed from Covered Software; or -(b) for infringements caused by: (i) Your and any other third party's - modifications of Covered Software, or (ii) the combination of its - Contributions with other software (except as part of its Contributor - Version); or +(b) use of the Contributions or its Contributor Version other than for +Research Purposes only; or -(c) under Patent Claims infringed by Covered Software in the absence of - its Contributions. +(c) for infringements caused by: (i) Your and any other third party’s +modifications of Covered Software, or (ii) the combination of its +Contributions with other software (except as part of its Contributor +Version). -This License does not grant any rights in the trademarks, service marks, -or logos of any Contributor (except as may be necessary to comply with -the notice requirements in Section 3.4). +This License does not grant any rights in the patents, trademarks, +service marks, or logos of any Contributor (except as may be necessary +to comply with the notice requirements in Section 3.4). 2.4. Subsequent Licenses No Contributor makes additional grants as a result of Your choice to distribute the Covered Software under a subsequent version of this -License (see Section 10.2) or under the terms of a Secondary License (if -permitted under the terms of Section 3.3). +License (see Section 10.2) or under the terms of a Secondary License +(if permitted under the terms of Section 3.3). 2.5. Representation @@ -171,8 +145,7 @@ Modifications that You create or to which You contribute, must be under the terms of this License. You must inform recipients that the Source Code Form of the Covered Software is governed by the terms of this License, and how they can obtain a copy of this License. 
You may not -attempt to alter or restrict the recipients' rights in the Source Code -Form. +attempt to alter or restrict the recipients’ rights in the Source Code Form. 3.2. Distribution of Executable Form @@ -185,22 +158,14 @@ If You distribute Covered Software in Executable Form then: than the cost of distribution to the recipient; and (b) You may distribute such Executable Form under the terms of this - License, or sublicense it under different terms, provided that the - license for the Executable Form does not attempt to limit or alter - the recipients' rights in the Source Code Form under this License. + License. 3.3. Distribution of a Larger Work You may create and distribute a Larger Work under terms of Your choice, provided that You also comply with the requirements of this License for -the Covered Software. If the Larger Work is a combination of Covered -Software with a work governed by one or more Secondary Licenses, and the -Covered Software is not Incompatible With Secondary Licenses, this -License permits You to additionally distribute such Covered Software -under the terms of such Secondary License(s), so that the recipient of -the Larger Work may, at their option, further distribute the Covered -Software under the terms of either this License or such Secondary -License(s). +the Covered Software. The Larger Work may not be a combination of Covered +Software with a work governed by one or more Secondary Licenses. 3.4. Notices @@ -212,16 +177,15 @@ the extent required to remedy known factual inaccuracies. 3.5. Application of Additional Terms -You may choose to offer, and to charge a fee for, warranty, support, -indemnity or liability obligations to one or more recipients of Covered -Software. However, You may do so only on Your own behalf, and not on -behalf of any Contributor. You must make it absolutely clear that any -such warranty, support, indemnity, or liability obligation is offered by -You alone, and You hereby agree to indemnify every Contributor for any -liability incurred by such Contributor as a result of warranty, support, -indemnity or liability terms You offer. You may include additional -disclaimers of warranty and limitations of liability specific to any -jurisdiction. +You may not choose to offer, or charge a fee for use of the Covered +Software or a fee for, warranty, support, indemnity or liability +obligations to one or more recipients of Covered Software. You must +make it absolutely clear that any such warranty, support, indemnity, or +liability obligation is offered by You alone, and You hereby agree to +indemnify every Contributor for any liability incurred by such +Contributor as a result of warranty, support, indemnity or liability +terms You offer. You may include additional disclaimers of warranty and +limitations of liability specific to any jurisdiction. 4. Inability to Comply Due to Statute or Regulation --------------------------------------------------- @@ -240,23 +204,12 @@ recipient of ordinary skill to be able to understand it. -------------- 5.1. The rights granted under this License will terminate automatically -if You fail to comply with any of its terms. 
However, if You become -compliant, then the rights granted under this License from a particular -Contributor are reinstated (a) provisionally, unless and until such -Contributor explicitly and finally terminates Your grants, and (b) on an -ongoing basis, if such Contributor fails to notify You of the -non-compliance by some reasonable means prior to 60 days after You have -come back into compliance. Moreover, Your grants from a particular -Contributor are reinstated on an ongoing basis if such Contributor -notifies You of the non-compliance by some reasonable means, this is the -first time You have received notice of non-compliance with this License -from such Contributor, and You become compliant prior to 30 days after -Your receipt of the notice. - -5.2. If You initiate litigation against any entity by asserting a patent +if You fail to comply with any of its terms. + +5.2. If You initiate litigation against any entity by asserting an infringement claim (excluding declaratory judgment actions, counter-claims, and cross-claims) alleging that a Contributor Version -directly or indirectly infringes any patent, then the rights granted to +directly or indirectly infringes, then the rights granted to You by any and all Contributors for the Covered Software under Section 2.1 of this License shall terminate. @@ -299,8 +252,10 @@ prior to termination shall survive termination. * and all other commercial damages or losses, even if such party * * shall have been informed of the possibility of such damages. This * * limitation of liability shall not apply to liability for death or * -* personal injury resulting from such party's negligence to the * -* extent applicable law prohibits such limitation. Some * +* personal injury resulting from such party’s negligence to the * +* extent applicable law prohibits such limitation, but in such event, * +* and to the greatest extent permissible, damages will be limited to * +* direct damages not to exceed one hundred dollars. Some * * jurisdictions do not allow the exclusion or limitation of * * incidental or consequential damages, so this exclusion and * * limitation may not apply to You. * @@ -314,7 +269,7 @@ Any litigation relating to this License may be brought only in the courts of a jurisdiction where the defendant maintains its principal place of business and such litigation shall be governed by laws of that jurisdiction, without reference to its conflict-of-law provisions. -Nothing in this Section shall prevent a party's ability to bring +Nothing in this Section shall prevent a party’s ability to bring cross-claims or counter-claims. 9. Miscellaneous @@ -332,10 +287,10 @@ shall not be used to construe this License against a Contributor. 10.1. New Versions -Mozilla Foundation is the license steward. Except as provided in Section -10.3, no one other than the license steward has the right to modify or -publish new versions of this License. Each version will be given a -distinguishing version number. +Oxford Nanopore Technologies PLC. is the license steward. Except as +provided in Section 10.3, no one other than the license steward has the +right to modify or publish new versions of this License. Each version +will be given a distinguishing version number. 10.2. Effect of New Versions @@ -352,19 +307,12 @@ modified version of this License if you rename the license and remove any references to the name of the license steward (except to note that such modified license differs from this License). -10.4. 
Distributing Source Code Form that is Incompatible With Secondary -Licenses - -If You choose to distribute Source Code Form that is Incompatible With -Secondary Licenses under the terms of this version of the License, the -notice described in Exhibit B of this License must be attached. - Exhibit A - Source Code Form License Notice ------------------------------------------- - This Source Code Form is subject to the terms of the Mozilla Public - License, v. 2.0. If a copy of the MPL was not distributed with this - file, You can obtain one at http://mozilla.org/MPL/2.0/. + This Source Code Form is subject to the terms of the Oxford Nanopore + Technologies PLC. Public License, v. 1.0. Full licence can be found + obtained from support@nanoporetech.com If it is not possible or desirable to put the notice in a particular file, then You may include the notice in a location (such as a LICENSE @@ -372,9 +320,3 @@ file in a relevant directory) where a recipient would be likely to look for such a notice. You may add additional accurate notices of copyright ownership. - -Exhibit B - "Incompatible With Secondary Licenses" Notice ---------------------------------------------------------- - - This Source Code Form is "Incompatible With Secondary Licenses", as - defined by the Mozilla Public License, v. 2.0. diff --git a/README.md b/README.md index b18ac9d..f589d8c 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,10 @@ This repository contains a [nextflow](https://www.nextflow.io/) workflow for the assembly of reads originating from the mpox virus obtained through Oxford Nanopore metagenomic sequencing. + + + + ## Introduction This workflow provides a simple way to analyse mpox sequencing data; taking @@ -22,6 +26,10 @@ Using community-develped tools, this workflow: * Creats a de-novo assembly (`flye` & `medaka`) More information can be found in this [blog post](https://labs.epi2me.io/basic-mpox-workflow). + + + + ## Quickstart The workflow uses [nextflow](https://www.nextflow.io/) to manage compute and @@ -64,6 +72,10 @@ The primary outputs of the workflow include: * an HTML report document detailing the primary findings of the workflow. 
* a draft consensus sequence obtained * a medaka polished assembly + + + + ## Useful links * [nextflow](https://www.nextflow.io/) diff --git a/bin/check_sample_sheet.py b/bin/check_sample_sheet.py deleted file mode 100755 index 9c6a4ff..0000000 --- a/bin/check_sample_sheet.py +++ /dev/null @@ -1,40 +0,0 @@ -#!/usr/bin/env python -"""Script to check that sample sheet is well-formatted.""" -import argparse -import sys - -import pandas as pd - - -def main(): - """Run entry point.""" - parser = argparse.ArgumentParser() - parser.add_argument('sample_sheet') - parser.add_argument('output') - args = parser.parse_args() - - try: - samples = pd.read_csv(args.sample_sheet, sep=None) - if 'alias' in samples.columns: - if 'sample_id' in samples.columns: - sys.stderr.write( - "Warning: sample sheet contains both 'alias' and " - 'sample_id, using the former.') - samples['sample_id'] = samples['alias'] - if not set(['sample_id', 'barcode']).intersection(samples.columns): - raise IOError() - except Exception: - raise IOError( - "Could not parse sample sheet, it must contain two columns " - "named 'barcode' and 'sample_id' or 'alias'.") - # check duplicates - dup_bc = samples['barcode'].duplicated() - dup_sample = samples['sample_id'].duplicated() - if any(dup_bc) or any(dup_sample): - raise IOError( - "Sample sheet contains duplicate values.") - samples.to_csv(args.output, sep=",", index=False) - - -if __name__ == '__main__': - main() diff --git a/bin/ping.py b/bin/ping.py deleted file mode 100755 index 8c3f060..0000000 --- a/bin/ping.py +++ /dev/null @@ -1,65 +0,0 @@ -#!/usr/bin/env python -"""Send workflow ping.""" - -import argparse -import json -import uuid - -from epi2melabs import ping - - -def get_uuid(val): - """Construct UUID from string.""" - return uuid.UUID(str(val)) - - -def main(): - """Run the entry point.""" - parser = argparse.ArgumentParser() - parser.add_argument( - "--hostname", required=True, default=None, - help="ping some meta") - parser.add_argument( - "--opsys", required=True, default=None, - help="ping some meta") - parser.add_argument( - "--session", default=None, - help="ping some meta") - parser.add_argument( - "--message", required=True, default=None, - help="message to include in the ping") - parser.add_argument( - "--meta", default=None, - help="JSON file of metadata to be included in the ping") - parser.add_argument( - "--revision", default='unknown', - help="git branch/tag of the executed workflow") - parser.add_argument( - "--commit", default='unknown', - help="git commit of the executed workflow") - parser.add_argument( - "--disable", action='store_true', - help="Run the script but don't send the ping") - args = parser.parse_args() - - meta = None - if args.meta: - with open(args.meta, "r") as json_file: - meta = json.load(json_file) - - if not args.disable: - ping.Pingu( - get_uuid(args.session), - hostname=args.hostname, - opsys=args.opsys - ).send_workflow_ping( - workflow='wf-template', - message=args.message, - revision=args.revision, - commit=args.commit, - meta=meta - ) - - -if __name__ == "__main__": - main() diff --git a/bin/workflow-glue b/bin/workflow-glue new file mode 100755 index 0000000..00185be --- /dev/null +++ b/bin/workflow-glue @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 +"""Entrypoint of pseudo-package for all the code used in the workflow.""" + +from workflow_glue import cli + +if __name__ == "__main__": + cli() diff --git a/bin/workflow_glue/__init__.py b/bin/workflow_glue/__init__.py new file mode 100755 index 0000000..aa1ae1f --- /dev/null +++ 
b/bin/workflow_glue/__init__.py @@ -0,0 +1,62 @@ +"""Workflow Python code.""" +import argparse +import glob +import importlib +import os + +from .util import _log_level, get_main_logger # noqa: ABS101 + + +__version__ = "0.0.1" +_package_name = "workflow_glue" + + +def get_components(): + """Find a list of workflow command scripts.""" + path = os.path.dirname(os.path.abspath(__file__)) + components = list() + for fname in glob.glob(os.path.join(path, "*.py")): + name = os.path.splitext(os.path.basename(fname))[0] + if name in ("__init__", "util"): + continue + mod = importlib.import_module(f"{_package_name}.{name}") + # if theres a main() and and argparser() thats good enough for us. + try: + req = "main", "argparser" + if all(callable(getattr(mod, x)) for x in req): + components.append(name) + except Exception: + pass + return components + + +def cli(): + """Run workflow entry points.""" + parser = argparse.ArgumentParser( + 'wf-glue', + parents=[_log_level()], + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + parser.add_argument( + '-v', '--version', action='version', + version='%(prog)s {}'.format(__version__)) + + subparsers = parser.add_subparsers( + title='subcommands', description='valid commands', + help='additional help', dest='command') + subparsers.required = True + + # all component demos, plus some others + components = [ + f'{_package_name}.{comp}' for comp in get_components()] + for module in components: + mod = importlib.import_module(module) + p = subparsers.add_parser( + module.split(".")[-1], parents=[mod.argparser()]) + p.set_defaults(func=mod.main) + + logger = get_main_logger(_package_name) + args = parser.parse_args() + + logger.info("Starting entrypoint.") + args.func(args) diff --git a/bin/workflow_glue/check_sample_sheet.py b/bin/workflow_glue/check_sample_sheet.py new file mode 100755 index 0000000..2bd1779 --- /dev/null +++ b/bin/workflow_glue/check_sample_sheet.py @@ -0,0 +1,93 @@ +"""Check if a sample sheet is valid.""" +import csv +import sys + +from .util import get_named_logger, wf_parser # noqa: ABS101 + + +def main(args): + """Run the entry point.""" + logger = get_named_logger("checkSheet") + + barcodes = [] + aliases = [] + sample_types = [] + allowed_sample_types = [ + "test_sample", "positive_control", "negative_control", "no_template_control" + ] + + try: + with open(args.sample_sheet, "r") as f: + csv_reader = csv.DictReader(f) + n_row = 0 + for row in csv_reader: + n_row += 1 + if n_row == 1: + n_cols = len(row) + else: + # check we got the same number of fields + if len(row) != n_cols: + raise ValueError( + f"Unexpected number of cells in row number {n_row}." 
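+                            # csv.DictReader pads short rows to the header
+                            # length with `restval` and files surplus cells
+                            # under a `None` key, so (given a well-formed
+                            # first data row) this check fires only for
+                            # over-long rows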
+ ) + try: + barcodes.append(row["barcode"]) + except KeyError: + sys.stdout.write("'barcode' column missing") + sys.exit() + try: + aliases.append(row["alias"]) + except KeyError: + sys.stdout.write("'alias' column missing") + sys.exit() + try: + sample_types.append(row["type"]) + except KeyError: + pass + except Exception as e: + sys.stdout.write(f"Parsing error: {e}") + sys.exit() + + # check barcode and alias values are unique + if len(barcodes) > len(set(barcodes)): + sys.stdout.write("values in 'barcode' column not unique") + sys.exit() + if len(aliases) > len(set(aliases)): + sys.stdout.write("values in 'alias' column not unique") + sys.exit() + + if sample_types: + # check if "type" column has unexpected values + unexp_type_vals = set(sample_types) - set(allowed_sample_types) + + if unexp_type_vals: + sys.stdout.write( + f"found unexpected values in 'type' column: {unexp_type_vals}. " + f"Allowed values are: {allowed_sample_types}" + ) + sys.exit() + + if args.required_sample_types: + for required_type in args.required_sample_types: + if required_type not in allowed_sample_types: + sys.stdout.write(f"Not an allowed sample type: {required_type}") + sys.exit() + if sample_types.count(required_type) < 1: + sys.stdout.write( + f"Sample sheet requires at least 1 of {required_type}") + sys.exit() + + logger.info(f"Checked sample sheet {args.sample_sheet}.") + + +def argparser(): + """Argument parser for entrypoint.""" + parser = wf_parser("check_sample_sheet") + parser.add_argument("sample_sheet", help="Sample sheet to check") + parser.add_argument( + "--required_sample_types", + help="List of required sample types. Each sample type provided must " + "appear at least once in the sample sheet", + nargs="*" + ) + return parser diff --git a/bin/report.py b/bin/workflow_glue/report.py similarity index 98% rename from bin/report.py rename to bin/workflow_glue/report.py index c429a8e..0f728c9 100755 --- a/bin/report.py +++ b/bin/workflow_glue/report.py @@ -1,7 +1,6 @@ #!/usr/bin/env python """Create workflow report.""" -import argparse import math from aplanat import bars @@ -15,6 +14,8 @@ import pysam import vcf +from .util import wf_parser # noqa: ABS101 + def load_fasta(reference): """Load reference data.""" @@ -217,9 +218,9 @@ def make_assembly_summary(bed, report): section.plot(p) -def main(): - """Run the entry point.""" - parser = argparse.ArgumentParser() +def argparser(): + """Argument parser for entrypoint.""" + parser = wf_parser("report") parser.add_argument("report", help="Report output file") parser.add_argument("summaries", nargs='+', help="Read summary file.") parser.add_argument( @@ -246,8 +247,11 @@ def main(): parser.add_argument( "--commit", default='unknown', help="git commit of the executed workflow") - args = parser.parse_args() + return parser + +def main(args): + """Run the entry point.""" report = WFReport( "wf-mpx Sequencing Report", "wf-mpx", revision=args.revision, commit=args.commit) @@ -291,7 +295,3 @@ def main(): # write report report.write(args.report) - - -if __name__ == "__main__": - main() diff --git a/bin/workflow_glue/tests/__init__.py b/bin/workflow_glue/tests/__init__.py new file mode 100755 index 0000000..71bd80f --- /dev/null +++ b/bin/workflow_glue/tests/__init__.py @@ -0,0 +1 @@ +"""__init__.py for the tests.""" diff --git a/bin/workflow_glue/tests/test_test.py b/bin/workflow_glue/tests/test_test.py new file mode 100755 index 0000000..496bc2a --- /dev/null +++ b/bin/workflow_glue/tests/test_test.py @@ -0,0 +1,10 @@ +"""A dummy test.""" + +import 
argparse + +from workflow_glue import report + + +def test(): + """Just showing that we can import using the workflow-glue.""" + assert isinstance(report.argparser(), argparse.ArgumentParser) diff --git a/bin/workflow_glue/util.py b/bin/workflow_glue/util.py new file mode 100755 index 0000000..9359ac7 --- /dev/null +++ b/bin/workflow_glue/util.py @@ -0,0 +1,52 @@ +"""The odd helper function.""" + +import argparse +import logging + +_log_name = None + + +def get_main_logger(name): + """Create the top-level logger.""" + global _log_name + _log_name = name + logging.basicConfig( + format='[%(asctime)s - %(name)s] %(message)s', + datefmt='%H:%M:%S', level=logging.INFO) + return logging.getLogger(name) + + +def get_named_logger(name): + """Create a logger with a name. + + :param name: name of logger. + """ + name = name.ljust(10)[:10] # so logging is aligned + logger = logging.getLogger('{}.{}'.format(_log_name, name)) + return logger + + +def wf_parser(name): + """Make an argument parser for a workflow command.""" + return argparse.ArgumentParser( + name, + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + add_help=False) + + +def _log_level(): + """Parser to set logging level and acquire software version/commit.""" + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False) + + modify_log_level = parser.add_mutually_exclusive_group() + modify_log_level.add_argument( + '--debug', action='store_const', + dest='log_level', const=logging.DEBUG, default=logging.INFO, + help='Verbose logging of debug information.') + modify_log_level.add_argument( + '--quiet', action='store_const', + dest='log_level', const=logging.WARNING, default=logging.INFO, + help='Minimal logging; warnings only.') + + return parser diff --git a/data/.DS_Store b/data/.DS_Store deleted file mode 100644 index aa9e2fd64188b69ce3affde22954984c7067092e..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeHK%}N6?5Kh{v>5ABcpx$!v)`iRfJNG+Dv#@a#2C?f0oxUpe+Z{-`y6}T`nAF2T(2mqN zjt&Ts7x`vkdpNA@Rcb|9t(8VaIXtM8igLHQKN{u5=1%$Wq;>!FJbW3wzG;d%e2q%h z4UXX*jJd0QboxOr4DKL4t{vA95(C5lF|c?Hn3GPdF5c2;MZ^Fx@Jj~pe2}1sw!u`R zIy#`i-$#t+5K+L!w*;azXd6s5LIi~CQb1kG%@c#`a_~!&XB$j4>T<@_%rK6bnVT01 zSF?j(s&vL}jnonY#K1HIS=}w*{eS%P`+vHKdc*)Ruuu%}T*GP9VM*q0omm{-wGy-i qih_Bm#z_en>MDj@yo$F#m4IKO0caacHG&6(egqT^)DQ!I%D@+tO;k1j diff --git a/lib/Pinguscript.groovy b/lib/Pinguscript.groovy index e9d0751..a893263 100644 --- a/lib/Pinguscript.groovy +++ b/lib/Pinguscript.groovy @@ -8,7 +8,7 @@ class Pinguscript { def msgId = UUID.randomUUID().toString() def hosthash = null try { - hosthash = InetAddress.getLocalHost().getHostName().md5() + hosthash = InetAddress.getLocalHost().getHostName() } catch(Exception e) { hosthash = "Unavailable" } @@ -32,7 +32,7 @@ class Pinguscript { def meta = meta_json "error": errorMessage.toString(), "profile": profile.toString(), "agent": agent.toString() meta+=any_other_data - def ping_version = '2.0.1' + def ping_version = '2.0.2' def tracking_json = new JsonBuilder() def tracking_id = tracking_json "msg_id": msgId, "version": ping_version def data_json = new JsonBuilder() diff --git a/lib/fastqingress.nf b/lib/fastqingress.nf index d059ca1..6fefbce 100644 --- a/lib/fastqingress.nf +++ b/lib/fastqingress.nf @@ -1,411 +1,444 @@ -import ArgumentParser - -// Downstream tooling assumes FASTQ files are nicely organised into directories. 
-// In the case where a single FASTQ file has been input and the parent directory -// contains other valid FASTQ, we will create a directory in the work area to -// hold it instead. We stage the file in with `copy` (rather than `link`) -// to ensure that when the new dir is mounted to containers downstream it does -// not contain a symlink that cannot be read. -// See CW-1154 -process isolateSingleFile { - label params.process_label - stageInMode 'copy' - cpus 1 - input: - file reads - output: - path "$reads.simpleName" - script: - def name = reads.simpleName - """ - mkdir $name - mv $reads $name - """ -} +import java.nio.file.NoSuchFileException +import ArgumentParser -process checkSampleSheet { - label params.process_label - cpus 1 - input: - file "sample_sheet.txt" - output: - file "samples.txt" - """ - check_sample_sheet.py sample_sheet.txt samples.txt - """ -} +EXTENSIONS = ["fastq", "fastq.gz", "fq", "fq.gz"] /** - * Compare number of samples in samplesheet - * with the number of barcoded dirs found and - * print warnings - * + * Take a map of input arguments, find valid inputs, and return a channel + * with elements of `[metamap, seqs.fastq.gz, path-to-fastcat-stats]`. + * The last item is `null` if `fastcat` was not run. It is only run on directories + * containing more than one FASTQ file or when `fastcat_stats: true`. * - * @param number of samples in sample sheet - * @param number of barcoded directories - * @return null + * @param arguments: map with arguments containing + * - "input": path to either: (i) input FASTQ file, (ii) top-level directory containing + * FASTQ files, (iii) directory containing sub-directories which contain FASTQ + * files + * - "sample": string to name single sample + * - "sample_sheet": path to CSV sample sheet + * - "analyse_unclassified": boolean whether to keep unclassified reads + * - "fastcat_stats": boolean whether to write the `fastcat` stats + * @return Channel of `[Map(alias, barcode, type, ...), Path, Path|null]`. + * The first element is a map with metadata, the second is the path to the + * `.fastq.gz` file with the (potentially concatenated) sequences and the third is + * the path to the directory with the fastcat statistics (or `null` if `fastcat` + * wasn't run). */ - -def compareSampleSheetFastq(int sample_sheet_count, int valid_dir_count) +def fastq_ingress(Map arguments) { - - if (sample_sheet_count != valid_dir_count) { - log.warn "The number of samplesheet entries ({}) does not match the number of barcoded directories ({})", sample_sheet_count, valid_dir_count + // check arguments + Map margs = parse_arguments(arguments) + // define the channel for holding the inputs [metamap, input_path]. It will be + // either filled by `watchPath` (only emitting files) or by the data of the three + // input types (single file or dir with fastq or subdirs with fastq). + def ch_input + // handle `watchPath` case + if (margs["watch_path"]) { + ch_input = watch_path(margs) + } else { + // create a channel with the inputs (single file / dir with fastq / subdirs + // with fastq) + ch_input = get_valid_inputs(margs) } - -} - -/** - * Take an input file and sample name to return a channel with - * a single named sample. If the input file is in a directory with other valid - * input files (or other directories containing valid files), a copy of it will - * be made to the working directory using the isolateSingleFile process. 
- * - * - * @param input_file Single fastq file - * @param sample_name Name to give the sample - * @return Channel of tuples (path, map(sample_id, type, barcode)) - */ -def handle_single_file(input_file, sample_name) -{ - singleFile = Channel.fromPath(input_file) - ArrayList valid_files_in_dir = find_fastq(input_file.parent, true) - if (valid_files_in_dir.size() == 1) { - // Avoid a stageInMode copy if the parent directory contains only one valid FASTQ anyway - return singleFile.map { it -> tuple(it.parent, create_metamap([sample_id:sample_name ?: it.simpleName])) } + // `ch_input` might contain elements of `[metamap, null]` if there were entries in + // the sample sheet for which no FASTQ files were found. We put these into an extra + // channel and combine with the result channel before returning. + ch_input = ch_input.branch { meta, path -> + reads_found: path as boolean + no_reads_found: true } - else { - // Isolate the file via copy with isolateSingleFile - sample = isolateSingleFile(singleFile) - return sample.map { it -> tuple(it, create_metamap([sample_id:sample_name ?: it.simpleName])) } + def ch_result + if (margs.fastcat_stats) { + // run fastcat regardless of input type + ch_result = fastcat(ch_input.reads_found, margs["fastcat_extra_args"]) + } else { + // the fastcat stats were not requested --> run fastcat only on directories with + // more than one FASTQ file (and not on single files or directories with a + // single file) + def ch_branched = ch_input.reads_found.map {meta, path -> + // find directories with only a single FASTQ file and "unwrap" the file + if (path.isDirectory()) { + List fq_files = get_fq_files_in_dir(path) + if (fq_files.size() == 1) { + path = fq_files[0] + } + } + [meta, path] + } .branch { meta, path -> + // now there can only be two cases: + // (i) single FASTQ file (pass to `move_or_compress` later) + // (ii) dir with multiple fastq files (pass to `fastcat` later) + single_file: path.isFile() + dir_with_fastq_files: true + } + // call the respective processes on both branches and return + ch_result = fastcat( + ch_branched.dir_with_fastq_files, margs["fastcat_extra_args"] + ).concat( + ch_branched.single_file | move_or_compress | map { + meta, path -> [meta, path, null] + } + ) } + return ch_result.concat(ch_input.no_reads_found.map { [*it, null] }) } /** - * Find fastq data using various globs. Wrapper around Nextflow `file` - * method. + * Run `watchPath` on the input directory and return a channel [metamap, path-to-fastq]. + * The meta data is taken from the sample sheet in case one was provided. Otherwise it + * only contains the `alias` (either `margs["sample"]` or the name of the parent + * directory of the file). * - * @param pattern file object corresponding to top level input folder. - * @param search_subdirs boolean flag to search subdirectories of pattern - * @return list of files. + * @param margs: map with parsed input arguments + * @return: Channel of [metamap, path-to-fastq] */ - -def find_fastq(pattern, search_subdirs) -{ - ArrayList files = [] - ArrayList extensions = ["fastq", "fastq.gz", "fq", "fq.gz"] - for (ext in extensions) { - if (search_subdirs) { - files += file(pattern.resolve("**.${ext}"), type: 'file') +def watch_path(Map margs) { + // we have two cases to consider: (i) files being generated in the top-level + // directory and (ii) files being generated in sub-directories. If we find files of + // both kinds, throw an error. 
+ Path input + try { + input = file(margs.input, checkIfExists: true) + } catch (NoSuchFileException e) { + error "Input path $margs.input does not exist." + } + if (input.isFile()) { + error "Input ($input) must be a directory when using `watch_path`." + } + // get existing FASTQ files first (look for relevant files in the top-level dir and + // all sub-dirs) + def ch_existing_input = Channel.fromPath(input) + | concat(Channel.fromPath("$input/*", type: 'dir')) + | map { get_fq_files_in_dir(it) } + | flatten + // now get channel with files found by `watchPath` + def ch_watched = Channel.watchPath("$input/**").until { it.name.startsWith('STOP') } + // only keep FASTQ files + | filter { + for (ext in EXTENSIONS) { + if (it.name.endsWith(ext)) return true } - else { - files += file(pattern.resolve("*.${ext}"), type: 'file') + return false + } + // merge the channels + ch_watched = ch_existing_input | concat(ch_watched) + // check if input is as expected; start by throwing an error when finding files in + // top-level dir and sub-directories + String prev_input_type + ch_watched + | map { + String input_type = (it.parent == input) ? "top-level" : "sub-dir" + if (prev_input_type && (input_type != prev_input_type)) { + error "`watchPath` found FASTQ files in the top-level directory " + + "as well as in sub-directories." + } + // if file is in a sub-dir, make sure it's not a sub-sub-dir + if ((input_type == "sub-dir") && (it.parent.parent != input)) { + error "`watchPath` found a FASTQ file more than one level of " + + "sub-directories deep ('$it')." + } + // we also don't want files in the top-level dir when we got a sample sheet + if ((input_type == "top-level") && margs["sample_sheet"]) { + error "`watchPath` found files in top-level directory even though a " + + "sample sheet was provided ('${margs["sample_sheet"]}')." + } + prev_input_type = input_type + } + if (margs.sample_sheet) { + // add metadata from sample sheet (we can't use join here since it does not work + // with repeated keys; we therefore need to transform the sample sheet data into + // a map with the barcodes as keys) + def ch_sample_sheet = get_sample_sheet(file(margs.sample_sheet), margs.required_sample_types) + | collect + | map { it.collectEntries { [(it["barcode"]): it] } } + // now we can use this channel to annotate all files with the corresponding info + // from the sample sheet + ch_watched = ch_watched + | combine(ch_sample_sheet) + | map { file_path, sample_sheet_map -> + String barcode = file_path.parent.name + Map meta = sample_sheet_map[barcode] + // throw error if the barcode was not in the sample sheet + if (!meta) { + error "Sub-directory $barcode was not found in the sample sheet." + } + [meta, file_path] + } + } else { + ch_watched = ch_watched + | map { + // This file could be in the top-level dir or a sub-dir. In the first case + // check if a sample name was provided. In the second case, the alias is + // always the name of the sub-dir. + String alias + if (it.parent == input) { + // top-level dir + alias = margs["sample"] ?: it.parent.name + } else { + // sub-dir + alias = it.parent.name + } + [create_metamap([alias: alias]), it] } } - return files + return ch_watched } -/** - * Take an input directory return the barcode and non barcode - * sub directories contained within. 
- * - * - * @param input_directory Top level input folder to locate sub directories - * @param unclassified Keep unclassified directory - * - * @return A list containing sublists of barcode and non_barcode sub directories - */ -def get_subdirectories(input_directory, unclassified) -{ - barcode_dirs = file(input_directory.resolve("barcode*"), type: 'dir', maxdepth: 1) - all_dirs = file(input_directory.resolve("*"), type: 'dir', maxdepth: 1) - if (!unclassified) { - all_dirs.removeIf(it -> it.SimpleName.toLowerCase() == "unclassified") - } - non_barcoded = (all_dirs + barcode_dirs) - all_dirs.intersect(barcode_dirs) - - return [barcode_dirs, non_barcoded] +process move_or_compress { + label params.process_label + cpus params.threads + input: + tuple val(meta), path(input) + output: + tuple val(meta), path("seqs.fastq.gz") + script: + String out = "seqs.fastq.gz" + if (input.name.endsWith('.gz')) { + // we need to take into account that the file could already be named + // "seqs.fastq.gz" in which case `mv` would fail + """ + [ "$input" == "$out" ] || mv $input $out + """ + } else { + """ + cat $input | bgzip -@ $task.cpus > $out + """ + } +} + + +process fastcat { + label params.process_label + cpus params.threads + input: + tuple val(meta), path(input) + val extra_args + output: + tuple val(meta), path("seqs.fastq.gz"), path("fastcat_stats") + script: + String out = "seqs.fastq.gz" + String fastcat_stats_outdir = "fastcat_stats" + """ + mkdir $fastcat_stats_outdir + fastcat \ + -s ${meta["alias"]} \ + -r $fastcat_stats_outdir/per-read-stats.tsv \ + -f $fastcat_stats_outdir/per-file-stats.tsv \ + $extra_args \ + $input \ + | bgzip -@ $task.cpus > $out + """ } /** - * Load a sample sheet into a Nextflow channel to map barcodes - * to sample names. + * Parse input arguments for `fastq_ingress`. * - * @param samples CSV file according to MinKNOW sample sheet specification - * @return A Nextflow Channel of tuples (barcode, sample name, sample type) + * @param arguments: map with input arguments (see `fastq_ingress` for details) + * @return: map of parsed arguments */ -def get_sample_sheet(sample_sheet) -{ - log.info "Checking sample sheet." - sample_sheet = file(sample_sheet); - is_file = sample_sheet.isFile() - - if (!is_file) { - log.error "`--samples` is not a file." - exit 1 - } - - return checkSampleSheet(sample_sheet) - .splitCsv(header: true) - .map { row -> tuple( - row.barcode, - row.sample_id, - row.type ? row.type : 'test_sample') - } +Map parse_arguments(Map arguments) { + ArgumentParser parser = new ArgumentParser( + args:["input"], + kwargs:["sample": null, + "sample_sheet": null, + "analyse_unclassified": false, + "fastcat_stats": false, + "fastcat_extra_args": "", + "required_sample_types": [], + "watch_path": false], + name: "fastq_ingress") + return parser.parse_args(arguments) } /** - * Take a list of input directories and return directories which are - * valid, i.e. contains only .fastq(.gz) files. + * Find valid inputs based on the input type. * - * - * @param input_dirs List of barcoded directories (barcodeXX,mydir...) 
- * @return List of valid directories + * @param margs: parsed arguments (see `fastq_ingress` for details) + * @return: channel of `[metamap, input-path]`; `input-path` can be the path to + * a single FASTQ file or to a directory containing FASTQ files */ -def get_valid_directories(input_dirs) -{ - valid_dirs = [] - no_fastq_dirs = [] - invalid_files_dirs = [] - for (d in input_dirs) { - valid = true - fastq = find_fastq(d, false) - all_files = file(d.resolve("*"), type: 'file', maxdepth: 1) - non_fastq = ( all_files + fastq ) - all_files.intersect(fastq) - - if (non_fastq) { - valid = false - invalid_files_dirs << d - } - if (!fastq) { - valid = false - no_fastq_dirs << d - } - if (valid) { - valid_dirs << d - } - } - if (valid_dirs.size() == 0) { - log.error "None of the directories given contain .fastq(.gz) files." - exit 1 +def get_valid_inputs(Map margs){ + log.info "Checking fastq input." + Path input + try { + input = file(margs.input, checkIfExists: true) + } catch (NoSuchFileException e) { + error "Input path $margs.input does not exist." } - if (no_fastq_dirs.size() > 0) { - log.warn "Excluding directories not containing .fastq(.gz) files:" - for (d in no_fastq_dirs) { - log.warn " - ${d}" + // declare resulting input channel and other variables needed in the outer scope + def ch_input + ArrayList sub_dirs_with_fastq_files + // handle case of `input` being a single file + if (input.isFile()) { + // the `fastcat` process can deal with directories or single file inputs + ch_input = Channel.of( + [create_metamap([alias: margs["sample"] ?: input.simpleName]), input]) + } else if (input.isDirectory()) { + // input is a directory --> we accept two cases: (i) a top-level directory with + // fastq files and no sub-directories or (ii) a directory with one layer of + // sub-directories containing fastq files + boolean dir_has_fastq_files = get_fq_files_in_dir(input) + // find potential sub-directories (and sub-dirs with FASTQ files; note that + // these lists can be empty) + ArrayList sub_dirs = file(input.resolve('*'), type: "dir") + sub_dirs_with_fastq_files = sub_dirs.findAll { get_fq_files_in_dir(it) } + // deal with first case (top-lvl dir with FASTQ files and no sub-directories + // containing FASTQ files) + if (dir_has_fastq_files) { + if (sub_dirs_with_fastq_files) { + error "Input directory '$input' cannot contain FASTQ " + + "files and sub-directories with FASTQ files." + } + ch_input = Channel.of( + [create_metamap([alias: margs["sample"] ?: input.baseName]), input]) + } else { + // deal with the second case (sub-directories with fastq data) --> first + // check whether we actually found sub-directories + if (!sub_dirs_with_fastq_files) { + error "Input directory '$input' must contain either FASTQ files " + + "or sub-directories containing FASTQ files." + } + // make sure that there are no sub-sub-directories with FASTQ files and that + // the sub-directories actually contain fastq files) + if (sub_dirs.any { + ArrayList subsubdirs = file(it.resolve('*'), type: "dir") + subsubdirs.any { get_fq_files_in_dir(it) } + }) { + error "Input directory '$input' cannot contain more " + + "than one level of sub-directories with FASTQ files." 
+ } + // remove directories called 'unclassified' unless otherwise specified + if (!margs.analyse_unclassified) { + sub_dirs_with_fastq_files = sub_dirs_with_fastq_files.findAll { + it.baseName != "unclassified" + } + } + // filter based on sample sheet in case one was provided + if (margs.sample_sheet) { + // get channel of entries in the sample sheet + def ch_sample_sheet = get_sample_sheet(file(margs.sample_sheet), margs.required_sample_types) + // get the union of both channels (missing values will be replaced with + // `null`) + def ch_union = Channel.fromPath(sub_dirs_with_fastq_files).map { + [it.baseName, it] + }.join(ch_sample_sheet.map{[it.barcode, it]}, remainder: true) + // after joining the channels, there are three possible cases: + // (i) valid input path and sample sheet entry are both present + // (ii) there is a sample sheet entry but no corresponding input dir + // --> we'll emit `[metamap-from-sample-sheet-entry, null]` + // (iii) there is a valid path, but the sample sheet entry is missing + // --> drop this entry and print a warning to the log + ch_input = ch_union.map {barcode, path, sample_sheet_entry -> + if (sample_sheet_entry) { + [create_metamap(sample_sheet_entry), path] + } else { + log.warn "Input directory '$barcode' was found, but sample " + + "sheet '$margs.sample_sheet' has no such entry." + } + } + } else { + ch_input = Channel.fromPath(sub_dirs_with_fastq_files).map { + [create_metamap([alias: it.baseName, barcode: it.baseName]), it] + } + } } + } else { + error "Input $input appears to be neither a file nor a directory." } - if (invalid_files_dirs.size() > 0) { - log.warn "Excluding directories containing non .fastq(.gz) files:" - for (d in invalid_files_dirs) { - log.warn " - ${d}" - } + // a sample sheet only makes sense in the case of a directory with + // sub-directories + if (margs.sample_sheet && !sub_dirs_with_fastq_files) { + error "Sample sheet was provided, but input does not contain " + + "sub-directories with FASTQ files." } - return valid_dirs + return ch_input } /** - * Take an input directory and sample name to return a channel - * with a single named sample. + * Create a map that contains at least these keys: `[alias, barcode, type]`. + * `alias` is required, `barcode` and `type` are filled with default values if + * missing. Additional entries are allowed. * - * - * @param input_directory Directory of fastq files - * @param sample_name Name to give the sample - * @return Channel of tuples (path, map(sample_id, type, barcode)) + * @param kwargs: map with input parameters; must contain `alias` + * @return: map(alias, barcode, type, ...) */ -def handle_flat_dir(input_directory, sample_name) -{ - valid_dirs= get_valid_directories([ file(input_directory) ]) - return Channel.fromPath(valid_dirs) - .map { it -> tuple(it, create_metamap([sample_id:sample_name ?: it.baseName])) } - +Map create_metamap(Map arguments) { + ArgumentParser parser = new ArgumentParser( + args: ["alias"], + kwargs: [ + "barcode": null, + "type": "test_sample", + ], + name: "create_metamap", + ) + return parser.parse_known_args(arguments) } /** - * Take a list of barcode directories and a sample sheet to return - * a channel of named samples. + * Get the fastq files in the directory (non-recursive). * - * - * @param barcoded_dirs List of barcoded directories (barcodeXX,...) - * @param sample_sheet List of tuples mapping barcode to sample name - * or a simple string for non-multiplexed data. - * @param min_barcode Minimum barcode to accept. 
- * @param max_barcode Maximum (inclusive) barcode to accept. - * @return Channel of tuples (path, map(sample_id, type, barcode)) + * @param dir: path to the target directory + * @return: list of found fastq files */ -def handle_barcoded_dirs(barcoded_dirs, sample_sheet, min_barcode, max_barcode) -{ - valid_dirs = get_valid_directories(barcoded_dirs) - // link sample names to barcode through sample sheet - if (!sample_sheet) { - sample_sheet = Channel - .fromPath(valid_dirs) - .filter(~/.*barcode[0-9]{1,3}$/) // up to 192 - .filter { barcode_in_range(it, min_barcode, max_barcode) } - .map { path -> tuple(path.baseName, path.baseName, 'test_sample')} - } else { - - // return warning if there is a discrepancy between the samplesheet and barcode dirs - - // unclassfied will never be in the sample_sheet so remove - non_unclassified = valid_dirs - non_unclassified -= 'unclassified' - - barcode_dirs_found = non_unclassified.size() - - int count = 0 - - // We do this instead of .count() because valid_dirs is a list and - // sample_sheet is a channel - the channel is only populated after - // checkSampleSheet is complete and so if you compare without - // waiting for that then the comparisson fails - sample_sheet_entries = sample_sheet.subscribe onNext: { count++ }, onComplete: { compareSampleSheetFastq(count,barcode_dirs_found) } - - } - - return Channel - .fromPath(valid_dirs) - .filter(~/.*barcode[0-9]{1,3}$/) // up to 192 - .filter { barcode_in_range(it, min_barcode, max_barcode) } - .map { path -> tuple(path.baseName, path) } - .join(sample_sheet) - .map { barcode, path, sample, type -> tuple(path, create_metamap([sample_id:sample, type:type, barcode:barcode])) } +ArrayList get_fq_files_in_dir(Path dir) { + return EXTENSIONS.collect { file(dir.resolve("*.$it"), type: "file") } .flatten() } /** - * Determine if a barcode path is within a required numeric range + * Check the sample sheet and return a channel with its rows if it is valid. * - * @param path barcoded directory (barcodeXX). - * @param min_barcode Minimum barcode to accept. - * @param max_barcode Maximum (inclusive) barcode to accept. + * @param sample_sheet: path to the sample sheet CSV + * @return: channel of maps (with values in sample sheet header as keys) */ -def barcode_in_range(path, min_barcode, max_barcode) -{ - pattern = ~/barcode(\d+)/ - matcher = "${path}" =~ pattern - def value = null - try{ - value = matcher[0][1].toInteger() - }catch(ArrayIndexOutOfBoundsException ex){ - print("${path} is not a barcoded directory") +def get_sample_sheet(Path sample_sheet, ArrayList required_sample_types) { + // If `validate_sample_sheet` does not return an error message, we can assume that + // the sample sheet is valid and parse it. However, because of Nextflow's + // asynchronous magic, we might emit values from `.splitCSV()` before the + // error-checking closure finishes. This is no big deal, but undesired nonetheless + // as the error message might be overwritten by the traces of new nextflow processes + // in STDOUT. Thus, we use the somewhat clunky construct with `concat` and `last` + // below. This lets the CSV channel only start to emit once the error checking is + // done. + ch_err = validate_sample_sheet(sample_sheet, required_sample_types).map { + // check if there was an error message + if (it) error "Invalid sample sheet: ${it}." 
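        // the (empty) message is passed through so that the
        // `concat(...).last()` construct below emits the CSV for parsing
        // only after this closure has run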
+ it } - valid = ((value >= min_barcode) && (value <= max_barcode)) - return valid + // concat the channel holding the path to the sample sheet to `ch_err` and call + // `.last()` to make sure that the error-checking closure above executes before + // emitting values from the CSV + return ch_err.concat(Channel.fromPath(sample_sheet)).last().splitCsv( + header: true, quote: '"' + ) } /** - * Take a list of non-barcode directories to return a channel - * of named samples. Samples are named by directory baseName. + * Python script for validating a sample sheet. The script will write messages + * to STDOUT if the sample sheet is invalid. In case there are no issues, no + * message is emitted. * - * - * @param non_barcoded_dirs List of directories (mydir,...) - * @return Channel of tuples (path, map(sample_id, type, barcode)) + * @param: path to sample sheet CSV + * @param: list of required sample types (optional) + * @return: string (optional) */ -def handle_non_barcoded_dirs(non_barcoded_dirs) -{ - valid_dirs = get_valid_directories(non_barcoded_dirs) - return Channel.fromPath(valid_dirs) - .map { path -> tuple(path, create_metamap([sample_id:path.baseName])) } -} - -def create_metamap(Map arguments) { - def parser = new ArgumentParser( - args:["sample_id"], - kwargs:[ - "type": "test_sample", - "barcode": null, - ], - name:"create_metamap", - ) - return parser.parse_args(arguments) +process validate_sample_sheet { + label params.process_label + input: + path csv + val required_sample_types + output: stdout + script: + String req_types_arg = required_sample_types ? "--required_sample_types "+required_sample_types.join(" ") : "" + """ + workflow-glue check_sample_sheet $csv $req_types_arg + """ } -/** - * Take an input (file or directory) and return a channel of - * named samples. - * - * @param input Top level input file or folder to locate fastq data. - * @param sample string to name single sample data. - * @param sample_sheet Path to sample sheet CSV file. - * @param min_barcode Minimum barcode to accept. - * @param max_barcode Maximum (inclusive) barcode to accept. - * @param unclassified Keep unclassified reads. - * - * @return Channel of tuples (path, map(sample_id, type, barcode)) - */ -def fastq_ingress(Map arguments) -{ - def parser = new ArgumentParser( - args:["input"], - kwargs:[ - "sample":null, "sample_sheet":null, - "min_barcode":0, "max_barcode":Integer.MAX_VALUE, - "unclassified":false], - name:"fastq_ingress") - Map margs = parser.parse_args(arguments) - - - log.info "Checking fastq input." - input = file(margs.input) - - // Handle file input - if (input.isFile()) { - // Assume sample is a string at this point - log.info "Single file input detected." - if (margs.sample_sheet) { - log.warn "Warning: `--sample_sheet` given but single file input found. Ignoring." - } - return handle_single_file(input, margs.sample) - } - - // Handle directory input - if (input.isDirectory()) { - // Get barcoded and non barcoded subdirectories - (barcoded, non_barcoded) = get_subdirectories(input, margs.unclassified) - - // Case 03: If no subdirectories, handle the single dir - if (!barcoded && !non_barcoded) { - log.info "Single directory input detected." - if (margs.sample_sheet) { - log.warn "`--sample_sheet` given but single non-barcode directory found. Ignoring." - } - return handle_flat_dir(input, margs.sample) - } - - if (margs.sample) { - log.warn "`--sample` given but multiple directories found, ignoring." 
- } - - // Case 01, 02, 04: Handle barcoded and non_barcoded dirs - // Handle barcoded folders - barcoded_samples = Channel.empty() - if (barcoded) { - log.info "Barcoded directories detected." - sample_sheet = null - if (margs.sample_sheet) { - sample_sheet = get_sample_sheet(margs.sample_sheet) - } - barcoded_samples = handle_barcoded_dirs(barcoded, sample_sheet, margs.min_barcode, margs.max_barcode) - } - - non_barcoded_samples = Channel.empty() - if (non_barcoded) { - log.info "Non barcoded directories detected." - if (!barcoded && margs.sample_sheet) { - log.warn "Warning: `--sample_sheet` given but no barcode directories found." - } - non_barcoded_samples = handle_non_barcoded_dirs(non_barcoded) - } - - return barcoded_samples.mix(non_barcoded_samples) - } -} diff --git a/lib/ping.nf b/lib/ping.nf deleted file mode 100644 index abbfb75..0000000 --- a/lib/ping.nf +++ /dev/null @@ -1,36 +0,0 @@ - -process pingMessage { - label params.process_label - cpus 1 - input: - val message - path json - script: - hostname = InetAddress.getLocalHost().getHostName() - opsys = System.properties['os.name'].toLowerCase() - disable = params.disable_ping ? '--disable' : '' - meta = json.name != 'OPTIONAL_FILE' ? "--meta $json": '' - """ - ping.py \ - --hostname $hostname \ - --opsys "$opsys" \ - --session $workflow.sessionId \ - --message $message \ - $meta $disable - """ -} - - -// send a start message -workflow start_ping { - main: - pingMessage("Started", Channel.fromPath("$projectDir/data/OPTIONAL_FILE")) -} - -// send an end message -workflow end_ping { - take: - json - main: - pingMessage("Finished", json) -} diff --git a/main.nf b/main.nf index c32d779..5ce627f 100644 --- a/main.nf +++ b/main.nf @@ -15,21 +15,6 @@ nextflow.enable.dsl = 2 include { fastq_ingress } from './lib/fastqingress' - -process summariseReads { - label "wfmpx" - cpus 1 - input: - tuple path(directory), val(meta) - output: - tuple val(meta.sample_id), val(meta.type), path("${meta.sample_id}.fastq.gz"), path("${meta.sample_id}.stats"), emit: sample - shell: - """ - fastcat -s ${meta.sample_id} -r ${meta.sample_id}.stats -x ${directory} > ${meta.sample_id}.fastq - gzip ${meta.sample_id}.fastq - """ -} - process alignReads { label "wfmpx" cpus params.align_threads @@ -174,7 +159,7 @@ process makeReport { report_name = "wf-mpx-report.html" """ - report.py $report_name \ + workflow-glue report $report_name \ --versions versions \ seqs.txt \ --params params.json \ @@ -210,8 +195,8 @@ workflow pipeline { reference genbank main: - summary = summariseReads(reads) - alignment = alignReads(summary.sample, reference) + samples_for_processing = reads.map {it -> [it[0].alias, it[0].type, it[1], it[2]]} + alignment = alignReads(samples_for_processing, reference) coverage = coverageCalc(alignment.alignment, reference) variants = medakaVariants(alignment.alignment, reference, genbank) draft = makeConsensus(variants, reference, coverage) @@ -224,7 +209,7 @@ workflow pipeline { report = makeReport( - summary.map{it[3]}.collect(), + reads | map { it[2].resolve("per-read-stats.tsv") } | collectFile(keepHeader: true), software_versions.collect(), workflow_params, variants.map{it[2]}.collect(), @@ -234,7 +219,7 @@ workflow pipeline { ) emit: results = report.concat( - summary.map{it[3]}.collect(), + reads | map { it[2].resolve("per-read-stats.tsv") } | collectFile(keepHeader: true), alignment.alignment.map{it[2]}.collect(), alignment.alignment.map{it[3]}.collect(), variants.map{it[2]}.collect(), @@ -271,7 +256,8 @@ workflow { samples = 
fastq_ingress([ "input":params.fastq, "sample":params.sample, - "sample_sheet":null]) + "sample_sheet":null, + "fastcat_stats":true ]) pipeline(samples, params._reference, params._genbank) output(pipeline.out.results) diff --git a/nextflow.config b/nextflow.config index 3fe175d..8e72b42 100644 --- a/nextflow.config +++ b/nextflow.config @@ -16,7 +16,6 @@ params { fastq = null out_dir = "output" sample = null - wfversion = "v0.0.6" align_threads = 4 assembly_threads = 4 assembly = true @@ -33,7 +32,7 @@ params { monochrome_logs = false validate_params = true show_hidden_params = false - schema_ignore_params = 'show_hidden_params,validate_params,monochrome_logs,aws_queue,aws_image_prefix,wfversion,wf,process_label' + schema_ignore_params = 'show_hidden_params,validate_params,monochrome_logs,aws_queue,aws_image_prefix,wf,process_label' wf { example_cmd = [ @@ -51,7 +50,7 @@ manifest { description = 'Mpox metagenomics assembly.' mainScript = 'main.nf' nextflowVersion = '>=20.10.0' - version = '0.0.6' + version = 'v0.0.7' } epi2melabs { @@ -129,13 +128,16 @@ profiles { timeline { enabled = true + overwrite = true file = "${params.out_dir}/execution/timeline.html" } report { enabled = true + overwrite = true file = "${params.out_dir}/execution/report.html" } trace { enabled = true + overwrite = true file = "${params.out_dir}/execution/trace.txt" } diff --git a/nextflow_schema.json b/nextflow_schema.json index db48f14..fa5200f 100644 --- a/nextflow_schema.json +++ b/nextflow_schema.json @@ -4,6 +4,7 @@ "title": "epi2me-labs/wf-mpx", "description": "Mpox metagenomics assembly.", "demo_url": "https://ont-exd-int-s3-euwst1-epi2me-labs.s3.amazonaws.com/wf-mpx/wf-mpx-demo.tar.gz", + "aws_demo_url": "https://ont-exd-int-s3-euwst1-epi2me-labs.s3.amazonaws.com/wf-mpx/wf-mpx-demo/aws.nextflow.config", "url": "https://github.com/epi2me-labs/wf-mpx", "type": "object", "definitions": { @@ -167,11 +168,6 @@ "type": "string", "hidden": true }, - "wfversion": { - "type": "string", - "default": "v0.0.6", - "hidden": true - }, "_reference": { "type": "string", "hidden": true
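
The `fastq_ingress` changes above imply a new output shape for `main.nf` to
consume. The following sketch consolidates how the updated pipeline unpacks it,
assuming (as the diff suggests, though not shown in full here) that with
`"fastcat_stats":true` the returned channel emits `[meta, fastq, stats_dir]`
tuples, where `stats_dir` contains fastcat's `per-read-stats.tsv`:

    // `samples` emits [meta, fastq, stats_dir] tuples (assumed shape)
    samples = fastq_ingress([
        "input": params.fastq,
        "sample": params.sample,
        "sample_sheet": null,
        "fastcat_stats": true])

    // unpack the metamap for processes that expect positional values
    samples_for_processing = samples.map { meta, fastq, stats ->
        [meta.alias, meta.type, fastq, stats]
    }

    // merge each sample's fastcat per-read stats into a single TSV,
    // keeping only one header line
    per_read_stats = samples
        | map { it[2].resolve("per-read-stats.tsv") }
        | collectFile(keepHeader: true)

This is also why the `summariseReads` process could be removed: the new ingress
appears to run fastcat itself, so the pipeline only needs to collate the
per-read statistics rather than generate them.
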