Skip to content

Commit

Permalink
Add peek functionality to index reader and use it in the file source …
Browse files Browse the repository at this point in the history
…(2.8.x) (#9961)
  • Loading branch information
brycemcanally committed Apr 24, 2024
1 parent a90d866 commit 5ab5ad0
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 16 deletions.
6 changes: 6 additions & 0 deletions src/internal/storage/fileset/index/option.go
Expand Up @@ -24,6 +24,12 @@ func WithDatum(datum string) Option {
}
}

func WithPeek() Option {
return func(r *Reader) {
r.peek = true
}
}

// WithShardConfig sets the sharding configuration.
func WithShardConfig(config *ShardConfig) Option {
return func(r *Reader) {
Expand Down
11 changes: 8 additions & 3 deletions src/internal/storage/fileset/index/reader.go
Expand Up @@ -32,6 +32,7 @@ type Reader struct {
topIdx *Index
datum string
shardConfig *ShardConfig
peek bool
}

// NewReader creates a new Reader.
Expand All @@ -56,11 +57,15 @@ func (r *Reader) Iterate(ctx context.Context, cb func(*Index) error) error {
if r.topIdx == nil {
return nil
}
peek := r.peek
traverseCb := func(idx *Index) (bool, error) {
if atEnd(idx.Path, r.filter) {
return false, errutil.ErrBreak
}
if idx.File != nil {
if atEnd(idx.Path, r.filter) {
if !peek {
return false, errutil.ErrBreak
}
peek = false
}
if !atStart(idx.Path, r.filter) || !(r.datum == "" || r.datum == idx.File.Datum) {
return false, nil
}
Expand Down
39 changes: 26 additions & 13 deletions src/server/pfs/server/source.go
Expand Up @@ -23,10 +23,10 @@ type Source interface {
}

type source struct {
commitInfo *pfs.CommitInfo
fileSet fileset.FileSet
indexOpts []index.Option
upper string
commitInfo *pfs.CommitInfo
fileSet fileset.FileSet
dirIndexOpts, fileIndexOpts []index.Option
upper string
}

// NewSource creates a Source which emits FileInfos with the information from commit, and the entries return from fileSet.
Expand All @@ -38,21 +38,34 @@ func NewSource(commitInfo *pfs.CommitInfo, fs fileset.FileSet, opts ...SourceOpt
s := &source{
commitInfo: commitInfo,
fileSet: fileset.NewDirInserter(fs, sc.prefix),
indexOpts: []index.Option{
dirIndexOpts: []index.Option{
index.WithPrefix(sc.prefix),
index.WithDatum(sc.datum),
},
fileIndexOpts: []index.Option{
index.WithPrefix(sc.prefix),
index.WithDatum(sc.datum),
},
}
if sc.pathRange != nil {
s.fileSet = fileset.NewDirInserter(fs, sc.pathRange.Lower)
// The upper for the path range is not set to ensure that we
// emit directories at the end of the path range. For example,
// the files /d1/f1 and /d2/f2 with a path range of [/d1/f1,
// /d2/f2) should emit /d1/f1 and /d2/. The upper bound will be
// applied within the callback of the iteration.
s.indexOpts = append(s.indexOpts, index.WithRange(&index.PathRange{
// The directory index options have no upper bound because a
// directory may extend past the upper bound of the path range.
s.dirIndexOpts = append(s.dirIndexOpts,
index.WithRange(&index.PathRange{
Lower: sc.pathRange.Lower,
}))
s.fileIndexOpts = append(s.fileIndexOpts, index.WithRange(&index.PathRange{
Lower: sc.pathRange.Lower,
Upper: sc.pathRange.Upper,
}))
// WithPeek is set to ensure that we iterate one past the upper
// bound of the path range. This is necessary to ensure that
// directories at the end of the path range are emitted. The
// paths are checked again in the callback to ensure we
// terminate if the next directory / file is past the upper
// bound.
s.fileIndexOpts = append(s.fileIndexOpts, index.WithPeek())
s.upper = sc.pathRange.Upper
}
if sc.filter != nil {
Expand All @@ -66,7 +79,7 @@ func NewSource(commitInfo *pfs.CommitInfo, fs fileset.FileSet, opts ...SourceOpt
func (s *source) Iterate(ctx context.Context, cb func(*pfs.FileInfo, fileset.File) error) error {
ctx, cf := pctx.WithCancel(ctx)
defer cf()
iter := fileset.NewIterator(ctx, s.fileSet.Iterate, s.indexOpts...)
iter := fileset.NewIterator(ctx, s.fileSet.Iterate, s.dirIndexOpts...)
cache := make(map[string]*pfs.FileInfo)
err := s.fileSet.Iterate(ctx, func(f fileset.File) error {
idx := f.Index()
Expand Down Expand Up @@ -103,7 +116,7 @@ func (s *source) Iterate(ctx context.Context, cb func(*pfs.FileInfo, fileset.Fil
return errors.EnsureStack(err)
}
return nil
}, s.indexOpts...)
}, s.fileIndexOpts...)
if errors.Is(err, errutil.ErrBreak) {
err = nil
}
Expand Down

0 comments on commit 5ab5ad0

Please sign in to comment.