Skip to content

Commit

Permalink
Add 'UpdateManyWithRetention' whisper function and migrate fill to us…
Browse files Browse the repository at this point in the history
…e it

Fill must read and write data to exactly same archive.

For this purpose this commit intoroduce new function 'UpdateManyWithRetention'
that also gets 'desired' archive's retention.

Also migrate 'fillArchive' in fill package to use that function, instead of
plain 'UpdateMany'

Fixes jjneely#19
  • Loading branch information
Vladimir Smirnov committed Aug 15, 2017
1 parent 7cf79e9 commit 0e073c8
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
2 changes: 1 addition & 1 deletion fill/fill.go
Expand Up @@ -54,7 +54,7 @@ func fillArchive(srcWsp, dstWsp *whisper.Whisper, start, stop int) error {
}
tsStart += ts.Step()
}
dstWsp.UpdateMany(points)
dstWsp.UpdateManyWithRetention(points, v.MaxRetention())

stop = fromTime
if start >= stop {
Expand Down
11 changes: 10 additions & 1 deletion whisper/whisper.go
Expand Up @@ -369,14 +369,19 @@ func (whisper *Whisper) Update(value float64, timestamp int) (err error) {
return nil
}

func (whisper *Whisper) UpdateMany(points []*TimeSeriesPoint) {
// UpdateManyWithRetention updates only archive with specified retention.
// retention = -1 means "update all possible archives"
func (whisper *Whisper) UpdateManyWithRetention(points []*TimeSeriesPoint, retention int) {
// sort the points, newest first
sort.Sort(timeSeriesPointsNewestFirst{points})

now := int(time.Now().Unix()) // TODO: danger of 2030 something overflow

var currentPoints []*TimeSeriesPoint
for _, archive := range whisper.archives {
if retention != -1 && retention != archive.MaxRetention() {
continue
}
currentPoints, points = extractPoints(points, now, archive.MaxRetention())
if len(currentPoints) == 0 {
continue
Expand All @@ -393,6 +398,10 @@ func (whisper *Whisper) UpdateMany(points []*TimeSeriesPoint) {
}
}

func (whisper *Whisper) UpdateMany(points []*TimeSeriesPoint) {
whisper.UpdateManyWithRetention(points, -1)
}

func (whisper *Whisper) archiveUpdateMany(archive *archiveInfo, points []*TimeSeriesPoint) {
alignedPoints := alignPoints(archive, points)
intervals, packedBlocks := packSequences(archive, alignedPoints)
Expand Down

0 comments on commit 0e073c8

Please sign in to comment.