[chore][fileconsumer] Extract a readerMetadata struct (#25828)
This PR continues the incremental refactoring of the fileconsumer
package. I'd like to eventually move the reader struct into an internal
package. Prior to doing so, I believe it will be helpful to simplify the
struct's contents and usage.

- Extract a readerMetadata struct from the reader struct. This contains
exactly the fields which can be saved and reloaded. Consequently, saving
and reloading will operate directly on the metadata, rather than the
entire reader (see the sketch after this list).
- Simplify the readerFactory and readerBuilder by pulling some nuanced
responsibilities out of the builder and into the factory's specialized
functions.
- Remove several builder options, which are now handled by the factory.
- Remove special case where fingerprint was initialized by builder. This
was only used in tests.
- Remove a useless test which only validated JSON encoder functionality.
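
The core idea, as a minimal standalone sketch (simplified, hypothetical names; not the PR's actual types): keep the persistable state in its own struct of exported fields, embed a pointer to it in the runtime struct, and point the JSON encoder at the embedded value so a checkpoint captures exactly those fields and nothing else.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// metadata holds only what should survive a restart; exported fields
// make it visible to encoding/json.
type metadata struct {
	Offset         int64
	FileAttributes map[string]any
}

// tracker stands in for the reader: runtime-only state plus the
// embedded, persistable metadata.
type tracker struct {
	*metadata        // promoted fields: t.Offset resolves here
	handle    string // runtime-only stand-in for *os.File; never saved
}

func main() {
	t := &tracker{
		metadata: &metadata{Offset: 42, FileAttributes: map[string]any{"foo": "bar"}},
		handle:   "/var/log/app.log",
	}

	// Checkpoint: encode only the embedded metadata, not the whole tracker.
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(t.metadata); err != nil {
		panic(err)
	}

	// Restore: decode a fresh metadata, then rebuild the runtime object around it.
	restored := &metadata{}
	if err := json.NewDecoder(&buf).Decode(restored); err != nil {
		panic(err)
	}
	fmt.Println(restored.Offset, restored.FileAttributes) // 42 map[foo:bar]
}

Restart recovery then decodes into a fresh metadata value and rebuilds the runtime object around it, which is exactly the shape of the loadLastPollFiles change below.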
djaglowski committed Aug 16, 2023
1 parent df26b3e commit 919e970
Showing 4 changed files with 75 additions and 158 deletions.
27 changes: 13 additions & 14 deletions pkg/stanza/fileconsumer/file.go
@@ -323,7 +323,7 @@ func (m *Manager) syncLastPollFiles(ctx context.Context) {
 
     // Encode each known file
     for _, fileReader := range m.knownFiles {
-        if err := enc.Encode(fileReader); err != nil {
+        if err := enc.Encode(fileReader.readerMetadata); err != nil {
             m.Errorw("Failed to encode known files", zap.Error(err))
         }
     }
@@ -349,7 +349,7 @@ func (m *Manager) loadLastPollFiles(ctx context.Context) error {
 
     // Decode the number of entries
     var knownFileCount int
-    if err := dec.Decode(&knownFileCount); err != nil {
+    if err = dec.Decode(&knownFileCount); err != nil {
         return fmt.Errorf("decoding file count: %w", err)
     }
 
@@ -361,31 +361,30 @@ func (m *Manager) loadLastPollFiles(ctx context.Context) error {
     // Decode each of the known files
     m.knownFiles = make([]*reader, 0, knownFileCount)
     for i := 0; i < knownFileCount; i++ {
-        // Only the offset, fingerprint, and splitter
-        // will be used before this reader is discarded
-        unsafeReader, err := m.readerFactory.unsafeReader()
-        if err != nil {
-            return err
-        }
-        if err = dec.Decode(unsafeReader); err != nil {
+        rmd := &readerMetadata{}
+        if err = dec.Decode(rmd); err != nil {
             return err
         }
 
         // Migrate readers that used FileAttributes.HeaderAttributes
         // This block can be removed in a future release, tentatively v0.90.0
-        if ha, ok := unsafeReader.FileAttributes["HeaderAttributes"]; ok {
+        if ha, ok := rmd.FileAttributes["HeaderAttributes"]; ok {
             switch hat := ha.(type) {
             case map[string]any:
                 for k, v := range hat {
-                    unsafeReader.FileAttributes[k] = v
+                    rmd.FileAttributes[k] = v
                 }
-                delete(unsafeReader.FileAttributes, "HeaderAttributes")
+                delete(rmd.FileAttributes, "HeaderAttributes")
             default:
                 m.Errorw("migrate header attributes: unexpected format")
             }
         }
 
-        m.knownFiles = append(m.knownFiles, unsafeReader)
+        r, err := m.readerFactory.newFromMetadata(rmd)
+        if err != nil {
+            m.Errorw("load reader", err)
+        } else {
+            m.knownFiles = append(m.knownFiles, r)
+        }
     }
 
     return nil
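For readers following the migration block above in isolation, here is the same flattening logic as a standalone sketch (the helper name and the main function are hypothetical, not part of the PR):

package main

import "fmt"

// migrateHeaderAttributes mirrors the migration block above: nested
// attributes stored under "HeaderAttributes" are promoted to the top
// level, then the legacy key is dropped.
func migrateHeaderAttributes(attrs map[string]any) error {
	ha, ok := attrs["HeaderAttributes"]
	if !ok {
		return nil // nothing to migrate
	}
	nested, ok := ha.(map[string]any)
	if !ok {
		return fmt.Errorf("migrate header attributes: unexpected format %T", ha)
	}
	for k, v := range nested {
		attrs[k] = v
	}
	delete(attrs, "HeaderAttributes")
	return nil
}

func main() {
	attrs := map[string]any{
		"log.file.name":    "app.log",
		"HeaderAttributes": map[string]any{"env": "prod"},
	}
	if err := migrateHeaderAttributes(attrs); err != nil {
		panic(err)
	}
	fmt.Println(attrs) // map[env:prod log.file.name:app.log]
}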
24 changes: 13 additions & 11 deletions pkg/stanza/fileconsumer/reader.go
@@ -29,24 +29,26 @@ type readerConfig struct {
     includeFilePathResolved bool
 }
 
+type readerMetadata struct {
+    Fingerprint     *fingerprint.Fingerprint
+    Offset          int64
+    FileAttributes  map[string]any
+    HeaderFinalized bool
+}
+
 // reader manages a single file
 type reader struct {
-    *zap.SugaredLogger `json:"-"` // json tag excludes embedded fields from storage
+    *zap.SugaredLogger
     *readerConfig
+    *readerMetadata
+    file          *os.File
     lineSplitFunc bufio.SplitFunc
     splitFunc     bufio.SplitFunc
     decoder       *helper.Decoder
+    headerReader  *header.Reader
     processFunc   emit.Callback
-
-    Fingerprint    *fingerprint.Fingerprint
-    Offset         int64
-    generation     int
-    file           *os.File
-    FileAttributes map[string]any
-    eof            bool
-
-    HeaderFinalized bool
-    headerReader    *header.Reader
+    generation    int
+    eof           bool
 }
 
 // offsetToEnd sets the starting offset
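Embedding *readerMetadata works because Go promotes an embedded struct's fields: existing call sites such as r.Offset and r.HeaderFinalized keep compiling unchanged, while enc.Encode(fileReader.readerMetadata) can still address only the persistable subset. This is also why the json:"-" tag on *zap.SugaredLogger can be dropped; the encoder never sees the outer struct anymore. A minimal sketch of the promotion, with hypothetical names:

package main

import "fmt"

// meta is the persistable subset; reader embeds it by pointer.
type meta struct{ Offset int64 }

type reader struct {
	*meta      // exported fields of meta are promoted onto reader
	generation int
}

func main() {
	r := &reader{meta: &meta{}}
	r.Offset = 100             // writes through to r.meta.Offset
	fmt.Println(r.meta.Offset) // 100 (same field, two access paths)
}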
108 changes: 33 additions & 75 deletions pkg/stanza/fileconsumer/reader_factory.go
@@ -28,26 +28,36 @@ type readerFactory struct {
 }
 
 func (f *readerFactory) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader, error) {
-    return f.newReaderBuilder().
-        withFile(file).
-        withFingerprint(fp).
-        build()
+    return readerBuilder{
+        readerFactory: f,
+        file:          file,
+        readerMetadata: &readerMetadata{
+            Fingerprint:    fp,
+            FileAttributes: map[string]any{},
+        },
+    }.build()
 }
 
-// copy creates a deep copy of a reader
-func (f *readerFactory) copy(old *reader, newFile *os.File) (*reader, error) {
-    return f.newReaderBuilder().
-        withFile(newFile).
-        withFingerprint(old.Fingerprint.Copy()).
-        withOffset(old.Offset).
-        withSplitterFunc(old.lineSplitFunc).
-        withFileAttributes(util.MapCopy(old.FileAttributes)).
-        withHeaderFinalized(old.HeaderFinalized).
-        build()
+func (f *readerFactory) newFromMetadata(m *readerMetadata) (*reader, error) {
+    return readerBuilder{
+        readerFactory:  f,
+        readerMetadata: m,
+    }.build()
 }
 
-func (f *readerFactory) unsafeReader() (*reader, error) {
-    return f.newReaderBuilder().build()
+// copy creates a deep copy of a reader
+func (f *readerFactory) copy(old *reader, newFile *os.File) (*reader, error) {
+    return readerBuilder{
+        readerFactory: f,
+        file:          newFile,
+        splitFunc:     old.lineSplitFunc,
+        readerMetadata: &readerMetadata{
+            Fingerprint:     old.Fingerprint.Copy(),
+            Offset:          old.Offset,
+            FileAttributes:  util.MapCopy(old.FileAttributes),
+            HeaderFinalized: old.HeaderFinalized,
+        },
+    }.build()
 }
 
 func (f *readerFactory) newFingerprint(file *os.File) (*fingerprint.Fingerprint, error) {
@@ -56,54 +66,15 @@ func (f *readerFactory) newFingerprint(file *os.File) (*fingerprint.Fingerprint,
 
 type readerBuilder struct {
     *readerFactory
-    file            *os.File
-    fp              *fingerprint.Fingerprint
-    offset          int64
-    splitFunc       bufio.SplitFunc
-    headerFinalized bool
-    fileAttributes  map[string]any
+    file           *os.File
+    readerMetadata *readerMetadata
+    splitFunc      bufio.SplitFunc
 }
 
-func (f *readerFactory) newReaderBuilder() *readerBuilder {
-    return &readerBuilder{readerFactory: f, fileAttributes: map[string]any{}}
-}
-
-func (b *readerBuilder) withSplitterFunc(s bufio.SplitFunc) *readerBuilder {
-    b.splitFunc = s
-    return b
-}
-
-func (b *readerBuilder) withFile(f *os.File) *readerBuilder {
-    b.file = f
-    return b
-}
-
-func (b *readerBuilder) withFingerprint(fp *fingerprint.Fingerprint) *readerBuilder {
-    b.fp = fp
-    return b
-}
-
-func (b *readerBuilder) withOffset(offset int64) *readerBuilder {
-    b.offset = offset
-    return b
-}
-
-func (b *readerBuilder) withHeaderFinalized(finalized bool) *readerBuilder {
-    b.headerFinalized = finalized
-    return b
-}
-
-func (b *readerBuilder) withFileAttributes(attrs map[string]any) *readerBuilder {
-    b.fileAttributes = attrs
-    return b
-}
-
-func (b *readerBuilder) build() (r *reader, err error) {
+func (b readerBuilder) build() (r *reader, err error) {
     r = &reader{
-        readerConfig:    b.readerConfig,
-        Offset:          b.offset,
-        HeaderFinalized: b.headerFinalized,
-        FileAttributes:  b.fileAttributes,
+        readerConfig:   b.readerConfig,
+        readerMetadata: b.readerMetadata,
     }
 
     if b.splitFunc != nil {
@@ -121,7 +92,7 @@ func (b *readerBuilder) build() (r *reader, err error) {
     }
     r.decoder = helper.NewDecoder(encoding)
 
-    if b.headerConfig == nil || b.headerFinalized {
+    if b.headerConfig == nil || b.readerMetadata.HeaderFinalized {
         r.splitFunc = r.lineSplitFunc
         r.processFunc = b.readerConfig.emit
     } else {
@@ -134,13 +105,11 @@
     }
 
     if b.file == nil {
-        r.SugaredLogger = b.SugaredLogger.With("path", "uninitialized")
         return r, nil
     }
 
     r.file = b.file
     r.SugaredLogger = b.SugaredLogger.With("path", b.file.Name())
-    r.FileAttributes = b.fileAttributes
 
     // Resolve file name and path attributes
     resolved := b.file.Name()
@@ -185,16 +154,5 @@ func (b *readerBuilder) build() (r *reader, err error) {
         }
     }
 
-    if b.fp != nil {
-        r.Fingerprint = b.fp
-        return r, nil
-    }
-
-    fp, err := b.readerFactory.newFingerprint(r.file)
-    if err != nil {
-        return nil, err
-    }
-    r.Fingerprint = fp
-
     return r, nil
 }
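With the metadata extracted, the fluent with* builder collapses into a plain struct literal plus a single build method on a value receiver. A hedged before/after sketch of that design shift, using hypothetical types rather than the PR's code:

package main

import "fmt"

type builder struct {
	name  string
	count int
}

// Fluent style (what the PR removes): one with* method per optional field.
func (b *builder) withName(n string) *builder { b.name = n; return b }
func (b *builder) withCount(c int) *builder   { b.count = c; return b }

// Literal style (what the PR adopts): callers set exactly the fields they
// need, zero values cover the rest, and build takes a value receiver.
func (b builder) build() string { return fmt.Sprintf("%s/%d", b.name, b.count) }

func main() {
	old := (&builder{}).withName("reader").withCount(1).build()
	now := builder{name: "reader", count: 1}.build()
	fmt.Println(old, now) // reader/1 reader/1
}

The literal form reads better here because most construction paths now simply pass a prebuilt readerMetadata through.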
74 changes: 16 additions & 58 deletions pkg/stanza/fileconsumer/reader_test.go
@@ -4,13 +4,10 @@
 package fileconsumer
 
 import (
-    "bytes"
     "context"
-    "encoding/json"
     "testing"
     "time"
 
-    "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
 
     "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
@@ -83,7 +80,10 @@ func TestTokenization(t *testing.T) {
     _, err := temp.Write(tc.fileContent)
     require.NoError(t, err)
 
-    r, err := f.newReaderBuilder().withFile(temp).build()
+    fp, err := f.newFingerprint(temp)
+    require.NoError(t, err)
+
+    r, err := f.newReader(temp, fp)
     require.NoError(t, err)
 
     r.ReadToEnd(context.Background())
@@ -111,7 +111,10 @@ func TestTokenizationTooLong(t *testing.T) {
     _, err := temp.Write(fileContent)
     require.NoError(t, err)
 
-    r, err := f.newReaderBuilder().withFile(temp).build()
+    fp, err := f.newFingerprint(temp)
+    require.NoError(t, err)
+
+    r, err := f.newReader(temp, fp)
     require.NoError(t, err)
 
     r.ReadToEnd(context.Background())
@@ -147,7 +150,10 @@ func TestTokenizationTooLongWithLineStartPattern(t *testing.T) {
     _, err := temp.Write(fileContent)
     require.NoError(t, err)
 
-    r, err := f.newReaderBuilder().withFile(temp).build()
+    fp, err := f.newFingerprint(temp)
+    require.NoError(t, err)
+
+    r, err := f.newReader(temp, fp)
     require.NoError(t, err)
 
     r.ReadToEnd(context.Background())
@@ -179,7 +185,10 @@ func TestHeaderFingerprintIncluded(t *testing.T) {
 
     temp := openTemp(t, t.TempDir())
 
-    r, err := f.newReaderBuilder().withFile(temp).build()
+    fp, err := f.newFingerprint(temp)
+    require.NoError(t, err)
+
+    r, err := f.newReader(temp, fp)
     require.NoError(t, err)
 
     _, err = temp.Write(fileContent)
@@ -215,54 +224,3 @@ func readToken(t *testing.T, c chan *emitParams) []byte {
     }
     return nil
 }
-
-func TestEncodingDecode(t *testing.T) {
-    testFile := openTemp(t, t.TempDir())
-    testToken := tokenWithLength(2 * fingerprint.DefaultSize)
-    _, err := testFile.Write(testToken)
-    require.NoError(t, err)
-    fp, err := fingerprint.New(testFile, fingerprint.DefaultSize)
-    require.NoError(t, err)
-
-    f := readerFactory{
-        SugaredLogger: testutil.Logger(t),
-        readerConfig: &readerConfig{
-            fingerprintSize: fingerprint.DefaultSize,
-            maxLogSize:      defaultMaxLogSize,
-        },
-        splitterFactory: splitter.NewMultilineFactory(helper.NewSplitterConfig()),
-        fromBeginning:   false,
-    }
-    r, err := f.newReader(testFile, fp)
-    require.NoError(t, err)
-
-    // Just faking out these properties
-    r.HeaderFinalized = true
-    r.FileAttributes = map[string]any{"foo": "bar"}
-
-    assert.Equal(t, testToken[:fingerprint.DefaultSize], r.Fingerprint.FirstBytes)
-    assert.Equal(t, int64(2*fingerprint.DefaultSize), r.Offset)
-
-    // Encode
-    var buf bytes.Buffer
-    enc := json.NewEncoder(&buf)
-    require.NoError(t, enc.Encode(r))
-
-    // Decode
-    dec := json.NewDecoder(bytes.NewReader(buf.Bytes()))
-    decodedReader, err := f.unsafeReader()
-    require.NoError(t, err)
-    require.NoError(t, dec.Decode(decodedReader))
-
-    // Assert decoded reader has values persisted
-    assert.Equal(t, testToken[:fingerprint.DefaultSize], decodedReader.Fingerprint.FirstBytes)
-    assert.Equal(t, int64(2*fingerprint.DefaultSize), decodedReader.Offset)
-    assert.True(t, decodedReader.HeaderFinalized)
-    assert.Equal(t, map[string]any{"foo": "bar"}, decodedReader.FileAttributes)
-
-    // These fields are intentionally excluded, as they may have changed
-    assert.Empty(t, decodedReader.FileAttributes[logFileName])
-    assert.Empty(t, decodedReader.FileAttributes[logFilePath])
-    assert.Empty(t, decodedReader.FileAttributes[logFileNameResolved])
-    assert.Empty(t, decodedReader.FileAttributes[logFilePathResolved])
-}
