Skip to content

Commit

Permalink
feat(bigquery/storage/managedwriter): improve method parity in manage…
Browse files Browse the repository at this point in the history
…dwriter (#5007)

This PR exposes the raw methods for creating and committing streams to the wrapped managedwriter client.

It allows users to interact with all the methods of the underlying API using the managedwriter client (which itself wraps the raw v1 client).  The disadvantage is that it couples managedwriter directly to v1, as it accepts requests in the v1 namespace. The existing append interactions all use abstractions local to the managedwriter.

PR also gets rid of the utility method for batch committing write streams; there's not a great deal of utility saved here vs the underlying method.

Towards: #4366
  • Loading branch information
shollyman committed Oct 21, 2021
1 parent 587bba5 commit a2af4de
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 21 deletions.
36 changes: 16 additions & 20 deletions bigquery/storage/managedwriter/client.go
Expand Up @@ -177,28 +177,24 @@ func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error {
return nil
}

// BatchCommit is used to commit one or more PendingStream streams belonging to the same table
// as a single transaction. Streams must be finalized before committing.
// BatchCommitWriteStreams atomically commits a group of PENDING streams that belong to the same
// parent table.
//
// Format of the parentTable is: projects/{project}/datasets/{dataset}/tables/{table} and the utility
// function TableParentFromStreamName can be used to derive this from a Stream's name.
//
// If the returned response contains stream errors, this indicates that the batch commit failed and no data was
// committed.
//
// TODO: currently returns the raw response. Determine how we want to surface StreamErrors.
func (c *Client) BatchCommit(ctx context.Context, parentTable string, streamNames []string) (*storagepb.BatchCommitWriteStreamsResponse, error) {

// determine table from first streamName, as all must share the same table.
if len(streamNames) <= 0 {
return nil, fmt.Errorf("no streamnames provided")
}
// Streams must be finalized before commit and cannot be committed multiple
// times. Once a stream is committed, data in the stream becomes available
// for read operations.
func (c *Client) BatchCommitWriteStreams(ctx context.Context, req *storagepb.BatchCommitWriteStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error) {
return c.rawClient.BatchCommitWriteStreams(ctx, req, opts...)
}

req := &storagepb.BatchCommitWriteStreamsRequest{
Parent: TableParentFromStreamName(streamNames[0]),
WriteStreams: streamNames,
}
return c.rawClient.BatchCommitWriteStreams(ctx, req)
// CreateWriteStream creates a write stream to the given table.
// Additionally, every table has a special stream named ‘_default’
// to which data can be written. This stream doesn’t need to be created using
// CreateWriteStream. It is a stream that can be used simultaneously by any
// number of clients. Data written to this stream is considered committed as
// soon as an acknowledgement is received.
func (c *Client) CreateWriteStream(ctx context.Context, req *storagepb.CreateWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
return c.rawClient.CreateWriteStream(ctx, req, opts...)
}

// getWriteStream returns information about a given write stream.
Expand Down
8 changes: 7 additions & 1 deletion bigquery/storage/managedwriter/integration_test.go
Expand Up @@ -28,6 +28,7 @@ import (
"cloud.google.com/go/internal/uid"
"go.opencensus.io/stats/view"
"google.golang.org/api/option"
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
Expand Down Expand Up @@ -415,7 +416,12 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
}

// Commit stream and validate.
resp, err := mwClient.BatchCommit(ctx, TableParentFromStreamName(ms.StreamName()), []string{ms.StreamName()})
req := &storagepb.BatchCommitWriteStreamsRequest{
Parent: TableParentFromStreamName(ms.StreamName()),
WriteStreams: []string{ms.StreamName()},
}

resp, err := mwClient.BatchCommitWriteStreams(ctx, req)
if err != nil {
t.Errorf("client.BatchCommit: %v", err)
}
Expand Down

0 comments on commit a2af4de

Please sign in to comment.