Skip to content

Commit

Permalink
fix: Add more coverage (#220)
Browse files Browse the repository at this point in the history
* fix: Add a test for WrappingPublisher

* fix: Restructure Stubs to have fewer untested lines in builder classes.

* fix: Add license header.
  • Loading branch information
dpcollins-google committed Aug 24, 2020
1 parent 32fce6f commit f92f828
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 91 deletions.
Expand Up @@ -21,9 +21,7 @@
import com.google.cloud.pubsublite.internal.AdminClientImpl;
import com.google.cloud.pubsublite.proto.AdminServiceGrpc;
import com.google.cloud.pubsublite.proto.AdminServiceGrpc.AdminServiceBlockingStub;
import io.grpc.Status;
import io.grpc.StatusException;
import java.io.IOException;
import java.util.Optional;

@AutoValue
Expand Down Expand Up @@ -60,16 +58,7 @@ AdminClient instantiate() throws StatusException {
if (stub().isPresent()) {
stub = stub().get();
} else {
try {
stub =
Stubs.defaultStub(
Endpoints.regionalEndpoint(region()), AdminServiceGrpc::newBlockingStub);
} catch (IOException e) {
throw Status.INTERNAL
.withCause(e)
.withDescription("Creating admin stub failed.")
.asException();
}
stub = Stubs.defaultStub(region(), AdminServiceGrpc::newBlockingStub);
}
return new AdminClientImpl(region(), stub, retrySettings());
}
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.pubsublite.internal.ChannelCache;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.common.collect.ImmutableList;
import io.grpc.CallOptions;
import io.grpc.Channel;
Expand All @@ -31,6 +32,7 @@
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import io.grpc.MethodDescriptor;
import io.grpc.StatusException;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.stub.AbstractStub;
import java.io.IOException;
Expand All @@ -44,14 +46,23 @@ public class Stubs {
private static final ChannelCache channels = new ChannelCache();

public static <StubT extends AbstractStub<StubT>> StubT defaultStub(
String target, Function<Channel, StubT> stubFactory) throws IOException {
return stubFactory
.apply(ClientInterceptors.intercept(channels.get(target), getClientInterceptors()))
.withCallCredentials(
MoreCallCredentials.from(
GoogleCredentials.getApplicationDefault()
.createScoped(
ImmutableList.of("https://www.googleapis.com/auth/cloud-platform"))));
CloudRegion target, Function<Channel, StubT> stubFactory) throws StatusException {
return defaultStub(Endpoints.regionalEndpoint(target), stubFactory);
}

public static <StubT extends AbstractStub<StubT>> StubT defaultStub(
String target, Function<Channel, StubT> stubFactory) throws StatusException {
try {
return stubFactory
.apply(ClientInterceptors.intercept(channels.get(target), getClientInterceptors()))
.withCallCredentials(
MoreCallCredentials.from(
GoogleCredentials.getApplicationDefault()
.createScoped(
ImmutableList.of("https://www.googleapis.com/auth/cloud-platform"))));
} catch (IOException e) {
throw ExtractStatus.toCanonical(e);
}
}

private static List<ClientInterceptor> getClientInterceptors() {
Expand Down
Expand Up @@ -19,13 +19,10 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.Constants;
import com.google.cloud.pubsublite.Endpoints;
import com.google.cloud.pubsublite.Stubs;
import com.google.cloud.pubsublite.proto.TopicStatsServiceGrpc;
import com.google.cloud.pubsublite.proto.TopicStatsServiceGrpc.TopicStatsServiceBlockingStub;
import io.grpc.Status;
import io.grpc.StatusException;
import java.io.IOException;
import java.util.Optional;

@AutoValue
Expand Down Expand Up @@ -63,16 +60,7 @@ TopicStatsClient instantiate() throws StatusException {
if (stub().isPresent()) {
stub = stub().get();
} else {
try {
stub =
Stubs.defaultStub(
Endpoints.regionalEndpoint(region()), TopicStatsServiceGrpc::newBlockingStub);
} catch (IOException e) {
throw Status.INTERNAL
.withCause(e)
.withDescription("Creating topic stats stub failed.")
.asException();
}
stub = Stubs.defaultStub(region(), TopicStatsServiceGrpc::newBlockingStub);
}
return new TopicStatsClientImpl(region(), stub, retrySettings());
}
Expand Down
Expand Up @@ -17,7 +17,6 @@
package com.google.cloud.pubsublite.internal.wire;

import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.Endpoints;
import com.google.cloud.pubsublite.Stubs;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.SubscriptionPaths;
Expand All @@ -26,9 +25,7 @@
import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc.PartitionAssignmentServiceStub;
import com.google.common.flogger.GoogleLogger;
import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.StatusException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.UUID;
Expand Down Expand Up @@ -69,18 +66,10 @@ public Assigner build() throws StatusException {
if (builder.assignmentStub().isPresent()) {
stub = builder.assignmentStub().get();
} else {
try {
stub =
Stubs.defaultStub(
Endpoints.regionalEndpoint(
SubscriptionPaths.getZone(builder.subscriptionPath()).region()),
PartitionAssignmentServiceGrpc::newStub);
} catch (IOException e) {
throw Status.INTERNAL
.withCause(e)
.withDescription("Creating assigner stub failed.")
.asException();
}
stub =
Stubs.defaultStub(
SubscriptionPaths.getZone(builder.subscriptionPath()).region(),
PartitionAssignmentServiceGrpc::newStub);
}

UUID uuid = UUID.randomUUID();
Expand Down
Expand Up @@ -17,17 +17,14 @@
package com.google.cloud.pubsublite.internal.wire;

import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.Endpoints;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.Stubs;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.SubscriptionPaths;
import com.google.cloud.pubsublite.proto.CursorServiceGrpc;
import com.google.cloud.pubsublite.proto.CursorServiceGrpc.CursorServiceStub;
import com.google.cloud.pubsublite.proto.InitialCommitCursorRequest;
import io.grpc.Status;
import io.grpc.StatusException;
import java.io.IOException;
import java.util.Optional;

@AutoValue
Expand Down Expand Up @@ -65,18 +62,10 @@ public Committer build() throws StatusException {
if (builder.cursorStub().isPresent()) {
cursorStub = builder.cursorStub().get();
} else {
try {
cursorStub =
Stubs.defaultStub(
Endpoints.regionalEndpoint(
SubscriptionPaths.getZone(builder.subscriptionPath()).region()),
CursorServiceGrpc::newStub);
} catch (IOException e) {
throw Status.INTERNAL
.withCause(e)
.withDescription("Creating cursor stub failed.")
.asException();
}
cursorStub =
Stubs.defaultStub(
SubscriptionPaths.getZone(builder.subscriptionPath()).region(),
CursorServiceGrpc::newStub);
}

InitialCommitCursorRequest initialCommitCursorRequest =
Expand Down
Expand Up @@ -20,21 +20,18 @@
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.Constants;
import com.google.cloud.pubsublite.Endpoints;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.Stubs;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.TopicPaths;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.proto.InitialPublishRequest;
import com.google.cloud.pubsublite.proto.PublisherServiceGrpc;
import com.google.common.base.Preconditions;
import io.grpc.Metadata;
import io.grpc.StatusException;
import io.grpc.stub.MetadataUtils;
import java.io.IOException;
import java.util.Optional;
import org.threeten.bp.Duration;

Expand Down Expand Up @@ -107,16 +104,11 @@ public abstract static class Builder {
public Publisher<Offset> build() throws StatusException {
PublisherBuilder autoBuilt = autoBuild();
PublisherServiceGrpc.PublisherServiceStub actualStub;
try {
actualStub =
autoBuilt.stub().isPresent()
? autoBuilt.stub().get()
: Stubs.defaultStub(
Endpoints.regionalEndpoint(TopicPaths.getZone(autoBuilt.topic()).region()),
PublisherServiceGrpc::newStub);
} catch (IOException e) {
throw ExtractStatus.toCanonical(e);
}
actualStub =
autoBuilt.stub().isPresent()
? autoBuilt.stub().get()
: Stubs.defaultStub(
TopicPaths.getZone(autoBuilt.topic()).region(), PublisherServiceGrpc::newStub);
Metadata metadata = autoBuilt.context().getMetadata();
metadata.merge(RoutingMetadata.of(autoBuilt.topic(), autoBuilt.partition()));
actualStub = MetadataUtils.attachHeaders(actualStub, metadata);
Expand Down
Expand Up @@ -17,7 +17,6 @@
package com.google.cloud.pubsublite.internal.wire;

import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.Endpoints;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.Stubs;
Expand All @@ -28,10 +27,8 @@
import com.google.cloud.pubsublite.proto.SubscriberServiceGrpc.SubscriberServiceStub;
import com.google.common.collect.ImmutableList;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.stub.MetadataUtils;
import java.io.IOException;
import java.util.Optional;
import java.util.function.Consumer;

Expand Down Expand Up @@ -80,18 +77,10 @@ public Subscriber build() throws StatusException {
if (builder.subscriberServiceStub().isPresent()) {
subscriberServiceStub = builder.subscriberServiceStub().get();
} else {
try {
subscriberServiceStub =
Stubs.defaultStub(
Endpoints.regionalEndpoint(
SubscriptionPaths.getZone(builder.subscriptionPath()).region()),
SubscriberServiceGrpc::newStub);
} catch (IOException e) {
throw Status.INTERNAL
.withCause(e)
.withDescription("Creating subscriber stub failed.")
.asException();
}
subscriberServiceStub =
Stubs.defaultStub(
SubscriptionPaths.getZone(builder.subscriptionPath()).region(),
SubscriberServiceGrpc::newStub);
}
Metadata metadata = builder.context().getMetadata();
metadata.merge(RoutingMetadata.of(builder.subscriptionPath(), builder.partition()));
Expand Down
@@ -0,0 +1,105 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.cloudpubsub.internal;

import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor;
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
import com.google.cloud.pubsublite.internal.FakeApiService;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.StatusExceptionMatcher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.Status.Code;
import io.grpc.StatusException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Spy;

@RunWith(JUnit4.class)
public class WrappingPublisherTest {
abstract static class FakePublisher extends FakeApiService
implements Publisher<PublishMetadata> {}

@Spy private FakePublisher underlying;

private WrappingPublisher publisher;

@Before
public void setUp() throws StatusException {
initMocks(this);
publisher =
new WrappingPublisher(
underlying, MessageTransforms.fromCpsPublishTransformer(KeyExtractor.DEFAULT));
publisher.startAsync().awaitRunning();
verify(underlying).startAsync();
}

@After
public void tearDown() {
if (publisher.isRunning()) {
publisher.stopAsync().awaitTerminated();
verify(underlying).stopAsync();
}
}

@Test
public void validPublish() throws Exception {
PubsubMessage message = PubsubMessage.newBuilder().setOrderingKey("abc").build();
Message wireMessage = Message.builder().setKey(ByteString.copyFromUtf8("abc")).build();
SettableApiFuture<PublishMetadata> metadataFuture = SettableApiFuture.create();
when(underlying.publish(wireMessage)).thenReturn(metadataFuture);
ApiFuture<String> published = publisher.publish(message);
verify(underlying).publish(wireMessage);
assertThat(published.isDone()).isFalse();
PublishMetadata metadata = PublishMetadata.of(Partition.of(3), Offset.of(88));
metadataFuture.set(metadata);
assertThat(published.isDone()).isTrue();
assertThat(published.get()).isEqualTo(metadata.encode());
}

@Test
public void badTimestampCannotBeTransformed() {
PubsubMessage message =
PubsubMessage.newBuilder()
.setOrderingKey("abc")
.putAttributes(
MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO,
"Not a valid encoded timestamp")
.build();

ApiFuture<String> published = publisher.publish(message);
verify(underlying, times(0)).publish(any());
StatusExceptionMatcher.assertFutureThrowsCode(published, Code.INVALID_ARGUMENT);
assertThat(publisher.isRunning()).isFalse();
}
}

0 comments on commit f92f828

Please sign in to comment.