Skip to content

Commit

Permalink
Revert "Merge: Download each file once (grailbio/grail!15922)" (grailbio/grail!16436)
Browse files Browse the repository at this point in the history

Approved-by: Pedro Fialho <pfialho@grailbio.com>

GitLab URL: https://gitlab.com/grailbio/grail/-/merge_requests/16436

fbshipit-source-id: 76a8d8c
  • Loading branch information
Dave Shepard authored and SDkkkkkkkk committed Jun 20, 2023
1 parent fe6c751 commit 12b3c2d
Showing 1 changed file with 25 additions and 59 deletions.
84 changes: 25 additions & 59 deletions local/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,6 @@ type Executor struct {
refCountsCond *sync.Cond
deadObjects map[digest.Digest]bool
gcing chan struct{}

downloadTasks sync.Map
downloadTaskKeys map[digest.Digest][]string
}

// downloadHandle tracks one download so that concurrent loads of the same
// source can share a single attempt. Handles are stored in
// Executor.downloadTasks, keyed by the file's source URL.
type downloadHandle struct {
// DoDownload guards the download itself; the download closure is run
// through DoDownload.Do, so it executes at most once per handle.
DoDownload *sync.Once
// result points to the downloaded file when the download succeeded.
// It is left nil when the download failed.
result *reflow.File
// err holds the download error, if any. Callers check err before
// dereferencing result.
err error
}

type refCount struct {
Expand Down Expand Up @@ -180,7 +171,6 @@ func (e *Executor) Start() error {
if e.FileRepository == nil {
e.FileRepository = &filerepo.Repository{Root: filepath.Join(e.Prefix, e.Dir, objectsDir)}
}
e.downloadTaskKeys = make(map[digest.Digest][]string)
os.MkdirAll(e.FileRepository.Root, 0777)
tempdir := filepath.Join(e.Prefix, e.Dir, "download")
if err := os.MkdirAll(tempdir, 0777); err != nil {
Expand Down Expand Up @@ -438,10 +428,6 @@ func (e *Executor) unload(ctx context.Context, fs reflow.Fileset) (done <-chan s
e.Log.Errorf("gc: unload dead collect: %v", err)
}
e.refCountsMu.Lock()
for _, url := range e.downloadTaskKeys[id] {
e.downloadTasks.Delete(url)
}
delete(e.downloadTaskKeys, id)
delete(e.deadObjects, id)
if e.refCounts[id].count > 0 {
panic(fmt.Sprintf("gc: refcount %v not 0: %v", id.Short(), e.refCounts[id].count))
Expand Down Expand Up @@ -521,8 +507,14 @@ func (e *Executor) Load(ctx context.Context, repo *url.URL, fs reflow.Fileset) (
mu sync.Mutex
resolved = make(map[digest.Digest]reflow.File)
files = fs.Files()
tempRepo filerepo.Repository
err error
)
tempRepo.Root, err = ioutil.TempDir(e.FileRepository.Root, "temp-load")
defer os.RemoveAll(tempRepo.Root)
if err != nil {
return reflow.Fileset{}, err
}
err = traverse.Each(len(files), func(i int) error {
file := files[i]
if !file.IsRef() {
Expand All @@ -546,61 +538,32 @@ func (e *Executor) Load(ctx context.Context, repo *url.URL, fs reflow.Fileset) (
var (
incr bool
res reflow.File
err error
)
if !file.ContentHash.IsZero() {
incr = true
e.incr(file.ContentHash)
res, err = fileFromRepo(ctx, e.FileRepository, file)
}
if file.ContentHash.IsZero() || err != nil {
taskUncast, _ := e.downloadTasks.LoadOrStore(file.Source, &downloadHandle{DoDownload: &sync.Once{}})
task := taskUncast.(*downloadHandle)
var tempRepo filerepo.Repository
tempRepo.Root, err = ioutil.TempDir(e.FileRepository.Root, "temp-load")
bucket, key, err := e.Blob.Bucket(ctx, file.Source)
if err != nil {
return err
}
defer os.RemoveAll(tempRepo.Root)
task.DoDownload.Do(func() {
bucket, key, _ := e.Blob.Bucket(ctx, file.Source)
dl := download{
Bucket: bucket,
Key: key,
File: file,
Log: e.Log,
}
if file, err := dl.Do(ctx, &tempRepo); err != nil {
e.Log.Errorf("error downloading file: %v", err)
task.err = err
} else {
task.result = &file
}
})
if task.err != nil {
return task.err
} else {
res = *task.result
resDigest := res.Digest()
if !incr {
e.incr(resDigest)
}
if err := e.FileRepository.Vacuum(ctx, &tempRepo); err != nil {
return err
}
e.refCountsMu.Lock()
// Keys in downloadTaskKeys must be equivalent to the actual file hash, which repo uses to name the file
// on the disk
if _, ok := e.downloadTaskKeys[resDigest]; !ok {
e.downloadTaskKeys[resDigest] = []string{file.Source}
} else {
e.downloadTaskKeys[resDigest] = append(e.downloadTaskKeys[resDigest], file.Source)
}
e.refCountsMu.Unlock()
dl := download{
Bucket: bucket,
Key: key,
File: file,
Log: e.Log,
}
res, err = dl.Do(ctx, &tempRepo)
if err != nil {
return err
}
if !incr {
e.incr(res.Digest())
}
}
mu.Lock()
// Keys in resolved are what the caller uses to link the argument to the filename
resolved[file.Digest()] = res
mu.Unlock()
return nil
Expand All @@ -612,11 +575,14 @@ func (e *Executor) Load(ctx context.Context, repo *url.URL, fs reflow.Fileset) (
}
return reflow.Fileset{}, err
}
if subs, ok := fs.Subst(resolved); !ok {
if err := e.FileRepository.Vacuum(ctx, &tempRepo); err != nil {
return reflow.Fileset{}, err
}
x, ok := fs.Subst(resolved)
if !ok {
return reflow.Fileset{}, errors.E(errors.Invalid, "load", fmt.Sprint(fs), errors.New("fileset not resolved"))
} else {
return subs, nil
}
return x, nil
}

// Repository returns the repository attached to this executor.
Expand Down

0 comments on commit 12b3c2d

Please sign in to comment.