
Modify all DAG computations to be at most linear in the number of jobs (#710)

* ensure that DAG.update_ready becomes linear.

* clarify installation

* get rid of repeated bfs calls and directly use dicts and sets for DAG.needrun_jobs and DAG.jobs

* simplification

* revert test change: ancient should not apply for intermediate results

* fix issue with dynamic handling

* skip finished jobs when iterating over needrun jobs

* remove issue 661 test as this is not the desired behavior of ancient.

* remove test case (see previous commit)

* add restart jobs to the DAG._ready_jobs set again.

* fix ready computation for group jobs

* fix __eq__ implementation of group jobs

* fix ready determination for groups

* restore ancient behavior as desired in issue #661
johanneskoester committed Oct 30, 2020
1 parent cfb8348 commit 2e2c63a
Showing 12 changed files with 93 additions and 65 deletions.
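
The central idea of this change is visible in the ``snakemake/dag.py`` diff below: instead of re-walking the DAG with repeated BFS calls to decide which jobs are ready, the DAG now keeps a per-job counter ``_n_until_ready`` of dependencies that still need to run. The counter is filled once while computing the needrun jobs and decremented whenever a dependency finishes, so the readiness check becomes an O(1) comparison against zero and the overall bookkeeping stays linear in the number of jobs. The following is a minimal, self-contained sketch of that pattern; ``MiniDAG`` and its attributes are hypothetical stand-ins, not Snakemake's actual classes.

.. code-block:: python

    from collections import defaultdict

    class MiniDAG:
        """Toy model of counter-based readiness bookkeeping (not Snakemake's API)."""

        def __init__(self, dependencies, depending):
            # dependencies: job -> set of jobs it depends on
            # depending:    job -> set of jobs that depend on it
            self.depending = depending
            self._n_until_ready = defaultdict(int)
            # One pass over all edges: count pending upstream jobs per job.
            for job, deps in dependencies.items():
                self._n_until_ready[job] = len(deps)

        def ready(self, job):
            # O(1): a job is ready once no upstream job is still pending.
            return self._n_until_ready[job] == 0

        def finish(self, job):
            # Finishing a job unblocks each of its downstream jobs by one step.
            for downstream in self.depending.get(job, ()):
                self._n_until_ready[downstream] -= 1

    # c depends on a and b; it becomes ready only after both have finished.
    dag = MiniDAG(
        dependencies={"a": set(), "b": set(), "c": {"a", "b"}},
        depending={"a": {"c"}, "b": {"c"}},
    )
    assert not dag.ready("c")
    dag.finish("a")
    dag.finish("b")
    assert dag.ready("c")
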
19 changes: 17 additions & 2 deletions docs/getting_started/installation.rst
@@ -29,7 +29,10 @@ The default conda solver is a bit slow and sometimes has issues with `selecting
$ conda install -c conda-forge mamba
Then, you can install Snakemake with
Full installation
-----------------

Snakemake can be installed with all goodies needed to run in any environment and for creating interactive reports via

.. code-block:: console
@@ -44,13 +47,25 @@ This will install snakemake into an isolated software environment, that has to b
$ snakemake --help
Installing into isolated environments is best practice in order to avoid side effects with other packages.

Note that the full installation is not possible on **Windows**, because some of the dependencies are Unix (Linux/MacOS) only.
For Windows, please use the minimal installation below.

Minimal installation
--------------------

A minimal version of Snakemake which only depends on the bare necessities can be installed with

.. code-block:: console
$ mamba create -c bioconda -c conda-forge -n snakemake snakemake-minimal
Note that Snakemake is available via Bioconda for historical, reproducibility, and continuity reasons.
In contrast to the full installation, which depends on some Unix (Linux/MacOS) only packages, this also works on Windows.

Notes on Bioconda as a package source
-------------------------------------

Note that Snakemake is available via Bioconda for historical, reproducibility, and continuity reasons (although it is not limited to biology applications at all).
However, it is easy to combine the Snakemake installation with other channels, e.g., by prefixing the package name with ``bioconda::``, i.e.,

.. code-block:: console
11 changes: 10 additions & 1 deletion snakemake/__init__.py
@@ -2626,7 +2626,16 @@ def open_browser():
with open(args.runtime_profile, "w") as out:
profile = yappi.get_func_stats()
profile.sort("totaltime")
profile.print_all(out=out)
profile.print_all(
out=out,
columns={
0: ("name", 120),
1: ("ncall", 10),
2: ("tsub", 8),
3: ("ttot", 8),
4: ("tavg", 8),
},
)

sys.exit(0 if success else 1)

78 changes: 47 additions & 31 deletions snakemake/dag.py
@@ -123,6 +123,7 @@ def __init__(
self.container_imgs = dict()
self._progress = 0
self._group = dict()
self._n_until_ready = defaultdict(int)

self.job_factory = JobFactory()
self.group_job_factory = GroupJobFactory()
@@ -243,9 +244,13 @@ def cleanup_workdir(self):

def cleanup(self):
self.job_cache.clear()
final_jobs = set(self.jobs)
final_jobs = set(self.bfs(self.dependencies, *self.targetjobs))
todelete = [job for job in self.dependencies if job not in final_jobs]
for job in todelete:
try:
self._needrun.remove(job)
except KeyError:
pass
del self.dependencies[job]
try:
del self.depending[job]
@@ -335,7 +340,8 @@ def check_dynamic(self):
"""Check dynamic output and update downstream rules if necessary."""
if self.has_dynamic_rules:
for job in filter(
lambda job: (job.dynamic_output and not self.needrun(job)), self.jobs
lambda job: (job.dynamic_output and not self.needrun(job)),
list(self.jobs),
):
self.update_dynamic(job)
self.postprocess()
@@ -351,17 +357,12 @@ def dynamic_output_jobs(self):
@property
def jobs(self):
""" All jobs in the DAG. """
for job in self.bfs(self.dependencies, *self.targetjobs):
yield job
return self.dependencies.keys()

@property
def needrun_jobs(self):
""" Jobs that need to be executed. """
for job in filter(
self.needrun,
self.bfs(self.dependencies, *self.targetjobs, stop=self.noneedrun_finished),
):
yield job
return filterfalse(self.finished, self._needrun)

@property
def local_needrun_jobs(self):
@@ -371,8 +372,7 @@ def local_needrun_jobs(self):
@property
def finished_jobs(self):
""" Iterate over all jobs that have been finished."""
for job in filter(self.finished, self.bfs(self.dependencies, *self.targetjobs)):
yield job
return filter(self.finished, self.jobs)

@property
def ready_jobs(self):
@@ -630,9 +630,9 @@ def handle_remote(self, job, upload=True):
""" Remove local files if they are no longer needed and upload. """
if upload:
# handle output files
files = list(job.expanded_output)
files = job.expanded_output
if job.benchmark:
files.append(job.benchmark)
files = chain(job.expanded_output, (job.benchmark,))
for f in files:
if f.is_remote and not f.should_stay_on_remote:
f.upload_to_remote()
@@ -650,6 +650,9 @@ def handle_remote(self, job, upload=True):
)

if not self.keep_remote_local:
if not any(f.is_remote for f in job.input):
return

# handle input files
needed = lambda job_, f: any(
f in files
@@ -663,6 +666,7 @@ def unneeded_files():
and not f.protected
and not f.should_keep_local
)

generated_input = set()
for job_, files in self.dependencies[job].items():
generated_input |= files
@@ -944,8 +948,10 @@ def update_needrun(job):
_needrun = self._needrun
dependencies = self.dependencies
depending = self.depending
_n_until_ready = self._n_until_ready

_needrun.clear()
_n_until_ready.clear()
candidates = list(self.jobs)

# Update the output mintime of all jobs.
@@ -970,19 +976,23 @@ def update_needrun(job):
for job_, files in dependencies[job].items():
missing_output = job_.missing_output(requested=files)
reason(job_).missing_output.update(missing_output)
if missing_output and not job_ in visited:
if missing_output and job_ not in visited:
visited.add(job_)
queue.append(job_)

for job_, files in depending[job].items():
if job_ in candidates_set and not all(f.is_ancient for f in files):
reason(job_).updated_input_run.update(
f for f in files if not f.is_ancient
)
if not job_ in visited:
if job_ in candidates_set:
if job_ not in visited:
if all(f.is_ancient for f in files):
# No other reason to run job_.
# Since all files are ancient, we do not trigger it.
continue
visited.add(job_)
queue.append(job_)

_n_until_ready[job_] += 1
reason(job_).updated_input_run.update(files)

# update len including finished jobs (because they have already increased the job counter)
self._len = len(self._finished | self._needrun)

@@ -1101,6 +1111,9 @@ def update_ready(self, jobs=None):
potential_new_ready_jobs = False
candidate_groups = set()
for job in jobs:
if job in self._ready_jobs:
# job has been seen before, no need to process again
continue
if not self.finished(job) and self._ready(job):
potential_new_ready_jobs = True
if job.group is None:
@@ -1138,6 +1151,7 @@ def close_remote_objects(self):
def postprocess(self):
"""Postprocess the DAG. This has to be invoked after any change to the
DAG topology."""
self.cleanup()
self.update_jobids()
self.update_needrun()
self.update_priority()
@@ -1230,16 +1244,14 @@ def _ready(self, job):
"""Return whether the given job is ready to execute."""
group = self._group.get(job, None)
if group is None:
is_external_needrun_dep = self.needrun
return self._n_until_ready[job] == 0
else:

def is_external_needrun_dep(j):
g = self._group.get(j, None)
return self.needrun(j) and (g is None or g != group)

return self._finished.issuperset(
filter(is_external_needrun_dep, self.dependencies[job])
)
n_internal_deps = lambda job: sum(
self._group.get(dep) == group for dep in self.dependencies[job]
)
return all(
(self._n_until_ready[job] - n_internal_deps(job)) == 0 for job in group
)

def update_checkpoint_dependencies(self, jobs=None):
"""Update dependencies of checkpoints."""
@@ -1283,12 +1295,16 @@ def finish(self, job, update_dynamic=True):

# mark depending jobs as ready
# skip jobs that are marked as until jobs
potential_new_ready_jobs = self.update_ready(
depending = [
j
for job in jobs
for j in self.depending[job]
if not self.in_until(job) and self.needrun(j)
)
]
for job in depending:
self._n_until_ready[job] -= 1

potential_new_ready_jobs = self.update_ready(depending)

for job in jobs:
if update_dynamic and job.dynamic_output:
@@ -1501,7 +1517,7 @@ def bfs(self, direction, *jobs, stop=lambda job: False):
# stop criterion reached for this node
continue
yield job
for job_, _ in direction[job].items():
for job_ in direction[job].keys():
if not job_ in visited:
queue.append(job_)
visited.add(job_)
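
For grouped jobs, the new ``_ready`` logic above treats dependencies inside the same group as already satisfied: a group is considered ready when, for every member, the pending-dependency counter is fully accounted for by dependencies that belong to that group. Below is a rough illustration of this check; the plain dictionaries stand in for the DAG's internal ``_group``, ``_n_until_ready``, and ``dependencies`` mappings and are not the real attributes.

.. code-block:: python

    def group_is_ready(group_members, group_id, n_until_ready, job_group, dependencies):
        """Sketch: only dependencies *outside* the group may block it from running."""

        def n_internal_deps(job):
            # Dependencies of `job` that live inside the same group.
            return sum(job_group.get(dep) == group_id for dep in dependencies[job])

        return all(
            n_until_ready[job] - n_internal_deps(job) == 0
            for job in group_members
        )

    # Jobs x and y form group "g"; y depends on x (internal) and z (external).
    dependencies = {"x": set(), "y": {"x", "z"}, "z": set()}
    job_group = {"x": "g", "y": "g"}
    n_until_ready = {"x": 0, "y": 2, "z": 0}  # y still waits for x and z

    # Not ready yet: y's external dependency z has not finished.
    assert not group_is_ready({"x", "y"}, "g", n_until_ready, job_group, dependencies)

    # Once z finishes, y's counter drops to 1, which is exactly its internal dependency on x.
    n_until_ready["y"] = 1
    assert group_is_ready({"x", "y"}, "g", n_until_ready, job_group, dependencies)
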
2 changes: 2 additions & 0 deletions snakemake/jobs.py
@@ -1416,6 +1416,8 @@ def __hash__(self):
return hash(self.jobs)

def __eq__(self, other):
if not isinstance(other, AbstractJob):
return False
if other.is_group():
return self.jobs == other.jobs
else:
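
The added ``isinstance`` guard makes equality comparisons of group jobs safe against objects outside the job hierarchy: without it, ``other.is_group()`` would raise an ``AttributeError`` whenever a group job is compared to something that is not a job at all. A minimal illustration of the pattern, using hypothetical classes rather than the real ``AbstractJob`` hierarchy:

.. code-block:: python

    class AbstractJob:
        def is_group(self):
            raise NotImplementedError()

    class GroupJob(AbstractJob):
        def __init__(self, jobs):
            self.jobs = frozenset(jobs)

        def is_group(self):
            return True

        def __hash__(self):
            return hash(self.jobs)

        def __eq__(self, other):
            # Objects outside the job hierarchy are simply unequal,
            # instead of raising AttributeError on other.is_group().
            if not isinstance(other, AbstractJob):
                return False
            if other.is_group():
                return self.jobs == other.jobs
            return False

    group = GroupJob({"a", "b"})
    assert group != "not a job"           # False rather than an exception
    assert group == GroupJob({"a", "b"})  # equal groups still compare equal
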
26 changes: 17 additions & 9 deletions snakemake/scheduler.py
@@ -368,9 +368,7 @@ def stats(self):
@property
def open_jobs(self):
""" Return open jobs. """
jobs = set(self.dag.ready_jobs)
jobs -= self.running
jobs -= self.failed
jobs = self.dag.ready_jobs

if not self.dryrun:
jobs = [
@@ -400,7 +398,7 @@ def schedule(self):

# obtain needrun and running jobs in a thread-safe way
with self._lock:
needrun = list(self.open_jobs)
needrun = set(self.open_jobs)
running = list(self.running)
errors = self._errors
user_kill = self._user_kill
@@ -455,6 +453,8 @@ def schedule(self):
# update running jobs
with self._lock:
self.running.update(run)
            # remove from ready_jobs
self.dag._ready_jobs -= run

# actually run jobs
local_runjobs = [job for job in run if job.is_local]
@@ -538,9 +538,15 @@ def _proceed(
logger.job_finished(jobid=job.jobid)
self.progress()

if (
if self.dryrun:
if not self.running:
# During dryrun, only release when all running jobs are done.
# This saves a lot of time, as self.open_jobs has to be
# evaluated less frequently.
self._open_jobs.release()
elif (
not self.running
or (potential_new_ready_jobs and self.open_jobs)
or potential_new_ready_jobs
or self.workflow.immediate_submit
):
# go on scheduling if open jobs are ready or no job is running
@@ -565,6 +571,8 @@ def _handle_error(self, job):
if job.restart_times > job.attempt - 1:
logger.info("Trying to restart job {}.".format(self.dag.jobid(job)))
job.attempt += 1
# add job to those being ready again
self.dag._ready_jobs.add(job)
else:
self._errors = True
self.failed.add(job)
@@ -692,9 +700,9 @@ def job_selector_ilp(self, jobs):
)
)

selected_jobs = [
selected_jobs = set(
job for job, variable in scheduled_jobs.items() if variable.value() == 1.0
]
)
for name in self.workflow.global_resources:
self.resources[name] -= sum(
[job.resources.get(name, 0) for job in selected_jobs]
@@ -765,7 +773,7 @@ def calc_reward():
if not E:
break

solution = [job for job, sel in zip(jobs, x) if sel]
solution = set(job for job, sel in zip(jobs, x) if sel)
# update resources
for name, b_i in zip(self.global_resources, b):
self.resources[name] = b_i
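
Taken together, the scheduler changes keep ``DAG._ready_jobs`` authoritative: jobs leave the set as soon as they are handed to executors and are re-added only when a failed job is going to be restarted, so ``open_jobs`` no longer needs to subtract the running and failed sets on every scheduling round. A compressed sketch of that bookkeeping, using a hypothetical ``MiniScheduler`` rather than the real scheduler API:

.. code-block:: python

    class MiniScheduler:
        """Toy model of the ready-set bookkeeping during scheduling."""

        def __init__(self, ready_jobs):
            self.ready_jobs = set(ready_jobs)  # stands in for dag._ready_jobs
            self.running = set()

        def open_jobs(self):
            # The ready set is already kept free of running jobs,
            # so no per-round set subtraction is needed.
            return set(self.ready_jobs)

        def schedule(self, selected):
            # Jobs handed to executors leave the ready set immediately.
            self.running.update(selected)
            self.ready_jobs -= selected

        def handle_error(self, job, restart):
            self.running.discard(job)
            if restart:
                # A job that will be retried becomes ready again.
                self.ready_jobs.add(job)

    sched = MiniScheduler({"a", "b"})
    sched.schedule({"a"})
    assert sched.open_jobs() == {"b"}
    sched.handle_error("a", restart=True)
    assert sched.open_jobs() == {"a", "b"}
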
1 change: 0 additions & 1 deletion tests/test_ancient/Snakefile
@@ -25,7 +25,6 @@ rule b:
shell:
"echo \"C recreated\" > {output}"

#Will not be executed because C is marked ancient and D exists
rule c:
input:
ancient("C")
15 changes: 0 additions & 15 deletions tests/test_issue661/Snakefile

This file was deleted.

1 change: 0 additions & 1 deletion tests/test_issue661/ancient_derivative

This file was deleted.

1 change: 0 additions & 1 deletion tests/test_issue661/expected-results/ancient_derivative

This file was deleted.

Empty file.
Empty file.
4 changes: 0 additions & 4 deletions tests/tests.py
@@ -1065,10 +1065,6 @@ def test_github_issue640():
)


def test_issue661():
run(dpath("test_issue661"), check_md5=True)


def test_generate_unit_tests():
tmpdir = run(
dpath("test_generate_unit_tests"),
