diff --git a/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcCallSettings.java b/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcCallSettings.java index e5549c1e3..739b480a8 100644 --- a/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcCallSettings.java +++ b/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcCallSettings.java @@ -38,12 +38,12 @@ public class GrpcCallSettings { private final MethodDescriptor methodDescriptor; private final RequestParamsExtractor paramsExtractor; + private final boolean alwaysAwaitTrailers; - private GrpcCallSettings( - MethodDescriptor methodDescriptor, - RequestParamsExtractor paramsExtractor) { - this.methodDescriptor = methodDescriptor; - this.paramsExtractor = paramsExtractor; + private GrpcCallSettings(Builder builder) { + this.methodDescriptor = builder.methodDescriptor; + this.paramsExtractor = builder.paramsExtractor; + this.alwaysAwaitTrailers = builder.shouldAwaitTrailers; } public MethodDescriptor getMethodDescriptor() { @@ -55,8 +55,13 @@ public RequestParamsExtractor getParamsExtractor() { return paramsExtractor; } + @BetaApi + public boolean shouldAwaitTrailers() { + return alwaysAwaitTrailers; + } + public static Builder newBuilder() { - return new Builder<>(); + return new Builder().setShouldAwaitTrailers(true); } public static GrpcCallSettings create( @@ -73,11 +78,14 @@ public Builder toBuilder() { public static class Builder { private MethodDescriptor methodDescriptor; private RequestParamsExtractor paramsExtractor; + private boolean shouldAwaitTrailers; private Builder() {} private Builder(GrpcCallSettings settings) { this.methodDescriptor = settings.methodDescriptor; + this.paramsExtractor = settings.paramsExtractor; + this.shouldAwaitTrailers = settings.alwaysAwaitTrailers; } public Builder setMethodDescriptor( @@ -93,8 +101,14 @@ public Builder setParamsExtractor( return this; } + @BetaApi + public Builder setShouldAwaitTrailers(boolean b) { + this.shouldAwaitTrailers = b; + return this; + } + public GrpcCallSettings build() { - return new GrpcCallSettings<>(methodDescriptor, paramsExtractor); + return new GrpcCallSettings<>(this); } } } diff --git a/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcClientCalls.java b/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcClientCalls.java index 4ce174454..bc72f6f1f 100644 --- a/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcClientCalls.java +++ b/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcClientCalls.java @@ -30,6 +30,9 @@ package com.google.api.gax.grpc; import com.google.api.client.util.Preconditions; +import com.google.api.core.AbstractApiFuture; +import com.google.api.core.ApiFuture; +import com.google.api.core.BetaApi; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.tracing.ApiTracer.Scope; import io.grpc.CallOptions; @@ -38,9 +41,13 @@ import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptors; import io.grpc.Deadline; +import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.Status; import io.grpc.stub.MetadataUtils; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; /** * {@code GrpcClientCalls} creates a new {@code ClientCall} from the given call context. @@ -48,6 +55,8 @@ *

Package-private for internal use. */ class GrpcClientCalls { + private static final Logger LOGGER = Logger.getLogger(GrpcDirectCallable.class.getName()); + private GrpcClientCalls() {}; public static ClientCall newCall( @@ -90,4 +99,101 @@ public static ClientCall newCall( return channel.newCall(descriptor, callOptions); } } + + /** + * A work-alike of {@link io.grpc.stub.ClientCalls#futureUnaryCall(ClientCall, Object)}. + * + *

The only difference is that unlike grpc-stub's implementation. This implementation doesn't + * wait for trailers to resolve a unary RPC. This can save milliseconds when the server is + * overloaded. + */ + @BetaApi + static ApiFuture eagerFutureUnaryCall( + ClientCall clientCall, RequestT request) { + // Start the call + GrpcFuture future = new GrpcFuture<>(clientCall); + clientCall.start(new EagerFutureListener<>(future), new Metadata()); + + // Send the request + try { + clientCall.sendMessage(request); + clientCall.halfClose(); + // Request an extra message to detect misconfigured servers + clientCall.request(2); + } catch (Throwable sendError) { + // Cancel if anything goes wrong + try { + clientCall.cancel(null, sendError); + } catch (Throwable cancelError) { + LOGGER.log(Level.SEVERE, "Error encountered while closing it", sendError); + } + + throw sendError; + } + + return future; + } + + /** Thin wrapper around an ApiFuture that will cancel the underlying ClientCall. */ + private static class GrpcFuture extends AbstractApiFuture { + private final ClientCall call; + + private GrpcFuture(ClientCall call) { + this.call = call; + } + + @Override + protected void interruptTask() { + call.cancel("GrpcFuture was cancelled", null); + } + + @Override + public boolean set(T value) { + return super.set(value); + } + + @Override + public boolean setException(Throwable throwable) { + return super.setException(throwable); + } + } + + /** + * A bridge between gRPC's ClientCall.Listener to an ApiFuture. + * + *

The Listener will eagerly resolve the future when the first message is received and will not + * wait for the trailers. This should cut down on the latency at the expense of safety. If the + * server is misconfigured and sends a second response for a unary call, the error will be logged, + * but the future will still be successful. + */ + private static class EagerFutureListener extends ClientCall.Listener { + private final GrpcFuture future; + + private EagerFutureListener(GrpcFuture future) { + this.future = future; + } + + @Override + public void onMessage(T message) { + if (!future.set(message)) { + throw Status.INTERNAL + .withDescription("More than one value received for unary call") + .asRuntimeException(); + } + } + + @Override + public void onClose(Status status, Metadata trailers) { + if (!future.isDone()) { + future.setException( + Status.INTERNAL + .withDescription("No value received for unary call") + .asException(trailers)); + } + if (!status.isOk()) { + LOGGER.log( + Level.WARNING, "Received error for unary call after receiving a successful response"); + } + } + } } diff --git a/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcDirectCallable.java b/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcDirectCallable.java index 32c4c277d..5b6a5f1ba 100644 --- a/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcDirectCallable.java +++ b/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcDirectCallable.java @@ -34,6 +34,7 @@ import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.UnaryCallable; import com.google.common.base.Preconditions; +import io.grpc.ClientCall; import io.grpc.MethodDescriptor; import io.grpc.stub.ClientCalls; @@ -44,9 +45,11 @@ */ class GrpcDirectCallable extends UnaryCallable { private final MethodDescriptor descriptor; + private final boolean awaitTrailers; - GrpcDirectCallable(MethodDescriptor descriptor) { + GrpcDirectCallable(MethodDescriptor descriptor, boolean awaitTrailers) { this.descriptor = Preconditions.checkNotNull(descriptor); + this.awaitTrailers = awaitTrailers; } @Override @@ -54,8 +57,13 @@ public ApiFuture futureCall(RequestT request, ApiCallContext inputCon Preconditions.checkNotNull(request); Preconditions.checkNotNull(inputContext); - return new ListenableFutureToApiFuture<>( - ClientCalls.futureUnaryCall(GrpcClientCalls.newCall(descriptor, inputContext), request)); + ClientCall clientCall = GrpcClientCalls.newCall(descriptor, inputContext); + + if (awaitTrailers) { + return new ListenableFutureToApiFuture<>(ClientCalls.futureUnaryCall(clientCall, request)); + } else { + return GrpcClientCalls.eagerFutureUnaryCall(clientCall, request); + } } @Override diff --git a/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcRawCallableFactory.java b/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcRawCallableFactory.java index 248760cde..30f7be638 100644 --- a/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcRawCallableFactory.java +++ b/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcRawCallableFactory.java @@ -52,7 +52,9 @@ private GrpcRawCallableFactory() {} public static UnaryCallable createUnaryCallable( GrpcCallSettings grpcCallSettings, Set retryableCodes) { UnaryCallable callable = - new GrpcDirectCallable<>(grpcCallSettings.getMethodDescriptor()); + new GrpcDirectCallable<>( + grpcCallSettings.getMethodDescriptor(), grpcCallSettings.shouldAwaitTrailers()); + if (grpcCallSettings.getParamsExtractor() != null) { callable = new GrpcUnaryRequestParamCallable<>(callable, grpcCallSettings.getParamsExtractor());