From c6308c906171ce05765ccacb716aa7162d95d9a2 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Mon, 5 Oct 2020 17:38:44 -0400 Subject: [PATCH] fix: Fix race condition in BatcherImpl flush (#1200) * fix: Fix race condition in BatcherImpl flush Currently the following race condition exists: T1 - awaitAllOutstandingBatches checks that numOfOutstandingBatches > 0 T2 - onBatchCompletion decrements numOfOutstandingBatches T2 - flushLock.notifyAll() T1 - flushLock.wait() so T1 will wait indefinitely The fix is quite simple: make sure that the there batches to wait for after acquiring the lock * add test --- .../google/api/gax/batching/BatcherImpl.java | 4 ++ .../api/gax/batching/BatcherImplTest.java | 67 +++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 553931258..347804882 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -198,6 +198,10 @@ private void onBatchCompletion() { private void awaitAllOutstandingBatches() throws InterruptedException { while (numOfOutstandingBatches.get() > 0) { synchronized (flushLock) { + // Check again under lock to avoid racing with onBatchCompletion + if (numOfOutstandingBatches.get() == 0) { + break; + } flushLock.wait(); } } diff --git a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 6f7258ca4..0c2dfc4ab 100644 --- a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -36,12 +36,14 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.BatcherImpl.BatcherReference; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.UnaryCallable; import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList; import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntSquarerCallable; import com.google.api.gax.rpc.testing.FakeBatchableApi.SquarerBatchingDescriptorV2; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Queues; import java.util.ArrayList; import java.util.List; @@ -56,6 +58,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Filter; @@ -644,6 +647,70 @@ public boolean isLoggable(LogRecord record) { } } + @Test + public void testCloseRace() throws ExecutionException, InterruptedException, TimeoutException { + int iterations = 1_000_000; + + ExecutorService executor = Executors.newFixedThreadPool(100); + + try { + List> closeFutures = new ArrayList<>(); + + for (int i = 0; i < iterations; i++) { + final SettableApiFuture> result = SettableApiFuture.create(); + + UnaryCallable> callable = + new UnaryCallable>() { + @Override + public ApiFuture> futureCall( + LabeledIntList request, ApiCallContext context) { + return result; + } + }; + final Batcher batcher = + new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, callable, labeledIntList, batchingSettings, EXECUTOR); + + batcher.add(1); + + executor.execute( + new Runnable() { + @Override + public void run() { + result.set(ImmutableList.of(1)); + } + }); + Future f = + executor.submit( + new Runnable() { + @Override + public void run() { + try { + batcher.close(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + }); + + closeFutures.add(f); + } + + // Make sure that none hang + for (Future f : closeFutures) { + try { + // Should never take this long, but padded just in case this runs on a limited machine + f.get(1, TimeUnit.MINUTES); + } catch (TimeoutException e) { + assertWithMessage("BatcherImpl.close() is deadlocked").fail(); + } + } + } finally { + executor.shutdownNow(); + } + } + private void testElementTriggers(BatchingSettings settings) throws Exception { underTest = new BatcherImpl<>(