diff --git a/bucket.go b/bucket.go index 2f61fb06fb..9acf0ebbee 100644 --- a/bucket.go +++ b/bucket.go @@ -20,9 +20,9 @@ import ( "golang.org/x/net/context" - "github.com/googlecloudplatform/gcsfuse/ratelimit" "github.com/jacobsa/gcloud/gcs" "github.com/jacobsa/gcloud/gcs/gcscaching" + "github.com/jacobsa/ratelimit" "github.com/jacobsa/timeutil" ) diff --git a/ratelimit/throttle.go b/ratelimit/throttle.go deleted file mode 100644 index dab5858235..0000000000 --- a/ratelimit/throttle.go +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright 2015 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ratelimit - -import ( - "time" - - "github.com/jacobsa/syncutil" - - "golang.org/x/net/context" -) - -// A simple interface for limiting the rate of some event. Unlike TokenBucket, -// does not allow the user control over what time means. -// -// Safe for concurrent access. -type Throttle interface { - // Return the maximum number of tokens that can be requested in a call to - // Wait. - Capacity() (c uint64) - - // Acquire the given number of tokens from the underlying token bucket, then - // sleep until when it says to wake. If the context is cancelled before then, - // return early with an error. - // - // REQUIRES: tokens <= capacity - Wait(ctx context.Context, tokens uint64) (err error) -} - -// Create a throttle that uses time.Now to judge the time given to the -// underlying token bucket. -// -// Be aware of the monotonicity issues. In particular: -// -// * If the system clock jumps into the future, the throttle will let through -// a burst of traffic. -// -// * If the system clock jumps into the past, it will halt all traffic for -// a potentially very long amount of time. -// -func NewThrottle( - rateHz float64, - capacity uint64) (t Throttle) { - typed := &throttle{ - startTime: time.Now(), - bucket: NewTokenBucket(rateHz, capacity), - } - - typed.mu = syncutil.NewInvariantMutex(typed.checkInvariants) - - t = typed - return -} - -type throttle struct { - ///////////////////////// - // Constant data - ///////////////////////// - - startTime time.Time - - ///////////////////////// - // Mutable state - ///////////////////////// - - mu syncutil.InvariantMutex - - // INVARIANT: bucket.CheckInvariants() - // - // GUARDED_BY(mu) - bucket TokenBucket -} - -// LOCKS_REQUIRED(t.mu) -func (t *throttle) checkInvariants() { - // INVARIANT: bucket.CheckInvariants() - t.bucket.CheckInvariants() -} - -// LOCKS_EXCLUDED(t.mu) -func (t *throttle) Capacity() (c uint64) { - t.mu.Lock() - c = t.bucket.Capacity() - t.mu.Unlock() - - return -} - -// LOCKS_EXCLUDED(t.mu) -func (t *throttle) Wait( - ctx context.Context, - tokens uint64) (err error) { - now := MonotonicTime(time.Now().Sub(t.startTime)) - - t.mu.Lock() - sleepUntil := t.bucket.Remove(now, tokens) - t.mu.Unlock() - - select { - case <-ctx.Done(): - err = ctx.Err() - return - - case <-time.After(time.Duration(sleepUntil - now)): - return - } -} diff --git a/ratelimit/throttle_test.go b/ratelimit/throttle_test.go deleted file mode 100644 index 4236d41114..0000000000 --- a/ratelimit/throttle_test.go +++ /dev/null @@ -1,204 +0,0 @@ -// Copyright 2015 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ratelimit_test - -import ( - cryptorand "crypto/rand" - "io" - "math/rand" - "runtime" - "sync" - "sync/atomic" - "testing" - "time" - - "golang.org/x/net/context" - - "github.com/googlecloudplatform/gcsfuse/ratelimit" - . "github.com/jacobsa/oglematchers" - . "github.com/jacobsa/ogletest" -) - -func TestThrottle(t *testing.T) { RunTests(t) } - -//////////////////////////////////////////////////////////////////////// -// Helpers -//////////////////////////////////////////////////////////////////////// - -func makeSeed() (seed int64) { - var buf [8]byte - _, err := io.ReadFull(cryptorand.Reader, buf[:]) - if err != nil { - panic(err) - } - - seed = (int64(buf[0])>>1)<<56 | - int64(buf[1])<<48 | - int64(buf[2])<<40 | - int64(buf[3])<<32 | - int64(buf[4])<<24 | - int64(buf[5])<<16 | - int64(buf[6])<<8 | - int64(buf[7])<<0 - - return -} - -func processArrivals( - ctx context.Context, - throttle ratelimit.Throttle, - arrivalRateHz float64, - d time.Duration) (processed uint64) { - // Set up an independent source of randomness. - randSrc := rand.New(rand.NewSource(makeSeed())) - - // Tick into a channel at a steady rate, buffering over delays caused by the - // token bucket. - arrivalPeriod := time.Duration((1.0 / arrivalRateHz) * float64(time.Second)) - ticks := make(chan struct{}, 3*int(float64(d)/float64(arrivalPeriod))) - - go func() { - ticker := time.NewTicker(arrivalPeriod) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - - case <-ticker.C: - select { - case ticks <- struct{}{}: - default: - panic("Buffer exceeded?") - } - } - } - }() - - // Simulate until we're supposed to stop. - for { - // Accumulate a few packets. - toAccumulate := uint64(randSrc.Int63n(5)) - - var accumulated uint64 - for accumulated < toAccumulate { - select { - case <-ctx.Done(): - return - - case <-ticks: - accumulated++ - } - } - - // Wait. - err := throttle.Wait(ctx, accumulated) - if err != nil { - return - } - - processed += accumulated - } -} - -//////////////////////////////////////////////////////////////////////// -// Boilerplate -//////////////////////////////////////////////////////////////////////// - -type ThrottleTest struct { -} - -func init() { RegisterTestSuite(&ThrottleTest{}) } - -//////////////////////////////////////////////////////////////////////// -// Tests -//////////////////////////////////////////////////////////////////////// - -func (t *ThrottleTest) IntegrationTest() { - runtime.GOMAXPROCS(runtime.NumCPU()) - const perCaseDuration = 1 * time.Second - - // Set up several test cases where we have N goroutines simulating arrival of - // packets at a given rate, asking a token bucket when to admit them. - testCases := []struct { - numActors int - arrivalRateHz float64 - limitRateHz float64 - }{ - // Single actor - {1, 150, 200}, - {1, 200, 200}, - {1, 250, 200}, - - // Multiple actors - {4, 150, 200}, - {4, 200, 200}, - {4, 250, 200}, - } - - // Run each test case. - for i, tc := range testCases { - // Create a throttle. - capacity, err := ratelimit.ChooseTokenBucketCapacity( - tc.limitRateHz, - perCaseDuration) - - AssertEq(nil, err) - - throttle := ratelimit.NewThrottle(tc.limitRateHz, capacity) - - // Start workers. - var wg sync.WaitGroup - var totalProcessed uint64 - - ctx, _ := context.WithDeadline( - context.Background(), - time.Now().Add(perCaseDuration)) - - for i := 0; i < tc.numActors; i++ { - wg.Add(1) - go func() { - defer wg.Done() - processed := processArrivals( - ctx, - throttle, - tc.arrivalRateHz/float64(tc.numActors), - perCaseDuration) - - atomic.AddUint64(&totalProcessed, processed) - }() - } - - // Wait for them all to finish. - wg.Wait() - - // We should have processed about the correct number of arrivals. - smallerRateHz := tc.arrivalRateHz - if smallerRateHz > tc.limitRateHz { - smallerRateHz = tc.limitRateHz - } - - expected := smallerRateHz * (float64(perCaseDuration) / float64(time.Second)) - ExpectThat( - totalProcessed, - AllOf( - GreaterThan(expected*0.90), - LessThan(expected*1.10)), - "Test case %d. expected: %f", - i, - expected) - } -} diff --git a/ratelimit/throttled_bucket.go b/ratelimit/throttled_bucket.go deleted file mode 100644 index 3816a60e22..0000000000 --- a/ratelimit/throttled_bucket.go +++ /dev/null @@ -1,202 +0,0 @@ -// Copyright 2015 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ratelimit - -import ( - "io" - - "github.com/jacobsa/gcloud/gcs" - "golang.org/x/net/context" -) - -// Create a bucket that limits the rate at which it calls the wrapped bucket -// using opThrottle, and limits the bandwidth with which it reads from the -// wrapped bucket using egressThrottle. -func NewThrottledBucket( - opThrottle Throttle, - egressThrottle Throttle, - wrapped gcs.Bucket) (b gcs.Bucket) { - b = &throttledBucket{ - opThrottle: opThrottle, - egressThrottle: egressThrottle, - wrapped: wrapped, - } - - return -} - -//////////////////////////////////////////////////////////////////////// -// throttledBucket -//////////////////////////////////////////////////////////////////////// - -type throttledBucket struct { - opThrottle Throttle - egressThrottle Throttle - wrapped gcs.Bucket -} - -func (b *throttledBucket) Name() string { - return b.wrapped.Name() -} - -func (b *throttledBucket) NewReader( - ctx context.Context, - req *gcs.ReadObjectRequest) (rc io.ReadCloser, err error) { - // Wait for permission to call through. - err = b.opThrottle.Wait(ctx, 1) - if err != nil { - return - } - - // Call through. - rc, err = b.wrapped.NewReader(ctx, req) - if err != nil { - return - } - - // Wrap the result in a throttled layer. - rc = &readerCloser{ - Reader: ThrottledReader(ctx, rc, b.egressThrottle), - Closer: rc, - } - - return -} - -func (b *throttledBucket) CreateObject( - ctx context.Context, - req *gcs.CreateObjectRequest) (o *gcs.Object, err error) { - // Wait for permission to call through. - err = b.opThrottle.Wait(ctx, 1) - if err != nil { - return - } - - // Call through. - o, err = b.wrapped.CreateObject(ctx, req) - - return -} - -func (b *throttledBucket) CopyObject( - ctx context.Context, - req *gcs.CopyObjectRequest) (o *gcs.Object, err error) { - // Wait for permission to call through. - err = b.opThrottle.Wait(ctx, 1) - if err != nil { - return - } - - // Call through. - o, err = b.wrapped.CopyObject(ctx, req) - - return -} - -func (b *throttledBucket) ComposeObjects( - ctx context.Context, - req *gcs.ComposeObjectsRequest) (o *gcs.Object, err error) { - // Wait for permission to call through. - err = b.opThrottle.Wait(ctx, 1) - if err != nil { - return - } - - // Call through. - o, err = b.wrapped.ComposeObjects(ctx, req) - - return -} - -func (b *throttledBucket) StatObject( - ctx context.Context, - req *gcs.StatObjectRequest) (o *gcs.Object, err error) { - // Wait for permission to call through. - err = b.opThrottle.Wait(ctx, 1) - if err != nil { - return - } - - // Call through. - o, err = b.wrapped.StatObject(ctx, req) - - return -} - -func (b *throttledBucket) ListObjects( - ctx context.Context, - req *gcs.ListObjectsRequest) (listing *gcs.Listing, err error) { - // Wait for permission to call through. - err = b.opThrottle.Wait(ctx, 1) - if err != nil { - return - } - - // Call through. - listing, err = b.wrapped.ListObjects(ctx, req) - - return -} - -func (b *throttledBucket) UpdateObject( - ctx context.Context, - req *gcs.UpdateObjectRequest) (o *gcs.Object, err error) { - // Wait for permission to call through. - err = b.opThrottle.Wait(ctx, 1) - if err != nil { - return - } - - // Call through. - o, err = b.wrapped.UpdateObject(ctx, req) - - return -} - -func (b *throttledBucket) DeleteObject( - ctx context.Context, - req *gcs.DeleteObjectRequest) (err error) { - // Wait for permission to call through. - err = b.opThrottle.Wait(ctx, 1) - if err != nil { - return - } - - // Call through. - err = b.wrapped.DeleteObject(ctx, req) - - return -} - -//////////////////////////////////////////////////////////////////////// -// readerCloser -//////////////////////////////////////////////////////////////////////// - -// An io.ReadCloser that forwards read requests to an io.Reader and close -// requests to an io.Closer. -type readerCloser struct { - Reader io.Reader - Closer io.Closer -} - -func (rc *readerCloser) Read(p []byte) (n int, err error) { - n, err = rc.Reader.Read(p) - return -} - -func (rc *readerCloser) Close() (err error) { - err = rc.Closer.Close() - return -} diff --git a/ratelimit/throttled_reader.go b/ratelimit/throttled_reader.go deleted file mode 100644 index 289ec050f8..0000000000 --- a/ratelimit/throttled_reader.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2015 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ratelimit - -import ( - "io" - - "golang.org/x/net/context" -) - -// Create a reader that limits the bandwidth of reads made from r according to -// the supplied throttler. Reads are assumed to be made under the supplied -// context. -func ThrottledReader( - ctx context.Context, - r io.Reader, - throttle Throttle) io.Reader { - return &throttledReader{ - ctx: ctx, - wrapped: r, - throttle: throttle, - } -} - -type throttledReader struct { - ctx context.Context - wrapped io.Reader - throttle Throttle -} - -func (tr *throttledReader) Read(p []byte) (n int, err error) { - // We can't serve a read larger than the throttle's capacity. - if uint64(len(p)) > tr.throttle.Capacity() { - p = p[:int(tr.throttle.Capacity())] - } - - // Wait for permission to continue. - err = tr.throttle.Wait(tr.ctx, uint64(len(p))) - if err != nil { - return - } - - // Serve the full amount we acquired from the throttle (unless we hit an - // early error, including EOF). - for len(p) > 0 && err == nil { - var tmp int - tmp, err = tr.wrapped.Read(p) - - n += tmp - p = p[tmp:] - } - - return -} diff --git a/ratelimit/throttled_reader_test.go b/ratelimit/throttled_reader_test.go deleted file mode 100644 index 288c76da86..0000000000 --- a/ratelimit/throttled_reader_test.go +++ /dev/null @@ -1,331 +0,0 @@ -// Copyright 2015 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ratelimit_test - -import ( - "errors" - "io" - "testing" - - "golang.org/x/net/context" - - "github.com/googlecloudplatform/gcsfuse/ratelimit" - . "github.com/jacobsa/ogletest" -) - -func TestThrottledReader(t *testing.T) { RunTests(t) } - -//////////////////////////////////////////////////////////////////////// -// Helpers -//////////////////////////////////////////////////////////////////////// - -// An io.Reader that defers to a function. -type funcReader struct { - f func([]byte) (int, error) -} - -func (fr *funcReader) Read(p []byte) (n int, err error) { - n, err = fr.f(p) - return -} - -// A throttler that defers to a function. -type funcThrottle struct { - f func(context.Context, uint64) error -} - -func (ft *funcThrottle) Capacity() (c uint64) { - return 1024 -} - -func (ft *funcThrottle) Wait( - ctx context.Context, - tokens uint64) (err error) { - err = ft.f(ctx, tokens) - return -} - -//////////////////////////////////////////////////////////////////////// -// Boilerplate -//////////////////////////////////////////////////////////////////////// - -type ThrottledReaderTest struct { - ctx context.Context - - wrapped funcReader - throttle funcThrottle - - reader io.Reader -} - -var _ SetUpInterface = &ThrottledReaderTest{} - -func init() { RegisterTestSuite(&ThrottledReaderTest{}) } - -func (t *ThrottledReaderTest) SetUp(ti *TestInfo) { - t.ctx = ti.Ctx - - // Set up the default throttle function. - t.throttle.f = func(ctx context.Context, tokens uint64) (err error) { - return - } - - // Set up the reader. - t.reader = ratelimit.ThrottledReader(t.ctx, &t.wrapped, &t.throttle) -} - -//////////////////////////////////////////////////////////////////////// -// Tests -//////////////////////////////////////////////////////////////////////// - -func (t *ThrottledReaderTest) CallsThrottle() { - const readSize = 17 - AssertLe(readSize, t.throttle.Capacity()) - - // Throttle - var throttleCalled bool - t.throttle.f = func(ctx context.Context, tokens uint64) (err error) { - AssertFalse(throttleCalled) - throttleCalled = true - - AssertEq(t.ctx, ctx) - AssertEq(readSize, tokens) - - err = errors.New("") - return - } - - // Call - t.reader.Read(make([]byte, readSize)) - - ExpectTrue(throttleCalled) -} - -func (t *ThrottledReaderTest) ThrottleReturnsError() { - // Throttle - expectedErr := errors.New("taco") - t.throttle.f = func(ctx context.Context, tokens uint64) (err error) { - err = expectedErr - return - } - - // Call - n, err := t.reader.Read(make([]byte, 1)) - - ExpectEq(0, n) - ExpectEq(expectedErr, err) -} - -func (t *ThrottledReaderTest) CallsWrapped() { - buf := make([]byte, 16) - AssertLe(len(buf), t.throttle.Capacity()) - - // Wrapped - var readCalled bool - t.wrapped.f = func(p []byte) (n int, err error) { - AssertFalse(readCalled) - readCalled = true - - AssertEq(&buf[0], &p[0]) - AssertEq(len(buf), len(p)) - - err = errors.New("") - return - } - - // Call - t.reader.Read(buf) - - ExpectTrue(readCalled) -} - -func (t *ThrottledReaderTest) WrappedReturnsError() { - // Wrapped - expectedErr := errors.New("taco") - t.wrapped.f = func(p []byte) (n int, err error) { - n = 11 - err = expectedErr - return - } - - // Call - n, err := t.reader.Read(make([]byte, 16)) - - ExpectEq(11, n) - ExpectEq(expectedErr, err) -} - -func (t *ThrottledReaderTest) WrappedReturnsEOF() { - // Wrapped - t.wrapped.f = func(p []byte) (n int, err error) { - n = 11 - err = io.EOF - return - } - - // Call - n, err := t.reader.Read(make([]byte, 16)) - - ExpectEq(11, n) - ExpectEq(io.EOF, err) -} - -func (t *ThrottledReaderTest) WrappedReturnsFullRead() { - const readSize = 17 - AssertLe(readSize, t.throttle.Capacity()) - - // Wrapped - t.wrapped.f = func(p []byte) (n int, err error) { - n = len(p) - return - } - - // Call - n, err := t.reader.Read(make([]byte, readSize)) - - ExpectEq(readSize, n) - ExpectEq(nil, err) -} - -func (t *ThrottledReaderTest) WrappedReturnsShortRead_CallsAgain() { - buf := make([]byte, 16) - AssertLe(len(buf), t.throttle.Capacity()) - - // Wrapped - var callCount int - t.wrapped.f = func(p []byte) (n int, err error) { - AssertLt(callCount, 2) - switch callCount { - case 0: - callCount++ - n = 2 - - case 1: - callCount++ - AssertEq(&buf[2], &p[0]) - AssertEq(len(buf)-2, len(p)) - err = errors.New("") - } - - return - } - - // Call - t.reader.Read(buf) - - ExpectEq(2, callCount) -} - -func (t *ThrottledReaderTest) WrappedReturnsShortRead_SecondReturnsError() { - // Wrapped - var callCount int - expectedErr := errors.New("taco") - - t.wrapped.f = func(p []byte) (n int, err error) { - AssertLt(callCount, 2) - switch callCount { - case 0: - callCount++ - n = 2 - - case 1: - callCount++ - n = 11 - err = expectedErr - } - - return - } - - // Call - n, err := t.reader.Read(make([]byte, 16)) - - ExpectEq(2+11, n) - ExpectEq(expectedErr, err) -} - -func (t *ThrottledReaderTest) WrappedReturnsShortRead_SecondReturnsEOF() { - // Wrapped - var callCount int - t.wrapped.f = func(p []byte) (n int, err error) { - AssertLt(callCount, 2) - switch callCount { - case 0: - callCount++ - n = 2 - - case 1: - callCount++ - n = 11 - err = io.EOF - } - - return - } - - // Call - n, err := t.reader.Read(make([]byte, 16)) - - ExpectEq(2+11, n) - ExpectEq(io.EOF, err) -} - -func (t *ThrottledReaderTest) WrappedReturnsShortRead_SecondSucceedsInFull() { - // Wrapped - var callCount int - t.wrapped.f = func(p []byte) (n int, err error) { - AssertLt(callCount, 2) - switch callCount { - case 0: - callCount++ - n = 2 - - case 1: - callCount++ - n = len(p) - } - - return - } - - // Call - n, err := t.reader.Read(make([]byte, 16)) - - ExpectEq(16, n) - ExpectEq(nil, err) -} - -func (t *ThrottledReaderTest) ReadSizeIsAboveThrottleCapacity() { - buf := make([]byte, 2048) - AssertGt(len(buf), t.throttle.Capacity()) - - // Wrapped - var readCalled bool - t.wrapped.f = func(p []byte) (n int, err error) { - AssertFalse(readCalled) - readCalled = true - - AssertEq(&buf[0], &p[0]) - ExpectEq(t.throttle.Capacity(), len(p)) - - err = errors.New("") - return - } - - // Call - t.reader.Read(buf) - - ExpectTrue(readCalled) -} diff --git a/ratelimit/token_bucket.go b/ratelimit/token_bucket.go deleted file mode 100644 index 3dd936e84a..0000000000 --- a/ratelimit/token_bucket.go +++ /dev/null @@ -1,223 +0,0 @@ -// Copyright 2015 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ratelimit - -import ( - "fmt" - "math" - "time" -) - -// A measurement of the amount of real time since some fixed epoch. -// -// TokenBucket doesn't care about calendar time, time of day, etc. -// Unfortunately time.Time takes these things into account, and in particular -// time.Now() is not monotonic -- it may jump arbitrarily far into the future -// or past when the system's wall time is changed. -// -// Instead we reckon in terms of a monotonic measurement of time elapsed since -// the bucket was initialized, and leave it up to the user to provide this. See -// SystemTimeTokenBucket for a convenience in doing so. -type MonotonicTime time.Duration - -// A bucket of tokens that refills at a specific rate up to a particular -// capacity. Users can remove tokens in sizes up to that capacity, can are told -// how long they should wait before proceeding. -// -// If users cooperate by waiting to take whatever action they are rate limiting -// as told by the token bucket, the overall action rate will be limited to the -// token bucket's fill rate. -// -// Not safe for concurrent access; requires external synchronization. -// -// Cf. http://en.wikipedia.org/wiki/Token_bucket -type TokenBucket interface { - CheckInvariants() - - // Return the maximum number of tokens that the bucket can hold. - Capacity() (c uint64) - - // Remove the specified number of tokens from the token bucket at the given - // time. The user should wait until sleepUntil before proceeding in order to - // obey the rate limit. - // - // REQUIRES: tokens <= Capacity() - Remove( - now MonotonicTime, - tokens uint64) (sleepUntil MonotonicTime) -} - -// Choose a token bucket capacity that ensures that the action gated by the -// token bucket will be limited to within a few percent of `rateHz * window` -// for any window of the given size. -// -// This is not be possible for all rates and windows. In that case, an error -// will be returned. -func ChooseTokenBucketCapacity( - rateHz float64, - window time.Duration) (capacity uint64, err error) { - // Check that the input is reasonable. - if rateHz <= 0 || math.IsInf(rateHz, 0) { - err = fmt.Errorf("Illegal rate: %f", rateHz) - return - } - - if window <= 0 { - err = fmt.Errorf("Illegal window: %v", window) - return - } - - // We cannot help but allow the rate to exceed the configured maximum by some - // factor in an arbitrary window, no matter how small we scale the max - // accumulated credit -- the bucket may be full at the start of the window, - // be immediately exhausted, then be repeatedly exhausted just before filling - // throughout the window. - // - // For example: let the window W = 10 seconds, and the bandwidth B = 20 MiB/s. - // Set the max accumulated credit C = W*B/2 = 100 MiB. Then this - // sequence of events is allowed: - // - // * T=0: Allow through 100 MiB. - // * T=4.999999: Allow through nearly 100 MiB. - // * T=9.999999: Allow through nearly 100 MiB. - // - // Therefore we exceed the allowed bytes for the window by nearly 50%. Note - // however that this trend cannot continue into the next window, so this must - // be a transient spike. - // - // In general if we set C <= W*B/N, then we're off by no more than a factor - // of (N+1)/N within any window of size W. - // - // Choose a reasonable N. - const N = 50 - - capacityFloat := math.Floor(rateHz * (float64(window) / float64(time.Second))) - if !(capacityFloat > 0 && capacityFloat < float64(math.MaxUint64)) { - err = fmt.Errorf( - "Can't use a token bucket to limit to %f Hz over a window of %v "+ - "(result is a capacity of %f)", - rateHz, - window, - capacityFloat) - - return - } - - capacity = uint64(capacityFloat) - if capacity == 0 { - panic(fmt.Sprintf( - "Calculated a zero capacity for inputs %f, %v. Float version: %f", - rateHz, - window, - capacityFloat)) - } - - return -} - -// Create a token bucket that fills at the given rate in tokens per second, up -// to the given capacity. ChooseTokenBucketCapacity may help you decide on a -// capacity. -// -// REQUIRES: rateHz > 0 -// REQUIRES: capacity > 0 -func NewTokenBucket( - rateHz float64, - capacity uint64) (tb TokenBucket) { - tb = &tokenBucket{ - rateHz: rateHz, - capacity: capacity, - } - - return -} - -//////////////////////////////////////////////////////////////////////// -// Implementation -//////////////////////////////////////////////////////////////////////// - -type tokenBucket struct { - ///////////////////////// - // Constant data - ///////////////////////// - - rateHz float64 - capacity uint64 - - ///////////////////////// - // Mutable state - ///////////////////////// - - // The time that we last updated the bucket's credit. Only moves forward. - creditTime MonotonicTime - - // The number of credits that were available at creditTime. - // - // INVARIANT: credit <= float64(capacity) - credit float64 -} - -func (tb *tokenBucket) CheckInvariants() { - // INVARIANT: credit <= float64(capacity) - if !(tb.credit <= float64(tb.capacity)) { - panic(fmt.Sprintf( - "Illegal credit: %f, capacity: %d", - tb.credit, - tb.capacity)) - } -} - -func (tb *tokenBucket) Capacity() (c uint64) { - c = tb.capacity - return -} - -func (tb *tokenBucket) Remove( - now MonotonicTime, - tokens uint64) (sleepUntil MonotonicTime) { - if tokens > tb.capacity { - panic(fmt.Sprintf( - "Token count %d out of range; capacity is %d", - tokens, - tb.capacity)) - } - - // First play the clock forward until now, crediting any tokens that have - // accumulated in the meantime, up to the bucket's capacity. - if tb.creditTime < now { - diff := now - tb.creditTime - - // Don't forget to cap at the capacity. - tb.credit += tb.rateHz * float64(diff) / float64(time.Second) - if !(tb.credit <= float64(tb.capacity)) { - tb.credit = float64(tb.capacity) - } - - tb.creditTime = now - } - - // Deduct the requested tokens. The user will need to wait until the credit - // makes it back to zero, which is when it would have otherwise made it to - // `tokens`. - tb.credit -= float64(tokens) - - sleepUntil = tb.creditTime - if tb.credit < 0 { - seconds := -tb.credit / tb.rateHz - sleepUntil = tb.creditTime + MonotonicTime(seconds*float64(time.Second)) - } - - return -} diff --git a/ratelimit/token_bucket_test.go b/ratelimit/token_bucket_test.go deleted file mode 100644 index 3cdb02f467..0000000000 --- a/ratelimit/token_bucket_test.go +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright 2015 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ratelimit_test - -import ( - "testing" - "time" - - "github.com/googlecloudplatform/gcsfuse/ratelimit" - . "github.com/jacobsa/ogletest" -) - -func TestTokenBucket(t *testing.T) { RunTests(t) } - -//////////////////////////////////////////////////////////////////////// -// Boilerplate -//////////////////////////////////////////////////////////////////////// - -type TokenBucketTest struct { -} - -func init() { RegisterTestSuite(&TokenBucketTest{}) } - -//////////////////////////////////////////////////////////////////////// -// Tests -//////////////////////////////////////////////////////////////////////// - -func (t *TokenBucketTest) CarefulAccounting() { - // Set up a bucket that ticks at the resolution of time.Duration (1 ns) and - // has a depth of four. - AssertEq(1, time.Nanosecond) - tb := ratelimit.NewTokenBucket(1e9, 4) - - // The token starts empty, so initially we should be required to wait one - // tick per token. - AssertEq(2, tb.Remove(0, 2)) - AssertEq(3, tb.Remove(2, 1)) - - // After the bucket recharges fully, we should be allowed to claim up to its - // capacity immediately. - AssertEq(4, tb.Remove(4, 1)) - AssertEq(8, tb.Remove(8, 4)) - - // When the bucket fills, it stays full and doesn't let you take more than - // its capacity immediately. - AssertEq(100, tb.Remove(100, 4)) - AssertEq(101, tb.Remove(100, 1)) - AssertEq(103, tb.Remove(102, 2)) - - // Taking capacity "concurrently" works fine. - AssertEq(200, tb.Remove(200, 1)) - AssertEq(200, tb.Remove(200, 3)) - AssertEq(201, tb.Remove(200, 1)) - - // Attempting to take capacity in the past doesn't screw up the accounting. - AssertEq(300, tb.Remove(300, 1)) - AssertEq(300, tb.Remove(0, 3)) - AssertEq(302, tb.Remove(301, 2)) -}