Skip to content

Commit

Permalink
added closed channel check and moved nil checks (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
kristinapathak committed Mar 30, 2020
2 parents b19e32d + dea4fdf commit 2ca3658
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 14 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

## [v0.7.0]
- Modified batchInserter to check if certain record values are empty before adding the record to the queue [#30](https://github.com/xmidt-org/codex-db/pull/30)

## [v0.6.0]
- Added TimeTracker to keep track of how long an event is in memory [#29](https://github.com/xmidt-org/codex-db/pull/29)

Expand Down Expand Up @@ -53,7 +56,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [v0.1.0]
- Initial creation, moved from: https://github.com/xmidt-org/codex-deploy

[Unreleased]: https://github.com/xmidt-org/codex-db/compare/v0.6.0..HEAD
[Unreleased]: https://github.com/xmidt-org/codex-db/compare/v0.7.0..HEAD
[v0.7.0]: https://github.com/xmidt-org/codex-db/compare/v0.6.0..v0.7.0
[v0.6.0]: https://github.com/xmidt-org/codex-db/compare/v0.5.2..v0.6.0
[v0.5.2]: https://github.com/xmidt-org/codex-db/compare/v0.5.1..v0.5.2
[v0.5.1]: https://github.com/xmidt-org/codex-db/compare/v0.5.0..v0.5.1
Expand Down
32 changes: 23 additions & 9 deletions batchInserter/batchInserter.go
Expand Up @@ -47,6 +47,11 @@ var (
defaultLogger = log.NewNopLogger()
)

var (
ErrBadBeginning = errors.New("invalid value for the beginning time of the record")
ErrBadData = errors.New("data nil or empty")
)

// defaultTicker is the production code that produces a ticker. Note that we don't
// want to return *time.Ticker, as we want to be able to inject something for testing.
// We also need to return a closure to stop the ticker, so that we can call ticker.Stop() without
Expand Down Expand Up @@ -143,12 +148,20 @@ func (b *BatchInserter) Start() {
}

// Insert adds the event to the queue inside of BatchInserter, preparing for it
// to be inserted. This can block, if the queue is full.
func (b *BatchInserter) Insert(record RecordWithTime) {
b.insertQueue <- record
// to be inserted. This can block, if the queue is full. If the record has
// certain fields empty, an error is returned.
func (b *BatchInserter) Insert(rwt RecordWithTime) error {
if b.timeTracker != nil && rwt.Beginning.IsZero() {
return ErrBadBeginning
}
if rwt.Record.Data == nil || len(rwt.Record.Data) == 0 {
return ErrBadData
}
b.insertQueue <- rwt
if b.measures != nil {
b.measures.InsertingQueue.Add(1.0)
}
return nil
}

// Stop closes the internal queue and waits for the workers to finish
Expand Down Expand Up @@ -177,19 +190,18 @@ func (b *BatchInserter) batchRecords() {
if b.measures != nil {
b.measures.InsertingQueue.Add(-1.0)
}
if rwt.Beginning.IsZero() || rwt.Record.Data == nil || len(rwt.Record.Data) == 0 {
continue
}
ticker, stop = b.ticker(b.config.MaxBatchWaitTime)
records := []db.Record{rwt.Record}
beginTimes := []time.Time{rwt.Beginning}
for {
select {
case <-ticker:
insertRecords = true
case r := <-b.insertQueue:
if r.Beginning.IsZero() || r.Record.Data == nil || len(r.Record.Data) == 0 {
continue
case r, ok := <-b.insertQueue:
// if ok is false, the queue is closed.
if !ok {
insertRecords = true
break
}
if b.measures != nil {
b.measures.InsertingQueue.Add(-1.0)
Expand All @@ -198,6 +210,7 @@ func (b *BatchInserter) batchRecords() {
beginTimes = append(beginTimes, r.Beginning)
if b.config.MaxBatchSize != 0 && len(records) >= b.config.MaxBatchSize {
insertRecords = true
break
}
}
if insertRecords {
Expand Down Expand Up @@ -233,5 +246,6 @@ func (b *BatchInserter) sendTimes(beginTimes []time.Time, endTime time.Time) {
for _, beginTime := range beginTimes {
b.timeTracker.TrackTime(endTime.Sub(beginTime))
}
logging.Debug(b.logger).Log(logging.MessageKey(), "Successfully tracked time taken to insert records")
}
}
26 changes: 23 additions & 3 deletions batchInserter/batchInserter_test.go
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/go-kit/kit/metrics/provider"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

db "github.com/xmidt-org/codex-db"
"github.com/xmidt-org/webpa-common/xmetrics/xmetricstest"
Expand Down Expand Up @@ -144,10 +145,12 @@ func TestBatchInserter(t *testing.T) {
description string
insertErr error
recordsToInsert []db.Record
badBeginning bool
recordsExpected [][]db.Record
waitBtwnRecords time.Duration
expectedDroppedEvents float64
expectStopCalled bool
expectedErr error
}{
{
description: "Success",
Expand All @@ -162,6 +165,13 @@ func TestBatchInserter(t *testing.T) {
{
description: "Nil Record",
recordsToInsert: []db.Record{{}},
expectedErr: ErrBadData,
},
{
description: "Missing Beginning for Record",
recordsToInsert: []db.Record{records[0]},
badBeginning: true,
expectedErr: ErrBadBeginning,
},
{
description: "Insert Records Error",
Expand All @@ -179,8 +189,10 @@ func TestBatchInserter(t *testing.T) {
t.Run(tc.description, func(t *testing.T) {
assert := assert.New(t)
inserter := new(mockInserter)
tracker := new(mockTracker)
for _, r := range tc.recordsExpected {
inserter.On("InsertRecords", r).Return(tc.insertErr).Once()
tracker.On("TrackTime", mock.Anything).Times(len(r))
}
queue := make(chan RecordWithTime, 5)
p := xmetricstest.NewProvider(nil, Metrics)
Expand All @@ -205,6 +217,7 @@ func TestBatchInserter(t *testing.T) {
ticker: func(d time.Duration) (<-chan time.Time, func()) {
return tickerChan, stop
},
timeTracker: tracker,
}
p.Assert(t, DroppedEventsFromDbFailCounter)(xmetricstest.Value(0))
b.wg.Add(1)
Expand All @@ -214,10 +227,17 @@ func TestBatchInserter(t *testing.T) {
time.Sleep(tc.waitBtwnRecords)
}
rwt := RecordWithTime{
Record: r,
Beginning: beginTime,
Record: r,
}
if !tc.badBeginning {
rwt.Beginning = beginTime
}
err := b.Insert(rwt)
if tc.expectedErr == nil || err == nil {
assert.Equal(tc.expectedErr, err)
} else {
assert.Contains(err.Error(), tc.expectedErr.Error())
}
b.Insert(rwt)
}
tickerChan <- time.Now()
b.Stop()
Expand Down
12 changes: 11 additions & 1 deletion batchInserter/mocks_test.go
Expand Up @@ -18,8 +18,10 @@
package batchInserter

import (
"time"

"github.com/stretchr/testify/mock"
"github.com/xmidt-org/codex-db"
db "github.com/xmidt-org/codex-db"
)

type mockInserter struct {
Expand All @@ -30,3 +32,11 @@ func (c *mockInserter) InsertRecords(records ...db.Record) error {
args := c.Called(records)
return args.Error(0)
}

type mockTracker struct {
mock.Mock
}

func (t *mockTracker) TrackTime(d time.Duration) {
t.Called(d)
}

0 comments on commit 2ca3658

Please sign in to comment.