Skip to content

Commit

Permalink
fix: Implement TrivialProxyService to remove duplicate ProxyService l…
Browse files Browse the repository at this point in the history
…ogic for the trivial case. (#302)

* fix: Implement TrivialProxyService to remove duplicate ProxyService logic for the trivial case.

* fix: Deflake
  • Loading branch information
dpcollins-google committed Oct 19, 2020
1 parent b18ba85 commit ed74c6f
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 76 deletions.
Expand Up @@ -23,7 +23,7 @@
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Status;
Expand All @@ -34,7 +34,7 @@
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class AckSetTrackerImpl extends ProxyService implements AckSetTracker {
public class AckSetTrackerImpl extends TrivialProxyService implements AckSetTracker {
private final CloseableMonitor monitor = new CloseableMonitor();

@GuardedBy("monitor.monitor")
Expand All @@ -47,20 +47,10 @@ public class AckSetTrackerImpl extends ProxyService implements AckSetTracker {
private final PriorityQueue<Offset> acks = new PriorityQueue<>();

public AckSetTrackerImpl(Committer committer) throws StatusException {
addServices(committer);
super(committer);
this.committer = committer;
}

// ProxyService implementation. Noop as this is a thin wrapper around committer.
@Override
protected void start() {}

@Override
protected void stop() {}

@Override
protected void handlePermanentError(StatusException error) {}

// AckSetTracker implementation.
@Override
public Runnable track(SequencedMessage message) throws StatusException {
Expand Down
Expand Up @@ -17,27 +17,18 @@
package com.google.cloud.pubsublite.cloudpubsub.internal;

import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import io.grpc.StatusException;
import java.util.List;

// A MultiPartitionSubscriber wraps multiple subscribers into a single ApiService that can be
// interacted with. If any single subscriber fails, all others are stopped.
public class MultiPartitionSubscriber extends ProxyService implements Subscriber {
public class MultiPartitionSubscriber extends TrivialProxyService implements Subscriber {
public static Subscriber of(List<Subscriber> subscribers) throws StatusException {
return new MultiPartitionSubscriber(subscribers);
}

private MultiPartitionSubscriber(List<Subscriber> subscribers) throws StatusException {
addServices(subscribers);
super(subscribers);
}

@Override
protected void start() {}

@Override
protected void stop() {}

@Override
protected void handlePermanentError(StatusException error) {}
}
Expand Up @@ -22,36 +22,26 @@
import com.google.cloud.pubsublite.MessageTransformer;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.StatusException;

// A WrappingPublisher wraps the wire protocol client with a Cloud Pub/Sub api compliant
// publisher. It encodes a PublishMetadata object in the response string.
public class WrappingPublisher extends ProxyService implements Publisher {
public class WrappingPublisher extends TrivialProxyService implements Publisher {
private final com.google.cloud.pubsublite.internal.Publisher<PublishMetadata> wirePublisher;
private final MessageTransformer<PubsubMessage, Message> transformer;

public WrappingPublisher(
com.google.cloud.pubsublite.internal.Publisher<PublishMetadata> wirePublisher,
MessageTransformer<PubsubMessage, Message> transformer)
throws StatusException {
super(wirePublisher);
this.wirePublisher = wirePublisher;
this.transformer = transformer;
addServices(wirePublisher);
}

// ProxyService implementation. SinglePartitionPublisher is a thin proxy around a wire publisher.
@Override
protected void start() {}

@Override
protected void stop() {}

@Override
protected void handlePermanentError(StatusException error) {}

// Publisher implementation.
@Override
public ApiFuture<String> publish(PubsubMessage message) {
Expand Down
@@ -0,0 +1,41 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal;

import com.google.api.core.ApiService;
import io.grpc.StatusException;
import java.util.Arrays;
import java.util.Collection;

public class TrivialProxyService extends ProxyService {
public TrivialProxyService(ApiService... services) throws StatusException {
this(Arrays.asList(services));
}

public <T extends ApiService> TrivialProxyService(Collection<T> services) throws StatusException {
addServices(services);
}

@Override
protected final void start() {}

@Override
protected final void stop() {}

@Override
protected final void handlePermanentError(StatusException error) {}
}
Expand Up @@ -18,7 +18,7 @@

import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest;
import com.google.cloud.pubsublite.proto.PartitionAssignment;
import com.google.cloud.pubsublite.proto.PartitionAssignmentRequest;
Expand All @@ -29,7 +29,7 @@
import java.util.HashSet;
import java.util.Set;

public class AssignerImpl extends ProxyService
public class AssignerImpl extends TrivialProxyService
implements Assigner, RetryingConnectionObserver<PartitionAssignment> {
@GuardedBy("monitor.monitor")
private final RetryingConnection<ConnectedAssigner> connection;
Expand All @@ -56,15 +56,6 @@ public class AssignerImpl extends ProxyService
addServices(this.connection);
}

@Override
protected void start() {}

@Override
protected void stop() {}

@Override
protected void handlePermanentError(StatusException error) {}

@Override
public void triggerReinitialize() {
try (CloseableMonitor.Hold h = monitor.enter()) {
Expand Down
Expand Up @@ -23,36 +23,25 @@
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.RoutingPolicy;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import io.grpc.StatusException;
import java.io.IOException;
import java.util.Map;

public class RoutingPublisher extends ProxyService implements Publisher<PublishMetadata> {
public class RoutingPublisher extends TrivialProxyService implements Publisher<PublishMetadata> {
private final Map<Partition, Publisher<PublishMetadata>> partitionPublishers;
private final RoutingPolicy policy;

RoutingPublisher(
Map<Partition, Publisher<PublishMetadata>> partitionPublishers, RoutingPolicy policy)
throws StatusException {
super(partitionPublishers.values());
this.partitionPublishers = partitionPublishers;
this.policy = policy;
addServices(partitionPublishers.values());
}

// ProxyService implementation. This is a thin proxy around all of the partition publishers so
// methods are noops.
@Override
protected void start() {}

@Override
protected void stop() {}

@Override
protected void handlePermanentError(StatusException error) {}

// Publisher implementation.
@Override
public ApiFuture<PublishMetadata> publish(Message message) {
Expand Down
Expand Up @@ -22,33 +22,24 @@
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.StatusException;
import java.io.IOException;

public class SinglePartitionPublisher extends ProxyService implements Publisher<PublishMetadata> {
public class SinglePartitionPublisher extends TrivialProxyService
implements Publisher<PublishMetadata> {
private final Publisher<Offset> publisher;
private final Partition partition;

SinglePartitionPublisher(Publisher<Offset> publisher, Partition partition)
throws StatusException {
super(publisher);
this.publisher = publisher;
this.partition = partition;
addServices(publisher);
}

// ProxyService implementation
@Override
protected void start() {}

@Override
protected void stop() {}

@Override
protected void handlePermanentError(StatusException error) {}

// Publisher implementation.
@Override
public ApiFuture<PublishMetadata> publish(Message message) {
Expand Down
Expand Up @@ -203,6 +203,7 @@ public void messagesEmpty_IsError() throws Exception {

@Test
public void messagesUnordered_IsError() throws Exception {
Future<Void> failed = whenFailed(permanentErrorHandler);
subscriber.allowFlow(bigFlowControlRequest());
leakedResponseObserver.onNext(
Response.ofMessages(
Expand All @@ -211,6 +212,7 @@ public void messagesUnordered_IsError() throws Exception {
SequencedMessage.of(
Message.builder().build(), Timestamps.EPOCH, Offset.of(0), 10))));
assertThrows(IllegalStateException.class, subscriber::awaitTerminated);
failed.get();
verify(permanentErrorHandler)
.failed(any(), argThat(new StatusExceptionMatcher(Code.INVALID_ARGUMENT)));
}
Expand Down Expand Up @@ -244,6 +246,7 @@ public void messagesOrdered_Ok() throws Exception {

@Test
public void messageResponseSubtracts() throws Exception {
Future<Void> failed = whenFailed(permanentErrorHandler);
FlowControlRequest request =
FlowControlRequest.newBuilder().setAllowedBytes(100).setAllowedMessages(100).build();
subscriber.allowFlow(request);
Expand All @@ -259,6 +262,7 @@ public void messageResponseSubtracts() throws Exception {
verify(mockMessageConsumer).accept(messages1);
verify(permanentErrorHandler, times(0)).failed(any(), any());
leakedResponseObserver.onNext(Response.ofMessages(messages2));
failed.get();
verify(permanentErrorHandler)
.failed(any(), argThat(new StatusExceptionMatcher(Code.FAILED_PRECONDITION)));
}
Expand Down

0 comments on commit ed74c6f

Please sign in to comment.