Skip to content

Commit

Permalink
fix: reduce the probability of RESOURCE_EXHAUSTED errors during tests
Browse files Browse the repository at this point in the history
Reduces the probability of RESOURCE_EXHAUSTED errors during tests by making the
GetOperation method retry errors with this code with an exponential backoff.
The GetOperation method is called repeatedly for long-running operations by a
polling future. These calls also count towards the max 5 admin requests per second.

Fixes #733
  • Loading branch information
olavloite committed Dec 13, 2020
1 parent f14f7c9 commit da8b666
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 5 deletions.
Expand Up @@ -26,6 +26,8 @@
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcCallSettings;
import com.google.api.gax.grpc.GrpcStubCallableFactory;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
Expand All @@ -35,6 +37,7 @@
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.InstantiatingWatchdogProvider;
import com.google.api.gax.rpc.OperationCallable;
Expand All @@ -43,6 +46,8 @@
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.rpc.UnavailableException;
import com.google.api.gax.rpc.WatchdogProvider;
import com.google.api.pathtemplate.PathTemplate;
Expand All @@ -58,7 +63,7 @@
import com.google.cloud.spanner.SpannerOptions.CallCredentialsProvider;
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub;
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings;
import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminStub;
import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminCallableFactory;
import com.google.cloud.spanner.admin.instance.v1.stub.GrpcInstanceAdminStub;
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub;
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStubSettings;
Expand All @@ -70,6 +75,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.iam.v1.GetIamPolicyRequest;
Expand Down Expand Up @@ -155,6 +161,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -428,7 +435,43 @@ public GapicSpannerRpc(final SpannerOptions options) {
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.build();
this.databaseAdminStub = GrpcDatabaseAdminStub.create(this.databaseAdminStubSettings);
GrpcStubCallableFactory factory =
new GrpcDatabaseAdminCallableFactory() {
@Override
public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCallable(
GrpcCallSettings<RequestT, ResponseT> grpcCallSettings,
UnaryCallSettings<RequestT, ResponseT> callSettings,
ClientContext clientContext) {
// Make GetOperation retry on RESOURCE_EXHAUSTED to prevent polling operations from
// failing with an Administrative requests limit exceeded error.
if (grpcCallSettings
.getMethodDescriptor()
.getFullMethodName()
.equals("google.longrunning.Operations/GetOperation")) {
Set<StatusCode.Code> codes =
ImmutableSet.<StatusCode.Code>builderWithExpectedSize(
callSettings.getRetryableCodes().size() + 1)
.addAll(callSettings.getRetryableCodes())
.add(StatusCode.Code.RESOURCE_EXHAUSTED)
.build();
callSettings =
callSettings
.toBuilder()
.setRetryableCodes(codes)
.setRetrySettings(
callSettings
.getRetrySettings()
.toBuilder()
.setInitialRetryDelay(Duration.ofSeconds(10L))
.build())
.build();
}
return super.createUnaryCallable(grpcCallSettings, callSettings, clientContext);
}
};
this.databaseAdminStub =
new GrpcDatabaseAdminStubWithCustomCallableFactory(
databaseAdminStubSettings, ClientContext.create(databaseAdminStubSettings), factory);

// Check whether the SPANNER_EMULATOR_HOST env var has been set, and if so, if the emulator is
// actually running.
Expand Down Expand Up @@ -487,9 +530,9 @@ private static void checkEmulatorConnection(

private static final RetrySettings ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofSeconds(2L))
.setRetryDelayMultiplier(1.5)
.setMaxRetryDelay(Duration.ofSeconds(15L))
.setInitialRetryDelay(Duration.ofSeconds(5L))
.setRetryDelayMultiplier(2.0)
.setMaxRetryDelay(Duration.ofSeconds(60L))
.setMaxAttempts(10)
.build();

Expand Down Expand Up @@ -1004,6 +1047,11 @@ public OperationFuture<Empty, UpdateDatabaseDdlMetadata> call() throws Exception
throw newSpannerException(e);
} catch (ExecutionException e) {
Throwable t = e.getCause();
SpannerException se = SpannerExceptionFactory.asSpannerException(t);
if (se instanceof AdminRequestsPerMinuteExceededException) {
// Propagate this to trigger a retry.
throw se;
}
if (t instanceof AlreadyExistsException) {
String operationName =
OPERATION_NAME_TEMPLATE.instantiate(
Expand Down
@@ -0,0 +1,37 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.spi.v1;

import com.google.api.gax.grpc.GrpcStubCallableFactory;
import com.google.api.gax.rpc.ClientContext;
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings;
import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminStub;
import java.io.IOException;

/**
* Wrapper around {@link GrpcDatabaseAdminStub} to make the constructor available inside this
* package.
*/
class GrpcDatabaseAdminStubWithCustomCallableFactory extends GrpcDatabaseAdminStub {
GrpcDatabaseAdminStubWithCustomCallableFactory(
DatabaseAdminStubSettings settings,
ClientContext clientContext,
GrpcStubCallableFactory callableFactory)
throws IOException {
super(settings, clientContext, callableFactory);
}
}

0 comments on commit da8b666

Please sign in to comment.