From 74e9abeaf66b46629c9ac9ee450895006b8b92c4 Mon Sep 17 00:00:00 2001 From: Kristina Spring Date: Thu, 26 Mar 2020 14:06:55 -0700 Subject: [PATCH 1/2] added closed channel check and moved nil checks --- batchInserter/batchInserter.go | 29 +++++++++++++++++++++-------- batchInserter/batchInserter_test.go | 26 +++++++++++++++++++++++--- batchInserter/mocks_test.go | 12 +++++++++++- 3 files changed, 55 insertions(+), 12 deletions(-) diff --git a/batchInserter/batchInserter.go b/batchInserter/batchInserter.go index fc0d886..d5f7638 100644 --- a/batchInserter/batchInserter.go +++ b/batchInserter/batchInserter.go @@ -47,6 +47,11 @@ var ( defaultLogger = log.NewNopLogger() ) +var ( + ErrBadBeginning = errors.New("invalid value for the beginning time of the record") + ErrBadData = errors.New("data nil or empty") +) + // defaultTicker is the production code that produces a ticker. Note that we don't // want to return *time.Ticker, as we want to be able to inject something for testing. // We also need to return a closure to stop the ticker, so that we can call ticker.Stop() without @@ -144,11 +149,18 @@ func (b *BatchInserter) Start() { // Insert adds the event to the queue inside of BatchInserter, preparing for it // to be inserted. This can block, if the queue is full. -func (b *BatchInserter) Insert(record RecordWithTime) { - b.insertQueue <- record +func (b *BatchInserter) Insert(rwt RecordWithTime) error { + if b.timeTracker != nil && rwt.Beginning.IsZero() { + return ErrBadBeginning + } + if rwt.Record.Data == nil || len(rwt.Record.Data) == 0 { + return ErrBadData + } + b.insertQueue <- rwt if b.measures != nil { b.measures.InsertingQueue.Add(1.0) } + return nil } // Stop closes the internal queue and waits for the workers to finish @@ -177,9 +189,6 @@ func (b *BatchInserter) batchRecords() { if b.measures != nil { b.measures.InsertingQueue.Add(-1.0) } - if rwt.Beginning.IsZero() || rwt.Record.Data == nil || len(rwt.Record.Data) == 0 { - continue - } ticker, stop = b.ticker(b.config.MaxBatchWaitTime) records := []db.Record{rwt.Record} beginTimes := []time.Time{rwt.Beginning} @@ -187,9 +196,11 @@ func (b *BatchInserter) batchRecords() { select { case <-ticker: insertRecords = true - case r := <-b.insertQueue: - if r.Beginning.IsZero() || r.Record.Data == nil || len(r.Record.Data) == 0 { - continue + case r, ok := <-b.insertQueue: + // if ok is false, the queue is closed. + if !ok { + insertRecords = true + break } if b.measures != nil { b.measures.InsertingQueue.Add(-1.0) @@ -198,6 +209,7 @@ func (b *BatchInserter) batchRecords() { beginTimes = append(beginTimes, r.Beginning) if b.config.MaxBatchSize != 0 && len(records) >= b.config.MaxBatchSize { insertRecords = true + break } } if insertRecords { @@ -233,5 +245,6 @@ func (b *BatchInserter) sendTimes(beginTimes []time.Time, endTime time.Time) { for _, beginTime := range beginTimes { b.timeTracker.TrackTime(endTime.Sub(beginTime)) } + logging.Debug(b.logger).Log(logging.MessageKey(), "Successfully tracked time taken to insert records") } } diff --git a/batchInserter/batchInserter_test.go b/batchInserter/batchInserter_test.go index 688d3b8..48ae05c 100644 --- a/batchInserter/batchInserter_test.go +++ b/batchInserter/batchInserter_test.go @@ -29,6 +29,7 @@ import ( "github.com/go-kit/kit/metrics/provider" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" db "github.com/xmidt-org/codex-db" "github.com/xmidt-org/webpa-common/xmetrics/xmetricstest" @@ -144,10 +145,12 @@ func TestBatchInserter(t *testing.T) { description string insertErr error recordsToInsert []db.Record + badBeginning bool recordsExpected [][]db.Record waitBtwnRecords time.Duration expectedDroppedEvents float64 expectStopCalled bool + expectedErr error }{ { description: "Success", @@ -162,6 +165,13 @@ func TestBatchInserter(t *testing.T) { { description: "Nil Record", recordsToInsert: []db.Record{{}}, + expectedErr: ErrBadData, + }, + { + description: "Missing Beginning for Record", + recordsToInsert: []db.Record{records[0]}, + badBeginning: true, + expectedErr: ErrBadBeginning, }, { description: "Insert Records Error", @@ -179,8 +189,10 @@ func TestBatchInserter(t *testing.T) { t.Run(tc.description, func(t *testing.T) { assert := assert.New(t) inserter := new(mockInserter) + tracker := new(mockTracker) for _, r := range tc.recordsExpected { inserter.On("InsertRecords", r).Return(tc.insertErr).Once() + tracker.On("TrackTime", mock.Anything).Times(len(r)) } queue := make(chan RecordWithTime, 5) p := xmetricstest.NewProvider(nil, Metrics) @@ -205,6 +217,7 @@ func TestBatchInserter(t *testing.T) { ticker: func(d time.Duration) (<-chan time.Time, func()) { return tickerChan, stop }, + timeTracker: tracker, } p.Assert(t, DroppedEventsFromDbFailCounter)(xmetricstest.Value(0)) b.wg.Add(1) @@ -214,10 +227,17 @@ func TestBatchInserter(t *testing.T) { time.Sleep(tc.waitBtwnRecords) } rwt := RecordWithTime{ - Record: r, - Beginning: beginTime, + Record: r, + } + if !tc.badBeginning { + rwt.Beginning = beginTime + } + err := b.Insert(rwt) + if tc.expectedErr == nil || err == nil { + assert.Equal(tc.expectedErr, err) + } else { + assert.Contains(err.Error(), tc.expectedErr.Error()) } - b.Insert(rwt) } tickerChan <- time.Now() b.Stop() diff --git a/batchInserter/mocks_test.go b/batchInserter/mocks_test.go index b5ee30d..c2b10e1 100644 --- a/batchInserter/mocks_test.go +++ b/batchInserter/mocks_test.go @@ -18,8 +18,10 @@ package batchInserter import ( + "time" + "github.com/stretchr/testify/mock" - "github.com/xmidt-org/codex-db" + db "github.com/xmidt-org/codex-db" ) type mockInserter struct { @@ -30,3 +32,11 @@ func (c *mockInserter) InsertRecords(records ...db.Record) error { args := c.Called(records) return args.Error(0) } + +type mockTracker struct { + mock.Mock +} + +func (t *mockTracker) TrackTime(d time.Duration) { + t.Called(d) +} From dea4fdf8aef1135f3439ab8c5ee703d8d2aacfcf Mon Sep 17 00:00:00 2001 From: Kristina Spring Date: Mon, 30 Mar 2020 12:14:12 -0700 Subject: [PATCH 2/2] updated changelog and comment --- CHANGELOG.md | 6 +++++- batchInserter/batchInserter.go | 3 ++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 92e9a18..7925e30 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +## [v0.7.0] +- Modified batchInserter to check if certain record values are empty before adding the record to the queue [#30](https://github.com/xmidt-org/codex-db/pull/30) + ## [v0.6.0] - Added TimeTracker to keep track of how long an event is in memory [#29](https://github.com/xmidt-org/codex-db/pull/29) @@ -53,7 +56,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [v0.1.0] - Initial creation, moved from: https://github.com/xmidt-org/codex-deploy -[Unreleased]: https://github.com/xmidt-org/codex-db/compare/v0.6.0..HEAD +[Unreleased]: https://github.com/xmidt-org/codex-db/compare/v0.7.0..HEAD +[v0.7.0]: https://github.com/xmidt-org/codex-db/compare/v0.6.0..v0.7.0 [v0.6.0]: https://github.com/xmidt-org/codex-db/compare/v0.5.2..v0.6.0 [v0.5.2]: https://github.com/xmidt-org/codex-db/compare/v0.5.1..v0.5.2 [v0.5.1]: https://github.com/xmidt-org/codex-db/compare/v0.5.0..v0.5.1 diff --git a/batchInserter/batchInserter.go b/batchInserter/batchInserter.go index d5f7638..9f420a1 100644 --- a/batchInserter/batchInserter.go +++ b/batchInserter/batchInserter.go @@ -148,7 +148,8 @@ func (b *BatchInserter) Start() { } // Insert adds the event to the queue inside of BatchInserter, preparing for it -// to be inserted. This can block, if the queue is full. +// to be inserted. This can block, if the queue is full. If the record has +// certain fields empty, an error is returned. func (b *BatchInserter) Insert(rwt RecordWithTime) error { if b.timeTracker != nil && rwt.Beginning.IsZero() { return ErrBadBeginning