From 13f2138c3274c52ea19d4fcac1fb0be3576a7acc Mon Sep 17 00:00:00 2001
From: palmere-google <68394592+palmere-google@users.noreply.github.com>
Date: Wed, 16 Dec 2020 16:04:06 -0500
Subject: [PATCH] feat: Add support for increasing partitions to the kafka shim
(#37)
* feat: Add support for increasing partitions to the kafka shim
* Updated to address comments
---
pom.xml | 8 ++--
.../pubsublite/kafka/ConsumerSettings.java | 14 +++---
.../pubsublite/kafka/ProducerSettings.java | 26 +++++------
.../pubsublite/kafka/PubsubLiteConsumer.java | 13 ++++--
.../pubsublite/kafka/PubsubLiteProducer.java | 14 ++++--
.../pubsublite/kafka/SharedBehavior.java | 22 ++++++++--
.../kafka/PubsubLiteConsumerTest.java | 21 ++++++++-
.../kafka/PubsubLiteProducerTest.java | 19 +++++++-
.../pubsublite/kafka/SharedBehaviorTest.java | 43 ++++++++++++++++++-
9 files changed, 140 insertions(+), 40 deletions(-)
diff --git a/pom.xml b/pom.xml
index c465b213..fcb8c446 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
com.google.cloud
google-cloud-pubsublite-parent
- 0.6.5
+ 0.7.0
4.0.0
com.google.cloud
@@ -11,18 +11,18 @@
0.1.2-SNAPSHOT
jar
Pub/Sub Lite Kafka Shim
- https://github.com/googleapis/java-pubsublite
+ https://github.com/googleapis/java-pubsublite-kafka
Kafka Producer and Consumer for Google Cloud Pub/Sub Lite
com.google.api.grpc
proto-google-cloud-pubsublite-v1
- 0.6.1
+ 0.7.0
com.google.cloud
google-cloud-pubsublite
- 0.6.1
+ 0.7.0
org.apache.kafka
diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java
index 8cdf4152..625276c2 100644
--- a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java
+++ b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java
@@ -21,7 +21,6 @@
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudZone;
-import com.google.cloud.pubsublite.PartitionLookupUtils;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
@@ -73,7 +72,6 @@ public Consumer instantiate() throws ApiException {
AdminClient.create(AdminClientSettings.newBuilder().setRegion(zone.region()).build())) {
Subscription subscription = adminClient.getSubscription(subscriptionPath()).get();
TopicPath topic = TopicPath.parse(subscription.getTopic());
- long partitionCount = PartitionLookupUtils.numPartitions(topic);
AssignerFactory assignerFactory =
receiver -> {
AssignerBuilder.Builder builder = AssignerBuilder.newBuilder();
@@ -110,14 +108,12 @@ public Consumer instantiate() throws ApiException {
CursorClient cursorClient =
CursorClient.create(CursorClientSettings.newBuilder().setRegion(zone.region()).build());
-
+ SharedBehavior shared =
+ new SharedBehavior(
+ AdminClient.create(
+ AdminClientSettings.newBuilder().setRegion(topic.location().region()).build()));
return new PubsubLiteConsumer(
- subscriptionPath(),
- topic,
- partitionCount,
- consumerFactory,
- assignerFactory,
- cursorClient);
+ subscriptionPath(), topic, shared, consumerFactory, assignerFactory, cursorClient);
} catch (Exception e) {
throw ExtractStatus.toCanonical(e).underlying;
}
diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java b/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java
index c11144c8..35bee099 100644
--- a/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java
+++ b/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java
@@ -16,16 +16,13 @@
package com.google.cloud.pubsublite.kafka;
-import static com.google.cloud.pubsublite.ProjectLookupUtils.toCanonical;
-
import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
-import com.google.cloud.pubsublite.PartitionLookupUtils;
+import com.google.cloud.pubsublite.AdminClient;
+import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.TopicPath;
-import com.google.cloud.pubsublite.internal.wire.PubsubContext;
+import com.google.cloud.pubsublite.internal.wire.*;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
-import com.google.cloud.pubsublite.internal.wire.RoutingPublisherBuilder;
-import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
import org.apache.kafka.clients.producer.Producer;
@AutoValue
@@ -48,18 +45,23 @@ public abstract static class Builder {
}
public Producer instantiate() throws ApiException {
- TopicPath canonicalTopic = toCanonical(topicPath());
- RoutingPublisherBuilder.Builder routingBuilder =
- RoutingPublisherBuilder.newBuilder()
- .setTopic(canonicalTopic)
+ PartitionCountWatchingPublisherSettings.Builder publisherSettings =
+ PartitionCountWatchingPublisherSettings.newBuilder()
+ .setTopic(topicPath())
.setPublisherFactory(
partition ->
SinglePartitionPublisherBuilder.newBuilder()
.setContext(PubsubContext.of(FRAMEWORK))
- .setTopic(canonicalTopic)
+ .setTopic(topicPath())
.setPartition(partition)
.build());
+ SharedBehavior shared =
+ new SharedBehavior(
+ AdminClient.create(
+ AdminClientSettings.newBuilder()
+ .setRegion(topicPath().location().region())
+ .build()));
return new PubsubLiteProducer(
- routingBuilder.build(), PartitionLookupUtils.numPartitions(canonicalTopic), canonicalTopic);
+ new PartitionCountWatchingPublisher(publisherSettings.build()), shared, topicPath());
}
}
diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java
index de02fb4b..ab4facc4 100644
--- a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java
+++ b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java
@@ -70,7 +70,7 @@ class PubsubLiteConsumer implements Consumer {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
private final SubscriptionPath subscriptionPath;
private final TopicPath topicPath;
- private final long partitionCount;
+ private final SharedBehavior shared;
private final ConsumerFactory consumerFactory;
private final AssignerFactory assignerFactory;
private final CursorClient cursorClient;
@@ -80,13 +80,13 @@ class PubsubLiteConsumer implements Consumer {
PubsubLiteConsumer(
SubscriptionPath subscriptionPath,
TopicPath topicPath,
- long partitionCount,
+ SharedBehavior shared,
ConsumerFactory consumerFactory,
AssignerFactory assignerFactory,
CursorClient cursorClient) {
this.subscriptionPath = subscriptionPath;
this.topicPath = topicPath;
- this.partitionCount = partitionCount;
+ this.shared = shared;
this.consumerFactory = consumerFactory;
this.assignerFactory = assignerFactory;
this.cursorClient = cursorClient;
@@ -440,7 +440,7 @@ public List partitionsFor(String s) {
@Override
public List partitionsFor(String topic, Duration timeout) {
checkTopic(topic);
- return SharedBehavior.partitionsFor(partitionCount, topicPath);
+ return shared.partitionsFor(topicPath, timeout);
}
@Override
@@ -511,6 +511,11 @@ public void close(Duration timeout) {
} catch (Exception e) {
logger.atSevere().withCause(e).log("Error closing cursor client during Consumer shutdown.");
}
+ try {
+ shared.close();
+ } catch (Exception e) {
+ logger.atSevere().withCause(e).log("Error closing admin client during Consumer shutdown.");
+ }
unsubscribe();
}
diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java
index c5937495..966c71ab 100644
--- a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java
+++ b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java
@@ -51,20 +51,21 @@
import org.apache.kafka.common.errors.UnsupportedVersionException;
class PubsubLiteProducer implements Producer {
+ private static final Duration INFINITE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
private static final UnsupportedVersionException NO_TRANSACTIONS_EXCEPTION =
new UnsupportedVersionException(
"Pub/Sub Lite is a non-transactional system and does not support producer transactions.");
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
+ private final SharedBehavior shared;
private final Publisher publisher;
private final TopicPath topicPath;
- private final long partitionCount;
PubsubLiteProducer(
- Publisher publisher, long partitionCount, TopicPath topicPath) {
+ Publisher publisher, SharedBehavior shared, TopicPath topicPath) {
this.publisher = publisher;
+ this.shared = shared;
this.topicPath = topicPath;
- this.partitionCount = partitionCount;
this.publisher.addListener(
new Listener() {
@Override
@@ -175,7 +176,7 @@ public void flush() {
@Override
public List partitionsFor(String s) {
checkTopic(s);
- return SharedBehavior.partitionsFor(partitionCount, topicPath);
+ return shared.partitionsFor(topicPath, INFINITE_DURATION);
}
@Override
@@ -190,6 +191,11 @@ public void close() {
@Override
public void close(Duration duration) {
+ try {
+ shared.close();
+ } catch (Exception e) {
+ logger.atSevere().withCause(e).log("Error closing admin client during Producer shutdown.");
+ }
try {
publisher.stopAsync().awaitTerminated(duration.toMillis(), MILLISECONDS);
} catch (TimeoutException e) {
diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/SharedBehavior.java b/src/main/java/com/google/cloud/pubsublite/kafka/SharedBehavior.java
index 80d83fab..d0ea3c9c 100644
--- a/src/main/java/com/google/cloud/pubsublite/kafka/SharedBehavior.java
+++ b/src/main/java/com/google/cloud/pubsublite/kafka/SharedBehavior.java
@@ -18,17 +18,24 @@
import static com.google.cloud.pubsublite.kafka.KafkaExceptionUtils.toKafka;
+import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.common.collect.ImmutableList;
+import java.time.Duration;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.PartitionInfo;
/** Shared behavior for producer and consumer. */
-final class SharedBehavior {
- private SharedBehavior() {}
+final class SharedBehavior implements AutoCloseable {
+ private final AdminClient client;
- static PartitionInfo toPartitionInfo(TopicPath topic, Partition partition) {
+ SharedBehavior(AdminClient client) {
+ this.client = client;
+ }
+
+ private static PartitionInfo toPartitionInfo(TopicPath topic, Partition partition) {
return new PartitionInfo(
topic.toString(),
(int) partition.value(),
@@ -37,8 +44,10 @@ static PartitionInfo toPartitionInfo(TopicPath topic, Partition partition) {
PubsubLiteNode.NODES);
}
- static List partitionsFor(long partitionCount, TopicPath topic) {
+ List partitionsFor(TopicPath topic, Duration timeout) {
try {
+ long partitionCount =
+ client.getTopicPartitionCount(topic).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
ImmutableList.Builder result = ImmutableList.builder();
for (int i = 0; i < partitionCount; ++i) {
result.add(toPartitionInfo(topic, Partition.of(i)));
@@ -48,4 +57,9 @@ static List partitionsFor(long partitionCount, TopicPath topic) {
throw toKafka(t);
}
}
+
+ @Override
+ public void close() {
+ client.close();
+ }
}
diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java
index 9b084d11..b9c9bad9 100644
--- a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java
+++ b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java
@@ -30,6 +30,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
+import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
@@ -52,6 +53,7 @@
import com.google.common.collect.Multimaps;
import com.google.common.reflect.ImmutableTypeToInstanceMap;
import java.time.Duration;
+import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
@@ -61,6 +63,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -98,6 +101,7 @@ private static T example(Class klass) {
@Mock ConsumerFactory consumerFactory;
@Mock AssignerFactory assignerFactory;
@Mock CursorClient cursorClient;
+ @Mock AdminClient adminClient;
@Mock Assigner assigner;
@Mock SingleSubscriptionConsumer underlying;
@@ -111,7 +115,7 @@ public void setUp() {
new PubsubLiteConsumer(
example(SubscriptionPath.class),
example(TopicPath.class),
- 3,
+ new SharedBehavior(adminClient),
consumerFactory,
assignerFactory,
cursorClient);
@@ -455,4 +459,19 @@ public void seek() {
example(Partition.class),
SeekRequest.newBuilder().setNamedTarget(NamedTarget.HEAD).build());
}
+
+ @Test
+ public void partitionsFor() {
+ when(adminClient.getTopicPartitionCount(example(TopicPath.class)))
+ .thenReturn(ApiFutures.immediateFuture(2L));
+ List info = consumer.partitionsFor(example(TopicPath.class).toString());
+ assertThat(info.size()).isEqualTo(2L);
+ }
+
+ @Test
+ public void close() {
+ consumer.close();
+ verify(adminClient).close();
+ verify(cursorClient).close();
+ }
}
diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java
index 64784d5c..4c55aab2 100644
--- a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java
+++ b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java
@@ -23,8 +23,10 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.StatusCode.Code;
+import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
@@ -35,18 +37,21 @@
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.common.collect.ImmutableMap;
+import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
@@ -64,13 +69,16 @@ abstract static class FakePublisher extends FakeApiService
example(TopicPath.class).toString(), (int) example(Partition.class).value());
@Spy FakePublisher underlying;
+ @Mock AdminClient adminClient;
Producer producer;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
- producer = new PubsubLiteProducer(underlying, 3, example(TopicPath.class));
+ producer =
+ new PubsubLiteProducer(
+ underlying, new SharedBehavior(adminClient), example(TopicPath.class));
verify(underlying).startAsync();
verify(underlying).awaitRunning();
}
@@ -207,7 +215,16 @@ public void flush() throws Exception {
@Test
public void close() throws Exception {
producer.close();
+ verify(adminClient).close();
verify(underlying).stopAsync();
verify(underlying).awaitTerminated(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
+
+ @Test
+ public void partitionsFor() {
+ when(adminClient.getTopicPartitionCount(example(TopicPath.class)))
+ .thenReturn(ApiFutures.immediateFuture(2L));
+ List info = producer.partitionsFor(example(TopicPath.class).toString());
+ assertThat(info.size()).isEqualTo(2L);
+ }
}
diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/SharedBehaviorTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/SharedBehaviorTest.java
index abcb416e..9ef468a0 100644
--- a/src/test/java/com/google/cloud/pubsublite/kafka/SharedBehaviorTest.java
+++ b/src/test/java/com/google/cloud/pubsublite/kafka/SharedBehaviorTest.java
@@ -18,19 +18,43 @@
import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.example;
import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.rpc.StatusCode;
+import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
+import java.time.Duration;
import java.util.List;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.BrokerNotAvailableException;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.Mock;
@RunWith(JUnit4.class)
public class SharedBehaviorTest {
+ @Mock AdminClient adminClient;
+ SharedBehavior shared;
+
+ @Before
+ public void setUp() {
+ initMocks(this);
+ shared = new SharedBehavior(adminClient);
+ }
+
@Test
public void partitionsForSuccess() {
- List result = SharedBehavior.partitionsFor(2, example(TopicPath.class));
+ when(adminClient.getTopicPartitionCount(example(TopicPath.class)))
+ .thenReturn(ApiFutures.immediateFuture(2L));
+ List result =
+ shared.partitionsFor(example(TopicPath.class), Duration.ofMillis(10));
assertThat(result.size()).isEqualTo(2);
assertThat(result.get(0).topic()).isEqualTo(example(TopicPath.class).toString());
assertThat(result.get(0).partition()).isEqualTo(0);
@@ -39,4 +63,21 @@ public void partitionsForSuccess() {
assertThat(result.get(1).partition()).isEqualTo(1);
assertThat(result.get(1).leader()).isEqualTo(PubsubLiteNode.NODE);
}
+
+ @Test
+ public void partitionsForFailure() {
+ when(adminClient.getTopicPartitionCount(example(TopicPath.class)))
+ .thenReturn(
+ ApiFutures.immediateFailedFuture(
+ new CheckedApiException(StatusCode.Code.FAILED_PRECONDITION).underlying));
+ assertThrows(
+ BrokerNotAvailableException.class,
+ () -> shared.partitionsFor(example(TopicPath.class), Duration.ofMillis(10)));
+ }
+
+ @Test
+ public void close() {
+ shared.close();
+ verify(adminClient).close();
+ }
}