feat(bigquery/storage/managedwriter): add append stream plumbing (#4452)
This PR adds enough of the wiring to the client to begin testing via integration tests. It adopts a pattern similar to the pullstream in pubsub, in that it abstracts individual calls from stream state management.

There are two significant units of future work that may yield changes here:

* For the sake of traffic efficiency, we only want to attach things like the stream ID, schema, and trace ID to the first append on any stream (see the first sketch below).

* For stream connection retry, we may want to re-send writes that were sent but never acknowledged. For default/committed streams, this behavior may yield duplicate writes (at-least-once semantics). For buffered/pending streams, it means either the library or the user should expect "data already present" errors for these resent writes (see the second sketch below).
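As a hedged illustration of the first item (not code from this PR), a first-request-only append could be assembled as below, using the v1beta2 storagepb request shapes; buildAppendRequest is a hypothetical helper:

package main

import (
	storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
)

// buildAppendRequest is a hypothetical sketch: only the first request on a
// connection carries the stream ID, trace ID, and writer schema; subsequent
// requests carry serialized row data only.
func buildAppendRequest(first bool, streamID, traceID string, schema *storagepb.ProtoSchema, serialized [][]byte) *storagepb.AppendRowsRequest {
	data := &storagepb.AppendRowsRequest_ProtoData{
		Rows: &storagepb.ProtoRows{SerializedRows: serialized},
	}
	req := &storagepb.AppendRowsRequest{
		Rows: &storagepb.AppendRowsRequest_ProtoRows{ProtoRows: data},
	}
	if first {
		req.WriteStream = streamID
		req.TraceId = traceID
		data.WriterSchema = schema
	}
	return req
}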


Towards #4366
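For the resend behavior in the second bullet, a minimal sketch of how a retry layer might tolerate "data already present" on buffered/pending streams; the helper name and the mapping to gRPC's AlreadyExists code are assumptions, not part of this PR:

package main

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isAcceptableResendError is a hypothetical helper: when a write is re-sent
// after a lost acknowledgement, a response indicating the rows already landed
// can be treated as success rather than failure.
func isAcceptableResendError(err error) bool {
	if err == nil {
		return true
	}
	return status.Code(err) == codes.AlreadyExists
}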
shollyman committed Jul 26, 2021
1 parent e5019de commit b085384
Showing 10 changed files with 910 additions and 21 deletions.
71 changes: 52 additions & 19 deletions bigquery/storage/managedwriter/client.go
@@ -21,8 +21,10 @@ import (
	"strings"

	storage "cloud.google.com/go/bigquery/storage/apiv1beta2"
	"github.com/googleapis/gax-go/v2"
	"google.golang.org/api/option"
	storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
	"google.golang.org/grpc"
)

// Client is a managed BigQuery Storage write client scoped to a single project.
@@ -53,41 +55,72 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
	}, nil
}

// Close releases resources held by the client.
func (c *Client) Close() error {
	// TODO: consider if we should propagate a cancellation from client to all associated managed streams.
	if c.rawClient == nil {
		return fmt.Errorf("already closed")
	}
	c.rawClient.Close()
	c.rawClient = nil
	return nil
}

// NewManagedStream establishes a new managed stream for appending data into a table.
//
// Context here is retained for use by the underlying streaming connections the managed stream may create.
func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error) {
	return c.buildManagedStream(ctx, c.rawClient.AppendRows, false, opts...)
}

func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClientFunc, skipSetup bool, opts ...WriterOption) (*ManagedStream, error) {

	ctx, cancel := context.WithCancel(ctx)

	ms := &ManagedStream{
		streamSettings: defaultStreamSettings(),
		c:              c,
		ctx:            ctx,
		cancel:         cancel,
		open: func() (storagepb.BigQueryWrite_AppendRowsClient, error) {
			arc, err := streamFunc(ctx, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10*1024*1024)))
			if err != nil {
				return nil, err
			}
			return arc, nil
		},
	}

	// apply writer options
	for _, opt := range opts {
		opt(ms)
	}

	// skipSetup exists for testing scenarios.
	if !skipSetup {
		if err := c.validateOptions(ctx, ms); err != nil {
			return nil, err
		}

		if ms.streamSettings.streamID == "" {
			// not instantiated with a stream, construct one.
			streamName := fmt.Sprintf("%s/_default", ms.destinationTable)
			if ms.streamSettings.streamType != DefaultStream {
				// For everything but a default stream, we create a new stream on behalf of the user.
				req := &storagepb.CreateWriteStreamRequest{
					Parent: ms.destinationTable,
					WriteStream: &storagepb.WriteStream{
						Type: streamTypeToEnum(ms.streamSettings.streamType),
					}}
				resp, err := ms.c.rawClient.CreateWriteStream(ctx, req)
				if err != nil {
					return nil, fmt.Errorf("couldn't create write stream: %v", err)
				}
				streamName = resp.GetName()
			}
			ms.streamSettings.streamID = streamName
			// TODO(followup CLs): instantiate an appendstream client, flow controller, etc.
		}
	}

	return ms, nil
}
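The open callback in buildManagedStream separates dialing the AppendRows stream from the state machine that manages it, echoing the pubsub pullstream pattern noted in the commit message. A minimal sketch of how such a callback might be retried, assuming simple linear backoff; openWithRetry is hypothetical and not part of this commit:

package main

import (
	"time"

	storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
)

// openWithRetry retries the supplied open callback a few times before giving
// up, returning the first successfully opened append stream.
func openWithRetry(open func() (storagepb.BigQueryWrite_AppendRowsClient, error)) (storagepb.BigQueryWrite_AppendRowsClient, error) {
	var arc storagepb.BigQueryWrite_AppendRowsClient
	var err error
	for attempt := 1; attempt <= 3; attempt++ {
		if arc, err = open(); err == nil {
			return arc, nil
		}
		time.Sleep(time.Duration(attempt) * 100 * time.Millisecond)
	}
	return nil, err
}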
3 changes: 2 additions & 1 deletion bigquery/storage/managedwriter/doc.go
@@ -14,7 +14,8 @@

// Package managedwriter will be a thick client around the storage API's BigQueryWriteClient.
//
// It is EXPERIMENTAL and subject to change or removal without notice. This library is in a pre-alpha
// state, and breaking changes are frequent.
//
// Currently, the BigQueryWriteClient this library targets is exposed in the storage v1beta2 endpoint, and is
// a successor to the streaming interface. API method tabledata.insertAll is the primary backend method, and
207 changes: 207 additions & 0 deletions bigquery/storage/managedwriter/integration_test.go
@@ -0,0 +1,207 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import (
	"context"
	"fmt"
	"testing"
	"time"

	"cloud.google.com/go/bigquery"
	"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
	"cloud.google.com/go/bigquery/storage/managedwriter/testdata"
	"cloud.google.com/go/internal/testutil"
	"cloud.google.com/go/internal/uid"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

var (
	datasetIDs         = uid.NewSpace("managedwriter_test_dataset", &uid.Options{Sep: '_', Time: time.Now()})
	tableIDs           = uid.NewSpace("table", &uid.Options{Sep: '_', Time: time.Now()})
	defaultTestTimeout = 15 * time.Second
)

func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOption) (*Client, *bigquery.Client) {
	if testing.Short() {
		t.Skip("Integration tests skipped in short mode")
	}
	projID := testutil.ProjID()
	if projID == "" {
		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
	}
	ts := testutil.TokenSource(ctx, "https://www.googleapis.com/auth/bigquery")
	if ts == nil {
		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
	}
	opts = append(opts, option.WithTokenSource(ts))
	client, err := NewClient(ctx, projID, opts...)
	if err != nil {
		t.Fatalf("couldn't create managedwriter client: %v", err)
	}

	bqClient, err := bigquery.NewClient(ctx, projID, opts...)
	if err != nil {
		t.Fatalf("couldn't create bigquery client: %v", err)
	}
	return client, bqClient
}

// validateRowCount confirms the number of rows in a table visible to the query engine.
func validateRowCount(ctx context.Context, t *testing.T, client *bigquery.Client, tbl *bigquery.Table, expectedRows int64) {

	// Verify data is present in the table with a count query.
	sql := fmt.Sprintf("SELECT COUNT(1) FROM `%s`.%s.%s", tbl.ProjectID, tbl.DatasetID, tbl.TableID)
	q := client.Query(sql)
	it, err := q.Read(ctx)
	if err != nil {
		t.Errorf("failed to issue validation query: %v", err)
		return
	}
	var rowdata []bigquery.Value
	err = it.Next(&rowdata)
	if err != nil {
		t.Errorf("error fetching validation results: %v", err)
		return
	}
	count, ok := rowdata[0].(int64)
	if !ok {
		t.Errorf("got unexpected data from validation query: %v", rowdata[0])
	}
	if count != expectedRows {
		t.Errorf("rows mismatch expected rows: got %d want %d", count, expectedRows)
	}
}

// setupTestDataset generates a unique dataset for testing, and a cleanup that can be deferred.
func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client) (ds *bigquery.Dataset, cleanup func(), err error) {
	dataset := bqc.Dataset(datasetIDs.New())
	if err := dataset.Create(ctx, nil); err != nil {
		return nil, nil, err
	}
	return dataset, func() {
		if err := dataset.DeleteWithContents(ctx); err != nil {
			t.Logf("could not cleanup dataset %s: %v", dataset.DatasetID, err)
		}
	}, nil
}

// setupDynamicDescriptors aids testing when not using a supplied proto.
func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect.MessageDescriptor, *descriptorpb.DescriptorProto) {
	convertedSchema, err := adapt.BQSchemaToStorageTableSchema(schema)
	if err != nil {
		t.Fatalf("adapt.BQSchemaToStorageTableSchema: %v", err)
	}

	descriptor, err := adapt.StorageSchemaToDescriptor(convertedSchema, "root")
	if err != nil {
		t.Fatalf("adapt.StorageSchemaToDescriptor: %v", err)
	}
	messageDescriptor, ok := descriptor.(protoreflect.MessageDescriptor)
	if !ok {
		t.Fatalf("adapted descriptor is not a message descriptor")
	}
	return messageDescriptor, protodesc.ToDescriptorProto(messageDescriptor)
}

func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) {
	mwClient, bqClient := getTestClients(context.Background(), t)
	defer mwClient.Close()
	defer bqClient.Close()

	dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient)
	if err != nil {
		t.Fatalf("failed to init test dataset: %v", err)
	}
	defer cleanup()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// prep a suitable destination table.
	testTable := dataset.Table(tableIDs.New())
	schema := bigquery.Schema{
		{Name: "name", Type: bigquery.StringFieldType, Required: true},
		{Name: "value", Type: bigquery.IntegerFieldType, Required: true},
	}
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}
	// We'll use a precompiled test proto, but we need its corresponding descriptorproto representation
	// to send as the stream's schema.
	m := &testdata.SimpleMessage{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

	// setup a new stream.
	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(DefaultStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}

	// prevalidate we have no data in table.
	validateRowCount(ctx, t, bqClient, testTable, 0)

	testData := []*testdata.SimpleMessage{
		{Name: "one", Value: 1},
		{Name: "two", Value: 2},
		{Name: "three", Value: 3},
		{Name: "four", Value: 1},
		{Name: "five", Value: 2},
	}

	// First, send the test rows individually.
	var results []*AppendResult
	for k, mesg := range testData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Errorf("failed to marshal message %d: %v", k, err)
		}
		data := [][]byte{b}
		results, err = ms.AppendRows(data, NoStreamOffset)
		if err != nil {
			t.Errorf("single-row append %d failed: %v", k, err)
		}
	}
	// wait for the result to indicate ready, then validate.
	<-results[0].Ready()
	wantRows := int64(len(testData))
	validateRowCount(ctx, t, bqClient, testTable, wantRows)

	// Now, send the test rows grouped into a single append.
	var data [][]byte
	for k, mesg := range testData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Errorf("failed to marshal message %d: %v", k, err)
		}
		data = append(data, b)
	}
	results, err = ms.AppendRows(data, NoStreamOffset)
	if err != nil {
		t.Errorf("grouped-row append failed: %v", err)
	}
	// wait for the result to indicate ready, then validate again.
	<-results[0].Ready()
	wantRows = wantRows * 2
	validateRowCount(ctx, t, bqClient, testTable, wantRows)
}
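The test blocks on an AppendResult's Ready channel before validating. A hedged sketch of a context-aware wait, assuming Ready() returns a receive-only channel as elsewhere in this commit; waitForAppend is a hypothetical helper, not part of the PR:

package managedwriter

import "context"

// waitForAppend is a hypothetical helper: it blocks until the append result
// is acknowledged or the context expires, whichever happens first.
func waitForAppend(ctx context.Context, r *AppendResult) error {
	select {
	case <-r.Ready():
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}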
