Skip to content

Commit

Permalink
bigquery: option to configure complex bq destinations
Browse files Browse the repository at this point in the history
Change-Id: I43c71b6b3d9ff0e9b44fc7d2e389cea59a78e330
  • Loading branch information
elek committed Feb 22, 2024
1 parent 6551794 commit 204fc04
Showing 1 changed file with 122 additions and 0 deletions.
122 changes: 122 additions & 0 deletions eventkitd-bigquery/bigquery/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package bigquery

import (
"context"
"strconv"
"strings"
"time"

"github.com/zeebo/errs/v2"

"storj.io/eventkit"
)

// CreateDestination creates eventkit destination based on complex configuration.
// Example configurations:
//
// 127.0.0.1:1234
// bigquery:app=...,project=...,dataset=...
// bigquery:app=...,project=...,dataset=...|batch:queueSize=111,flashSize=111,flushInterval=111
// bigquery:app=...,project=...,dataset=...|parallel:runners=10|batch:queueSize=111,flashSize=111,flushInterval=111
func CreateDestination(ctx context.Context, config string) (eventkit.Destination, error) {
layers := strings.Split(config, "|")
var lastLayer func() (eventkit.Destination, error)
for _, layer := range layers {
layer = strings.TrimSpace(layer)
if layer == "" {
continue
}
typeName, params, found := strings.Cut(layer, ":")
if !found {
return nil, errs.Errorf("eventkit destination parameters should be defined in the form type:param=value,...")
}
switch typeName {
case "bigquery", "bq":
var appName, project, dataset string
for _, param := range strings.Split(params, ",") {
key, value, found := strings.Cut(param, "=")
if !found {
return nil, errs.Errorf("eventkit destination parameters should be defined in param2=value2 format")
}
switch key {
case "appName":
appName = value
case "project":
project = value
case "dataset":
dataset = value
default:
return nil, errs.Errorf("Unknown parameter for bigquery destination %s. Please use appName/project/dataset", key)
}

}
lastLayer = func() (eventkit.Destination, error) {
return NewBigQueryDestination(ctx, appName, project, dataset)
}
case "parallel":
var workers int
for _, param := range strings.Split(params, ",") {
key, value, found := strings.Cut(param, "=")
if !found {
return nil, errs.Errorf("eventkit destination parameters should be defined in param2=value2 format")
}
switch key {
case "workers":
var err error
workers, err = strconv.Atoi(value)
if err != nil {
return nil, errs.Errorf("workers parameter of parallel destination should be a number and not %s", value)
}
default:
return nil, errs.Errorf("Unknown parameter for parallel destination %s. Please use appName/project/dataset", value)
}
}

ll := lastLayer
lastLayer = func() (eventkit.Destination, error) {
return NewParallel(ll, workers), nil
}

case "batch":
var queueSize, batchSize int
var flushInterval time.Duration
var err error
for _, param := range strings.Split(params, ",") {
key, value, found := strings.Cut(param, "=")
if !found {
return nil, errs.Errorf("eventkit destination parameters should be defined in param2=value2 format")
}
switch key {
case "queueSize":
queueSize, err = strconv.Atoi(value)
if err != nil {
return nil, errs.Errorf("queueSize parameter of batch destination should be a number and not %s", value)
}
case "batchSize":
batchSize, err = strconv.Atoi(value)
if err != nil {
return nil, errs.Errorf("batchSize parameter of batch destination should be a number and not %s", value)
}
case "flushInterval":
flushInterval, err = time.ParseDuration(value)
if err != nil {
return nil, errs.Errorf("flushInterval parameter of batch destination should be a duration and not %s", value)
}
default:
return nil, errs.Errorf("Unknown parameter for batch destination %s. Please use queueSize/batchSize/flushInterval", key)
}
}
destination, err := lastLayer()
if err != nil {
return nil, err
}
lastLayer = func() (eventkit.Destination, error) {
return NewBatchQueue(destination, queueSize, batchSize, flushInterval), nil
}
}
}
if lastLayer == nil {
return nil, errs.Errorf("No evenkit destinatino is defined")
}
return lastLayer()
}

0 comments on commit 204fc04

Please sign in to comment.