Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
move limit to semaphore class
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Mar 16, 2021
1 parent c1a0543 commit 96ef367
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 37 deletions.
Expand Up @@ -110,4 +110,9 @@ public synchronized void reducePermits(long reduction) {
currentPermits -= reduction;
limit -= reduction;
}

@Override
public synchronized long getLimit() {
return limit;
}
}
54 changes: 20 additions & 34 deletions gax/src/main/java/com/google/api/gax/batching/FlowController.java
Expand Up @@ -146,8 +146,6 @@ public enum LimitExceededBehavior {
@Nullable private final Long maxRequestBytesLimit;
@Nullable private final Long minElementCountLimit;
@Nullable private final Long minRequestBytesLimit;
@Nullable private Long currentElementCountLimit;
@Nullable private Long currentRequestBytesLimit;
private final LimitExceededBehavior limitExceededBehavior;
private final Object updateLimitLock;

Expand All @@ -171,8 +169,6 @@ public FlowController(DynamicFlowControlSettings settings) {
this.maxRequestBytesLimit = null;
this.minElementCountLimit = null;
this.minRequestBytesLimit = null;
this.currentElementCountLimit = null;
this.currentRequestBytesLimit = null;

this.outstandingElementCount = null;
this.outstandingByteCount = null;
Expand All @@ -183,24 +179,24 @@ public FlowController(DynamicFlowControlSettings settings) {
}
this.maxElementCountLimit = settings.getMaxOutstandingElementCount();
this.minElementCountLimit = settings.getMinOutstandingElementCount();
this.currentElementCountLimit = settings.getInitialOutstandingElementCount();
if (currentElementCountLimit == null) {
Long initialElementCountLimit = settings.getInitialOutstandingElementCount();
if (initialElementCountLimit == null) {
outstandingElementCount = null;
} else if (settings.getLimitExceededBehavior() == FlowController.LimitExceededBehavior.Block) {
outstandingElementCount = new BlockingSemaphore(currentElementCountLimit);
outstandingElementCount = new BlockingSemaphore(initialElementCountLimit);
} else {
outstandingElementCount = new NonBlockingSemaphore(currentElementCountLimit);
outstandingElementCount = new NonBlockingSemaphore(initialElementCountLimit);
}

this.maxRequestBytesLimit = settings.getMaxOutstandingRequestBytes();
this.minRequestBytesLimit = settings.getMinOutstandingRequestBytes();
this.currentRequestBytesLimit = settings.getInitialOutstandingRequestBytes();
if (currentRequestBytesLimit == null) {
Long initialRequestBytesLimit = settings.getInitialOutstandingRequestBytes();
if (initialRequestBytesLimit == null) {
outstandingByteCount = null;
} else if (settings.getLimitExceededBehavior() == FlowController.LimitExceededBehavior.Block) {
outstandingByteCount = new BlockingSemaphore(currentRequestBytesLimit);
outstandingByteCount = new BlockingSemaphore(initialRequestBytesLimit);
} else {
outstandingByteCount = new NonBlockingSemaphore(currentRequestBytesLimit);
outstandingByteCount = new NonBlockingSemaphore(initialRequestBytesLimit);
}
}

Expand All @@ -210,7 +206,7 @@ public void reserve(long elements, long bytes) throws FlowControlException {

if (outstandingElementCount != null) {
if (!outstandingElementCount.acquire(elements)) {
throw new MaxOutstandingElementCountReachedException(currentElementCountLimit);
throw new MaxOutstandingElementCountReachedException(outstandingElementCount.getLimit());
}
}

Expand All @@ -221,7 +217,7 @@ public void reserve(long elements, long bytes) throws FlowControlException {
if (outstandingElementCount != null) {
outstandingElementCount.release(elements);
}
throw new MaxOutstandingRequestBytesReachedException(currentRequestBytesLimit);
throw new MaxOutstandingRequestBytesReachedException(outstandingByteCount.getLimit());
}
}
}
Expand All @@ -248,14 +244,14 @@ public void increaseThresholds(long elementSteps, long byteSteps) {
Preconditions.checkArgument(byteSteps >= 0);
synchronized (updateLimitLock) {
if (outstandingElementCount != null) {
long actualStep = Math.min(elementSteps, maxElementCountLimit - currentElementCountLimit);
currentElementCountLimit += actualStep;
long actualStep =
Math.min(elementSteps, maxElementCountLimit - outstandingElementCount.getLimit());
outstandingElementCount.release(actualStep);
}

if (outstandingByteCount != null) {
long actualStep = Math.min(byteSteps, maxRequestBytesLimit - currentRequestBytesLimit);
currentRequestBytesLimit += actualStep;
long actualStep =
Math.min(byteSteps, maxRequestBytesLimit - outstandingByteCount.getLimit());
outstandingByteCount.release(actualStep);
}
}
Expand All @@ -271,14 +267,14 @@ public void decreaseThresholds(long elementSteps, long byteSteps) {
Preconditions.checkArgument(byteSteps >= 0);
synchronized (updateLimitLock) {
if (outstandingElementCount != null) {
long actualStep = Math.min(elementSteps, currentElementCountLimit - minElementCountLimit);
currentElementCountLimit -= actualStep;
long actualStep =
Math.min(elementSteps, outstandingElementCount.getLimit() - minElementCountLimit);
outstandingElementCount.reducePermits(actualStep);
}

if (outstandingByteCount != null) {
long actualStep = Math.min(byteSteps, currentRequestBytesLimit - minRequestBytesLimit);
currentRequestBytesLimit -= actualStep;
long actualStep =
Math.min(byteSteps, outstandingByteCount.getLimit() - minRequestBytesLimit);
outstandingByteCount.reducePermits(actualStep);
}
}
Expand Down Expand Up @@ -328,22 +324,12 @@ public Long getMinRequestBytesLimit() {
@InternalApi("For google-cloud-java client use only")
@Nullable
public Long getCurrentElementCountLimit() {
Long ret;
synchronized (updateLimitLock) {
// make sure the returned value is not stale
ret = currentElementCountLimit;
}
return ret;
return outstandingElementCount == null ? null : outstandingElementCount.getLimit();
}

@InternalApi("For google-cloud-java client use only")
@Nullable
public Long getCurrentRequestBytesLimit() {
Long ret;
synchronized (updateLimitLock) {
// make sure the returned value is not stale
ret = currentRequestBytesLimit;
}
return ret;
return outstandingByteCount == null ? null : outstandingByteCount.getLimit();
}
}
Expand Up @@ -51,7 +51,6 @@ private static void checkNotNegative(long l) {
@Override
public void release(long permits) {
checkNotNegative(permits);

long diff = permits + currentPermits.get() - limit.get();
currentPermits.addAndGet(permits);
// If more permits are returned than what was originally set, we need to add these extra
Expand Down Expand Up @@ -105,4 +104,9 @@ public void reducePermits(long reduction) {
}
}
}

@Override
public long getLimit() {
return limit.get();
}
}
Expand Up @@ -41,11 +41,13 @@ interface Semaphore64 {

void release(long permits);

void reducePermits(long reduction);

/**
* When try to acquire more permits than what's allowed, acquiring the limit instead of what's
* asked.
*/
boolean acquirePartial(long permits);

void reducePermits(long reduction);

long getLimit();
}
Expand Up @@ -29,6 +29,7 @@
*/
package com.google.api.gax.batching;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -96,6 +97,7 @@ public void testReducePermitsNonBlocking() {
semaphore.reducePermits(3);
assertFalse(semaphore.acquire(3));
assertTrue(semaphore.acquire(2));
assertEquals(2, semaphore.getLimit());
}

@Test(timeout = 500)
Expand All @@ -120,6 +122,8 @@ public void run() {

semaphore.release(1);
t.join();

assertEquals(1, semaphore.getLimit());
}

@Test
Expand All @@ -130,6 +134,8 @@ public void testAcquirePartialNonBlocking() {
semaphore.release(6);
assertTrue(semaphore.acquire(1));
assertFalse(semaphore.acquirePartial(6));
// limit should still be 5
assertEquals(5, semaphore.getLimit());
}

@Test(timeout = 500)
Expand Down Expand Up @@ -164,5 +170,7 @@ public void run() {
// wait fo thread to start
Thread.sleep(100);
assertTrue(t2.isAlive());
// limit should still be 5 and get limit should not block
assertEquals(5, semaphore.getLimit());
}
}

0 comments on commit 96ef367

Please sign in to comment.