Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
Add response metadata handling support (#490)
Browse files Browse the repository at this point in the history
Add support for handling response metadata headers and trailers
  • Loading branch information
michaelbausor committed Mar 21, 2018
1 parent b06fc12 commit 4b3f21b
Show file tree
Hide file tree
Showing 10 changed files with 610 additions and 1 deletion.
Expand Up @@ -29,6 +29,7 @@
*/
package com.google.api.gax.grpc;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.grpc.CallOptions;
import io.grpc.Metadata;
Expand All @@ -44,6 +45,8 @@ class CallOptionsUtil {
// this is the header name, it is transferred over the wire
static Metadata.Key<String> REQUEST_PARAMS_HEADER_KEY =
Metadata.Key.of("x-goog-request-params", Metadata.ASCII_STRING_MARSHALLER);
private static final CallOptions.Key<ResponseMetadataHandler> METADATA_HANDLER_CALL_OPTION_KEY =
CallOptions.Key.of("gax_metadata_handler", null);

private CallOptionsUtil() {}

Expand All @@ -69,4 +72,15 @@ static CallOptions putRequestParamsDynamicHeaderOption(
static Map<Key<String>, String> getDynamicHeadersOption(CallOptions callOptions) {
return callOptions.getOption(DYNAMIC_HEADERS_CALL_OPTION_KEY);
}

static CallOptions putMetadataHandlerOption(
CallOptions callOptions, ResponseMetadataHandler handler) {
Preconditions.checkNotNull(callOptions);
Preconditions.checkNotNull(handler);
return callOptions.withOption(METADATA_HANDLER_CALL_OPTION_KEY, handler);
}

public static ResponseMetadataHandler getMetadataHandlerOption(CallOptions callOptions) {
return callOptions.getOption(METADATA_HANDLER_CALL_OPTION_KEY);
}
}
Expand Up @@ -221,7 +221,10 @@ public ApiCallContext merge(ApiCallContext inputCallContext) {
}

CallOptions newCallOptions =
this.callOptions.withCallCredentials(newCallCredentials).withDeadline(newDeadline);
grpcCallContext
.callOptions
.withCallCredentials(newCallCredentials)
.withDeadline(newDeadline);

return new GrpcCallContext(
newChannel, newCallOptions, newStreamWaitTimeout, newStreamIdleTimeout, newChannelAffinity);
Expand Down
@@ -0,0 +1,84 @@
/*
* Copyright 2018 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.grpc;

import com.google.api.core.InternalApi;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientCall.Listener;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;

/**
* An interceptor to handle receiving the response headers.
*
* <p>Package-private for internal usage.
*/
@InternalApi
class GrpcMetadataHandlerInterceptor implements ClientInterceptor {

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, final CallOptions callOptions, Channel next) {
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);

final ResponseMetadataHandler metadataHandler =
CallOptionsUtil.getMetadataHandlerOption(callOptions);

if (metadataHandler == null) {
return call;
}
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
Listener<RespT> forwardingResponseListener =
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata headers) {
super.onHeaders(headers);
metadataHandler.onHeaders(headers);
}

@Override
public void onClose(Status status, Metadata trailers) {
super.onClose(status, trailers);
metadataHandler.onTrailers(trailers);
}
};
super.start(forwardingResponseListener, headers);
}
};
}
}
@@ -0,0 +1,115 @@
/*
* Copyright 2018 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.grpc;

import com.google.api.core.BetaApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.common.base.Preconditions;
import io.grpc.Metadata;

/**
* GrpcResponseMetadata provides a mechanism to access the headers and trailers returned by a gRPC
* client method.
*
* <p>NOTE: the GrpcResponseMetadata class is not thread-safe and should NOT be re-used for multiple
* calls. A new instance of GrpcResponseMetadata should be constructed for each call that requires
* metadata to be accessed.
*
* <p>Example usage:
*
* <pre>
* <code>
* GrpcResponseMetadata grpcResponseMetadata = new GrpcResponseMetadata();
* Foo foo = client.getFooCallable().call(getFooRequest, grpcResponseMetadata.createContextWithHandlers());
* Metadata headers = grpcResponseMetadata.getMetadata();
* Metadata trailers = grpcResponseMetadata.getTrailingMetadata();
* </code>
* </pre>
*/
@BetaApi("The surface for response metadata is not stable yet and may change in the future.")
public class GrpcResponseMetadata implements ResponseMetadataHandler {

private volatile Metadata responseMetadata;
private volatile Metadata trailingMetadata;

/**
* Constructs a new call context from an existing ApiCallContext, and sets the CallOptions to add
* handlers to retrieve the headers and trailers, and make them available via the getMetadata and
* getTrailingMetadata methods.
*/
public GrpcCallContext addHandlers(ApiCallContext apiCallContext) {
if (Preconditions.checkNotNull(apiCallContext) instanceof GrpcCallContext) {
return addHandlers((GrpcCallContext) apiCallContext);
}
throw new IllegalArgumentException(
"context must be an instance of GrpcCallContext, but found "
+ apiCallContext.getClass().getName());
}

/**
* Constructs a new call context and sets the CallOptions to add handlers to retrieve the headers
* and trailers, and make them available via the getMetadata and getTrailingMetadata methods.
*/
public GrpcCallContext createContextWithHandlers() {
return addHandlers(GrpcCallContext.createDefault());
}

private GrpcCallContext addHandlers(GrpcCallContext grpcCallContext) {
return Preconditions.checkNotNull(grpcCallContext)
.withCallOptions(
CallOptionsUtil.putMetadataHandlerOption(grpcCallContext.getCallOptions(), this));
}

/**
* Returns the headers from the gRPC method as Metadata. If the call has not completed, will
* return null.
*/
public Metadata getMetadata() {
return responseMetadata;
}

/**
* Returns the trailers from the gRPC method as Metadata. If the call has not completed, will
* return null.
*/
public Metadata getTrailingMetadata() {
return trailingMetadata;
}

@Override
public void onHeaders(Metadata metadata) {
responseMetadata = metadata;
}

@Override
public void onTrailers(Metadata metadata) {
trailingMetadata = metadata;
}
}
Expand Up @@ -156,6 +156,8 @@ private ManagedChannel createSingleChannel() throws IOException {
ScheduledExecutorService executor = executorProvider.getExecutor();
GrpcHeaderInterceptor headerInterceptor =
new GrpcHeaderInterceptor(headerProvider.getHeaders());
GrpcMetadataHandlerInterceptor metadataHandlerInterceptor =
new GrpcMetadataHandlerInterceptor();

int colon = endpoint.indexOf(':');
if (colon < 0) {
Expand All @@ -167,6 +169,7 @@ private ManagedChannel createSingleChannel() throws IOException {
ManagedChannelBuilder builder =
ManagedChannelBuilder.forAddress(serviceAddress, port)
.intercept(headerInterceptor)
.intercept(metadataHandlerInterceptor)
.userAgent(headerInterceptor.getUserAgentHeader())
.executor(executor);
if (maxInboundMessageSize != null) {
Expand Down
@@ -0,0 +1,48 @@
/*
* Copyright 2018 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.grpc;

import com.google.api.core.BetaApi;
import io.grpc.Metadata;

/**
* An interface to handle metadata returned from an RPC. A ResponseMetadataHandler is used by the
* GrpcMetadataHandlerInterceptor class to provide custom handling of the returned headers and
* trailers.
*/
@BetaApi("The surface for response metadata is not stable yet and may change in the future.")
public interface ResponseMetadataHandler {

/** Handle the headers returned by an RPC. */
void onHeaders(Metadata metadata);

/** Handle the trailers returned by an RPC. */
void onTrailers(Metadata metadata);
}
Expand Up @@ -38,6 +38,7 @@
import com.google.auth.Credentials;
import com.google.common.collect.ImmutableMap;
import com.google.common.truth.Truth;
import io.grpc.CallOptions;
import io.grpc.ManagedChannel;
import io.grpc.Metadata.Key;
import java.util.Map;
Expand Down Expand Up @@ -194,4 +195,19 @@ public void testMergeWithStreamingIdleTimeout() {

Truth.assertThat(ctx1.merge(ctx2).getStreamIdleTimeout()).isEqualTo(timeout);
}

@Test
public void testMergeWithCustomCallOptions() {
CallOptions.Key<String> key = CallOptions.Key.of("somekey", "somedefault");
GrpcCallContext ctx1 = GrpcCallContext.createDefault();
GrpcCallContext ctx2 =
GrpcCallContext.createDefault()
.withCallOptions(CallOptions.DEFAULT.withOption(key, "somevalue"));

GrpcCallContext merged = (GrpcCallContext) ctx1.merge(ctx2);
Truth.assertThat(merged.getCallOptions().getOption(key))
.isNotEqualTo(ctx1.getCallOptions().getOption(key));
Truth.assertThat(merged.getCallOptions().getOption(key))
.isEqualTo(ctx2.getCallOptions().getOption(key));
}
}

0 comments on commit 4b3f21b

Please sign in to comment.