Skip to content

Commit

Permalink
Merge branch 'update-template' into 'dev'
Browse files Browse the repository at this point in the history
Template updates [CW-1564][CW-2303][CW-2855]

See merge request epi2melabs/workflows/wf-bacterial-genomes!95
  • Loading branch information
SamStudio8 committed Oct 18, 2023
2 parents 70714be + a942271 commit 4d147b9
Show file tree
Hide file tree
Showing 9 changed files with 977 additions and 523 deletions.
9 changes: 9 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ repos:
pass_filenames: false
additional_dependencies:
- epi2melabs
- id: build_models
name: build_models
entry: datamodel-codegen --strict-nullable --base-class workflow_glue.results_schema_helpers.BaseModel --use-schema-description --disable-timestamp --input results_schema.yml --input-file-type openapi --output bin/workflow_glue/results_schema.py
language: python
files: 'results_schema.yml'
pass_filenames: false
additional_dependencies:
- datamodel-code-generator
- repo: https://github.com/pycqa/flake8
rev: 5.0.4
hooks:
Expand All @@ -37,4 +45,5 @@ repos:
"--import-order-style=google",
"--statistics",
"--max-line-length=88",
"--extend-exclude=bin/workflow_glue/results_schema.py",
]
58 changes: 58 additions & 0 deletions bin/workflow_glue/check_bam_headers_in_dir.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""Check (u)BAM files for `@SQ` lines whether they are the same in all headers."""

from pathlib import Path
import sys

import pysam

from .util import get_named_logger, wf_parser # noqa: ABS101


def get_sq_lines(xam_file):
    """Extract the `@SQ` lines from the header of a XAM file.

    :param xam_file: path to a SAM/BAM/CRAM file (may be unaligned)
    :return: list of `@SQ` header records; an empty list if the header
        contains no `@SQ` section at all
    """
    # `check_sq=False` allows opening uBAM files that have no `@SQ` lines.
    # Use a context manager so the file handle is closed deterministically
    # (the previous version left that to the garbage collector). Going via
    # `to_dict()` lets us default to an empty list when the `@SQ` section is
    # absent entirely, instead of raising `KeyError` — callers test the
    # result for truthiness to decide whether files are unaligned.
    with pysam.AlignmentFile(xam_file, check_sq=False) as xam:
        return xam.header.to_dict().get("SQ", [])


def main(args):
    """Run the entry point."""
    logger = get_named_logger("checkBamHdr")

    if not args.input_path.is_dir():
        raise ValueError(f"Input path '{args.input_path}' must be a directory.")

    target_files = list(args.input_path.glob("*"))
    if not target_files:
        raise ValueError(f"No files found in input directory '{args.input_path}'.")

    # Compare the `@SQ` lines of every subsequent file against those of the
    # first one. `mixed_headers` flips to `True` as soon as any file disagrees
    # (either some files have `@SQ` lines and some don't, or different files
    # carry different `@SQ` lines); at that point we can stop looking.
    files_iter = iter(target_files)
    reference_sq = sq_lines = get_sq_lines(next(files_iter))
    mixed_headers = False
    for xam_file in files_iter:
        sq_lines = get_sq_lines(xam_file)
        if sq_lines != reference_sq:
            mixed_headers = True
            break

    # `is_unaligned` is `True` only when the headers all agreed and the last
    # header we inspected had no `@SQ` lines (then none of the files did).
    is_unaligned = not mixed_headers and not sq_lines
    # emit a shell-parseable line so the caller can export these as env.
    # variables
    sys.stdout.write(
        f"IS_UNALIGNED={int(is_unaligned)};MIXED_HEADERS={int(mixed_headers)}"
    )
    logger.info(f"Checked (u)BAM headers in '{args.input_path}'.")


def argparser():
    """Build and return the argument parser for the entry point."""
    ap = wf_parser("check_bam_headers")
    ap.add_argument("input_path", type=Path, help="Path to target directory")
    return ap
2 changes: 1 addition & 1 deletion bin/workflow_glue/per_sample_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ def main(args):
params_data = json.load(fh)

run_summary_dict = get_run_summary(
"report_files/per-read-stats.tsv",
"report_files/per-read-stats.tsv.gz",
f"report_files/{args.sample_alias}.mlst.json",
params_data["reference"]
)
Expand Down
172 changes: 139 additions & 33 deletions lib/Pinguscript.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -4,49 +4,154 @@ import groovy.json.JsonSlurper


class Pinguscript {
public static String ping_post(workflow, message, error_message, out_dir, params) {
def msgId = UUID.randomUUID().toString()
def hosthash = null

    // Send a ping marking the start of a workflow.
    // Delegates to wf_ping with event "start" and no error message.
    public static void ping_start(nextflow, workflow, params) {
        wf_ping(nextflow, workflow, "start", null, params)
    }
    // Send a ping for a completed workflow (successful or otherwise).
    // Delegates to wf_ping with event "end" and no error message.
    public static void ping_complete(nextflow, workflow, params) {
        wf_ping(nextflow, workflow, "end", null, params)
    }
    // Send a ping for a workflow error, forwarding the workflow's own
    // error message to the ping payload.
    public static void ping_error(nextflow, workflow, params) {
        def error_message = workflow.errorMessage
        wf_ping(nextflow, workflow, "error", error_message, params)
    }
    // Shared handler to construct a ping JSON and send it.
    // Returns "{}" without sending anything when pings are disabled via
    // params.disable_ping; otherwise returns send_ping_post's result
    // (Groovy implicit return of the last expression).
    private static String wf_ping(nextflow, workflow, event, error_message, params) {
        if (params.disable_ping) {
            return "{}"
        }
        def body_json = make_wf_ping(nextflow, workflow, event, error_message, params)
        send_ping_post("epilaby", body_json)
    }

    // Helper to remove the given keys from a map, in place.
    // Keys that are not present are ignored.
    private static clean_meta(meta, keys_to_remove) {
        for (key in keys_to_remove) {
            if (meta.containsKey(key)) {
                meta.remove(key)
            }
        }
    }

    // Helper for fetching a key from the params map as a String.
    // Returns null when the key is missing or its value is falsy, so callers
    // never see the literal string "null".
    // (Seems pointless but you just know someone is going to end up writing
    // meta.this ? meta.that)
    private static get_meta(meta, key) {
        (meta.containsKey(key) && meta[key]) ? meta[key].toString() : null
    }

// Construct workflow ping JSON
private static String make_wf_ping(nextflow, workflow, event, error_message, params) {
// cheeky deepcopy using json
String paramsJSON = new JsonBuilder(params).toPrettyString()
def params_data = new JsonSlurper().parseText(paramsJSON)

// hostname
def host = null
try {
hosthash = InetAddress.getLocalHost().getHostName()
} catch(Exception e) {
hosthash = "Unavailable"
host = InetAddress.getLocalHost().getHostName()
}
catch(Exception e) {}

// OS
// TODO check version on WSL
def opsys = System.properties['os.name'].toLowerCase()
if (System.properties['os.version'].toLowerCase().contains("wsl")){
opsys = "WSL"
def opver = System.properties['os.version']
if (opver.toLowerCase().contains("wsl")){
opsys = "wsl"
}

// placeholder for any future okta business
// for now we'll use the guest_<ulid> sent to wf.epi2me_user
def user = get_meta(params.wf, "epi2me_user")

// drop cruft to save some precious bytes
// affects the deep copy rather than original params
clean_meta(params_data, [
"schema_ignore_params",
])
def ingress_ids = []
if (params_data.containsKey("wf")) {
ingress_ids = params_data.wf["ingress.run_ids"] ?: []
clean_meta(params_data.wf, [
"agent", // we send this later
"epi2me_instance", // we send this later
"epi2me_user", // we send this later
"example_cmd",
"ingress.run_ids", // we will send this elsewhere
])
}

// try and get runtime information
def cpus = null
try {
cpus = Runtime.getRuntime().availableProcessors()
}
def workflow_name = "$workflow.manifest.name"
def session = "$workflow.sessionId"
def errorMessage = "$error_message"
def profile = "$workflow.profile"
def filename = "$out_dir/params.json"
File fileb = new File(filename)
def any_other_data = [:]
if (fileb.exists() && "$message" != "start") {
def jsonSlurper = new JsonSlurper()
any_other_data = jsonSlurper.parse(fileb)
catch(Exception e) {}

def workflow_success = null
def workflow_exitcode = null
if (event != "start") {
workflow_success = workflow.success
workflow_exitcode = workflow.exitStatus
}
def meta_json = new JsonBuilder()
def agent = "$params.wf.agent"
def meta = meta_json "error": errorMessage.toString(), "profile": profile.toString(),
"agent": agent.toString()
meta+=any_other_data
def ping_version = '2.0.2'
def tracking_json = new JsonBuilder()
def tracking_id = tracking_json "msg_id": msgId, "version": ping_version
def data_json = new JsonBuilder()
def data = data_json "workflow": workflow_name.toString(),
"message": message, "meta": meta

/// build message
def body_json = new JsonBuilder()
def root = body_json "tracking_id": tracking_id, "hostname": hosthash.toString(), "os": opsys.toString(),
"session": session.toString(), "data": data, "source": "workflow"
body_json \
"tracking_id": [
"msg_id": UUID.randomUUID().toString(),
"version": "3.0.0"
],
"source": "workflow",
"event": event,
"params": params_data,
// data will be null on start events, as ingress has not run
"data": event != "start" ? [run_ids: ingress_ids] : null,
"workflow": [
"name": workflow.manifest.name,
"version": workflow.manifest.version, // could use NfcoreTemplate.version(workflow)
"run_name": workflow.runName, // required to disambiguate sessions
"session": workflow.sessionId,
"profile": workflow.profile,
"resume": workflow.resume,
"error": error_message, // null if no error
"success": workflow_success,
"exitcode": workflow_exitcode,
],
"env": [
"user": user, // placeholder for any future okta
"hostname": host,
"os": [
"name": opsys,
"version": opver
],
"resource": [
"cpus": cpus,
"memory": null, // placeholder, no point asking via Runtime as it will just give us the Xmx size
],
"agent": get_meta(params.wf, "agent"), // access via original params
"epi2me": [
"instance": get_meta(params.wf, "epi2me_instance"),
"user": user,
],
"nextflow": [
"version": nextflow.version.toString(),
"version_compat": nextflow.version.matches(workflow.manifest.nextflowVersion)
]
]
return body_json
}

// Send a JSON payload to a given endpoint
private static String send_ping_post(endpoint, body_json) {
// Attempt to send payload and absorb any possible Exception gracefully
String postResult
boolean raise_exception = false
try {
((HttpURLConnection)new URL('https://ping.oxfordnanoportal.com/epilaby').openConnection()).with({
((HttpURLConnection)new URL("https://ping.oxfordnanoportal.com/${endpoint}").openConnection()).with({
requestMethod = 'POST'
doOutput = true
setConnectTimeout(5000)
Expand All @@ -64,7 +169,8 @@ class Pinguscript {
// Accessing inputStream.text will raise an Exception for failed requests
postResult = inputStream.text
})
} catch(Exception e) {
}
catch(Exception e) {
if(raise_exception) { throw e }
}
return (postResult)
Expand Down

0 comments on commit 4d147b9

Please sign in to comment.