
Running workflows in parallel with Slurm


In this section, we extend the previous example on executing SciLuigi workflows with Slurm by showing how to run multiple instances of the same pipeline in parallel.

Setting up a 'meta' workflow

To send out multiple instances of our pipeline, we need to set up a 'meta' workflow which builds all the individual workflows. This meta workflow has a parameter n_tasks, which lets us specify from the command line how many workflows to generate. Each workflow is constructed with a unique integer parameter task_n, obtained by iterating over range(n_tasks).

import luigi
import sciluigi
import os

class MyMetaWorkflow(sciluigi.WorkflowTask):
    
    runmode = luigi.Parameter()
    n_tasks = luigi.IntParameter()

    def workflow(self):
        if self.runmode == 'local':
            runmode = sciluigi.RUNMODE_LOCAL
        elif self.runmode == 'hpc':
            runmode = sciluigi.RUNMODE_HPC
        elif self.runmode == 'mpi':
            runmode = sciluigi.RUNMODE_MPI
        else:
            raise Exception('Runmode is none of local, hpc, nor mpi. Please fix and try again!')

        # here we create each workflow with a parameter task_n
        # and append all the workflows to a list
        tasks = []
        for t in range(self.n_tasks):
            wf = self.new_task('wf', MyWorkflow, task_n=t,
                               runmode=runmode)
            tasks.append(wf)
        return tasks

Building the main pipeline

Next, we define the 'actual' pipeline similarly to the previous example. The main difference is that our tasks take an extra task_n parameter, passed in by the workflow class. The pipeline will execute the following steps (as an example, suppose task_n=3):

  • The task MyFooWriter will write a file called foo3.txt which contains the string foo.
  • The task MyFooReplacer will replace the string with the host name of the cluster node where the job is running, and save it to a file called host3.txt.

We will send out several instances of this pipeline to run (potentially) on different nodes, and we will then be able to read from the host*.txt files on which nodes they were executed.

class MyWorkflow(sciluigi.WorkflowTask):

    runmode = luigi.Parameter()
    task_n = luigi.IntParameter() # added the task_n parameter

    def workflow(self):
        foowriter = self.new_task('foowriter', MyFooWriter,
                                  task_n=self.task_n,
                                  slurminfo=sciluigi.SlurmInfo(
                                  runmode=self.runmode,
                                  project='myname',
                                  partition='mypartition',
                                  cores='1',
                                  time='1:00:00',
                                  jobname='foowriter',
                                  threads='1'))

        fooreplacer = self.new_task('fooreplacer', MyFooReplacer,
                                    task_n=self.task_n,
                                    slurminfo=sciluigi.SlurmInfo(
                                    runmode=self.runmode,
                                    project='myname',
                                    partition='mypartition',
                                    cores='1',
                                    time='1:00:00',
                                    jobname='fooreplacer',
                                    threads='1'))

        fooreplacer.in_foo = foowriter.out_foo
        return fooreplacer


class MyFooWriter(sciluigi.SlurmTask):

    task_n = luigi.IntParameter()
    def out_foo(self):
        return sciluigi.TargetInfo(self, 'foo{}.txt'.format(str(self.task_n)))

    def run(self):
        self.ex('touch {out_file}; echo foo > {out_file}; sleep 30'.format(
                out_file = self.out_foo().path))


class MyFooReplacer(sciluigi.SlurmTask):

    task_n = luigi.IntParameter()
    in_foo = None

    def out_replaced(self):
        out_file = os.path.join(os.path.dirname(self.in_foo().path),
                                'host{}.txt'.format(str(self.task_n)))
        return sciluigi.TargetInfo(self, out_file)

    def run(self):
        self.ex('./replace_with_hostname.sh {in_file} {out_file}; sleep 30'.format(
                in_file = self.in_foo().path,
                out_file = self.out_replaced().path))

Here, the run method calls a script replace_with_hostname.sh introduced in the previous tutorial.
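
If you don't have the script from the previous tutorial at hand, a minimal version could look like the sketch below. It assumes, as in the ex() call above, that the input file is passed as the first argument and the output file as the second; the actual script from the previous tutorial may differ.

#!/bin/bash
# Replace every occurrence of 'foo' in the input file ($1) with the name of
# the host this script runs on, and write the result to the output file ($2).
sed "s/foo/$(hostname)/g" "$1" > "$2"

Remember to make the script executable (chmod +x replace_with_hostname.sh), since it is invoked as ./replace_with_hostname.sh.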

Note that we also changed the way we write the foo* files. Instead of using Python code, we make a call to ex(), which ensures that the task is sent to Slurm and executed as a batch job. The ex() method is what is actually sensitive to the runmode: if called with runmode set to hpc, it will call salloc to allocate resources and srun to run the command. Thus, only calls to ex() are sent out as batch jobs to the Slurm queue, whereas any Python code in the run method of the task is executed locally.
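
For illustration only, and assuming the SlurmInfo values used above, the command that ex() wraps around the foowriter command line would look roughly like the following (the exact flags are assembled by SciLuigi from the SlurmInfo fields and may differ):

salloc -A myname -p mypartition -n 1 -t 1:00:00 -J foowriter srun <command passed to ex()>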

Lastly, we added sleep 30 to the commands in both tasks, which is an instruction to wait for 30 seconds. This is just to slow down the execution a bit so that we can see jobs appearing in the queue and in what order they were executed.

Running the workflow

At the end of the script, we have to add

if __name__ == '__main__':
    sciluigi.run_local(main_task_cls=MyMetaWorkflow)

Finally, we can run our pipeline. Importantly, we have to set the number of workers to a number larger than one (Luigi's default is one) to allow our tasks to run concurrently. Supposing the code is in a script called sciluigi_slurm_example_parallel.py, we can type

python sciluigi_slurm_example_parallel.py --runmode hpc --n-tasks 10 --workers 30

This command will execute 10 pipelines which can run in parallel (depending on the availability of resources) on different nodes.
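
While the pipelines are running, you can watch the corresponding jobs appear in (and disappear from) the queue with, for example:

squeue --user <username>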

You can check that the host names written in the host*.txt files agree with what Slurm reports by calling sacct and including the NodeList field in the --format option:

sacct --user <username> --format=JobID,JobName,Partition,Account,AllocCPUS,State,NodeList

The last column of the output of sacct will then show the nodes where the tasks were executed.
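
To compare, you can print the contents of the output files together with their names, for example with:

grep . host*.txt

Each line of the output pairs a host*.txt file with the host name written into it, which should match one of the nodes listed by sacct.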

Of course, you may want to limit the number of batch jobs that can execute in parallel at any given time. If we were to set --workers 10, then SciLuigi would send out all 10 foowriter tasks as batch jobs, but since it would only have 10 workers available, it would wait until workers become free before sending out the fooreplacer tasks.
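
For example, to have roughly at most five of our batch jobs running at any given time, we could lower the number of workers:

python sciluigi_slurm_example_parallel.py --runmode hpc --n-tasks 10 --workers 5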