From b19e32d4e0606232982432f7646e008c69fab866 Mon Sep 17 00:00:00 2001 From: kristinaspring Date: Wed, 19 Feb 2020 09:35:51 -0800 Subject: [PATCH] added TimeTracker to keep track of how long an event is in memory (#29) * added TimeTracker to keep track of how long an event is in memory * attempting to fix tests * fixed tests * fixed unit tests hopefully * updated changelog * updated wrp-go to v2 --- CHANGELOG.md | 6 +++- batchInserter/batchInserter.go | 50 +++++++++++++++++++++-------- batchInserter/batchInserter_test.go | 11 +++++-- cassandra/db_test.go | 7 ++-- go.mod | 3 +- go.sum | 5 +-- 6 files changed, 59 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e496518..92e9a18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +## [v0.6.0] +- Added TimeTracker to keep track of how long an event is in memory [#29](https://github.com/xmidt-org/codex-db/pull/29) + ## [v0.5.2] - Always provide a state hash cassandra [#28](https://github.com/xmidt-org/codex-db/pull/28) @@ -50,7 +53,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [v0.1.0] - Initial creation, moved from: https://github.com/xmidt-org/codex-deploy -[Unreleased]: https://github.com/xmidt-org/codex-db/compare/v0.5.2..HEAD +[Unreleased]: https://github.com/xmidt-org/codex-db/compare/v0.6.0..HEAD +[v0.6.0]: https://github.com/xmidt-org/codex-db/compare/v0.5.2..v0.6.0 [v0.5.2]: https://github.com/xmidt-org/codex-db/compare/v0.5.1..v0.5.2 [v0.5.1]: https://github.com/xmidt-org/codex-db/compare/v0.5.0..v0.5.1 [v0.5.0]: https://github.com/xmidt-org/codex-db/compare/v0.4.0..v0.5.0 diff --git a/batchInserter/batchInserter.go b/batchInserter/batchInserter.go index becc48b..fc0d886 100644 --- a/batchInserter/batchInserter.go +++ b/batchInserter/batchInserter.go @@ -56,13 +56,18 @@ func defaultTicker(d time.Duration) (<-chan time.Time, func()) { return ticker.C, ticker.Stop } +type TimeTracker interface { + TrackTime(time.Duration) +} + // BatchInserter manages batching events that need to be inserted, ensuring // that an event that needs to be inserted isn't waiting for longer than a set // period of time and that each batch doesn't pass a specified size. type BatchInserter struct { numBatchers int - insertQueue chan db.Record + insertQueue chan RecordWithTime inserter db.Inserter + timeTracker TimeTracker insertWorkers semaphore.Interface wg sync.WaitGroup measures *Measures @@ -80,10 +85,16 @@ type Config struct { QueueSize int } +// RecordWithTime provides the db record and the time this event was received by a service +type RecordWithTime struct { + Record db.Record + Beginning time.Time +} + // NewBatchInserter creates a BatchInserter with the given values, ensuring // that the configuration and other values given are valid. If configuration // values aren't valid, a default value is used. -func NewBatchInserter(config Config, logger log.Logger, metricsRegistry provider.Provider, inserter db.Inserter) (*BatchInserter, error) { +func NewBatchInserter(config Config, logger log.Logger, metricsRegistry provider.Provider, inserter db.Inserter, timeTracker TimeTracker) (*BatchInserter, error) { if inserter == nil { return nil, errors.New("no inserter") } @@ -108,7 +119,7 @@ func NewBatchInserter(config Config, logger log.Logger, metricsRegistry provider measures := NewMeasures(metricsRegistry) workers := semaphore.New(config.MaxInsertWorkers) - queue := make(chan db.Record, config.QueueSize) + queue := make(chan RecordWithTime, config.QueueSize) b := BatchInserter{ config: config, logger: logger, @@ -118,6 +129,7 @@ func NewBatchInserter(config Config, logger log.Logger, metricsRegistry provider inserter: inserter, insertQueue: queue, ticker: defaultTicker, + timeTracker: timeTracker, } return &b, nil } @@ -132,7 +144,7 @@ func (b *BatchInserter) Start() { // Insert adds the event to the queue inside of BatchInserter, preparing for it // to be inserted. This can block, if the queue is full. -func (b *BatchInserter) Insert(record db.Record) { +func (b *BatchInserter) Insert(record RecordWithTime) { b.insertQueue <- record if b.measures != nil { b.measures.InsertingQueue.Add(1.0) @@ -161,34 +173,36 @@ func (b *BatchInserter) batchRecords() { stop func() ) defer b.wg.Done() - for record := range b.insertQueue { + for rwt := range b.insertQueue { if b.measures != nil { b.measures.InsertingQueue.Add(-1.0) } - if record.Data == nil || len(record.Data) == 0 { + if rwt.Beginning.IsZero() || rwt.Record.Data == nil || len(rwt.Record.Data) == 0 { continue } ticker, stop = b.ticker(b.config.MaxBatchWaitTime) - records := []db.Record{record} + records := []db.Record{rwt.Record} + beginTimes := []time.Time{rwt.Beginning} for { select { case <-ticker: insertRecords = true case r := <-b.insertQueue: + if r.Beginning.IsZero() || r.Record.Data == nil || len(r.Record.Data) == 0 { + continue + } if b.measures != nil { b.measures.InsertingQueue.Add(-1.0) } - if r.Data == nil || len(r.Data) == 0 { - continue - } - records = append(records, r) + records = append(records, r.Record) + beginTimes = append(beginTimes, r.Beginning) if b.config.MaxBatchSize != 0 && len(records) >= b.config.MaxBatchSize { insertRecords = true } } if insertRecords { b.insertWorkers.Acquire() - go b.insertRecords(records) + go b.insertRecords(records, beginTimes) insertRecords = false break } @@ -197,7 +211,7 @@ func (b *BatchInserter) batchRecords() { } } -func (b *BatchInserter) insertRecords(records []db.Record) { +func (b *BatchInserter) insertRecords(records []db.Record, beginTimes []time.Time) { defer b.insertWorkers.Release() err := b.inserter.InsertRecords(records...) if err != nil { @@ -206,8 +220,18 @@ func (b *BatchInserter) insertRecords(records []db.Record) { } logging.Error(b.logger, emperror.Context(err)...).Log(logging.MessageKey(), "Failed to add records to the database", logging.ErrorKey(), err.Error()) + b.sendTimes(beginTimes, time.Now()) return } + b.sendTimes(beginTimes, time.Now()) logging.Debug(b.logger).Log(logging.MessageKey(), "Successfully upserted device information", "records", records) logging.Info(b.logger).Log(logging.MessageKey(), "Successfully upserted device information", "records", len(records)) } + +func (b *BatchInserter) sendTimes(beginTimes []time.Time, endTime time.Time) { + if b.timeTracker != nil { + for _, beginTime := range beginTimes { + b.timeTracker.TrackTime(endTime.Sub(beginTime)) + } + } +} diff --git a/batchInserter/batchInserter_test.go b/batchInserter/batchInserter_test.go index 40111b4..688d3b8 100644 --- a/batchInserter/batchInserter_test.go +++ b/batchInserter/batchInserter_test.go @@ -96,7 +96,7 @@ func TestNewBatchInserter(t *testing.T) { for _, tc := range tests { t.Run(tc.description, func(t *testing.T) { assert := assert.New(t) - bi, err := NewBatchInserter(tc.config, tc.logger, tc.registry, tc.inserter) + bi, err := NewBatchInserter(tc.config, tc.logger, tc.registry, tc.inserter, nil) if bi != nil { } if tc.expectedBatchInserter == nil || bi == nil { @@ -117,6 +117,7 @@ func TestNewBatchInserter(t *testing.T) { } func TestBatchInserter(t *testing.T) { + beginTime := time.Now() records := []db.Record{ { Type: db.State, @@ -181,7 +182,7 @@ func TestBatchInserter(t *testing.T) { for _, r := range tc.recordsExpected { inserter.On("InsertRecords", r).Return(tc.insertErr).Once() } - queue := make(chan db.Record, 5) + queue := make(chan RecordWithTime, 5) p := xmetricstest.NewProvider(nil, Metrics) m := NewMeasures(p) stopCalled := false @@ -212,7 +213,11 @@ func TestBatchInserter(t *testing.T) { if i > 0 { time.Sleep(tc.waitBtwnRecords) } - b.Insert(r) + rwt := RecordWithTime{ + Record: r, + Beginning: beginTime, + } + b.Insert(rwt) } tickerChan <- time.Now() b.Stop() diff --git a/cassandra/db_test.go b/cassandra/db_test.go index 3ca4c5e..183fad7 100644 --- a/cassandra/db_test.go +++ b/cassandra/db_test.go @@ -21,15 +21,16 @@ import ( "encoding/json" "errors" "fmt" + "testing" + "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" db "github.com/xmidt-org/codex-db" "github.com/xmidt-org/webpa-common/xmetrics/xmetricstest" - "github.com/xmidt-org/wrp-go/wrp" + "github.com/xmidt-org/wrp-go/v2" "github.com/yugabyte/gocql" - "testing" - "time" ) var ( diff --git a/go.mod b/go.mod index fc1cd58..9010c21 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,8 @@ require ( github.com/stretchr/testify v1.4.0 github.com/xmidt-org/capacityset v0.1.1 github.com/xmidt-org/webpa-common v1.5.0 - github.com/xmidt-org/wrp-go v1.3.4 + github.com/xmidt-org/wrp-go v1.3.3 + github.com/xmidt-org/wrp-go/v2 v2.0.1 github.com/yugabyte/gocql v0.0.0-20190522232832-e049977574e9 ) diff --git a/go.sum b/go.sum index 3dbebf0..f1aa121 100644 --- a/go.sum +++ b/go.sum @@ -305,9 +305,10 @@ github.com/xmidt-org/webpa-common v1.3.1/go.mod h1:oCpKzOC+9h2vYHVzAU/06tDTQuBN4 github.com/xmidt-org/webpa-common v1.3.2/go.mod h1:oCpKzOC+9h2vYHVzAU/06tDTQuBN4RZz+rhgIXptpOI= github.com/xmidt-org/webpa-common v1.5.0 h1:HbLwhkSITrwBn2I/FsHRsimXXzUoOOUvoeCvAx0mq4s= github.com/xmidt-org/webpa-common v1.5.0/go.mod h1:wR27EP2MfUvQNy22rYm9p65VSErlwTi34mDCWhZivgI= +github.com/xmidt-org/wrp-go v1.3.3 h1:WvODdrtxPwHEUqwfwHpu+kNUfBzLBfAIdrKCQjoCblc= github.com/xmidt-org/wrp-go v1.3.3/go.mod h1:VOKYeeVWc2cyYmGWJksqUCV/lGzReRl0EP74y3mcWp0= -github.com/xmidt-org/wrp-go v1.3.4 h1:7kj+1VXRNNEI7G0Z3z7C58QpIXrWzTw/eI79FdAhyPA= -github.com/xmidt-org/wrp-go v1.3.4/go.mod h1:EWC9BgcYYO1hKgLzz6VFPpg3LU6ZWSDV/uNiWC7zP+o= +github.com/xmidt-org/wrp-go/v2 v2.0.1 h1:JWMpAvNCkD1pLXdZLmAs/4g3twxTM7K4YU57dapJvB0= +github.com/xmidt-org/wrp-go/v2 v2.0.1/go.mod h1:v0HK0go/7OSVDvKbnXsUn6c+M987p0yyxWEs8/Fmf60= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yugabyte/gocql v0.0.0-20190522232832-e049977574e9 h1:BpVFCemJnLkcheWPQGmFnWzS+4CNXtHwKsgZiyvTT/I= github.com/yugabyte/gocql v0.0.0-20190522232832-e049977574e9/go.mod h1:kXnWCffg+Tcm4uCyjKS4JcAJEsWDrMPR58Yav3pfwBc=