S3g sidecar passthru #8287 (closed)

51 commits:
- `ed20c5c` Switch to local s2 from external s2 module (msteffen)
- `49a8aef` WIP (lukemarsden)
- `ee9cd86` WIP on proxying to real S3/minio backend in certain cases for s3_out (lukemarsden)
- `9614e36` WIP - getting a 404 for some reason. Need to understand the mux routi… (lukemarsden)
- `5623711` More WIP (lukemarsden)
- `a906fe5` example (lukemarsden)
- `5f80fd0` bump, debugging (lukemarsden)
- `1c75c42` debug logging from spark was making it unbearably slow (lukemarsden)
- `c2854f4` mega find and replace (lukemarsden)
- `c4447da` Debugging rewriting... (lukemarsden)
- `272da07` try replacing bucket name and path separately (lukemarsden)
- `ff66799` REVERSE the transformation in the response (lukemarsden)
- `d21527b` Handle XML style bucket rewriting too (lukemarsden)
- `88ae87a` Thank you linter (for once) (lukemarsden)
- `a863d24` why are you getting stuck (lukemarsden)
- `20892b9` Try NOT transforming the path within the bucket, see if that works (lukemarsden)
- `de3068c` Try to get back to sanity... (lukemarsden)
- `09f725d` When it works, debug logging in spark breaks loki (lukemarsden)
- `1391f21` Switch back to trying to get bucket renaming working (lukemarsden)
- `ed532f7` turn back on signing and start mutating bucket name again (lukemarsden)
- `0158e0d` The last commit worked! Now try manipulating the path as well (lukemarsden)
- `0d2acb7` add stack to log message. This is now working (lukemarsden)
- `a6f9463` performance - don't rewrite octet streams (lukemarsden)
- `00ffc99` cleanup (lukemarsden)
- `075727d` more cleanup (lukemarsden)
- `94b6b57` cleanup (lukemarsden)
- `0e228bf` revert vendoring s2, not needed for the direction we took (lukemarsden)
- `e33bcfa` Merge branch '2.3.x' into s3g-sidecar-passthru (lukemarsden)
- `e80fcc0` Cleanup (lukemarsden)
- `a2b4105` Some WIP towards supporting real S3 as well. Needs testing in a reali… (lukemarsden)
- `e6c05b1` First pass on copying data from backend job-scoped bucket location to… (lukemarsden)
- `fb8d917` Try creating a new commit (lukemarsden)
- `60ebb8b` Comment (lukemarsden)
- `846176d` Revert docker changes (#8293) (chainlink)
- `0220cc8` Put S3 copying code in the right place - it doesn't actually work yet… (lukemarsden)
- `13e5c88` Merge branch 's3g-sidecar-passthru' of github.com:pachyderm/pachyderm… (lukemarsden)
- `d53ea8b` The copy has to happen from the storage container because that has th… (lukemarsden)
- `2296eb5` Revert "Revert docker changes (#8293)" (lukemarsden)
- `0f25c77` Improve logging (lukemarsden)
- `ceb09bb` Revert "Revert "Revert docker changes (#8293)"" (lukemarsden)
- `368a25c` cleanup - this code wasn't meant to be here as well (lukemarsden)
- `d0392fc` Attempt cleanup after copying job-scoped prefix into pfs (lukemarsden)
- `28b673a` Revert "Revert "Revert "Revert docker changes (#8293)""" (lukemarsden)
- `68b6cd2` logging (lukemarsden)
- `31aa3d2` Don't do the cleanup from inside the mfc (lukemarsden)
- `42f5351` Revert "Revert "Revert "Revert "Revert docker changes (#8293)"""" (lukemarsden)
- `2420d4b` Cleanup stale comments (lukemarsden)
- `368583e` More comment cleanup (lukemarsden)
- `56c094c` Fix up bail out logic (chainlink)
- `ead11f9` Fix up logic (chainlink)
- `af9704b` Update Sign4 call to use the Security Token (bbonenfant)
Files changed. First, the demo image's Dockerfile:
```
FROM jupyter/pyspark-notebook:spark-3.3.0
WORKDIR /home/jovyan
ADD spark.py /home/jovyan/
RUN wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.3/hadoop-aws-3.3.3.jar
RUN wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.264/aws-java-sdk-bundle-1.12.264.jar
ENTRYPOINT ["spark-submit", "--executor-memory=15g", "--driver-memory=15g", "--jars=hadoop-aws-3.3.3.jar,aws-java-sdk-bundle-1.12.264.jar", "/home/jovyan/spark.py"]
```
The example's README:
# Spark writing to `s3_out` in `raw_s3_out` mode

Spark likes to write to `s3_out` in ways that the normal Pachyderm S3 gateway doesn't like.

We have a special alpha `s3_out` feature mode called `raw_s3_out`. You can trigger it by adding an annotation to your pipeline json (see `s3-spark.json` below for a full example):

```
"metadata": {
  "annotations": {
    "raw_s3_out": "true"
  }
},
```

Writes to `s3_out` will then work with Spark, especially when Spark is writing a large amount of data. (With the normal S3 gateway, you see slow-downs and errors related to "copyFile" failing.)

This directory contains a worked example. We've built and pushed the Docker image for you already, so all you need to do is run:

```
pachctl create repo poke_s3
pachctl create branch poke_s3@master
```

This gives us an input repo we can poke to make Spark start. Next, create the pipeline:

```
pachctl create pipeline -f s3-spark.json
```

Poke the pipeline to start it:

```
pachctl put file -f /etc/passwd poke_s3@master:/test01
```

Then observe that the result (about 190MB of the phrase "INFINITEIMPROBABILITY" repeated over and over again, an homage to [The Guide](https://sites.google.com/site/h2g2theguide/Index/i/149246)) is written to the output repo:

```
pachctl list file spark_s3_demo@master
```
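To spot-check the contents, you can fetch one of the files Spark wrote. The parquet part file names vary per run, but the `_SUCCESS` marker Spark writes alongside them (under the `example-data-24` path this demo's `spark.py` uses) is a safe bet:

```
pachctl get file spark_s3_demo@master:/example-data-24/_SUCCESS
```

(`_SUCCESS` is an empty marker file, so a clean exit with no output means it's there.)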
## How it works (advanced details 🤓)

You don't need to know how this mode works to use it, but in case you're interested:

We implement it by passing those S3 requests directly through to the backing S3 store, with some light protocol hacking to rewrite bucket names, paths, and authentication credentials.

This means that real S3 (or minio) processes the complex things that Spark does with the S3 protocol, and when it's finished, we just copy the result back into the output repo in PFS.
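The real passthrough lives in the Pachyderm sidecar (and is written in Go); the sketch below is just a minimal Python illustration of the rewrite-and-re-sign idea. Every concrete value in it (backing bucket, job prefix, endpoint, region, credentials) is a made-up placeholder:

```
# Illustrative sketch only -- not the Pachyderm sidecar code. Shows the shape
# of "rewrite the bucket/path, swap the credentials, re-sign, and reverse the
# transformation in responses". All names and values here are hypothetical.
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import Credentials

BACKING_BUCKET = "real-backend-bucket"   # hypothetical backing store bucket
JOB_PREFIX = "pach-job-1234"             # hypothetical job-scoped scratch prefix
# Real credentials for the backing store; the session token participates in
# the signature (cf. the "Update Sign4 call to use the Security Token" commit).
CREDS = Credentials("ACCESS_KEY", "SECRET_KEY", "SESSION_TOKEN")

def rewrite_and_resign(method, path, headers, body=b""):
    """Map a request aimed at the job's `out` bucket onto a job-scoped prefix
    of the backing bucket, then sign it afresh for the real store."""
    # e.g. PUT /out/example-data-24/part-0 ->
    #      PUT /real-backend-bucket/pach-job-1234/example-data-24/part-0
    new_path = f"/{BACKING_BUCKET}/{JOB_PREFIX}/" + path[len("/out/"):]
    clean = {k: v for k, v in headers.items()
             if k.lower() not in ("authorization", "host")}
    req = AWSRequest(method=method, url="https://s3.example.com" + new_path,
                     data=body, headers=clean)
    SigV4Auth(CREDS, "s3", "us-east-1").add_auth(req)  # adds Authorization etc.
    return req

def reverse_in_response(body: str) -> str:
    """Reverse the transformation in (e.g. XML listing) responses so the
    client only ever sees its own bucket name and paths."""
    return body.replace(BACKING_BUCKET, "out").replace(f"{JOB_PREFIX}/", "")
```

When the job finishes, everything under the job-scoped prefix is copied back into the output commit in PFS and the scratch prefix is cleaned up (see the "copying job-scoped prefix into pfs" commits above). For performance, binary object bodies aren't rewritten at all ("don't rewrite octet streams"); only text-style responses go through the reverse transformation.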
The image build script:
```
#!/bin/bash
set -xeuo pipefail
export IMAGE=quay.io/lukemarsden/spark_s3_demo:v0.0.13
docker buildx build -t $IMAGE .
docker push $IMAGE
```
The pipeline spec, `s3-spark.json`:
```
{
  "pipeline": {
    "name": "spark_s3_demo"
  },
  "metadata": {
    "annotations": {
      "raw_s3_out": "true"
    }
  },
  "input": {
    "pfs": {
      "glob": "/",
      "repo": "poke_s3",
      "name": "poke_s3"
    }
  },
  "transform": {
    "cmd": [
      "spark-submit",
      "--executor-memory=15g",
      "--driver-memory=15g",
      "--jars", "hadoop-aws-3.3.3.jar,aws-java-sdk-bundle-1.12.264.jar",
      "spark.py"
    ],
    "image": "quay.io/lukemarsden/spark_s3_demo:v0.0.13",
    "working_dir": "/home/jovyan"
  },
  "s3_out": true
}
```
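After `pachctl create pipeline`, one quick way to confirm the annotation made it onto the pipeline (plain `pachctl`, nothing specific to this branch, assuming the raw pipeline info echoes the metadata block):

```
pachctl inspect pipeline spark_s3_demo --raw | grep -A 3 annotations
```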
`spark.py`, the Spark job the pipeline runs:
```
from pyspark.sql import SparkSession, Row, DataFrame
from pyspark.context import SparkContext
from pyspark import SparkConf
import time
import os

conf = SparkConf()
minio = False

if minio:
    conf.set('spark.hadoop.fs.s3a.endpoint', "http://localhost:9000")
else:
    # conf.set('spark.hadoop.fs.s3a.endpoint', "http://192.168.49.2:30600")
    endpoint = os.getenv('S3_ENDPOINT')
    conf.set('spark.hadoop.fs.s3a.endpoint', endpoint)
    print(f"endpoint is {endpoint}")
    # conf.set('spark.hadoop.fs.s3a.endpoint', "http://localhost:30600")

conf.set('spark.hadoop.fs.s3a.impl', "org.apache.hadoop.fs.s3a.S3AFileSystem")

# XXX I don't think the following line actually turns on the magic committer.
# What else needs to happen?
# conf.set('spark.hadoop.fs.s3a.committer.name', 'magic')

if minio:
    conf.set('spark.hadoop.fs.s3a.access.key', 'admin')
    conf.set('spark.hadoop.fs.s3a.secret.key', 'password')
else:
    conf.set('spark.hadoop.fs.s3a.access.key', "anything_will_do")
    conf.set('spark.hadoop.fs.s3a.secret.key', "anything_will_do")

conf.set('spark.hadoop.fs.s3a.path.style.access', 'true')
conf.set('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false')
conf.set("spark.hadoop.fs.s3a.change.detection.mode", 'none')
conf.set("spark.hadoop.fs.s3a.change.detection.version.required", 'false')

sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
# sc.setLogLevel("DEBUG")
sc.setSystemProperty("com.amazonaws.services.s3.disablePutObjectMD5Validation", "true")

# confirm config is applied to this session
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
print(sc.getConf().getAll())

# create some example data
big = "INFINITEIMPROBABILITY" * 1024 * 100
zs = [Row(a=big, b=big) for _ in range(1000)]
df = spark.createDataFrame(zs)
df.explain()
# df.show()
df.repartition(200)  # note: result is discarded; repartition returns a new DataFrame
df.explain()

repo = "spark-s3g-demo2"  # unused in this script
branch = "master"         # unused in this script

path = "example-data-24"
if minio:
    url = f"s3a://foo/{path}"
else:
    url = f"s3a://out/{path}"

print("Starting write...")
(df.coalesce(1)
    .write
    # .option("fs.s3a.committer.name", "magic")
    .format("parquet")
    .mode("overwrite")
    .save(url))
print("Finished write!")

df.explain()
```

Review comments on this file: on the `minio = False` line, "could strip this out"; on the `disablePutObjectMD5Validation` line, "probably don't need this any more".
A second variant of the Spark script (its filename isn't shown in this view), which opens a commit itself via python_pachyderm and writes through the S3 gateway rather than `s3_out`:
```
from pyspark.sql import SparkSession, Row, DataFrame
from pyspark.context import SparkContext
from pyspark import SparkConf
import time
import os
import python_pachyderm

conf = SparkConf()
minio = False
if minio:
    conf.set('spark.hadoop.fs.s3a.endpoint', "http://localhost:9000")
else:
    conf.set('spark.hadoop.fs.s3a.endpoint', "http://192.168.49.2:30600")
    # conf.set('spark.hadoop.fs.s3a.endpoint', "http://localhost:30600")

conf.set('spark.hadoop.fs.s3a.impl', "org.apache.hadoop.fs.s3a.S3AFileSystem")

# XXX I don't think the following line actually turns on the magic committer.
# What else needs to happen?
# conf.set('spark.hadoop.fs.s3a.committer.name', 'magic')

if minio:
    conf.set('spark.hadoop.fs.s3a.access.key', 'admin')
    conf.set('spark.hadoop.fs.s3a.secret.key', 'password')
else:
    conf.set('spark.hadoop.fs.s3a.access.key', 'anything_matching')
    conf.set('spark.hadoop.fs.s3a.secret.key', 'anything_matching')

conf.set('spark.hadoop.fs.s3a.path.style.access', 'true')
conf.set('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false')
conf.set("spark.hadoop.fs.s3a.change.detection.mode", 'none')
conf.set("spark.hadoop.fs.s3a.change.detection.version.required", 'false')

sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
# sc.setLogLevel("DEBUG")
sc.setSystemProperty("com.amazonaws.services.s3.disablePutObjectMD5Validation", "true")

# confirm config is applied to this session
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
print(sc.getConf().getAll())

# create some example data
big = "INFINITEIMPROBABILITY" * 1024 * 100
zs = [Row(a=big, b=big) for _ in range(1000)]
df = spark.createDataFrame(zs)
df.explain()
# df.show()
df.repartition(200)  # note: result is discarded; repartition returns a new DataFrame
df.explain()

repo = "spark-s3g-demo2"
branch = "master"

client = python_pachyderm.Client()

# keep the commit open while Spark writes through the S3 gateway
with client.commit(repo, branch) as commit:
    print(f"Opening commit {commit} for spark job")
    path = "example-data-24"
    if minio:
        url = f"s3a://foo/{path}"
    else:
        url = f"s3a://{branch}.{repo}/{path}"
    (df.coalesce(1)
        .write
        # .option("fs.s3a.committer.name", "magic")
        .format("parquet")
        .mode("overwrite")
        .save(url))
    df.explain()
    print(f"Closing {commit}")
```
(Four more changed files did not render in this view.)
A review comment on the diff: "link to the bits?"