Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dynamic flow control #721

Merged
merged 5 commits into from Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -17,19 +17,23 @@

import com.google.api.core.ApiFunction;
import com.google.api.core.BetaApi;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.stub.BigtableBatchingCallSettings;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import io.grpc.ManagedChannelBuilder;
import java.util.List;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
Expand Down Expand Up @@ -219,6 +223,25 @@ public List<String> getPrimingTableIds() {
return stubSettings.getPrimedTableIds();
}

/**
* Gets if latency based throttling is enabled for {@link
* BigtableDataClient#newBulkMutationBatcher(String)}
*/
@BetaApi("Latency based throttling is not currently stable and may change in the future")
public boolean isLatencyBasedThrottlingForBatchMutationsEnabled() {
return stubSettings.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled();
}

/**
* Gets target bulk mutation rpc latency if latency based throttling is enabled for {@link
* BigtableDataClient#newBulkMutationBatcher(String)}. Otherwise returns null.
*/
@BetaApi("Latency based throttling is not currently stable and may change in the future")
@Nullable
public Long getBatchMutationsTargetRpcLatencyMs() {
return stubSettings.bulkMutateRowsSettings().getTargetRpcLatencyMs();
}

/** Returns the underlying RPC settings. */
public EnhancedBigtableStubSettings getStubSettings() {
return stubSettings;
Expand Down Expand Up @@ -375,6 +398,74 @@ public List<String> getPrimingTableIds() {
return stubSettings.getPrimedTableIds();
}

/**
* Enable latency based throttling for {@link BigtableDataClient#newBulkMutationBatcher(String)}
* with a target rpc latency. The number of allowed in-flight requests will be adjusted to reach
* the target bulk mutations rpc latency.
*
* <p>The logic of adjusting in-flight request limits is as follows:
*
* <pre>
* To start, {@link Batcher} allows {@link FlowController#getCurrentElementCountLimit()}
* in-flight elements with a total size of {@link FlowController#getCurrentRequestBytesLimit()}.
*
* Every 20 seconds, {@link Batcher} checks the mean rpc latency of the requests and compare
* it with the target rpc latency:
* if (mean latency &gt; 3 * target latency) {
* decrease element count limit by 30% of {@link FlowController#getMaxElementCountLimit()}
* } else if (mean latency &gt; 1.2 * target latency) {
* decrease element count limit by 10% of {@link FlowController#getMaxElementCountLimit()}
* } else if (there was throttling in the past 5 minutes
* && mean latency &lt; 0.8 * target latency) {
* increase element count limit by 5% of {@link FlowController#getMaxElementCountLimit()}
* } else if (there was throttling in the past 5 minutes
* && parallelism is 5% of {@link FlowController#getMaxElementCountLimit()}
* && mean latency &lt; 2 * target latency) {
* increase element count limit by 2% of {@link FlowController#getMaxElementCountLimit()}
*
* Increases are capped by {@link
* FlowController#getMaxElementCountLimit()}, Decreases are floored at {@link
* FlowController#getMinElementCountLimit()} so that there is some level of throughput.
* </pre>
*
* @see BigtableBatchingCallSettings.Builder#getDynamicFlowControlSettings() for explanation on
* default configurations.
*/
@BetaApi("Latency based throttling is not currently stable and may change in the future")
public Builder enableBatchMutationLatencyBasedThrottling(long targetRpcLatencyMs) {
stubSettings.bulkMutateRowsSettings().enableLatencyBasedThrottling(targetRpcLatencyMs);
return this;
}

/**
* Disable latency based throttling for {@link
* BigtableDataClient#newBulkMutationBatcher(String)}.
*/
@BetaApi("Latency based throttling is not currently stable and may change in the future")
public Builder disableBatchMutationLatencyBasedThrottling() {
stubSettings.bulkMutateRowsSettings().disableLatencyBasedThrottling();
return this;
}

/**
* Gets if latency based throttling is enabled for {@link
* BigtableDataClient#newBulkMutationBatcher(String)}
*/
@BetaApi("Latency based throttling is not currently stable and may change in the future")
public boolean isLatencyBasedThrottlingForBatchMutationEnabled() {
return stubSettings.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled();
}

/**
* Gets target bulk mutation rpc latency if latency based throttling is enabled for {@link
* BigtableDataClient#newBulkMutationBatcher(String)}. Otherwise returns null.
*/
@BetaApi("Latency based throttling is not currently stable and may change in the future")
@Nullable
public Long getTargetRpcLatencyMsForBatchMutation() {
return stubSettings.bulkMutateRowsSettings().getTargetRpcLatencyMs();
}

/**
* Returns the underlying settings for making RPC calls. The settings should be changed with
* care.
Expand Down
Expand Up @@ -19,14 +19,19 @@
import com.google.api.gax.batching.BatchingCallSettings;
import com.google.api.gax.batching.BatchingDescriptor;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.DynamicFlowControlSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* This settings holds the batching thresholds as well as retry configuration.
Expand All @@ -43,6 +48,7 @@
* .setDelayThreshold(Duration.ofSeconds(10))
* .build())
* .setRetryableCodes(Code.DEADLINE_EXCEEDED)
* .setLatencyBasedThrottling(true, 1000L)
* .build();
* }</pre>
*
Expand All @@ -54,7 +60,11 @@ public final class BigtableBatchingCallSettings extends UnaryCallSettings<BulkMu

// This settings is just a simple wrapper for BatchingCallSettings to allow us to add
// additional functionality.
private BatchingCallSettings<RowMutationEntry, Void, BulkMutation, Void> batchingCallSettings;
private final BatchingCallSettings<RowMutationEntry, Void, BulkMutation, Void>
batchingCallSettings;
private final boolean isLatencyBasedThrottlingEnabled;
private final Long targetRpcLatencyMs;
private final DynamicFlowControlSettings dynamicFlowControlSettings;

private BigtableBatchingCallSettings(Builder builder) {
super(builder);
Expand All @@ -64,6 +74,9 @@ private BigtableBatchingCallSettings(Builder builder) {
.setRetrySettings(builder.getRetrySettings())
.setRetryableCodes(builder.getRetryableCodes())
.build();
this.isLatencyBasedThrottlingEnabled = builder.isLatencyBasedThrottlingEnabled;
this.targetRpcLatencyMs = builder.targetRpcLatencyMs;
this.dynamicFlowControlSettings = builder.dynamicFlowControlSettings;
}

/** Returns batching settings which contains multiple batch threshold levels. */
Expand All @@ -76,6 +89,26 @@ BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> getBatchingDescri
return batchingCallSettings.getBatchingDescriptor();
}

/** Gets if latency based throttling is enabled. */
public boolean isLatencyBasedThrottlingEnabled() {
return isLatencyBasedThrottlingEnabled;
}

/** Gets target rpc latency if latency based throttling is enabled. Otherwise returns null. */
@Nullable
public Long getTargetRpcLatencyMs() {
return targetRpcLatencyMs;
}

/**
* Gets {@link DynamicFlowControlSettings}.
*
* @see Builder#getDynamicFlowControlSettings()
*/
DynamicFlowControlSettings getDynamicFlowControlSettings() {
return dynamicFlowControlSettings;
}

static Builder newBuilder(
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> batchingDescriptor) {
return new Builder(batchingDescriptor);
Expand All @@ -90,6 +123,16 @@ public final Builder toBuilder() {
return new Builder(this);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("batchingCallSettings", batchingCallSettings)
.add("isLatencyBasedThrottlingEnabled", isLatencyBasedThrottlingEnabled)
.add("targetRpcLatency", targetRpcLatencyMs)
.add("dynamicFlowControlSettings", dynamicFlowControlSettings)
.toString();
}

/**
* A base builder class for {@link BigtableBatchingCallSettings}. See the class documentation of
* {@link BigtableBatchingCallSettings} for a description of the different values that can be set.
Expand All @@ -98,6 +141,9 @@ public static class Builder extends UnaryCallSettings.Builder<BulkMutation, Void

private BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> batchingDescriptor;
private BatchingSettings batchingSettings;
private boolean isLatencyBasedThrottlingEnabled;
private Long targetRpcLatencyMs;
private DynamicFlowControlSettings dynamicFlowControlSettings;

private Builder(
@Nonnull
Expand All @@ -110,6 +156,9 @@ private Builder(@Nonnull BigtableBatchingCallSettings settings) {
super(settings);
this.batchingDescriptor = settings.getBatchingDescriptor();
this.batchingSettings = settings.getBatchingSettings();
this.isLatencyBasedThrottlingEnabled = settings.isLatencyBasedThrottlingEnabled();
this.targetRpcLatencyMs = settings.getTargetRpcLatencyMs();
this.dynamicFlowControlSettings = settings.getDynamicFlowControlSettings();
}

/** Sets the batching settings with various thresholds. */
Expand Down Expand Up @@ -145,9 +194,137 @@ public Builder setRetrySettings(@Nonnull RetrySettings retrySettings) {
return this;
}

/**
* Enable latency based throttling. The number of allowed in-flight requests will be adjusted to
* reach the target rpc latency.
*/
public Builder enableLatencyBasedThrottling(long targetRpcLatency) {
Preconditions.checkArgument(
targetRpcLatency > 0, "target RPC latency must be greater than 0");
this.isLatencyBasedThrottlingEnabled = true;
this.targetRpcLatencyMs = targetRpcLatency;
return this;
}

/** Disable latency based throttling. */
public Builder disableLatencyBasedThrottling() {
this.isLatencyBasedThrottlingEnabled = false;
this.targetRpcLatencyMs = null;
return this;
}

/** Gets target rpc latency if latency based throttling is enabled. Otherwise returns null. */
@Nullable
public Long getTargetRpcLatencyMs() {
return isLatencyBasedThrottlingEnabled ? targetRpcLatencyMs : null;
}

/** Gets if latency based throttling is enabled. */
public boolean isLatencyBasedThrottlingEnabled() {
return this.isLatencyBasedThrottlingEnabled;
}

/**
* Gets the {@link DynamicFlowControlSettings} that'll be used to set up a {@link
* FlowController} for throttling.
*
* <p>By default, this will allow a maximum of 1000 entries per channel of {@link
* FlowControlSettings.Builder#setMaxOutstandingElementCount request count} and 100MB of {@link
* FlowControlSettings.Builder#setMaxOutstandingRequestBytes accumulated size} in-flight
* requests. Once the limits are reached, pending operations will by default be {@link
* FlowControlSettings.Builder#setLimitExceededBehavior blocked} until some of the in-flight
* requests are resolved.
*
* <p>If latency based throttling is enabled, number of entries allowed by {@link
* FlowController} will be adjusted to reach {@link Builder#getTargetRpcLatencyMs()}.
*
* <ul>
* <li>{@link FlowController} will be set to allow Math.max({@link BatchingSettings.Builder
* #setElementCountThreshold batch size}, {@link
* FlowControlSettings.Builder#setMaxOutstandingElementCount request count} / 4) entries
* to start with.
* <li>If bulk mutation rpc latency is higher than target latency, decrease allowed entries to
* a minimum of Math.max({@link BatchingSettings.Builder#setElementCountThreshold batch
* size}, {@link FlowControlSettings.Builder#setMaxOutstandingElementCount request count}
* / 100).
* <li>If bulk mutation rpc latency is lower than target latency and there was throttling,
* increase allowed entries to a maximum of {@link
* FlowControlSettings.Builder#setMaxOutstandingElementCount request count}.
* </ul>
*
* If latency based throttling is disabled, {@link FlowController} will always allow {@link
* FlowControlSettings.Builder#setMaxOutstandingElementCount request count}.
*
* <p>Latency based throttling only updates outstanding entries count. {@link FlowController}
* will always allow {@link FlowControlSettings.Builder#setMaxOutstandingRequestBytes
* accumulated size}.
*/
DynamicFlowControlSettings getDynamicFlowControlSettings() {
return this.dynamicFlowControlSettings;
}

/** Builds the {@link BigtableBatchingCallSettings} object with provided configuration. */
@Override
public BigtableBatchingCallSettings build() {
Preconditions.checkState(batchingSettings != null, "batchingSettings must be set");
FlowControlSettings defaultSettings = batchingSettings.getFlowControlSettings();
Preconditions.checkState(
defaultSettings.getMaxOutstandingElementCount() != null,
"maxOutstandingElementCount must be set in BatchingSettings#FlowControlSettings");
Preconditions.checkState(
defaultSettings.getMaxOutstandingRequestBytes() != null,
"maxOutstandingRequestBytes must be set in BatchingSettings#FlowControlSettings");
Preconditions.checkArgument(
batchingSettings.getElementCountThreshold() == null
|| defaultSettings.getMaxOutstandingElementCount()
>= batchingSettings.getElementCountThreshold(),
"if elementCountThreshold is set in BatchingSettings, maxOutstandingElementCount must be >= elementCountThreshold");
Preconditions.checkArgument(
batchingSettings.getRequestByteThreshold() == null
|| defaultSettings.getMaxOutstandingRequestBytes()
>= batchingSettings.getRequestByteThreshold(),
"if requestByteThreshold is set in BatchingSettings, getMaxOutstandingRequestBytes must be >= getRequestByteThreshold");
// Combine static FlowControlSettings with latency based throttling settings to create
// DynamicFlowControlSettings.
if (isLatencyBasedThrottlingEnabled()) {
long maxThrottlingElementCount = defaultSettings.getMaxOutstandingElementCount();
long maxThrottlingRequestByteCount = defaultSettings.getMaxOutstandingRequestBytes();
// The maximum in flight element count is pretty high. Set the initial parallelism to 25%
// of the maximum and then work up or down. This reduction should reduce the
// impacts of a bursty job, such as those found in Dataflow.
long initialElementCount = maxThrottlingElementCount / 4;
// Decreases are floored at 1% of the maximum so that there is some level of
// throughput.
long minElementCount = maxThrottlingElementCount / 100;
// Make sure initialOutstandingElementCount and minOutstandingElementCount element count are
// greater or equal to batch size to avoid deadlocks.
if (batchingSettings.getElementCountThreshold() != null) {
initialElementCount =
Math.max(initialElementCount, batchingSettings.getElementCountThreshold());
minElementCount = Math.max(minElementCount, batchingSettings.getElementCountThreshold());
}
dynamicFlowControlSettings =
DynamicFlowControlSettings.newBuilder()
.setLimitExceededBehavior(defaultSettings.getLimitExceededBehavior())
.setInitialOutstandingElementCount(initialElementCount)
.setMaxOutstandingElementCount(maxThrottlingElementCount)
.setMinOutstandingElementCount(minElementCount)
.setInitialOutstandingRequestBytes(maxThrottlingRequestByteCount)
.setMinOutstandingRequestBytes(maxThrottlingRequestByteCount)
.setMaxOutstandingRequestBytes(maxThrottlingRequestByteCount)
.build();
} else {
dynamicFlowControlSettings =
DynamicFlowControlSettings.newBuilder()
.setLimitExceededBehavior(defaultSettings.getLimitExceededBehavior())
.setInitialOutstandingElementCount(defaultSettings.getMaxOutstandingElementCount())
.setMaxOutstandingElementCount(defaultSettings.getMaxOutstandingElementCount())
.setMinOutstandingElementCount(defaultSettings.getMaxOutstandingElementCount())
.setInitialOutstandingRequestBytes(defaultSettings.getMaxOutstandingRequestBytes())
.setMinOutstandingRequestBytes(defaultSettings.getMaxOutstandingRequestBytes())
.setMaxOutstandingRequestBytes(defaultSettings.getMaxOutstandingRequestBytes())
.build();
}
return new BigtableBatchingCallSettings(this);
}
}
Expand Down