From fc8e520acfaf843ac61e806bdb4b5fe393d0b447 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Tue, 4 May 2021 15:00:51 -0400 Subject: [PATCH] fix: fix flaky tests and non blocking semaphore (#1365) * fix: fix race condition in non blocking semaphore * update comment --- .../gax/batching/NonBlockingSemaphore.java | 25 ++++----- .../batching/FlowControlEventStatsTest.java | 27 ++------- .../api/gax/batching/FlowControllerTest.java | 56 ++++++++----------- 3 files changed, 42 insertions(+), 66 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/NonBlockingSemaphore.java b/gax/src/main/java/com/google/api/gax/batching/NonBlockingSemaphore.java index efa54d8bd..126d1bb7b 100644 --- a/gax/src/main/java/com/google/api/gax/batching/NonBlockingSemaphore.java +++ b/gax/src/main/java/com/google/api/gax/batching/NonBlockingSemaphore.java @@ -35,7 +35,7 @@ /** A {@link Semaphore64} that immediately returns with failure if permits are not available. */ class NonBlockingSemaphore implements Semaphore64 { - private AtomicLong availablePermits; + private AtomicLong acquiredPermits; private AtomicLong limit; private static void checkNotNegative(long l) { @@ -44,7 +44,7 @@ private static void checkNotNegative(long l) { NonBlockingSemaphore(long permits) { checkNotNegative(permits); - this.availablePermits = new AtomicLong(permits); + this.acquiredPermits = new AtomicLong(0); this.limit = new AtomicLong(permits); } @@ -52,9 +52,10 @@ private static void checkNotNegative(long l) { public void release(long permits) { checkNotNegative(permits); while (true) { - long old = availablePermits.get(); + long old = acquiredPermits.get(); // TODO: throw exceptions when the permits overflow - if (availablePermits.compareAndSet(old, Math.min(old + permits, limit.get()))) { + long newAcquired = Math.max(0, old - permits); + if (acquiredPermits.compareAndSet(old, newAcquired)) { return; } } @@ -64,11 +65,11 @@ public void release(long permits) { public boolean acquire(long permits) { checkNotNegative(permits); while (true) { - long old = availablePermits.get(); - if (old < permits) { + long old = acquiredPermits.get(); + if (old + permits > limit.get()) { return false; } - if (availablePermits.compareAndSet(old, old - permits)) { + if (acquiredPermits.compareAndSet(old, old + permits)) { return true; } } @@ -79,13 +80,13 @@ public boolean acquirePartial(long permits) { checkNotNegative(permits); // To allow individual oversized requests to be sent, clamp the requested permits to the maximum // limit. This will allow individual large requests to be sent. Please note that this behavior - // will result in availablePermits going negative. + // will result in acquiredPermits going over limit. while (true) { - long old = availablePermits.get(); - if (old < Math.min(limit.get(), permits)) { + long old = acquiredPermits.get(); + if (old + permits > limit.get() && old > 0) { return false; } - if (availablePermits.compareAndSet(old, old - permits)) { + if (acquiredPermits.compareAndSet(old, old + permits)) { return true; } } @@ -94,7 +95,6 @@ public boolean acquirePartial(long permits) { @Override public void increasePermitLimit(long permits) { checkNotNegative(permits); - availablePermits.addAndGet(permits); limit.addAndGet(permits); } @@ -106,7 +106,6 @@ public void reducePermitLimit(long reduction) { long oldLimit = limit.get(); Preconditions.checkState(oldLimit - reduction > 0, "permit limit underflow"); if (limit.compareAndSet(oldLimit, oldLimit - reduction)) { - availablePermits.addAndGet(-reduction); return; } } diff --git a/gax/src/test/java/com/google/api/gax/batching/FlowControlEventStatsTest.java b/gax/src/test/java/com/google/api/gax/batching/FlowControlEventStatsTest.java index 5136f312a..4dcb0c4ff 100644 --- a/gax/src/test/java/com/google/api/gax/batching/FlowControlEventStatsTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/FlowControlEventStatsTest.java @@ -36,8 +36,6 @@ import com.google.api.gax.batching.FlowControlEventStats.FlowControlEvent; import com.google.api.gax.batching.FlowController.MaxOutstandingRequestBytesReachedException; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.Test; import org.junit.runner.RunWith; @@ -71,27 +69,14 @@ public void testCreateEvent() { } @Test - public void testGetLastEvent() throws InterruptedException { - final FlowControlEventStats stats = new FlowControlEventStats(); - final long currentTime = System.currentTimeMillis(); + public void testGetLastEvent() { + FlowControlEventStats stats = new FlowControlEventStats(); + long currentTime = System.currentTimeMillis(); - List threads = new ArrayList<>(); for (int i = 1; i <= 10; i++) { - final int timeElapsed = i; - Thread t = - new Thread() { - @Override - public void run() { - stats.recordFlowControlEvent( - FlowControlEvent.createReserveDelayed(currentTime + timeElapsed, timeElapsed)); - } - }; - threads.add(t); - t.start(); - } - - for (Thread t : threads) { - t.join(10); + int timeElapsed = i; + stats.recordFlowControlEvent( + FlowControlEvent.createReserveDelayed(currentTime + timeElapsed, timeElapsed)); } assertEquals(currentTime + 10, stats.getLastFlowControlEvent().getTimestampMs()); diff --git a/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java b/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java index 57e60b094..cb82d28db 100644 --- a/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java @@ -44,13 +44,13 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -500,7 +500,6 @@ private void testRejectedReserveRelease( } flowController.release(1, 1); - flowController.reserve(maxElementCount, maxNumBytes); flowController.release(maxElementCount, maxNumBytes); } @@ -523,11 +522,11 @@ public void testConcurrentUpdateThresholds_blocking() throws Exception { final AtomicInteger totalDecreased = new AtomicInteger(0); final AtomicInteger releasedCounter = new AtomicInteger(0); - List reserveThreads = + List reserveThreads = testConcurrentUpdates( - flowController, 100, 100, 100, totalIncreased, totalDecreased, releasedCounter); - for (Thread t : reserveThreads) { - t.join(200); + flowController, 100, 100, 10, totalIncreased, totalDecreased, releasedCounter); + for (Future t : reserveThreads) { + t.get(200, TimeUnit.MILLISECONDS); } assertEquals(reserveThreads.size(), releasedCounter.get()); assertTrue(totalIncreased.get() > 0); @@ -539,9 +538,6 @@ public void testConcurrentUpdateThresholds_blocking() throws Exception { testBlockingReserveRelease(flowController, 0, expectedValue); } - // This test is very flaky. Remove @Ignore once https://github.com/googleapis/gax-java/issues/1359 - // is fixed. - @Ignore @Test public void testConcurrentUpdateThresholds_nonBlocking() throws Exception { int initialValue = 5000; @@ -559,11 +555,11 @@ public void testConcurrentUpdateThresholds_nonBlocking() throws Exception { AtomicInteger totalIncreased = new AtomicInteger(0); AtomicInteger totalDecreased = new AtomicInteger(0); AtomicInteger releasedCounter = new AtomicInteger(0); - List reserveThreads = + List reserveThreads = testConcurrentUpdates( flowController, 100, 100, 100, totalIncreased, totalDecreased, releasedCounter); - for (Thread t : reserveThreads) { - t.join(200); + for (Future t : reserveThreads) { + t.get(200, TimeUnit.MILLISECONDS); } assertEquals(reserveThreads.size(), releasedCounter.get()); assertTrue(totalIncreased.get() > 0); @@ -698,8 +694,7 @@ public void run() { }; // blocked by element. Reserve all 5 elements first, reserve in the runnable will be blocked flowController.reserve(5, 1); - ExecutorService executor = Executors.newCachedThreadPool(); - Future finished1 = executor.submit(runnable); + Future finished1 = Executors.newSingleThreadExecutor().submit(runnable); try { finished1.get(50, TimeUnit.MILLISECONDS); fail("reserve should block"); @@ -722,7 +717,7 @@ public void run() { // Similar to blocked by element, test blocking by bytes. flowController.reserve(1, 5); - Future finished2 = executor.submit(runnable); + Future finished2 = Executors.newSingleThreadExecutor().submit(runnable); try { finished2.get(50, TimeUnit.MILLISECONDS); fail("reserve should block"); @@ -739,7 +734,7 @@ public void run() { .isAtLeast(currentTime); } - private List testConcurrentUpdates( + private List testConcurrentUpdates( final FlowController flowController, final int increaseStepRange, final int decreaseStepRange, @@ -747,7 +742,7 @@ private List testConcurrentUpdates( final AtomicInteger totalIncreased, final AtomicInteger totalDecreased, final AtomicInteger releasedCounter) - throws InterruptedException { + throws InterruptedException, TimeoutException, ExecutionException { final Random random = new Random(); Runnable increaseRunnable = new Runnable() { @@ -779,22 +774,19 @@ public void run() { } } }; - List updateThreads = new ArrayList<>(); - List reserveReleaseThreads = new ArrayList<>(); - for (int i = 0; i < 20; i++) { - Thread increase = new Thread(increaseRunnable); - Thread decrease = new Thread(decreaseRunnable); - Thread reserveRelease = new Thread(reserveReleaseRunnable); - updateThreads.add(increase); - updateThreads.add(decrease); - reserveReleaseThreads.add(reserveRelease); - increase.start(); - decrease.start(); - reserveRelease.start(); + List updateFuture = new ArrayList<>(); + List reserveReleaseFuture = new ArrayList<>(); + ExecutorService executors = Executors.newFixedThreadPool(10); + ExecutorService reserveExecutor = Executors.newFixedThreadPool(10); + for (int i = 0; i < 5; i++) { + updateFuture.add(executors.submit(increaseRunnable)); + updateFuture.add(executors.submit(decreaseRunnable)); + reserveReleaseFuture.add(reserveExecutor.submit(reserveReleaseRunnable)); } - for (Thread t : updateThreads) { - t.join(10); + for (Future t : updateFuture) { + t.get(50, TimeUnit.MILLISECONDS); } - return reserveReleaseThreads; + executors.shutdown(); + return reserveReleaseFuture; } }