S3g sidecar passthru #8287

Closed · wants to merge 51 commits

Changes from 34 commits

Commits (51)
ed20c5c
Switch to local s2 from external s2 module
msteffen Aug 12, 2022
49a8aef
WIP
lukemarsden Oct 7, 2022
ee9cd86
WIP on proxying to real S3/minio backend in certain cases for s3_out
lukemarsden Oct 11, 2022
9614e36
WIP - getting a 404 for some reason. Need to understand the mux routi…
lukemarsden Oct 11, 2022
5623711
More WIP
lukemarsden Oct 12, 2022
a906fe5
example
lukemarsden Oct 12, 2022
5f80fd0
bump, debugging
lukemarsden Oct 12, 2022
1c75c42
debug logging from spark was making it unbearably slow
lukemarsden Oct 12, 2022
c2854f4
mega find and replace
lukemarsden Oct 14, 2022
c4447da
Debugging rewriting...
lukemarsden Oct 14, 2022
272da07
try replacing bucket name and path separately
lukemarsden Oct 17, 2022
ff66799
REVERSE the transformation in the response
lukemarsden Oct 17, 2022
d21527b
Handle XML style bucket rewriting too
lukemarsden Oct 17, 2022
88ae87a
Thank you linter (for once)
lukemarsden Oct 17, 2022
a863d24
why are you getting stuck
lukemarsden Oct 17, 2022
20892b9
Try NOT transforming the path within the bucket, see if that works
lukemarsden Oct 17, 2022
de3068c
Try to get back to sanity...
lukemarsden Oct 17, 2022
09f725d
When it works, debug logging in spark breaks loki
lukemarsden Oct 18, 2022
1391f21
Switch back to trying to get bucket renaming working
lukemarsden Oct 18, 2022
ed532f7
turn back on signing and start mutating bucket name again
lukemarsden Oct 18, 2022
0158e0d
The last commit worked! Now try manipulating the path as well
lukemarsden Oct 18, 2022
0d2acb7
add stack to log message. This is now working
lukemarsden Oct 18, 2022
a6f9463
performance - don't rewrite octet streams
lukemarsden Oct 18, 2022
00ffc99
cleanup
lukemarsden Oct 18, 2022
075727d
more cleanup
lukemarsden Oct 18, 2022
94b6b57
cleanup
lukemarsden Oct 18, 2022
0e228bf
revert vendoring s2, not needed for the direction we took
lukemarsden Oct 18, 2022
e33bcfa
Merge branch '2.3.x' into s3g-sidecar-passthru
lukemarsden Oct 18, 2022
e80fcc0
Cleanup
lukemarsden Oct 18, 2022
a2b4105
Some WIP towards supporting real S3 as well. Needs testing in a reali…
lukemarsden Oct 18, 2022
e6c05b1
First pass on copying data from backend job-scoped bucket location to…
lukemarsden Oct 18, 2022
fb8d917
Try creating a new commit
lukemarsden Oct 18, 2022
60ebb8b
Comment
lukemarsden Oct 18, 2022
846176d
Revert docker changes (#8293)
chainlink Oct 18, 2022
0220cc8
Put S3 copying code in the right place - it doesn't actually work yet…
lukemarsden Oct 18, 2022
13e5c88
Merge branch 's3g-sidecar-passthru' of github.com:pachyderm/pachyderm…
lukemarsden Oct 18, 2022
d53ea8b
The copy has to happen from the storage container because that has th…
lukemarsden Oct 18, 2022
2296eb5
Revert "Revert docker changes (#8293)"
lukemarsden Oct 18, 2022
0f25c77
Improve logging
lukemarsden Oct 18, 2022
ceb09bb
Revert "Revert "Revert docker changes (#8293)""
lukemarsden Oct 18, 2022
368a25c
cleanup - this code wasn't meant to be here as well
lukemarsden Oct 18, 2022
d0392fc
Attempt cleanup after copying job-scoped prefix into pfs
lukemarsden Oct 19, 2022
28b673a
Revert "Revert "Revert "Revert docker changes (#8293)"""
lukemarsden Oct 19, 2022
68b6cd2
logging
lukemarsden Oct 19, 2022
31aa3d2
Don't do the cleanup from inside the mfc
lukemarsden Oct 19, 2022
42f5351
Revert "Revert "Revert "Revert "Revert docker changes (#8293)""""
lukemarsden Oct 19, 2022
2420d4b
Cleanup stale comments
lukemarsden Oct 19, 2022
368583e
More comment cleanup
lukemarsden Oct 19, 2022
56c094c
Fix up bail out logic
chainlink Oct 20, 2022
ead11f9
Fix up logic
chainlink Oct 20, 2022
af9704b
Update Sign4 call to use the Security Token
bbonenfant Oct 20, 2022
6 changes: 6 additions & 0 deletions examples/spark/s3-out/Dockerfile
@@ -0,0 +1,6 @@
FROM jupyter/pyspark-notebook:spark-3.3.0
WORKDIR /home/jovyan
ADD spark.py /home/jovyan/
RUN wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.3/hadoop-aws-3.3.3.jar
RUN wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.264/aws-java-sdk-bundle-1.12.264.jar
ENTRYPOINT ["spark-submit", "--executor-memory=15g", "--driver-memory=15g", "--jars='hadoop-aws-3.3.3.jar,aws-java-sdk-bundle-1.12.264.jar'", "/home/jovyan/spark.py"]
48 changes: 48 additions & 0 deletions examples/spark/s3-out/README.md
@@ -0,0 +1,48 @@
# Spark writing to `s3_out` in `raw_s3_out` mode

Spark writes to `s3_out` in ways that the normal Pachyderm S3 gateway doesn't handle well.

We have a special alpha `s3_out` feature mode called `raw_s3_out`.
You can enable it by adding an annotation to your pipeline JSON:

```
"metadata": {
"annotations": {
"raw_s3_out": "true"
}
},
```

Writes to `s3_out` will then work with Spark, even when Spark is writing a large amount of data. (With the normal S3 gateway, you see slowdowns and errors related to `copyFile` failing.)

This directory contains a worked example. We've built and pushed the Docker image for you already, so all you need to do is run:
> Review comment (PR author): link to the bits?

```
pachctl create repo poke_s3
pachctl create branch poke_s3@master
```
This gives us an input repo we can poke to make the Spark pipeline start.

```
pachctl create pipeline -f s3-spark.json
```

Poke the pipeline to start it:

```
pachctl put file -f /etc/passwd poke_s3@master:/test01
```

Then list the output repo:

```
pachctl list file spark_s3_demo@master
```

Observe that the result (about 190MB of the phrase "INFINITEIMPROBABILITY" repeated over and over again, a homage to [The Guide](https://sites.google.com/site/h2g2theguide/Index/i/149246)) has been written there.


## How it works (advanced details 🤓)

You don't need to know how this mode works to use it, but here's a summary in case you're interested.

We implement this by passing those S3 requests directly through to the backing S3 store (with some light protocol hacking to rewrite bucket names, paths and authentication credentials).

This means that real S3 (or minio) is processing the complex things that Spark does with the S3 protocol, and when it's finished, we just copy the result back into the output repo in PFS.
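
To make that concrete, here's a minimal, illustrative Go sketch of the passthrough idea (not the actual sidecar code): a reverse proxy that rewrites the `out` bucket to a job-scoped prefix inside the real backing bucket and re-signs each request with SigV4 via `github.com/smartystreets/go-aws-auth` (the dependency this PR adds). The bucket name, prefix, backend address, and credentials below are placeholders.

```go
// Minimal sketch of the raw_s3_out passthrough: rewrite the "out" bucket to a
// job-scoped prefix in the real backing bucket, drop the client's signature,
// and re-sign the request before forwarding it to the backing store.
package main

import (
	"net/http"
	"net/http/httputil"
	"net/url"
	"strings"

	awsauth "github.com/smartystreets/go-aws-auth"
)

func newPassthroughProxy(backend *url.URL, backingBucket, jobPrefix string, creds awsauth.Credentials) *httputil.ReverseProxy {
	return &httputil.ReverseProxy{
		Director: func(req *http.Request) {
			req.URL.Scheme = backend.Scheme
			req.URL.Host = backend.Host
			req.Host = backend.Host

			// Rewrite /out/<key> to /<backingBucket>/<jobPrefix>/<key>
			// (path-style addressing assumed on both sides).
			if strings.HasPrefix(req.URL.Path, "/out/") {
				key := strings.TrimPrefix(req.URL.Path, "/out/")
				req.URL.Path = "/" + backingBucket + "/" + jobPrefix + "/" + key
			}

			// The client signed for the gateway's host and bucket, so that
			// signature is now invalid; replace it with one for the backend.
			req.Header.Del("Authorization")
			awsauth.Sign4(req, creds)
		},
		// A ModifyResponse hook would apply the reverse rewrite to XML bodies
		// (e.g. ListObjects results) so clients still see the "out" bucket.
	}
}

func main() {
	backend, err := url.Parse("http://minio:9000") // placeholder backing store
	if err != nil {
		panic(err)
	}
	proxy := newPassthroughProxy(backend, "real-bucket", "job-1234", awsauth.Credentials{
		AccessKeyID:     "ACCESS_KEY",
		SecretAccessKey: "SECRET_KEY",
		SecurityToken:   "SESSION_TOKEN", // only needed for temporary credentials
	})
	if err := http.ListenAndServe(":9001", proxy); err != nil {
		panic(err)
	}
}
```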
5 changes: 5 additions & 0 deletions examples/spark/s3-out/build-and-push.sh
@@ -0,0 +1,5 @@
#!/bin/bash
set -xeuo pipefail
export IMAGE=quay.io/lukemarsden/spark_s3_demo:v0.0.13
docker buildx build -t $IMAGE .
docker push $IMAGE
29 changes: 29 additions & 0 deletions examples/spark/s3-out/s3-spark.json
@@ -0,0 +1,29 @@
{
"pipeline": {
"name": "spark_s3_demo"
},
"metadata": {
"annotations": {
"raw_s3_out": "true"
}
},
"input": {
"pfs": {
"glob": "/",
"repo": "poke_s3",
"name": "poke_s3"
}
},
"transform": {
"cmd": [
"spark-submit",
"--executor-memory=15g",
"--driver-memory=15g",
"--jars", "hadoop-aws-3.3.3.jar,aws-java-sdk-bundle-1.12.264.jar",
"spark.py"
],
"image": "quay.io/lukemarsden/spark_s3_demo:v0.0.13",
"working_dir": "/home/jovyan"
},
"s3_out": true
}
75 changes: 75 additions & 0 deletions examples/spark/s3-out/spark.py
@@ -0,0 +1,75 @@
from pyspark.sql import SparkSession, Row, DataFrame
from pyspark.context import SparkContext
from pyspark import SparkConf
import time
import os

conf = SparkConf()
minio = False
> Review comment (PR author): could strip this out

if minio:
    conf.set('spark.hadoop.fs.s3a.endpoint', "http://localhost:9000")
else:
    # conf.set('spark.hadoop.fs.s3a.endpoint', "http://192.168.49.2:30600")
    endpoint = os.getenv('S3_ENDPOINT')
    conf.set('spark.hadoop.fs.s3a.endpoint', endpoint)
    print(f"endpoint is {endpoint}")
    # conf.set('spark.hadoop.fs.s3a.endpoint', "http://localhost:30600")

conf.set('spark.hadoop.fs.s3a.impl', "org.apache.hadoop.fs.s3a.S3AFileSystem")

# XXX I don't think the following line actually turns on the magic committer. What else needs to happen?
# conf.set('spark.hadoop.fs.s3a.committer.name', 'magic')
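# Likely answer (untested here): the magic committer also needs
# fs.s3a.committer.magic.enabled=true plus the Spark cloud-committer bindings
# from the spark-hadoop-cloud module, i.e.
#   spark.sql.sources.commitProtocolClass=org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
#   spark.sql.parquet.output.committer.class=org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter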

if minio:
    conf.set('spark.hadoop.fs.s3a.access.key', 'admin')
    conf.set('spark.hadoop.fs.s3a.secret.key', 'password')
else:
    conf.set('spark.hadoop.fs.s3a.access.key', "anything_will_do")
    conf.set('spark.hadoop.fs.s3a.secret.key', "anything_will_do")

conf.set('spark.hadoop.fs.s3a.path.style.access', 'true')
conf.set('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false')
conf.set("spark.hadoop.fs.s3a.change.detection.mode", 'none')
conf.set("spark.hadoop.fs.s3a.change.detection.version.required", 'false')

sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
# sc.setLogLevel("DEBUG")
sc.setSystemProperty("com.amazonaws.services.s3.disablePutObjectMD5Validation", "true")
> Review comment (PR author): probably don't need this any more

# confirm config is applied to this session
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
sc = spark.sparkContext
conf = sc.getConf()
print(sc.getConf().getAll())

# create some example data
# big = "INFINITEIMPROBABILITY"*1024*100
big = "INFINITEIMPROBABILITY"*1024*100
zs = [ Row(a=big, b=big,) for _ in range(1000) ]
df = spark.createDataFrame(zs)
df.explain()
# df.show()
df.repartition(200)
df.explain()

repo = "spark-s3g-demo2"
branch = "master"

path = "example-data-24"
if minio:
    url = f"s3a://foo/{path}"
else:
    url = f"s3a://out/{path}"

print("Starting write...")
(df.coalesce(1)
.write
# .option("fs.s3a.committer.name", "magic")
.format("parquet")
.mode("overwrite")
.save(url))
print("Finished write!")

df.explain()
74 changes: 74 additions & 0 deletions examples/spark/s3gateway/spark.py
@@ -0,0 +1,74 @@
from pyspark.sql import SparkSession, Row, DataFrame
from pyspark.context import SparkContext
from pyspark import SparkConf
import time
import os
import python_pachyderm

conf = SparkConf()
minio = False
if minio:
    conf.set('spark.hadoop.fs.s3a.endpoint', "http://localhost:9000")
else:
    conf.set('spark.hadoop.fs.s3a.endpoint', "http://192.168.49.2:30600")
    # conf.set('spark.hadoop.fs.s3a.endpoint', "http://localhost:30600")

conf.set('spark.hadoop.fs.s3a.impl', "org.apache.hadoop.fs.s3a.S3AFileSystem")

# XXX I don't think the following line actually turns on the magic committer. What else needs to happen?
# conf.set('spark.hadoop.fs.s3a.committer.name', 'magic')

if minio:
    conf.set('spark.hadoop.fs.s3a.access.key', 'admin')
    conf.set('spark.hadoop.fs.s3a.secret.key', 'password')
else:
    conf.set('spark.hadoop.fs.s3a.access.key', 'anything_matching')
    conf.set('spark.hadoop.fs.s3a.secret.key', 'anything_matching')

conf.set('spark.hadoop.fs.s3a.path.style.access', 'true')
conf.set('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false')
conf.set("spark.hadoop.fs.s3a.change.detection.mode", 'none')
conf.set("spark.hadoop.fs.s3a.change.detection.version.required", 'false')

sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
# sc.setLogLevel("DEBUG")
sc.setSystemProperty("com.amazonaws.services.s3.disablePutObjectMD5Validation", "true")

# confirm config is applied to this session
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
sc = spark.sparkContext
conf = sc.getConf()
print(sc.getConf().getAll())

# create some example data
# big = "INFINITEIMPROBABILITY"*1024*100
big = "INFINITEIMPROBABILITY"*1024*100
zs = [ Row(a=big, b=big,) for _ in range(1000) ]
df = spark.createDataFrame(zs)
df.explain()
# df.show()
df.repartition(200)
df.explain()

repo = "spark-s3g-demo2"
branch = "master"

client = python_pachyderm.Client()

with client.commit(repo, branch) as commit:
    print(f"Opening commit {commit} for spark job")
    path = "example-data-24"
    if minio:
        url = f"s3a://foo/{path}"
    else:
        url = f"s3a://{branch}.{repo}/{path}"
    (df.coalesce(1)
        .write
        # .option("fs.s3a.committer.name", "magic")
        .format("parquet")
        .mode("overwrite")
        .save(url))
    df.explain()
    print(f"Closing {commit}")
1 change: 1 addition & 0 deletions go.mod
@@ -240,6 +240,7 @@ require (
github.com/segmentio/backo-go v0.0.0-20160424052352-204274ad699c // indirect
github.com/shopspring/decimal v1.2.0 // indirect
github.com/smartystreets/assertions v1.0.1 // indirect
github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9
github.com/snowflakedb/gosnowflake v1.6.11
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/spf13/cast v1.3.1 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -1328,6 +1328,8 @@ github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v1.0.1 h1:voD4ITNjPL5jjBfgR/r8fPIIBrliWrWHeiJApdr3r4w=
github.com/smartystreets/assertions v1.0.1/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9 h1:hp2CYQUINdZMHdvTdXtPOY2ainKl4IoMcpAXEf2xj3Q=
github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=