Shuffle pipeline takes extremely long time to finish #7247

Open
RaananHadar opened this issue Jan 17, 2022 · 8 comments

@RaananHadar
Contributor

What happened?:
I've created a shuffle pipeline that takes a "/" glob with files:

a.json
b.json
c.json

and creates an output with combinations:

/pfs/out/a_b/a.json
/pfs/out/a_b/b.json
...

For each combination...

The user code finishes in less than a second, but the storage container takes extremely long to finish. For only 20 files the job takes ~40 seconds; for 50 it may take minutes; and for the real number of files the job has to handle (tens of thousands), it's impractical to run in Pachyderm at the moment.

What you expected to happen?:
It's a shuffle pipeline with empty files; it should complete extremely quickly.

How to reproduce it (as minimally and precisely as possible)?:
I've created a basic mock to reproduce this pipeline:

  1. Here is a script to reproduce a dataset:
import os

os.makedirs('./data', exist_ok=True)  # make sure the output directory exists
for i in range(1, 20):
    with open('./data/' + str(i) + '.json', 'w') as fp:
        fp.write(str(i))
  2. Here is the user code:
if __name__ == "__main__":
    from itertools import combinations
    import sys, os

    images_path = sys.argv[1]
    k = int(sys.argv[2])
    output_dir = sys.argv[3]

    images = os.listdir(images_path)
    images_combinations = combinations(images, k)

    for i, combination in enumerate(images_combinations):
        # create a new folder named after the combination, minus the file extension
        image_suffix_len = len(combination[0].split(".")[-1]) + 1  # +1 to include the '.' of the suffix
        folder = os.path.join(output_dir, '_'.join(combination))
        folder = folder.replace(combination[0][-image_suffix_len:], "")

        os.mkdir(folder)

        # symlink the k images in the combination into the new folder
        for j in range(k):
            current_file = combination[j]
            current_file = os.path.join(images_path, current_file)
            os.symlink(current_file, os.path.join(folder, os.path.basename(current_file)))
  3. Here is a pipeline spec:
{
    "pipeline": {
      "name": "mock"
    },
    "description": "A shuffle pipeline that reproduces the bug",
    "transform": {
      "cmd": [ "/bin/bash" ],
      "stdin": [
        "python3 /code/run_test_combinations.py $test 2 /pfs/out"
      ],
      "image": "mock:0.0.1"
    },
    "input": {
        "pfs": {
            "repo": "input",
            "glob": "/",
            "empty_files": true,
            "name": "test"
        }
    }
}

Anything else we need to know?:

Environment?:

  • Kubernetes version (use kubectl version): AKS 1.20.9
  • Pachyderm CLI and pachd server version (use pachctl version): tested and reproduced with both Pachyderm 1.13.4 and Pachyderm 2.0.5, the latest releases of each line at the time
  • Cloud provider (e.g. aws, azure, gke) or local deployment (e.g. minikube vs dockerized k8s): Azure
  • Others:
    Looking at the storage container logs, the user code finishes in less than a second, while the storage container takes a while to run and appears to generate a lot of calls to pachd. As I said, this finishes successfully for a small number of files, but the code does not seem to scale to large numbers.
@acohen4
Contributor

acohen4 commented Jan 20, 2022

Hi @RaananHadar !

As a side-effect of some of the performance trade-offs made in 2.0, a shuffle pipeline implemented with empty files has become comparatively less performant than copying files from input to /pfs/out when the file sizes are very small. Our current rule of thumb is that for files under 10 MB, shuffling will likely be faster if you copy the full files instead of symlinking empty files.

Could you try shuffling with empty_files set to false, and see whether the performance becomes acceptable?
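
For reference, something along these lines (a minimal sketch of the copy-based variant of your repro; shutil.copy stands in for os.symlink, and the argument layout is assumed to match your script):

import shutil, sys, os
from itertools import combinations

if __name__ == "__main__":
    images_path = sys.argv[1]   # input mount, with empty_files now false
    k = int(sys.argv[2])
    output_dir = sys.argv[3]    # /pfs/out

    for combination in combinations(os.listdir(images_path), k):
        # name the folder after the combination, minus the file extensions
        folder = os.path.join(output_dir,
                              '_'.join(os.path.splitext(f)[0] for f in combination))
        os.mkdir(folder)
        for f in combination:
            # copy the real file contents instead of symlinking empty files
            shutil.copy(os.path.join(images_path, f), os.path.join(folder, f))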

@mindthevirt
Contributor

Agent Dale Georg linked Freshdesk ticket 401 for this issue.

@RaananHadar
Contributor Author

I've changed empty_files to false and copied the files instead. This does improve performance for a small number of files.

However, for my practical pipeline I'm talking gigabytes per datum at ~10k files, so the user code now becomes the bottleneck. Now I have to optimize my user code to copy in parallel... which is not terrible, but I honestly think the empty_files pattern would have many advantages had it been optimized...
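
For concreteness, a parallel copy would probably look something like this minimal sketch (the worker count is an illustrative placeholder, and the flat file-to-file mapping stands in for my real shuffle logic):

import shutil, sys, os
from concurrent.futures import ThreadPoolExecutor

def copy_one(pair):
    src, dst = pair
    shutil.copy(src, dst)

if __name__ == "__main__":
    images_path, output_dir = sys.argv[1], sys.argv[2]
    pairs = [(os.path.join(images_path, f), os.path.join(output_dir, f))
             for f in os.listdir(images_path)]
    # threads are fine here because the copies are I/O-bound
    with ThreadPoolExecutor(max_workers=16) as pool:
        list(pool.map(copy_one, pairs))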

@RaananHadar
Contributor Author

The peculiar thing is that this also happens on 1.13.4: it takes me about 8 minutes to run the above symlink pipeline on 1.13.4 with only 50 generated files.

I think there's something else at work here.

@BOsterbuhr
Contributor

Sorry for the delay, @RaananHadar. Compaction and symlinking were improved in 2.1.6 and 2.1.7, and those changes bring significant performance gains.

Can you upgrade to 2.1.7 and let us know how performance looks for your specific use case?

@RaananHadar
Contributor Author

Thank you @BOsterbuhr, I'll give it a retest and report back.

@RaananHadar
Contributor Author

RaananHadar commented May 10, 2022

I did test this with 2.1.7 on Azure.

I even simplified the setup somewhat to test more general shuffle pipelines: the simplified pipeline takes x inputs and maps them to x outputs (I can share the code if needed).

Here are my findings:
a mock pipeline with empty_files=true still takes about 30 minutes with ~50,000 output datums. I expect my pipeline to reach 1e6 outputs... so this is a no-go yet. Without empty files, a mock that copies the files takes ~20 seconds. At first glance this is great; however, this is a mock pipeline, and my original pipeline is a reduce that would require copying a couple of tens of TBs... so it won't scale well 😟 and will probably introduce needless cloud costs.

Bottom line: I think solving this via empty_files=true is required. I am willing to spend some time helping you reproduce this, because AKS may be at fault here.

@RaananHadar
Contributor Author

Here is the updated mock to reproduce this:

data generation code:

import os

os.makedirs('./data', exist_ok=True)  # make sure the output directory exists
for i in range(1, 50000):
    with open('./data/' + str(i) + '.json', 'w') as fp:
        fp.write(str(i))

pipeline code's main.py:

import sys, os

if __name__ == "__main__":
    images_path = sys.argv[1]
    output_dir = sys.argv[2]

    # map each input file to one (empty) output symlink
    for i, image in enumerate(os.listdir(images_path)):
        os.symlink(os.path.join(images_path, image),
                   os.path.join(output_dir, str(i) + '.json'))

    print('all done!')

pipeline spec:

{
    "pipeline": {
      "name": "mock"
    },
    "description": "A shuffle pipeline that reproduces the bug",
    "transform": {
      "cmd": [ "/bin/bash" ],
      "stdin": [
        "python3 /code/main.py $test /pfs/out"
      ],
      "image": "foo/mock_shuffle:0.01"
    },
    "input": {
        "pfs": {
            "repo": "input",
            "glob": "/",
            "empty_files": true,
            "name": "test"
        }
    }
}
