From 663c899c3b8aa751527d24f541d964f2ba91a233 Mon Sep 17 00:00:00 2001
From: shollyman
Date: Wed, 28 Jul 2021 14:18:58 -0700
Subject: [PATCH] feat(bigquery/storage/managedwriter): naming and doc improvements (#4508)

Minor changes:
* WithTracePrefix -> WithTraceID for the option and accompanying downstream usage
* exported TableParentFromStreamName to aid users of BatchCommit.

The rest of the PR represents docstring improvements.

Towards https://github.com/googleapis/google-cloud-go/issues/4366
---
 .../storage/managedwriter/appendresult.go      |  5 ++--
 bigquery/storage/managedwriter/client.go       | 19 +++++++++----
 bigquery/storage/managedwriter/client_test.go  |  2 +-
 bigquery/storage/managedwriter/doc.go          | 28 +++++++++++++------
 .../storage/managedwriter/integration_test.go  |  2 +-
 .../storage/managedwriter/managed_stream.go    | 21 ++++++++++----
 .../managedwriter/managed_stream_test.go       |  2 +-
 .../storage/managedwriter/writer_option.go     | 19 +++++++------
 .../managedwriter/writer_option_test.go        |  8 +++---
 9 files changed, 69 insertions(+), 37 deletions(-)

diff --git a/bigquery/storage/managedwriter/appendresult.go b/bigquery/storage/managedwriter/appendresult.go
index 0854a407385..2cfff5069c6 100644
--- a/bigquery/storage/managedwriter/appendresult.go
+++ b/bigquery/storage/managedwriter/appendresult.go
@@ -47,11 +47,12 @@ func newAppendResult(data []byte) *AppendResult {
 	}
 }
 
-// Ready blocks until the append request is completed.
+// Ready blocks until the append request has reached a completed state,
+// which may be a successful append or an error.
 func (ar *AppendResult) Ready() <-chan struct{} { return ar.ready }
 
 // GetResult returns the optional offset of this row, or the associated
-// error.
+// error. It blocks until the result is ready.
 func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) {
 	select {
 	case <-ctx.Done():
diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go
index 4e6b66297b5..e12fe61970f 100644
--- a/bigquery/storage/managedwriter/client.go
+++ b/bigquery/storage/managedwriter/client.go
@@ -143,7 +143,7 @@ func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error {
 		}
 		// update type and destination based on stream metadata
 		ms.streamSettings.streamType = StreamType(info.Type.String())
-		ms.destinationTable = tableParentFromStreamName(ms.streamSettings.streamID)
+		ms.destinationTable = TableParentFromStreamName(ms.streamSettings.streamID)
 	}
 	if ms.destinationTable == "" {
 		return fmt.Errorf("no destination table specified")
@@ -158,7 +158,13 @@ func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error {
 // BatchCommit is used to commit one or more PendingStream streams belonging to the same table
 // as a single transaction. Streams must be finalized before committing.
 //
-// TODO: this currently exposes the raw proto response, but a future CL will wrap this with a nicer type.
+// Format of the parentTable is: projects/{project}/datasets/{dataset}/tables/{table} and the utility
+// function TableParentFromStreamName can be used to derive this from a Stream's name.
+//
+// If the returned response contains stream errors, this indicates that the batch commit failed and no data was
+// committed.
+//
+// TODO: currently returns the raw response. Determine how we want to surface StreamErrors.
 func (c *Client) BatchCommit(ctx context.Context, parentTable string, streamNames []string) (*storagepb.BatchCommitWriteStreamsResponse, error) {
 
 	// determine table from first streamName, as all must share the same table.
@@ -167,7 +173,7 @@ func (c *Client) BatchCommit(ctx context.Context, parentTable string, streamName
 	}
 
 	req := &storagepb.BatchCommitWriteStreamsRequest{
-		Parent:       tableParentFromStreamName(streamNames[0]),
+		Parent:       TableParentFromStreamName(streamNames[0]),
 		WriteStreams: streamNames,
 	}
 	return c.rawClient.BatchCommitWriteStreams(ctx, req)
@@ -183,9 +189,10 @@ func (c *Client) getWriteStream(ctx context.Context, streamName string) (*storag
 	return c.rawClient.GetWriteStream(ctx, req)
 }
 
-// tableParentFromStreamName return the corresponding parent table
-// identifier given a fully qualified streamname.
-func tableParentFromStreamName(streamName string) string {
+// TableParentFromStreamName is a utility function for extracting the parent table
+// prefix from a stream name. When an invalid stream ID is passed, this simply returns
+// the original stream name.
+func TableParentFromStreamName(streamName string) string {
 	// Stream IDs have the following prefix:
 	// projects/{project}/datasets/{dataset}/tables/{table}/blah
 	parts := strings.SplitN(streamName, "/", 7)
diff --git a/bigquery/storage/managedwriter/client_test.go b/bigquery/storage/managedwriter/client_test.go
index 81d91e5ca13..2183f4c6c79 100644
--- a/bigquery/storage/managedwriter/client_test.go
+++ b/bigquery/storage/managedwriter/client_test.go
@@ -40,7 +40,7 @@ func TestTableParentFromStreamName(t *testing.T) {
 	}
 
 	for _, tc := range testCases {
-		got := tableParentFromStreamName(tc.in)
+		got := TableParentFromStreamName(tc.in)
 		if got != tc.want {
 			t.Errorf("mismatch on %s: got %s want %s", tc.in, got, tc.want)
 		}
diff --git a/bigquery/storage/managedwriter/doc.go b/bigquery/storage/managedwriter/doc.go
index 5fb6dfd6f54..33d70a619d5 100644
--- a/bigquery/storage/managedwriter/doc.go
+++ b/bigquery/storage/managedwriter/doc.go
@@ -12,12 +12,24 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package managedwriter will be a thick client around the storage API's BigQueryWriteClient.
-//
-// It is EXPERIMENTAL and subject to change or removal without notice. This library is in a pre-alpha
-// state, and breaking changes are frequent.
-//
-// Currently, the BigQueryWriteClient this library targets is exposed in the storage v1beta2 endpoint, and is
-// a successor to the streaming interface. API method tabledata.insertAll is the primary backend method, and
-// the Inserter abstraction is the equivalent to this in the cloud.google.com/go/bigquery package.
+/*
+Package managedwriter provides an EXPERIMENTAL thick client around the BigQuery storage API's BigQueryWriteClient.
+More information about this new write client may also be found in the public documentation: https://cloud.google.com/bigquery/docs/write-api
+
+It is EXPERIMENTAL and subject to change or removal without notice. This library is in a pre-alpha
+state, and breaking changes are frequent.
+
+Currently, this client targets the BigQueryWriteClient present in the v1beta2 endpoint, and is intended as a more
+feature-rich successor to the classic BigQuery streaming interface, which is presented as the Inserter abstraction
+in cloud.google.com/go/bigquery, and the tabledata.insertAll method if you're more familiar with the BigQuery v2 REST
+methods.
+
+Appending data is accomplished through the use of streams. There are four stream types, each targeting slightly different
+use cases and needs. See the StreamType documentation for more details about each stream type.
+
+This API uses serialized protocol buffer messages for appending data to streams. For users who don't have predefined protocol
+buffer messages for sending data, the cloud.google.com/go/bigquery/storage/managedwriter/adapt subpackage includes functionality
+for defining protocol buffer messages dynamically using table schema information, which enables users to do things like using
+protojson to convert JSON text into a protocol buffer.
+*/
 package managedwriter
diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go
index 95289688ea4..b181f804cf1 100644
--- a/bigquery/storage/managedwriter/integration_test.go
+++ b/bigquery/storage/managedwriter/integration_test.go
@@ -448,7 +448,7 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
 	}
 
 	// Commit stream and validate.
-	resp, err := mwClient.BatchCommit(ctx, tableParentFromStreamName(ms.StreamName()), []string{ms.StreamName()})
+	resp, err := mwClient.BatchCommit(ctx, TableParentFromStreamName(ms.StreamName()), []string{ms.StreamName()})
 	if err != nil {
 		t.Errorf("client.BatchCommit: %v", err)
 	}
diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go
index be88be9a870..4b5a5e07134 100644
--- a/bigquery/storage/managedwriter/managed_stream.go
+++ b/bigquery/storage/managedwriter/managed_stream.go
@@ -45,6 +45,9 @@ var (
 
 	// BufferedStream is a form of checkpointed stream, that allows
 	// you to advance the offset of visible rows via Flush operations.
+	//
+	// NOTE: Buffered Streams are currently in limited preview, and as such
+	// methods like FlushRows() may yield errors for non-enrolled projects.
 	BufferedStream StreamType = "BUFFERED"
 
 	// PendingStream is a stream in which no data is made visible to
@@ -106,9 +109,9 @@ type streamSettings struct {
 	// request bytes can be outstanding into the system.
 	MaxInflightBytes int
 
-	// TracePrefix sets a suitable prefix for the trace ID set on
-	// append requests. Useful for diagnostic purposes.
-	TracePrefix string
+	// TraceID can be set when appending data on a stream. Its
+	// purpose is to aid in debug and diagnostic scenarios.
+	TraceID string
 }
 
 func defaultStreamSettings() *streamSettings {
@@ -116,7 +119,7 @@ func defaultStreamSettings() *streamSettings {
 		streamType:          DefaultStream,
 		MaxInflightRequests: 1000,
 		MaxInflightBytes:    0,
-		TracePrefix:         "defaultManagedWriter",
+		TraceID:             "",
 	}
 }
 
@@ -241,7 +244,9 @@ func (ms *ManagedStream) append(pw *pendingWrite, opts ...gax.CallOption) error
 		reqCopy.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{
 			ProtoDescriptor: ms.schemaDescriptor,
 		}
-		reqCopy.TraceId = ms.streamSettings.TracePrefix
+		if ms.streamSettings.TraceID != "" {
+			reqCopy.TraceId = ms.streamSettings.TraceID
+		}
 		req = &reqCopy
 	})
 
@@ -290,10 +295,16 @@ func (ms *ManagedStream) Close() error {
 	ms.mu.Lock()
 	ms.err = io.EOF
 	ms.mu.Unlock()
+	// Propagate cancellation.
+	if ms.cancel != nil {
+		ms.cancel()
+	}
 	return err
 }
 
 // AppendRows sends the append requests to the service, and returns one AppendResult per row.
+// The format of the row data is binary serialized protocol buffer bytes, and the message
+// must adhere to the format of the schema Descriptor passed in when creating the managed stream.
 func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset int64) ([]*AppendResult, error) {
 	pw := newPendingWrite(data, offset)
 	// check flow control
diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go
index fac9139ab20..23976f0b1aa 100644
--- a/bigquery/storage/managedwriter/managed_stream_test.go
+++ b/bigquery/storage/managedwriter/managed_stream_test.go
@@ -115,7 +115,7 @@ func TestManagedStream_FirstAppendBehavior(t *testing.T) {
 		fc: newFlowController(0, 0),
 	}
 	ms.streamSettings.streamID = "FOO"
-	ms.streamSettings.TracePrefix = "TRACE"
+	ms.streamSettings.TraceID = "TRACE"
 	ms.schemaDescriptor = schema
 
 	fakeData := [][]byte{
diff --git a/bigquery/storage/managedwriter/writer_option.go b/bigquery/storage/managedwriter/writer_option.go
index 180418c5ff1..efd33f03c8f 100644
--- a/bigquery/storage/managedwriter/writer_option.go
+++ b/bigquery/storage/managedwriter/writer_option.go
@@ -16,10 +16,10 @@ package managedwriter
 
 import "google.golang.org/protobuf/types/descriptorpb"
 
-// WriterOption is used to configure a ManagedWriteClient.
+// WriterOption is used to configure a ManagedStream instance.
 type WriterOption func(*ManagedStream)
 
-// WithType sets the write type of the new writer.
+// WithType sets the stream type for the managed stream.
 func WithType(st StreamType) WriterOption {
 	return func(ms *ManagedStream) {
 		ms.streamSettings.streamType = st
@@ -28,10 +28,10 @@ func WithType(st StreamType) WriterOption {
 
 // WithStreamName allows users to set the stream name this writer will
 // append to explicitly. By default, the managed client will create the
-// stream when instantiated.
+// stream when instantiated if necessary.
 //
-// Note: Supplying this option causes other options such as WithStreamType
-// and WithDestinationTable to be ignored.
+// Note: Supplying this option causes other options which affect stream construction,
+// such as WithType and WithDestinationTable, to be ignored.
 func WithStreamName(name string) WriterOption {
 	return func(ms *ManagedStream) {
 		ms.streamSettings.streamID = name
@@ -62,15 +62,16 @@ func WithMaxInflightBytes(n int) WriterOption {
 	}
 }
 
-// WithTracePrefix allows instruments requests to the service with a custom trace prefix.
+// WithTraceID instruments requests to the service with a custom trace ID.
 // This is generally for diagnostic purposes only.
-func WithTracePrefix(prefix string) WriterOption {
+func WithTraceID(traceID string) WriterOption {
 	return func(ms *ManagedStream) {
-		ms.streamSettings.TracePrefix = prefix
+		ms.streamSettings.TraceID = traceID
 	}
 }
 
-// WithSchemaDescriptor describes the format of messages you'll be sending to the service.
+// WithSchemaDescriptor describes the format of the serialized data being sent by
+// AppendRows calls on the stream.
 func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption {
 	return func(ms *ManagedStream) {
 		ms.schemaDescriptor = dp
diff --git a/bigquery/storage/managedwriter/writer_option_test.go b/bigquery/storage/managedwriter/writer_option_test.go
index 929f8b2292e..ed2119bf4b4 100644
--- a/bigquery/storage/managedwriter/writer_option_test.go
+++ b/bigquery/storage/managedwriter/writer_option_test.go
@@ -63,12 +63,12 @@ func TestWriterOptions(t *testing.T) {
 		},
 		{
 			desc:    "WithTracePrefix",
-			options: []WriterOption{WithTracePrefix("foo")},
+			options: []WriterOption{WithTraceID("foo")},
 			want: func() *ManagedStream {
 				ms := &ManagedStream{
 					streamSettings: defaultStreamSettings(),
 				}
-				ms.streamSettings.TracePrefix = "foo"
+				ms.streamSettings.TraceID = "foo"
 				return ms
 			}(),
 		},
@@ -88,7 +88,7 @@ func TestWriterOptions(t *testing.T) {
 			options: []WriterOption{
 				WithType(PendingStream),
 				WithMaxInflightBytes(5),
-				WithTracePrefix("pre"),
+				WithTraceID("id"),
 			},
 			want: func() *ManagedStream {
 				ms := &ManagedStream{
 					streamSettings: defaultStreamSettings(),
 				}
 				ms.streamSettings.MaxInflightBytes = 5
 				ms.streamSettings.streamType = PendingStream
-				ms.streamSettings.TracePrefix = "pre"
+				ms.streamSettings.TraceID = "id"
 				return ms
 			}(),
 		},
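
For reviewers, a rough caller-side sketch of how the renamed surface fits together end to end (pending stream, append, finalize, batch commit). Only the identifiers touched in this patch -- WithType, WithDestinationTable, WithSchemaDescriptor, WithTraceID, AppendRows, GetResult, StreamName, BatchCommit, and TableParentFromStreamName -- are confirmed by the diffs above; NewClient, NewManagedStream, Client.Close, and Finalize are assumptions about the surrounding package and are not part of this change.

package example

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery/storage/managedwriter"
	"google.golang.org/protobuf/types/descriptorpb"
)

// pendingLoad appends pre-serialized protocol buffer rows to a PENDING stream
// and then makes them visible atomically via BatchCommit.
//
// destTable is fully qualified: projects/{project}/datasets/{dataset}/tables/{table}.
// Each element of rows must be a serialized message matching descriptor, per the
// AppendRows and WithSchemaDescriptor docs in this patch.
func pendingLoad(ctx context.Context, projectID, destTable string, descriptor *descriptorpb.DescriptorProto, rows [][]byte) error {
	if len(rows) == 0 {
		return nil
	}

	// Assumed constructor; not part of this patch.
	client, err := managedwriter.NewClient(ctx, projectID)
	if err != nil {
		return err
	}
	defer client.Close()

	// Assumed constructor; the options themselves are defined in writer_option.go above.
	ms, err := client.NewManagedStream(ctx,
		managedwriter.WithType(managedwriter.PendingStream),
		managedwriter.WithDestinationTable(destTable),
		managedwriter.WithSchemaDescriptor(descriptor),
		managedwriter.WithTraceID("example-pending-load"), // renamed from WithTracePrefix
	)
	if err != nil {
		return err
	}

	// Rows on a pending stream are buffered and not yet visible; offsets start at zero.
	results, err := ms.AppendRows(ctx, rows, 0)
	if err != nil {
		return err
	}
	// GetResult blocks until the append reaches a completed state.
	if _, err := results[len(results)-1].GetResult(ctx); err != nil {
		return err
	}

	// Streams must be finalized before committing; Finalize is assumed here.
	if _, err := ms.Finalize(ctx); err != nil {
		return err
	}

	// TableParentFromStreamName (exported by this patch) derives the parent table
	// from the stream name, as in the integration test above.
	resp, err := client.BatchCommit(ctx, managedwriter.TableParentFromStreamName(ms.StreamName()), []string{ms.StreamName()})
	if err != nil {
		return err
	}
	// Per the BatchCommit docs, stream errors in the response mean nothing was committed.
	if len(resp.GetStreamErrors()) > 0 {
		return fmt.Errorf("batch commit failed: %v", resp.GetStreamErrors())
	}
	return nil
}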