Skip to content

Commit

Permalink
fix: Modify UnboundedRreader to correctly initialize committers. (#74)
Browse files Browse the repository at this point in the history
* fix: Modify UnboundedRreader to correctly initialize committers.

* fix: move FakeApiService for shared usage with beam library and fix test.
  • Loading branch information
dpcollins-google committed May 21, 2020
1 parent 502484a commit 5fa68c7
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 7 deletions.
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.google.cloud.pubsublite;
package com.google.cloud.pubsublite.internal;

import com.google.api.core.AbstractApiService;

Expand Down
Expand Up @@ -27,14 +27,14 @@
import com.google.api.core.ApiService.Listener;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.FakeApiService;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageTransformer;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
import com.google.cloud.pubsublite.cloudpubsub.NackHandler;
import com.google.cloud.pubsublite.internal.FakeApiService;
import com.google.cloud.pubsublite.internal.StatusExceptionMatcher;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
Expand Down
Expand Up @@ -23,6 +23,7 @@
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.concurrent.GuardedBy;
Expand All @@ -33,11 +34,14 @@
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
Expand All @@ -52,17 +56,51 @@ class PubsubLiteUnboundedReader extends UnboundedReader<SequencedMessage>
@GuardedBy("monitor.monitor")
private final ImmutableMap<Partition, SubscriberState> subscriberMap;

private final CommitterProxy committerProxy;

@GuardedBy("monitor.monitor")
private final Queue<PartitionedSequencedMessage> messages = new ArrayDeque<>();

@GuardedBy("monitor.monitor")
private Optional<StatusException> permanentError = Optional.empty();

private static class CommitterProxy extends ProxyService {
private final Consumer<StatusException> permanentErrorSetter;

CommitterProxy(
Collection<SubscriberState> states, Consumer<StatusException> permanentErrorSetter)
throws StatusException {
this.permanentErrorSetter = permanentErrorSetter;
addServices(states.stream().map(state -> state.committer).collect(Collectors.toList()));
}

@Override
protected void start() {}

@Override
protected void stop() {}

@Override
protected void handlePermanentError(StatusException error) {
permanentErrorSetter.accept(error);
}
}

public PubsubLiteUnboundedReader(
UnboundedSource<SequencedMessage, ?> source,
ImmutableMap<Partition, SubscriberState> subscriberMap) {
ImmutableMap<Partition, SubscriberState> subscriberMap)
throws StatusException {
this.source = source;
this.subscriberMap = subscriberMap;
this.committerProxy =
new CommitterProxy(
subscriberMap.values(),
error -> {
try (CloseableMonitor.Hold h = monitor.enter()) {
permanentError = Optional.of(permanentError.orElse(error));
}
});
this.committerProxy.startAsync().awaitRunning();
}

@Override
Expand Down Expand Up @@ -188,14 +226,14 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
public void close() {
try (CloseableMonitor.Hold h = monitor.enter()) {
for (SubscriberState state : subscriberMap.values()) {
state.committer.stopAsync().awaitTerminated();
try {
state.subscriber.close();
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
}
committerProxy.stopAsync().awaitTerminated();
}

@Override
Expand Down
Expand Up @@ -30,6 +30,7 @@
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.beam.PubsubLiteUnboundedReader.SubscriberState;
import com.google.cloud.pubsublite.internal.FakeApiService;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand All @@ -48,6 +49,8 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;

@RunWith(JUnit4.class)
public class PubsubLiteUnboundedReaderTest {
Expand All @@ -57,8 +60,10 @@ public class PubsubLiteUnboundedReaderTest {
@SuppressWarnings("unchecked")
private final PullSubscriber<SequencedMessage> subscriber8 = mock(PullSubscriber.class);

private final Committer committer5 = mock(Committer.class);
private final Committer committer8 = mock(Committer.class);
abstract static class CommitterFakeService extends FakeApiService implements Committer {}

@Spy private CommitterFakeService committer5;
@Spy private CommitterFakeService committer8;

@SuppressWarnings("unchecked")
private final UnboundedSource<SequencedMessage, ?> source = mock(UnboundedSource.class);
Expand All @@ -78,6 +83,7 @@ private static Instant toInstant(Timestamp timestamp) {
}

public PubsubLiteUnboundedReaderTest() throws StatusException {
MockitoAnnotations.initMocks(this);
SubscriberState state5 = new SubscriberState();
state5.subscriber = subscriber5;
state5.committer = committer5;
Expand Down Expand Up @@ -208,6 +214,5 @@ public void checkpointMarkFinalizeCommits() throws Exception {
when(committer5.commitOffset(Offset.of(10))).thenReturn(ApiFutures.immediateFuture(null));
mark.finalizeCheckpoint();
verify(committer5).commitOffset(Offset.of(10));
verifyNoMoreInteractions(committer5, committer8);
}
}

0 comments on commit 5fa68c7

Please sign in to comment.