diff --git a/bigquery/go.mod b/bigquery/go.mod index 2e954688b69..5f55d3f3b0a 100644 --- a/bigquery/go.mod +++ b/bigquery/go.mod @@ -8,6 +8,7 @@ require ( github.com/golang/protobuf v1.5.2 github.com/google/go-cmp v0.5.6 github.com/googleapis/gax-go/v2 v2.0.5 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 google.golang.org/api v0.50.0 google.golang.org/genproto v0.0.0-20210707141755-0f065b0b1eb9 diff --git a/bigquery/go.sum b/bigquery/go.sum index c3cf88137d0..8eeb6fa9a27 100644 --- a/bigquery/go.sum +++ b/bigquery/go.sum @@ -275,6 +275,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/bigquery/storage/managedwriter/flow_controller.go b/bigquery/storage/managedwriter/flow_controller.go new file mode 100644 index 00000000000..3177ecfa38f --- /dev/null +++ b/bigquery/storage/managedwriter/flow_controller.go @@ -0,0 +1,119 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package managedwriter + +import ( + "context" + "sync/atomic" + + "golang.org/x/sync/semaphore" +) + +// Flow controller for write API. Adapted from pubsub. +type flowController struct { + // The max number of pending write requests. + maxInsertCount int + // The max pending request bytes. + maxInsertBytes int + + // Semaphores for governing pending inserts. + semInsertCount, semInsertBytes *semaphore.Weighted + + countRemaining int64 // Atomic. +} + +func newFlowController(maxInserts, maxInsertBytes int) *flowController { + fc := &flowController{ + maxInsertCount: maxInserts, + maxInsertBytes: maxInsertBytes, + semInsertCount: nil, + semInsertBytes: nil, + } + if maxInserts > 0 { + fc.semInsertCount = semaphore.NewWeighted(int64(maxInserts)) + } + if maxInsertBytes > 0 { + fc.semInsertBytes = semaphore.NewWeighted(int64(maxInsertBytes)) + } + return fc +} + +// acquire blocks until one insert of size bytes can proceed or ctx is done. +// It returns nil in the first case, or ctx.Err() in the second. +// +// acquire allows large messages to proceed by treating a size greater than maxSize +// as if it were equal to maxSize. +func (fc *flowController) acquire(ctx context.Context, sizeBytes int) error { + if fc.semInsertCount != nil { + if err := fc.semInsertCount.Acquire(ctx, 1); err != nil { + return err + } + } + if fc.semInsertBytes != nil { + if err := fc.semInsertBytes.Acquire(ctx, fc.bound(sizeBytes)); err != nil { + if fc.semInsertCount != nil { + fc.semInsertCount.Release(1) + } + return err + } + } + atomic.AddInt64(&fc.countRemaining, 1) + return nil +} + +// tryAcquire returns false if acquire would block. Otherwise, it behaves like +// acquire and returns true. +// +// tryAcquire allows large inserts to proceed by treating a size greater than +// maxSize as if it were equal to maxSize. +func (fc *flowController) tryAcquire(sizeBytes int) bool { + if fc.semInsertCount != nil { + if !fc.semInsertCount.TryAcquire(1) { + return false + } + } + if fc.semInsertBytes != nil { + if !fc.semInsertBytes.TryAcquire(fc.bound(sizeBytes)) { + if fc.semInsertCount != nil { + fc.semInsertCount.Release(1) + } + return false + } + } + atomic.AddInt64(&fc.countRemaining, 1) + return true +} + +func (fc *flowController) release(sizeBytes int) { + atomic.AddInt64(&fc.countRemaining, -1) + if fc.semInsertCount != nil { + fc.semInsertCount.Release(1) + } + if fc.semInsertBytes != nil { + fc.semInsertBytes.Release(fc.bound(sizeBytes)) + } +} + +// bound normalizes input size to maxInsertBytes if it exceeds the limit. +func (fc *flowController) bound(sizeBytes int) int64 { + if sizeBytes > fc.maxInsertBytes { + return int64(fc.maxInsertBytes) + } + return int64(sizeBytes) +} + +func (fc *flowController) count() int { + return int(atomic.LoadInt64(&fc.countRemaining)) +} diff --git a/bigquery/storage/managedwriter/flow_controller_test.go b/bigquery/storage/managedwriter/flow_controller_test.go new file mode 100644 index 00000000000..ff880639af0 --- /dev/null +++ b/bigquery/storage/managedwriter/flow_controller_test.go @@ -0,0 +1,259 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package managedwriter + +import ( + "context" + "errors" + "fmt" + "sync/atomic" + "testing" + "time" + + "golang.org/x/sync/errgroup" +) + +func TestFlowControllerCancel(t *testing.T) { + // Test canceling a flow controller's context. + t.Parallel() + wantInsertBytes := 10 + fc := newFlowController(3, wantInsertBytes) + if fc.maxInsertBytes != 10 { + t.Fatalf("maxInsertBytes mismatch, got %d want %d", fc.maxInsertBytes, wantInsertBytes) + } + if err := fc.acquire(context.Background(), 5); err != nil { + t.Fatal(err) + } + // Experiment: a context that times out should always return an error. + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond) + defer cancel() + if err := fc.acquire(ctx, 6); err != context.DeadlineExceeded { + t.Fatalf("got %v, expected DeadlineExceeded", err) + } + // Control: a context that is not done should always return nil. + go func() { + time.Sleep(5 * time.Millisecond) + fc.release(5) + }() + if err := fc.acquire(context.Background(), 6); err != nil { + t.Errorf("got %v, expected nil", err) + } +} + +func TestFlowControllerLargeRequest(t *testing.T) { + // Large requests succeed, consuming the entire allotment. + t.Parallel() + fc := newFlowController(3, 10) + err := fc.acquire(context.Background(), 11) + if err != nil { + t.Fatal(err) + } +} + +func TestFlowControllerNoStarve(t *testing.T) { + // A large request won't starve, because the flowController is + // (best-effort) FIFO. + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + fc := newFlowController(10, 10) + first := make(chan int) + for i := 0; i < 20; i++ { + go func() { + for { + if err := fc.acquire(ctx, 1); err != nil { + if err != context.Canceled { + t.Error(err) + } + return + } + select { + case first <- 1: + default: + } + fc.release(1) + } + }() + } + <-first // Wait until the flowController's state is non-zero. + if err := fc.acquire(ctx, 11); err != nil { + t.Errorf("got %v, want nil", err) + } +} + +func TestFlowControllerSaturation(t *testing.T) { + t.Parallel() + const ( + maxCount = 6 + maxSize = 10 + ) + for _, test := range []struct { + acquireSize int + wantCount, wantSize int64 + }{ + { + // Many small acquires cause the flow controller to reach its max count. + acquireSize: 1, + wantCount: 6, + wantSize: 6, + }, + { + // Five acquires of size 2 will cause the flow controller to reach its max size, + // but not its max count. + acquireSize: 2, + wantCount: 5, + wantSize: 10, + }, + { + // If the requests are the right size (relatively prime to maxSize), + // the flow controller will not saturate on size. (In this case, not on count either.) + acquireSize: 3, + wantCount: 3, + wantSize: 9, + }, + } { + fc := newFlowController(maxCount, maxSize) + // Atomically track flow controller state. + // The flowController itself tracks count. + var curSize int64 + success := errors.New("") + // Time out if wantSize or wantCount is never reached. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + g, ctx := errgroup.WithContext(ctx) + for i := 0; i < 10; i++ { + g.Go(func() error { + var hitCount, hitSize bool + // Run at least until we hit the expected values, and at least + // for enough iterations to exceed them if the flow controller + // is broken. + for i := 0; i < 100 || !hitCount || !hitSize; i++ { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err := fc.acquire(ctx, test.acquireSize); err != nil { + return err + } + c := int64(fc.count()) + if c > test.wantCount { + return fmt.Errorf("count %d exceeds want %d", c, test.wantCount) + } + if c == test.wantCount { + hitCount = true + } + s := atomic.AddInt64(&curSize, int64(test.acquireSize)) + if s > test.wantSize { + return fmt.Errorf("size %d exceeds want %d", s, test.wantSize) + } + if s == test.wantSize { + hitSize = true + } + time.Sleep(5 * time.Millisecond) // Let other goroutines make progress. + if atomic.AddInt64(&curSize, -int64(test.acquireSize)) < 0 { + return errors.New("negative size") + } + fc.release(test.acquireSize) + } + return success + }) + } + if err := g.Wait(); err != success { + t.Errorf("%+v: %v", test, err) + continue + } + } +} + +func TestFlowControllerTryAcquire(t *testing.T) { + t.Parallel() + fc := newFlowController(3, 10) + + // Successfully tryAcquire 4 bytes. + if !fc.tryAcquire(4) { + t.Error("got false, wanted true") + } + + // Fail to tryAcquire 7 bytes. + if fc.tryAcquire(7) { + t.Error("got true, wanted false") + } + + // Successfully tryAcquire 6 byte. + if !fc.tryAcquire(6) { + t.Error("got false, wanted true") + } +} + +func TestFlowControllerUnboundedCount(t *testing.T) { + t.Parallel() + ctx := context.Background() + fc := newFlowController(0, 10) + + // Successfully acquire 4 bytes. + if err := fc.acquire(ctx, 4); err != nil { + t.Errorf("got %v, wanted no error", err) + } + + // Successfully tryAcquire 4 bytes. + if !fc.tryAcquire(4) { + t.Error("got false, wanted true") + } + + // Fail to tryAcquire 3 bytes. + if fc.tryAcquire(3) { + t.Error("got true, wanted false") + } +} + +func TestFlowControllerUnboundedCount2(t *testing.T) { + t.Parallel() + ctx := context.Background() + fc := newFlowController(0, 0) + // Successfully acquire 4 bytes. + if err := fc.acquire(ctx, 4); err != nil { + t.Errorf("got %v, wanted no error", err) + } + fc.release(1) + fc.release(1) + fc.release(1) + wantCount := int64(-2) + c := int64(fc.count()) + if c != wantCount { + t.Fatalf("got count %d, want %d", c, wantCount) + } +} + +func TestFlowControllerUnboundedBytes(t *testing.T) { + t.Parallel() + ctx := context.Background() + fc := newFlowController(2, 0) + + // Successfully acquire 4GB. + if err := fc.acquire(ctx, 4e9); err != nil { + t.Errorf("got %v, wanted no error", err) + } + + // Successfully tryAcquire 4GB bytes. + if !fc.tryAcquire(4e9) { + t.Error("got false, wanted true") + } + + // Fail to tryAcquire a third message. + if fc.tryAcquire(3) { + t.Error("got true, wanted false") + } +}