Skip to content

Commit

Permalink
feat: add api client headers to outbound requests (#182)
Browse files Browse the repository at this point in the history
* fix: keep track of internal seek

* fix: check shutdown on seek

* fix: reset tokens on seek

* feat: add api client header to outbound requests

* fix format

* fix: use gccl instead of gapic
  • Loading branch information
hannahrogers-google committed Aug 6, 2020
1 parent e8e3a6f commit 0b47e06
Showing 1 changed file with 47 additions and 1 deletion.
Expand Up @@ -16,13 +16,28 @@

package com.google.cloud.pubsublite;

import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.pubsublite.internal.ChannelCache;
import com.google.common.collect.ImmutableList;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import io.grpc.MethodDescriptor;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.stub.AbstractStub;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Function;

public class Stubs {
Expand All @@ -31,13 +46,44 @@ public class Stubs {
public static <StubT extends AbstractStub<StubT>> StubT defaultStub(
String target, Function<Channel, StubT> stubFactory) throws IOException {
return stubFactory
.apply(channels.get(target))
.apply(ClientInterceptors.intercept(channels.get(target), getClientInterceptors()))
.withCallCredentials(
MoreCallCredentials.from(
GoogleCredentials.getApplicationDefault()
.createScoped(
ImmutableList.of("https://www.googleapis.com/auth/cloud-platform"))));
}

private static List<ClientInterceptor> getClientInterceptors() {
List<ClientInterceptor> clientInterceptors = new ArrayList<>();
Map<String, String> apiClientHeaders =
ApiClientHeaderProvider.newBuilder()
.setClientLibToken("gccl", GaxProperties.getLibraryVersion(Stubs.class))
.setTransportToken(
GaxGrpcProperties.getGrpcTokenName(), GaxGrpcProperties.getGrpcVersion())
.build()
.getHeaders();
clientInterceptors.add(
new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(ClientCall.Listener<RespT> responseListener, Metadata headers) {
for (Entry<String, String> apiClientHeader : apiClientHeaders.entrySet()) {
headers.put(
Key.of(apiClientHeader.getKey(), Metadata.ASCII_STRING_MARSHALLER),
apiClientHeader.getValue());
}
super.start(responseListener, headers);
}
};
}
});
return clientInterceptors;
}

private Stubs() {}
}

0 comments on commit 0b47e06

Please sign in to comment.