Skip to content

Commit

Permalink
Merge branch 'CW-2465-update_template' into 'dev'
Browse files Browse the repository at this point in the history
Update config/template

Closes CW-2465

See merge request epi2melabs/workflows/wf-bacterial-genomes!80
  • Loading branch information
Christopher Alder committed Jul 25, 2023
2 parents 90cd21d + 3236263 commit e223e1b
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 34 deletions.
12 changes: 11 additions & 1 deletion bin/workflow_glue/__init__.py
Expand Up @@ -13,13 +13,23 @@

def get_components():
"""Find a list of workflow command scripts."""
logger = get_main_logger(_package_name)
path = os.path.dirname(os.path.abspath(__file__))
components = list()
for fname in glob.glob(os.path.join(path, "*.py")):
name = os.path.splitext(os.path.basename(fname))[0]
if name in ("__init__", "util"):
continue
mod = importlib.import_module(f"{_package_name}.{name}")

# leniently attempt to import module
try:
mod = importlib.import_module(f"{_package_name}.{name}")
except ModuleNotFoundError as e:
# if imports cannot be satisfied, refuse to add the component
# rather than exploding
logger.warn(f"Could not load {name} due to missing module {e.name}")
continue

# if there's a main() and an argparser() that's good enough for us.
try:
req = "main", "argparser"
Expand Down
34 changes: 32 additions & 2 deletions bin/workflow_glue/check_sample_sheet.py
@@ -1,10 +1,35 @@
"""Check if a sample sheet is valid."""
import codecs
import csv
import os
import sys

from .util import get_named_logger, wf_parser # noqa: ABS101


# Some Excel users save their CSV as UTF-8 (and occasionally for a reason beyond my
# comprehension, UTF-16); Excel then adds a byte order mark (unnecessarily for UTF-8
# I should add). If we do not handle this with the correct encoding, the mark will
# appear in the parsed data, causing the header to be malformed.
# See CW-2310
def determine_codec(f):
    """Peek at a file and return an appropriate reading codec.

    The first few bytes of the file are compared against the known Unicode
    byte order marks (BOMs).

    :param f: path of the file to inspect.
    :returns: the name of a codec that consumes the BOM while decoding
        ("utf-8-sig", "utf-16" or "utf-32"), or ``None`` when no BOM is
        found, in which case the caller's ``open`` uses its default encoding.
    :raises OSError: if the file cannot be opened for reading.
    """
    with open(f, 'rb') as f_bytes:
        # Could use chardet here if we need to expand codec support
        initial_bytes = f_bytes.read(8)

    # Order matters: the UTF-32-LE mark (FF FE 00 00) begins with the
    # UTF-16-LE mark (FF FE), so the UTF-32 marks must be tested before the
    # UTF-16 ones or a UTF-32-LE file would be reported as UTF-16.
    for bom, encoding_name in (
        (codecs.BOM_UTF8, "utf-8-sig"),  # use the -sig codec to drop the mark
        (codecs.BOM_UTF32_BE, "utf-32"),  # don't specify LE or BE to drop mark
        (codecs.BOM_UTF32_LE, "utf-32"),
        (codecs.BOM_UTF16_BE, "utf-16"),  # again skip LE or BE to drop mark
        (codecs.BOM_UTF16_LE, "utf-16"),
    ):
        if initial_bytes.startswith(bom):
            return encoding_name
    return None  # will cause file to be opened with default encoding


def main(args):
"""Run the entry point."""
logger = get_named_logger("checkSheet")
Expand All @@ -14,10 +39,15 @@ def main(args):
sample_types = []
allowed_sample_types = [
"test_sample", "positive_control", "negative_control", "no_template_control"
]
]

if not os.path.exists(args.sample_sheet) or not os.path.isfile(args.sample_sheet):
sys.stdout.write(f"Could not open sample sheet '{args.sample_sheet}'.")
sys.exit()

try:
with open(args.sample_sheet, "r") as f:
encoding = determine_codec(args.sample_sheet)
with open(args.sample_sheet, "r", encoding=encoding) as f:
csv_reader = csv.DictReader(f)
n_row = 0
for row in csv_reader:
Expand Down
42 changes: 30 additions & 12 deletions lib/fastqingress.nf
Expand Up @@ -50,7 +50,17 @@ def fastq_ingress(Map arguments)
def ch_result
if (margs.fastcat_stats) {
// run fastcat regardless of input type
ch_result = fastcat(ch_input.reads_found, margs["fastcat_extra_args"])
ch_result = fastcat(ch_input.reads_found, margs["fastcat_extra_args"]).map {
meta, reads, stats ->
// extract run_ids parsed by fastcat into metadata
ArrayList run_ids = stats.resolve("run_ids").splitText().collect {
it.strip()
}
// `meta + [...]` returns a new map which is handy to avoid any
// modifying-maps-in-closures weirdness
// See https://github.com/nextflow-io/nextflow/issues/2660
[meta + [run_ids: run_ids], reads, stats]
}
} else {
// the fastcat stats were not requested --> run fastcat only on directories with
// more than one FASTQ file (and not on single files or directories with a
Expand Down Expand Up @@ -187,9 +197,11 @@ def watch_path(Map margs) {


process move_or_compress {
label params.process_label
label "fastq_ingress"
label "wf_common"
cpus 1
input:
// don't stage `input` with a literal because we check the file extension
tuple val(meta), path(input)
output:
tuple val(meta), path("seqs.fastq.gz")
Expand All @@ -199,21 +211,22 @@ process move_or_compress {
// we need to take into account that the file could already be named
// "seqs.fastq.gz" in which case `mv` would fail
"""
[ "$input" == "$out" ] || mv $input $out
[ "$input" == "$out" ] || mv "$input" $out
"""
} else {
"""
cat $input | bgzip -@ $task.cpus > $out
cat "$input" | bgzip -@ $task.cpus > $out
"""
}
}


process fastcat {
label params.process_label
label "fastq_ingress"
label "wf_common"
cpus 3
input:
tuple val(meta), path(input)
tuple val(meta), path("input")
val extra_args
output:
tuple val(meta), path("seqs.fastq.gz"), path("fastcat_stats")
Expand All @@ -227,8 +240,9 @@ process fastcat {
-r $fastcat_stats_outdir/per-read-stats.tsv \
-f $fastcat_stats_outdir/per-file-stats.tsv \
$extra_args \
$input \
input \
| bgzip -@ $task.cpus > $out
csvtk cut -tf runid $fastcat_stats_outdir/per-read-stats.tsv | csvtk del-header | sort | uniq > $fastcat_stats_outdir/run_ids
"""
}

Expand Down Expand Up @@ -373,10 +387,13 @@ Map create_metamap(Map arguments) {
kwargs: [
"barcode": null,
"type": "test_sample",
"run_ids": [],
],
name: "create_metamap",
)
return parser.parse_known_args(arguments)
def metamap = parser.parse_known_args(arguments)
metamap['alias'] = metamap['alias'].replaceAll(" ","_")
return metamap
}


Expand Down Expand Up @@ -430,15 +447,16 @@ def get_sample_sheet(Path sample_sheet, ArrayList required_sample_types) {
* @return: string (optional)
*/
process validate_sample_sheet {
label params.process_label
input:
path csv
label "fastq_ingress"
label "wf_common"
input:
path "sample_sheet.csv"
val required_sample_types
output: stdout
script:
String req_types_arg = required_sample_types ? "--required_sample_types "+required_sample_types.join(" ") : ""
"""
workflow-glue check_sample_sheet $csv $req_types_arg
workflow-glue check_sample_sheet sample_sheet.csv $req_types_arg
"""
}

24 changes: 12 additions & 12 deletions main.nf
Expand Up @@ -10,7 +10,7 @@ OPTIONAL_FILE = file("$projectDir/data/OPTIONAL_FILE")
FLYE_MIN_COVERAGE_THRESHOLD = 5

process readStats {
label params.process_label
label "wfbacterialgenomes"
cpus 1
input:
tuple val(meta), path("align.bam"), path("align.bam.bai")
Expand All @@ -27,7 +27,7 @@ process readStats {


process coverStats {
label params.process_label
label "wfbacterialgenomes"
cpus 2
input:
tuple val(meta), path("align.bam"), path("align.bam.bai")
Expand All @@ -45,7 +45,7 @@ process coverStats {


process deNovo {
label params.process_label
label "wfbacterialgenomes"
cpus params.threads
input:
tuple val(meta), path("reads.fastq.gz")
Expand Down Expand Up @@ -89,7 +89,7 @@ process deNovo {


process alignReads {
label params.process_label
label "wfbacterialgenomes"
cpus params.threads
input:
tuple val(meta), path("reads.fastq.gz"), path("ref.fasta.gz")
Expand Down Expand Up @@ -264,7 +264,7 @@ process mlstVersion {


process getVersions {
label params.process_label
label "wfbacterialgenomes"
cpus 1
input:
path "input_versions.txt"
Expand All @@ -283,7 +283,7 @@ process getVersions {


process getParams {
label params.process_label
label "wfbacterialgenomes"
cpus 1
output:
path "params.json"
Expand All @@ -297,7 +297,7 @@ process getParams {


process makeReport {
label params.process_label
label "wfbacterialgenomes"
cpus 1
input:
path "versions/*"
Expand Down Expand Up @@ -338,7 +338,7 @@ process makeReport {


process makePerSampleReports {
label params.process_label
label "wfbacterialgenomes"
cpus 1
input:
path "versions.txt"
Expand Down Expand Up @@ -374,7 +374,7 @@ process makePerSampleReports {
// decoupling the publish from the process steps.
process output {
// publish inputs to output directory
label params.process_label
label "wfbacterialgenomes"
publishDir "${params.out_dir}", mode: 'copy', pattern: "*"
input:
path fname
Expand All @@ -387,7 +387,7 @@ process output {


process lookup_medaka_consensus_model {
label params.process_label
label "wfbacterialgenomes"
input:
path("lookup_table")
val basecall_model
Expand All @@ -402,7 +402,7 @@ process lookup_medaka_consensus_model {


process lookup_medaka_variant_model {
label params.process_label
label "wfbacterialgenomes"
input:
path("lookup_table")
val basecall_model
Expand All @@ -419,7 +419,7 @@ process lookup_medaka_variant_model {
// Creates a new directory named after the sample alias and moves the fastcat results
// into it.
process collectFastqIngressResultsInDir {
label params.process_label
label "wfbacterialgenomes"
input:
// both the fastcat seqs as well as stats might be `OPTIONAL_FILE` --> stage in
// different sub-directories to avoid name collisions
Expand Down
8 changes: 7 additions & 1 deletion nextflow.config
Expand Up @@ -30,7 +30,6 @@ params {
validate_params = true
show_hidden_params = false
analyse_unclassified = false
process_label = "wfbacterialgenomes"
schema_ignore_params = 'show_hidden_params,validate_params,monochrome_logs,aws_queue,aws_image_prefix,wf,process_label'

wf {
Expand All @@ -42,6 +41,7 @@ params {
"--reference 'wf-bacterial-genomes-demo/ref/ref.fasta.gz'",
"--sample_sheet 'wf-bacterial-genomes-demo/isolates_sample_sheet.csv'"
]
common_sha = "sha0fa3896acb70eecc0d432c91a1516d596a87741c"
container_sha = "sha31b8f1a3cf898574ae2b1bb617892126103253a3"
container_sha_prokka = "sha08669655982fbef7c750c7895e97e100196c4967"
container_sha_medaka = "sha5603de35d5a38721b78af89200ace153ce821ef4"
Expand Down Expand Up @@ -74,6 +74,9 @@ executor {
// used by default for "standard" (docker) and singularity profiles,
// other profiles may override.
process {
withLabel:wf_common {
container = "ontresearch/wf-common:${params.wf.common_sha}"
}
withLabel:wfbacterialgenomes {
container = "ontresearch/wf-bacterial-genomes:${params.wf.container_sha}"
}
Expand Down Expand Up @@ -122,6 +125,9 @@ profiles {
executor = 'awsbatch'
queue = "${params.aws_queue}"
memory = '8G'
withLabel:wf_common {
container = "${params.aws_image_prefix}-wf-common:${params.wf.common_sha}-root"
}
withLabel:wfbacterialgenomes {
container = "${params.aws_image_prefix}-wf-bacterial-genomes:${params.wf.container_sha}-root"
}
Expand Down
6 changes: 0 additions & 6 deletions nextflow_schema.json
Expand Up @@ -224,12 +224,6 @@
"type": "string",
"hidden": true
},
"process_label": {
"type": "string",
"description": "The main process label for template processes to use by default",
"hidden": true,
"default": "wf-template"
},
"aws_queue": {
"type": "string",
"hidden": true
Expand Down

0 comments on commit e223e1b

Please sign in to comment.