diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java index 3002aa611..e173571ff 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java @@ -17,12 +17,15 @@ import com.google.api.core.ApiFunction; import com.google.api.core.BetaApi; +import com.google.api.gax.batching.Batcher; +import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.cloud.bigtable.data.v2.models.Query; import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.cloud.bigtable.data.v2.stub.BigtableBatchingCallSettings; import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; import com.google.common.base.MoreObjects; import com.google.common.base.Strings; @@ -30,6 +33,7 @@ import java.util.List; import java.util.logging.Logger; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.threeten.bp.Duration; /** @@ -219,6 +223,25 @@ public List getPrimingTableIds() { return stubSettings.getPrimedTableIds(); } + /** + * Gets if latency based throttling is enabled for {@link + * BigtableDataClient#newBulkMutationBatcher(String)} + */ + @BetaApi("Latency based throttling is not currently stable and may change in the future") + public boolean isLatencyBasedThrottlingForBatchMutationsEnabled() { + return stubSettings.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled(); + } + + /** + * Gets target bulk mutation rpc latency if latency based throttling is enabled for {@link + * BigtableDataClient#newBulkMutationBatcher(String)}. Otherwise returns null. + */ + @BetaApi("Latency based throttling is not currently stable and may change in the future") + @Nullable + public Long getBatchMutationsTargetRpcLatencyMs() { + return stubSettings.bulkMutateRowsSettings().getTargetRpcLatencyMs(); + } + /** Returns the underlying RPC settings. */ public EnhancedBigtableStubSettings getStubSettings() { return stubSettings; @@ -375,6 +398,74 @@ public List getPrimingTableIds() { return stubSettings.getPrimedTableIds(); } + /** + * Enable latency based throttling for {@link BigtableDataClient#newBulkMutationBatcher(String)} + * with a target rpc latency. The number of allowed in-flight requests will be adjusted to reach + * the target bulk mutations rpc latency. + * + *

The logic of adjusting in-flight request limits is as follows: + * + *

+     * To start, {@link Batcher} allows {@link FlowController#getCurrentElementCountLimit()}
+     * in-flight elements with a total size of {@link FlowController#getCurrentRequestBytesLimit()}.
+     *
+     * Every 20 seconds, {@link Batcher} checks the mean rpc latency of the requests and compare
+     * it with the target rpc latency:
+     *   if (mean latency > 3 * target latency) {
+     *     decrease element count limit by 30% of {@link FlowController#getMaxElementCountLimit()}
+     *   } else if (mean latency > 1.2 * target latency) {
+     *     decrease element count limit by 10% of {@link FlowController#getMaxElementCountLimit()}
+     *   } else if (there was throttling in the past 5 minutes
+     *        && mean latency < 0.8 * target latency) {
+     *     increase element count limit by 5% of {@link FlowController#getMaxElementCountLimit()}
+     *   } else if (there was throttling in the past 5 minutes
+     *        && parallelism is 5% of {@link FlowController#getMaxElementCountLimit()}
+     *        && mean latency < 2 * target latency) {
+     *     increase element count limit by 2% of {@link FlowController#getMaxElementCountLimit()}
+     *
+     * Increases are capped by {@link
+     * FlowController#getMaxElementCountLimit()}, Decreases are floored at {@link
+     * FlowController#getMinElementCountLimit()} so that there is some level of throughput.
+     * 
+ * + * @see BigtableBatchingCallSettings.Builder#getDynamicFlowControlSettings() for explanation on + * default configurations. + */ + @BetaApi("Latency based throttling is not currently stable and may change in the future") + public Builder enableBatchMutationLatencyBasedThrottling(long targetRpcLatencyMs) { + stubSettings.bulkMutateRowsSettings().enableLatencyBasedThrottling(targetRpcLatencyMs); + return this; + } + + /** + * Disable latency based throttling for {@link + * BigtableDataClient#newBulkMutationBatcher(String)}. + */ + @BetaApi("Latency based throttling is not currently stable and may change in the future") + public Builder disableBatchMutationLatencyBasedThrottling() { + stubSettings.bulkMutateRowsSettings().disableLatencyBasedThrottling(); + return this; + } + + /** + * Gets if latency based throttling is enabled for {@link + * BigtableDataClient#newBulkMutationBatcher(String)} + */ + @BetaApi("Latency based throttling is not currently stable and may change in the future") + public boolean isLatencyBasedThrottlingForBatchMutationEnabled() { + return stubSettings.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled(); + } + + /** + * Gets target bulk mutation rpc latency if latency based throttling is enabled for {@link + * BigtableDataClient#newBulkMutationBatcher(String)}. Otherwise returns null. + */ + @BetaApi("Latency based throttling is not currently stable and may change in the future") + @Nullable + public Long getTargetRpcLatencyMsForBatchMutation() { + return stubSettings.bulkMutateRowsSettings().getTargetRpcLatencyMs(); + } + /** * Returns the underlying settings for making RPC calls. The settings should be changed with * care. diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableBatchingCallSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableBatchingCallSettings.java index e61dd72c1..09e657ac0 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableBatchingCallSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableBatchingCallSettings.java @@ -19,14 +19,19 @@ import com.google.api.gax.batching.BatchingCallSettings; import com.google.api.gax.batching.BatchingDescriptor; import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.batching.DynamicFlowControlSettings; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.cloud.bigtable.data.v2.models.BulkMutation; import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import java.util.Set; import javax.annotation.Nonnull; +import javax.annotation.Nullable; /** * This settings holds the batching thresholds as well as retry configuration. @@ -43,6 +48,7 @@ * .setDelayThreshold(Duration.ofSeconds(10)) * .build()) * .setRetryableCodes(Code.DEADLINE_EXCEEDED) + * .setLatencyBasedThrottling(true, 1000L) * .build(); * } * @@ -54,7 +60,11 @@ public final class BigtableBatchingCallSettings extends UnaryCallSettings batchingCallSettings; + private final BatchingCallSettings + batchingCallSettings; + private final boolean isLatencyBasedThrottlingEnabled; + private final Long targetRpcLatencyMs; + private final DynamicFlowControlSettings dynamicFlowControlSettings; private BigtableBatchingCallSettings(Builder builder) { super(builder); @@ -64,6 +74,9 @@ private BigtableBatchingCallSettings(Builder builder) { .setRetrySettings(builder.getRetrySettings()) .setRetryableCodes(builder.getRetryableCodes()) .build(); + this.isLatencyBasedThrottlingEnabled = builder.isLatencyBasedThrottlingEnabled; + this.targetRpcLatencyMs = builder.targetRpcLatencyMs; + this.dynamicFlowControlSettings = builder.dynamicFlowControlSettings; } /** Returns batching settings which contains multiple batch threshold levels. */ @@ -76,6 +89,26 @@ BatchingDescriptor getBatchingDescri return batchingCallSettings.getBatchingDescriptor(); } + /** Gets if latency based throttling is enabled. */ + public boolean isLatencyBasedThrottlingEnabled() { + return isLatencyBasedThrottlingEnabled; + } + + /** Gets target rpc latency if latency based throttling is enabled. Otherwise returns null. */ + @Nullable + public Long getTargetRpcLatencyMs() { + return targetRpcLatencyMs; + } + + /** + * Gets {@link DynamicFlowControlSettings}. + * + * @see Builder#getDynamicFlowControlSettings() + */ + DynamicFlowControlSettings getDynamicFlowControlSettings() { + return dynamicFlowControlSettings; + } + static Builder newBuilder( BatchingDescriptor batchingDescriptor) { return new Builder(batchingDescriptor); @@ -90,6 +123,16 @@ public final Builder toBuilder() { return new Builder(this); } + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("batchingCallSettings", batchingCallSettings) + .add("isLatencyBasedThrottlingEnabled", isLatencyBasedThrottlingEnabled) + .add("targetRpcLatency", targetRpcLatencyMs) + .add("dynamicFlowControlSettings", dynamicFlowControlSettings) + .toString(); + } + /** * A base builder class for {@link BigtableBatchingCallSettings}. See the class documentation of * {@link BigtableBatchingCallSettings} for a description of the different values that can be set. @@ -98,6 +141,9 @@ public static class Builder extends UnaryCallSettings.Builder batchingDescriptor; private BatchingSettings batchingSettings; + private boolean isLatencyBasedThrottlingEnabled; + private Long targetRpcLatencyMs; + private DynamicFlowControlSettings dynamicFlowControlSettings; private Builder( @Nonnull @@ -110,6 +156,9 @@ private Builder(@Nonnull BigtableBatchingCallSettings settings) { super(settings); this.batchingDescriptor = settings.getBatchingDescriptor(); this.batchingSettings = settings.getBatchingSettings(); + this.isLatencyBasedThrottlingEnabled = settings.isLatencyBasedThrottlingEnabled(); + this.targetRpcLatencyMs = settings.getTargetRpcLatencyMs(); + this.dynamicFlowControlSettings = settings.getDynamicFlowControlSettings(); } /** Sets the batching settings with various thresholds. */ @@ -145,9 +194,137 @@ public Builder setRetrySettings(@Nonnull RetrySettings retrySettings) { return this; } + /** + * Enable latency based throttling. The number of allowed in-flight requests will be adjusted to + * reach the target rpc latency. + */ + public Builder enableLatencyBasedThrottling(long targetRpcLatency) { + Preconditions.checkArgument( + targetRpcLatency > 0, "target RPC latency must be greater than 0"); + this.isLatencyBasedThrottlingEnabled = true; + this.targetRpcLatencyMs = targetRpcLatency; + return this; + } + + /** Disable latency based throttling. */ + public Builder disableLatencyBasedThrottling() { + this.isLatencyBasedThrottlingEnabled = false; + this.targetRpcLatencyMs = null; + return this; + } + + /** Gets target rpc latency if latency based throttling is enabled. Otherwise returns null. */ + @Nullable + public Long getTargetRpcLatencyMs() { + return isLatencyBasedThrottlingEnabled ? targetRpcLatencyMs : null; + } + + /** Gets if latency based throttling is enabled. */ + public boolean isLatencyBasedThrottlingEnabled() { + return this.isLatencyBasedThrottlingEnabled; + } + + /** + * Gets the {@link DynamicFlowControlSettings} that'll be used to set up a {@link + * FlowController} for throttling. + * + *

By default, this will allow a maximum of 1000 entries per channel of {@link + * FlowControlSettings.Builder#setMaxOutstandingElementCount request count} and 100MB of {@link + * FlowControlSettings.Builder#setMaxOutstandingRequestBytes accumulated size} in-flight + * requests. Once the limits are reached, pending operations will by default be {@link + * FlowControlSettings.Builder#setLimitExceededBehavior blocked} until some of the in-flight + * requests are resolved. + * + *

If latency based throttling is enabled, number of entries allowed by {@link + * FlowController} will be adjusted to reach {@link Builder#getTargetRpcLatencyMs()}. + * + *

    + *
  • {@link FlowController} will be set to allow Math.max({@link BatchingSettings.Builder + * #setElementCountThreshold batch size}, {@link + * FlowControlSettings.Builder#setMaxOutstandingElementCount request count} / 4) entries + * to start with. + *
  • If bulk mutation rpc latency is higher than target latency, decrease allowed entries to + * a minimum of Math.max({@link BatchingSettings.Builder#setElementCountThreshold batch + * size}, {@link FlowControlSettings.Builder#setMaxOutstandingElementCount request count} + * / 100). + *
  • If bulk mutation rpc latency is lower than target latency and there was throttling, + * increase allowed entries to a maximum of {@link + * FlowControlSettings.Builder#setMaxOutstandingElementCount request count}. + *
+ * + * If latency based throttling is disabled, {@link FlowController} will always allow {@link + * FlowControlSettings.Builder#setMaxOutstandingElementCount request count}. + * + *

Latency based throttling only updates outstanding entries count. {@link FlowController} + * will always allow {@link FlowControlSettings.Builder#setMaxOutstandingRequestBytes + * accumulated size}. + */ + DynamicFlowControlSettings getDynamicFlowControlSettings() { + return this.dynamicFlowControlSettings; + } + /** Builds the {@link BigtableBatchingCallSettings} object with provided configuration. */ @Override public BigtableBatchingCallSettings build() { + Preconditions.checkState(batchingSettings != null, "batchingSettings must be set"); + FlowControlSettings defaultSettings = batchingSettings.getFlowControlSettings(); + Preconditions.checkState( + defaultSettings.getMaxOutstandingElementCount() != null, + "maxOutstandingElementCount must be set in BatchingSettings#FlowControlSettings"); + Preconditions.checkState( + defaultSettings.getMaxOutstandingRequestBytes() != null, + "maxOutstandingRequestBytes must be set in BatchingSettings#FlowControlSettings"); + Preconditions.checkArgument( + batchingSettings.getElementCountThreshold() == null + || defaultSettings.getMaxOutstandingElementCount() + >= batchingSettings.getElementCountThreshold(), + "if elementCountThreshold is set in BatchingSettings, maxOutstandingElementCount must be >= elementCountThreshold"); + Preconditions.checkArgument( + batchingSettings.getRequestByteThreshold() == null + || defaultSettings.getMaxOutstandingRequestBytes() + >= batchingSettings.getRequestByteThreshold(), + "if requestByteThreshold is set in BatchingSettings, getMaxOutstandingRequestBytes must be >= getRequestByteThreshold"); + // Combine static FlowControlSettings with latency based throttling settings to create + // DynamicFlowControlSettings. + if (isLatencyBasedThrottlingEnabled()) { + long maxThrottlingElementCount = defaultSettings.getMaxOutstandingElementCount(); + long maxThrottlingRequestByteCount = defaultSettings.getMaxOutstandingRequestBytes(); + // The maximum in flight element count is pretty high. Set the initial parallelism to 25% + // of the maximum and then work up or down. This reduction should reduce the + // impacts of a bursty job, such as those found in Dataflow. + long initialElementCount = maxThrottlingElementCount / 4; + // Decreases are floored at 1% of the maximum so that there is some level of + // throughput. + long minElementCount = maxThrottlingElementCount / 100; + // Make sure initialOutstandingElementCount and minOutstandingElementCount element count are + // greater or equal to batch size to avoid deadlocks. + if (batchingSettings.getElementCountThreshold() != null) { + initialElementCount = + Math.max(initialElementCount, batchingSettings.getElementCountThreshold()); + minElementCount = Math.max(minElementCount, batchingSettings.getElementCountThreshold()); + } + dynamicFlowControlSettings = + DynamicFlowControlSettings.newBuilder() + .setLimitExceededBehavior(defaultSettings.getLimitExceededBehavior()) + .setInitialOutstandingElementCount(initialElementCount) + .setMaxOutstandingElementCount(maxThrottlingElementCount) + .setMinOutstandingElementCount(minElementCount) + .setInitialOutstandingRequestBytes(maxThrottlingRequestByteCount) + .setMinOutstandingRequestBytes(maxThrottlingRequestByteCount) + .setMaxOutstandingRequestBytes(maxThrottlingRequestByteCount) + .build(); + } else { + dynamicFlowControlSettings = + DynamicFlowControlSettings.newBuilder() + .setLimitExceededBehavior(defaultSettings.getLimitExceededBehavior()) + .setInitialOutstandingElementCount(defaultSettings.getMaxOutstandingElementCount()) + .setMaxOutstandingElementCount(defaultSettings.getMaxOutstandingElementCount()) + .setMinOutstandingElementCount(defaultSettings.getMaxOutstandingElementCount()) + .setInitialOutstandingRequestBytes(defaultSettings.getMaxOutstandingRequestBytes()) + .setMinOutstandingRequestBytes(defaultSettings.getMaxOutstandingRequestBytes()) + .setMaxOutstandingRequestBytes(defaultSettings.getMaxOutstandingRequestBytes()) + .build(); + } return new BigtableBatchingCallSettings(this); } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/DynamicFlowControlCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/DynamicFlowControlCallable.java new file mode 100644 index 000000000..8b89a0964 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/DynamicFlowControlCallable.java @@ -0,0 +1,172 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static java.lang.Math.round; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.gax.batching.FlowControlEventStats.FlowControlEvent; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.DeadlineExceededException; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; + +/** + * A callable that records rpc latency and adjusts flow control thresholds for latency based + * throttling. + */ +final class DynamicFlowControlCallable extends UnaryCallable { + // Defining adjusting criteria and adjusting rates + // Latency thresholds multipliers that will trigger flow control changes + static final double VERY_HIGH_LATENCY_MULTIPLIER = 3; + // targeting roughly 20% around target latency so there isn't too much churn + static final double HIGH_LATENCY_MULTIPLIER = 1.2; + static final double LOW_LATENCY_MULTIPLIER = 0.8; + static final double LOW_CONCURRENCY_MULTIPLIER = 0.05; + static final double LOW_CONCURRENCY_LATENCY_MULTIPLIER = 2; + // Rate of change that corresponds to the the thresholds above + static final double VERY_HIGH_LATENCY_DECREASE_CONCURRENCY_RATE = 0.3; + static final double HIGH_LATENCY_DECREASE_CONCURRENCY_RATE = 0.1; + // Increase parallelism at a slower rate than decrease. The lower rate should help the system + // maintain stability. + static final double LOW_LATENCY_INCREASE_CONCURRENCY_RATE = 0.05; + static final double LOW_CONCURRENCY_INCREASE_CONCURRENCY_RATE = 0.02; + // only look for throttling events in the past 5 minutes + static final long THROTTLING_EVENT_TIME_RANGE_MS = TimeUnit.MINUTES.toMillis(5); + + private final FlowController flowController; + private final DynamicFlowControlStats dynamicFlowControlStats; + private final long targetLatencyMs; + private final long adjustingIntervalMs; + private final UnaryCallable innerCallable; + + DynamicFlowControlCallable( + @Nonnull UnaryCallable innerCallable, + @Nonnull FlowController flowController, + @Nonnull DynamicFlowControlStats stats, + long targetLatencyMs, + long adjustingIntervalMs) { + this.innerCallable = innerCallable; + this.flowController = flowController; + this.dynamicFlowControlStats = stats; + this.targetLatencyMs = targetLatencyMs; + this.adjustingIntervalMs = adjustingIntervalMs; + } + + @Override + public ApiFuture futureCall(Object request, ApiCallContext context) { + final Runnable flowControllerRunnable = new DynamicFlowControlRunnable(); + ApiFuture future = innerCallable.futureCall(request, context); + ApiFutures.addCallback( + future, + new ApiFutureCallback() { + @Override + public void onFailure(Throwable t) { + // If the deadline expired before the operation could complete, it could mean that the + // server side is slow, and we should record the latency so flow control limits can be + // adjusted. Other errors might be user errors and may return immediately, so we're + // skipping recording the latencies for those. + if (t instanceof DeadlineExceededException) { + flowControllerRunnable.run(); + } + } + + @Override + public void onSuccess(Object result) { + flowControllerRunnable.run(); + } + }, + MoreExecutors.directExecutor()); + return future; + } + + class DynamicFlowControlRunnable implements Runnable { + private final Stopwatch timer; + + DynamicFlowControlRunnable() { + timer = Stopwatch.createStarted(); + } + + @Override + public void run() { + dynamicFlowControlStats.updateLatency(timer.elapsed(TimeUnit.MILLISECONDS)); + long lastAdjustedTimestamp = dynamicFlowControlStats.getLastAdjustedTimestampMs(); + long now = System.currentTimeMillis(); + // Avoid adjusting the thresholds too frequently + if (now - lastAdjustedTimestamp < adjustingIntervalMs) { + return; + } + double meanLatency = dynamicFlowControlStats.getMeanLatency(); + FlowControlEvent flowControlEvent = + flowController.getFlowControlEventStats().getLastFlowControlEvent(); + boolean wasRecentlyThrottled = + flowControlEvent != null + && (now - flowControlEvent.getTimestampMs() <= THROTTLING_EVENT_TIME_RANGE_MS); + long maxElementLimit = flowController.getMaxElementCountLimit(); + if (meanLatency > targetLatencyMs * VERY_HIGH_LATENCY_MULTIPLIER) { + // Decrease at 30% of the maximum + decrease( + lastAdjustedTimestamp, + now, + round(maxElementLimit * VERY_HIGH_LATENCY_DECREASE_CONCURRENCY_RATE)); + } else if (meanLatency > targetLatencyMs * HIGH_LATENCY_MULTIPLIER) { + // Decrease at 10% of the maximum + decrease( + lastAdjustedTimestamp, + now, + round(maxElementLimit * HIGH_LATENCY_DECREASE_CONCURRENCY_RATE)); + } else if (wasRecentlyThrottled && meanLatency < targetLatencyMs * LOW_LATENCY_MULTIPLIER) { + // If latency is low, and there was throttling, then increase the parallelism so that new + // calls will not be throttled. + + // Increase parallelism at a slower than we decrease. The lower rate should help the + // system maintain stability. + increase( + lastAdjustedTimestamp, + now, + round(maxElementLimit * LOW_LATENCY_INCREASE_CONCURRENCY_RATE)); + } else if (wasRecentlyThrottled + && flowController.getCurrentElementCountLimit() + < maxElementLimit * LOW_CONCURRENCY_MULTIPLIER + && meanLatency < targetLatencyMs * LOW_CONCURRENCY_LATENCY_MULTIPLIER) { + // When parallelism is reduced latency tends to be artificially higher. + // Increase slowly to ensure that the system restabilizes. + increase( + lastAdjustedTimestamp, + now, + round(maxElementLimit * LOW_CONCURRENCY_INCREASE_CONCURRENCY_RATE)); + } + } + + private void decrease(long last, long now, long elementSteps) { + if (dynamicFlowControlStats.setLastAdjustedTimestampMs(last, now)) { + flowController.decreaseThresholds(elementSteps, 0); + } + } + + private void increase(long last, long now, long elementSteps) { + if (dynamicFlowControlStats.setLastAdjustedTimestampMs(last, now)) { + flowController.increaseThresholds(elementSteps, 0); + } + } + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/DynamicFlowControlStats.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/DynamicFlowControlStats.java new file mode 100644 index 000000000..4169ac213 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/DynamicFlowControlStats.java @@ -0,0 +1,116 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.gax.batching.FlowController; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Records stats used in dynamic flow control, the decaying average of recorded latencies and the + * last timestamp when the thresholds in {@link FlowController} are updated. + */ +final class DynamicFlowControlStats { + + private static final double DEFAULT_DECAY_CONSTANT = 0.015; // Biased to the past 5 minutes + + private AtomicLong lastAdjustedTimestampMs; + private DecayingAverage meanLatency; + + DynamicFlowControlStats() { + this(DEFAULT_DECAY_CONSTANT); + } + + DynamicFlowControlStats(double decayConstant) { + this.lastAdjustedTimestampMs = new AtomicLong(0); + this.meanLatency = new DecayingAverage(decayConstant); + } + + void updateLatency(long latency) { + updateLatency(latency, System.currentTimeMillis()); + } + + @VisibleForTesting + void updateLatency(long latency, long timestampMs) { + meanLatency.update(latency, timestampMs); + } + + double getMeanLatency() { + return getMeanLatency(System.currentTimeMillis()); + } + + @VisibleForTesting + double getMeanLatency(long timestampMs) { + return meanLatency.getMean(timestampMs); + } + + public long getLastAdjustedTimestampMs() { + return lastAdjustedTimestampMs.get(); + } + + boolean setLastAdjustedTimestampMs(long last, long now) { + return lastAdjustedTimestampMs.compareAndSet(last, now); + } + + private class DecayingAverage { + private double decayConstant; + private double mean; + private double weightedCount; + private AtomicLong lastUpdateTimeInSecond; + + DecayingAverage(double decayConstant) { + this.decayConstant = decayConstant; + this.mean = 0.0; + this.weightedCount = 0.0; + this.lastUpdateTimeInSecond = new AtomicLong(0); + } + + synchronized void update(long value, long timestampMs) { + long now = TimeUnit.MILLISECONDS.toSeconds(timestampMs); + Preconditions.checkArgument( + now >= lastUpdateTimeInSecond.get(), "can't update an event in the past"); + if (lastUpdateTimeInSecond.get() == 0) { + lastUpdateTimeInSecond.set(now); + mean = value; + weightedCount = 1; + } else { + long prev = lastUpdateTimeInSecond.getAndSet(now); + long elapsed = now - prev; + double alpha = getAlpha(elapsed); + // Exponential moving average = weightedSum / weightedCount, where + // weightedSum(n) = value + alpha * weightedSum(n - 1) + // weightedCount(n) = 1 + alpha * weightedCount(n - 1) + // Using weighted count in case the sum overflows + mean = + mean * ((weightedCount * alpha) / (weightedCount * alpha + 1)) + + value / (weightedCount * alpha + 1); + weightedCount = weightedCount * alpha + 1; + } + } + + double getMean(long timestampMs) { + long timestampSecs = TimeUnit.MILLISECONDS.toSeconds(timestampMs); + long elapsed = timestampSecs - lastUpdateTimeInSecond.get(); + return mean * getAlpha(Math.max(0, elapsed)); + } + + private double getAlpha(long elapsedSecond) { + return Math.exp(-decayConstant * elapsedSecond); + } + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 8f2505c58..9f52ddd8d 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -19,6 +19,7 @@ import com.google.api.core.InternalApi; import com.google.api.gax.batching.Batcher; import com.google.api.gax.batching.BatcherImpl; +import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.BackgroundResource; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.core.GaxProperties; @@ -94,6 +95,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; /** @@ -111,10 +113,13 @@ @InternalApi public class EnhancedBigtableStub implements AutoCloseable { private static final String CLIENT_NAME = "Bigtable"; + private static final long FLOW_CONTROL_ADJUSTING_INTERVAL_MS = TimeUnit.SECONDS.toMillis(20); private final EnhancedBigtableStubSettings settings; private final ClientContext clientContext; private final RequestContext requestContext; + private final FlowController bulkMutationFlowController; + private final DynamicFlowControlStats bulkMutationDynamicFlowControlStats; private final ServerStreamingCallable readRowsCallable; private final UnaryCallable readRowCallable; @@ -219,6 +224,9 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext this.requestContext = RequestContext.create( settings.getProjectId(), settings.getInstanceId(), settings.getAppProfileId()); + this.bulkMutationFlowController = + new FlowController(settings.bulkMutateRowsSettings().getDynamicFlowControlSettings()); + this.bulkMutationDynamicFlowControlStats = new DynamicFlowControlStats(); readRowsCallable = createReadRowsCallable(new DefaultRowAdapter()); readRowCallable = createReadRowCallable(new DefaultRowAdapter()); @@ -483,8 +491,19 @@ public Map extract(MutateRowRequest mutateRowRequest) { private UnaryCallable createBulkMutateRowsCallable() { UnaryCallable baseCallable = createMutateRowsBaseCallable(); + UnaryCallable flowControlCallable = null; + if (settings.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled()) { + flowControlCallable = + new DynamicFlowControlCallable( + baseCallable, + bulkMutationFlowController, + bulkMutationDynamicFlowControlStats, + settings.bulkMutateRowsSettings().getTargetRpcLatencyMs(), + FLOW_CONTROL_ADJUSTING_INTERVAL_MS); + } UnaryCallable userFacing = - new BulkMutateRowsUserFacingCallable(baseCallable, requestContext); + new BulkMutateRowsUserFacingCallable( + flowControlCallable != null ? flowControlCallable : baseCallable, requestContext); SpanName spanName = getSpanName("MutateRows"); UnaryCallable traced = @@ -520,7 +539,8 @@ public Batcher newMutateRowsBatcher(@Nonnull String tabl bulkMutateRowsCallable, BulkMutation.create(tableId), settings.bulkMutateRowsSettings().getBatchingSettings(), - clientContext.getExecutor()); + clientContext.getExecutor(), + bulkMutationFlowController); } /** diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index eaea47f4e..c5e39e460 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -19,6 +19,7 @@ import com.google.api.gax.batching.BatchingCallSettings; import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; import com.google.api.gax.batching.FlowController.LimitExceededBehavior; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.core.GoogleCredentialsProvider; @@ -400,15 +401,15 @@ public UnaryCallSettings mutateRowSettings() { * after batching initialization or last processed batch. * * - *

When the pending {@link FlowControlSettings.Builder#setMaxOutstandingElementCount request - * count} reaches a default of 1000 entries per channel or their {@link - * FlowControlSettings.Builder#setMaxOutstandingRequestBytes accumulated size} reaches default - * value of 100MB, then this operation will by default be {@link - * FlowControlSettings.Builder#setLimitExceededBehavior blocked} until some of the pending batch - * are resolved. + *

A {@link FlowController} will be set up with {@link BigtableBatchingCallSettings.Builder + * #getDynamicFlowControlSettings()} for throttling in-flight requests. When the pending request + * count or accumulated request size reaches {@link FlowController} thresholds, then this + * operation will be throttled until some of the pending batches are resolved. * * @see RetrySettings for more explanation. * @see BatchingSettings for batch related configuration explanation. + * @see BigtableBatchingCallSettings.Builder#getDynamicFlowControlSettings() for flow control + * related configuration explanation. */ public BigtableBatchingCallSettings bulkMutateRowsSettings() { return bulkMutateRowsSettings; diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java index 864ba7502..fae914ea9 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java @@ -35,6 +35,7 @@ import org.threeten.bp.Duration; class MetricsTracer implements ApiTracer { + private final OperationType operationType; private final Tagger tagger; diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java index 25c341d65..9e733640f 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java @@ -19,6 +19,7 @@ import com.google.api.core.ApiClock; import com.google.api.core.ApiFunction; +import com.google.api.gax.batching.BatcherImpl; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; @@ -286,6 +287,31 @@ public void testCreateWithRefreshingChannel() throws Exception { assertThat(terminateAttributes).hasSize(poolSize); } + @Test + public void testBulkMutationFlowControllerConfigured() throws Exception { + BigtableDataSettings settings = + BigtableDataSettings.newBuilder() + .setProjectId("my-project") + .setInstanceId("my-instance") + .setCredentialsProvider(credentialsProvider) + .enableBatchMutationLatencyBasedThrottling(10L) + .build(); + try (BigtableDataClientFactory factory = BigtableDataClientFactory.create(settings)) { + BigtableDataClient client1 = factory.createDefault(); + BigtableDataClient client2 = factory.createForAppProfile("app-profile"); + + try (BatcherImpl batcher1 = (BatcherImpl) client1.newBulkMutationBatcher("my-table"); + BatcherImpl batcher2 = (BatcherImpl) client1.newBulkMutationBatcher("my-table")) { + assertThat(batcher1.getFlowController()).isSameInstanceAs(batcher2.getFlowController()); + } + + try (BatcherImpl batcher1 = (BatcherImpl) client1.newBulkMutationBatcher("my-table"); + BatcherImpl batcher2 = (BatcherImpl) client2.newBulkMutationBatcher("my-table")) { + assertThat(batcher1.getFlowController()).isNotSameInstanceAs(batcher2.getFlowController()); + } + } + } + private static class FakeBigtableService extends BigtableGrpc.BigtableImplBase { volatile MutateRowRequest lastRequest; diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataSettingsTest.java index 3dc9fb1ed..2b95bf821 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataSettingsTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataSettingsTest.java @@ -32,6 +32,7 @@ public void testToString() { .setProjectId("our-project-2-12") .setInstanceId("our-instance-85") .setAppProfileId("our-appProfile-06") + .enableBatchMutationLatencyBasedThrottling(10) .build(); EnhancedBigtableStubSettings stubSettings = settings.getStubSettings(); assertThat(settings.toString()) diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/BulkMutateIT.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/BulkMutateIT.java new file mode 100644 index 000000000..402efd7b9 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/BulkMutateIT.java @@ -0,0 +1,84 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.it; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.gax.batching.BatcherImpl; +import com.google.api.gax.batching.FlowControlEventStats; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; +import com.google.cloud.bigtable.test_helpers.env.TestEnvRule; +import java.io.IOException; +import java.util.UUID; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class BulkMutateIT { + + @ClassRule public static TestEnvRule testEnvRule = new TestEnvRule(); + + @Test(timeout = 60 * 1000) + public void test() throws IOException, InterruptedException { + BigtableDataSettings settings = testEnvRule.env().getDataClientSettings(); + String rowPrefix = UUID.randomUUID().toString(); + // Set target latency really low so it'll trigger adjusting thresholds + BigtableDataSettings.Builder builder = + settings.toBuilder().enableBatchMutationLatencyBasedThrottling(2L); + BigtableDataClient client = BigtableDataClient.create(builder.build()); + BatcherImpl batcher = + (BatcherImpl) client.newBulkMutationBatcher(testEnvRule.env().getTableId()); + try { + FlowControlEventStats events = batcher.getFlowController().getFlowControlEventStats(); + long initialThreashold = batcher.getFlowController().getCurrentElementCountLimit(); + assertThat(batcher.getFlowController().getCurrentElementCountLimit()) + .isNotEqualTo(batcher.getFlowController().getMinElementCountLimit()); + assertThat(batcher.getFlowController().getCurrentElementCountLimit()) + .isNotEqualTo(batcher.getFlowController().getMaxElementCountLimit()); + + String familyId = testEnvRule.env().getFamilyId(); + long initial = batcher.getFlowController().getCurrentElementCountLimit(); + for (long i = 0; i < initial * 3; i++) { + String key = rowPrefix + "test-key" + i; + batcher.add(RowMutationEntry.create(key).setCell(familyId, "qualifier", i)); + } + batcher.flush(); + assertThat(events.getLastFlowControlEvent()).isNotNull(); + // Verify that the threshold is adjusted + assertThat(batcher.getFlowController().getCurrentElementCountLimit()) + .isNotEqualTo(initialThreashold); + // Query a key to make sure the write succeeded + Row row = + testEnvRule + .env() + .getDataClient() + .readRowsCallable() + .first() + .call( + Query.create(testEnvRule.env().getTableId()) + .rowKey(rowPrefix + "test-key" + initial)); + assertThat(row.getCells()).hasSize(1); + } finally { + batcher.close(); + } + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableBatchingCallSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableBatchingCallSettingsTest.java index e468999f9..488805f60 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableBatchingCallSettingsTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableBatchingCallSettingsTest.java @@ -18,6 +18,7 @@ import static com.google.common.truth.Truth.assertThat; import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.batching.DynamicFlowControlSettings; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController; import com.google.api.gax.retrying.RetrySettings; @@ -53,6 +54,9 @@ public void testEmptyBuilder() { assertThat(builder.getBatchingSettings()).isNull(); assertThat(builder.getRetryableCodes()).isEmpty(); assertThat(builder.getRetrySettings()).isNotNull(); + assertThat(builder.isLatencyBasedThrottlingEnabled()).isFalse(); + assertThat(builder.getTargetRpcLatencyMs()).isNull(); + assertThat(builder.getDynamicFlowControlSettings()).isNull(); } @Test @@ -71,6 +75,27 @@ public void testBuilder() { assertThat(settings.getBatchingSettings()).isEqualTo(BATCHING_SETTINGS); assertThat(settings.getRetryableCodes()).isEqualTo(retryCodes); assertThat(settings.getRetrySettings()).isEqualTo(retrySettings); + assertThat(settings.isLatencyBasedThrottlingEnabled()).isFalse(); + assertThat(settings.getTargetRpcLatencyMs()).isNull(); + assertThat(settings.getDynamicFlowControlSettings()).isNotNull(); + verifyFlowControlSettingWhenLatencyBasedThrottlingDisabled( + settings.getDynamicFlowControlSettings()); + + builder.enableLatencyBasedThrottling(10L); + settings = builder.build(); + assertThat(settings.isLatencyBasedThrottlingEnabled()).isTrue(); + assertThat(settings.getTargetRpcLatencyMs()).isEqualTo(10); + assertThat(settings.getDynamicFlowControlSettings()).isNotNull(); + verifyFlowControlSettingWhenLatencyBasedThrottlingEnabled( + settings.getDynamicFlowControlSettings()); + + builder.disableLatencyBasedThrottling(); + settings = builder.build(); + assertThat(settings.isLatencyBasedThrottlingEnabled()).isFalse(); + assertThat(settings.getTargetRpcLatencyMs()).isNull(); + assertThat(settings.getDynamicFlowControlSettings()).isNotNull(); + verifyFlowControlSettingWhenLatencyBasedThrottlingDisabled( + settings.getDynamicFlowControlSettings()); } @Test @@ -82,7 +107,8 @@ public void testBuilderFromSettings() { builder .setBatchingSettings(BATCHING_SETTINGS) .setRetryableCodes(StatusCode.Code.UNAVAILABLE, StatusCode.Code.UNAUTHENTICATED) - .setRetrySettings(retrySettings); + .setRetrySettings(retrySettings) + .enableLatencyBasedThrottling(10L); BigtableBatchingCallSettings settings = builder.build(); BigtableBatchingCallSettings.Builder newBuilder = settings.toBuilder(); @@ -91,6 +117,11 @@ public void testBuilderFromSettings() { assertThat(newBuilder.getRetryableCodes()) .containsExactly(StatusCode.Code.UNAVAILABLE, StatusCode.Code.UNAUTHENTICATED); assertThat(newBuilder.getRetrySettings()).isEqualTo(retrySettings); + assertThat(newBuilder.isLatencyBasedThrottlingEnabled()).isTrue(); + assertThat(newBuilder.getTargetRpcLatencyMs()).isEqualTo(10L); + assertThat(newBuilder.getDynamicFlowControlSettings()).isNotNull(); + verifyFlowControlSettingWhenLatencyBasedThrottlingEnabled( + newBuilder.getDynamicFlowControlSettings()); } @Test @@ -110,4 +141,103 @@ public void testMandatorySettings() { } assertThat(actualEx).isInstanceOf(IllegalStateException.class); } + + @Test + public void testFlowControlMandatorySettings() { + Exception actualEx = null; + try { + BigtableBatchingCallSettings.newBuilder(new MutateRowsBatchingDescriptor()) + .setBatchingSettings( + BatchingSettings.newBuilder() + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(null) + .setMaxOutstandingRequestBytes(null) + .build()) + .build()) + .build(); + } catch (Exception ex) { + actualEx = ex; + } + assertThat(actualEx).isInstanceOf(IllegalStateException.class); + + BigtableBatchingCallSettings.newBuilder(new MutateRowsBatchingDescriptor()) + .setBatchingSettings( + BatchingSettings.newBuilder() + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(10L) + .setMaxOutstandingRequestBytes(10L) + .build()) + .setElementCountThreshold(10L) + .setRequestByteThreshold(10L) + .build()) + .build(); + + actualEx = null; + try { + BigtableBatchingCallSettings.newBuilder(new MutateRowsBatchingDescriptor()) + .setBatchingSettings( + BatchingSettings.newBuilder() + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(10L) + .setMaxOutstandingRequestBytes(5L) + .build()) + .setElementCountThreshold(10L) + .setRequestByteThreshold(10L) + .build()) + .build(); + } catch (Exception ex) { + actualEx = ex; + } + assertThat(actualEx).isInstanceOf(IllegalArgumentException.class); + + actualEx = null; + try { + BigtableBatchingCallSettings.newBuilder(new MutateRowsBatchingDescriptor()) + .setBatchingSettings( + BatchingSettings.newBuilder() + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(5L) + .setMaxOutstandingRequestBytes(10L) + .build()) + .setElementCountThreshold(10L) + .setRequestByteThreshold(10L) + .build()) + .build(); + } catch (Exception ex) { + actualEx = ex; + } + assertThat(actualEx).isInstanceOf(IllegalArgumentException.class); + } + + private void verifyFlowControlSettingWhenLatencyBasedThrottlingDisabled( + DynamicFlowControlSettings settings) { + assertThat(settings.getInitialOutstandingElementCount()) + .isEqualTo(BATCHING_SETTINGS.getFlowControlSettings().getMaxOutstandingElementCount()); + assertThat(settings.getMaxOutstandingElementCount()) + .isEqualTo(BATCHING_SETTINGS.getFlowControlSettings().getMaxOutstandingElementCount()); + assertThat(settings.getMinOutstandingElementCount()) + .isEqualTo(BATCHING_SETTINGS.getFlowControlSettings().getMaxOutstandingElementCount()); + assertThat(settings.getInitialOutstandingRequestBytes()) + .isEqualTo(BATCHING_SETTINGS.getFlowControlSettings().getMaxOutstandingRequestBytes()); + assertThat(settings.getMaxOutstandingRequestBytes()) + .isEqualTo(BATCHING_SETTINGS.getFlowControlSettings().getMaxOutstandingRequestBytes()); + assertThat(settings.getMinOutstandingRequestBytes()) + .isEqualTo(BATCHING_SETTINGS.getFlowControlSettings().getMaxOutstandingRequestBytes()); + } + + private void verifyFlowControlSettingWhenLatencyBasedThrottlingEnabled( + DynamicFlowControlSettings settings) { + assertThat(settings.getInitialOutstandingElementCount()) + .isLessThan(settings.getMaxOutstandingElementCount()); + assertThat(settings.getMinOutstandingElementCount()) + .isLessThan(settings.getMaxOutstandingElementCount()); + assertThat(settings.getInitialOutstandingRequestBytes()) + .isEqualTo(settings.getMaxOutstandingRequestBytes()); + assertThat(settings.getMinOutstandingRequestBytes()) + .isEqualTo(settings.getMaxOutstandingRequestBytes()); + } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/DynamicFlowControlCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/DynamicFlowControlCallableTest.java new file mode 100644 index 000000000..426bcdc7c --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/DynamicFlowControlCallableTest.java @@ -0,0 +1,290 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.gax.batching.DynamicFlowControlSettings; +import com.google.api.gax.batching.FlowControlEventStats; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.batching.FlowController.LimitExceededBehavior; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.DeadlineExceededException; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.bigtable.v2.MutateRowsRequest; +import com.google.bigtable.v2.MutateRowsResponse; +import com.google.common.collect.Lists; +import io.grpc.Status.Code; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class DynamicFlowControlCallableTest { + @Rule public Timeout timeout = new Timeout(1, TimeUnit.MINUTES); + + private static final int TARGET_LATENCY_MS = 100; + private static final long ADJUSTING_INTERVAL_MS = TimeUnit.SECONDS.toMillis(20); + private static final String LATENCY_HEADER = "latency"; + private static final long INITIAL_ELEMENT = 20L; + private static final long MAX_ELEMENT = 30L; + private static final long MIN_ELEMENT = 5L; + private static final int DEADLINE_EXCEEDED_LATENCY = 501; + + private FlowController flowController; + private FlowControlEventStats flowControlEvents; + private DynamicFlowControlStats stats; + private UnaryCallable innerCallable; + private ApiCallContext context; + private MutateRowsRequest request; + + private DynamicFlowControlCallable callableToTest; + + @Before + public void setup() { + flowController = + new FlowController( + DynamicFlowControlSettings.newBuilder() + .setInitialOutstandingElementCount(INITIAL_ELEMENT) + .setMaxOutstandingElementCount(MAX_ELEMENT) + .setMinOutstandingElementCount(MIN_ELEMENT) + .setInitialOutstandingRequestBytes(15L) + .setMaxOutstandingRequestBytes(15L) + .setMinOutstandingRequestBytes(15L) + .setLimitExceededBehavior(LimitExceededBehavior.Block) + .build()); + flowControlEvents = flowController.getFlowControlEventStats(); + stats = new DynamicFlowControlStats(); + context = GrpcCallContext.createDefault(); + innerCallable = new MockInnerCallable(); + request = + MutateRowsRequest.newBuilder() + .addEntries(MutateRowsRequest.Entry.getDefaultInstance()) + .build(); + callableToTest = + new DynamicFlowControlCallable( + innerCallable, flowController, stats, TARGET_LATENCY_MS, ADJUSTING_INTERVAL_MS); + } + + @Test + public void testLatenciesAreRecorded() throws Exception { + Map> extraHeaders = new HashMap<>(); + extraHeaders.put(LATENCY_HEADER, Arrays.asList("5")); + ApiCallContext newContext = context.withExtraHeaders(extraHeaders); + ApiFuture future = callableToTest.futureCall(request, newContext); + future.get(); + assertThat(stats.getMeanLatency()).isNonZero(); + assertThat(stats.getLastAdjustedTimestampMs()).isEqualTo(0); + } + + @Test + public void testTriggeringAdjustingThreshold() throws Exception { + Map> extraHeaders = new HashMap<>(); + extraHeaders.put(LATENCY_HEADER, Arrays.asList(String.valueOf(TARGET_LATENCY_MS * 4))); + long currentTimeMs = System.currentTimeMillis(); + ApiCallContext newContext = context.withExtraHeaders(extraHeaders); + ApiFuture future = callableToTest.futureCall(request, newContext); + future.get(); + assertThat(stats.getMeanLatency()) + .isAtLeast(TARGET_LATENCY_MS * DynamicFlowControlCallable.VERY_HIGH_LATENCY_MULTIPLIER); + assertThat(stats.getLastAdjustedTimestampMs()).isGreaterThan(currentTimeMs); + long expectedStep = + Math.round( + MAX_ELEMENT * DynamicFlowControlCallable.VERY_HIGH_LATENCY_DECREASE_CONCURRENCY_RATE); + assertThat(flowController.getCurrentElementCountLimit()) + .isEqualTo(INITIAL_ELEMENT - expectedStep); + } + + @Test + public void testNoConsecutiveUpdatesToThreshold() throws Exception { + Map> extraHeaders = new HashMap<>(); + extraHeaders.put(LATENCY_HEADER, Arrays.asList(String.valueOf(TARGET_LATENCY_MS * 4))); + long firstRequest = System.currentTimeMillis(); + ApiCallContext newContext = context.withExtraHeaders(extraHeaders); + ApiFuture future = callableToTest.futureCall(request, newContext); + future.get(); + long secondRequest = System.currentTimeMillis(); + future = callableToTest.futureCall(request, newContext); + future.get(); + assertThat(stats.getMeanLatency()) + .isAtLeast(TARGET_LATENCY_MS * DynamicFlowControlCallable.VERY_HIGH_LATENCY_MULTIPLIER); + assertThat(stats.getLastAdjustedTimestampMs()).isGreaterThan(firstRequest); + assertThat(stats.getLastAdjustedTimestampMs()).isAtMost(secondRequest); + long expectedStep = + Math.round( + MAX_ELEMENT * DynamicFlowControlCallable.VERY_HIGH_LATENCY_DECREASE_CONCURRENCY_RATE); + assertThat(flowController.getCurrentElementCountLimit()) + .isEqualTo(INITIAL_ELEMENT - expectedStep); + } + + @Test + public void testDecreasingThresholdsCantGoOverLimit() throws Exception { + // set adjusting intervals to 0 so the thresholds can keep getting updated + callableToTest = + new DynamicFlowControlCallable(innerCallable, flowController, stats, TARGET_LATENCY_MS, 0); + Map> extraHeaders = new HashMap<>(); + extraHeaders.put(LATENCY_HEADER, Arrays.asList(String.valueOf(TARGET_LATENCY_MS * 4))); + ApiCallContext newContext = context.withExtraHeaders(extraHeaders); + List futures = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + ApiFuture future = callableToTest.futureCall(request, newContext); + futures.add(future); + } + for (Future f : futures) { + f.get(); + } + long expectedStep = + Math.round( + MAX_ELEMENT + * DynamicFlowControlCallable.VERY_HIGH_LATENCY_DECREASE_CONCURRENCY_RATE) + * 3; + assertThat(INITIAL_ELEMENT - expectedStep).isLessThan(MIN_ELEMENT); + assertThat(flowController.getCurrentElementCountLimit()).isEqualTo(MIN_ELEMENT); + } + + @Test + public void testIncreasingThreshold() throws Exception { + // Test when there was flow control events and mean latency is low, increase the thresholds + callableToTest = + new DynamicFlowControlCallable( + innerCallable, flowController, stats, 1000, ADJUSTING_INTERVAL_MS); + createFlowControlEvent(flowController); + ApiFuture future = callableToTest.futureCall(request, context); + future.get(); + long expectedIncrease = + Math.round(MAX_ELEMENT * DynamicFlowControlCallable.LOW_LATENCY_INCREASE_CONCURRENCY_RATE); + assertThat(expectedIncrease).isNotEqualTo(0); + assertThat(INITIAL_ELEMENT + expectedIncrease).isLessThan(MAX_ELEMENT); + assertThat(flowController.getCurrentElementCountLimit()) + .isEqualTo(INITIAL_ELEMENT + expectedIncrease); + } + + @Test + public void testIncreasingThresholdCantGoOverLimit() throws Exception { + // set adjusting interval to 0 so it can be updated multiple times + callableToTest = new DynamicFlowControlCallable(innerCallable, flowController, stats, 1000, 0); + createFlowControlEvent(flowController); + List futures = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + ApiFuture future = callableToTest.futureCall(request, context); + futures.add(future); + } + for (Future f : futures) { + f.get(); + } + long expectedIncrease = + Math.round(MAX_ELEMENT * DynamicFlowControlCallable.LOW_LATENCY_INCREASE_CONCURRENCY_RATE) + * 20; + assertThat(INITIAL_ELEMENT + expectedIncrease).isGreaterThan(MAX_ELEMENT); + assertThat(flowController.getCurrentElementCountLimit()).isEqualTo(MAX_ELEMENT); + } + + @Test + public void testConcurrentUpdates() throws Exception { + callableToTest = + new DynamicFlowControlCallable( + innerCallable, flowController, stats, 1000, ADJUSTING_INTERVAL_MS); + createFlowControlEvent(flowController); + List futures = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + ApiFuture future = callableToTest.futureCall(request, context); + futures.add(future); + } + for (Future f : futures) { + f.get(); + } + // should only be updated once + long expectedIncrease = + Math.round(MAX_ELEMENT * DynamicFlowControlCallable.LOW_LATENCY_INCREASE_CONCURRENCY_RATE); + assertThat(expectedIncrease).isNotEqualTo(0); + assertThat(INITIAL_ELEMENT + expectedIncrease).isLessThan(MAX_ELEMENT); + assertThat(flowController.getCurrentElementCountLimit()) + .isEqualTo(INITIAL_ELEMENT + expectedIncrease); + } + + @Test + public void testDeadlineExceeded() throws Exception { + // very high latency with deadline exceeded exception, limits should be decreased + Map> extraHeaders = new HashMap<>(); + extraHeaders.put(LATENCY_HEADER, Arrays.asList(String.valueOf(DEADLINE_EXCEEDED_LATENCY))); + callableToTest.futureCall(request, context.withExtraHeaders(extraHeaders)); + assertThat(flowController.getCurrentElementCountLimit()) + .isEqualTo( + INITIAL_ELEMENT + - Math.round( + MAX_ELEMENT + * DynamicFlowControlCallable.VERY_HIGH_LATENCY_DECREASE_CONCURRENCY_RATE)); + } + + static class MockInnerCallable + extends UnaryCallable> { + List response = Lists.newArrayList(); + + @Override + public ApiFuture> futureCall( + MutateRowsRequest request, ApiCallContext context) { + List latencyHeader = context.getExtraHeaders().get(LATENCY_HEADER); + if (latencyHeader != null) { + try { + Thread.sleep(Integer.valueOf(latencyHeader.get(0))); + } catch (InterruptedException e) { + } + if (Integer.valueOf(latencyHeader.get(0)) == DEADLINE_EXCEEDED_LATENCY) { + return ApiFutures.immediateFailedFuture( + new DeadlineExceededException( + "deadline exceeded", null, GrpcStatusCode.of(Code.DEADLINE_EXCEEDED), false)); + } + } + return ApiFutures.immediateFuture(response); + } + } + + private void createFlowControlEvent(final FlowController flowController) throws Exception { + flowController.reserve(INITIAL_ELEMENT, 0); + Thread t = + new Thread( + new Runnable() { + @Override + public void run() { + try { + flowController.reserve(1, 0); + } catch (Exception e) { + } + } + }); + t.start(); + Thread.sleep(10); + flowController.release(INITIAL_ELEMENT, 0); + t.join(); + flowController.release(1, 0); + + assertThat(flowController.getFlowControlEventStats().getLastFlowControlEvent()).isNotNull(); + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/DynamicFlowControlStatsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/DynamicFlowControlStatsTest.java new file mode 100644 index 000000000..653489f33 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/DynamicFlowControlStatsTest.java @@ -0,0 +1,89 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.common.truth.Truth.assertThat; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class DynamicFlowControlStatsTest { + + @Test + public void testUpdate() { + DynamicFlowControlStats stats = new DynamicFlowControlStats(); + long now = System.currentTimeMillis(); + + stats.updateLatency(10, now); + assertThat(stats.getMeanLatency(now)).isEqualTo(10); + + stats.updateLatency(10, now); + stats.updateLatency(10, now); + assertThat(stats.getMeanLatency(now)).isEqualTo(10); + + // In five minutes the previous latency should be decayed to under 1. And the new average should + // be very close to 20 + long fiveMinutesLater = now + TimeUnit.MINUTES.toMillis(5); + assertThat(stats.getMeanLatency(fiveMinutesLater)).isLessThan(1); + stats.updateLatency(20, fiveMinutesLater); + assertThat(stats.getMeanLatency(fiveMinutesLater)).isGreaterThan(19); + assertThat(stats.getMeanLatency(fiveMinutesLater)).isLessThan(20); + + long aDayLater = now + TimeUnit.HOURS.toMillis(24); + assertThat(stats.getMeanLatency(aDayLater)).isZero(); + + long timestamp = aDayLater; + for (int i = 0; i < 10; i++) { + timestamp += TimeUnit.SECONDS.toMillis(i); + stats.updateLatency(i, timestamp); + } + assertThat(stats.getMeanLatency(timestamp)).isGreaterThan(4.5); + assertThat(stats.getMeanLatency(timestamp)).isLessThan(6); + } + + @Test(timeout = 1000) + public void testConcurrentUpdates() throws InterruptedException, ExecutionException { + final DynamicFlowControlStats stats = new DynamicFlowControlStats(); + ExecutorService executor = Executors.newCachedThreadPool(); + List> futures = new LinkedList<>(); + for (int i = 1; i <= 50; i++) { + final long latency = i; + Runnable r = + new Runnable() { + @Override + public void run() { + stats.updateLatency(latency); + } + }; + futures.add(executor.submit(r)); + } + for (Future f : futures) { + f.get(); + } + // Mean should be around 50 / 2 = 25 + assertThat(stats.getMeanLatency()).isGreaterThan(20); + assertThat(stats.getMeanLatency()).isLessThan(30); + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java index 10ba67582..b0f60be60 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java @@ -18,6 +18,7 @@ import static com.google.common.truth.Truth.assertThat; import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; @@ -433,6 +434,8 @@ public void bulkMutateRowsSettingsAreNotLostTest() { .setProjectId(dummyProjectId) .setInstanceId(dummyInstanceId); + assertThat(builder.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled()).isFalse(); + RetrySettings retrySettings = RetrySettings.newBuilder() .setMaxAttempts(10) @@ -443,13 +446,22 @@ public void bulkMutateRowsSettingsAreNotLostTest() { .setJittered(true) .build(); - BatchingSettings batchingSettings = BatchingSettings.newBuilder().build(); - + long flowControlSetting = 10L; + BatchingSettings batchingSettings = + BatchingSettings.newBuilder() + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(10L) + .setMaxOutstandingRequestBytes(10L) + .build()) + .build(); + long targetLatency = 10L; builder .bulkMutateRowsSettings() .setRetryableCodes(Code.ABORTED, Code.DEADLINE_EXCEEDED) .setRetrySettings(retrySettings) .setBatchingSettings(batchingSettings) + .enableLatencyBasedThrottling(targetLatency) .build(); assertThat(builder.bulkMutateRowsSettings().getRetryableCodes()) @@ -457,6 +469,20 @@ public void bulkMutateRowsSettingsAreNotLostTest() { assertThat(builder.bulkMutateRowsSettings().getRetrySettings()).isEqualTo(retrySettings); assertThat(builder.bulkMutateRowsSettings().getBatchingSettings()) .isSameInstanceAs(batchingSettings); + assertThat(builder.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled()).isTrue(); + assertThat(builder.bulkMutateRowsSettings().getTargetRpcLatencyMs()).isEqualTo(targetLatency); + assertThat( + builder + .bulkMutateRowsSettings() + .getDynamicFlowControlSettings() + .getMaxOutstandingElementCount()) + .isEqualTo(flowControlSetting); + assertThat( + builder + .bulkMutateRowsSettings() + .getDynamicFlowControlSettings() + .getMaxOutstandingRequestBytes()) + .isEqualTo(flowControlSetting); assertThat(builder.build().bulkMutateRowsSettings().getRetryableCodes()) .containsAtLeast(Code.ABORTED, Code.DEADLINE_EXCEEDED); @@ -464,6 +490,23 @@ public void bulkMutateRowsSettingsAreNotLostTest() { .isEqualTo(retrySettings); assertThat(builder.build().bulkMutateRowsSettings().getBatchingSettings()) .isSameInstanceAs(batchingSettings); + assertThat(builder.build().bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled()).isTrue(); + assertThat(builder.build().bulkMutateRowsSettings().getTargetRpcLatencyMs()) + .isEqualTo(targetLatency); + assertThat( + builder + .build() + .bulkMutateRowsSettings() + .getDynamicFlowControlSettings() + .getMaxOutstandingElementCount()) + .isEqualTo(flowControlSetting); + assertThat( + builder + .build() + .bulkMutateRowsSettings() + .getDynamicFlowControlSettings() + .getMaxOutstandingRequestBytes()) + .isEqualTo(flowControlSetting); assertThat(builder.build().toBuilder().bulkMutateRowsSettings().getRetryableCodes()) .containsAtLeast(Code.ABORTED, Code.DEADLINE_EXCEEDED); @@ -471,6 +514,27 @@ public void bulkMutateRowsSettingsAreNotLostTest() { .isEqualTo(retrySettings); assertThat(builder.build().toBuilder().bulkMutateRowsSettings().getBatchingSettings()) .isSameInstanceAs(batchingSettings); + assertThat( + builder.build().toBuilder().bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled()) + .isTrue(); + assertThat(builder.build().toBuilder().bulkMutateRowsSettings().getTargetRpcLatencyMs()) + .isEqualTo(targetLatency); + assertThat( + builder + .build() + .toBuilder() + .bulkMutateRowsSettings() + .getDynamicFlowControlSettings() + .getMaxOutstandingElementCount()) + .isEqualTo(flowControlSetting); + assertThat( + builder + .build() + .toBuilder() + .bulkMutateRowsSettings() + .getDynamicFlowControlSettings() + .getMaxOutstandingRequestBytes()) + .isEqualTo(flowControlSetting); } @Test diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java index 194123b1b..804d51a2e 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java @@ -17,6 +17,10 @@ import static com.google.common.truth.Truth.assertThat; +import com.google.api.gax.batching.BatcherImpl; +import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController.LimitExceededBehavior; import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.bigtable.v2.BigtableGrpc; @@ -85,6 +89,7 @@ public void setUp() throws IOException, IllegalAccessException, InstantiationExc @After public void tearDown() { + enhancedBigtableStub.close(); serviceHelper.shutdown(); } @@ -152,6 +157,89 @@ public void testUserAgent() throws InterruptedException { .containsMatch("bigtable-java/\\d+\\.\\d+\\.\\d+(?:-SNAPSHOT)?"); } + @Test + public void testBulkMutationFlowControllerConfigured() throws Exception { + BigtableDataSettings.Builder settings = + BigtableDataSettings.newBuilder() + .setProjectId("my-project") + .setInstanceId("my-instance") + .setCredentialsProvider(defaultSettings.getCredentialsProvider()) + .enableBatchMutationLatencyBasedThrottling(10L); + + settings + .stubSettings() + .bulkMutateRowsSettings() + .setBatchingSettings( + BatchingSettings.newBuilder() + .setElementCountThreshold(50L) + .setRequestByteThreshold(500L) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(100L) + .setMaxOutstandingRequestBytes(1000L) + .setLimitExceededBehavior(LimitExceededBehavior.Block) + .build()) + .build()) + .build(); + + try (EnhancedBigtableStub stub1 = + EnhancedBigtableStub.create(settings.build().getStubSettings()); + EnhancedBigtableStub stub2 = + EnhancedBigtableStub.create(settings.build().getStubSettings())) { + + // Creating 2 batchers from the same stub, they should share the same FlowController and + // FlowControlEventStats + try (BatcherImpl batcher1 = (BatcherImpl) stub1.newMutateRowsBatcher("my-table1"); + BatcherImpl batcher2 = (BatcherImpl) stub1.newMutateRowsBatcher("my-table2")) { + assertThat(batcher1.getFlowController()).isNotNull(); + assertThat(batcher1.getFlowController().getFlowControlEventStats()).isNotNull(); + assertThat(batcher1).isNotSameInstanceAs(batcher2); + assertThat(batcher1.getFlowController()).isSameInstanceAs(batcher2.getFlowController()); + assertThat(batcher1.getFlowController().getFlowControlEventStats()) + .isSameInstanceAs(batcher2.getFlowController().getFlowControlEventStats()); + // Verify flow controller settings + assertThat(batcher1.getFlowController().getMaxElementCountLimit()).isEqualTo(100L); + assertThat(batcher1.getFlowController().getMaxRequestBytesLimit()).isEqualTo(1000L); + assertThat(batcher1.getFlowController().getCurrentElementCountLimit()).isLessThan(100L); + assertThat(batcher1.getFlowController().getCurrentRequestBytesLimit()).isEqualTo(1000L); + assertThat(batcher1.getFlowController().getMinElementCountLimit()) + .isAtLeast( + settings + .stubSettings() + .bulkMutateRowsSettings() + .getBatchingSettings() + .getElementCountThreshold()); + assertThat(batcher1.getFlowController().getMinRequestBytesLimit()).isEqualTo(1000L); + } + + // Creating 2 batchers from different stubs, they should not share the same FlowController and + // FlowControlEventStats + try (BatcherImpl batcher1 = (BatcherImpl) stub1.newMutateRowsBatcher("my-table1"); + BatcherImpl batcher2 = (BatcherImpl) stub2.newMutateRowsBatcher("my-table2")) { + assertThat(batcher1.getFlowController()).isNotNull(); + assertThat(batcher1.getFlowController().getFlowControlEventStats()).isNotNull(); + assertThat(batcher1.getFlowController()).isNotSameInstanceAs(batcher2.getFlowController()); + assertThat(batcher1.getFlowController().getFlowControlEventStats()) + .isNotSameInstanceAs(batcher2.getFlowController().getFlowControlEventStats()); + } + } + try (EnhancedBigtableStub stub1 = + EnhancedBigtableStub.create(settings.build().getStubSettings()); + EnhancedBigtableStub stub2 = + EnhancedBigtableStub.create( + settings + .disableBatchMutationLatencyBasedThrottling() + .build() + .getStubSettings()); ) { + + try (BatcherImpl batcher = (BatcherImpl) stub2.newMutateRowsBatcher("my-table")) { + assertThat(batcher.getFlowController().getMaxElementCountLimit()).isEqualTo(100L); + assertThat(batcher.getFlowController().getCurrentElementCountLimit()).isEqualTo(100L); + assertThat(batcher.getFlowController().getMinElementCountLimit()).isEqualTo(100L); + } + } + } + private static class MetadataInterceptor implements ServerInterceptor { final BlockingQueue headers = Queues.newLinkedBlockingDeque();