Skip to content

Commit

Permalink
bigquery: parallel destination for faster event processing
Browse files Browse the repository at this point in the history
Change-Id: Ifb24235444affcc93e13e8d1c282c48172a63ee2
  • Loading branch information
elek committed Feb 21, 2024
1 parent cb68960 commit 3c9a399
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 1 deletion.
16 changes: 15 additions & 1 deletion eventkitd-bigquery/bigquery/batch_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
// Copyright (C) 2024 Storj Labs, Inc.
// See LICENSE for copying information.

package bigquery

import (
"context"
"sync"
"testing"
"time"

Expand All @@ -24,21 +28,31 @@ func TestBatchQueue(t *testing.T) {
})
}
require.Eventually(t, func() bool {
return len(m.events) == 2
return m.Len() == 2
}, 5*time.Second, 10*time.Millisecond)
require.Len(t, m.events[0], 10)
require.Len(t, m.events[1], 10)
}

type mockDestination struct {
mu sync.Mutex
events [][]*eventkit.Event
}

func (m *mockDestination) Submit(event ...*eventkit.Event) {
m.mu.Lock()
defer m.mu.Unlock()
m.events = append(m.events, event)
}

func (m *mockDestination) Len() int {
m.mu.Lock()
defer m.mu.Unlock()
return len(m.events)
}

func (m *mockDestination) Run(ctx context.Context) {

}

var _ eventkit.Destination = &mockDestination{}
70 changes: 70 additions & 0 deletions eventkitd-bigquery/bigquery/parallel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package bigquery

import (
"context"
"fmt"
"os"

"golang.org/x/sync/errgroup"

"storj.io/eventkit"
)

// Parallel sends messages parallel from multiple goroutines.
type Parallel struct {
queue chan []*eventkit.Event
target func() (eventkit.Destination, error)
workers int
teardown chan struct{}
}

// NewParallel creates a destination. It requires a way to create the worker destinations and the number of goroutines.
func NewParallel(target func() (eventkit.Destination, error), workers int) *Parallel {
return &Parallel{
queue: make(chan []*eventkit.Event, workers),
teardown: make(chan struct{}),
target: target,
workers: workers,
}
}

// Submit implements eventkit.Destination.
func (p *Parallel) Submit(events ...*eventkit.Event) {
select {
case p.queue <- events:
case <-p.teardown:
}

}

// Run implements eventkit.Destination.
func (p *Parallel) Run(ctx context.Context) {
w := errgroup.Group{}
for i := 0; i < p.workers; i++ {
dest, err := p.target()
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "WARNING: eventkit destination couldn't be created: %v", err)
continue
}
w.Go(func() error {
dest.Run(ctx)
return nil
})
w.Go(func() error {
for {
select {
case events := <-p.queue:
dest.Submit(events...)
case <-ctx.Done():
return nil
}
}
})
}
_ = w.Wait()
close(p.teardown)
close(p.queue)

}

var _ eventkit.Destination = &Parallel{}
34 changes: 34 additions & 0 deletions eventkitd-bigquery/bigquery/parallel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package bigquery

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"storj.io/eventkit"
)

func TestParallel(t *testing.T) {
m := &mockDestination{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := NewParallel(func() (eventkit.Destination, error) {
return m, nil
}, 10)
go func() {
queue.Run(ctx)
}()
for i := 0; i < 10000; i++ {
queue.Submit(&eventkit.Event{
Name: "foobar",
})
}
require.Eventually(t, func() bool {
return m.Len() == 10000
}, 5*time.Second, 10*time.Millisecond)
require.Len(t, m.events[0], 1)
require.Len(t, m.events[1], 1)

}

0 comments on commit 3c9a399

Please sign in to comment.