Skip to content

Commit

Permalink
bigquery: implement destination with BatchQueue
Browse files Browse the repository at this point in the history
Change-Id: I7b40054d25bead1727e4a02edc113ff6811047ca
  • Loading branch information
elek committed Feb 21, 2024
1 parent 3bffca8 commit cb68960
Showing 1 changed file with 10 additions and 12 deletions.
22 changes: 10 additions & 12 deletions eventkitd-bigquery/bigquery/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package bigquery
import (
"context"
"sync"
"sync/atomic"
"time"

"golang.org/x/sync/errgroup"
Expand All @@ -20,9 +19,10 @@ type BatchQueue struct {
target eventkit.Destination
mu sync.Mutex
events []*eventkit.Event
droppedEvents atomic.Int64
}

var _ eventkit.Destination = &BatchQueue{}

// NewBatchQueue creates a new batchQueue. It sends out the received events in batch. Either after the flushInterval is
// expired or when there are more than batchSize element in the queue.
func NewBatchQueue(target eventkit.Destination, queueSize int, batchSize int, flushInterval time.Duration) *BatchQueue {
Expand Down Expand Up @@ -60,10 +60,6 @@ func (c *BatchQueue) Run(ctx context.Context) {
}

for {
if drops := c.droppedEvents.Load(); drops > 0 {
mon.Counter("dropped_events").Inc(drops)
c.droppedEvents.Add(-drops)
}

select {
case em := <-c.submitQueue:
Expand Down Expand Up @@ -97,11 +93,13 @@ func (c *BatchQueue) addEvent(ev *eventkit.Event) (full bool) {
}

// Submit implements Destination.
func (c *BatchQueue) Submit(event *eventkit.Event) {
select {
case c.submitQueue <- event:
return
default:
c.droppedEvents.Add(1)
func (c *BatchQueue) Submit(events ...*eventkit.Event) {
defer mon.Task()(nil)(nil)
for _, e := range events {
select {
case c.submitQueue <- e:
default:
mon.Counter("dropped_events").Inc(1)
}
}
}

0 comments on commit cb68960

Please sign in to comment.