Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
fix: fix flaky tests and non blocking semaphore (#1365)
Browse files Browse the repository at this point in the history
* fix: fix race condition in non blocking semaphore

* update comment
  • Loading branch information
mutianf committed May 4, 2021
1 parent 3c45191 commit fc8e520
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 66 deletions.
Expand Up @@ -35,7 +35,7 @@

/** A {@link Semaphore64} that immediately returns with failure if permits are not available. */
class NonBlockingSemaphore implements Semaphore64 {
private AtomicLong availablePermits;
private AtomicLong acquiredPermits;
private AtomicLong limit;

private static void checkNotNegative(long l) {
Expand All @@ -44,17 +44,18 @@ private static void checkNotNegative(long l) {

NonBlockingSemaphore(long permits) {
checkNotNegative(permits);
this.availablePermits = new AtomicLong(permits);
this.acquiredPermits = new AtomicLong(0);
this.limit = new AtomicLong(permits);
}

@Override
public void release(long permits) {
checkNotNegative(permits);
while (true) {
long old = availablePermits.get();
long old = acquiredPermits.get();
// TODO: throw exceptions when the permits overflow
if (availablePermits.compareAndSet(old, Math.min(old + permits, limit.get()))) {
long newAcquired = Math.max(0, old - permits);
if (acquiredPermits.compareAndSet(old, newAcquired)) {
return;
}
}
Expand All @@ -64,11 +65,11 @@ public void release(long permits) {
public boolean acquire(long permits) {
checkNotNegative(permits);
while (true) {
long old = availablePermits.get();
if (old < permits) {
long old = acquiredPermits.get();
if (old + permits > limit.get()) {
return false;
}
if (availablePermits.compareAndSet(old, old - permits)) {
if (acquiredPermits.compareAndSet(old, old + permits)) {
return true;
}
}
Expand All @@ -79,13 +80,13 @@ public boolean acquirePartial(long permits) {
checkNotNegative(permits);
// To allow individual oversized requests to be sent, clamp the requested permits to the maximum
// limit. This will allow individual large requests to be sent. Please note that this behavior
// will result in availablePermits going negative.
// will result in acquiredPermits going over limit.
while (true) {
long old = availablePermits.get();
if (old < Math.min(limit.get(), permits)) {
long old = acquiredPermits.get();
if (old + permits > limit.get() && old > 0) {
return false;
}
if (availablePermits.compareAndSet(old, old - permits)) {
if (acquiredPermits.compareAndSet(old, old + permits)) {
return true;
}
}
Expand All @@ -94,7 +95,6 @@ public boolean acquirePartial(long permits) {
@Override
public void increasePermitLimit(long permits) {
checkNotNegative(permits);
availablePermits.addAndGet(permits);
limit.addAndGet(permits);
}

Expand All @@ -106,7 +106,6 @@ public void reducePermitLimit(long reduction) {
long oldLimit = limit.get();
Preconditions.checkState(oldLimit - reduction > 0, "permit limit underflow");
if (limit.compareAndSet(oldLimit, oldLimit - reduction)) {
availablePermits.addAndGet(-reduction);
return;
}
}
Expand Down
Expand Up @@ -36,8 +36,6 @@

import com.google.api.gax.batching.FlowControlEventStats.FlowControlEvent;
import com.google.api.gax.batching.FlowController.MaxOutstandingRequestBytesReachedException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -71,27 +69,14 @@ public void testCreateEvent() {
}

@Test
public void testGetLastEvent() throws InterruptedException {
final FlowControlEventStats stats = new FlowControlEventStats();
final long currentTime = System.currentTimeMillis();
public void testGetLastEvent() {
FlowControlEventStats stats = new FlowControlEventStats();
long currentTime = System.currentTimeMillis();

List<Thread> threads = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
final int timeElapsed = i;
Thread t =
new Thread() {
@Override
public void run() {
stats.recordFlowControlEvent(
FlowControlEvent.createReserveDelayed(currentTime + timeElapsed, timeElapsed));
}
};
threads.add(t);
t.start();
}

for (Thread t : threads) {
t.join(10);
int timeElapsed = i;
stats.recordFlowControlEvent(
FlowControlEvent.createReserveDelayed(currentTime + timeElapsed, timeElapsed));
}

assertEquals(currentTime + 10, stats.getLastFlowControlEvent().getTimestampMs());
Expand Down
Expand Up @@ -44,13 +44,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -500,7 +500,6 @@ private void testRejectedReserveRelease(
}

flowController.release(1, 1);

flowController.reserve(maxElementCount, maxNumBytes);
flowController.release(maxElementCount, maxNumBytes);
}
Expand All @@ -523,11 +522,11 @@ public void testConcurrentUpdateThresholds_blocking() throws Exception {
final AtomicInteger totalDecreased = new AtomicInteger(0);
final AtomicInteger releasedCounter = new AtomicInteger(0);

List<Thread> reserveThreads =
List<Future> reserveThreads =
testConcurrentUpdates(
flowController, 100, 100, 100, totalIncreased, totalDecreased, releasedCounter);
for (Thread t : reserveThreads) {
t.join(200);
flowController, 100, 100, 10, totalIncreased, totalDecreased, releasedCounter);
for (Future t : reserveThreads) {
t.get(200, TimeUnit.MILLISECONDS);
}
assertEquals(reserveThreads.size(), releasedCounter.get());
assertTrue(totalIncreased.get() > 0);
Expand All @@ -539,9 +538,6 @@ public void testConcurrentUpdateThresholds_blocking() throws Exception {
testBlockingReserveRelease(flowController, 0, expectedValue);
}

// This test is very flaky. Remove @Ignore once https://github.com/googleapis/gax-java/issues/1359
// is fixed.
@Ignore
@Test
public void testConcurrentUpdateThresholds_nonBlocking() throws Exception {
int initialValue = 5000;
Expand All @@ -559,11 +555,11 @@ public void testConcurrentUpdateThresholds_nonBlocking() throws Exception {
AtomicInteger totalIncreased = new AtomicInteger(0);
AtomicInteger totalDecreased = new AtomicInteger(0);
AtomicInteger releasedCounter = new AtomicInteger(0);
List<Thread> reserveThreads =
List<Future> reserveThreads =
testConcurrentUpdates(
flowController, 100, 100, 100, totalIncreased, totalDecreased, releasedCounter);
for (Thread t : reserveThreads) {
t.join(200);
for (Future t : reserveThreads) {
t.get(200, TimeUnit.MILLISECONDS);
}
assertEquals(reserveThreads.size(), releasedCounter.get());
assertTrue(totalIncreased.get() > 0);
Expand Down Expand Up @@ -698,8 +694,7 @@ public void run() {
};
// blocked by element. Reserve all 5 elements first, reserve in the runnable will be blocked
flowController.reserve(5, 1);
ExecutorService executor = Executors.newCachedThreadPool();
Future<?> finished1 = executor.submit(runnable);
Future<?> finished1 = Executors.newSingleThreadExecutor().submit(runnable);
try {
finished1.get(50, TimeUnit.MILLISECONDS);
fail("reserve should block");
Expand All @@ -722,7 +717,7 @@ public void run() {

// Similar to blocked by element, test blocking by bytes.
flowController.reserve(1, 5);
Future<?> finished2 = executor.submit(runnable);
Future<?> finished2 = Executors.newSingleThreadExecutor().submit(runnable);
try {
finished2.get(50, TimeUnit.MILLISECONDS);
fail("reserve should block");
Expand All @@ -739,15 +734,15 @@ public void run() {
.isAtLeast(currentTime);
}

private List<Thread> testConcurrentUpdates(
private List<Future> testConcurrentUpdates(
final FlowController flowController,
final int increaseStepRange,
final int decreaseStepRange,
final int reserve,
final AtomicInteger totalIncreased,
final AtomicInteger totalDecreased,
final AtomicInteger releasedCounter)
throws InterruptedException {
throws InterruptedException, TimeoutException, ExecutionException {
final Random random = new Random();
Runnable increaseRunnable =
new Runnable() {
Expand Down Expand Up @@ -779,22 +774,19 @@ public void run() {
}
}
};
List<Thread> updateThreads = new ArrayList<>();
List<Thread> reserveReleaseThreads = new ArrayList<>();
for (int i = 0; i < 20; i++) {
Thread increase = new Thread(increaseRunnable);
Thread decrease = new Thread(decreaseRunnable);
Thread reserveRelease = new Thread(reserveReleaseRunnable);
updateThreads.add(increase);
updateThreads.add(decrease);
reserveReleaseThreads.add(reserveRelease);
increase.start();
decrease.start();
reserveRelease.start();
List<Future> updateFuture = new ArrayList<>();
List<Future> reserveReleaseFuture = new ArrayList<>();
ExecutorService executors = Executors.newFixedThreadPool(10);
ExecutorService reserveExecutor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 5; i++) {
updateFuture.add(executors.submit(increaseRunnable));
updateFuture.add(executors.submit(decreaseRunnable));
reserveReleaseFuture.add(reserveExecutor.submit(reserveReleaseRunnable));
}
for (Thread t : updateThreads) {
t.join(10);
for (Future t : updateFuture) {
t.get(50, TimeUnit.MILLISECONDS);
}
return reserveReleaseThreads;
executors.shutdown();
return reserveReleaseFuture;
}
}

0 comments on commit fc8e520

Please sign in to comment.