Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for increasing partitions to the kafka shim #37

Merged
merged 2 commits on Dec 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions pom.xml
Expand Up @@ -3,26 +3,26 @@
<parent>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite-parent</artifactId>
<version>0.6.5</version>
<version>0.7.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.cloud</groupId>
<artifactId>pubsublite-kafka</artifactId>
<version>0.1.2-SNAPSHOT</version><!-- {x-version-update:pubsublite-kafka:current} -->
<packaging>jar</packaging>
<name>Pub/Sub Lite Kafka Shim</name>
<url>https://github.com/googleapis/java-pubsublite</url>
<url>https://github.com/googleapis/java-pubsublite-kafka</url>
<description>Kafka Producer and Consumer for Google Cloud Pub/Sub Lite</description>
<dependencies>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-pubsublite-v1</artifactId>
<version>0.6.1</version>
<version>0.7.0</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<version>0.6.1</version>
<version>0.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
Expand Down
Expand Up @@ -21,7 +21,6 @@
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.PartitionLookupUtils;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
Expand Down Expand Up @@ -73,7 +72,6 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
AdminClient.create(AdminClientSettings.newBuilder().setRegion(zone.region()).build())) {
Subscription subscription = adminClient.getSubscription(subscriptionPath()).get();
TopicPath topic = TopicPath.parse(subscription.getTopic());
long partitionCount = PartitionLookupUtils.numPartitions(topic);
AssignerFactory assignerFactory =
receiver -> {
AssignerBuilder.Builder builder = AssignerBuilder.newBuilder();
Expand Down Expand Up @@ -110,14 +108,12 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {

CursorClient cursorClient =
CursorClient.create(CursorClientSettings.newBuilder().setRegion(zone.region()).build());

SharedBehavior shared =
new SharedBehavior(
AdminClient.create(
AdminClientSettings.newBuilder().setRegion(topic.location().region()).build()));
return new PubsubLiteConsumer(
subscriptionPath(),
topic,
partitionCount,
consumerFactory,
assignerFactory,
cursorClient);
subscriptionPath(), topic, shared, consumerFactory, assignerFactory, cursorClient);
} catch (Exception e) {
throw ExtractStatus.toCanonical(e).underlying;
}
Expand Down
Expand Up @@ -16,16 +16,13 @@

package com.google.cloud.pubsublite.kafka;

import static com.google.cloud.pubsublite.ProjectLookupUtils.toCanonical;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.PartitionLookupUtils;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.*;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.internal.wire.RoutingPublisherBuilder;
import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
import org.apache.kafka.clients.producer.Producer;

@AutoValue
Expand All @@ -48,18 +45,23 @@ public abstract static class Builder {
}

public Producer<byte[], byte[]> instantiate() throws ApiException {
TopicPath canonicalTopic = toCanonical(topicPath());
RoutingPublisherBuilder.Builder routingBuilder =
RoutingPublisherBuilder.newBuilder()
.setTopic(canonicalTopic)
PartitionCountWatchingPublisherSettings.Builder publisherSettings =
PartitionCountWatchingPublisherSettings.newBuilder()
.setTopic(topicPath())
.setPublisherFactory(
partition ->
SinglePartitionPublisherBuilder.newBuilder()
.setContext(PubsubContext.of(FRAMEWORK))
.setTopic(canonicalTopic)
.setTopic(topicPath())
.setPartition(partition)
.build());
SharedBehavior shared =
new SharedBehavior(
AdminClient.create(
AdminClientSettings.newBuilder()
.setRegion(topicPath().location().region())
.build()));
return new PubsubLiteProducer(
routingBuilder.build(), PartitionLookupUtils.numPartitions(canonicalTopic), canonicalTopic);
new PartitionCountWatchingPublisher(publisherSettings.build()), shared, topicPath());
}
}
Expand Up @@ -70,7 +70,7 @@ class PubsubLiteConsumer implements Consumer<byte[], byte[]> {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
private final SubscriptionPath subscriptionPath;
private final TopicPath topicPath;
private final long partitionCount;
private final SharedBehavior shared;
private final ConsumerFactory consumerFactory;
private final AssignerFactory assignerFactory;
private final CursorClient cursorClient;
Expand All @@ -80,13 +80,13 @@ class PubsubLiteConsumer implements Consumer<byte[], byte[]> {
PubsubLiteConsumer(
SubscriptionPath subscriptionPath,
TopicPath topicPath,
long partitionCount,
SharedBehavior shared,
ConsumerFactory consumerFactory,
AssignerFactory assignerFactory,
CursorClient cursorClient) {
this.subscriptionPath = subscriptionPath;
this.topicPath = topicPath;
this.partitionCount = partitionCount;
this.shared = shared;
this.consumerFactory = consumerFactory;
this.assignerFactory = assignerFactory;
this.cursorClient = cursorClient;
Expand Down Expand Up @@ -440,7 +440,7 @@ public List<PartitionInfo> partitionsFor(String s) {
@Override
public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
checkTopic(topic);
return SharedBehavior.partitionsFor(partitionCount, topicPath);
return shared.partitionsFor(topicPath, timeout);
}

@Override
Expand Down Expand Up @@ -511,6 +511,11 @@ public void close(Duration timeout) {
} catch (Exception e) {
logger.atSevere().withCause(e).log("Error closing cursor client during Consumer shutdown.");
}
try {
shared.close();
} catch (Exception e) {
logger.atSevere().withCause(e).log("Error closing admin client during Consumer shutdown.");
}
unsubscribe();
}

Expand Down
Expand Up @@ -51,20 +51,21 @@
import org.apache.kafka.common.errors.UnsupportedVersionException;

class PubsubLiteProducer implements Producer<byte[], byte[]> {
private static final Duration INFINITE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
private static final UnsupportedVersionException NO_TRANSACTIONS_EXCEPTION =
new UnsupportedVersionException(
"Pub/Sub Lite is a non-transactional system and does not support producer transactions.");
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

private final SharedBehavior shared;
private final Publisher<PublishMetadata> publisher;
private final TopicPath topicPath;
private final long partitionCount;

PubsubLiteProducer(
Publisher<PublishMetadata> publisher, long partitionCount, TopicPath topicPath) {
Publisher<PublishMetadata> publisher, SharedBehavior shared, TopicPath topicPath) {
this.publisher = publisher;
this.shared = shared;
this.topicPath = topicPath;
this.partitionCount = partitionCount;
this.publisher.addListener(
new Listener() {
@Override
Expand Down Expand Up @@ -175,7 +176,7 @@ public void flush() {
@Override
public List<PartitionInfo> partitionsFor(String s) {
checkTopic(s);
return SharedBehavior.partitionsFor(partitionCount, topicPath);
return shared.partitionsFor(topicPath, INFINITE_DURATION);
}

@Override
Expand All @@ -190,6 +191,11 @@ public void close() {

@Override
public void close(Duration duration) {
try {
shared.close();
} catch (Exception e) {
logger.atSevere().withCause(e).log("Error closing admin client during Producer shutdown.");
}
try {
publisher.stopAsync().awaitTerminated(duration.toMillis(), MILLISECONDS);
} catch (TimeoutException e) {
Expand Down
Expand Up @@ -18,17 +18,24 @@

import static com.google.cloud.pubsublite.kafka.KafkaExceptionUtils.toKafka;

import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.common.collect.ImmutableList;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.PartitionInfo;

/** Shared behavior for producer and consumer. */
final class SharedBehavior {
private SharedBehavior() {}
final class SharedBehavior implements AutoCloseable {
private final AdminClient client;

static PartitionInfo toPartitionInfo(TopicPath topic, Partition partition) {
SharedBehavior(AdminClient client) {
this.client = client;
}

private static PartitionInfo toPartitionInfo(TopicPath topic, Partition partition) {
return new PartitionInfo(
topic.toString(),
(int) partition.value(),
Expand All @@ -37,8 +44,10 @@ static PartitionInfo toPartitionInfo(TopicPath topic, Partition partition) {
PubsubLiteNode.NODES);
}

static List<PartitionInfo> partitionsFor(long partitionCount, TopicPath topic) {
List<PartitionInfo> partitionsFor(TopicPath topic, Duration timeout) {
try {
long partitionCount =
client.getTopicPartitionCount(topic).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
ImmutableList.Builder<PartitionInfo> result = ImmutableList.builder();
for (int i = 0; i < partitionCount; ++i) {
result.add(toPartitionInfo(topic, Partition.of(i)));
Expand All @@ -48,4 +57,9 @@ static List<PartitionInfo> partitionsFor(long partitionCount, TopicPath topic) {
throw toKafka(t);
}
}

@Override
public void close() {
client.close();
}
}
Expand Up @@ -30,6 +30,7 @@

import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
Expand All @@ -52,6 +53,7 @@
import com.google.common.collect.Multimaps;
import com.google.common.reflect.ImmutableTypeToInstanceMap;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
Expand All @@ -61,6 +63,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
Expand Down Expand Up @@ -98,6 +101,7 @@ private static <T> T example(Class<T> klass) {
@Mock ConsumerFactory consumerFactory;
@Mock AssignerFactory assignerFactory;
@Mock CursorClient cursorClient;
@Mock AdminClient adminClient;

@Mock Assigner assigner;
@Mock SingleSubscriptionConsumer underlying;
Expand All @@ -111,7 +115,7 @@ public void setUp() {
new PubsubLiteConsumer(
example(SubscriptionPath.class),
example(TopicPath.class),
3,
new SharedBehavior(adminClient),
consumerFactory,
assignerFactory,
cursorClient);
Expand Down Expand Up @@ -455,4 +459,19 @@ public void seek() {
example(Partition.class),
SeekRequest.newBuilder().setNamedTarget(NamedTarget.HEAD).build());
}

@Test
public void partitionsFor() {
when(adminClient.getTopicPartitionCount(example(TopicPath.class)))
.thenReturn(ApiFutures.immediateFuture(2L));
List<PartitionInfo> info = consumer.partitionsFor(example(TopicPath.class).toString());
assertThat(info.size()).isEqualTo(2L);
}

@Test
public void close() {
consumer.close();
verify(adminClient).close();
verify(cursorClient).close();
}
}
Expand Up @@ -23,8 +23,10 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
Expand All @@ -35,18 +37,21 @@
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;

Expand All @@ -64,13 +69,16 @@ abstract static class FakePublisher extends FakeApiService
example(TopicPath.class).toString(), (int) example(Partition.class).value());

@Spy FakePublisher underlying;
@Mock AdminClient adminClient;

Producer<byte[], byte[]> producer;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
producer = new PubsubLiteProducer(underlying, 3, example(TopicPath.class));
producer =
new PubsubLiteProducer(
underlying, new SharedBehavior(adminClient), example(TopicPath.class));
verify(underlying).startAsync();
verify(underlying).awaitRunning();
}
Expand Down Expand Up @@ -207,7 +215,16 @@ public void flush() throws Exception {
@Test
public void close() throws Exception {
producer.close();
verify(adminClient).close();
verify(underlying).stopAsync();
verify(underlying).awaitTerminated(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}

@Test
public void partitionsFor() {
when(adminClient.getTopicPartitionCount(example(TopicPath.class)))
.thenReturn(ApiFutures.immediateFuture(2L));
List<PartitionInfo> info = producer.partitionsFor(example(TopicPath.class).toString());
assertThat(info.size()).isEqualTo(2L);
}
}