Skip to content

Commit

Permalink
Merge pull request #83 from pachyderm/next
Browse files Browse the repository at this point in the history
Next
  • Loading branch information
jdoliner committed Jun 17, 2015
2 parents d810a70 + ae478c2 commit 1335e4a
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 25 deletions.
31 changes: 26 additions & 5 deletions lib/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
)

var (
ErrCancelled = errors.New("pfs: cancelled")
ErrArgCount = errors.New("pfs: illegal argument count")
ErrCancelled = errors.New("pfs: cancelled")
ErrArgCount = errors.New("pfs: illegal argument count")
ErrUnkownKeyword = errors.New("pfs: unknown keyword")
)

type Pipeline struct {
Expand Down Expand Up @@ -64,7 +65,12 @@ func (p *Pipeline) Input(name string) error {
// Image sets the image that is being used for computations.
func (p *Pipeline) Image(image string) error {
p.config.Config.Image = image
return container.PullImage(image)
err := container.PullImage(image)
if err != nil {
log.Print(err)
log.Print("assuming image is local and continuing")
}
return nil
}

// Start gets an outRepo ready to be used. This is where clean up of dirty
Expand Down Expand Up @@ -92,6 +98,7 @@ func (p *Pipeline) runCommit() string {
// best the process crashing at the wrong time could still leave it in an
// inconsistent state.
func (p *Pipeline) Run(cmd []string) error {
log.Print("Running: ", strings.Join(cmd, " "))
// this function always increments counter
defer func() { p.counter++ }()
// Check if the commit already exists
Expand Down Expand Up @@ -258,12 +265,24 @@ func (p *Pipeline) RunPachFile(r io.Reader) error {
if err := p.Start(); err != nil {
return err
}
var tokens []string
for lines.Scan() {
if p.cancelled {
return ErrCancelled
}
tokens := strings.Fields(lines.Text())
if len(tokens) == 0 || tokens[0][0] == '#' {
if len(tokens) > 0 && tokens[len(tokens)-1] == "\\" {
// We have tokens from last loop, remove the \ token which designates the line wrap
tokens = tokens[:len(tokens)-1]
} else {
// No line wrap, clear the tokens they were already execuated
tokens = []string{}
}
tokens = append(tokens, strings.Fields(lines.Text())...)
// These conditions are, empty line, comment line and wrapped line.
// All 3 cause us to continue, the first 2 because we're skipping them.
// The last because we need more input.
if len(tokens) == 0 || tokens[0][0] == '#' || tokens[len(tokens)-1] == "\\" {
// Comment or empty line, skip
continue
}

Expand All @@ -289,6 +308,8 @@ func (p *Pipeline) RunPachFile(r io.Reader) error {
return ErrArgCount
}
err = p.Shuffle(tokens[1])
default:
return ErrUnkownKeyword
}
if err != nil {
log.Print(err)
Expand Down
28 changes: 28 additions & 0 deletions lib/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,3 +312,31 @@ run sleep 100
time.Sleep(time.Second * 2)
check(r.Cancel(), t)
}

// TestWrap tests a simple job that uses line wrapping in it's Pachfile
func TestWrap(t *testing.T) {
outRepo := "TestWrap"
check(btrfs.Init(outRepo), t)
pipeline := NewPipeline("output", "", outRepo, "commit", "master", "0-1")
pachfile := `
image ubuntu
# touch foo
run touch /out/foo \
/out/bar
`
err := pipeline.RunPachFile(strings.NewReader(pachfile))
check(err, t)

exists, err := btrfs.FileExists(path.Join(outRepo, "commit", "foo"))
check(err, t)
if exists != true {
t.Fatal("File `foo` doesn't exist when it should.")
}

exists, err = btrfs.FileExists(path.Join(outRepo, "commit", "bar"))
check(err, t)
if exists != true {
t.Fatal("File `bar` doesn't exist when it should.")
}
}
21 changes: 1 addition & 20 deletions lib/shard/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,24 +60,6 @@ func indexOf(haystack []string, needle string) int {
return -1
}

func cat(w http.ResponseWriter, name string) {
exists, err := btrfs.FileExists(name)
if err != nil {
http.Error(w, err.Error(), 500)
log.Print(err)
}
if !exists {
http.Error(w, "404 page not found", 404)
return
}

if err := rawCat(w, name); err != nil {
http.Error(w, err.Error(), 500)
log.Print(err)
return
}
}

func rawCat(w io.Writer, name string) error {
f, err := btrfs.Open(name)
if err != nil {
Expand Down Expand Up @@ -161,7 +143,7 @@ func genericFileHandler(fs string, w http.ResponseWriter, r *http.Request) {
http.Error(w, "404 page not found", 404)
return
case 1:
cat(w, files[0])
http.ServeFile(w, r, btrfs.FilePath(files[0]))
default:
msg := multipart.NewWriter(w)
defer msg.Close()
Expand Down Expand Up @@ -290,7 +272,6 @@ func (s *Shard) CommitHandler(w http.ResponseWriter, r *http.Request) {
err := oldRunner.Cancel()
if err != nil {
log.Print(err)
return
}
}
err := newRunner.Run()
Expand Down

0 comments on commit 1335e4a

Please sign in to comment.