Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
make nonblocking semaphore not blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Mar 15, 2021
1 parent 8b71d8f commit b7b525d
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 25 deletions.
Expand Up @@ -47,6 +47,7 @@ private static void checkNotNegative(long l) {
this.limit = permits;
}

@Override
public synchronized void release(long permits) {
checkNotNegative(permits);

Expand All @@ -59,6 +60,7 @@ public synchronized void release(long permits) {
notifyAll();
}

@Override
public synchronized boolean acquire(long permits) {
checkNotNegative(permits);

Expand All @@ -78,6 +80,7 @@ public synchronized boolean acquire(long permits) {
return true;
}

@Override
public synchronized boolean acquirePartial(long permits) {
checkNotNegative(permits);

Expand All @@ -100,6 +103,7 @@ public synchronized boolean acquirePartial(long permits) {
return true;
}

@Override
public synchronized void reducePermits(long reduction) {
checkNotNegative(reduction);
checkNotNegative(limit - reduction);
Expand Down
14 changes: 12 additions & 2 deletions gax/src/main/java/com/google/api/gax/batching/FlowController.java
Expand Up @@ -328,12 +328,22 @@ public Long getMinRequestBytesLimit() {
@InternalApi("For google-cloud-java client use only")
@Nullable
public Long getCurrentElementCountLimit() {
return currentElementCountLimit;
Long ret;
synchronized (updateLimitLock) {
// make sure the returned value is not stale
ret = currentElementCountLimit;
}
return ret;
}

@InternalApi("For google-cloud-java client use only")
@Nullable
public Long getCurrentRequestBytesLimit() {
return currentRequestBytesLimit;
Long ret;
synchronized (updateLimitLock) {
// make sure the returned value is not stale
ret = currentRequestBytesLimit;
}
return ret;
}
}
Expand Up @@ -31,59 +31,78 @@
package com.google.api.gax.batching;

import com.google.common.base.Preconditions;
import java.util.concurrent.atomic.AtomicLong;

/** A {@link Semaphore64} that immediately returns with failure if permits are not available. */
class NonBlockingSemaphore implements Semaphore64 {
private long currentPermits;
private long limit;
private AtomicLong currentPermits;
private AtomicLong limit;

private static void checkNotNegative(long l) {
Preconditions.checkArgument(l >= 0, "negative permits not allowed: %s", l);
}

NonBlockingSemaphore(long permits) {
checkNotNegative(permits);
this.currentPermits = permits;
this.limit = permits;
this.currentPermits = new AtomicLong(permits);
this.limit = new AtomicLong(permits);
}

public synchronized void release(long permits) {
@Override
public void release(long permits) {
checkNotNegative(permits);

currentPermits += permits;
long diff = permits + currentPermits.get() - limit.get();
currentPermits.addAndGet(permits);
// If more permits are returned than what was originally set, we need to add these extra
// permits to the limit
if (currentPermits > limit) {
limit = currentPermits;
if (diff > 0) {
limit.addAndGet(diff);
}
}

public synchronized boolean acquire(long permits) {
@Override
public boolean acquire(long permits) {
checkNotNegative(permits);
if (currentPermits < permits) {
return false;
while (true) {
long old = currentPermits.get();
if (old < permits) {
return false;
}
if (currentPermits.compareAndSet(old, old - permits)) {
return true;
}
}
currentPermits -= permits;
return true;
}

public synchronized boolean acquirePartial(long permits) {
@Override
public boolean acquirePartial(long permits) {
checkNotNegative(permits);

// Give out permits as long as currentPermits is greater or equal to max of (limit, permits).
// currentPermits could be negative after the permits are given out, which marks how many
// permits are owed.
if (currentPermits < Math.min(limit, permits)) {
return false;
while (true) {
long old = currentPermits.get();
if (old < Math.min(limit.get(), permits)) {
return false;
}
if (currentPermits.compareAndSet(old, old - permits)) {
return true;
}
}
currentPermits -= permits;
return true;
}

public synchronized void reducePermits(long reduction) {
@Override
public void reducePermits(long reduction) {
checkNotNegative(reduction);
checkNotNegative(limit - reduction);
limit -= reduction;
currentPermits -= reduction;

while (true) {
long oldLimit = limit.get();
checkNotNegative(oldLimit - reduction);
if (limit.compareAndSet(oldLimit, oldLimit - reduction)) {
currentPermits.addAndGet(-reduction);
return;
}
}
}
}

0 comments on commit b7b525d

Please sign in to comment.