Skip to content

Commit

Permalink
fix: Improve test coverage for internal/wire folder. (#214)
Browse files Browse the repository at this point in the history
* fix: Add a test for SinglePartitionPublisherBuilder.

* fix: Add a test for Versions.

* fix: Add a test for ChannelCache.

* fix: Improve ConnectedSubscriberImpl coverage.

* fix: Add a test for RoutingPublisher.

* fix: Add a test for PublisherBuilder.

* fix: Add test coverage for remaining builders.

* fix: Add licence headers.
  • Loading branch information
dpcollins-google committed Aug 21, 2020
1 parent fa49931 commit db2bc7a
Show file tree
Hide file tree
Showing 12 changed files with 587 additions and 18 deletions.
Expand Up @@ -16,21 +16,31 @@

package com.google.cloud.pubsublite.internal;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** A ChannelCache creates and stores default channels for use with api methods. */
public class ChannelCache {
private final Function<String, ManagedChannel> channelFactory;
private final ConcurrentHashMap<String, ManagedChannel> channels = new ConcurrentHashMap<>();

public ChannelCache() {
this(ChannelCache::newChannel);
Runtime.getRuntime().addShutdownHook(new Thread(this::onShutdown));
}

private void onShutdown() {
@VisibleForTesting
ChannelCache(Function<String, ManagedChannel> channelFactory) {
this.channelFactory = channelFactory;
}

@VisibleForTesting
void onShutdown() {
channels.forEachValue(
channels.size(),
channel -> {
Expand All @@ -43,10 +53,10 @@ private void onShutdown() {
}

public Channel get(String target) {
return channels.computeIfAbsent(target, this::newChannel);
return channels.computeIfAbsent(target, channelFactory);
}

private ManagedChannel newChannel(String target) {
private static ManagedChannel newChannel(String target) {
return ManagedChannelBuilder.forTarget(target).build();
}
}
Expand Up @@ -39,8 +39,13 @@ public abstract class SinglePartitionPublisherBuilder {
// Rarely set parameters.
abstract PubsubContext context();

// For testing.
abstract PublisherBuilder.Builder underlyingBuilder();

public static Builder newBuilder() {
return new AutoValue_SinglePartitionPublisherBuilder.Builder().setContext(PubsubContext.of());
return new AutoValue_SinglePartitionPublisherBuilder.Builder()
.setContext(PubsubContext.of())
.setUnderlyingBuilder(PublisherBuilder.builder());
}

@AutoValue.Builder
Expand All @@ -59,12 +64,16 @@ public abstract static class Builder {
// Rarely set parameters.
public abstract Builder setContext(PubsubContext context);

// For testing.
abstract Builder setUnderlyingBuilder(PublisherBuilder.Builder underlyingBuilder);

abstract SinglePartitionPublisherBuilder autoBuild();

public SinglePartitionPublisher build() throws StatusException {
SinglePartitionPublisherBuilder builder = autoBuild();
PublisherBuilder.Builder publisherBuilder =
PublisherBuilder.builder()
builder
.underlyingBuilder()
.setTopic(builder.topic())
.setPartition(builder.partition())
.setContext(builder.context());
Expand Down
Expand Up @@ -17,22 +17,32 @@
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.gax.core.GaxProperties;
import com.google.common.annotations.VisibleForTesting;

/** The version number of this library. */
final class Versions {
private Versions() {}
private final String versionString;

private static String[] GetVersionSplits() {
@VisibleForTesting
Versions(String versionString) {
this.versionString = versionString;
}

private Versions() {
this(GaxProperties.getLibraryVersion(Versions.class));
}

private String[] getVersionSplits() {
try {
String versionString = GaxProperties.getLibraryVersion(Versions.class);
return versionString.split("\\.");
} catch (Exception e) {
return new String[0];
}
}

private static int GetMajorVersion() {
String[] splits = GetVersionSplits();
@VisibleForTesting
int getMajorVersion() {
String[] splits = getVersionSplits();
if (splits.length != 3) return 0;
try {
return Integer.parseInt(splits[0]);
Expand All @@ -41,8 +51,9 @@ private static int GetMajorVersion() {
}
}

private static int GetMinorVersion() {
String[] splits = GetVersionSplits();
@VisibleForTesting
int getMinorVersion() {
String[] splits = getVersionSplits();
if (splits.length != 3) return 0;
try {
return Integer.parseInt(splits[1]);
Expand All @@ -51,7 +62,8 @@ private static int GetMinorVersion() {
}
}

private static final Versions VERSIONS = new Versions();
// TODO: Do this using generation automation as opposed to maven packaging.
static final int MAJOR_VERSION = GetMajorVersion();
static final int MINOR_VERSION = GetMinorVersion();
static final int MAJOR_VERSION = VERSIONS.getMajorVersion();
static final int MINOR_VERSION = VERSIONS.getMinorVersion();
}
@@ -0,0 +1,57 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal;

import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import java.util.function.Function;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;

@RunWith(JUnit4.class)
public class ChannelCacheTest {
@Mock ManagedChannel mockChannel;
@Mock Function<String, ManagedChannel> channelFactory;

@Before
public void setUp() {
initMocks(this);
}

@Test
public void reusesChannels() {
when(channelFactory.apply(any())).thenReturn(mockChannel);
ChannelCache cache = new ChannelCache(channelFactory);
Channel chan1 = cache.get("abc");
Channel chan2 = cache.get("abc");
assertThat(chan1).isEqualTo(chan2);
verify(channelFactory, times(1)).apply("abc");
when(mockChannel.shutdownNow()).thenReturn(mockChannel);
cache.onShutdown();
verify(mockChannel, times(1)).shutdownNow();
}
}
@@ -0,0 +1,47 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal.wire;

import static org.mockito.Mockito.mock;

import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPaths;
import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc;
import io.grpc.Channel;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class AssignerBuilderTest {
@Test
public void testBuilder() throws Exception {
AssignerBuilder.newBuilder()
.setSubscriptionPath(
SubscriptionPaths.newBuilder()
.setZone(CloudZone.of(CloudRegion.of("us-central1"), 'a'))
.setProjectNumber(ProjectNumber.of(3))
.setSubscriptionName(SubscriptionName.of("abc"))
.build())
.setReceiver(mock(PartitionAssignmentReceiver.class))
.setAssignmentStub(PartitionAssignmentServiceGrpc.newStub(mock(Channel.class)))
.build();
}
}
@@ -0,0 +1,49 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal.wire;

import static org.mockito.Mockito.mock;

import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPaths;
import com.google.cloud.pubsublite.proto.CursorServiceGrpc;
import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc;
import io.grpc.Channel;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class CommitterBuilderTest {
@Test
public void testBuilder() throws Exception {
CommitterBuilder.newBuilder()
.setSubscriptionPath(
SubscriptionPaths.newBuilder()
.setZone(CloudZone.of(CloudRegion.of("us-central1"), 'a'))
.setProjectNumber(ProjectNumber.of(3))
.setSubscriptionName(SubscriptionName.of("abc"))
.build())
.setPartition(Partition.of(987))
.setCursorStub(CursorServiceGrpc.newStub(mock(Channel.class)))
.build();
}
}

0 comments on commit db2bc7a

Please sign in to comment.