Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
batching: fix permit leak (#567)
Browse files Browse the repository at this point in the history
Previously flow control was released only if the RPC returns
successfully. This caused us to leak permits if RPCs fail.

This commit makes us unconditionally release permits.
  • Loading branch information
pongad committed Sep 1, 2018
1 parent 7f216a9 commit ff4b61e
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 44 deletions.
Expand Up @@ -30,15 +30,20 @@
package com.google.api.gax.batching;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.BetaApi;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/** A simple ThresholdBatchReceiver that just accumulates batches. Not thread-safe. */
/** A simple ThresholdBatchReceiver that just accumulates batches. */
@BetaApi("The surface for batching is not stable yet and may change in the future.")
public final class AccumulatingBatchReceiver<T> implements ThresholdBatchReceiver<T> {
private final List<T> batches = new ArrayList<>();
private final ConcurrentLinkedQueue<T> batches = new ConcurrentLinkedQueue<>();
private final ApiFuture<?> retFuture;

public AccumulatingBatchReceiver(ApiFuture<?> retFuture) {
this.retFuture = retFuture;
}

@Override
public void validateBatch(T message) {
Expand All @@ -48,11 +53,14 @@ public void validateBatch(T message) {
@Override
public ApiFuture<?> processBatch(T batch) {
batches.add(batch);
return ApiFutures.<Void>immediateFuture(null);
return retFuture;
}

/** Returns the accumulated batches. */
/**
* Returns the accumulated batches. If called concurrently with {@code processBatch}, the new
* batch may or may not be returned.
*/
public List<T> getBatches() {
return batches;
return new ArrayList<>(batches);
}
}
Expand Up @@ -33,8 +33,10 @@

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.BetaApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowController.FlowControlException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -216,10 +218,35 @@ public ApiFuture<Void> pushCurrentBatch() {
final E batch = removeBatch();
if (batch == null) {
return ApiFutures.immediateFuture(null);
} else {
return ApiFutures.transform(
receiver.processBatch(batch), new ReleaseResourcesFunction<>(batch), directExecutor());
}

final SettableApiFuture<Void> retFuture = SettableApiFuture.create();

// It is tempting to use transform to both release and get ApiFuture<Void>.
// This is incorrect because we also need to release on failure.
//
// It is also tempting to transform to get ApiFuture<Void> and addListener
// separately to release. This probably works as most users expect,
// but makes this class hard to test: retFuture.get() returning
// won't guarantee that flow control has been released.
ApiFutures.addCallback(
receiver.processBatch(batch),
new ApiFutureCallback<Object>() {
@Override
public void onSuccess(Object obj) {
flowController.release(batch);
retFuture.set(null);
}

@Override
public void onFailure(Throwable t) {
flowController.release(batch);
retFuture.setException(t);
}
},
directExecutor());

return retFuture;
}

private E removeBatch() {
Expand Down
Expand Up @@ -29,14 +29,18 @@
*/
package com.google.api.gax.batching;

import static com.google.common.truth.Truth.assertThat;

import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.FlowController.FlowControlException;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.common.truth.Truth;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand Down Expand Up @@ -142,21 +146,23 @@ private static ThresholdBatcher.Builder<SimpleBatch> createSimpleBatcherBuidler(

@Test
public void testAdd() throws Exception {
AccumulatingBatchReceiver<SimpleBatch> receiver = new AccumulatingBatchReceiver<>();
AccumulatingBatchReceiver<SimpleBatch> receiver =
new AccumulatingBatchReceiver<>(ApiFutures.<Void>immediateFuture(null));
ThresholdBatcher<SimpleBatch> batcher = createSimpleBatcherBuidler(receiver).build();
batcher.add(SimpleBatch.fromInteger(14));
Truth.assertThat(batcher.isEmpty()).isFalse();
Truth.assertThat(receiver.getBatches().size()).isEqualTo(0);
assertThat(batcher.isEmpty()).isFalse();
assertThat(receiver.getBatches()).hasSize(0);

batcher.pushCurrentBatch().get();
Truth.assertThat(batcher.isEmpty()).isTrue();
Truth.assertThat(receiver.getBatches().size()).isEqualTo(1);
Truth.assertThat(receiver.getBatches().get(0).getIntegers()).isEqualTo(Arrays.asList(14));
assertThat(batcher.isEmpty()).isTrue();
assertThat(receiver.getBatches()).hasSize(1);
assertThat(receiver.getBatches().get(0).getIntegers()).isEqualTo(Arrays.asList(14));
}

@Test
public void testBatching() throws Exception {
AccumulatingBatchReceiver<SimpleBatch> receiver = new AccumulatingBatchReceiver<>();
AccumulatingBatchReceiver<SimpleBatch> receiver =
new AccumulatingBatchReceiver<>(ApiFutures.<Void>immediateFuture(null));
ThresholdBatcher<SimpleBatch> batcher =
createSimpleBatcherBuidler(receiver)
.setThresholds(BatchingThresholds.<SimpleBatch>create(2))
Expand All @@ -166,13 +172,13 @@ public void testBatching() throws Exception {
batcher.add(SimpleBatch.fromInteger(5));
// Give time for the executor to push the batch
Thread.sleep(100);
Truth.assertThat(receiver.getBatches().size()).isEqualTo(1);
assertThat(receiver.getBatches()).hasSize(1);

batcher.add(SimpleBatch.fromInteger(7));
batcher.add(SimpleBatch.fromInteger(9));
// Give time for the executor to push the batch
Thread.sleep(100);
Truth.assertThat(receiver.getBatches().size()).isEqualTo(2);
assertThat(receiver.getBatches()).hasSize(2);

batcher.add(SimpleBatch.fromInteger(11));

Expand All @@ -184,20 +190,21 @@ public void testBatching() throws Exception {
for (SimpleBatch batch : receiver.getBatches()) {
actual.add(batch.getIntegers());
}
Truth.assertThat(actual).isEqualTo(expected);
assertThat(actual).isEqualTo(expected);
}

@Test
public void testBatchingWithDelay() throws Exception {
AccumulatingBatchReceiver<SimpleBatch> receiver = new AccumulatingBatchReceiver<>();
AccumulatingBatchReceiver<SimpleBatch> receiver =
new AccumulatingBatchReceiver<>(ApiFutures.<Void>immediateFuture(null));
ThresholdBatcher<SimpleBatch> batcher =
createSimpleBatcherBuidler(receiver).setMaxDelay(Duration.ofMillis(100)).build();

batcher.add(SimpleBatch.fromInteger(3));
batcher.add(SimpleBatch.fromInteger(5));
// Give time for the delay to trigger and push the batch
Thread.sleep(500);
Truth.assertThat(receiver.getBatches().size()).isEqualTo(1);
assertThat(receiver.getBatches()).hasSize(1);

batcher.add(SimpleBatch.fromInteger(11));

Expand All @@ -208,7 +215,7 @@ public void testBatchingWithDelay() throws Exception {
for (SimpleBatch batch : receiver.getBatches()) {
actual.add(batch.getIntegers());
}
Truth.assertThat(actual).isEqualTo(expected);
assertThat(actual).isEqualTo(expected);
}

@Test
Expand All @@ -218,35 +225,37 @@ public void testExceptionWithNullFlowController() {
.setThresholds(BatchingThresholds.<SimpleBatch>create(100))
.setExecutor(EXECUTOR)
.setMaxDelay(Duration.ofMillis(10000))
.setReceiver(new AccumulatingBatchReceiver<SimpleBatch>())
.setReceiver(
new AccumulatingBatchReceiver<SimpleBatch>(ApiFutures.<Void>immediateFuture(null)))
.setBatchMerger(new SimpleBatchMerger())
.build();
}

@Test
public void testBatchingWithFlowControl() throws Exception {
AccumulatingBatchReceiver<SimpleBatch> receiver = new AccumulatingBatchReceiver<>();
AccumulatingBatchReceiver<SimpleBatch> receiver =
new AccumulatingBatchReceiver<>(ApiFutures.<Void>immediateFuture(null));
ThresholdBatcher<SimpleBatch> batcher =
createSimpleBatcherBuidler(receiver)
.setThresholds(BatchingThresholds.<SimpleBatch>create(2))
.setFlowController(
getTrackedIntegerBatchingFlowController(2L, null, LimitExceededBehavior.Block))
.build();

Truth.assertThat(trackedFlowController.getElementsReserved()).isEqualTo(0);
Truth.assertThat(trackedFlowController.getElementsReleased()).isEqualTo(0);
Truth.assertThat(trackedFlowController.getBytesReserved()).isEqualTo(0);
Truth.assertThat(trackedFlowController.getBytesReleased()).isEqualTo(0);
assertThat(trackedFlowController.getElementsReserved()).isEqualTo(0);
assertThat(trackedFlowController.getElementsReleased()).isEqualTo(0);
assertThat(trackedFlowController.getBytesReserved()).isEqualTo(0);
assertThat(trackedFlowController.getBytesReleased()).isEqualTo(0);

batcher.add(SimpleBatch.fromInteger(3));
batcher.add(SimpleBatch.fromInteger(5));
batcher.add(
SimpleBatch.fromInteger(7)); // We expect to block here until the first batch is handled
Truth.assertThat(receiver.getBatches().size()).isEqualTo(1);
assertThat(receiver.getBatches()).hasSize(1);
batcher.add(SimpleBatch.fromInteger(9));
batcher.add(
SimpleBatch.fromInteger(11)); // We expect to block here until the second batch is handled
Truth.assertThat(receiver.getBatches().size()).isEqualTo(2);
assertThat(receiver.getBatches()).hasSize(2);

batcher.pushCurrentBatch().get();

Expand All @@ -256,17 +265,18 @@ public void testBatchingWithFlowControl() throws Exception {
for (SimpleBatch batch : receiver.getBatches()) {
actual.add(batch.getIntegers());
}
Truth.assertThat(actual).isEqualTo(expected);
assertThat(actual).isEqualTo(expected);

Truth.assertThat(trackedFlowController.getElementsReserved())
assertThat(trackedFlowController.getElementsReserved())
.isEqualTo(trackedFlowController.getElementsReleased());
Truth.assertThat(trackedFlowController.getBytesReserved())
assertThat(trackedFlowController.getBytesReserved())
.isEqualTo(trackedFlowController.getBytesReleased());
}

@Test
public void testBatchingFlowControlExceptionRecovery() throws Exception {
AccumulatingBatchReceiver<SimpleBatch> receiver = new AccumulatingBatchReceiver<>();
AccumulatingBatchReceiver<SimpleBatch> receiver =
new AccumulatingBatchReceiver<>(ApiFutures.<Void>immediateFuture(null));
ThresholdBatcher<SimpleBatch> batcher =
createSimpleBatcherBuidler(receiver)
.setThresholds(BatchingThresholds.<SimpleBatch>create(4))
Expand All @@ -275,21 +285,21 @@ public void testBatchingFlowControlExceptionRecovery() throws Exception {
3L, null, LimitExceededBehavior.ThrowException))
.build();

Truth.assertThat(trackedFlowController.getElementsReserved()).isEqualTo(0);
Truth.assertThat(trackedFlowController.getElementsReleased()).isEqualTo(0);
Truth.assertThat(trackedFlowController.getBytesReserved()).isEqualTo(0);
Truth.assertThat(trackedFlowController.getBytesReleased()).isEqualTo(0);
assertThat(trackedFlowController.getElementsReserved()).isEqualTo(0);
assertThat(trackedFlowController.getElementsReleased()).isEqualTo(0);
assertThat(trackedFlowController.getBytesReserved()).isEqualTo(0);
assertThat(trackedFlowController.getBytesReleased()).isEqualTo(0);

batcher.add(SimpleBatch.fromInteger(3));
batcher.add(SimpleBatch.fromInteger(5));
batcher.add(SimpleBatch.fromInteger(7));
try {
batcher.add(SimpleBatch.fromInteger(9));
Truth.assertWithMessage("Failing: expected exception").that(false).isTrue();
Assert.fail("expected exception");
} catch (FlowControlException e) {
}
batcher.pushCurrentBatch().get();
Truth.assertThat(receiver.getBatches().size()).isEqualTo(1);
assertThat(receiver.getBatches()).hasSize(1);
batcher.add(SimpleBatch.fromInteger(11));
batcher.add(SimpleBatch.fromInteger(13));
batcher.pushCurrentBatch().get();
Expand All @@ -299,11 +309,52 @@ public void testBatchingFlowControlExceptionRecovery() throws Exception {
for (SimpleBatch batch : receiver.getBatches()) {
actual.add(batch.getIntegers());
}
Truth.assertThat(actual).isEqualTo(expected);
assertThat(actual).isEqualTo(expected);

assertThat(trackedFlowController.getElementsReserved())
.isEqualTo(trackedFlowController.getElementsReleased());
assertThat(trackedFlowController.getBytesReserved())
.isEqualTo(trackedFlowController.getBytesReleased());
}

@Test
public void testBatchingFailedRPC() throws Exception {
Exception ex = new IllegalStateException("does nothing, unsuccessfully");
AccumulatingBatchReceiver<SimpleBatch> receiver =
new AccumulatingBatchReceiver<>(ApiFutures.<Void>immediateFailedFuture(ex));
ThresholdBatcher<SimpleBatch> batcher =
createSimpleBatcherBuidler(receiver)
.setThresholds(BatchingThresholds.<SimpleBatch>create(4))
.setFlowController(
getTrackedIntegerBatchingFlowController(
3L, null, LimitExceededBehavior.ThrowException))
.build();

assertThat(trackedFlowController.getElementsReserved()).isEqualTo(0);
assertThat(trackedFlowController.getElementsReleased()).isEqualTo(0);
assertThat(trackedFlowController.getBytesReserved()).isEqualTo(0);
assertThat(trackedFlowController.getBytesReleased()).isEqualTo(0);

batcher.add(SimpleBatch.fromInteger(3));
try {
batcher.pushCurrentBatch().get();
Assert.fail("expected exception");
} catch (Exception e) {
assertThat(e).isInstanceOf(ExecutionException.class);
assertThat(e).hasCauseThat().isSameAs(ex);
}
assertThat(receiver.getBatches()).hasSize(1);

List<List<Integer>> expected = Arrays.asList(Arrays.asList(3));
List<List<Integer>> actual = new ArrayList<>();
for (SimpleBatch batch : receiver.getBatches()) {
actual.add(batch.getIntegers());
}
assertThat(actual).isEqualTo(expected);

Truth.assertThat(trackedFlowController.getElementsReserved())
assertThat(trackedFlowController.getElementsReserved())
.isEqualTo(trackedFlowController.getElementsReleased());
Truth.assertThat(trackedFlowController.getBytesReserved())
assertThat(trackedFlowController.getBytesReserved())
.isEqualTo(trackedFlowController.getBytesReleased());
}
}

0 comments on commit ff4b61e

Please sign in to comment.