Skip to content

Commit

Permalink
fix: clean up lint issues (#39)
Browse files Browse the repository at this point in the history
Co-authored-by: dpcollins-google <40498610+dpcollins-google@users.noreply.github.com>
  • Loading branch information
hannahrogers-google and dpcollins-google committed May 15, 2020
1 parent 071029c commit e51093d
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 48 deletions.
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.pubsublite;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.collect.ImmutableList;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.auth.MoreCallCredentials;
Expand All @@ -32,7 +33,8 @@ public static <StubT extends AbstractStub<StubT>> StubT defaultStub(
.withCallCredentials(
MoreCallCredentials.from(
GoogleCredentials.getApplicationDefault()
.createScoped("https://www.googleapis.com/auth/cloud-platform")));
.createScoped(
ImmutableList.of("https://www.googleapis.com/auth/cloud-platform"))));
}

private Stubs() {}
Expand Down
Expand Up @@ -30,8 +30,10 @@
import com.google.cloud.pubsublite.internal.wire.CommitterBuilder;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
import com.google.cloud.pubsublite.proto.CursorServiceGrpc;
import com.google.cloud.pubsublite.proto.SubscriberServiceGrpc;
import com.google.common.collect.ImmutableList;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.StatusException;
import java.util.ArrayList;
Expand All @@ -49,7 +51,7 @@ public abstract class SubscriberSettings {

abstract SubscriptionPath subscriptionPath();

abstract List<Partition> partitions();
abstract ImmutableList<Partition> partitions();

abstract FlowControlSettings perPartitionFlowControlSettings();

Expand Down Expand Up @@ -102,8 +104,7 @@ public SubscriberSettings build() throws StatusException {

@SuppressWarnings("CheckReturnValue")
Subscriber instantiate() throws StatusException {
com.google.cloud.pubsublite.internal.wire.SubscriberBuilder.Builder wireSubscriberBuilder =
com.google.cloud.pubsublite.internal.wire.SubscriberBuilder.newBuilder();
SubscriberBuilder.Builder wireSubscriberBuilder = SubscriberBuilder.newBuilder();
wireSubscriberBuilder.setSubscriptionPath(subscriptionPath());
subscriberServiceStub().ifPresent(wireSubscriberBuilder::setSubscriberServiceStub);
wireSubscriberBuilder.setContext(PubsubContext.of(FRAMEWORK));
Expand Down
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth8.assertThat;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertThrows;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -71,7 +72,6 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -195,7 +195,7 @@ public void setUp() throws IOException {
@After
public void tearDown() throws Exception {
client.shutdownNow();
Preconditions.checkArgument(client.awaitTermination(10, TimeUnit.SECONDS));
Preconditions.checkArgument(client.awaitTermination(10, SECONDS));
}

@Test
Expand Down
Expand Up @@ -102,6 +102,8 @@ private static StreamingCommitCursorRequest initialRequest() {
private Optional<StreamObserver<StreamingCommitCursorResponse>> leakedResponseStream =
Optional.empty();

private ConnectedCommitter committer;

public ConnectedCommitterImplTest() {}

@Before
Expand Down Expand Up @@ -198,23 +200,21 @@ public void construct_SendsCommitResponseError() throws Exception {
leakedResponseStream = Optional.empty();
}

private ConnectedCommitter initialize() {
private void initialize() {
Preconditions.checkNotNull(serviceImpl);
doAnswer(
AnswerWith(
StreamingCommitCursorResponse.newBuilder()
.setInitial(InitialCommitCursorResponse.getDefaultInstance())))
.when(mockRequestStream)
.onNext(initialRequest());
ConnectedCommitter committer =
FACTORY.New(stub::streamingCommitCursor, mockOutputStream, initialRequest());
committer = FACTORY.New(stub::streamingCommitCursor, mockOutputStream, initialRequest());
verify(mockRequestStream).onNext(initialRequest());
return committer;
}

@Test
public void responseAfterClose_Dropped() throws Exception {
ConnectedCommitter committer = initialize();
initialize();
committer.close();
verify(mockRequestStream).onCompleted();
committer.commit(Offset.of(10));
Expand All @@ -223,8 +223,7 @@ public void responseAfterClose_Dropped() throws Exception {

@Test
public void duplicateInitial_Abort() {
// committer is never used in this test, but it is marked to not discard.
ConnectedCommitter committer = initialize();
initialize();
StreamingCommitCursorResponse.Builder builder = StreamingCommitCursorResponse.newBuilder();
builder.getInitialBuilder();
leakedResponseStream.get().onNext(builder.build());
Expand All @@ -234,8 +233,7 @@ public void duplicateInitial_Abort() {

@Test
public void commitRequestProxied() {
// committer is never used in this test, but it is marked to not discard.
ConnectedCommitter committer = initialize();
initialize();
StreamingCommitCursorRequest.Builder builder = StreamingCommitCursorRequest.newBuilder();
builder.getCommitBuilder().setCursor(Cursor.newBuilder().setOffset(154));
committer.commit(Offset.of(154));
Expand All @@ -244,8 +242,7 @@ public void commitRequestProxied() {

@Test
public void commitResponseProxied() {
// committer is never used in this test, but it is marked to not discard.
ConnectedCommitter committer = initialize();
initialize();
leakedResponseStream
.get()
.onNext(
Expand Down
Expand Up @@ -107,6 +107,8 @@ private static SubscribeRequest initialRequest() {

private Optional<StreamObserver<SubscribeResponse>> leakedResponseStream = Optional.empty();

private ConnectedSubscriberImpl subscriber;

public ConnectedSubscriberImplTest() {}

@Before
Expand Down Expand Up @@ -216,7 +218,7 @@ public void construct_SendsSeekResponseError() {
leakedResponseStream = Optional.empty();
}

private ConnectedSubscriberImpl initialize() {
private void initialize() {
Preconditions.checkNotNull(serviceImpl);
doAnswer(
AnswerWith(
Expand All @@ -226,12 +228,12 @@ private ConnectedSubscriberImpl initialize() {
.setCursor(Cursor.newBuilder().setOffset(INITIAL_OFFSET.value())))))
.when(mockRequestStream)
.onNext(initialRequest());
return FACTORY.New(stub::subscribe, mockOutputStream, initialRequest());
subscriber = FACTORY.New(stub::subscribe, mockOutputStream, initialRequest());
}

@Test
public void responseAfterClose_Dropped() {
ConnectedSubscriberImpl subscriber = initialize();
initialize();
subscriber.close();
verify(mockRequestStream).onCompleted();
subscriber.seek(SeekRequest.newBuilder().setNamedTarget(SeekRequest.NamedTarget.HEAD).build());
Expand All @@ -240,8 +242,7 @@ public void responseAfterClose_Dropped() {

@Test
public void duplicateInitial_Abort() {
// subscriber is never used in this test, but it is marked to not discard.
ConnectedSubscriber subscriber = initialize();
initialize();
SubscribeResponse.Builder builder = SubscribeResponse.newBuilder();
builder.getInitial();
leakedResponseStream.get().onNext(builder.build());
Expand All @@ -251,8 +252,7 @@ public void duplicateInitial_Abort() {

@Test
public void emptyMessagesResponse_Abort() {
// subscriber is never used in this test, but it is marked to not discard.
ConnectedSubscriber subscriber = initialize();
initialize();
SubscribeResponse.Builder builder = SubscribeResponse.newBuilder();
builder.getMessages();
leakedResponseStream.get().onNext(builder.build());
Expand All @@ -269,8 +269,7 @@ private SequencedMessage messageWithOffset(Offset offset) {

@Test
public void outOfOrderMessagesResponse_Abort() {
// subscriber is never used in this test, but it is marked to not discard.
ConnectedSubscriber subscriber = initialize();
initialize();
SubscribeResponse.Builder builder = SubscribeResponse.newBuilder();
builder.getMessagesBuilder().addMessages(messageWithOffset(Offset.of(10)));
builder.getMessagesBuilder().addMessages(messageWithOffset(Offset.of(10)));
Expand All @@ -281,7 +280,7 @@ public void outOfOrderMessagesResponse_Abort() {

@Test
public void seekToOffsetRequest() {
ConnectedSubscriber subscriber = initialize();
initialize();
SeekRequest request =
SeekRequest.newBuilder().setCursor(Cursor.newBuilder().setOffset(10)).build();
subscriber.seek(request);
Expand All @@ -290,7 +289,7 @@ public void seekToOffsetRequest() {

@Test
public void seekToHeadRequest() {
ConnectedSubscriber subscriber = initialize();
initialize();
SeekRequest request =
SeekRequest.newBuilder().setNamedTarget(SeekRequest.NamedTarget.HEAD).build();
subscriber.seek(request);
Expand All @@ -299,7 +298,7 @@ public void seekToHeadRequest() {

@Test
public void seekToCommitRequest() {
ConnectedSubscriber subscriber = initialize();
initialize();
SeekRequest request =
SeekRequest.newBuilder().setNamedTarget(SeekRequest.NamedTarget.COMMITTED_CURSOR).build();
subscriber.seek(request);
Expand All @@ -312,7 +311,7 @@ SeekRequest validSeekRequest() {

@Test
public void seekRequestWhileSeekInFlight() {
ConnectedSubscriber subscriber = initialize();
initialize();
subscriber.seek(validSeekRequest());
verify(mockRequestStream)
.onNext(SubscribeRequest.newBuilder().setSeek(validSeekRequest()).build());
Expand All @@ -324,7 +323,7 @@ public void seekRequestWhileSeekInFlight() {

@Test
public void seekRequestResponseRequest() {
ConnectedSubscriber subscriber = initialize();
initialize();
SubscribeRequest request = SubscribeRequest.newBuilder().setSeek(validSeekRequest()).build();
doAnswer(
AnswerWith(
Expand All @@ -350,8 +349,7 @@ public void seekRequestResponseRequest() {

@Test
public void seekResponseWithoutRequest_Aborts() {
// subscriber is never used in this test, but it is marked to not discard.
ConnectedSubscriber subscriber = initialize();
initialize();
leakedResponseStream
.get()
.onNext(
Expand Down
Expand Up @@ -168,10 +168,9 @@ public void messagesUnordered_IsError() {
leakedResponseObserver.onNext(
Response.ofMessages(
ImmutableList.of(
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(1), 10),
SequencedMessage.of(
Message.builder().build(), Timestamps.fromNanos(0), Offset.of(1), 10),
SequencedMessage.of(
Message.builder().build(), Timestamps.fromNanos(0), Offset.of(0), 10))));
Message.builder().build(), Timestamps.EPOCH, Offset.of(0), 10))));
assertThrows(IllegalStateException.class, subscriber::awaitTerminated);
verify(permanentErrorHandler)
.failed(any(), argThat(new StatusExceptionMatcher(Code.INVALID_ARGUMENT)));
Expand All @@ -182,8 +181,7 @@ public void messageBatchesOutOfOrder_IsError() {
subscriber.allowFlow(bigFlowControlRequest());
ImmutableList<SequencedMessage> messages =
ImmutableList.of(
SequencedMessage.of(
Message.builder().build(), Timestamps.fromNanos(0), Offset.of(0), 0));
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(0), 0));
leakedResponseObserver.onNext(Response.ofMessages(messages));
leakedResponseObserver.onNext(Response.ofMessages(messages));
assertThrows(IllegalStateException.class, subscriber::awaitTerminated);
Expand All @@ -196,10 +194,8 @@ public void messagesOrdered_Ok() {
subscriber.allowFlow(bigFlowControlRequest());
ImmutableList<SequencedMessage> messages =
ImmutableList.of(
SequencedMessage.of(
Message.builder().build(), Timestamps.fromNanos(0), Offset.of(0), 10),
SequencedMessage.of(
Message.builder().build(), Timestamps.fromNanos(0), Offset.of(1), 10));
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(0), 10),
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(1), 10));
leakedResponseObserver.onNext(Response.ofMessages(messages));

verify(mockMessageConsumer).accept(messages);
Expand All @@ -214,14 +210,11 @@ public void messageResponseSubtracts() {
verify(mockConnectedSubscriber).allowFlow(request);
ImmutableList<SequencedMessage> messages1 =
ImmutableList.of(
SequencedMessage.of(
Message.builder().build(), Timestamps.fromNanos(0), Offset.of(1), 98),
SequencedMessage.of(
Message.builder().build(), Timestamps.fromNanos(0), Offset.of(2), 1));
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(1), 98),
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(2), 1));
ImmutableList<SequencedMessage> messages2 =
ImmutableList.of(
SequencedMessage.of(
Message.builder().build(), Timestamps.fromNanos(0), Offset.of(3), 2));
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(3), 2));
leakedResponseObserver.onNext(Response.ofMessages(messages1));
verify(mockMessageConsumer).accept(messages1);
verify(permanentErrorHandler, times(0)).failed(any(), any());
Expand Down

0 comments on commit e51093d

Please sign in to comment.