Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deps: Bump underlying pub/sub lite version to 0.11.0 #79

Merged
merged 1 commit into from Feb 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Expand Up @@ -17,12 +17,12 @@
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-pubsublite-v1</artifactId>
<version>0.10.0</version>
<version>0.11.0</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<version>0.8.0</version>
<version>0.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
Expand Down
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.pubsublite.kafka;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
Expand All @@ -27,14 +29,22 @@
import com.google.cloud.pubsublite.internal.BufferingPullSubscriber;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.CursorClientSettings;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.wire.AssignerBuilder;
import com.google.cloud.pubsublite.internal.wire.AssignerFactory;
import com.google.cloud.pubsublite.internal.wire.CommitterBuilder;
import com.google.cloud.pubsublite.internal.wire.AssignerSettings;
import com.google.cloud.pubsublite.internal.wire.CommitterSettings;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.ServiceClients;
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.Subscription;
import com.google.cloud.pubsublite.v1.CursorServiceClient;
import com.google.cloud.pubsublite.v1.CursorServiceSettings;
import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceClient;
import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceSettings;
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
import org.apache.kafka.clients.consumer.Consumer;

@AutoValue
Expand Down Expand Up @@ -74,33 +84,63 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
TopicPath topic = TopicPath.parse(subscription.getTopic());
AssignerFactory assignerFactory =
receiver -> {
AssignerBuilder.Builder builder = AssignerBuilder.newBuilder();
builder.setReceiver(receiver);
builder.setSubscriptionPath(subscriptionPath());
return builder.build();
try {
return AssignerSettings.newBuilder()
.setReceiver(receiver)
.setSubscriptionPath(subscriptionPath())
.setServiceClient(
PartitionAssignmentServiceClient.create(
ServiceClients.addDefaultSettings(
subscriptionPath().location().region(),
PartitionAssignmentServiceSettings.newBuilder())))
.build()
.instantiate();
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
};
PullSubscriberFactory pullSubscriberFactory =
(partition, initialSeek) -> {
SubscriberBuilder.Builder builder =
SubscriberBuilder.newBuilder()
.setContext(PubsubContext.of(FRAMEWORK))
.setPartition(partition)
.setSubscriptionPath(subscriptionPath());
return new BufferingPullSubscriber(
SubscriberFactory subscriberFactory =
consumer -> {
synchronized (builder) {
return builder.setMessageConsumer(consumer).build();
try {
return SubscriberBuilder.newBuilder()
.setPartition(partition)
.setSubscriptionPath(subscriptionPath())
.setMessageConsumer(consumer)
.setServiceClient(
SubscriberServiceClient.create(
ServiceClients.addDefaultSettings(
subscriptionPath().location().region(),
ServiceClients.addDefaultMetadata(
PubsubContext.of(FRAMEWORK),
RoutingMetadata.of(subscriptionPath(), partition),
SubscriberServiceSettings.newBuilder()))))
.build();
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
},
perPartitionFlowControlSettings(),
initialSeek);
};
return new BufferingPullSubscriber(
subscriberFactory, perPartitionFlowControlSettings(), initialSeek);
};
CommitterFactory committerFactory =
partition ->
CommitterBuilder.newBuilder()
partition -> {
try {
return CommitterSettings.newBuilder()
.setSubscriptionPath(subscriptionPath())
.setPartition(partition)
.build();
.setServiceClient(
CursorServiceClient.create(
ServiceClients.addDefaultSettings(
subscriptionPath().location().region(),
CursorServiceSettings.newBuilder())))
.build()
.instantiate();
} catch (Throwable t) {
throw toCanonical(t);
}
};
ConsumerFactory consumerFactory =
() ->
new SingleSubscriptionConsumerImpl(
Expand All @@ -115,7 +155,7 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
return new PubsubLiteConsumer(
subscriptionPath(), topic, shared, consumerFactory, assignerFactory, cursorClient);
} catch (Exception e) {
throw ExtractStatus.toCanonical(e).underlying;
throw toCanonical(e).underlying;
}
}
}
Expand Up @@ -16,13 +16,17 @@

package com.google.cloud.pubsublite.kafka;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.wire.*;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
import org.apache.kafka.clients.producer.Producer;

@AutoValue
Expand All @@ -45,23 +49,35 @@ public abstract static class Builder {
}

public Producer<byte[], byte[]> instantiate() throws ApiException {
PartitionCountWatchingPublisherSettings.Builder publisherSettings =
PartitionCountWatchingPublisherSettings publisherSettings =
PartitionCountWatchingPublisherSettings.newBuilder()
.setTopic(topicPath())
.setPublisherFactory(
partition ->
SinglePartitionPublisherBuilder.newBuilder()
.setContext(PubsubContext.of(FRAMEWORK))
partition -> {
try {
return SinglePartitionPublisherBuilder.newBuilder()
.setServiceClient(
PublisherServiceClient.create(
ServiceClients.addDefaultSettings(
topicPath().location().region(),
ServiceClients.addDefaultMetadata(
PubsubContext.of(FRAMEWORK),
RoutingMetadata.of(topicPath(), partition),
PublisherServiceSettings.newBuilder()))))
.setTopic(topicPath())
.setPartition(partition)
.build());
.build();
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
})
.build();
SharedBehavior shared =
new SharedBehavior(
AdminClient.create(
AdminClientSettings.newBuilder()
.setRegion(topicPath().location().region())
.build()));
return new PubsubLiteProducer(
new PartitionCountWatchingPublisher(publisherSettings.build()), shared, topicPath());
return new PubsubLiteProducer(publisherSettings.instantiate(), shared, topicPath());
}
}
Expand Up @@ -25,7 +25,7 @@
import com.google.api.core.ApiService.Listener;
import com.google.api.core.ApiService.State;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.Publisher;
Expand Down Expand Up @@ -58,11 +58,11 @@ class PubsubLiteProducer implements Producer<byte[], byte[]> {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

private final SharedBehavior shared;
private final Publisher<PublishMetadata> publisher;
private final Publisher<MessageMetadata> publisher;
private final TopicPath topicPath;

PubsubLiteProducer(
Publisher<PublishMetadata> publisher, SharedBehavior shared, TopicPath topicPath) {
Publisher<MessageMetadata> publisher, SharedBehavior shared, TopicPath topicPath) {
this.publisher = publisher;
this.shared = shared;
this.topicPath = topicPath;
Expand Down Expand Up @@ -127,7 +127,7 @@ public ApiFuture<RecordMetadata> send(ProducerRecord<byte[], byte[]> producerRec
throw new UnsupportedOperationException(
"Pub/Sub Lite producers may not specify a partition in their records.");
}
ApiFuture<PublishMetadata> future =
ApiFuture<MessageMetadata> future =
publisher.publish(RecordTransforms.toMessage(producerRecord));
return ApiFutures.transform(
future,
Expand Down
Expand Up @@ -28,9 +28,9 @@
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CheckedApiException;
Expand Down Expand Up @@ -58,7 +58,7 @@
@RunWith(JUnit4.class)
public class PubsubLiteProducerTest {
abstract static class FakePublisher extends FakeApiService
implements Publisher<PublishMetadata> {}
implements Publisher<MessageMetadata> {}

private static final ProducerRecord<byte[], byte[]> RECORD =
new ProducerRecord<>(
Expand Down Expand Up @@ -129,11 +129,11 @@ public void badTopicThrows() {

@Test
public void sendSuccess() throws Exception {
SettableApiFuture<PublishMetadata> response = SettableApiFuture.create();
SettableApiFuture<MessageMetadata> response = SettableApiFuture.create();
when(underlying.publish(MESSAGE)).thenReturn(response);
Future<RecordMetadata> future = producer.send(RECORD);
verify(underlying).publish(MESSAGE);
response.set(PublishMetadata.of(example(Partition.class), example(Offset.class)));
response.set(MessageMetadata.of(example(Partition.class), example(Offset.class)));
// RecordMetadata doesn't define an equals implementation.
RecordMetadata metadata = future.get();
assertThat(metadata.topic()).isEqualTo(example(TopicPath.class).toString());
Expand All @@ -145,7 +145,7 @@ public void sendSuccess() throws Exception {

@Test
public void sendSuccessWithCallback() throws Exception {
SettableApiFuture<PublishMetadata> response = SettableApiFuture.create();
SettableApiFuture<MessageMetadata> response = SettableApiFuture.create();
SettableApiFuture<RecordMetadata> leaked = SettableApiFuture.create();
when(underlying.publish(MESSAGE)).thenReturn(response);
Future<RecordMetadata> future =
Expand All @@ -159,7 +159,7 @@ public void sendSuccessWithCallback() throws Exception {
}
});
verify(underlying).publish(MESSAGE);
response.set(PublishMetadata.of(example(Partition.class), example(Offset.class)));
response.set(MessageMetadata.of(example(Partition.class), example(Offset.class)));
// RecordMetadata doesn't define an equals implementation.
RecordMetadata metadata = leaked.get();
assertThat(metadata.topic()).isEqualTo(example(TopicPath.class).toString());
Expand All @@ -177,7 +177,7 @@ public void sendSuccessWithCallback() throws Exception {

@Test
public void sendError() {
SettableApiFuture<PublishMetadata> response = SettableApiFuture.create();
SettableApiFuture<MessageMetadata> response = SettableApiFuture.create();
when(underlying.publish(MESSAGE)).thenReturn(response);
Future<RecordMetadata> future = producer.send(RECORD);
verify(underlying).publish(MESSAGE);
Expand All @@ -187,7 +187,7 @@ public void sendError() {

@Test
public void sendErrorWithCallback() {
SettableApiFuture<PublishMetadata> response = SettableApiFuture.create();
SettableApiFuture<MessageMetadata> response = SettableApiFuture.create();
SettableApiFuture<RecordMetadata> leaked = SettableApiFuture.create();
when(underlying.publish(MESSAGE)).thenReturn(response);
Future<RecordMetadata> future =
Expand Down