Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

fix: fix flaky tests and non blocking semaphore #1365

Merged
merged 3 commits into from May 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -35,7 +35,7 @@

/** A {@link Semaphore64} that immediately returns with failure if permits are not available. */
class NonBlockingSemaphore implements Semaphore64 {
private AtomicLong availablePermits;
private AtomicLong acquiredPermits;
private AtomicLong limit;

private static void checkNotNegative(long l) {
Expand All @@ -44,17 +44,18 @@ private static void checkNotNegative(long l) {

NonBlockingSemaphore(long permits) {
checkNotNegative(permits);
this.availablePermits = new AtomicLong(permits);
this.acquiredPermits = new AtomicLong(0);
this.limit = new AtomicLong(permits);
}

@Override
public void release(long permits) {
checkNotNegative(permits);
while (true) {
long old = availablePermits.get();
long old = acquiredPermits.get();
// TODO: throw exceptions when the permits overflow
if (availablePermits.compareAndSet(old, Math.min(old + permits, limit.get()))) {
long newAcquired = Math.max(0, old - permits);
if (acquiredPermits.compareAndSet(old, newAcquired)) {
return;
}
}
Expand All @@ -64,11 +65,11 @@ public void release(long permits) {
public boolean acquire(long permits) {
checkNotNegative(permits);
while (true) {
long old = availablePermits.get();
if (old < permits) {
long old = acquiredPermits.get();
if (old + permits > limit.get()) {
return false;
}
if (availablePermits.compareAndSet(old, old - permits)) {
if (acquiredPermits.compareAndSet(old, old + permits)) {
return true;
}
}
Expand All @@ -79,13 +80,13 @@ public boolean acquirePartial(long permits) {
checkNotNegative(permits);
// To allow individual oversized requests to be sent, clamp the requested permits to the maximum
// limit. This will allow individual large requests to be sent. Please note that this behavior
// will result in availablePermits going negative.
// will result in acquiredPermits going over limit.
while (true) {
long old = availablePermits.get();
if (old < Math.min(limit.get(), permits)) {
long old = acquiredPermits.get();
if (old + permits > limit.get() && old > 0) {
return false;
}
if (availablePermits.compareAndSet(old, old - permits)) {
if (acquiredPermits.compareAndSet(old, old + permits)) {
return true;
}
}
Expand All @@ -94,7 +95,6 @@ public boolean acquirePartial(long permits) {
@Override
public void increasePermitLimit(long permits) {
checkNotNegative(permits);
availablePermits.addAndGet(permits);
limit.addAndGet(permits);
}

Expand All @@ -106,7 +106,6 @@ public void reducePermitLimit(long reduction) {
long oldLimit = limit.get();
Preconditions.checkState(oldLimit - reduction > 0, "permit limit underflow");
if (limit.compareAndSet(oldLimit, oldLimit - reduction)) {
availablePermits.addAndGet(-reduction);
return;
}
}
Expand Down
Expand Up @@ -36,8 +36,6 @@

import com.google.api.gax.batching.FlowControlEventStats.FlowControlEvent;
import com.google.api.gax.batching.FlowController.MaxOutstandingRequestBytesReachedException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -71,27 +69,14 @@ public void testCreateEvent() {
}

@Test
public void testGetLastEvent() throws InterruptedException {
final FlowControlEventStats stats = new FlowControlEventStats();
final long currentTime = System.currentTimeMillis();
public void testGetLastEvent() {
FlowControlEventStats stats = new FlowControlEventStats();
long currentTime = System.currentTimeMillis();

List<Thread> threads = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
final int timeElapsed = i;
Thread t =
new Thread() {
@Override
public void run() {
stats.recordFlowControlEvent(
FlowControlEvent.createReserveDelayed(currentTime + timeElapsed, timeElapsed));
}
};
threads.add(t);
t.start();
}

for (Thread t : threads) {
t.join(10);
int timeElapsed = i;
stats.recordFlowControlEvent(
FlowControlEvent.createReserveDelayed(currentTime + timeElapsed, timeElapsed));
}

assertEquals(currentTime + 10, stats.getLastFlowControlEvent().getTimestampMs());
Expand Down
Expand Up @@ -44,13 +44,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -500,7 +500,6 @@ private void testRejectedReserveRelease(
}

flowController.release(1, 1);

flowController.reserve(maxElementCount, maxNumBytes);
flowController.release(maxElementCount, maxNumBytes);
}
Expand All @@ -523,11 +522,11 @@ public void testConcurrentUpdateThresholds_blocking() throws Exception {
final AtomicInteger totalDecreased = new AtomicInteger(0);
final AtomicInteger releasedCounter = new AtomicInteger(0);

List<Thread> reserveThreads =
List<Future> reserveThreads =
testConcurrentUpdates(
flowController, 100, 100, 100, totalIncreased, totalDecreased, releasedCounter);
for (Thread t : reserveThreads) {
t.join(200);
flowController, 100, 100, 10, totalIncreased, totalDecreased, releasedCounter);
for (Future t : reserveThreads) {
t.get(200, TimeUnit.MILLISECONDS);
}
assertEquals(reserveThreads.size(), releasedCounter.get());
assertTrue(totalIncreased.get() > 0);
Expand All @@ -539,9 +538,6 @@ public void testConcurrentUpdateThresholds_blocking() throws Exception {
testBlockingReserveRelease(flowController, 0, expectedValue);
}

// This test is very flaky. Remove @Ignore once https://github.com/googleapis/gax-java/issues/1359
// is fixed.
@Ignore
@Test
public void testConcurrentUpdateThresholds_nonBlocking() throws Exception {
int initialValue = 5000;
Expand All @@ -559,11 +555,11 @@ public void testConcurrentUpdateThresholds_nonBlocking() throws Exception {
AtomicInteger totalIncreased = new AtomicInteger(0);
AtomicInteger totalDecreased = new AtomicInteger(0);
AtomicInteger releasedCounter = new AtomicInteger(0);
List<Thread> reserveThreads =
List<Future> reserveThreads =
testConcurrentUpdates(
flowController, 100, 100, 100, totalIncreased, totalDecreased, releasedCounter);
for (Thread t : reserveThreads) {
t.join(200);
for (Future t : reserveThreads) {
t.get(200, TimeUnit.MILLISECONDS);
}
assertEquals(reserveThreads.size(), releasedCounter.get());
assertTrue(totalIncreased.get() > 0);
Expand Down Expand Up @@ -698,8 +694,7 @@ public void run() {
};
// blocked by element. Reserve all 5 elements first, reserve in the runnable will be blocked
flowController.reserve(5, 1);
ExecutorService executor = Executors.newCachedThreadPool();
Future<?> finished1 = executor.submit(runnable);
Future<?> finished1 = Executors.newSingleThreadExecutor().submit(runnable);
try {
finished1.get(50, TimeUnit.MILLISECONDS);
fail("reserve should block");
Expand All @@ -722,7 +717,7 @@ public void run() {

// Similar to blocked by element, test blocking by bytes.
flowController.reserve(1, 5);
Future<?> finished2 = executor.submit(runnable);
Future<?> finished2 = Executors.newSingleThreadExecutor().submit(runnable);
try {
finished2.get(50, TimeUnit.MILLISECONDS);
fail("reserve should block");
Expand All @@ -739,15 +734,15 @@ public void run() {
.isAtLeast(currentTime);
}

private List<Thread> testConcurrentUpdates(
private List<Future> testConcurrentUpdates(
final FlowController flowController,
final int increaseStepRange,
final int decreaseStepRange,
final int reserve,
final AtomicInteger totalIncreased,
final AtomicInteger totalDecreased,
final AtomicInteger releasedCounter)
throws InterruptedException {
throws InterruptedException, TimeoutException, ExecutionException {
final Random random = new Random();
Runnable increaseRunnable =
new Runnable() {
Expand Down Expand Up @@ -779,22 +774,19 @@ public void run() {
}
}
};
List<Thread> updateThreads = new ArrayList<>();
List<Thread> reserveReleaseThreads = new ArrayList<>();
for (int i = 0; i < 20; i++) {
Thread increase = new Thread(increaseRunnable);
Thread decrease = new Thread(decreaseRunnable);
Thread reserveRelease = new Thread(reserveReleaseRunnable);
updateThreads.add(increase);
updateThreads.add(decrease);
reserveReleaseThreads.add(reserveRelease);
increase.start();
decrease.start();
reserveRelease.start();
List<Future> updateFuture = new ArrayList<>();
List<Future> reserveReleaseFuture = new ArrayList<>();
ExecutorService executors = Executors.newFixedThreadPool(10);
ExecutorService reserveExecutor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 5; i++) {
updateFuture.add(executors.submit(increaseRunnable));
updateFuture.add(executors.submit(decreaseRunnable));
reserveReleaseFuture.add(reserveExecutor.submit(reserveReleaseRunnable));
}
for (Thread t : updateThreads) {
t.join(10);
for (Future t : updateFuture) {
t.get(50, TimeUnit.MILLISECONDS);
}
return reserveReleaseThreads;
executors.shutdown();
return reserveReleaseFuture;
}
}