diff --git a/CHANGELOG.md b/CHANGELOG.md index 055037d..8bf1ea2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +## [v0.3.2] +- Updated batchInserter to have a configurable amount of batchers [#18](https://github.com/xmidt-org/codex-db/pull/18) + ## [v0.3.1] - Fixed typo in variable name [[#15](https://github.com/xmidt-org/codex-db/pull/15)] - Fix metric cardinality [[#17](https://github.com/xmidt-org/codex-db/pull/17)] @@ -30,7 +33,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [v0.1.0] - Initial creation, moved from: https://github.com/xmidt-org/codex-deploy -[Unreleased]: https://github.com/xmidt-org/codex-db/compare/v0.3.1..HEAD +[Unreleased]: https://github.com/xmidt-org/codex-db/compare/v0.3.2..HEAD +[v0.3.2]: https://github.com/xmidt-org/codex-db/compare/v0.3.1..v0.3.2 [v0.3.1]: https://github.com/xmidt-org/codex-db/compare/v0.3.0..v0.3.1 [v0.3.0]: https://github.com/xmidt-org/codex-db/compare/v0.2.0..v0.3.0 [v0.2.0]: https://github.com/xmidt-org/codex-db/compare/0.1.2...v0.2.0 diff --git a/batchInserter/batchInserter.go b/batchInserter/batchInserter.go index 19dc38b..becc48b 100644 --- a/batchInserter/batchInserter.go +++ b/batchInserter/batchInserter.go @@ -28,18 +28,19 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics/provider" "github.com/goph/emperror" - "github.com/xmidt-org/codex-db" + db "github.com/xmidt-org/codex-db" "github.com/xmidt-org/webpa-common/logging" "github.com/xmidt-org/webpa-common/semaphore" ) const ( - minMaxWorkers = 1 - defaultMaxWorkers = 5 - minMaxBatchSize = 0 - defaultMaxBatchSize = 1 - minMaxBatchWaitTime = time.Duration(1) * time.Millisecond - defaultMinQueueSize = 5 + minParseWorkers = 1 + minInsertWorkers = 1 + defaultInsertWorkers = 5 + minMaxBatchSize = 0 + defaultMaxBatchSize = 1 + minMaxBatchWaitTime = time.Duration(1) * time.Millisecond + defaultMinQueueSize = 5 ) var ( @@ -59,6 +60,7 @@ func defaultTicker(d time.Duration) (<-chan time.Time, func()) { // that an event that needs to be inserted isn't waiting for longer than a set // period of time and that each batch doesn't pass a specified size. type BatchInserter struct { + numBatchers int insertQueue chan db.Record inserter db.Inserter insertWorkers semaphore.Interface @@ -71,7 +73,8 @@ type BatchInserter struct { // Config holds the configuration values for a batch inserter. type Config struct { - MaxWorkers int + ParseWorkers int + MaxInsertWorkers int MaxBatchSize int MaxBatchWaitTime time.Duration QueueSize int @@ -84,8 +87,11 @@ func NewBatchInserter(config Config, logger log.Logger, metricsRegistry provider if inserter == nil { return nil, errors.New("no inserter") } - if config.MaxWorkers < minMaxWorkers { - config.MaxWorkers = defaultMaxWorkers + if config.ParseWorkers < minParseWorkers { + config.ParseWorkers = minParseWorkers + } + if config.MaxInsertWorkers < minInsertWorkers { + config.MaxInsertWorkers = defaultInsertWorkers } if config.MaxBatchSize < minMaxBatchSize { config.MaxBatchSize = defaultMaxBatchSize @@ -101,12 +107,13 @@ func NewBatchInserter(config Config, logger log.Logger, metricsRegistry provider } measures := NewMeasures(metricsRegistry) - workers := semaphore.New(config.MaxWorkers) + workers := semaphore.New(config.MaxInsertWorkers) queue := make(chan db.Record, config.QueueSize) b := BatchInserter{ config: config, logger: logger, measures: measures, + numBatchers: config.ParseWorkers, insertWorkers: workers, inserter: inserter, insertQueue: queue, @@ -117,8 +124,10 @@ func NewBatchInserter(config Config, logger log.Logger, metricsRegistry provider // Start starts the batcher, which pulls from the queue inside of the BatchInserter. func (b *BatchInserter) Start() { - b.wg.Add(1) - go b.batchRecords() + for i := 0; i < b.numBatchers; i++ { + b.wg.Add(1) + go b.batchRecords() + } } // Insert adds the event to the queue inside of BatchInserter, preparing for it @@ -138,6 +147,11 @@ func (b *BatchInserter) Insert(record db.Record) { func (b *BatchInserter) Stop() { close(b.insertQueue) b.wg.Wait() + + // Grab all the workers to make sure they are done. + for i := 0; i < b.config.MaxInsertWorkers; i++ { + b.insertWorkers.Acquire() + } } func (b *BatchInserter) batchRecords() { @@ -181,11 +195,6 @@ func (b *BatchInserter) batchRecords() { } stop() } - - // Grab all the workers to make sure they are done. - for i := 0; i < b.config.MaxWorkers; i++ { - b.insertWorkers.Acquire() - } } func (b *BatchInserter) insertRecords(records []db.Record) { diff --git a/batchInserter/batchInserter_test.go b/batchInserter/batchInserter_test.go index ce99b29..40111b4 100644 --- a/batchInserter/batchInserter_test.go +++ b/batchInserter/batchInserter_test.go @@ -30,7 +30,7 @@ import ( "github.com/go-kit/kit/metrics/provider" "github.com/stretchr/testify/assert" - "github.com/xmidt-org/codex-db" + db "github.com/xmidt-org/codex-db" "github.com/xmidt-org/webpa-common/xmetrics/xmetricstest" ) @@ -40,7 +40,8 @@ func TestNewBatchInserter(t *testing.T) { goodMeasures := NewMeasures(goodRegistry) goodConfig := Config{ QueueSize: 1000, - MaxWorkers: 5000, + ParseWorkers: 50, + MaxInsertWorkers: 5000, MaxBatchSize: 100, MaxBatchWaitTime: 5 * time.Hour, } @@ -81,7 +82,8 @@ func TestNewBatchInserter(t *testing.T) { MaxBatchSize: defaultMaxBatchSize, MaxBatchWaitTime: minMaxBatchWaitTime, QueueSize: defaultMinQueueSize, - MaxWorkers: defaultMaxWorkers, + ParseWorkers: minParseWorkers, + MaxInsertWorkers: defaultInsertWorkers, }, logger: defaultLogger, }, @@ -191,7 +193,8 @@ func TestBatchInserter(t *testing.T) { config: Config{ MaxBatchWaitTime: 10 * time.Millisecond, MaxBatchSize: 3, - MaxWorkers: 5, + ParseWorkers: 1, + MaxInsertWorkers: 5, }, inserter: inserter, insertQueue: queue,