Skip to content

Commit

Permalink
Merge pull request #18 from xmidt-org/numBatchers
Browse files Browse the repository at this point in the history
Added a configurable number of batchers
  • Loading branch information
kristinapathak committed Nov 21, 2019
2 parents 7ddd5d8 + 5034e5d commit 27304a0
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 23 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

## [v0.3.2]
- Updated batchInserter to have a configurable number of batchers [[#18](https://github.com/xmidt-org/codex-db/pull/18)]

## [v0.3.1]
- Fixed typo in variable name [[#15](https://github.com/xmidt-org/codex-db/pull/15)]
- Fix metric cardinality [[#17](https://github.com/xmidt-org/codex-db/pull/17)]
Expand All @@ -30,7 +33,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [v0.1.0]
- Initial creation, moved from: https://github.com/xmidt-org/codex-deploy

[Unreleased]: https://github.com/xmidt-org/codex-db/compare/v0.3.1..HEAD
[Unreleased]: https://github.com/xmidt-org/codex-db/compare/v0.3.2..HEAD
[v0.3.2]: https://github.com/xmidt-org/codex-db/compare/v0.3.1..v0.3.2
[v0.3.1]: https://github.com/xmidt-org/codex-db/compare/v0.3.0..v0.3.1
[v0.3.0]: https://github.com/xmidt-org/codex-db/compare/v0.2.0..v0.3.0
[v0.2.0]: https://github.com/xmidt-org/codex-db/compare/0.1.2...v0.2.0
Expand Down
45 changes: 27 additions & 18 deletions batchInserter/batchInserter.go
Expand Up @@ -28,18 +28,19 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics/provider"
"github.com/goph/emperror"
"github.com/xmidt-org/codex-db"
db "github.com/xmidt-org/codex-db"
"github.com/xmidt-org/webpa-common/logging"
"github.com/xmidt-org/webpa-common/semaphore"
)

const (
minMaxWorkers = 1
defaultMaxWorkers = 5
minMaxBatchSize = 0
defaultMaxBatchSize = 1
minMaxBatchWaitTime = time.Duration(1) * time.Millisecond
defaultMinQueueSize = 5
minParseWorkers = 1
minInsertWorkers = 1
defaultInsertWorkers = 5
minMaxBatchSize = 0
defaultMaxBatchSize = 1
minMaxBatchWaitTime = time.Duration(1) * time.Millisecond
defaultMinQueueSize = 5
)

var (
Expand All @@ -59,6 +60,7 @@ func defaultTicker(d time.Duration) (<-chan time.Time, func()) {
// that an event that needs to be inserted isn't waiting for longer than a set
// period of time and that each batch doesn't pass a specified size.
type BatchInserter struct {
numBatchers int
insertQueue chan db.Record
inserter db.Inserter
insertWorkers semaphore.Interface
Expand All @@ -71,7 +73,8 @@ type BatchInserter struct {

// Config holds the configuration values for a batch inserter.
type Config struct {
MaxWorkers int
ParseWorkers int
MaxInsertWorkers int
MaxBatchSize int
MaxBatchWaitTime time.Duration
QueueSize int
Expand All @@ -84,8 +87,11 @@ func NewBatchInserter(config Config, logger log.Logger, metricsRegistry provider
if inserter == nil {
return nil, errors.New("no inserter")
}
if config.MaxWorkers < minMaxWorkers {
config.MaxWorkers = defaultMaxWorkers
if config.ParseWorkers < minParseWorkers {
config.ParseWorkers = minParseWorkers
}
if config.MaxInsertWorkers < minInsertWorkers {
config.MaxInsertWorkers = defaultInsertWorkers
}
if config.MaxBatchSize < minMaxBatchSize {
config.MaxBatchSize = defaultMaxBatchSize
Expand All @@ -101,12 +107,13 @@ func NewBatchInserter(config Config, logger log.Logger, metricsRegistry provider
}

measures := NewMeasures(metricsRegistry)
workers := semaphore.New(config.MaxWorkers)
workers := semaphore.New(config.MaxInsertWorkers)
queue := make(chan db.Record, config.QueueSize)
b := BatchInserter{
config: config,
logger: logger,
measures: measures,
numBatchers: config.ParseWorkers,
insertWorkers: workers,
inserter: inserter,
insertQueue: queue,
Expand All @@ -117,8 +124,10 @@ func NewBatchInserter(config Config, logger log.Logger, metricsRegistry provider

// Start starts the batcher, which pulls from the queue inside of the BatchInserter.
func (b *BatchInserter) Start() {
b.wg.Add(1)
go b.batchRecords()
for i := 0; i < b.numBatchers; i++ {
b.wg.Add(1)
go b.batchRecords()
}
}

// Insert adds the event to the queue inside of BatchInserter, preparing for it
Expand All @@ -138,6 +147,11 @@ func (b *BatchInserter) Insert(record db.Record) {
func (b *BatchInserter) Stop() {
// Shutdown happens in a fixed order: close the insert queue, then block
// until every goroutine registered on b.wg has finished.
// NOTE(review): presumably each batchRecords goroutine drains the closed
// queue and then marks itself done on b.wg; its body is not visible
// here, so confirm. Because the channel is closed unconditionally, calling
// Stop a second time would panic on the repeated close.
close(b.insertQueue)
b.wg.Wait()

// Grab all the workers to make sure they are done.
// The semaphore is created with config.MaxInsertWorkers slots, so taking
// that many here only completes once no slot is still held elsewhere.
// This is done once, after all batchers have returned, rather than once
// per batcher goroutine. The slots are not released again in this function.
// NOTE(review): the result of Acquire, if it has one, is discarded — confirm
// whether the semaphore interface can report an error here.
for i := 0; i < b.config.MaxInsertWorkers; i++ {
b.insertWorkers.Acquire()
}
}

func (b *BatchInserter) batchRecords() {
Expand Down Expand Up @@ -181,11 +195,6 @@ func (b *BatchInserter) batchRecords() {
}
stop()
}

// Grab all the workers to make sure they are done.
for i := 0; i < b.config.MaxWorkers; i++ {
b.insertWorkers.Acquire()
}
}

func (b *BatchInserter) insertRecords(records []db.Record) {
Expand Down
11 changes: 7 additions & 4 deletions batchInserter/batchInserter_test.go
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/go-kit/kit/metrics/provider"
"github.com/stretchr/testify/assert"

"github.com/xmidt-org/codex-db"
db "github.com/xmidt-org/codex-db"
"github.com/xmidt-org/webpa-common/xmetrics/xmetricstest"
)

Expand All @@ -40,7 +40,8 @@ func TestNewBatchInserter(t *testing.T) {
goodMeasures := NewMeasures(goodRegistry)
goodConfig := Config{
QueueSize: 1000,
MaxWorkers: 5000,
ParseWorkers: 50,
MaxInsertWorkers: 5000,
MaxBatchSize: 100,
MaxBatchWaitTime: 5 * time.Hour,
}
Expand Down Expand Up @@ -81,7 +82,8 @@ func TestNewBatchInserter(t *testing.T) {
MaxBatchSize: defaultMaxBatchSize,
MaxBatchWaitTime: minMaxBatchWaitTime,
QueueSize: defaultMinQueueSize,
MaxWorkers: defaultMaxWorkers,
ParseWorkers: minParseWorkers,
MaxInsertWorkers: defaultInsertWorkers,
},
logger: defaultLogger,
},
Expand Down Expand Up @@ -191,7 +193,8 @@ func TestBatchInserter(t *testing.T) {
config: Config{
MaxBatchWaitTime: 10 * time.Millisecond,
MaxBatchSize: 3,
MaxWorkers: 5,
ParseWorkers: 1,
MaxInsertWorkers: 5,
},
inserter: inserter,
insertQueue: queue,
Expand Down

0 comments on commit 27304a0

Please sign in to comment.