diff --git a/pom.xml b/pom.xml index 38ef7bb..c0e0194 100644 --- a/pom.xml +++ b/pom.xml @@ -17,12 +17,12 @@ com.google.api.grpc proto-google-cloud-pubsublite-v1 - 0.10.0 + 0.11.0 com.google.cloud google-cloud-pubsublite - 0.8.0 + 0.11.0 org.apache.kafka diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java index 625276c..c2ace41 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java @@ -16,6 +16,8 @@ package com.google.cloud.pubsublite.kafka; +import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; + import com.google.api.gax.rpc.ApiException; import com.google.auto.value.AutoValue; import com.google.cloud.pubsublite.AdminClient; @@ -27,14 +29,22 @@ import com.google.cloud.pubsublite.internal.BufferingPullSubscriber; import com.google.cloud.pubsublite.internal.CursorClient; import com.google.cloud.pubsublite.internal.CursorClientSettings; -import com.google.cloud.pubsublite.internal.ExtractStatus; -import com.google.cloud.pubsublite.internal.wire.AssignerBuilder; import com.google.cloud.pubsublite.internal.wire.AssignerFactory; -import com.google.cloud.pubsublite.internal.wire.CommitterBuilder; +import com.google.cloud.pubsublite.internal.wire.AssignerSettings; +import com.google.cloud.pubsublite.internal.wire.CommitterSettings; import com.google.cloud.pubsublite.internal.wire.PubsubContext; import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework; +import com.google.cloud.pubsublite.internal.wire.RoutingMetadata; +import com.google.cloud.pubsublite.internal.wire.ServiceClients; import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder; +import com.google.cloud.pubsublite.internal.wire.SubscriberFactory; import com.google.cloud.pubsublite.proto.Subscription; +import com.google.cloud.pubsublite.v1.CursorServiceClient; +import com.google.cloud.pubsublite.v1.CursorServiceSettings; +import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceClient; +import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceSettings; +import com.google.cloud.pubsublite.v1.SubscriberServiceClient; +import com.google.cloud.pubsublite.v1.SubscriberServiceSettings; import org.apache.kafka.clients.consumer.Consumer; @AutoValue @@ -74,33 +84,63 @@ public Consumer instantiate() throws ApiException { TopicPath topic = TopicPath.parse(subscription.getTopic()); AssignerFactory assignerFactory = receiver -> { - AssignerBuilder.Builder builder = AssignerBuilder.newBuilder(); - builder.setReceiver(receiver); - builder.setSubscriptionPath(subscriptionPath()); - return builder.build(); + try { + return AssignerSettings.newBuilder() + .setReceiver(receiver) + .setSubscriptionPath(subscriptionPath()) + .setServiceClient( + PartitionAssignmentServiceClient.create( + ServiceClients.addDefaultSettings( + subscriptionPath().location().region(), + PartitionAssignmentServiceSettings.newBuilder()))) + .build() + .instantiate(); + } catch (Throwable t) { + throw toCanonical(t).underlying; + } }; PullSubscriberFactory pullSubscriberFactory = (partition, initialSeek) -> { - SubscriberBuilder.Builder builder = - SubscriberBuilder.newBuilder() - .setContext(PubsubContext.of(FRAMEWORK)) - .setPartition(partition) - .setSubscriptionPath(subscriptionPath()); - return new BufferingPullSubscriber( + SubscriberFactory subscriberFactory = consumer -> { - synchronized (builder) { - return builder.setMessageConsumer(consumer).build(); + try { + return SubscriberBuilder.newBuilder() + .setPartition(partition) + .setSubscriptionPath(subscriptionPath()) + .setMessageConsumer(consumer) + .setServiceClient( + SubscriberServiceClient.create( + ServiceClients.addDefaultSettings( + subscriptionPath().location().region(), + ServiceClients.addDefaultMetadata( + PubsubContext.of(FRAMEWORK), + RoutingMetadata.of(subscriptionPath(), partition), + SubscriberServiceSettings.newBuilder())))) + .build(); + } catch (Throwable t) { + throw toCanonical(t).underlying; } - }, - perPartitionFlowControlSettings(), - initialSeek); + }; + return new BufferingPullSubscriber( + subscriberFactory, perPartitionFlowControlSettings(), initialSeek); }; CommitterFactory committerFactory = - partition -> - CommitterBuilder.newBuilder() + partition -> { + try { + return CommitterSettings.newBuilder() .setSubscriptionPath(subscriptionPath()) .setPartition(partition) - .build(); + .setServiceClient( + CursorServiceClient.create( + ServiceClients.addDefaultSettings( + subscriptionPath().location().region(), + CursorServiceSettings.newBuilder()))) + .build() + .instantiate(); + } catch (Throwable t) { + throw toCanonical(t); + } + }; ConsumerFactory consumerFactory = () -> new SingleSubscriptionConsumerImpl( @@ -115,7 +155,7 @@ public Consumer instantiate() throws ApiException { return new PubsubLiteConsumer( subscriptionPath(), topic, shared, consumerFactory, assignerFactory, cursorClient); } catch (Exception e) { - throw ExtractStatus.toCanonical(e).underlying; + throw toCanonical(e).underlying; } } } diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java b/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java index 35bee09..4baf0f4 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java @@ -16,6 +16,8 @@ package com.google.cloud.pubsublite.kafka; +import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; + import com.google.api.gax.rpc.ApiException; import com.google.auto.value.AutoValue; import com.google.cloud.pubsublite.AdminClient; @@ -23,6 +25,8 @@ import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.wire.*; import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework; +import com.google.cloud.pubsublite.v1.PublisherServiceClient; +import com.google.cloud.pubsublite.v1.PublisherServiceSettings; import org.apache.kafka.clients.producer.Producer; @AutoValue @@ -45,23 +49,35 @@ public abstract static class Builder { } public Producer instantiate() throws ApiException { - PartitionCountWatchingPublisherSettings.Builder publisherSettings = + PartitionCountWatchingPublisherSettings publisherSettings = PartitionCountWatchingPublisherSettings.newBuilder() .setTopic(topicPath()) .setPublisherFactory( - partition -> - SinglePartitionPublisherBuilder.newBuilder() - .setContext(PubsubContext.of(FRAMEWORK)) + partition -> { + try { + return SinglePartitionPublisherBuilder.newBuilder() + .setServiceClient( + PublisherServiceClient.create( + ServiceClients.addDefaultSettings( + topicPath().location().region(), + ServiceClients.addDefaultMetadata( + PubsubContext.of(FRAMEWORK), + RoutingMetadata.of(topicPath(), partition), + PublisherServiceSettings.newBuilder())))) .setTopic(topicPath()) .setPartition(partition) - .build()); + .build(); + } catch (Throwable t) { + throw toCanonical(t).underlying; + } + }) + .build(); SharedBehavior shared = new SharedBehavior( AdminClient.create( AdminClientSettings.newBuilder() .setRegion(topicPath().location().region()) .build())); - return new PubsubLiteProducer( - new PartitionCountWatchingPublisher(publisherSettings.build()), shared, topicPath()); + return new PubsubLiteProducer(publisherSettings.instantiate(), shared, topicPath()); } } diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java index 966c71a..360aff6 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java @@ -25,7 +25,7 @@ import com.google.api.core.ApiService.Listener; import com.google.api.core.ApiService.State; import com.google.api.gax.rpc.ApiException; -import com.google.cloud.pubsublite.PublishMetadata; +import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.ExtractStatus; import com.google.cloud.pubsublite.internal.Publisher; @@ -58,11 +58,11 @@ class PubsubLiteProducer implements Producer { private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); private final SharedBehavior shared; - private final Publisher publisher; + private final Publisher publisher; private final TopicPath topicPath; PubsubLiteProducer( - Publisher publisher, SharedBehavior shared, TopicPath topicPath) { + Publisher publisher, SharedBehavior shared, TopicPath topicPath) { this.publisher = publisher; this.shared = shared; this.topicPath = topicPath; @@ -127,7 +127,7 @@ public ApiFuture send(ProducerRecord producerRec throw new UnsupportedOperationException( "Pub/Sub Lite producers may not specify a partition in their records."); } - ApiFuture future = + ApiFuture future = publisher.publish(RecordTransforms.toMessage(producerRecord)); return ApiFutures.transform( future, diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java index 4c55aab..85a92f8 100644 --- a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java +++ b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java @@ -28,9 +28,9 @@ import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.TopicName; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.CheckedApiException; @@ -58,7 +58,7 @@ @RunWith(JUnit4.class) public class PubsubLiteProducerTest { abstract static class FakePublisher extends FakeApiService - implements Publisher {} + implements Publisher {} private static final ProducerRecord RECORD = new ProducerRecord<>( @@ -129,11 +129,11 @@ public void badTopicThrows() { @Test public void sendSuccess() throws Exception { - SettableApiFuture response = SettableApiFuture.create(); + SettableApiFuture response = SettableApiFuture.create(); when(underlying.publish(MESSAGE)).thenReturn(response); Future future = producer.send(RECORD); verify(underlying).publish(MESSAGE); - response.set(PublishMetadata.of(example(Partition.class), example(Offset.class))); + response.set(MessageMetadata.of(example(Partition.class), example(Offset.class))); // RecordMetadata doesn't define a equals implementation. RecordMetadata metadata = future.get(); assertThat(metadata.topic()).isEqualTo(example(TopicPath.class).toString()); @@ -145,7 +145,7 @@ public void sendSuccess() throws Exception { @Test public void sendSuccessWithCallback() throws Exception { - SettableApiFuture response = SettableApiFuture.create(); + SettableApiFuture response = SettableApiFuture.create(); SettableApiFuture leaked = SettableApiFuture.create(); when(underlying.publish(MESSAGE)).thenReturn(response); Future future = @@ -159,7 +159,7 @@ public void sendSuccessWithCallback() throws Exception { } }); verify(underlying).publish(MESSAGE); - response.set(PublishMetadata.of(example(Partition.class), example(Offset.class))); + response.set(MessageMetadata.of(example(Partition.class), example(Offset.class))); // RecordMetadata doesn't define a equals implementation. RecordMetadata metadata = leaked.get(); assertThat(metadata.topic()).isEqualTo(example(TopicPath.class).toString()); @@ -177,7 +177,7 @@ public void sendSuccessWithCallback() throws Exception { @Test public void sendError() { - SettableApiFuture response = SettableApiFuture.create(); + SettableApiFuture response = SettableApiFuture.create(); when(underlying.publish(MESSAGE)).thenReturn(response); Future future = producer.send(RECORD); verify(underlying).publish(MESSAGE); @@ -187,7 +187,7 @@ public void sendError() { @Test public void sendErrorWithCallback() { - SettableApiFuture response = SettableApiFuture.create(); + SettableApiFuture response = SettableApiFuture.create(); SettableApiFuture leaked = SettableApiFuture.create(); when(underlying.publish(MESSAGE)).thenReturn(response); Future future =