Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: uses old version of gax-grpc method #426

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -60,7 +60,6 @@
import com.google.cloud.spanner.admin.instance.v1.stub.GrpcInstanceAdminStub;
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub;
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStubSettings;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
import com.google.cloud.spanner.v1.stub.GrpcSpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
Expand Down Expand Up @@ -171,11 +170,13 @@
/** Implementation of Cloud Spanner remote calls using Gapic libraries. */
@InternalApi
public class GapicSpannerRpc implements SpannerRpc {

/**
* {@link ExecutorProvider} that keeps track of the executors that are created and shuts these
* down when the {@link SpannerRpc} is closed.
*/
private static final class ManagedInstantiatingExecutorProvider implements ExecutorProvider {

// 4 Gapic clients * 4 channels per client.
private static final int DEFAULT_MIN_THREAD_COUNT = 16;
private final List<ScheduledExecutorService> executors = new LinkedList<>();
Expand Down Expand Up @@ -317,7 +318,11 @@ public GapicSpannerRpc(final SpannerOptions options) {
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setMaxInboundMetadataSize(MAX_METADATA_SIZE)
.setPoolSize(options.getNumChannels())
.setExecutor(executorProvider.getExecutor())

// Before updating this method to setExecutor, please verify with a code owner on
// the lowest version of gax-grpc that needs to be supported. Currently v1.47.17,
// which doesn't support the setExecutor variant.
.setExecutorProvider(executorProvider)

// Set a keepalive time of 120 seconds to help long running
// commit GRPC calls succeed
Expand Down Expand Up @@ -480,6 +485,7 @@ private static void checkEmulatorConnection(

private static final class OperationFutureRetryAlgorithm<ResultT, MetadataT>
implements ResultRetryAlgorithm<OperationFuture<ResultT, MetadataT>> {

private static final ImmutableList<StatusCode.Code> RETRYABLE_CODES =
ImmutableList.of(StatusCode.Code.DEADLINE_EXCEEDED, StatusCode.Code.UNAVAILABLE);

Expand Down Expand Up @@ -519,6 +525,7 @@ public boolean shouldRetry(

private final class OperationFutureCallable<RequestT, ResponseT, MetadataT extends Message>
implements Callable<OperationFuture<ResponseT, MetadataT>> {

final OperationCallable<RequestT, ResponseT, MetadataT> operationCallable;
final RequestT initialRequest;
final MethodDescriptor<RequestT, Operation> method;
Expand Down Expand Up @@ -575,6 +582,7 @@ public OperationFuture<ResponseT, MetadataT> call() throws Exception {
}

private interface OperationsLister {

Paginated<Operation> listOperations(String nextPageToken);
}

Expand Down Expand Up @@ -610,6 +618,7 @@ private Operation mostRecentOperation(
}

private static final class TimestampComparator implements Comparator<Timestamp> {

private static final TimestampComparator INSTANCE = new TimestampComparator();

@Override
Expand Down Expand Up @@ -1458,6 +1467,7 @@ public boolean isClosed() {
* the {@link ResultStreamConsumer}.
*/
private static class SpannerResponseObserver implements ResponseObserver<PartialResultSet> {

private StreamController controller;
private final ResultStreamConsumer consumer;

Expand Down