fix: Ga4gh tes bugfixes #1127

Merged
merged 11 commits on Aug 12, 2021
9 changes: 8 additions & 1 deletion docs/executing/cloud.rst
@@ -350,7 +350,7 @@ To install and configure Funnel follow its official `documentation <https://ohsu
Configuration
~~~~~~~~~~~~~

-Two steps are required to make a Snakemake workflow TES ready:
+Three steps are required to make a Snakemake workflow TES ready:

**Attach conda to rules:**
Execution of Snakemake tasks via TES means, Snakemake is running in a container in the cloud and it executes a specific rule (or a group of rules) with defined input/output data.
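(Illustration, not part of this diff: a minimal sketch of the step above, i.e. a rule with a conda environment attached so it can be built inside the TES task container; the rule name, file paths and environment file are made up.)

.. code-block:: python

    rule count_words:
        input:
            "data/book.txt"
        output:
            "results/counts.tsv"
        conda:
            "envs/count.yaml"   # built inside the task container when --use-conda is given
        shell:
            "wc -w {input} > {output}"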
@@ -365,6 +365,13 @@ The TES module requires using a remote file storage system for input/output file
There are several options available in Snakemake to use remote files.
This guide recommends to use S3 (or SWIFT) object storage.
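(Illustration, not part of this diff: one way to route input/output files through S3 from a Snakefile, using Snakemake's S3 remote provider; the bucket name and credentials are placeholders.)

.. code-block:: python

    from snakemake.remote.S3 import RemoteProvider as S3RemoteProvider

    # Placeholder credentials; in practice supply them via environment variables or --envvars.
    S3 = S3RemoteProvider(access_key_id="MYACCESSKEY", secret_access_key="MYSECRET")

    rule all:
        input:
            # bucket/key paths are staged in and out of the cloud by the remote provider
            S3.remote("my-bucket/results/counts.tsv")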

+**Install py-tes module:**
+The TES backend requires py-tes to be installed. Please install py-tes, e.g. via Conda or pip.

+.. code-block:: console

+   $ pip install py-tes

Execution
~~~~~~~~~
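The rest of this section is not shown in the diff. For orientation only, a plausible invocation that ties the three steps together (the TES endpoint, bucket name and environment variable names are placeholders):

.. code-block:: console

    $ snakemake --tes http://localhost:8000 \
                --use-conda \
                --default-remote-provider S3 \
                --default-remote-prefix my-bucket \
                --envvars ACCESS_KEY_ID SECRET_ACCESS_KEY \
                --jobs 10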

39 changes: 17 additions & 22 deletions snakemake/executors/ga4gh_tes.py
@@ -37,7 +37,12 @@ def __init__(
        tes_url=None,
        container_image=None,
    ):
-        import tes
+        try:
+            import tes
+        except ImportError:
+            raise WorkflowError(
+                "Unable to import Python package tes. The TES backend requires py-tes to be installed. Please install py-tes, e.g. via Conda or pip."
+            )

        self.container_image = container_image or get_container_image()
        self.container_workdir = "/tmp"
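For context (not shown in this diff): the tes package guarded above is the py-tes client. A self-contained sketch of the API the executor builds on, with the server URL and container image as placeholders:

import tes

# Describe a task: one executor that runs a command inside a container.
task = tes.Task(
    executors=[
        tes.Executor(image="alpine", command=["echo", "hello TES"]),
    ]
)

# Submit it to a TES server (e.g. a local Funnel instance) and poll its state.
client = tes.HTTPClient("http://localhost:8000", timeout=5)
task_id = client.create_task(task)
print(client.get_task(task_id, view="MINIMAL").state)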
@@ -310,39 +315,29 @@ def _get_task_inputs(self, job, jobscript, checkdir):

        return inputs

-    def _get_task_outputs(self, job, checkdir):
-        outputs = []
-        # add output files to outputs
-        for o in job.output:
+    def _append_task_outputs(self, outputs, files, checkdir):
+        for file in files:
            obj = self._prepare_file(
-                filename=o,
+                filename=file,
                checkdir=checkdir,
                type="Output",
            )
            if obj:
                outputs.append(obj)
+        return outputs
+
+    def _get_task_outputs(self, job, checkdir):
+        outputs = []
+        # add output files to outputs
+        outputs = self._append_task_outputs(outputs, job.output, checkdir)

        # add log files to outputs
        if job.log:
-            for log in job.log:
-                outputs.append(
-                    self._prepare_file(
-                        filename=log,
-                        checkdir=checkdir,
-                        type="Output",
-                    )
-                )
+            outputs = self._append_task_outputs(outputs, job.log, checkdir)

        # add benchmark files to outputs
        if hasattr(job, "benchmark") and job.benchmark:
-            for benchmark in job.benchmark:
-                outputs.append(
-                    self._prepare_file(
-                        filename=benchmark,
-                        checkdir=checkdir,
-                        type="Output",
-                    )
-                )
+            outputs = self._append_task_outputs(outputs, job.benchmark, checkdir)

        return outputs
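For context (not shown in this diff), each entry appended above is whatever _prepare_file returns for a single file; judging by the py-tes models, that is presumably something along these lines (paths and bucket are made up):

import tes

# One output mapping: the file the task writes at `path` inside the container
# is uploaded by the TES server to the storage location given by `url`.
output = tes.Output(
    path="/tmp/results/counts.tsv",
    url="s3://my-bucket/results/counts.tsv",
    type="FILE",
)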
