diff --git a/docs/snakefiles/rules.rst b/docs/snakefiles/rules.rst
index 8c936a745..248faa683 100644
--- a/docs/snakefiles/rules.rst
+++ b/docs/snakefiles/rules.rst
@@ -1119,8 +1119,8 @@ The following shows an example job submission wrapper:
 
 .. _snakefiles-input_functions:
 
-Functions as Input Files
-------------------------
+Input functions
+---------------
 
 Instead of specifying strings or lists of strings as input files, snakemake can also make use of functions that return single **or** lists of input files:
 
@@ -1130,15 +1130,18 @@ Instead of specifying strings or lists of strings as input files, snakemake can
         return [... a list of input files depending on given wildcards ...]
 
     rule:
-        input: myfunc
-        output: "someoutput.{somewildcard}.txt"
-        shell: "..."
+        input:
+            myfunc
+        output:
+            "someoutput.{somewildcard}.txt"
+        shell:
+            "..."
 
 The function has to accept a single argument that will be the wildcards object generated from the application of the rule to create some requested output files. Note that you can also use `lambda expressions `_ instead of full function definitions.
 
 By this, rules can have entirely different input files (both in form and number) depending on the inferred wildcards. E.g. you can assign input files that appear in entirely different parts of your filesystem based on some wildcard value and a dictionary that maps the wildcard value to file paths.
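+
+For example, a lambda expression together with a plain dictionary is already enough for this pattern (the wildcard name, paths, and rule below are purely illustrative):
+
+.. code-block:: python
+
+    # map each value of the {sample} wildcard to a file somewhere else in the filesystem
+    paths = {"a": "/data/old_project/a.txt", "b": "/archive/other_project/b.txt"}
+
+    rule:
+        input:
+            lambda wildcards: paths[wildcards.sample]
+        output:
+            "results/{sample}.txt"
+        shell:
+            "cp {input} {output}"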
 
-Note that the function will be executed when the rule is evaluated and before the workflow actually starts to execute. Further note that using a function as input overrides the default mechanism of replacing wildcards with their values inferred from the output files. You have to take care of that yourself with the given wildcards object.
+In addition to a single wildcards argument, input functions can optionally take a ``groupid`` (with exactly that name) as a second argument, see :ref:`snakefiles_group-local` for details.
 
 Finally, when implementing the input function, it is best practice to make sure that it can properly handle all possible wildcard values your rule can have. In particular, input files should not be combined with very general rules that can be applied to create almost any file: Snakemake will try to apply the rule, and will report the exceptions of your input function as errors.
 
@@ -1159,9 +1162,12 @@ This can be done by having them return ``dict()`` objects with the names as the
         return {'foo': '{wildcards.token}.txt'.format(wildcards=wildcards)}
 
     rule:
-        input: unpack(myfunc)
-        output: "someoutput.{token}.txt"
-        shell: "..."
+        input:
+            unpack(myfunc)
+        output:
+            "someoutput.{token}.txt"
+        shell:
+            "..."
 
 Note that ``unpack()`` is only necessary for input functions returning ``dict``. While it also works for ``list``, remember that lists (and nested lists) of strings are automatically flattened.
 
@@ -1184,54 +1190,10 @@ These restrictions do not apply when using ``unpack()``.
         input:
             *myfunc1(),
             **myfunc2(),
-        output: "..."
-        shell: "..."
-
-.. _snakefiles-version_tracking:
-
-Version Tracking
-----------------
-
-Rules can specify a version that is tracked by Snakemake together with the output files. When the version changes snakemake informs you when using the flag ``--summary`` or ``--list-version-changes``.
-The version can be specified by the version directive, which takes a string:
-
-.. code-block:: python
-
-    rule:
-        input: ...
-        output: ...
-        version: "1.0"
-        shell: ...
-
-The version can of course also be filled with the output of a shell command, e.g.:
-
-.. code-block:: python
-
-    SOMECOMMAND_VERSION = subprocess.check_output("somecommand --version", shell=True)
-
-    rule:
-        version: SOMECOMMAND_VERSION
-
-Alternatively, you might want to use file modification times in case of local scripts:
-
-.. code-block:: python
-
-    SOMECOMMAND_VERSION = str(os.path.getmtime("path/to/somescript"))
-
-    rule:
-        version: SOMECOMMAND_VERSION
-
-A re-run can be automated by invoking Snakemake as follows:
-
-.. code-block:: console
-
-    $ snakemake -R `snakemake --list-version-changes`
-
-With the availability of the ``conda`` directive (see :ref:`integrated_package_management`)
-the ``version`` directive has become **obsolete** in favor of defining isolated
-software environments that can be automatically deployed via the conda package
-manager.
-
+        output:
+            "..."
+        shell:
+            "..."
 
 .. _snakefiles-code_tracking:
@@ -1521,6 +1483,51 @@ This enables to almost arbitrarily partition the DAG, e.g. in order to safe netw
 
 For execution on the cloud using Google Life Science API and preemptible instances, we expect all rules in the group to be homogenously set as preemptible instances (e.g., with command-line option ``--preemptible-rules``), such that a preemptible VM is requested for the execution of the group job.
 
+.. _snakefiles_group-local:
+
+Group-local jobs
+~~~~~~~~~~~~~~~~
+
+From Snakemake 7.0 on, it is further possible to ensure that jobs from a certain rule are executed separately within each :ref:`job group `.
+For this purpose we use :ref:`input functions `, which, in addition to the ``wildcards`` argument, can expect a ``groupid`` argument.
+In such a case, Snakemake passes the ID of the corresponding group job to the input function.
+Consider the following example:
+
+.. code-block:: python
+
+    rule all:
+        input:
+            expand("bar{i}.txt", i=range(3))
+
+
+    rule grouplocal:
+        output:
+            "foo.{groupid}.txt"
+        group:
+            "foo"
+        shell:
+            "echo test > {output}"
+
+
+    def get_input(wildcards, groupid):
+        return f"foo.{groupid}.txt"
+
+
+    rule consumer:
+        input:
+            get_input
+        output:
+            "bar{i}.txt"
+        group:
+            "foo"
+        shell:
+            "cp {input} {output}"
+
+Here, the value of ``groupid`` that is passed by Snakemake to the input function is a `UUID `_ that uniquely identifies the group job in which each instance of the rule ``consumer`` is contained.
+In the input function ``get_input`` we use this ID to request the desired input file from the rule ``grouplocal``.
+Since the value of the corresponding wildcard ``groupid`` is now always a group-specific unique ID, it is ensured that the rule ``grouplocal`` will run for every group job spawned from the group ``foo`` (remember that group jobs by default only span one connected component, and that this can be configured via the command line, see :ref:`job_grouping`).
+Of course, the above example would also work if the groups are not specified via the rule definition but entirely via the :ref:`command line `.
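+
+For example, an invocation along the following lines (a sketch only, reusing the rule and group names from the example above) would assign both rules to the group entirely from the command line:
+
+.. code-block:: console
+
+    $ snakemake --groups grouplocal=foo consumer=foo --cores 2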
+
 
 .. _snakefiles-piped-output:
 
 Piped output
@@ -1601,6 +1608,49 @@ Consider the following example:
 Snakemake will schedule the service with all consumers to the same physical node (in the future we might provide further controls and other modes of operation).
 Once all consumer jobs are finished, the service job will be terminated automatically by Snakemake, and the service output will be removed.
 
+Group-local service jobs
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+Since Snakemake supports arbitrary partitioning of the DAG into so-called :ref:`job groups `, one should consider what this implies for service jobs when running a workflow in a cluster or cloud context:
+since each group job spans at least one connected component (see :ref:`job groups ` and `the Snakemake paper `), this means that the service job will automatically connect all consumers into one big group.
+This can be undesirable because, depending on the number of consumers, that group job can become too big for efficient execution on the underlying architecture.
+In case of local execution, this is not a problem because DAG partitioning has no effect there.
+
+However, to make a workflow portable across different backends, this behavior should always be considered.
+In order to circumvent it, it is possible to model service jobs as group-local, i.e. ensuring that each group job gets its own instance of the service rule.
+This works by combining the service job pattern from above with the :ref:`group-local pattern ` as follows:
+
+.. code-block:: python
+
+    rule the_service:
+        output:
+            service("foo.{groupid}.socket")
+        shell:
+            # here we simulate some kind of server process that provides data via a socket
+            "ln -s /dev/random {output}; sleep 10000"
+
+
+    def get_socket(wildcards, groupid):
+        return f"foo.{groupid}.socket"
+
+
+    rule consumer1:
+        input:
+            get_socket
+        output:
+            "test.txt"
+        shell:
+            "head -n1 {input} > {output}"
+
+
+    rule consumer2:
+        input:
+            get_socket
+        output:
+            "test2.txt"
+        shell:
+            "head -n1 {input} > {output}"
+
 
 .. _snakefiles-paramspace:
 
 Parameter space exploration
diff --git a/snakemake/__init__.py b/snakemake/__init__.py
index c2667f0a8..876d48192 100644
--- a/snakemake/__init__.py
+++ b/snakemake/__init__.py
@@ -183,6 +183,7 @@ def snakemake(
     conda_not_block_search_path_envvars=False,
     scheduler_solver_path=None,
     conda_base_path=None,
+    local_groupid="local",
 ):
     """Run snakemake on a given snakefile.
 
@@ -313,6 +314,7 @@
         conda_not_block_search_path_envvars (bool): Do not block search path envvars (R_LIBS, PYTHONPATH, ...) when using conda environments.
         scheduler_solver_path (str): Path to Snakemake environment (this can be used to e.g. overwrite the search path for the ILP solver used during scheduling).
         conda_base_path (str): Path to conda base environment (this can be used to overwrite the search path for conda, mamba, and activate).
+        local_groupid (str): Local groupid to use as a placeholder for groupid-referring input functions of local jobs (internal use only, default: local).
         log_handler (list): redirect snakemake output to this list of custom log handlers, each a function that takes a log message dictionary (see below) as its only argument (default []). The log message dictionary for the log handler has to following entries:
 
             :level:
@@ -593,6 +595,7 @@
             conda_base_path=conda_base_path,
             check_envvars=not lint,  # for linting, we do not need to check whether requested envvars exist
             all_temp=all_temp,
+            local_groupid=local_groupid,
         )
 
         success = True
@@ -706,6 +709,7 @@
                     group_components=group_components,
                     max_inventory_wait_time=max_inventory_wait_time,
                     conda_not_block_search_path_envvars=conda_not_block_search_path_envvars,
+                    local_groupid=local_groupid,
                 )
                 success = workflow.execute(
                     targets=targets,
@@ -1906,6 +1910,11 @@ def get_argument_parser(profile=None):
         "used. Note that this is intended primarily for internal use and may "
         "lead to unexpected results otherwise.",
     )
+    group_behavior.add_argument(
+        "--local-groupid",
+        default="local",
+        help="Name for local groupid, meant for internal use only.",
+    )
     group_behavior.add_argument(
         "--max-jobs-per-second",
         default=10,
@@ -2923,6 +2932,7 @@ def open_browser():
         conda_not_block_search_path_envvars=args.conda_not_block_search_path_envvars,
         scheduler_solver_path=args.scheduler_solver_path,
         conda_base_path=args.conda_base_path,
+        local_groupid=args.local_groupid,
     )
 
     if args.runtime_profile:
diff --git a/snakemake/dag.py b/snakemake/dag.py
index 48b1d6437..b841d10ec 100755
--- a/snakemake/dag.py
+++ b/snakemake/dag.py
@@ -367,6 +367,9 @@ def check_dynamic(self):
     def is_edit_notebook_job(self, job):
         return self.workflow.edit_notebook and job.targetfile in self.targetfiles
 
+    def get_job_group(self, job):
+        return self._group.get(job)
+
     @property
     def dynamic_output_jobs(self):
         """Iterate over all jobs with dynamic output files."""
@@ -1157,6 +1160,22 @@ def _update_group_components(self):
             for j in primary:
                 self._group[j] = primary
 
+        for group in self._group.values():
+            group.finalize()
+
+    def update_incomplete_input_expand_jobs(self):
+        """Update (re-evaluate) all jobs that have incomplete input file expansions.
+
+        Such expansions can only be filled in during the second pass of postprocessing, once group ids have been assigned.
+        """
+        updated = False
+        for job in list(self.jobs):
+            if job.incomplete_input_expand:
+                newjob = job.updated()
+                self.replace_job(job, newjob, recursive=False)
+                updated = True
+        return updated
+
     def update_ready(self, jobs=None):
         """Update information whether a job is ready to execute.
 
@@ -1178,7 +1197,6 @@ def update_ready(self, jobs=None):
                     self._ready_jobs.add(job)
                 else:
                     group = self._group[job]
-                    group.finalize()
                     if group not in self._running:
                         candidate_groups.add(group)
 
@@ -1207,7 +1225,9 @@ def close_remote_objects(self):
             if not self.needrun(job):
                 job.close_remote()
 
-    def postprocess(self, update_needrun=True):
+    def postprocess(
+        self, update_needrun=True, update_incomplete_input_expand_jobs=True
+    ):
         """Postprocess the DAG. This has to be invoked after any change to the DAG topology."""
         self.cleanup()
 
@@ -1217,6 +1237,23 @@
         self.update_priority()
         self.handle_pipes_and_services()
         self.update_groups()
+
+        if update_incomplete_input_expand_jobs:
+            updated = self.update_incomplete_input_expand_jobs()
+            if updated:
+
+                # run a second pass, some jobs have been updated
+                # with potentially new input files that depend
+                # on group ids.
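+                # Note: the call below passes update_incomplete_input_expand_jobs=False,
+                # so this second pass cannot trigger yet another round of re-expansion.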
+                self.postprocess(
+                    update_needrun=True,
+                    update_incomplete_input_expand_jobs=False,
+                )
+
+                return
+
         self.update_ready()
         self.close_remote_objects()
         self.update_checkpoint_outputs()
diff --git a/snakemake/executors/__init__.py b/snakemake/executors/__init__.py
index 97067cc72..e09dc8f90 100644
--- a/snakemake/executors/__init__.py
+++ b/snakemake/executors/__init__.py
@@ -134,6 +134,9 @@ def fmt(res):
                 return args
         return ""
 
+    def get_local_groupid_arg(self):
+        return f" --local-groupid {self.workflow.local_groupid} "
+
     def get_behavior_args(self):
         if self.workflow.conda_not_block_search_path_envvars:
             return " --conda-not-block-search-path-envvars "
@@ -448,6 +451,7 @@ def __init__(
             "--latency-wait {latency_wait} ",
             self.get_default_remote_provider_args(),
             self.get_default_resources_args(),
+            self.get_local_groupid_arg(),
             "{overwrite_workdir} {overwrite_config} {printshellcmds} {rules} ",
             "--notemp --quiet --no-hooks --nolock --mode {} ".format(
                 Mode.subprocess
@@ -721,6 +725,7 @@ def __init__(
 
         self.exec_job = exec_job
         self.exec_job += self.get_additional_args()
+        self.exec_job += " {job_specific_args:u} "
         if not disable_default_remote_provider_args:
             self.exec_job += self.get_default_remote_provider_args()
         if not disable_get_default_resources_args:
             self.exec_job += self.get_default_resources_args()
@@ -819,6 +824,9 @@ def format_job(self, pattern, job, **kwargs):
             "--wait-for-files {wait_for_files}",
             wait_for_files=[repr(f) for f in wait_for_files],
         )
+        job_specific_args = ""
+        if job.is_group:
+            job_specific_args = f"--local-groupid {job.jobid}"
 
         format_p = partial(
             self.format_job_pattern,
             latency_wait=self.latency_wait,
             waitfiles_parameter=waitfiles_parameter,
             scheduler_solver_path=scheduler_solver_path,
+            job_specific_args=job_specific_args,
             **kwargs,
         )
         try:
diff --git a/snakemake/jobs.py b/snakemake/jobs.py
index 7093c43fc..ac10a5b86 100644
--- a/snakemake/jobs.py
+++ b/snakemake/jobs.py
@@ -88,6 +88,7 @@ def new(
         format_wildcards=None,
         targetfile=None,
         update=False,
+        groupid=None,
     ):
         if rule.is_branched:
             # for distinguishing branched rules, we need input and output in addition
@@ -101,7 +102,7 @@ def new(
             key = (rule.name, *sorted(wildcards_dict.items()))
         if update:
             # cache entry has to be replaced because job shall be constructed from scratch
-            obj = Job(rule, dag, wildcards_dict, format_wildcards, targetfile)
+            obj = Job(rule, dag, wildcards_dict, format_wildcards, targetfile, groupid)
             self.cache[key] = obj
         else:
             try:
@@ -145,10 +146,17 @@ class Job(AbstractJob):
         "_attempt",
         "_group",
         "targetfile",
+        "incomplete_input_expand",
     ]
 
     def __init__(
-        self, rule, dag, wildcards_dict=None, format_wildcards=None, targetfile=None
+        self,
+        rule,
+        dag,
+        wildcards_dict=None,
+        format_wildcards=None,
+        targetfile=None,
+        groupid=None,
     ):
         self.rule = rule
         self.dag = dag
@@ -167,8 +175,14 @@ def __init__(
             else Wildcards(fromdict=format_wildcards)
         )
 
-        self.input, input_mapping, self.dependencies = self.rule.expand_input(
-            self.wildcards_dict
+        (
+            self.input,
+            input_mapping,
+            self.dependencies,
+            self.incomplete_input_expand,
+        ) = self.rule.expand_input(
+            self.wildcards_dict,
+            groupid=groupid,
         )
         self.output, output_mapping = self.rule.expand_output(self.wildcards_dict)
@@ -222,12 +236,21 @@ def __init__(
                     self.subworkflow_input[f] = sub
 
     def updated(self):
+        group = self.dag.get_job_group(self)
+        groupid = None
+        if group is None:
+            if self.dag.workflow.run_local or self.is_local:
+                groupid = self.dag.workflow.local_groupid
+        else:
+            groupid = group.jobid
+
         job = self.dag.job_factory.new(
             self.rule,
             self.dag,
             wildcards_dict=self.wildcards_dict,
             targetfile=self.targetfile,
             update=True,
+            groupid=groupid,
         )
         job.is_updated = True
         return job
@@ -1001,6 +1024,13 @@ def get_wait_for_files(self):
     def jobid(self):
         return self.dag.jobid(self)
 
+    def uuid(self):
+        return str(
+            get_uuid(
+                f"{self.rule.name}:{','.join(sorted(f'{w}:{v}' for w, v in self.wildcards_dict.items()))}"
+            )
+        )
+
     def postprocess(
         self,
         upload_remote=True,
@@ -1122,6 +1152,7 @@ class GroupJob(AbstractJob):
         "_all_products",
         "_attempt",
         "toposorted",
+        "_jobid",
     ]
 
     def __init__(self, id, jobs):
@@ -1135,6 +1166,7 @@ def __init__(self, id, jobs):
         self._inputsize = None
         self._all_products = None
         self._attempt = self.dag.workflow.attempt
+        self._jobid = None
 
     @property
     def dag(self):
@@ -1366,7 +1398,15 @@ def properties(self, omit_resources=["_cores", "_nodes"], **aux_properties):
 
     @property
     def jobid(self):
-        return str(get_uuid(",".join(str(job.jobid) for job in self.jobs)))
+        if not self._jobid:
+            # The uuid of the last job is sufficient to uniquely identify the group job.
+            # This is true because each job can only occur in one group job.
+            # Additionally, this is the most stable id we can get, even if the group
+            # changes by adding more upstream jobs, e.g. due to groupid usage in input
+            # functions (see Dag.update_incomplete_input_expand_jobs())
+            last_job = sorted(self.toposorted[-1])[-1]
+            self._jobid = last_job.uuid()
+        return self._jobid
 
     def cleanup(self):
         for job in self.jobs:
diff --git a/snakemake/rules.py b/snakemake/rules.py
index d976ba8d6..85f2a3734 100644
--- a/snakemake/rules.py
+++ b/snakemake/rules.py
@@ -232,9 +232,14 @@ def partially_expand(f, wildcards):
                 if len(set(values)) == 1
             )
             # TODO have a look into how to concretize dependencies here
-            branch._input, _, branch.dependencies = branch.expand_input(
+            branch._input, _, branch.dependencies, incomplete = branch.expand_input(
                 non_dynamic_wildcards
             )
+            assert not incomplete, (
+                "bug: dynamic branching resulted in incomplete input files, "
+                "please file an issue on https://github.com/snakemake/snakemake"
+            )
+
             branch._output, _ = branch.expand_output(non_dynamic_wildcards)
 
             resources = branch.expand_resources(non_dynamic_wildcards, branch._input, 1)
@@ -706,6 +711,7 @@ def apply_input_function(
         wildcards,
         incomplete_checkpoint_func=lambda e: None,
         raw_exceptions=False,
+        groupid=None,
         **aux_params
     ):
         incomplete = False
         elif isinstance(func, AnnotatedString):
             func = func.callable
         sig = inspect.signature(func)
+        _aux_params = {k: v for k, v in aux_params.items() if k in sig.parameters}
+
+        if "groupid" in sig.parameters:
+            if groupid is not None:
+                _aux_params["groupid"] = groupid
+            else:
+                # Return empty list of files and incomplete marker
+                # the job will be reevaluated once groupids have been determined
+                return [], True
+
         try:
             value = func(Wildcards(fromdict=wildcards), **_aux_params)
         except IncompleteCheckpointException as e:
@@ -750,7 +766,9 @@ def _apply_wildcards(
         property=None,
         incomplete_checkpoint_func=lambda e: None,
         allow_unpack=True,
+        groupid=None,
     ):
+        incomplete = False
         if aux_params is None:
             aux_params = dict()
         for name, item in olditems._allitems():
@@ -766,6 +784,7 @@
                     wildcards,
                     incomplete_checkpoint_func=incomplete_checkpoint_func,
                     is_unpack=is_unpack,
+                    groupid=groupid,
                     **aux_params
                 )
                 if apply_path_modifier and not incomplete:
@@ -820,8 +839,9 @@
                         name, start, end=len(newitems) if is_iterable else None
                     )
                     start = len(newitems)
+        return incomplete
 
-    def expand_input(self, wildcards):
+    def expand_input(self, wildcards, groupid=None):
         def concretize_iofile(f, wildcards, is_from_callable):
             if is_from_callable:
                 if isinstance(f, Path):
@@ -846,7 +866,7 @@ def handle_incomplete_checkpoint(exception):
         input = InputFiles()
         mapping = dict()
         try:
-            self._apply_wildcards(
+            incomplete = self._apply_wildcards(
                 input,
                 self.input,
                 wildcards,
@@ -854,8 +874,9 @@ def handle_incomplete_checkpoint(exception):
                 mapping=mapping,
                 incomplete_checkpoint_func=handle_incomplete_checkpoint,
                 property="input",
+                groupid=groupid,
             )
         except WildcardError as e:
             raise WildcardError(
                 "Wildcards in input files cannot be "
                 "determined from output files:",
                 str(e),
@@ -876,7 +897,7 @@ def handle_incomplete_checkpoint(exception):
         for f in input:
             f.check()
 
-        return input, mapping, dependencies
+        return input, mapping, dependencies, incomplete
 
     def expand_params(self, wildcards, input, output, resources, omit_callable=False):
         def concretize_param(p, wildcards, is_from_callable):
diff --git a/snakemake/workflow.py b/snakemake/workflow.py
index 431c600e6..3b705e356 100644
--- a/snakemake/workflow.py
+++ b/snakemake/workflow.py
@@ -147,6 +147,7 @@ def __init__(
         check_envvars=True,
         max_threads=None,
         all_temp=False,
+        local_groupid="local",
     ):
         """
         Create the controller.
@@ -230,6 +231,7 @@ def __init__(
         self.max_threads = max_threads
         self.all_temp = all_temp
         self.scheduler = None
+        self.local_groupid = local_groupid
 
         _globals = globals()
         _globals["workflow"] = self
diff --git a/tests/test_groupid_expand/Snakefile b/tests/test_groupid_expand/Snakefile
new file mode 100644
index 000000000..0d30974de
--- /dev/null
+++ b/tests/test_groupid_expand/Snakefile
@@ -0,0 +1,30 @@
+shell.executable("bash")
+
+
+rule all:
+    input:
+        expand("bar{i}.txt", i=range(3)),
+
+
+rule grouplocal:
+    output:
+        "foo.{groupid}.txt",
+    group:
+        "foo"
+    shell:
+        "echo {wildcards.groupid} > {output}"
+
+
+def get_input(wildcards, groupid):
+    return f"foo.{groupid}.txt"
+
+
+rule consumer:
+    input:
+        get_input,
+    output:
+        "bar{i}.txt",
+    group:
+        "foo"
+    shell:
+        "cp {input} {output}"
diff --git a/tests/test_groupid_expand/expected-results/bar0.txt b/tests/test_groupid_expand/expected-results/bar0.txt
new file mode 100644
index 000000000..408303742
--- /dev/null
+++ b/tests/test_groupid_expand/expected-results/bar0.txt
@@ -0,0 +1 @@
+local
diff --git a/tests/test_groupid_expand/expected-results/bar1.txt b/tests/test_groupid_expand/expected-results/bar1.txt
new file mode 100644
index 000000000..408303742
--- /dev/null
+++ b/tests/test_groupid_expand/expected-results/bar1.txt
@@ -0,0 +1 @@
+local
diff --git a/tests/test_groupid_expand/expected-results/bar2.txt b/tests/test_groupid_expand/expected-results/bar2.txt
new file mode 100644
index 000000000..408303742
--- /dev/null
+++ b/tests/test_groupid_expand/expected-results/bar2.txt
@@ -0,0 +1 @@
+local
diff --git a/tests/test_groupid_expand/expected-results/foo.local.txt b/tests/test_groupid_expand/expected-results/foo.local.txt
new file mode 100644
index 000000000..408303742
--- /dev/null
+++ b/tests/test_groupid_expand/expected-results/foo.local.txt
@@ -0,0 +1 @@
+local
diff --git a/tests/test_groupid_expand_cluster/Snakefile b/tests/test_groupid_expand_cluster/Snakefile
new file mode 100644
index 000000000..e75756237
--- /dev/null
+++ b/tests/test_groupid_expand_cluster/Snakefile
@@ -0,0 +1,27 @@
+rule all:
+    input:
+        expand("bar{i}.txt", i=range(3))
+
+
+rule grouplocal:
+    output:
"foo.{groupid}.txt" + group: + "foo" + shell: + "echo test > {output}" + + +def get_input(wildcards, groupid): + return f"foo.{groupid}.txt" + + +rule consumer: + input: + get_input + output: + "bar{i}.txt" + group: + "foo" + shell: + "cp {input} {output}" diff --git a/tests/test_groupid_expand_cluster/expected-results/bar0.txt b/tests/test_groupid_expand_cluster/expected-results/bar0.txt new file mode 100644 index 000000000..9daeafb98 --- /dev/null +++ b/tests/test_groupid_expand_cluster/expected-results/bar0.txt @@ -0,0 +1 @@ +test diff --git a/tests/test_groupid_expand_cluster/expected-results/bar1.txt b/tests/test_groupid_expand_cluster/expected-results/bar1.txt new file mode 100644 index 000000000..9daeafb98 --- /dev/null +++ b/tests/test_groupid_expand_cluster/expected-results/bar1.txt @@ -0,0 +1 @@ +test diff --git a/tests/test_groupid_expand_cluster/expected-results/bar2.txt b/tests/test_groupid_expand_cluster/expected-results/bar2.txt new file mode 100644 index 000000000..9daeafb98 --- /dev/null +++ b/tests/test_groupid_expand_cluster/expected-results/bar2.txt @@ -0,0 +1 @@ +test diff --git a/tests/test_groupid_expand_cluster/expected-results/foo.412c59b9-e2fc-51ea-9a84-d055fb244f80.txt b/tests/test_groupid_expand_cluster/expected-results/foo.412c59b9-e2fc-51ea-9a84-d055fb244f80.txt new file mode 100644 index 000000000..9daeafb98 --- /dev/null +++ b/tests/test_groupid_expand_cluster/expected-results/foo.412c59b9-e2fc-51ea-9a84-d055fb244f80.txt @@ -0,0 +1 @@ +test diff --git a/tests/test_groupid_expand_cluster/expected-results/foo.5792af91-9d58-5430-a941-2d29860112e7.txt b/tests/test_groupid_expand_cluster/expected-results/foo.5792af91-9d58-5430-a941-2d29860112e7.txt new file mode 100644 index 000000000..9daeafb98 --- /dev/null +++ b/tests/test_groupid_expand_cluster/expected-results/foo.5792af91-9d58-5430-a941-2d29860112e7.txt @@ -0,0 +1 @@ +test diff --git a/tests/test_groupid_expand_cluster/expected-results/foo.e1a9b7a0-f48e-568a-bc42-d6c2078055be.txt b/tests/test_groupid_expand_cluster/expected-results/foo.e1a9b7a0-f48e-568a-bc42-d6c2078055be.txt new file mode 100644 index 000000000..9daeafb98 --- /dev/null +++ b/tests/test_groupid_expand_cluster/expected-results/foo.e1a9b7a0-f48e-568a-bc42-d6c2078055be.txt @@ -0,0 +1 @@ +test diff --git a/tests/test_groupid_expand_cluster/qsub b/tests/test_groupid_expand_cluster/qsub new file mode 100755 index 000000000..0bc8aabba --- /dev/null +++ b/tests/test_groupid_expand_cluster/qsub @@ -0,0 +1,7 @@ +#!/bin/bash +echo `date` >> qsub.log +tail -n1 $1 >> qsub.log +# simulate printing of job id by a random number +echo $RANDOM +cat $1 >> qsub.log +sh $1 diff --git a/tests/tests.py b/tests/tests.py index 8bf56b87a..75e9470f5 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -1505,6 +1505,15 @@ def test_template_engine(): run(dpath("test_template_engine")) +def test_groupid_expand_local(): + run(dpath("test_groupid_expand")) + + +@skip_on_windows +def test_groupid_expand_cluster(): + run(dpath("test_groupid_expand_cluster"), cluster="./qsub", nodes=3) + + @skip_on_windows def test_service_jobs(): run(dpath("test_service_jobs"), check_md5=False)