
Strange behavior when using checkpoints in cloud execution #2021

Closed
cademirch opened this issue Dec 17, 2022 · 8 comments
Labels
bug Something isn't working

Comments

cademirch commented Dec 17, 2022

Snakemake version

7.18.2
Describe the bug

I am trying to run a workflow with a checkpoint in the cloud. The workflow runs as expected when executed locally. However, in the cloud (using --google-lifesciences), the whole DAG appears to be rebuilt inside the cloud job for the checkpoint-aggregating step (see below).

Minimal example

import glob
import os

localrules: first_step, second_step
samples = ["sample_a"]
rule all:
    input: expand("results/{sample}/done.txt", sample=samples)

def get_files(wc):
    """
    Globs files from second_step's output to create input for third_step
    """
    checkpoint_output = checkpoints.second_step.get(**wc).output[0]
    query = os.path.join(checkpoint_output, "*.txt") # only care about txt files
    files = [os.path.basename(f) for f in glob.glob(query)]
    numbers = [f.replace("_interval.txt", "") for f in files]
    
    out = expand("results/{{sample}}/third_step/intermediate_{i}.txt", i=numbers)
    
    return out
    
rule first_step:
    output: "results/{sample}/hi.txt"
    shell: "echo 'hi' > {output}"

checkpoint second_step:
    """
    Generates random number of files
    """
    input: "results/{sample}/hi.txt"
    output: directory("results/{sample}/second_step")
    run:
        import random
        import os
        num = random.randint(1,10)
        if not os.path.exists(output[0]):
            os.mkdir(output[0])
        for i in range(num):
            with open(os.path.join(output[0], f"{i}_interval.txt"), "w") as f:
                print(f"this is a message from file interval_{i}.txt", file=f)

rule third_step:
    input: "results/{sample}/second_step/{i}_interval.txt"
    output: "results/{sample}/third_step/intermediate_{i}.txt"
    shell: "cat {input} > {output}"


rule aggregate_step:
    """
    Aggregates files from third_step
    """
    input: get_files
    output: "results/{sample}/done.txt"
    shell: "cat {input} > {output}"

run using:
snakemake --google-lifesciences --default-remote-prefix cade-test -j 10

This is the stdout/stderr from the aggregate_step job; notice how it rebuilds the DAG:

--2022-12-17 05:53:54--  https://raw.githubusercontent.com/snakemake/snakemake/main/snakemake/executors/google_lifesciences_helper.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.111.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4045 (4.0K) [text/plain]
Saving to: ‘/download.py’

     0K ...                                                   100% 4.19M=0.001s

2022-12-17 05:53:54 (4.19 MB/s) - ‘/download.py’ saved [4045/4045]

Blob source/cache/workdir-b03670b5fb7cb25f93b093b879059c6fcfe6a93aed77fa6984cb8c5645a4d322.tar.gz downloaded to /tmp/workdir.tar.gz.
Snakefile
Building DAG of jobs...
Using shell: /bin/bash
Provided cores: 1 (use --cores to define parallelism)
Rules claiming more threads will be scaled down.
Provided resources: mem_mb=1000, disk_mb=1000
Select jobs to execute...

[Sat Dec 17 05:53:59 2022]
localcheckpoint second_step:
    input: cade-test/results/sample_a/hi.txt
    output: cade-test/results/sample_a/second_step
    jobid: 1
    reason: Missing output files: cade-test/results/sample_a/second_step
    wildcards: sample=sample_a
    resources: mem_mb=1000, disk_mb=1000, tmpdir=/tmp
Downstream jobs will be updated after completion.

Warning: the following output files of rule second_step were not present when the DAG was created:
{'cade-test/results/sample_a/second_step'}
Downloading from remote: cade-test/results/sample_a/hi.txt
Finished download.
Uploading to remote: cade-test/results/sample_a/second_step
Finished upload.
[Sat Dec 17 05:54:00 2022]
Finished job 1.
1 of 2 steps (50%) done
Select jobs to execute...

[Sat Dec 17 05:54:01 2022]
rule third_step:
    input: cade-test/results/sample_a/second_step/0_interval.txt
    output: cade-test/results/sample_a/third_step/intermediate_0.txt
    jobid: 5
    reason: Updated input files: cade-test/results/sample_a/second_step/0_interval.txt
    wildcards: sample=sample_a, i=0
    resources: mem_mb=1000, disk_mb=1000, tmpdir=/tmp

Downloading from remote: cade-test/results/sample_a/second_step/0_interval.txt
Finished download.
Uploading to remote: cade-test/results/sample_a/third_step/intermediate_0.txt
Finished upload.
[Sat Dec 17 05:54:02 2022]
Finished job 5.
2 of 4 steps (50%) done
Select jobs to execute...

[Sat Dec 17 05:54:02 2022]
rule third_step:
    input: cade-test/results/sample_a/second_step/1_interval.txt
    output: cade-test/results/sample_a/third_step/intermediate_1.txt
    jobid: 6
    reason: Updated input files: cade-test/results/sample_a/second_step/1_interval.txt
    wildcards: sample=sample_a, i=1
    resources: mem_mb=1000, disk_mb=1000, tmpdir=/tmp

Downloading from remote: cade-test/results/sample_a/second_step/1_interval.txt
Finished download.
Uploading to remote: cade-test/results/sample_a/third_step/intermediate_1.txt
Finished upload.
[Sat Dec 17 05:54:03 2022]
Finished job 6.
3 of 4 steps (75%) done
Select jobs to execute...

[Sat Dec 17 05:54:03 2022]
rule aggregate_step:
    input: cade-test/results/sample_a/third_step/intermediate_0.txt, cade-test/results/sample_a/third_step/intermediate_1.txt
    output: cade-test/results/sample_a/done.txt
    jobid: 0
    reason: Input files updated by another job: cade-test/results/sample_a/third_step/intermediate_1.txt, cade-test/results/sample_a/third_step/intermediate_0.txt
    wildcards: sample=sample_a
    resources: mem_mb=1000, disk_mb=1000, tmpdir=/tmp

Uploading to remote: cade-test/results/sample_a/done.txt
Finished upload.
[Sat Dec 17 05:54:04 2022]
Finished job 0.
4 of 4 steps (100%) done
['results/{sample}/third_step/intermediate_0.txt', 'results/{sample}/third_step/intermediate_1.txt']
--2022-12-17 05:54:06--  https://raw.githubusercontent.com/snakemake/snakemake/main/snakemake/executors/google_lifesciences_helper.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4045 (4.0K) [text/plain]
Saving to: ‘/gls.py’

     0K ...                                                   100% 4.37M=0.001s

2022-12-17 05:54:06 (4.37 MB/s) - ‘/gls.py’ saved [4045/4045]

Additional context
@johanneskoester I'd appreciate your insight on this. Am I doing something really wrong, or am I not using checkpoints as intended?

aryarm commented Dec 23, 2022

Hi @cademirch ,

I'm trying to adapt a test from our test suite so that it contains some of the functionality of your example. How's this? Unlike the original test, I've added a directory output and aggregated over files in the directory. If this doesn't reproduce the issues you've been seeing, then we can always go back and try something else - but at least we'll have ruled this out.

import os

# This test file is adapted from this one:
# https://github.com/snakemake/snakemake/blob/e03a3b42eea89d512290bf98ee7d77ce2e17447c/tests/test_cloud_checkpoints_issue574/Snakefile
from snakemake.remote.GS import RemoteProvider as GSRemoteProvider
GS = GSRemoteProvider()

rule all:
    input:
        "landsat-data.txt.bz2"

checkpoint copy:
    input:
        GS.remote("gcp-public-data-landsat/LC08/01/001/003/LC08_L1GT_001003_20170430_20170501_01_RT/LC08_L1GT_001003_20170430_20170501_01_RT_MTL.txt")
    output:
        directory("landsat-dir")
    resources:
        mem_mb=100
    run:
        shell('mkdir {output}')
        for i in range(3):
            shell('cp {input} {output}/landsat-data-$(date "+%s%N").txt')

def get_files(wc):
    checkpoint_output = checkpoints.copy.get().output[0]
    query = os.path.join(checkpoint_output, "landsat-data-{s}.txt")
    return expand(query, s=glob_wildcards(query).s)

rule pack:
    input: get_files
    output:
        "landsat-data.txt.bz2"
    conda:
        "env.yml"
    log:
        "logs/pack.log"
    shell:
        "cat {input} | bzip2 -c > {output}; echo successful > {log}"

cademirch commented Jan 14, 2023

Thanks for putting this test file together @aryarm. Sorry for the delay, but I've run it and this also almost replicates the behavior above.

Here is the output from the GLS log of the execution of the pack rule:

Building DAG of jobs...
Creating conda environment env.yml...
Downloading and installing remote packages.
Environment for /workdir/env.yml created (location: .snakemake/conda/314bbf2bbc17d83008407e0090947efe_)
Using shell: /bin/bash
Provided cores: 1 (use --cores to define parallelism)
Rules claiming more threads will be scaled down.
Provided resources: mem_mb=1000, disk_mb=1000
Select jobs to execute...

[Sat Jan 14 17:19:52 2023]
checkpoint copy:
    input: gcp-public-data-landsat/LC08/01/001/003/LC08_L1GT_001003_20170430_20170501_01_RT/LC08_L1GT_001003_20170430_20170501_01_RT_MTL.txt
    output: cade-test/aryarm-test/landsat-dir
    jobid: 1
    reason: Missing output files: cade-test/aryarm-test/landsat-dir
    resources: mem_mb=100, disk_mb=1000, tmpdir=/tmp
Downstream jobs will be updated after completion.

Warning: the following output files of rule copy were not present when the DAG was created:
{'cade-test/aryarm-test/landsat-dir'}
Downloading from remote: gcp-public-data-landsat/LC08/01/001/003/LC08_L1GT_001003_20170430_20170501_01_RT/LC08_L1GT_001003_20170430_20170501_01_RT_MTL.txt
Finished download.
Uploading to remote: cade-test/aryarm-test/landsat-dir
Finished upload.
[Sat Jan 14 17:19:55 2023]
Finished job 1.
1 of 2 steps (50%) done
MissingInputException in rule pack  in line 29 of /workdir/Snakefile:
Missing input files for rule pack:
    output: cade-test/aryarm-test/landsat-data.txt.bz2
    affected files:
        cade-test/aryarm-test/cade-test/aryarm-test/landsat-dir/landsat-data-1673716794473183560.txt
        cade-test/aryarm-test/cade-test/aryarm-test/landsat-dir/landsat-data-1673716794404379200.txt
        cade-test/aryarm-test/cade-test/aryarm-test/landsat-dir/landsat-data-1673716794467798605.txt

So, despite this being the execution of pack, the DAG is built and it is determined that copy needs to run, which makes sense because on the instance, the output of copy does not exist.

So I think this boils down to checkpoint rules not being compatible when assume_shared_fs is False, which overall makes sense but doesn't seem to be documented.

That being said, I'm not sure what the solution is here. Perhaps Snakemake somehow needs to know to bring the checkpoint output to the instance when executing the checkpoint-dependent rule? @johanneskoester I'd appreciate your feedback and input here.
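
For instance, one could imagine an input function that enumerates the checkpoint's outputs directly in the bucket rather than globbing the node-local filesystem. This is only a rough, untested sketch: it talks to the bucket via the google-cloud-storage client (nothing built into Snakemake), and the bucket name and object prefix are just taken from the log paths above:

import os
from google.cloud import storage  # assumes google-cloud-storage is installed

def get_files(wc):
    # Still block until the checkpoint has actually finished.
    checkpoints.copy.get(**wc)
    # List the checkpoint's output objects straight from the bucket, so the
    # input can be resolved even on a node that never ran the checkpoint.
    client = storage.Client()
    blobs = client.list_blobs("cade-test", prefix="aryarm-test/landsat-dir/")
    names = [os.path.basename(b.name) for b in blobs if b.name.endswith(".txt")]
    stamps = [n[len("landsat-data-"):-len(".txt")] for n in names]
    # Return workdir-relative paths so the default remote prefix is only added once.
    return expand("landsat-dir/landsat-data-{s}.txt", s=stamps)

(This sketch is meant to live inside the Snakefile, where checkpoints and expand are already available.)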

Sidenote: There is a MissingInputException for pack because the remote prefix is doubled up. I believe this happens because the input function for pack returns a list of files with the remote prefix already prepended, and then Snakemake prepends it again because of the --default-remote-prefix CLI argument. In my example in the OP, I avoid this by returning a list of paths relative to the Snakefile, not including the remote prefix. This probably warrants its own issue, or at least some documentation. Modifying the get_files input function in @aryarm's Snakefile like so:

def get_files(wc):
    import glob
    checkpoint_output = checkpoints.copy.get().output[0]
    query = os.path.join(checkpoint_output, "landsat-data-*.txt")
    files = [os.path.basename(f) for f in glob.glob(query)]
    wildcards = [f.split("-")[2].removesuffix(".txt") for f in files]
    return expand("landsat-dir/landsat-data-{s}.txt", s=wildcards)

Solves this error.

aryarm commented Feb 11, 2023

this also almost replicates the behavior above

So are there any other errors that you're getting that this minimal example doesn't replicate?

So I think this boils down to checkpoint rules not being compatible when assume_shared_fs is False, which overall makes sense but doesn't seem to be documented.

I'm not sure what you mean here. Is assume_shared_fs a variable that's defined somewhere in Snakemake's source code?

Sidenote: There is a MissingInputException for pack

Maybe we should start by tackling the duplicated prefix issue. In theory, we shouldn't have to change our get_files() method to make that work, because Snakefiles are supposed to work on the cloud without any modifications.
It's strange because it looks a lot like #574, which we already fixed.

I've started on a branch that we can draft a PR for. I'll create the PR once we're confident that it covers all of the weird behavior you've been seeing.

cademirch commented Feb 11, 2023

this also almost replicates the behavior above

So are there any other errors that you're getting that this minimal example doesn't replicate?

Other than the MissingInputException, no.

So I think this boils down to checkpoint rules not being compatible when assume_shared_fs is False, which overall makes sense but doesn't seem to be documented.

I'm not sure what you mean here. Is assume_shared_fs a variable that's defined somewhere in Snakemake's source code?

Sorry, that was a poor explanation on my part. IIRC, assume_shared_fs is an attribute of workflow. It is False when executing in the cloud, since there is no shared filesystem between the cloud nodes. Basically, it seems checkpoints won't work when there is no shared filesystem, because the checkpoint-dependent rule needs access to the checkpoint's output files to determine its own input. Without a shared filesystem (i.e. in the cloud), the checkpoint-dependent rule is executed on a cloud node, where it builds its own DAG to create the checkpoint output and any other required files, regardless of whether the checkpoint output already exists in the bucket.
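
In other words, when the aggregate_step job builds its own DAG on the cloud node, the checkpoint's output directory simply isn't there. A toy illustration, using the paths from my example in the OP:

import glob, os

# On the cloud node the checkpoint's output exists only in the bucket, not locally:
checkpoint_output = "cade-test/results/sample_a/second_step"
print(os.path.exists(checkpoint_output))  # -> False, so the DAG marks it as missing
print(glob.glob(os.path.join(checkpoint_output, "*.txt")))  # -> [], nothing to aggregate yet

# Hence the "Missing output files: .../second_step" reason in the log above, and the
# re-execution of second_step and third_step inside the aggregate_step job.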

Sidenote: There is a MissingInputException for pack

Maybe we should start by tackling the duplicated prefix issue. In theory, we shouldn't have to change our get_files() method to make that work, because Snakefiles are supposed to work on the cloud without any modifications. It's strange because it looks a lot like #574, which we already fixed.

I'm actually not so sure about this. I believe the prefix is duplicated here because your get_files input function returns the absolute paths of the input files, which already include the remote prefix. Then, when the input function is evaluated, Snakemake prepends the default remote prefix again, hence the doubled prefix. I think this may just need to be documented for working with input functions, checkpoints, and remote files.
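
To make the doubling concrete (a toy snippet; the path is one of the affected files from the log above, and the string concatenation only mimics the effective result, not Snakemake's actual code):

default_remote_prefix = "cade-test/aryarm-test"

# What get_files returns: the checkpoint's resolved output path, which already
# carries the remote prefix.
path_from_input_fn = "cade-test/aryarm-test/landsat-dir/landsat-data-1673716794473183560.txt"

# Snakemake then treats that value as workdir-relative and prepends the prefix again:
print(f"{default_remote_prefix}/{path_from_input_fn}")
# cade-test/aryarm-test/cade-test/aryarm-test/landsat-dir/landsat-data-1673716794473183560.txt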

Thanks for following up on this!

aryarm commented Feb 12, 2023

Other than the MissingInputException, no.

Ok. Then I'm going to move forward with that example as a test for the pull request.

I believe the prefix is duplicated here because your get_files input function returns the absolute paths of the input files, which already include the remote prefix. Then, when the input function is evaluated, Snakemake prepends the default remote prefix again, hence the doubled prefix.

Yes, I agree - that's definitely what is happening here. But I'm still not sure that it should.
Like I said earlier, I think Snakefiles are supposed to work on the cloud without any modifications. So I don't think this is actually desired behavior? Take a look at the test for issue #574 (in PR #1294), for example.
https://github.com/snakemake/snakemake/pull/1294/files#diff-ca7f44ee7cdf3c3ac5abb7d257641266748e23c0e376d5e61abd0908a31027b9
In that situation, we had the same issue: the prefix was getting duplicated for the output of a checkpoint. As far as I can tell, the only real difference between the two tests is that we had a lambda function instead of a get_files() method. I'm not sure why that would matter, but it'll be easier to debug once we have a PR.

Thanks for further explaining the point about assume_shared_fs. I didn't know that was an attribute of workflow.
That seems like a good hypothesis. Let's investigate in the PR!

aryarm commented Feb 12, 2023

ok, we can continue working on this in PR #2108

@cademirch , let me know if you have any suggestions on changes to my description of the issue or the code I committed!

@cademirch

@aryarm Thank you for putting together the PR; this is very helpful! Hopefully we can get to the bottom of this.

@cademirch

Closed because this is covered in the docs example 🤦‍♂️
