Skip to content

Commit

Permalink
feat(persist): clean old entries upon loading store file (#295)
Browse files Browse the repository at this point in the history
Adds a garbage collect mechanism to remove entries from the storer file saved on disk which has been expired.
Expiration is calculated using the same `TTL`  that is being used to remove storer files and the mechanism remove the entires that has not been updated in a period greater than the `TTL`
  • Loading branch information
gsanchezgavier committed May 11, 2022
1 parent bb90381 commit 5085008
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 41 deletions.
44 changes: 40 additions & 4 deletions persist/storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ const (
integrationsDir = "nr-integrations"
)

var (
// ErrNotFound defines an error that will be returned when trying to access a storage entry that can't be found
ErrNotFound = errors.New("key not found")
)
// ErrNotFound defines an error that will be returned when trying to access a storage entry that can't be found
var ErrNotFound = errors.New("key not found")

var now = time.Now

Expand Down Expand Up @@ -67,6 +65,7 @@ type fileStore struct {
inMemoryStore
path string
ilog log.Logger
ttl time.Duration
}

// SetNow forces a different "current time" for the Storer.
Expand Down Expand Up @@ -110,6 +109,7 @@ func NewFileStore(storagePath string, ilog log.Logger, ttl time.Duration) (Store
path: storagePath,
ilog: ilog,
inMemoryStore: *ms,
ttl: ttl,
}

if stat, err := os.Stat(storagePath); err == nil {
Expand All @@ -122,6 +122,7 @@ func NewFileStore(storagePath string, ilog log.Logger, ttl time.Duration) (Store
if err != nil {
ilog.Debugf(err.Error())
}

} else if os.IsNotExist(err) {
folder := path.Dir(storagePath)
err := os.MkdirAll(folder, dirFilePerm)
Expand All @@ -142,13 +143,20 @@ func (j *fileStore) Save() error {
return err
}

// Removes any entry that has not been updated duiring the integration cycle and with
// an expired timestamp.
if err := j.deleteOldEntries(); err != nil {
j.ilog.Debugf("Error deleting old entries in store file %q: %w. ", j.path, err)
}

j.locker.Lock()
defer j.locker.Unlock()

bytes, err := json.Marshal(j)
if err != nil {
return err
}

return ioutil.WriteFile(j.path, bytes, filePerm)
}

Expand Down Expand Up @@ -238,6 +246,34 @@ func (j *fileStore) loadFromDisk() error {
return nil
}

// deleteOldEntries traverse the stored data removing entires with timestamp greater than ttl.
// There is an implicit filter by integration for the removed entires since each storer file
// contains only the metrics for a specifc integration instance.
func (j *fileStore) deleteOldEntries() error {
j.locker.Lock()
defer j.locker.Unlock()

// Just decode Timestamp for performance reasons.
var entry struct {
Timestamp int64
}

nowTime := now()

for key, val := range j.Data {
if err := json.Unmarshal(val, &entry); err != nil {
return fmt.Errorf("decoding storer entry: %w", err)
}

if nowTime.Sub(time.Unix(entry.Timestamp, 0)) > j.ttl {
delete(j.Data, key)
}

}

return nil
}

// Delete removes the cached data for the given key. If the data does not exist, the system does not return
// any error.
func (j inMemoryStore) Delete(key string) error {
Expand Down
119 changes: 82 additions & 37 deletions persist/storer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,34 +34,26 @@ func (m *memoryStorerProvider) prepareToRead(s Storer) (Storer, error) {
}

type diskStorerProvider struct {
t *testing.T
filePath string
}

func (j *diskStorerProvider) new() (Storer, error) {
if j.filePath == "" {
j.filePath = filePath()
j.filePath = path.Join(j.t.TempDir(), "storage.json")
}

return NewFileStore(j.filePath, log.NewStdErr(true), DefaultTTL)
}

func filePath() string {
rootDir, err := ioutil.TempDir("", "disk_storage")
if err != nil {
panic(err)
}

return path.Join(rootDir, "storage.json")
}

func (j *diskStorerProvider) prepareToRead(s Storer) (Storer, error) {
s.Save()
assert.NoError(j.t, s.Save())

return j.new()
}

func getStorerProviders() []storerProvider {
return []storerProvider{&memoryStorerProvider{}, &diskStorerProvider{}}
func getStorerProviders(t *testing.T) []storerProvider {
return []storerProvider{&memoryStorerProvider{}, &diskStorerProvider{t: t}}
}

func TestStorer_Struct(t *testing.T) {
Expand All @@ -70,7 +62,7 @@ func TestStorer_Struct(t *testing.T) {
return nowTime
})

for _, provider := range getStorerProviders() {
for _, provider := range getStorerProviders(t) {
storer, err := provider.new()
assert.NoError(t, err)

Expand Down Expand Up @@ -121,7 +113,7 @@ func TestStorer_Map(t *testing.T) {
return nowTime
})

for _, provider := range getStorerProviders() {
for _, provider := range getStorerProviders(t) {
storer, err := provider.new()
assert.NoError(t, err)

Expand Down Expand Up @@ -161,7 +153,7 @@ func TestStorer_Array(t *testing.T) {
return nowTime
})

for _, provider := range getStorerProviders() {
for _, provider := range getStorerProviders(t) {
storer, err := provider.new()
assert.NoError(t, err)

Expand Down Expand Up @@ -192,13 +184,12 @@ func TestStorer_Array(t *testing.T) {
}

func TestStorer_String(t *testing.T) {

nowTime := time.Now()
setNow(func() time.Time {
return nowTime
})

for _, provider := range getStorerProviders() {
for _, provider := range getStorerProviders(t) {
storer, err := provider.new()
assert.NoError(t, err)

Expand Down Expand Up @@ -229,13 +220,12 @@ func TestStorer_String(t *testing.T) {
}

func TestStorer_Number(t *testing.T) {

nowTime := time.Now()
setNow(func() time.Time {
return nowTime
})

for _, provider := range getStorerProviders() {
for _, provider := range getStorerProviders(t) {
storer, err := provider.new()
assert.NoError(t, err)

Expand Down Expand Up @@ -265,25 +255,18 @@ func TestStorer_Number(t *testing.T) {
}

func TestStorer_Overwrite(t *testing.T) {
for _, provider := range getStorerProviders() {
setNow(time.Now)

for _, provider := range getStorerProviders(t) {
storer, err := provider.new()
assert.NoError(t, err)

t.Run(reflect.TypeOf(storer).Name(), func(t *testing.T) {
// Given a Storer implementation
storer.Set("my-storage-test", "initial Value")

// And a stored record
nowTime := time.Unix(1234, 5678)
setNow(func() time.Time {
return nowTime
})
ts := storer.Set("my-storage-test", "initial Value")
assert.Equal(t, nowTime.Unix(), ts)

nowTime = time.Unix(78910, 111213)
// When this record is overwritten
ts = storer.Set("my-storage-test", "overwritten value")
assert.Equal(t, nowTime.Unix(), ts)
storer.Set("my-storage-test", "overwritten value")

storer, err = provider.prepareToRead(storer)
assert.NoError(t, err)
Expand All @@ -297,7 +280,7 @@ func TestStorer_Overwrite(t *testing.T) {
}

func TestStorer_NotFound(t *testing.T) {
for _, provider := range getStorerProviders() {
for _, provider := range getStorerProviders(t) {
storer, err := provider.new()
assert.NoError(t, err)

Expand All @@ -318,7 +301,7 @@ func TestStorer_NotFound(t *testing.T) {
}

func TestStorer_Delete(t *testing.T) {
for _, provider := range getStorerProviders() {
for _, provider := range getStorerProviders(t) {
storer, err := provider.new()
assert.NoError(t, err)

Expand All @@ -344,7 +327,7 @@ func TestStorer_Delete(t *testing.T) {
}

func TestStorer_DeleteUnexistent(t *testing.T) {
for _, provider := range getStorerProviders() {
for _, provider := range getStorerProviders(t) {
storer, err := provider.new()
assert.NoError(t, err)

Expand Down Expand Up @@ -419,7 +402,6 @@ func TestFileStorer_Save(t *testing.T) {
_, err = storer.Get("structValue", &structValue)
assert.NoError(t, err)
assert.Equal(t, testStruct{555, 444}, structValue)

}

func TestInMemoryStore_flushCache(t *testing.T) {
Expand All @@ -446,7 +428,7 @@ func TestFileStore_Save(t *testing.T) {

expectedTS := nowTime.Unix()

storeProvider := diskStorerProvider{}
storeProvider := diskStorerProvider{t: t}
s, err := storeProvider.new()
assert.NoError(t, err)

Expand Down Expand Up @@ -492,5 +474,68 @@ func TestFileStore_Save(t *testing.T) {

assert.Equal(t, "v", entry.Value)
assert.Equal(t, expectedTS, entry.Timestamp)
}

func TestFileStore_DeleteOldEntriesUponSaving(t *testing.T) {
// Reset global variable affected by other tests to the original
// value used by the library.
SetNow(time.Now)

// Given a file storer
filePath := path.Join(t.TempDir(), "test.json")
ttl := 1 * time.Second

storer, err := NewFileStore(filePath, log.NewStdErr(true), ttl)
assert.NoError(t, err)

// When a valid storer contains keys with timestamp greater than TTL
storer.Set("expiredKey", "val")
time.Sleep(ttl + time.Second)

storer.Set("recentKey", "v")

assert.NoError(t, storer.Save())

var val interface{}

_, err = storer.Get("recentKey", &val)
assert.NoError(t, err)

// Expired keys are removed from the storer on saving.
_, err = storer.Get("expiredKey", &val)
assert.EqualError(t, err, ErrNotFound.Error())

storer, err = NewFileStore(filePath, log.NewStdErr(true), ttl)
assert.NoError(t, err)

_, err = storer.Get("recentKey", &val)
assert.NoError(t, err)

// Expired keys have been removed from the file.
_, err = storer.Get("expiredKey", &val)
assert.EqualError(t, err, ErrNotFound.Error())
}

var data = []byte(`{"Timestamp":1650971736,"Value":["1","2","3","4"]}`)

func Benchmark_UnmashalEntireStruct(b *testing.B) {
entry := jsonEntry{}
for i := 0; i < b.N; i++ {
if err := json.Unmarshal(data, &entry); err != nil {
b.Fatal(err)
}
assert.Equal(b, int64(1650971736), entry.Timestamp)
}
}

func Benchmark_UnmashalPartialStruct(b *testing.B) {
var timestamp struct {
Timestamp int64
}
for i := 0; i < b.N; i++ {
if err := json.Unmarshal(data, &timestamp); err != nil {
b.Fatal(err)
}
assert.Equal(b, int64(1650971736), timestamp.Timestamp)
}
}

0 comments on commit 5085008

Please sign in to comment.