Skip to content

Commit

Permalink
feat: Add support for increasing partitions to the kafka shim (#37)
Browse files Browse the repository at this point in the history
* feat: Add support for increasing partitions to the kafka shim

* Updated to address comments
  • Loading branch information
palmere-google committed Dec 16, 2020
1 parent 5a5ff68 commit 13f2138
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 40 deletions.
8 changes: 4 additions & 4 deletions pom.xml
Expand Up @@ -3,26 +3,26 @@
<parent>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite-parent</artifactId>
<version>0.6.5</version>
<version>0.7.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.cloud</groupId>
<artifactId>pubsublite-kafka</artifactId>
<version>0.1.2-SNAPSHOT</version><!-- {x-version-update:pubsublite-kafka:current} -->
<packaging>jar</packaging>
<name>Pub/Sub Lite Kafka Shim</name>
<url>https://github.com/googleapis/java-pubsublite</url>
<url>https://github.com/googleapis/java-pubsublite-kafka</url>
<description>Kafka Producer and Consumer for Google Cloud Pub/Sub Lite</description>
<dependencies>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-pubsublite-v1</artifactId>
<version>0.6.1</version>
<version>0.7.0</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<version>0.6.1</version>
<version>0.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
Expand Down
Expand Up @@ -21,7 +21,6 @@
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.PartitionLookupUtils;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
Expand Down Expand Up @@ -73,7 +72,6 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
AdminClient.create(AdminClientSettings.newBuilder().setRegion(zone.region()).build())) {
Subscription subscription = adminClient.getSubscription(subscriptionPath()).get();
TopicPath topic = TopicPath.parse(subscription.getTopic());
long partitionCount = PartitionLookupUtils.numPartitions(topic);
AssignerFactory assignerFactory =
receiver -> {
AssignerBuilder.Builder builder = AssignerBuilder.newBuilder();
Expand Down Expand Up @@ -110,14 +108,12 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {

CursorClient cursorClient =
CursorClient.create(CursorClientSettings.newBuilder().setRegion(zone.region()).build());

SharedBehavior shared =
new SharedBehavior(
AdminClient.create(
AdminClientSettings.newBuilder().setRegion(topic.location().region()).build()));
return new PubsubLiteConsumer(
subscriptionPath(),
topic,
partitionCount,
consumerFactory,
assignerFactory,
cursorClient);
subscriptionPath(), topic, shared, consumerFactory, assignerFactory, cursorClient);
} catch (Exception e) {
throw ExtractStatus.toCanonical(e).underlying;
}
Expand Down
Expand Up @@ -16,16 +16,13 @@

package com.google.cloud.pubsublite.kafka;

import static com.google.cloud.pubsublite.ProjectLookupUtils.toCanonical;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.PartitionLookupUtils;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.*;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.internal.wire.RoutingPublisherBuilder;
import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
import org.apache.kafka.clients.producer.Producer;

@AutoValue
Expand All @@ -48,18 +45,23 @@ public abstract static class Builder {
}

public Producer<byte[], byte[]> instantiate() throws ApiException {
TopicPath canonicalTopic = toCanonical(topicPath());
RoutingPublisherBuilder.Builder routingBuilder =
RoutingPublisherBuilder.newBuilder()
.setTopic(canonicalTopic)
PartitionCountWatchingPublisherSettings.Builder publisherSettings =
PartitionCountWatchingPublisherSettings.newBuilder()
.setTopic(topicPath())
.setPublisherFactory(
partition ->
SinglePartitionPublisherBuilder.newBuilder()
.setContext(PubsubContext.of(FRAMEWORK))
.setTopic(canonicalTopic)
.setTopic(topicPath())
.setPartition(partition)
.build());
SharedBehavior shared =
new SharedBehavior(
AdminClient.create(
AdminClientSettings.newBuilder()
.setRegion(topicPath().location().region())
.build()));
return new PubsubLiteProducer(
routingBuilder.build(), PartitionLookupUtils.numPartitions(canonicalTopic), canonicalTopic);
new PartitionCountWatchingPublisher(publisherSettings.build()), shared, topicPath());
}
}
Expand Up @@ -70,7 +70,7 @@ class PubsubLiteConsumer implements Consumer<byte[], byte[]> {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
private final SubscriptionPath subscriptionPath;
private final TopicPath topicPath;
private final long partitionCount;
private final SharedBehavior shared;
private final ConsumerFactory consumerFactory;
private final AssignerFactory assignerFactory;
private final CursorClient cursorClient;
Expand All @@ -80,13 +80,13 @@ class PubsubLiteConsumer implements Consumer<byte[], byte[]> {
PubsubLiteConsumer(
SubscriptionPath subscriptionPath,
TopicPath topicPath,
long partitionCount,
SharedBehavior shared,
ConsumerFactory consumerFactory,
AssignerFactory assignerFactory,
CursorClient cursorClient) {
this.subscriptionPath = subscriptionPath;
this.topicPath = topicPath;
this.partitionCount = partitionCount;
this.shared = shared;
this.consumerFactory = consumerFactory;
this.assignerFactory = assignerFactory;
this.cursorClient = cursorClient;
Expand Down Expand Up @@ -440,7 +440,7 @@ public List<PartitionInfo> partitionsFor(String s) {
@Override
public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
checkTopic(topic);
return SharedBehavior.partitionsFor(partitionCount, topicPath);
return shared.partitionsFor(topicPath, timeout);
}

@Override
Expand Down Expand Up @@ -511,6 +511,11 @@ public void close(Duration timeout) {
} catch (Exception e) {
logger.atSevere().withCause(e).log("Error closing cursor client during Consumer shutdown.");
}
try {
shared.close();
} catch (Exception e) {
logger.atSevere().withCause(e).log("Error closing admin client during Consumer shutdown.");
}
unsubscribe();
}

Expand Down
Expand Up @@ -51,20 +51,21 @@
import org.apache.kafka.common.errors.UnsupportedVersionException;

class PubsubLiteProducer implements Producer<byte[], byte[]> {
private static final Duration INFINITE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
private static final UnsupportedVersionException NO_TRANSACTIONS_EXCEPTION =
new UnsupportedVersionException(
"Pub/Sub Lite is a non-transactional system and does not support producer transactions.");
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

private final SharedBehavior shared;
private final Publisher<PublishMetadata> publisher;
private final TopicPath topicPath;
private final long partitionCount;

PubsubLiteProducer(
Publisher<PublishMetadata> publisher, long partitionCount, TopicPath topicPath) {
Publisher<PublishMetadata> publisher, SharedBehavior shared, TopicPath topicPath) {
this.publisher = publisher;
this.shared = shared;
this.topicPath = topicPath;
this.partitionCount = partitionCount;
this.publisher.addListener(
new Listener() {
@Override
Expand Down Expand Up @@ -175,7 +176,7 @@ public void flush() {
@Override
public List<PartitionInfo> partitionsFor(String s) {
checkTopic(s);
return SharedBehavior.partitionsFor(partitionCount, topicPath);
return shared.partitionsFor(topicPath, INFINITE_DURATION);
}

@Override
Expand All @@ -190,6 +191,11 @@ public void close() {

@Override
public void close(Duration duration) {
try {
shared.close();
} catch (Exception e) {
logger.atSevere().withCause(e).log("Error closing admin client during Producer shutdown.");
}
try {
publisher.stopAsync().awaitTerminated(duration.toMillis(), MILLISECONDS);
} catch (TimeoutException e) {
Expand Down
Expand Up @@ -18,17 +18,24 @@

import static com.google.cloud.pubsublite.kafka.KafkaExceptionUtils.toKafka;

import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.common.collect.ImmutableList;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.PartitionInfo;

/** Shared behavior for producer and consumer. */
final class SharedBehavior {
private SharedBehavior() {}
final class SharedBehavior implements AutoCloseable {
private final AdminClient client;

static PartitionInfo toPartitionInfo(TopicPath topic, Partition partition) {
SharedBehavior(AdminClient client) {
this.client = client;
}

private static PartitionInfo toPartitionInfo(TopicPath topic, Partition partition) {
return new PartitionInfo(
topic.toString(),
(int) partition.value(),
Expand All @@ -37,8 +44,10 @@ static PartitionInfo toPartitionInfo(TopicPath topic, Partition partition) {
PubsubLiteNode.NODES);
}

static List<PartitionInfo> partitionsFor(long partitionCount, TopicPath topic) {
List<PartitionInfo> partitionsFor(TopicPath topic, Duration timeout) {
try {
long partitionCount =
client.getTopicPartitionCount(topic).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
ImmutableList.Builder<PartitionInfo> result = ImmutableList.builder();
for (int i = 0; i < partitionCount; ++i) {
result.add(toPartitionInfo(topic, Partition.of(i)));
Expand All @@ -48,4 +57,9 @@ static List<PartitionInfo> partitionsFor(long partitionCount, TopicPath topic) {
throw toKafka(t);
}
}

@Override
public void close() {
client.close();
}
}
Expand Up @@ -30,6 +30,7 @@

import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
Expand All @@ -52,6 +53,7 @@
import com.google.common.collect.Multimaps;
import com.google.common.reflect.ImmutableTypeToInstanceMap;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
Expand All @@ -61,6 +63,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
Expand Down Expand Up @@ -98,6 +101,7 @@ private static <T> T example(Class<T> klass) {
@Mock ConsumerFactory consumerFactory;
@Mock AssignerFactory assignerFactory;
@Mock CursorClient cursorClient;
@Mock AdminClient adminClient;

@Mock Assigner assigner;
@Mock SingleSubscriptionConsumer underlying;
Expand All @@ -111,7 +115,7 @@ public void setUp() {
new PubsubLiteConsumer(
example(SubscriptionPath.class),
example(TopicPath.class),
3,
new SharedBehavior(adminClient),
consumerFactory,
assignerFactory,
cursorClient);
Expand Down Expand Up @@ -455,4 +459,19 @@ public void seek() {
example(Partition.class),
SeekRequest.newBuilder().setNamedTarget(NamedTarget.HEAD).build());
}

@Test
public void partitionsFor() {
when(adminClient.getTopicPartitionCount(example(TopicPath.class)))
.thenReturn(ApiFutures.immediateFuture(2L));
List<PartitionInfo> info = consumer.partitionsFor(example(TopicPath.class).toString());
assertThat(info.size()).isEqualTo(2L);
}

@Test
public void close() {
consumer.close();
verify(adminClient).close();
verify(cursorClient).close();
}
}
Expand Up @@ -23,8 +23,10 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
Expand All @@ -35,18 +37,21 @@
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;

Expand All @@ -64,13 +69,16 @@ abstract static class FakePublisher extends FakeApiService
example(TopicPath.class).toString(), (int) example(Partition.class).value());

@Spy FakePublisher underlying;
@Mock AdminClient adminClient;

Producer<byte[], byte[]> producer;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
producer = new PubsubLiteProducer(underlying, 3, example(TopicPath.class));
producer =
new PubsubLiteProducer(
underlying, new SharedBehavior(adminClient), example(TopicPath.class));
verify(underlying).startAsync();
verify(underlying).awaitRunning();
}
Expand Down Expand Up @@ -207,7 +215,16 @@ public void flush() throws Exception {
@Test
public void close() throws Exception {
producer.close();
verify(adminClient).close();
verify(underlying).stopAsync();
verify(underlying).awaitTerminated(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}

@Test
public void partitionsFor() {
when(adminClient.getTopicPartitionCount(example(TopicPath.class)))
.thenReturn(ApiFutures.immediateFuture(2L));
List<PartitionInfo> info = producer.partitionsFor(example(TopicPath.class).toString());
assertThat(info.size()).isEqualTo(2L);
}
}

0 comments on commit 13f2138

Please sign in to comment.