Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery/storage/managedwriter): add state tracking #4407

Merged
14 changes: 4 additions & 10 deletions bigquery/storage/managedwriter/appendresult.go
Expand Up @@ -22,6 +22,10 @@ import (
"google.golang.org/protobuf/types/known/wrapperspb"
)

// NoOffset is a sentinel value for signalling we're not tracking
// stream offset (e.g. a default stream which allows simultaneous append streams).
const NoOffset int64 = -1
codyoss marked this conversation as resolved.
Show resolved Hide resolved

// AppendResult tracks the status of a single row of data.
type AppendResult struct {
// rowData contains the serialized row data.
Expand Down Expand Up @@ -49,12 +53,6 @@ func (ar *AppendResult) Ready() <-chan struct{} { return ar.ready }
// GetResult returns the optional offset of this row, or the associated
codyoss marked this conversation as resolved.
Show resolved Hide resolved
// error.
func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) {
select {
case <-ar.Ready():
return ar.offset, ar.err
default:
}

select {
codyoss marked this conversation as resolved.
Show resolved Hide resolved
case <-ctx.Done():
return 0, ctx.Err()
Expand Down Expand Up @@ -106,10 +104,6 @@ func newPendingWrite(appends [][]byte, offset int64) *pendingWrite {
return pw
}

// noOffsetSpecified is a sentinel value for signalling we're not tracking
// stream offset (e.g. a default stream which allows simultaneous append streams).
var noOffsetSpecified int64 = -1

// markDone propagates finalization of an append request to associated
// AppendResult references.
func (pw *pendingWrite) markDone(startOffset int64, err error) {
Expand Down
11 changes: 10 additions & 1 deletion bigquery/storage/managedwriter/appendresult_test.go
Expand Up @@ -41,10 +41,19 @@ func TestPendingWrite(t *testing.T) {
var wantOffset int64 = 99

// first, verify no offset behavior
pending := newPendingWrite(wantRowData, noOffsetSpecified)
pending := newPendingWrite(wantRowData, NoOffset)
if pending.request.GetOffset() != nil {
t.Errorf("request should have no offset, but is present: %q", pending.request.GetOffset().GetValue())
}
pending.markDone(NoOffset, nil)
for k, ar := range pending.results {
if ar.offset != NoOffset {
t.Errorf("mismatch on completed AppendResult(%d) without offset: got %d want %d", k, ar.offset, NoOffset)
}
if ar.err != nil {
t.Errorf("mismatch in error on AppendResult(%d), got %v want nil", k, ar.err)
}
}

// now, verify behavior with a valid offset
pending = newPendingWrite(wantRowData, 99)
Expand Down