Skip to content

Commit

Permalink
added TimeTracker to keep track of how long an event is in memory (#29)
Browse files Browse the repository at this point in the history
* added TimeTracker to keep track of how long an event is in memory

* attempting to fix tests

* fixed tests

* fixed unit tests hopefully

* updated changelog

* updated wrp-go to v2
  • Loading branch information
kristinapathak committed Feb 19, 2020
1 parent cc14d71 commit b19e32d
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 23 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

## [v0.6.0]
- Added TimeTracker to keep track of how long an event is in memory [#29](https://github.com/xmidt-org/codex-db/pull/29)

## [v0.5.2]
- Always provide a state hash cassandra [#28](https://github.com/xmidt-org/codex-db/pull/28)

Expand Down Expand Up @@ -50,7 +53,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [v0.1.0]
- Initial creation, moved from: https://github.com/xmidt-org/codex-deploy

[Unreleased]: https://github.com/xmidt-org/codex-db/compare/v0.5.2..HEAD
[Unreleased]: https://github.com/xmidt-org/codex-db/compare/v0.6.0..HEAD
[v0.6.0]: https://github.com/xmidt-org/codex-db/compare/v0.5.2..v0.6.0
[v0.5.2]: https://github.com/xmidt-org/codex-db/compare/v0.5.1..v0.5.2
[v0.5.1]: https://github.com/xmidt-org/codex-db/compare/v0.5.0..v0.5.1
[v0.5.0]: https://github.com/xmidt-org/codex-db/compare/v0.4.0..v0.5.0
Expand Down
50 changes: 37 additions & 13 deletions batchInserter/batchInserter.go
Expand Up @@ -56,13 +56,18 @@ func defaultTicker(d time.Duration) (<-chan time.Time, func()) {
return ticker.C, ticker.Stop
}

type TimeTracker interface {
TrackTime(time.Duration)
}

// BatchInserter manages batching events that need to be inserted, ensuring
// that an event that needs to be inserted isn't waiting for longer than a set
// period of time and that each batch doesn't pass a specified size.
type BatchInserter struct {
numBatchers int
insertQueue chan db.Record
insertQueue chan RecordWithTime
inserter db.Inserter
timeTracker TimeTracker
insertWorkers semaphore.Interface
wg sync.WaitGroup
measures *Measures
Expand All @@ -80,10 +85,16 @@ type Config struct {
QueueSize int
}

// RecordWithTime provides the db record and the time this event was received by a service
type RecordWithTime struct {
Record db.Record
Beginning time.Time
}

// NewBatchInserter creates a BatchInserter with the given values, ensuring
// that the configuration and other values given are valid. If configuration
// values aren't valid, a default value is used.
func NewBatchInserter(config Config, logger log.Logger, metricsRegistry provider.Provider, inserter db.Inserter) (*BatchInserter, error) {
func NewBatchInserter(config Config, logger log.Logger, metricsRegistry provider.Provider, inserter db.Inserter, timeTracker TimeTracker) (*BatchInserter, error) {
if inserter == nil {
return nil, errors.New("no inserter")
}
Expand All @@ -108,7 +119,7 @@ func NewBatchInserter(config Config, logger log.Logger, metricsRegistry provider

measures := NewMeasures(metricsRegistry)
workers := semaphore.New(config.MaxInsertWorkers)
queue := make(chan db.Record, config.QueueSize)
queue := make(chan RecordWithTime, config.QueueSize)
b := BatchInserter{
config: config,
logger: logger,
Expand All @@ -118,6 +129,7 @@ func NewBatchInserter(config Config, logger log.Logger, metricsRegistry provider
inserter: inserter,
insertQueue: queue,
ticker: defaultTicker,
timeTracker: timeTracker,
}
return &b, nil
}
Expand All @@ -132,7 +144,7 @@ func (b *BatchInserter) Start() {

// Insert adds the event to the queue inside of BatchInserter, preparing for it
// to be inserted. This can block, if the queue is full.
func (b *BatchInserter) Insert(record db.Record) {
func (b *BatchInserter) Insert(record RecordWithTime) {
b.insertQueue <- record
if b.measures != nil {
b.measures.InsertingQueue.Add(1.0)
Expand Down Expand Up @@ -161,34 +173,36 @@ func (b *BatchInserter) batchRecords() {
stop func()
)
defer b.wg.Done()
for record := range b.insertQueue {
for rwt := range b.insertQueue {
if b.measures != nil {
b.measures.InsertingQueue.Add(-1.0)
}
if record.Data == nil || len(record.Data) == 0 {
if rwt.Beginning.IsZero() || rwt.Record.Data == nil || len(rwt.Record.Data) == 0 {
continue
}
ticker, stop = b.ticker(b.config.MaxBatchWaitTime)
records := []db.Record{record}
records := []db.Record{rwt.Record}
beginTimes := []time.Time{rwt.Beginning}
for {
select {
case <-ticker:
insertRecords = true
case r := <-b.insertQueue:
if r.Beginning.IsZero() || r.Record.Data == nil || len(r.Record.Data) == 0 {
continue
}
if b.measures != nil {
b.measures.InsertingQueue.Add(-1.0)
}
if r.Data == nil || len(r.Data) == 0 {
continue
}
records = append(records, r)
records = append(records, r.Record)
beginTimes = append(beginTimes, r.Beginning)
if b.config.MaxBatchSize != 0 && len(records) >= b.config.MaxBatchSize {
insertRecords = true
}
}
if insertRecords {
b.insertWorkers.Acquire()
go b.insertRecords(records)
go b.insertRecords(records, beginTimes)
insertRecords = false
break
}
Expand All @@ -197,7 +211,7 @@ func (b *BatchInserter) batchRecords() {
}
}

func (b *BatchInserter) insertRecords(records []db.Record) {
func (b *BatchInserter) insertRecords(records []db.Record, beginTimes []time.Time) {
defer b.insertWorkers.Release()
err := b.inserter.InsertRecords(records...)
if err != nil {
Expand All @@ -206,8 +220,18 @@ func (b *BatchInserter) insertRecords(records []db.Record) {
}
logging.Error(b.logger, emperror.Context(err)...).Log(logging.MessageKey(),
"Failed to add records to the database", logging.ErrorKey(), err.Error())
b.sendTimes(beginTimes, time.Now())
return
}
b.sendTimes(beginTimes, time.Now())
logging.Debug(b.logger).Log(logging.MessageKey(), "Successfully upserted device information", "records", records)
logging.Info(b.logger).Log(logging.MessageKey(), "Successfully upserted device information", "records", len(records))
}

func (b *BatchInserter) sendTimes(beginTimes []time.Time, endTime time.Time) {
if b.timeTracker != nil {
for _, beginTime := range beginTimes {
b.timeTracker.TrackTime(endTime.Sub(beginTime))
}
}
}
11 changes: 8 additions & 3 deletions batchInserter/batchInserter_test.go
Expand Up @@ -96,7 +96,7 @@ func TestNewBatchInserter(t *testing.T) {
for _, tc := range tests {
t.Run(tc.description, func(t *testing.T) {
assert := assert.New(t)
bi, err := NewBatchInserter(tc.config, tc.logger, tc.registry, tc.inserter)
bi, err := NewBatchInserter(tc.config, tc.logger, tc.registry, tc.inserter, nil)
if bi != nil {
}
if tc.expectedBatchInserter == nil || bi == nil {
Expand All @@ -117,6 +117,7 @@ func TestNewBatchInserter(t *testing.T) {
}

func TestBatchInserter(t *testing.T) {
beginTime := time.Now()
records := []db.Record{
{
Type: db.State,
Expand Down Expand Up @@ -181,7 +182,7 @@ func TestBatchInserter(t *testing.T) {
for _, r := range tc.recordsExpected {
inserter.On("InsertRecords", r).Return(tc.insertErr).Once()
}
queue := make(chan db.Record, 5)
queue := make(chan RecordWithTime, 5)
p := xmetricstest.NewProvider(nil, Metrics)
m := NewMeasures(p)
stopCalled := false
Expand Down Expand Up @@ -212,7 +213,11 @@ func TestBatchInserter(t *testing.T) {
if i > 0 {
time.Sleep(tc.waitBtwnRecords)
}
b.Insert(r)
rwt := RecordWithTime{
Record: r,
Beginning: beginTime,
}
b.Insert(rwt)
}
tickerChan <- time.Now()
b.Stop()
Expand Down
7 changes: 4 additions & 3 deletions cassandra/db_test.go
Expand Up @@ -21,15 +21,16 @@ import (
"encoding/json"
"errors"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
db "github.com/xmidt-org/codex-db"
"github.com/xmidt-org/webpa-common/xmetrics/xmetricstest"
"github.com/xmidt-org/wrp-go/wrp"
"github.com/xmidt-org/wrp-go/v2"
"github.com/yugabyte/gocql"
"testing"
"time"
)

var (
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Expand Up @@ -15,7 +15,8 @@ require (
github.com/stretchr/testify v1.4.0
github.com/xmidt-org/capacityset v0.1.1
github.com/xmidt-org/webpa-common v1.5.0
github.com/xmidt-org/wrp-go v1.3.4
github.com/xmidt-org/wrp-go v1.3.3
github.com/xmidt-org/wrp-go/v2 v2.0.1

github.com/yugabyte/gocql v0.0.0-20190522232832-e049977574e9
)
5 changes: 3 additions & 2 deletions go.sum
Expand Up @@ -305,9 +305,10 @@ github.com/xmidt-org/webpa-common v1.3.1/go.mod h1:oCpKzOC+9h2vYHVzAU/06tDTQuBN4
github.com/xmidt-org/webpa-common v1.3.2/go.mod h1:oCpKzOC+9h2vYHVzAU/06tDTQuBN4RZz+rhgIXptpOI=
github.com/xmidt-org/webpa-common v1.5.0 h1:HbLwhkSITrwBn2I/FsHRsimXXzUoOOUvoeCvAx0mq4s=
github.com/xmidt-org/webpa-common v1.5.0/go.mod h1:wR27EP2MfUvQNy22rYm9p65VSErlwTi34mDCWhZivgI=
github.com/xmidt-org/wrp-go v1.3.3 h1:WvODdrtxPwHEUqwfwHpu+kNUfBzLBfAIdrKCQjoCblc=
github.com/xmidt-org/wrp-go v1.3.3/go.mod h1:VOKYeeVWc2cyYmGWJksqUCV/lGzReRl0EP74y3mcWp0=
github.com/xmidt-org/wrp-go v1.3.4 h1:7kj+1VXRNNEI7G0Z3z7C58QpIXrWzTw/eI79FdAhyPA=
github.com/xmidt-org/wrp-go v1.3.4/go.mod h1:EWC9BgcYYO1hKgLzz6VFPpg3LU6ZWSDV/uNiWC7zP+o=
github.com/xmidt-org/wrp-go/v2 v2.0.1 h1:JWMpAvNCkD1pLXdZLmAs/4g3twxTM7K4YU57dapJvB0=
github.com/xmidt-org/wrp-go/v2 v2.0.1/go.mod h1:v0HK0go/7OSVDvKbnXsUn6c+M987p0yyxWEs8/Fmf60=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yugabyte/gocql v0.0.0-20190522232832-e049977574e9 h1:BpVFCemJnLkcheWPQGmFnWzS+4CNXtHwKsgZiyvTT/I=
github.com/yugabyte/gocql v0.0.0-20190522232832-e049977574e9/go.mod h1:kXnWCffg+Tcm4uCyjKS4JcAJEsWDrMPR58Yav3pfwBc=
Expand Down

0 comments on commit b19e32d

Please sign in to comment.