Skip to content

Commit

Permalink
support sending eventkit events in batches...
Browse files Browse the repository at this point in the history
We have a specific Destination implementation which directly saves the Events to BQ (instead of sending via UDP to a daemon, which saves them to BQ).

But this endpoint doesn't have any queue.

In this patch, I copy pasted the queue logic from the UDP client and made a generic BatchQueue destination.

Change-Id: I25ad4c3c2c9a2a261985b9efab1cd12f9acec9cc
  • Loading branch information
elek authored and Storj Robot committed Feb 15, 2024
1 parent 273b7bd commit 3bffca8
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 30 deletions.
16 changes: 10 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type UDPClient struct {
droppedEvents atomic.Int64
}

var _ Destination = &UDPClient{}

func NewUDPClient(application, version, instance, addr string) *UDPClient {
c := &UDPClient{
Application: application,
Expand Down Expand Up @@ -248,13 +250,15 @@ func (c *UDPClient) send(packet *outgoingPacket, addr string) (err error) {
return err
}

func (c *UDPClient) Submit(event *Event) {
func (c *UDPClient) Submit(events ...*Event) {
c.init()

select {
case c.submitQueue <- event:
return
default:
c.droppedEvents.Add(1)
for _, event := range events {
select {
case c.submitQueue <- event:
return
default:
c.droppedEvents.Add(1)
}
}
}
107 changes: 107 additions & 0 deletions eventkitd-bigquery/bigquery/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package bigquery

import (
"context"
"sync"
"sync/atomic"
"time"

"golang.org/x/sync/errgroup"

"storj.io/eventkit"
"storj.io/eventkit/utils"
)

// BatchQueue collects events and send them in batches.
type BatchQueue struct {
batchThreshold int
flushInterval time.Duration
submitQueue chan *eventkit.Event
target eventkit.Destination
mu sync.Mutex
events []*eventkit.Event
droppedEvents atomic.Int64
}

// NewBatchQueue creates a new batchQueue. It sends out the received events in batch. Either after the flushInterval is
// expired or when there are more than batchSize element in the queue.
func NewBatchQueue(target eventkit.Destination, queueSize int, batchSize int, flushInterval time.Duration) *BatchQueue {
c := &BatchQueue{
submitQueue: make(chan *eventkit.Event, queueSize),
batchThreshold: batchSize,
events: make([]*eventkit.Event, 0),
flushInterval: flushInterval,
target: target,
}
return c
}

// Run implements Destination.
func (c *BatchQueue) Run(ctx context.Context) {
ticker := utils.NewJitteredTicker(c.flushInterval)
var background errgroup.Group
defer func() { _ = background.Wait() }()
background.Go(func() error {
c.target.Run(ctx)
return nil
})
background.Go(func() error {
ticker.Run(ctx)
return nil
})

sendAndReset := func() {
c.mu.Lock()
eventsToSend := c.events
c.events = make([]*eventkit.Event, 0)
c.mu.Unlock()

c.target.Submit(eventsToSend...)
}

for {
if drops := c.droppedEvents.Load(); drops > 0 {
mon.Counter("dropped_events").Inc(drops)
c.droppedEvents.Add(-drops)
}

select {
case em := <-c.submitQueue:
if c.addEvent(em) {
sendAndReset()
}
case <-ticker.C:
if len(c.events) > 0 {
sendAndReset()
}
case <-ctx.Done():
left := len(c.submitQueue)
for i := 0; i < left; i++ {
if c.addEvent(<-c.submitQueue) {
sendAndReset()
}
}
if len(c.events) > 0 {
c.target.Submit(c.events...)
}
return
}
}
}

func (c *BatchQueue) addEvent(ev *eventkit.Event) (full bool) {
c.mu.Lock()
defer c.mu.Unlock()
c.events = append(c.events, ev)
return len(c.events) >= c.batchThreshold
}

// Submit implements Destination.
func (c *BatchQueue) Submit(event *eventkit.Event) {
select {
case c.submitQueue <- event:
return
default:
c.droppedEvents.Add(1)
}
}
44 changes: 44 additions & 0 deletions eventkitd-bigquery/bigquery/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package bigquery

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"storj.io/eventkit"
)

func TestBatchQueue(t *testing.T) {
m := &mockDestination{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := NewBatchQueue(m, 1000, 10, 1*time.Hour)
go func() {
queue.Run(ctx)
}()
for i := 0; i < 25; i++ {
queue.Submit(&eventkit.Event{
Name: "foobar",
})
}
require.Eventually(t, func() bool {
return len(m.events) == 2
}, 5*time.Second, 10*time.Millisecond)
require.Len(t, m.events[0], 10)
require.Len(t, m.events[1], 10)
}

type mockDestination struct {
events [][]*eventkit.Event
}

func (m *mockDestination) Submit(event ...*eventkit.Event) {
m.events = append(m.events, event)
}

func (m *mockDestination) Run(ctx context.Context) {
}

var _ eventkit.Destination = &mockDestination{}
50 changes: 27 additions & 23 deletions eventkitd-bigquery/bigquery/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,31 +36,35 @@ func NewBigQueryDestination(ctx context.Context, appName string, project string,
return res, nil
}

func (b *BigQueryDestination) Submit(event *eventkit.Event) {
var tags []*pb.Tag
for _, t := range event.Tags {
tags = append(tags, &pb.Tag{
Key: t.Key,
Value: t.Value,
})
}
records := map[string][]*Record{
event.Name: {
{
Application: Application{
Name: b.appName,
Version: "0.0.1",
},
Source: Source{
Instance: b.SourceInstance,
Address: "0.0.0.0",
},
ReceivedAt: time.Now(),
Timestamp: event.Timestamp,
Tags: tags,
// Submit implements Destination.
func (b *BigQueryDestination) Submit(events ...*eventkit.Event) {
records := map[string][]*Record{}
for _, event := range events {
var tags []*pb.Tag
for _, t := range event.Tags {
tags = append(tags, &pb.Tag{
Key: t.Key,
Value: t.Value,
})
}
if _, found := records[event.Name]; !found {
records[event.Name] = make([]*Record, 0)
}
records[event.Name] = append(records[event.Name], &Record{
Application: Application{
Name: b.appName,
Version: "0.0.1",
},
Source: Source{
Instance: b.SourceInstance,
Address: "0.0.0.0",
},
},
ReceivedAt: time.Now(),
Timestamp: event.Timestamp,
Tags: tags,
})
}

err := b.client.SaveRecord(context.Background(), records)
if err != nil {
fmt.Println("WARN: Couldn't save eventkit record to BQ: ", err)
Expand Down
6 changes: 5 additions & 1 deletion registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@ type Event struct {
}

type Destination interface {
Submit(*Event)
Submit(...*Event)
Run(ctx context.Context)
}

type BatchDestination interface {
SubmitBatch(*[]Event)
}

type Registry struct {
dests []Destination
}
Expand Down

0 comments on commit 3bffca8

Please sign in to comment.