Skip to content

Commit

Permalink
fix: auto throttle administrative requests
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite committed Mar 16, 2020
1 parent 7e918ee commit ae90f2b
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 3 deletions.
Expand Up @@ -81,6 +81,7 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
private final InstanceAdminStubSettings instanceAdminStubSettings;
private final DatabaseAdminStubSettings databaseAdminStubSettings;
private final Duration partitionedDmlTimeout;
private final boolean autoThrottleAdministrativeRequests;
/**
* These are the default {@link QueryOptions} defined by the user on this {@link SpannerOptions}.
*/
Expand Down Expand Up @@ -152,6 +153,7 @@ private SpannerOptions(Builder builder) {
throw SpannerExceptionFactory.newSpannerException(e);
}
partitionedDmlTimeout = builder.partitionedDmlTimeout;
autoThrottleAdministrativeRequests = builder.autoThrottleAdministrativeRequests;
defaultQueryOptions = builder.defaultQueryOptions;
envQueryOptions = builder.getEnvironmentQueryOptions();
if (envQueryOptions.equals(QueryOptions.getDefaultInstance())) {
Expand Down Expand Up @@ -226,6 +228,7 @@ public static class Builder
private DatabaseAdminStubSettings.Builder databaseAdminStubSettingsBuilder =
DatabaseAdminStubSettings.newBuilder();
private Duration partitionedDmlTimeout = Duration.ofHours(2L);
private boolean autoThrottleAdministrativeRequests = false;
private Map<DatabaseId, QueryOptions> defaultQueryOptions = new HashMap<>();
private CallCredentialsProvider callCredentialsProvider;
private String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST");
Expand All @@ -242,6 +245,7 @@ private Builder() {}
this.instanceAdminStubSettingsBuilder = options.instanceAdminStubSettings.toBuilder();
this.databaseAdminStubSettingsBuilder = options.databaseAdminStubSettings.toBuilder();
this.partitionedDmlTimeout = options.partitionedDmlTimeout;
this.autoThrottleAdministrativeRequests = options.autoThrottleAdministrativeRequests;
this.defaultQueryOptions = options.defaultQueryOptions;
this.callCredentialsProvider = options.callCredentialsProvider;
this.channelProvider = options.channelProvider;
Expand Down Expand Up @@ -435,6 +439,22 @@ public Builder setPartitionedDmlTimeout(Duration timeout) {
return this;
}

/**
* Instructs the client library to automatically throttle the number of administrative requests
* if the rate of administrative requests generated by this {@link Spanner} instance will exceed
* the administrative limits Cloud Spanner. The default behavior is to not throttle any
* requests. If the limit is exceeded, Cloud Spanner will return a RESOURCE_EXHAUSTED error.
* More information on the administrative limits can be found here:
* https://cloud.google.com/spanner/quotas#administrative_limits. Setting this option is not a
* guarantee that the rate will never be exceeded, as this option will only throttle requests
* coming from this client. Additional requests from other clients could still cause the limit
* to be exceeded.
*/
public Builder setAutoThrottleAdministrativeRequests() {
this.autoThrottleAdministrativeRequests = true;
return this;
}

/**
* Sets the default {@link QueryOptions} that will be used for all queries on the specified
* database. Query options can also be specified on a per-query basis and as environment
Expand Down Expand Up @@ -592,6 +612,10 @@ public Duration getPartitionedDmlTimeout() {
return partitionedDmlTimeout;
}

public boolean isAutoThrottleAdministrativeRequests() {
return autoThrottleAdministrativeRequests;
}

public CallCredentialsProvider getCallCredentialsProvider() {
return callCredentialsProvider;
}
Expand Down
Expand Up @@ -51,6 +51,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.iam.v1.GetIamPolicyRequest;
import com.google.iam.v1.Policy;
Expand Down Expand Up @@ -125,6 +126,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -204,6 +207,11 @@ private synchronized void shutdown() {

private final ScheduledExecutorService spannerWatchdog;

private final boolean throttleAdministrativeRequests;
private static final double ADMINISTRATIVE_REQUESTS_RATE_LIMIT = 5.0D;
private static final ConcurrentMap<String, RateLimiter> ADMINISTRATIVE_REQUESTS_RATE_LIMITERS =
new ConcurrentHashMap<String, RateLimiter>();

public static GapicSpannerRpc create(SpannerOptions options) {
return new GapicSpannerRpc(options);
}
Expand All @@ -218,6 +226,11 @@ public GapicSpannerRpc(final SpannerOptions options) {
} catch (UnsupportedEncodingException e) { // Ignored.
}
this.projectName = projectNameStr;
this.throttleAdministrativeRequests = options.isAutoThrottleAdministrativeRequests();
if (throttleAdministrativeRequests) {
ADMINISTRATIVE_REQUESTS_RATE_LIMITERS.putIfAbsent(
projectNameStr, RateLimiter.create(ADMINISTRATIVE_REQUESTS_RATE_LIMIT));
}

// create a metadataProvider which combines both internal headers and
// per-method-call extra headers for channelProvider to inject the headers
Expand Down Expand Up @@ -322,6 +335,15 @@ public GapicSpannerRpc(final SpannerOptions options) {
}
}

private void acquireAdministrativeRequestsRateLimiter() {
if (throttleAdministrativeRequests) {
RateLimiter limiter = ADMINISTRATIVE_REQUESTS_RATE_LIMITERS.get(this.projectName);
if (limiter != null) {
limiter.acquire();
}
}
}

@Override
public Paginated<InstanceConfig> listInstanceConfigs(int pageSize, @Nullable String pageToken)
throws SpannerException {
Expand Down Expand Up @@ -408,6 +430,7 @@ public void deleteInstance(String instanceName) throws SpannerException {
@Override
public Paginated<Operation> listBackupOperations(
String instanceName, int pageSize, @Nullable String filter, @Nullable String pageToken) {
acquireAdministrativeRequestsRateLimiter();
ListBackupOperationsRequest.Builder requestBuilder =
ListBackupOperationsRequest.newBuilder().setParent(instanceName).setPageSize(pageSize);
if (filter != null) {
Expand All @@ -427,6 +450,7 @@ public Paginated<Operation> listBackupOperations(
@Override
public Paginated<Operation> listDatabaseOperations(
String instanceName, int pageSize, @Nullable String filter, @Nullable String pageToken) {
acquireAdministrativeRequestsRateLimiter();
ListDatabaseOperationsRequest.Builder requestBuilder =
ListDatabaseOperationsRequest.newBuilder().setParent(instanceName).setPageSize(pageSize);

Expand All @@ -448,6 +472,7 @@ public Paginated<Operation> listDatabaseOperations(
public Paginated<Backup> listBackups(
String instanceName, int pageSize, @Nullable String filter, @Nullable String pageToken)
throws SpannerException {
acquireAdministrativeRequestsRateLimiter();
ListBackupsRequest.Builder requestBuilder =
ListBackupsRequest.newBuilder().setParent(instanceName).setPageSize(pageSize);
if (filter != null) {
Expand All @@ -467,6 +492,7 @@ public Paginated<Backup> listBackups(
@Override
public Paginated<Database> listDatabases(
String instanceName, int pageSize, @Nullable String pageToken) throws SpannerException {
acquireAdministrativeRequestsRateLimiter();
ListDatabasesRequest.Builder requestBuilder =
ListDatabasesRequest.newBuilder().setParent(instanceName).setPageSize(pageSize);
if (pageToken != null) {
Expand All @@ -484,6 +510,7 @@ public Paginated<Database> listDatabases(
public OperationFuture<Database, CreateDatabaseMetadata> createDatabase(
String instanceName, String createDatabaseStatement, Iterable<String> additionalStatements)
throws SpannerException {
acquireAdministrativeRequestsRateLimiter();
CreateDatabaseRequest request =
CreateDatabaseRequest.newBuilder()
.setParent(instanceName)
Expand All @@ -498,6 +525,7 @@ public OperationFuture<Database, CreateDatabaseMetadata> createDatabase(
public OperationFuture<Empty, UpdateDatabaseDdlMetadata> updateDatabaseDdl(
String databaseName, Iterable<String> updateDatabaseStatements, @Nullable String updateId)
throws SpannerException {
acquireAdministrativeRequestsRateLimiter();
UpdateDatabaseDdlRequest request =
UpdateDatabaseDdlRequest.newBuilder()
.setDatabase(databaseName)
Expand Down Expand Up @@ -526,6 +554,7 @@ public OperationFuture<Empty, UpdateDatabaseDdlMetadata> updateDatabaseDdl(

@Override
public void dropDatabase(String databaseName) throws SpannerException {
acquireAdministrativeRequestsRateLimiter();
DropDatabaseRequest request =
DropDatabaseRequest.newBuilder().setDatabase(databaseName).build();

Expand All @@ -535,6 +564,7 @@ public void dropDatabase(String databaseName) throws SpannerException {

@Override
public Database getDatabase(String databaseName) throws SpannerException {
acquireAdministrativeRequestsRateLimiter();
GetDatabaseRequest request = GetDatabaseRequest.newBuilder().setName(databaseName).build();

GrpcCallContext context = newCallContext(null, databaseName);
Expand All @@ -543,6 +573,7 @@ public Database getDatabase(String databaseName) throws SpannerException {

@Override
public List<String> getDatabaseDdl(String databaseName) throws SpannerException {
acquireAdministrativeRequestsRateLimiter();
GetDatabaseDdlRequest request =
GetDatabaseDdlRequest.newBuilder().setDatabase(databaseName).build();

Expand All @@ -554,6 +585,7 @@ public List<String> getDatabaseDdl(String databaseName) throws SpannerException
@Override
public OperationFuture<Backup, CreateBackupMetadata> createBackup(
String instanceName, String backupId, Backup backup) throws SpannerException {
acquireAdministrativeRequestsRateLimiter();
CreateBackupRequest request =
CreateBackupRequest.newBuilder()
.setParent(instanceName)
Expand All @@ -567,7 +599,7 @@ public OperationFuture<Backup, CreateBackupMetadata> createBackup(
@Override
public final OperationFuture<Database, RestoreDatabaseMetadata> restoreDatabase(
String databaseInstanceName, String databaseId, String backupName) {

acquireAdministrativeRequestsRateLimiter();
RestoreDatabaseRequest request =
RestoreDatabaseRequest.newBuilder()
.setParent(databaseInstanceName)
Expand All @@ -580,6 +612,7 @@ public final OperationFuture<Database, RestoreDatabaseMetadata> restoreDatabase(

@Override
public final Backup updateBackup(Backup backup, FieldMask updateMask) {
acquireAdministrativeRequestsRateLimiter();
UpdateBackupRequest request =
UpdateBackupRequest.newBuilder().setBackup(backup).setUpdateMask(updateMask).build();
GrpcCallContext context = newCallContext(null, backup.getName());
Expand All @@ -588,20 +621,23 @@ public final Backup updateBackup(Backup backup, FieldMask updateMask) {

@Override
public final void deleteBackup(String backupName) {
acquireAdministrativeRequestsRateLimiter();
DeleteBackupRequest request = DeleteBackupRequest.newBuilder().setName(backupName).build();
GrpcCallContext context = newCallContext(null, backupName);
databaseAdminStub.deleteBackupCallable().call(request, context);
}

@Override
public Backup getBackup(String backupName) throws SpannerException {
acquireAdministrativeRequestsRateLimiter();
GetBackupRequest request = GetBackupRequest.newBuilder().setName(backupName).build();
GrpcCallContext context = newCallContext(null, backupName);
return get(databaseAdminStub.getBackupCallable().futureCall(request, context));
}

@Override
public Operation getOperation(String name) throws SpannerException {
acquireAdministrativeRequestsRateLimiter();
GetOperationRequest request = GetOperationRequest.newBuilder().setName(name).build();
GrpcCallContext context = newCallContext(null, name);
return get(
Expand All @@ -610,6 +646,7 @@ public Operation getOperation(String name) throws SpannerException {

@Override
public void cancelOperation(String name) throws SpannerException {
acquireAdministrativeRequestsRateLimiter();
CancelOperationRequest request = CancelOperationRequest.newBuilder().setName(name).build();
GrpcCallContext context = newCallContext(null, name);
get(
Expand Down Expand Up @@ -770,6 +807,7 @@ public PartitionResponse partitionRead(

@Override
public Policy getDatabaseAdminIAMPolicy(String resource) {
acquireAdministrativeRequestsRateLimiter();
GrpcCallContext context = newCallContext(null, resource);
return get(
databaseAdminStub
Expand All @@ -779,6 +817,7 @@ public Policy getDatabaseAdminIAMPolicy(String resource) {

@Override
public Policy setDatabaseAdminIAMPolicy(String resource, Policy policy) {
acquireAdministrativeRequestsRateLimiter();
GrpcCallContext context = newCallContext(null, resource);
return get(
databaseAdminStub
Expand All @@ -791,6 +830,7 @@ public Policy setDatabaseAdminIAMPolicy(String resource, Policy policy) {
@Override
public TestIamPermissionsResponse testDatabaseAdminIAMPermissions(
String resource, Iterable<String> permissions) {
acquireAdministrativeRequestsRateLimiter();
GrpcCallContext context = newCallContext(null, resource);
return get(
databaseAdminStub
Expand All @@ -805,6 +845,7 @@ public TestIamPermissionsResponse testDatabaseAdminIAMPermissions(

@Override
public Policy getInstanceAdminIAMPolicy(String resource) {
acquireAdministrativeRequestsRateLimiter();
GrpcCallContext context = newCallContext(null, resource);
return get(
instanceAdminStub
Expand All @@ -814,6 +855,7 @@ public Policy getInstanceAdminIAMPolicy(String resource) {

@Override
public Policy setInstanceAdminIAMPolicy(String resource, Policy policy) {
acquireAdministrativeRequestsRateLimiter();
GrpcCallContext context = newCallContext(null, resource);
return get(
instanceAdminStub
Expand All @@ -826,6 +868,7 @@ public Policy setInstanceAdminIAMPolicy(String resource, Policy policy) {
@Override
public TestIamPermissionsResponse testInstanceAdminIAMPermissions(
String resource, Iterable<String> permissions) {
acquireAdministrativeRequestsRateLimiter();
GrpcCallContext context = newCallContext(null, resource);
return get(
instanceAdminStub
Expand Down
Expand Up @@ -133,7 +133,10 @@ public void cleanUp() {
*/
public static RemoteSpannerHelper create(InstanceId instanceId) throws Throwable {
SpannerOptions options =
SpannerOptions.newBuilder().setProjectId(instanceId.getProject()).build();
SpannerOptions.newBuilder()
.setProjectId(instanceId.getProject())
.setAutoThrottleAdministrativeRequests()
.build();
Spanner client = options.getService();
return new RemoteSpannerHelper(options, instanceId, client);
}
Expand Down
Expand Up @@ -51,7 +51,8 @@ public GceTestEnvConfig() {
double errorProbability =
Double.parseDouble(System.getProperty(GCE_STREAM_BROKEN_PROBABILITY, "0.0"));
checkState(errorProbability <= 1.0);
SpannerOptions.Builder builder = SpannerOptions.newBuilder();
SpannerOptions.Builder builder =
SpannerOptions.newBuilder().setAutoThrottleAdministrativeRequests();
if (!projectId.isEmpty()) {
builder.setProjectId(projectId);
}
Expand Down
Expand Up @@ -87,6 +87,7 @@ public static class BackupTestEnvConfig extends GceTestEnvConfig {
@Override
public SpannerOptions spannerOptions() {
SpannerOptions.Builder builder = super.spannerOptions().toBuilder();
builder.setAutoThrottleAdministrativeRequests();
builder
.getDatabaseAdminStubSettingsBuilder()
.createDatabaseOperationSettings()
Expand Down Expand Up @@ -159,6 +160,7 @@ public void setUp() throws Exception {
@After
public void tearDown() throws Exception {
for (String backup : backups) {
waitForDbOperations(backup);
dbAdminClient.deleteBackup(testHelper.getInstanceId().getInstance(), backup);
}
backups.clear();
Expand All @@ -167,6 +169,41 @@ public void tearDown() throws Exception {
}
}

private void waitForDbOperations(String backupId) throws InterruptedException {
try {
Backup backupMetadata =
dbAdminClient.getBackup(testHelper.getInstanceId().getInstance(), backupId);
boolean allDbOpsDone = false;
while (!allDbOpsDone) {
allDbOpsDone = true;
for (String referencingDb : backupMetadata.getProto().getReferencingDatabasesList()) {
String filter =
String.format(
"name:%s/operations/ AND "
+ "(metadata.@type:type.googleapis.com/"
+ "google.spanner.admin.database.v1.OptimizeRestoredDatabaseMetadata)",
referencingDb);
for (Operation op :
dbAdminClient
.listDatabaseOperations(
testHelper.getInstanceId().getInstance(), Options.filter(filter))
.iterateAll()) {
if (!op.getDone()) {
Thread.sleep(5000L);
allDbOpsDone = false;
break;
}
}
}
}
} catch (SpannerException e) {
if (e.getErrorCode() == ErrorCode.NOT_FOUND) {
return;
}
throw e;
}
}

private String getUniqueBackupId() {
return String.format("testbck_%06d_%04d", random.nextInt(1000000), backupSeq.incrementAndGet());
}
Expand Down Expand Up @@ -477,6 +514,7 @@ private void testPagination(int expectedMinimumTotalBackups) {
}

private void testDelete(String backupId) throws InterruptedException, ExecutionException {
waitForDbOperations(backupId);
// Get the backup.
logger.info("Fetching backup");
Backup backup = instance.getBackup(backupId);
Expand Down

0 comments on commit ae90f2b

Please sign in to comment.