From 5ab5ad0da4a3bb0d4a38e369ce8b095de36924de Mon Sep 17 00:00:00 2001 From: Bryce McAnally Date: Wed, 24 Apr 2024 09:51:45 -0500 Subject: [PATCH] Add peek functionality to index reader and use it in the file source (2.8.x) (#9961) --- src/internal/storage/fileset/index/option.go | 6 +++ src/internal/storage/fileset/index/reader.go | 11 ++++-- src/server/pfs/server/source.go | 39 +++++++++++++------- 3 files changed, 40 insertions(+), 16 deletions(-) diff --git a/src/internal/storage/fileset/index/option.go b/src/internal/storage/fileset/index/option.go index 5c1744038b8..dc1cc2d2d7e 100644 --- a/src/internal/storage/fileset/index/option.go +++ b/src/internal/storage/fileset/index/option.go @@ -24,6 +24,12 @@ func WithDatum(datum string) Option { } } +func WithPeek() Option { + return func(r *Reader) { + r.peek = true + } +} + // WithShardConfig sets the sharding configuration. func WithShardConfig(config *ShardConfig) Option { return func(r *Reader) { diff --git a/src/internal/storage/fileset/index/reader.go b/src/internal/storage/fileset/index/reader.go index 14599b5fd2b..a8d03cc178e 100644 --- a/src/internal/storage/fileset/index/reader.go +++ b/src/internal/storage/fileset/index/reader.go @@ -32,6 +32,7 @@ type Reader struct { topIdx *Index datum string shardConfig *ShardConfig + peek bool } // NewReader creates a new Reader. @@ -56,11 +57,15 @@ func (r *Reader) Iterate(ctx context.Context, cb func(*Index) error) error { if r.topIdx == nil { return nil } + peek := r.peek traverseCb := func(idx *Index) (bool, error) { - if atEnd(idx.Path, r.filter) { - return false, errutil.ErrBreak - } if idx.File != nil { + if atEnd(idx.Path, r.filter) { + if !peek { + return false, errutil.ErrBreak + } + peek = false + } if !atStart(idx.Path, r.filter) || !(r.datum == "" || r.datum == idx.File.Datum) { return false, nil } diff --git a/src/server/pfs/server/source.go b/src/server/pfs/server/source.go index cacd6e479d6..1260139652e 100644 --- a/src/server/pfs/server/source.go +++ b/src/server/pfs/server/source.go @@ -23,10 +23,10 @@ type Source interface { } type source struct { - commitInfo *pfs.CommitInfo - fileSet fileset.FileSet - indexOpts []index.Option - upper string + commitInfo *pfs.CommitInfo + fileSet fileset.FileSet + dirIndexOpts, fileIndexOpts []index.Option + upper string } // NewSource creates a Source which emits FileInfos with the information from commit, and the entries return from fileSet. @@ -38,21 +38,34 @@ func NewSource(commitInfo *pfs.CommitInfo, fs fileset.FileSet, opts ...SourceOpt s := &source{ commitInfo: commitInfo, fileSet: fileset.NewDirInserter(fs, sc.prefix), - indexOpts: []index.Option{ + dirIndexOpts: []index.Option{ + index.WithPrefix(sc.prefix), + index.WithDatum(sc.datum), + }, + fileIndexOpts: []index.Option{ index.WithPrefix(sc.prefix), index.WithDatum(sc.datum), }, } if sc.pathRange != nil { s.fileSet = fileset.NewDirInserter(fs, sc.pathRange.Lower) - // The upper for the path range is not set to ensure that we - // emit directories at the end of the path range. For example, - // the files /d1/f1 and /d2/f2 with a path range of [/d1/f1, - // /d2/f2) should emit /d1/f1 and /d2/. The upper bound will be - // applied within the callback of the iteration. - s.indexOpts = append(s.indexOpts, index.WithRange(&index.PathRange{ + // The directory index options have no upper bound because a + // directory may extend past the upper bound of the path range. + s.dirIndexOpts = append(s.dirIndexOpts, + index.WithRange(&index.PathRange{ + Lower: sc.pathRange.Lower, + })) + s.fileIndexOpts = append(s.fileIndexOpts, index.WithRange(&index.PathRange{ Lower: sc.pathRange.Lower, + Upper: sc.pathRange.Upper, })) + // WithPeek is set to ensure that we iterate one past the upper + // bound of the path range. This is necessary to ensure that + // directories at the end of the path range are emitted. The + // paths are checked again in the callback to ensure we + // terminate if the next directory / file is past the upper + // bound. + s.fileIndexOpts = append(s.fileIndexOpts, index.WithPeek()) s.upper = sc.pathRange.Upper } if sc.filter != nil { @@ -66,7 +79,7 @@ func NewSource(commitInfo *pfs.CommitInfo, fs fileset.FileSet, opts ...SourceOpt func (s *source) Iterate(ctx context.Context, cb func(*pfs.FileInfo, fileset.File) error) error { ctx, cf := pctx.WithCancel(ctx) defer cf() - iter := fileset.NewIterator(ctx, s.fileSet.Iterate, s.indexOpts...) + iter := fileset.NewIterator(ctx, s.fileSet.Iterate, s.dirIndexOpts...) cache := make(map[string]*pfs.FileInfo) err := s.fileSet.Iterate(ctx, func(f fileset.File) error { idx := f.Index() @@ -103,7 +116,7 @@ func (s *source) Iterate(ctx context.Context, cb func(*pfs.FileInfo, fileset.Fil return errors.EnsureStack(err) } return nil - }, s.indexOpts...) + }, s.fileIndexOpts...) if errors.Is(err, errutil.ErrBreak) { err = nil }