diff --git a/pom.xml b/pom.xml index c465b213..fcb8c446 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ com.google.cloud google-cloud-pubsublite-parent - 0.6.5 + 0.7.0 4.0.0 com.google.cloud @@ -11,18 +11,18 @@ 0.1.2-SNAPSHOT jar Pub/Sub Lite Kafka Shim - https://github.com/googleapis/java-pubsublite + https://github.com/googleapis/java-pubsublite-kafka Kafka Producer and Consumer for Google Cloud Pub/Sub Lite com.google.api.grpc proto-google-cloud-pubsublite-v1 - 0.6.1 + 0.7.0 com.google.cloud google-cloud-pubsublite - 0.6.1 + 0.7.0 org.apache.kafka diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java index 8cdf4152..625276c2 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java @@ -21,7 +21,6 @@ import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.AdminClientSettings; import com.google.cloud.pubsublite.CloudZone; -import com.google.cloud.pubsublite.PartitionLookupUtils; import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; @@ -73,7 +72,6 @@ public Consumer instantiate() throws ApiException { AdminClient.create(AdminClientSettings.newBuilder().setRegion(zone.region()).build())) { Subscription subscription = adminClient.getSubscription(subscriptionPath()).get(); TopicPath topic = TopicPath.parse(subscription.getTopic()); - long partitionCount = PartitionLookupUtils.numPartitions(topic); AssignerFactory assignerFactory = receiver -> { AssignerBuilder.Builder builder = AssignerBuilder.newBuilder(); @@ -110,14 +108,12 @@ public Consumer instantiate() throws ApiException { CursorClient cursorClient = CursorClient.create(CursorClientSettings.newBuilder().setRegion(zone.region()).build()); - + SharedBehavior shared = + new SharedBehavior( + AdminClient.create( + AdminClientSettings.newBuilder().setRegion(topic.location().region()).build())); return new PubsubLiteConsumer( - subscriptionPath(), - topic, - partitionCount, - consumerFactory, - assignerFactory, - cursorClient); + subscriptionPath(), topic, shared, consumerFactory, assignerFactory, cursorClient); } catch (Exception e) { throw ExtractStatus.toCanonical(e).underlying; } diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java b/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java index c11144c8..35bee099 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java @@ -16,16 +16,13 @@ package com.google.cloud.pubsublite.kafka; -import static com.google.cloud.pubsublite.ProjectLookupUtils.toCanonical; - import com.google.api.gax.rpc.ApiException; import com.google.auto.value.AutoValue; -import com.google.cloud.pubsublite.PartitionLookupUtils; +import com.google.cloud.pubsublite.AdminClient; +import com.google.cloud.pubsublite.AdminClientSettings; import com.google.cloud.pubsublite.TopicPath; -import com.google.cloud.pubsublite.internal.wire.PubsubContext; +import com.google.cloud.pubsublite.internal.wire.*; import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework; -import com.google.cloud.pubsublite.internal.wire.RoutingPublisherBuilder; -import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder; import org.apache.kafka.clients.producer.Producer; @AutoValue @@ -48,18 +45,23 @@ public abstract static class Builder { } public Producer instantiate() throws ApiException { - TopicPath canonicalTopic = toCanonical(topicPath()); - RoutingPublisherBuilder.Builder routingBuilder = - RoutingPublisherBuilder.newBuilder() - .setTopic(canonicalTopic) + PartitionCountWatchingPublisherSettings.Builder publisherSettings = + PartitionCountWatchingPublisherSettings.newBuilder() + .setTopic(topicPath()) .setPublisherFactory( partition -> SinglePartitionPublisherBuilder.newBuilder() .setContext(PubsubContext.of(FRAMEWORK)) - .setTopic(canonicalTopic) + .setTopic(topicPath()) .setPartition(partition) .build()); + SharedBehavior shared = + new SharedBehavior( + AdminClient.create( + AdminClientSettings.newBuilder() + .setRegion(topicPath().location().region()) + .build())); return new PubsubLiteProducer( - routingBuilder.build(), PartitionLookupUtils.numPartitions(canonicalTopic), canonicalTopic); + new PartitionCountWatchingPublisher(publisherSettings.build()), shared, topicPath()); } } diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java index de02fb4b..ab4facc4 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java @@ -70,7 +70,7 @@ class PubsubLiteConsumer implements Consumer { private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); private final SubscriptionPath subscriptionPath; private final TopicPath topicPath; - private final long partitionCount; + private final SharedBehavior shared; private final ConsumerFactory consumerFactory; private final AssignerFactory assignerFactory; private final CursorClient cursorClient; @@ -80,13 +80,13 @@ class PubsubLiteConsumer implements Consumer { PubsubLiteConsumer( SubscriptionPath subscriptionPath, TopicPath topicPath, - long partitionCount, + SharedBehavior shared, ConsumerFactory consumerFactory, AssignerFactory assignerFactory, CursorClient cursorClient) { this.subscriptionPath = subscriptionPath; this.topicPath = topicPath; - this.partitionCount = partitionCount; + this.shared = shared; this.consumerFactory = consumerFactory; this.assignerFactory = assignerFactory; this.cursorClient = cursorClient; @@ -440,7 +440,7 @@ public List partitionsFor(String s) { @Override public List partitionsFor(String topic, Duration timeout) { checkTopic(topic); - return SharedBehavior.partitionsFor(partitionCount, topicPath); + return shared.partitionsFor(topicPath, timeout); } @Override @@ -511,6 +511,11 @@ public void close(Duration timeout) { } catch (Exception e) { logger.atSevere().withCause(e).log("Error closing cursor client during Consumer shutdown."); } + try { + shared.close(); + } catch (Exception e) { + logger.atSevere().withCause(e).log("Error closing admin client during Consumer shutdown."); + } unsubscribe(); } diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java index c5937495..966c71ab 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java @@ -51,20 +51,21 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; class PubsubLiteProducer implements Producer { + private static final Duration INFINITE_DURATION = Duration.ofMillis(Long.MAX_VALUE); private static final UnsupportedVersionException NO_TRANSACTIONS_EXCEPTION = new UnsupportedVersionException( "Pub/Sub Lite is a non-transactional system and does not support producer transactions."); private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); + private final SharedBehavior shared; private final Publisher publisher; private final TopicPath topicPath; - private final long partitionCount; PubsubLiteProducer( - Publisher publisher, long partitionCount, TopicPath topicPath) { + Publisher publisher, SharedBehavior shared, TopicPath topicPath) { this.publisher = publisher; + this.shared = shared; this.topicPath = topicPath; - this.partitionCount = partitionCount; this.publisher.addListener( new Listener() { @Override @@ -175,7 +176,7 @@ public void flush() { @Override public List partitionsFor(String s) { checkTopic(s); - return SharedBehavior.partitionsFor(partitionCount, topicPath); + return shared.partitionsFor(topicPath, INFINITE_DURATION); } @Override @@ -190,6 +191,11 @@ public void close() { @Override public void close(Duration duration) { + try { + shared.close(); + } catch (Exception e) { + logger.atSevere().withCause(e).log("Error closing admin client during Producer shutdown."); + } try { publisher.stopAsync().awaitTerminated(duration.toMillis(), MILLISECONDS); } catch (TimeoutException e) { diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/SharedBehavior.java b/src/main/java/com/google/cloud/pubsublite/kafka/SharedBehavior.java index 80d83fab..d0ea3c9c 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/SharedBehavior.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/SharedBehavior.java @@ -18,17 +18,24 @@ import static com.google.cloud.pubsublite.kafka.KafkaExceptionUtils.toKafka; +import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.TopicPath; import com.google.common.collect.ImmutableList; +import java.time.Duration; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.kafka.common.PartitionInfo; /** Shared behavior for producer and consumer. */ -final class SharedBehavior { - private SharedBehavior() {} +final class SharedBehavior implements AutoCloseable { + private final AdminClient client; - static PartitionInfo toPartitionInfo(TopicPath topic, Partition partition) { + SharedBehavior(AdminClient client) { + this.client = client; + } + + private static PartitionInfo toPartitionInfo(TopicPath topic, Partition partition) { return new PartitionInfo( topic.toString(), (int) partition.value(), @@ -37,8 +44,10 @@ static PartitionInfo toPartitionInfo(TopicPath topic, Partition partition) { PubsubLiteNode.NODES); } - static List partitionsFor(long partitionCount, TopicPath topic) { + List partitionsFor(TopicPath topic, Duration timeout) { try { + long partitionCount = + client.getTopicPartitionCount(topic).get(timeout.toMillis(), TimeUnit.MILLISECONDS); ImmutableList.Builder result = ImmutableList.builder(); for (int i = 0; i < partitionCount; ++i) { result.add(toPartitionInfo(topic, Partition.of(i))); @@ -48,4 +57,9 @@ static List partitionsFor(long partitionCount, TopicPath topic) { throw toKafka(t); } } + + @Override + public void close() { + client.close(); + } } diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java index 9b084d11..b9c9bad9 100644 --- a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java +++ b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java @@ -30,6 +30,7 @@ import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; +import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.CloudZone; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; @@ -52,6 +53,7 @@ import com.google.common.collect.Multimaps; import com.google.common.reflect.ImmutableTypeToInstanceMap; import java.time.Duration; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.Consumer; @@ -61,6 +63,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -98,6 +101,7 @@ private static T example(Class klass) { @Mock ConsumerFactory consumerFactory; @Mock AssignerFactory assignerFactory; @Mock CursorClient cursorClient; + @Mock AdminClient adminClient; @Mock Assigner assigner; @Mock SingleSubscriptionConsumer underlying; @@ -111,7 +115,7 @@ public void setUp() { new PubsubLiteConsumer( example(SubscriptionPath.class), example(TopicPath.class), - 3, + new SharedBehavior(adminClient), consumerFactory, assignerFactory, cursorClient); @@ -455,4 +459,19 @@ public void seek() { example(Partition.class), SeekRequest.newBuilder().setNamedTarget(NamedTarget.HEAD).build()); } + + @Test + public void partitionsFor() { + when(adminClient.getTopicPartitionCount(example(TopicPath.class))) + .thenReturn(ApiFutures.immediateFuture(2L)); + List info = consumer.partitionsFor(example(TopicPath.class).toString()); + assertThat(info.size()).isEqualTo(2L); + } + + @Test + public void close() { + consumer.close(); + verify(adminClient).close(); + verify(cursorClient).close(); + } } diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java index 64784d5c..4c55aab2 100644 --- a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java +++ b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java @@ -23,8 +23,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.StatusCode.Code; +import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.Message; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; @@ -35,18 +37,21 @@ import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.internal.testing.FakeApiService; import com.google.common.collect.ImmutableMap; +import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.mockito.Spy; @@ -64,13 +69,16 @@ abstract static class FakePublisher extends FakeApiService example(TopicPath.class).toString(), (int) example(Partition.class).value()); @Spy FakePublisher underlying; + @Mock AdminClient adminClient; Producer producer; @Before public void setUp() { MockitoAnnotations.initMocks(this); - producer = new PubsubLiteProducer(underlying, 3, example(TopicPath.class)); + producer = + new PubsubLiteProducer( + underlying, new SharedBehavior(adminClient), example(TopicPath.class)); verify(underlying).startAsync(); verify(underlying).awaitRunning(); } @@ -207,7 +215,16 @@ public void flush() throws Exception { @Test public void close() throws Exception { producer.close(); + verify(adminClient).close(); verify(underlying).stopAsync(); verify(underlying).awaitTerminated(Long.MAX_VALUE, TimeUnit.MILLISECONDS); } + + @Test + public void partitionsFor() { + when(adminClient.getTopicPartitionCount(example(TopicPath.class))) + .thenReturn(ApiFutures.immediateFuture(2L)); + List info = producer.partitionsFor(example(TopicPath.class).toString()); + assertThat(info.size()).isEqualTo(2L); + } } diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/SharedBehaviorTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/SharedBehaviorTest.java index abcb416e..9ef468a0 100644 --- a/src/test/java/com/google/cloud/pubsublite/kafka/SharedBehaviorTest.java +++ b/src/test/java/com/google/cloud/pubsublite/kafka/SharedBehaviorTest.java @@ -18,19 +18,43 @@ import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.example; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; +import com.google.api.core.ApiFutures; +import com.google.api.gax.rpc.StatusCode; +import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.internal.CheckedApiException; +import java.time.Duration; import java.util.List; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.BrokerNotAvailableException; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mock; @RunWith(JUnit4.class) public class SharedBehaviorTest { + @Mock AdminClient adminClient; + SharedBehavior shared; + + @Before + public void setUp() { + initMocks(this); + shared = new SharedBehavior(adminClient); + } + @Test public void partitionsForSuccess() { - List result = SharedBehavior.partitionsFor(2, example(TopicPath.class)); + when(adminClient.getTopicPartitionCount(example(TopicPath.class))) + .thenReturn(ApiFutures.immediateFuture(2L)); + List result = + shared.partitionsFor(example(TopicPath.class), Duration.ofMillis(10)); assertThat(result.size()).isEqualTo(2); assertThat(result.get(0).topic()).isEqualTo(example(TopicPath.class).toString()); assertThat(result.get(0).partition()).isEqualTo(0); @@ -39,4 +63,21 @@ public void partitionsForSuccess() { assertThat(result.get(1).partition()).isEqualTo(1); assertThat(result.get(1).leader()).isEqualTo(PubsubLiteNode.NODE); } + + @Test + public void partitionsForFailure() { + when(adminClient.getTopicPartitionCount(example(TopicPath.class))) + .thenReturn( + ApiFutures.immediateFailedFuture( + new CheckedApiException(StatusCode.Code.FAILED_PRECONDITION).underlying)); + assertThrows( + BrokerNotAvailableException.class, + () -> shared.partitionsFor(example(TopicPath.class), Duration.ofMillis(10))); + } + + @Test + public void close() { + shared.close(); + verify(adminClient).close(); + } }