Skip to content

Commit

Permalink
Merge pull request #45 from pachyderm/next
Browse files Browse the repository at this point in the history
Merge v.05 code from next in to master
  • Loading branch information
jdoliner committed Mar 14, 2015
2 parents ff5d8a7 + 3934265 commit 1a37aec
Show file tree
Hide file tree
Showing 15 changed files with 218 additions and 77 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ ADD . /go/src/$PFS
RUN go install -race $PFS/services/master && go install $PFS/services/router && go install $PFS/services/webhook && go install $PFS/deploy
RUN ln $GOPATH/src/$PFS/scripts/pfs-test /usr/local/bin/pfs-test
RUN ln $GOPATH/src/$PFS/scripts/pfs-bench /usr/local/bin/pfs-bench
RUN ln $GOPATH/src/$PFS/scripts/btrfs-wrapper /usr/sbin/btrfs
RUN ln $GOPATH/src/$PFS/scripts/btrfs-wrapper /bin/btrfs

EXPOSE 80
2 changes: 1 addition & 1 deletion deploy/static/dev/announce-master-0-3.service
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ PartOf = master-0-3.service
BindsTo = master-0-3.service

[Service]
ExecStart = /bin/sh -c "while true; do etcdctl set /pfs/master/0-3 '%H:62871' --ttl 60;sleep 45;done"
ExecStart = /bin/sh -c "while true; do etcdctl set /pfs/master/0-3 '%H:53285' --ttl 60;sleep 45;done"
ExecStop = /usr/bin/etcdctl rm /pfs/master/0-3

[X-Fleet]
Expand Down
2 changes: 1 addition & 1 deletion deploy/static/dev/announce-master-1-3.service
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ PartOf = master-1-3.service
BindsTo = master-1-3.service

[Service]
ExecStart = /bin/sh -c "while true; do etcdctl set /pfs/master/1-3 '%H:60711' --ttl 60;sleep 45;done"
ExecStart = /bin/sh -c "while true; do etcdctl set /pfs/master/1-3 '%H:59553' --ttl 60;sleep 45;done"
ExecStop = /usr/bin/etcdctl rm /pfs/master/1-3

[X-Fleet]
Expand Down
2 changes: 1 addition & 1 deletion deploy/static/dev/announce-master-2-3.service
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ PartOf = master-2-3.service
BindsTo = master-2-3.service

[Service]
ExecStart = /bin/sh -c "while true; do etcdctl set /pfs/master/2-3 '%H:58281' --ttl 60;sleep 45;done"
ExecStart = /bin/sh -c "while true; do etcdctl set /pfs/master/2-3 '%H:53432' --ttl 60;sleep 45;done"
ExecStop = /usr/bin/etcdctl rm /pfs/master/2-3

[X-Fleet]
Expand Down
10 changes: 5 additions & 5 deletions deploy/static/dev/master-0-3.service
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ ExecStartPre = -/bin/sh -c "echo $(docker rm master-0-3)"
ExecStartPre = /bin/sh -c "echo $(docker pull `etcdctl get /pfs/registry`/pfs)"
ExecStartPre = -/bin/sh -c "echo $(mkdir /var/lib/pfs)"
ExecStartPre = -/bin/sh -c "echo $(truncate /var/lib/pfs/data.img -s 10G)"
ExecStartPre = -/bin/sh -c "echo $(mkfs.btrfs /var/lib/pfs/data.img)"
ExecStartPre = -/bin/sh -c "echo $(mkfs.btrfs `etcdctl get /pfs/config/DISK`)"
ExecStartPre = -/bin/sh -c "echo $(mkdir -p /var/lib/pfs/vol)"
ExecStartPre = -/bin/sh -c "echo $(mount /var/lib/pfs/data.img /var/lib/pfs/vol)"
ExecStartPre = -/bin/sh -c "echo $(mount `etcdctl get /pfs/config/DISK` /var/lib/pfs/vol)"
ExecStart = /bin/sh -c "echo $(docker run \
--privileged=true \
--name master-0-3 \
-v /:/host:ro \
-v /var/lib/pfs/vol:/host/var/lib/pfs/vol \
-v /var/lib/pfs:/var/lib/pfs \
-v /var/run/docker.sock:/var/run/docker.sock \
-v /usr/sbin/btrfs:/usr/sbin/btrfs.host \
-v /lib64:/lib64.host \
-e AWS_ACCESS_KEY_ID=`etcdctl get /pfs/creds/AWS_ACCESS_KEY_ID` \
-e AWS_SECRET_ACCESS_KEY=`etcdctl get /pfs/creds/AWS_SECRET_ACCESS_KEY` \
-p 62871:80 \
-p 53285:80 \
-i `etcdctl get /pfs/registry`/pfs \
/go/bin/master 0-3)"
ExecStop = /bin/sh -c "echo $(docker rm -f master-0-3)"
10 changes: 5 additions & 5 deletions deploy/static/dev/master-1-3.service
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ ExecStartPre = -/bin/sh -c "echo $(docker rm master-1-3)"
ExecStartPre = /bin/sh -c "echo $(docker pull `etcdctl get /pfs/registry`/pfs)"
ExecStartPre = -/bin/sh -c "echo $(mkdir /var/lib/pfs)"
ExecStartPre = -/bin/sh -c "echo $(truncate /var/lib/pfs/data.img -s 10G)"
ExecStartPre = -/bin/sh -c "echo $(mkfs.btrfs /var/lib/pfs/data.img)"
ExecStartPre = -/bin/sh -c "echo $(mkfs.btrfs `etcdctl get /pfs/config/DISK`)"
ExecStartPre = -/bin/sh -c "echo $(mkdir -p /var/lib/pfs/vol)"
ExecStartPre = -/bin/sh -c "echo $(mount /var/lib/pfs/data.img /var/lib/pfs/vol)"
ExecStartPre = -/bin/sh -c "echo $(mount `etcdctl get /pfs/config/DISK` /var/lib/pfs/vol)"
ExecStart = /bin/sh -c "echo $(docker run \
--privileged=true \
--name master-1-3 \
-v /:/host:ro \
-v /var/lib/pfs/vol:/host/var/lib/pfs/vol \
-v /var/lib/pfs:/var/lib/pfs \
-v /var/run/docker.sock:/var/run/docker.sock \
-v /usr/sbin/btrfs:/usr/sbin/btrfs.host \
-v /lib64:/lib64.host \
-e AWS_ACCESS_KEY_ID=`etcdctl get /pfs/creds/AWS_ACCESS_KEY_ID` \
-e AWS_SECRET_ACCESS_KEY=`etcdctl get /pfs/creds/AWS_SECRET_ACCESS_KEY` \
-p 60711:80 \
-p 59553:80 \
-i `etcdctl get /pfs/registry`/pfs \
/go/bin/master 1-3)"
ExecStop = /bin/sh -c "echo $(docker rm -f master-1-3)"
10 changes: 5 additions & 5 deletions deploy/static/dev/master-2-3.service
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ ExecStartPre = -/bin/sh -c "echo $(docker rm master-2-3)"
ExecStartPre = /bin/sh -c "echo $(docker pull `etcdctl get /pfs/registry`/pfs)"
ExecStartPre = -/bin/sh -c "echo $(mkdir /var/lib/pfs)"
ExecStartPre = -/bin/sh -c "echo $(truncate /var/lib/pfs/data.img -s 10G)"
ExecStartPre = -/bin/sh -c "echo $(mkfs.btrfs /var/lib/pfs/data.img)"
ExecStartPre = -/bin/sh -c "echo $(mkfs.btrfs `etcdctl get /pfs/config/DISK`)"
ExecStartPre = -/bin/sh -c "echo $(mkdir -p /var/lib/pfs/vol)"
ExecStartPre = -/bin/sh -c "echo $(mount /var/lib/pfs/data.img /var/lib/pfs/vol)"
ExecStartPre = -/bin/sh -c "echo $(mount `etcdctl get /pfs/config/DISK` /var/lib/pfs/vol)"
ExecStart = /bin/sh -c "echo $(docker run \
--privileged=true \
--name master-2-3 \
-v /:/host:ro \
-v /var/lib/pfs/vol:/host/var/lib/pfs/vol \
-v /var/lib/pfs:/var/lib/pfs \
-v /var/run/docker.sock:/var/run/docker.sock \
-v /usr/sbin/btrfs:/usr/sbin/btrfs.host \
-v /lib64:/lib64.host \
-e AWS_ACCESS_KEY_ID=`etcdctl get /pfs/creds/AWS_ACCESS_KEY_ID` \
-e AWS_SECRET_ACCESS_KEY=`etcdctl get /pfs/creds/AWS_SECRET_ACCESS_KEY` \
-p 58281:80 \
-p 53432:80 \
-i `etcdctl get /pfs/registry`/pfs \
/go/bin/master 2-3)"
ExecStop = /bin/sh -c "echo $(docker rm -f master-2-3)"
8 changes: 4 additions & 4 deletions deploy/templates/sharded
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ ExecStartPre = -/bin/sh -c "echo $(docker rm {{.Name}}-{{.Shard}}-{{.Nshards}})"
ExecStartPre = /bin/sh -c "echo $(docker pull {{.Container}})"
ExecStartPre = -/bin/sh -c "echo $(mkdir /var/lib/pfs)"
ExecStartPre = -/bin/sh -c "echo $(truncate /var/lib/pfs/data.img -s 10G)"
ExecStartPre = -/bin/sh -c "echo $(mkfs.btrfs /var/lib/pfs/data.img)"
ExecStartPre = -/bin/sh -c "echo $(mkfs.btrfs `etcdctl get /pfs/config/DISK`)"
ExecStartPre = -/bin/sh -c "echo $(mkdir -p /var/lib/pfs/vol)"
ExecStartPre = -/bin/sh -c "echo $(mount /var/lib/pfs/data.img /var/lib/pfs/vol)"
ExecStartPre = -/bin/sh -c "echo $(mount `etcdctl get /pfs/config/DISK` /var/lib/pfs/vol)"
ExecStart = /bin/sh -c "echo $(docker run \
--privileged=true \
--name {{.Name}}-{{.Shard}}-{{.Nshards}} \
-v /:/host:ro \
-v /var/lib/pfs/vol:/host/var/lib/pfs/vol \
-v /var/lib/pfs:/var/lib/pfs \
-v /var/run/docker.sock:/var/run/docker.sock \
-v /usr/sbin/btrfs:/usr/sbin/btrfs.host \
-v /lib64:/lib64.host \
-e AWS_ACCESS_KEY_ID=`etcdctl get /pfs/creds/AWS_ACCESS_KEY_ID` \
-e AWS_SECRET_ACCESS_KEY=`etcdctl get /pfs/creds/AWS_SECRET_ACCESS_KEY` \
-p {{.Port}}:80 \
Expand Down
113 changes: 72 additions & 41 deletions lib/mapreduce/mapreduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,17 @@ func spinupContainer(image string, command []string) (string, error) {
return "", err
}
if err := docker.PullImage(image, nil); err != nil {
//return "", err this is erroring due to failing to parse response json
log.Print(err)
return "", err //this is erroring due to failing to parse response json
}

return startContainer(image, command)
}

// This is where you'd find a stopContainer method. However we don't have one
// because the docker package provides docker.StopContainer which does exactly
// what we want.

func ipAddr(containerId string) (string, error) {
docker, err := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
if err != nil {
Expand All @@ -76,6 +81,7 @@ func retry(f func() error, retries int, pause time.Duration) error {
if err == nil {
break
} else {
log.Print("Retrying due to error: ", err)
time.Sleep(pause)
}
}
Expand All @@ -96,6 +102,7 @@ type Job struct {
Command []string `json:"command"`
Limit int `json:"limit"`
Parallel int `json:"parallel"`
TimeOut int `json:"timeout"`
}

type materializeInfo struct {
Expand Down Expand Up @@ -159,6 +166,7 @@ func Map(job Job, jobPath string, m materializeInfo, host string, shard, modulos
var bucket *s3.Bucket
if getProtocol(job.Input) == ProtoS3 {
auth, err := aws.EnvAuth()
log.Print("auth: %#v", auth)
if err != nil {
log.Print(err)
return
Expand All @@ -170,6 +178,7 @@ func Map(job Job, jobPath string, m materializeInfo, host string, shard, modulos
log.Print(err)
return
}
log.Print("bucketName: ", bucketName)
bucket = client.Bucket(bucketName)
}

Expand All @@ -188,45 +197,66 @@ func Map(job Job, jobPath string, m materializeInfo, host string, shard, modulos
go func(wg *sync.WaitGroup) {
defer wg.Done()
for name := range files {
var inFile io.ReadCloser
var err error
switch {
case getProtocol(job.Input) == ProtoPfs:
inFile, err = btrfs.Open(path.Join(m.In, m.Commit, job.Input, name))
case getProtocol(job.Input) == ProtoS3:
inFile, err = bucket.GetReader(name)
default:
log.Print("It shouldn't be possible to get here.")
continue
}
if err != nil {
log.Print(err)
return
}
defer inFile.Close()

var resp *http.Response
err = retry(func() error {
log.Print("Posting: ", "http://"+path.Join(host, name))
resp, err = client.Post("http://"+path.Join(host, name), "application/text", inFile)
return err
}, retries, 200*time.Millisecond)
if err != nil {
log.Print(err)
return
}
defer resp.Body.Close()
func() { // function scope just so that defer works
var err error
var resp *http.Response

err = retry(func() error {
var inFile io.ReadCloser
switch {
case getProtocol(job.Input) == ProtoPfs:
inFile, err = btrfs.Open(path.Join(m.In, m.Commit, job.Input, name))
case getProtocol(job.Input) == ProtoS3:
log.Print("File name: ", name)
inFile, err = bucket.GetReader(name)
default:
return fmt.Errorf("Invalid protocol.")
}
if err != nil {
return err
}
defer inFile.Close()

log.Print(name, ": ", "Posting: ", "http://"+path.Join(host, name))
resp, err = client.Post("http://"+path.Join(host, name), "application/text", inFile)
log.Print(name, ": ", "Post done.")
return err
}, retries, 200*time.Millisecond)
if err != nil {
log.Print(err)
return
}
defer resp.Body.Close()

outFile, err := btrfs.CreateAll(path.Join(m.Out, m.Branch, jobPath, name))
if err != nil {
log.Print(err)
return
}
defer outFile.Close()
if _, err := io.Copy(outFile, resp.Body); err != nil {
log.Print(err)
return
}
log.Print(name, ": ", "Creating file ", path.Join(m.Out, m.Branch, jobPath, name))
outFile, err := btrfs.CreateAll(path.Join(m.Out, m.Branch, jobPath, name))
if err != nil {
log.Print(err)
return
}
defer outFile.Close()
log.Print(name, ": ", "Opened outfile.")

wait := time.Minute * 10
if job.TimeOut != 0 {
wait = time.Duration(job.TimeOut) * time.Second
}
timer := time.AfterFunc(wait,
func() {
log.Print(name, ": ", "Timeout. Killing mapper.")
err := resp.Body.Close()
if err != nil {
log.Print(err)
}
})
defer log.Print("Result of timer.Stop(): ", timer.Stop())
log.Print(name, ": ", "Copying output...")
if _, err := io.Copy(outFile, resp.Body); err != nil {
log.Print(err)
return
}
log.Print(name, ": ", "Done copying.")
}()
}
}(&wg)
}
Expand Down Expand Up @@ -254,12 +284,12 @@ func Map(job Job, jobPath string, m materializeInfo, host string, shard, modulos
}
nextMarker := ""
for {
log.Print("s3: before List nextMarker = ", nextMarker)
lr, err := bucket.List(inPath, "", nextMarker, 0)
if err != nil {
log.Print(err)
return
}
nextMarker = lr.NextMarker
for _, key := range lr.Contents {
if route.HashResource(key.Key)%modulos == shard {
// This file belongs on this shard
Expand All @@ -269,6 +299,7 @@ func Map(job Job, jobPath string, m materializeInfo, host string, shard, modulos
break
}
}
nextMarker = key.Key
}
if !lr.IsTruncated {
// We've exhausted the output
Expand Down Expand Up @@ -415,7 +446,7 @@ func Materialize(in_repo, branch, commit, out_repo, jobDir string, shard, modulo
if !exists {
// Perfectly valid to have no jobs dir, it just means we have no work
// to do.
log.Printf("Jobs dir doesn't exists:\n", path.Join(in_repo, commit, jobDir))
log.Print("Jobs dir doesn't exists:\n", path.Join(in_repo, commit, jobDir))
return nil
}

Expand Down
39 changes: 39 additions & 0 deletions lib/mapreduce/mapreduce_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package mapreduce

import (
"fmt"
"io"
"log"
"os"
"sync"
"testing"

"github.com/mitchellh/goamz/aws"
"github.com/mitchellh/goamz/s3"
)

func TestS3(t *testing.T) {
auth, err := aws.EnvAuth()
log.Printf("auth: %#v", auth)
if err != nil {
log.Print(err)
return
}
client := s3.New(auth, aws.USWest)
bucket := client.Bucket("pachyderm-data")
var wg sync.WaitGroup
defer wg.Wait()
for i := 0; i < 5000; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
inFile, err := bucket.GetReader(fmt.Sprintf("chess/file%09d", i))
if err != nil {
log.Print(err)
return
}
io.Copy(os.Stdout, inFile)
inFile.Close()
}(i)
}
}
4 changes: 2 additions & 2 deletions scripts/btrfs-wrapper
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/sh

# This script gets injected in to the docker container as /usr/sbin/btrfs
# This script gets injected in to the docker container as /bin/btrfs

LD_LIBRARY_PATH=/lib64.host /usr/sbin/btrfs.host $@
chroot /host btrfs $@
10 changes: 10 additions & 0 deletions scripts/clean
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/sh
# Cleanup the state created by scripts/launch
###############################################################################
set -E # pass trap handlers down to subshells
#set -x # execution tracing debug messages

PFS_DIR=~/.pfs

sudo umount "$PFS_DIR"/vol || true
rm "$PFS_DIR"/data.img || true
3 changes: 2 additions & 1 deletion scripts/dev-install
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Install a git repo on a pfs cluster.
###############################################################################
set -E # pass trap handlers down to subshells
set -x # execution tracing debug messages
#set -x # execution tracing debug messages

USAGE="Usage: $0 host"

Expand Down Expand Up @@ -33,6 +33,7 @@ BUCKET=${IMAGE_BUCKET:-"pachyderm-images"}
ssh core@"$HOST" etcdctl set /pfs/creds/AWS_ACCESS_KEY_ID ${AWS_ACCESS_KEY_ID:?"Env: AWS_ACCESS_KEY_ID must be set"}
ssh core@"$HOST" etcdctl set /pfs/creds/AWS_SECRET_ACCESS_KEY ${AWS_SECRET_ACCESS_KEY:?"Env: AWS_SECRET_ACCESS_KEY must be set"}
ssh core@"$HOST" etcdctl set /pfs/creds/IMAGE_BUCKET "$BUCKET"
ssh core@"$HOST" etcdctl set /pfs/config/DISK "/var/lib/pfs/data.img"
ssh core@"$HOST" 'cd pfs.git && hooks/post-receive'

echo "Done."

0 comments on commit 1a37aec

Please sign in to comment.