Skip to content

Commit

Permalink
all: support for passing credentials path for BigQuery authentication
Browse files Browse the repository at this point in the history
sometimes depending on the setup it's more convenient to have the
credentials file stored in a filesystem path somewhere. This change
supports a new "credentialsPath" in the BigQuery definition, and
falls back to GOOGLE_APPLICATION_CREDENTIALS env if not provided.

Change-Id: Ie4caa5c0fe22074342b60688be05e70cd8bd81b9
  • Loading branch information
halkyon committed Apr 15, 2024
1 parent 6cb545e commit 1d9596f
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 14 deletions.
5 changes: 3 additions & 2 deletions bigquery/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"cloud.google.com/go/bigquery"
"github.com/pkg/errors"
"google.golang.org/api/googleapi"
"google.golang.org/api/option"

"storj.io/eventkit/pb"
)
Expand All @@ -23,8 +24,8 @@ type BigQueryClient struct {
schemeChangeLock sync.Locker
}

func NewBigQueryClient(ctx context.Context, project string, datasetName string) (*BigQueryClient, error) {
client, err := bigquery.NewClient(ctx, project)
func NewBigQueryClient(ctx context.Context, project, datasetName string, options ...option.ClientOption) (*BigQueryClient, error) {
client, err := bigquery.NewClient(ctx, project, options...)
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
12 changes: 10 additions & 2 deletions bigquery/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/zeebo/errs/v2"
"google.golang.org/api/option"

"storj.io/eventkit"
"storj.io/eventkit/destination"
Expand All @@ -19,6 +20,7 @@ import (
// bigquery:app=...,project=...,dataset=...
// bigquery:app=...,project=...,dataset=...|batch:queueSize=111,flashSize=111,flushInterval=111
// bigquery:app=...,project=...,dataset=...|parallel:runners=10|batch:queueSize=111,flashSize=111,flushInterval=111
// bigquery:app=...,project=...,dataset=...,credentialsPath=/path/to/my/service-account.json|parallel:runners=10|batch:queueSize=111
func CreateDestination(ctx context.Context, config string) (eventkit.Destination, error) {
layers := strings.Split(config, "|")
var lastLayer func() (eventkit.Destination, error)
Expand All @@ -33,7 +35,7 @@ func CreateDestination(ctx context.Context, config string) (eventkit.Destination
}
switch typeName {
case "bigquery", "bq":
var appName, project, dataset string
var appName, project, dataset, credentialsPath string
for _, param := range strings.Split(params, ",") {
key, value, found := strings.Cut(param, "=")
if !found {
Expand All @@ -46,13 +48,19 @@ func CreateDestination(ctx context.Context, config string) (eventkit.Destination
project = value
case "dataset":
dataset = value
case "credentialsPath":
credentialsPath = value
default:
return nil, errs.Errorf("Unknown parameter for bigquery destination %s. Please use appName/project/dataset", key)
}

}
lastLayer = func() (eventkit.Destination, error) {
return NewBigQueryDestination(ctx, appName, project, dataset)
var options []option.ClientOption
if credentialsPath != "" {
options = append(options, option.WithCredentialsFile(credentialsPath))
}
return NewBigQueryDestination(ctx, appName, project, dataset, options...)
}
case "parallel":
var workers int
Expand Down
6 changes: 4 additions & 2 deletions bigquery/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"os"
"time"

"google.golang.org/api/option"

"storj.io/eventkit"
"storj.io/eventkit/pb"
)
Expand All @@ -22,8 +24,8 @@ type BigQueryDestination struct {

var _ eventkit.Destination = &BigQueryDestination{}

func NewBigQueryDestination(ctx context.Context, appName string, project string, dataset string) (*BigQueryDestination, error) {
c, err := NewBigQueryClient(ctx, project, dataset)
func NewBigQueryDestination(ctx context.Context, appName, project, dataset string, options ...option.ClientOption) (*BigQueryDestination, error) {
c, err := NewBigQueryClient(ctx, project, dataset, options...)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions eventkitd-bigquery/bigquery/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"time"

"google.golang.org/api/option"

"storj.io/eventkit/bigquery"
"storj.io/eventkit/eventkitd/listener"
"storj.io/eventkit/pb"
Expand All @@ -14,8 +16,8 @@ type BigQuerySink struct {
client *bigquery.BigQueryClient
}

func NewBigQuerySink(ctx context.Context, project string, dataset string) (*BigQuerySink, error) {
c, err := bigquery.NewBigQueryClient(ctx, project, dataset)
func NewBigQuerySink(ctx context.Context, project, dataset string, options ...option.ClientOption) (*BigQuerySink, error) {
c, err := bigquery.NewBigQueryClient(ctx, project, dataset, options...)
if err != nil {
return nil, err
}
Expand Down
13 changes: 10 additions & 3 deletions tools/eventkit-save-bq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/pkg/errors"
"github.com/spf13/cobra"
"google.golang.org/api/option"

"storj.io/eventkit"
"storj.io/eventkit/bigquery"
Expand All @@ -23,17 +24,23 @@ func main() {
name := c.Flags().StringP("name", "n", "test", "Name of the event sending out")
project := c.Flags().StringP("project", "p", "", "GCP project to use")
dataset := c.Flags().StringP("dataset", "d", "eventkitd", "GCP dataset to use")
credentialsPath := c.Flags().StringP("credentialsPath", "c", "", "GCP credentials path, defaults to GOOGLE_APPLICATION_CREDENTIALS if not provided")
c.RunE = func(cmd *cobra.Command, args []string) error {
return send(*project, *dataset, *name, args)
return send(*project, *dataset, *credentialsPath, *name, args)
}
err := c.Execute()
if err != nil {
log.Fatalf("%++v", err)
}
}

func send(project string, dataset string, name string, args []string) error {
d, err := bigquery.NewBigQueryDestination(context.Background(), "evenkit-save", project, dataset)
func send(project, dataset, credentialsPath, name string, args []string) error {
var options []option.ClientOption
if credentialsPath != "" {
options = append(options, option.WithCredentialsFile(credentialsPath))
}

d, err := bigquery.NewBigQueryDestination(context.Background(), "evenkit-save", project, dataset, options...)
if err != nil {
return errors.WithStack(err)
}
Expand Down
20 changes: 17 additions & 3 deletions tools/eventkit-time/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/spf13/viper"
"github.com/zeebo/errs/v2"
"golang.org/x/sync/errgroup"
"google.golang.org/api/option"

"storj.io/eventkit"
"storj.io/eventkit/bigquery"
Expand All @@ -29,6 +30,7 @@ func main() {
_ = c.Flags().StringSliceP("tag", "t", []string{}, "Custom tags to add to the events")
_ = c.Flags().StringP("instance", "i", "", "Instance name of the eventkitd monitoring (default: hostname)")
_ = c.Flags().StringP("scope", "s", "eventkit-time", "Scope to use for events")
_ = c.Flags().StringP("credentialsPath", "c", "", "GCP credentials path, defaults to GOOGLE_APPLICATION_CREDENTIALS if not provided")
version := c.Flags().BoolP("version", "v", false, "Scope to use for events")
viper.SetConfigName("eventkit-time")
viper.SetEnvPrefix("EVENTKIT")
Expand Down Expand Up @@ -57,15 +59,23 @@ func main() {
}
}

return execute(viper.GetString("destination"), viper.GetString("name"), args, viper.GetStringSlice("tag"), viper.GetString("scope"), viper.GetString("instance"))
return execute(
viper.GetString("destination"),
viper.GetString("name"),
viper.GetString("credentialsPath"),
args,
viper.GetStringSlice("tag"),
viper.GetString("scope"),
viper.GetString("instance"),
)
}
err = c.Execute()
if err != nil {
log.Fatalf("%++v", err)
}
}

func execute(dest string, name string, args []string, customTags []string, scope string, instance string) error {
func execute(dest, name, credentialsPath string, args []string, customTags []string, scope, instance string) error {
ek := eventkit.DefaultRegistry.Scope(scope)
if instance == "" {
instance, _ = os.Hostname()
Expand All @@ -82,7 +92,11 @@ func execute(dest string, name string, args []string, customTags []string, scope
var err error
dest = strings.TrimPrefix(dest, "bq:")
parts := strings.Split(dest, "/")
client, err = bigquery.NewBigQueryDestination(destCtx, "eventkit-time", parts[0], parts[1])
var options []option.ClientOption
if credentialsPath != "" {
options = append(options, option.WithCredentialsFile(credentialsPath))
}
client, err = bigquery.NewBigQueryDestination(destCtx, "eventkit-time", parts[0], parts[1], options...)
if err != nil {
return err
}
Expand Down

0 comments on commit 1d9596f

Please sign in to comment.