Skip to content

Commit

Permalink
deps: Bump underlying pub/sub lite version (#79)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpcollins-google committed Feb 26, 2021
1 parent cb584ae commit d901201
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 43 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Expand Up @@ -17,12 +17,12 @@
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-pubsublite-v1</artifactId>
<version>0.10.0</version>
<version>0.11.0</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<version>0.8.0</version>
<version>0.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
Expand Down
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.pubsublite.kafka;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
Expand All @@ -27,14 +29,22 @@
import com.google.cloud.pubsublite.internal.BufferingPullSubscriber;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.CursorClientSettings;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.wire.AssignerBuilder;
import com.google.cloud.pubsublite.internal.wire.AssignerFactory;
import com.google.cloud.pubsublite.internal.wire.CommitterBuilder;
import com.google.cloud.pubsublite.internal.wire.AssignerSettings;
import com.google.cloud.pubsublite.internal.wire.CommitterSettings;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.ServiceClients;
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.Subscription;
import com.google.cloud.pubsublite.v1.CursorServiceClient;
import com.google.cloud.pubsublite.v1.CursorServiceSettings;
import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceClient;
import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceSettings;
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
import org.apache.kafka.clients.consumer.Consumer;

@AutoValue
Expand Down Expand Up @@ -74,33 +84,63 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
TopicPath topic = TopicPath.parse(subscription.getTopic());
AssignerFactory assignerFactory =
receiver -> {
AssignerBuilder.Builder builder = AssignerBuilder.newBuilder();
builder.setReceiver(receiver);
builder.setSubscriptionPath(subscriptionPath());
return builder.build();
try {
return AssignerSettings.newBuilder()
.setReceiver(receiver)
.setSubscriptionPath(subscriptionPath())
.setServiceClient(
PartitionAssignmentServiceClient.create(
ServiceClients.addDefaultSettings(
subscriptionPath().location().region(),
PartitionAssignmentServiceSettings.newBuilder())))
.build()
.instantiate();
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
};
PullSubscriberFactory pullSubscriberFactory =
(partition, initialSeek) -> {
SubscriberBuilder.Builder builder =
SubscriberBuilder.newBuilder()
.setContext(PubsubContext.of(FRAMEWORK))
.setPartition(partition)
.setSubscriptionPath(subscriptionPath());
return new BufferingPullSubscriber(
SubscriberFactory subscriberFactory =
consumer -> {
synchronized (builder) {
return builder.setMessageConsumer(consumer).build();
try {
return SubscriberBuilder.newBuilder()
.setPartition(partition)
.setSubscriptionPath(subscriptionPath())
.setMessageConsumer(consumer)
.setServiceClient(
SubscriberServiceClient.create(
ServiceClients.addDefaultSettings(
subscriptionPath().location().region(),
ServiceClients.addDefaultMetadata(
PubsubContext.of(FRAMEWORK),
RoutingMetadata.of(subscriptionPath(), partition),
SubscriberServiceSettings.newBuilder()))))
.build();
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
},
perPartitionFlowControlSettings(),
initialSeek);
};
return new BufferingPullSubscriber(
subscriberFactory, perPartitionFlowControlSettings(), initialSeek);
};
CommitterFactory committerFactory =
partition ->
CommitterBuilder.newBuilder()
partition -> {
try {
return CommitterSettings.newBuilder()
.setSubscriptionPath(subscriptionPath())
.setPartition(partition)
.build();
.setServiceClient(
CursorServiceClient.create(
ServiceClients.addDefaultSettings(
subscriptionPath().location().region(),
CursorServiceSettings.newBuilder())))
.build()
.instantiate();
} catch (Throwable t) {
throw toCanonical(t);
}
};
ConsumerFactory consumerFactory =
() ->
new SingleSubscriptionConsumerImpl(
Expand All @@ -115,7 +155,7 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
return new PubsubLiteConsumer(
subscriptionPath(), topic, shared, consumerFactory, assignerFactory, cursorClient);
} catch (Exception e) {
throw ExtractStatus.toCanonical(e).underlying;
throw toCanonical(e).underlying;
}
}
}
Expand Up @@ -16,13 +16,17 @@

package com.google.cloud.pubsublite.kafka;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.wire.*;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
import org.apache.kafka.clients.producer.Producer;

@AutoValue
Expand All @@ -45,23 +49,35 @@ public abstract static class Builder {
}

public Producer<byte[], byte[]> instantiate() throws ApiException {
PartitionCountWatchingPublisherSettings.Builder publisherSettings =
PartitionCountWatchingPublisherSettings publisherSettings =
PartitionCountWatchingPublisherSettings.newBuilder()
.setTopic(topicPath())
.setPublisherFactory(
partition ->
SinglePartitionPublisherBuilder.newBuilder()
.setContext(PubsubContext.of(FRAMEWORK))
partition -> {
try {
return SinglePartitionPublisherBuilder.newBuilder()
.setServiceClient(
PublisherServiceClient.create(
ServiceClients.addDefaultSettings(
topicPath().location().region(),
ServiceClients.addDefaultMetadata(
PubsubContext.of(FRAMEWORK),
RoutingMetadata.of(topicPath(), partition),
PublisherServiceSettings.newBuilder()))))
.setTopic(topicPath())
.setPartition(partition)
.build());
.build();
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
})
.build();
SharedBehavior shared =
new SharedBehavior(
AdminClient.create(
AdminClientSettings.newBuilder()
.setRegion(topicPath().location().region())
.build()));
return new PubsubLiteProducer(
new PartitionCountWatchingPublisher(publisherSettings.build()), shared, topicPath());
return new PubsubLiteProducer(publisherSettings.instantiate(), shared, topicPath());
}
}
Expand Up @@ -25,7 +25,7 @@
import com.google.api.core.ApiService.Listener;
import com.google.api.core.ApiService.State;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.Publisher;
Expand Down Expand Up @@ -58,11 +58,11 @@ class PubsubLiteProducer implements Producer<byte[], byte[]> {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

private final SharedBehavior shared;
private final Publisher<PublishMetadata> publisher;
private final Publisher<MessageMetadata> publisher;
private final TopicPath topicPath;

PubsubLiteProducer(
Publisher<PublishMetadata> publisher, SharedBehavior shared, TopicPath topicPath) {
Publisher<MessageMetadata> publisher, SharedBehavior shared, TopicPath topicPath) {
this.publisher = publisher;
this.shared = shared;
this.topicPath = topicPath;
Expand Down Expand Up @@ -127,7 +127,7 @@ public ApiFuture<RecordMetadata> send(ProducerRecord<byte[], byte[]> producerRec
throw new UnsupportedOperationException(
"Pub/Sub Lite producers may not specify a partition in their records.");
}
ApiFuture<PublishMetadata> future =
ApiFuture<MessageMetadata> future =
publisher.publish(RecordTransforms.toMessage(producerRecord));
return ApiFutures.transform(
future,
Expand Down
Expand Up @@ -28,9 +28,9 @@
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CheckedApiException;
Expand Down Expand Up @@ -58,7 +58,7 @@
@RunWith(JUnit4.class)
public class PubsubLiteProducerTest {
abstract static class FakePublisher extends FakeApiService
implements Publisher<PublishMetadata> {}
implements Publisher<MessageMetadata> {}

private static final ProducerRecord<byte[], byte[]> RECORD =
new ProducerRecord<>(
Expand Down Expand Up @@ -129,11 +129,11 @@ public void badTopicThrows() {

@Test
public void sendSuccess() throws Exception {
SettableApiFuture<PublishMetadata> response = SettableApiFuture.create();
SettableApiFuture<MessageMetadata> response = SettableApiFuture.create();
when(underlying.publish(MESSAGE)).thenReturn(response);
Future<RecordMetadata> future = producer.send(RECORD);
verify(underlying).publish(MESSAGE);
response.set(PublishMetadata.of(example(Partition.class), example(Offset.class)));
response.set(MessageMetadata.of(example(Partition.class), example(Offset.class)));
// RecordMetadata doesn't define a equals implementation.
RecordMetadata metadata = future.get();
assertThat(metadata.topic()).isEqualTo(example(TopicPath.class).toString());
Expand All @@ -145,7 +145,7 @@ public void sendSuccess() throws Exception {

@Test
public void sendSuccessWithCallback() throws Exception {
SettableApiFuture<PublishMetadata> response = SettableApiFuture.create();
SettableApiFuture<MessageMetadata> response = SettableApiFuture.create();
SettableApiFuture<RecordMetadata> leaked = SettableApiFuture.create();
when(underlying.publish(MESSAGE)).thenReturn(response);
Future<RecordMetadata> future =
Expand All @@ -159,7 +159,7 @@ public void sendSuccessWithCallback() throws Exception {
}
});
verify(underlying).publish(MESSAGE);
response.set(PublishMetadata.of(example(Partition.class), example(Offset.class)));
response.set(MessageMetadata.of(example(Partition.class), example(Offset.class)));
// RecordMetadata doesn't define a equals implementation.
RecordMetadata metadata = leaked.get();
assertThat(metadata.topic()).isEqualTo(example(TopicPath.class).toString());
Expand All @@ -177,7 +177,7 @@ public void sendSuccessWithCallback() throws Exception {

@Test
public void sendError() {
SettableApiFuture<PublishMetadata> response = SettableApiFuture.create();
SettableApiFuture<MessageMetadata> response = SettableApiFuture.create();
when(underlying.publish(MESSAGE)).thenReturn(response);
Future<RecordMetadata> future = producer.send(RECORD);
verify(underlying).publish(MESSAGE);
Expand All @@ -187,7 +187,7 @@ public void sendError() {

@Test
public void sendErrorWithCallback() {
SettableApiFuture<PublishMetadata> response = SettableApiFuture.create();
SettableApiFuture<MessageMetadata> response = SettableApiFuture.create();
SettableApiFuture<RecordMetadata> leaked = SettableApiFuture.create();
when(underlying.publish(MESSAGE)).thenReturn(response);
Future<RecordMetadata> future =
Expand Down

0 comments on commit d901201

Please sign in to comment.