From 4638e17dacd1fa76f9976f44974c4037fe4358dc Mon Sep 17 00:00:00 2001
From: shollyman
Date: Mon, 12 Jul 2021 13:00:46 -0700
Subject: [PATCH] feat(bigquery/storage/managedwriter): add state tracking (#4407)

This PR introduces two new types:

AppendResult - tracks the progress of an individual row append through to
completion, either success or error. Successful appends _may_ have an
associated offset; failed appends will have an associated error. The
AppendResult exposes a blocking method that users can interrogate.

pendingWrite - handles state management for a set of rows appended as a
group. There is a 1:many relationship between a pendingWrite and its
AppendResults, so when a pendingWrite completes, all associated
AppendResult references are updated.

An illustrative usage sketch of the two types follows after the patch.

Towards: https://github.com/googleapis/google-cloud-go/issues/4366
---
 .../storage/managedwriter/appendresult.go      | 127 ++++++++++++++++++
 .../managedwriter/appendresult_test.go         | 117 ++++++++++++++++
 2 files changed, 244 insertions(+)
 create mode 100644 bigquery/storage/managedwriter/appendresult.go
 create mode 100644 bigquery/storage/managedwriter/appendresult_test.go

diff --git a/bigquery/storage/managedwriter/appendresult.go b/bigquery/storage/managedwriter/appendresult.go
new file mode 100644
index 00000000000..2570d80d7a0
--- /dev/null
+++ b/bigquery/storage/managedwriter/appendresult.go
@@ -0,0 +1,127 @@
+// Copyright 2021 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package managedwriter
+
+import (
+	"context"
+
+	storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
+	"google.golang.org/protobuf/proto"
+	"google.golang.org/protobuf/types/known/wrapperspb"
+)
+
+// NoStreamOffset is a sentinel value for signalling we're not tracking
+// stream offset (e.g. a default stream which allows simultaneous append streams).
+const NoStreamOffset int64 = -1
+
+// AppendResult tracks the status of a single row of data.
+type AppendResult struct {
+	// rowData contains the serialized row data.
+	rowData []byte
+
+	ready chan struct{}
+
+	// if the encapsulating append failed, this will retain a reference to the error.
+	err error
+
+	// the stream offset
+	offset int64
+}
+
+func newAppendResult(data []byte) *AppendResult {
+	return &AppendResult{
+		ready:   make(chan struct{}),
+		rowData: data,
+	}
+}
+
+// Ready returns a channel that is closed when the append request completes.
+func (ar *AppendResult) Ready() <-chan struct{} { return ar.ready }
+
+// GetResult returns the optional offset of this row, or the associated
+// error.
+func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) {
+	select {
+	case <-ctx.Done():
+		return 0, ctx.Err()
+	case <-ar.Ready():
+		return ar.offset, ar.err
+	}
+}
+
+// pendingWrite tracks state for a set of rows that are part of a single
+// append request.
+type pendingWrite struct {
+	request *storagepb.AppendRowsRequest
+	results []*AppendResult
+
+	// this is used by the flow controller.
+	reqSize int
+}
+
+// newPendingWrite constructs the proto request and attaches references
+// to the pending results for later consumption. The reason for this is
+// that in the future, we may want to allow row batching to be managed by
+// the server (e.g. for default/COMMITTED streams). For BUFFERED/PENDING
+// streams, this should be managed by the user.
+func newPendingWrite(appends [][]byte, offset int64) *pendingWrite {
+
+	results := make([]*AppendResult, len(appends))
+	for k, r := range appends {
+		results[k] = newAppendResult(r)
+	}
+	pw := &pendingWrite{
+		request: &storagepb.AppendRowsRequest{
+			Rows: &storagepb.AppendRowsRequest_ProtoRows{
+				ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
+					Rows: &storagepb.ProtoRows{
+						SerializedRows: appends,
+					},
+				},
+			},
+		},
+		results: results,
+	}
+	if offset != NoStreamOffset {
+		pw.request.Offset = &wrapperspb.Int64Value{Value: offset}
+	}
+	// We compute the size now for flow controller purposes, though
+	// the actual request size may be slightly larger (e.g. the first
+	// request in a new stream bears schema and stream id).
+	pw.reqSize = proto.Size(pw.request)
+	return pw
+}
+
+// markDone propagates finalization of an append request to associated
+// AppendResult references.
+func (pw *pendingWrite) markDone(startOffset int64, err error) {
+	curOffset := startOffset
+	for _, ar := range pw.results {
+		if err != nil {
+			ar.err = err
+			close(ar.ready)
+			continue
+		}
+
+		ar.offset = curOffset
+		// only advance curOffset if we were given a valid starting offset.
+		if startOffset >= 0 {
+			curOffset = curOffset + 1
+		}
+		close(ar.ready)
+	}
+	// Clear the reference to the request.
+	pw.request = nil
+}
diff --git a/bigquery/storage/managedwriter/appendresult_test.go b/bigquery/storage/managedwriter/appendresult_test.go
new file mode 100644
index 00000000000..c8beeafa734
--- /dev/null
+++ b/bigquery/storage/managedwriter/appendresult_test.go
@@ -0,0 +1,117 @@
+// Copyright 2021 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package managedwriter
+
+import (
+	"bytes"
+	"fmt"
+	"testing"
+	"time"
+)
+
+func TestAppendResult(t *testing.T) {
+
+	wantRowBytes := []byte("rowdata")
+
+	gotAR := newAppendResult(wantRowBytes)
+	if !bytes.Equal(gotAR.rowData, wantRowBytes) {
+		t.Errorf("mismatch in row data, got %q want %q", gotAR.rowData, wantRowBytes)
+	}
+}
+
+func TestPendingWrite(t *testing.T) {
+	wantRowData := [][]byte{
+		[]byte("row1"),
+		[]byte("row2"),
+		[]byte("row3"),
+	}
+
+	var wantOffset int64 = 99
+
+	// first, verify no offset behavior
+	pending := newPendingWrite(wantRowData, NoStreamOffset)
+	if pending.request.GetOffset() != nil {
+		t.Errorf("request should have no offset, but is present: %q", pending.request.GetOffset().GetValue())
+	}
+	pending.markDone(NoStreamOffset, nil)
+	for k, ar := range pending.results {
+		if ar.offset != NoStreamOffset {
+			t.Errorf("mismatch on completed AppendResult(%d) without offset: got %d want %d", k, ar.offset, NoStreamOffset)
+		}
+		if ar.err != nil {
+			t.Errorf("mismatch in error on AppendResult(%d), got %v want nil", k, ar.err)
+		}
+	}
+
+	// now, verify behavior with a valid offset
+	pending = newPendingWrite(wantRowData, wantOffset)
+	if pending.request.GetOffset() == nil {
+		t.Errorf("offset not set, should be %d", wantOffset)
+	}
+	if gotOffset := pending.request.GetOffset().GetValue(); gotOffset != wantOffset {
+		t.Errorf("offset mismatch, got %d want %d", gotOffset, wantOffset)
+	}
+
+	// check request shape
+	gotRowCount := len(pending.request.GetProtoRows().GetRows().GetSerializedRows())
+	if gotRowCount != len(wantRowData) {
+		t.Errorf("pendingWrite request mismatch, got %d rows, want %d rows", gotRowCount, len(wantRowData))
+	}
+
+	// verify child AppendResults
+	if len(pending.results) != len(wantRowData) {
+		t.Errorf("mismatch in rows and append results. %d rows, %d AppendResults", len(wantRowData), len(pending.results))
+	}
+	for k, ar := range pending.results {
+		gotData := ar.rowData
+		if !bytes.Equal(gotData, wantRowData[k]) {
+			t.Errorf("row %d mismatch in data: got %q want %q", k, gotData, wantRowData[k])
+		}
+		select {
+		case <-ar.Ready():
+			t.Errorf("got Ready() on incomplete AppendResult %d", k)
+		case <-time.After(100 * time.Millisecond):
+			continue
+		}
+	}
+
+	// verify completion behavior
+	reportedOffset := int64(101)
+	wantErr := fmt.Errorf("foo")
+	pending.markDone(reportedOffset, wantErr)
+
+	if pending.request != nil {
+		t.Errorf("expected request to be cleared, is present: %#v", pending.request)
+	}
+	for k, ar := range pending.results {
+		gotData := ar.rowData
+		if !bytes.Equal(gotData, wantRowData[k]) {
+			t.Errorf("row %d mismatch in data: got %q want %q", k, gotData, wantRowData[k])
+		}
+		select {
+		case <-ar.Ready():
+		case <-time.After(100 * time.Millisecond):
+			t.Errorf("possible blocking on completed AppendResult %d", k)
+			continue
+		}
+		if ar.offset != 0 {
+			t.Errorf("failed AppendResult(%d) should not have an offset, got %d", k, ar.offset)
+		}
+		if ar.err != wantErr {
+			t.Errorf("mismatch in errors, got %v want %v", ar.err, wantErr)
+		}
+	}
+
+}
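
A minimal usage sketch of the new types (not part of the patch). It shows the
intended flow: build a pendingWrite for a batch of encoded rows, finalize it
via markDone once the append is acknowledged or fails, and read the per-row
outcome through AppendResult.GetResult. The names waitForRows and exampleFlow,
the literal offsets, and the placeholder row bytes are illustrative
assumptions; the real send path over the AppendRows stream is not part of
this change.

package managedwriter

import (
	"context"
	"fmt"
	"time"
)

// waitForRows is a hypothetical consumer-side helper: block on each
// AppendResult (bounded by ctx) and surface the reported offset or error.
func waitForRows(ctx context.Context, results []*AppendResult) error {
	for i, ar := range results {
		offset, err := ar.GetResult(ctx)
		if err != nil {
			return fmt.Errorf("row %d failed: %w", i, err)
		}
		fmt.Printf("row %d acknowledged at offset %d\n", i, offset)
	}
	return nil
}

// exampleFlow sketches the producer side. In the eventual managed writer the
// AppendRowsRequest held by the pendingWrite would be sent on the stream
// between newPendingWrite and markDone; here the write is finalized directly.
func exampleFlow() error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	rows := [][]byte{[]byte("encoded-row-1"), []byte("encoded-row-2")}
	pw := newPendingWrite(rows, 10) // request appending starting at offset 10

	// Report success starting at offset 10: rows receive offsets 10 and 11.
	// Passing NoStreamOffset would leave per-row offsets untracked, and a
	// non-nil error would instead fail every associated AppendResult.
	pw.markDone(10, nil)

	return waitForRows(ctx, pw.results)
}

Because markDone closes each AppendResult's ready channel exactly once after
setting its fields, GetResult is safe for multiple goroutines to call once the
write completes.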