Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Version bump Pub/Sub Lite and make fixes to not create many channels #263

Merged
merged 4 commits into from Dec 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite-parent</artifactId>
<version>1.4.6</version>
<version>1.4.7</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.cloud</groupId>
Expand Down
Expand Up @@ -17,7 +17,10 @@
package com.google.cloud.pubsublite.kafka;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.getCallContext;

import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
Expand Down Expand Up @@ -127,6 +130,9 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
throw toCanonical(t).underlying;
}
};
SubscriberServiceClient subscriberServiceClient =
SubscriberServiceClient.create(
ServiceClients.addDefaultSettings(region, SubscriberServiceSettings.newBuilder()));
PullSubscriberFactory pullSubscriberFactory =
(partition, initialSeek, resetHandler) -> {
SubscriberFactory subscriberFactory =
Expand All @@ -136,14 +142,16 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
.setPartition(partition)
.setSubscriptionPath(subscriptionPath())
.setMessageConsumer(consumer)
.setServiceClient(
SubscriberServiceClient.create(
ServiceClients.addDefaultSettings(
region,
ServiceClients.addDefaultMetadata(
PubsubContext.of(FRAMEWORK),
RoutingMetadata.of(subscriptionPath(), partition),
SubscriberServiceSettings.newBuilder()))))
.setStreamFactory(
responseStream -> {
ApiCallContext context =
getCallContext(
PubsubContext.of(FRAMEWORK),
RoutingMetadata.of(subscriptionPath(), partition));
return subscriberServiceClient
.subscribeCallable()
.splitCall(responseStream, context);
})
.setInitialLocation(initialSeek)
.setResetHandler(resetHandler)
.build();
Expand All @@ -154,16 +162,22 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
return new BlockingPullSubscriberImpl(
subscriberFactory, perPartitionFlowControlSettings());
};
CursorServiceClient cursorServiceClient =
CursorServiceClient.create(
addDefaultSettings(
subscriptionPath().location().extractRegion(),
CursorServiceSettings.newBuilder()));
CommitterFactory committerFactory =
partition -> {
try {
return CommitterSettings.newBuilder()
.setSubscriptionPath(subscriptionPath())
.setPartition(partition)
.setServiceClient(
CursorServiceClient.create(
ServiceClients.addDefaultSettings(
region, CursorServiceSettings.newBuilder())))
.setStreamFactory(
responseStream ->
cursorServiceClient
.streamingCommitCursorCallable()
.splitCall(responseStream))
.build()
.instantiate();
} catch (Throwable t) {
Expand All @@ -189,7 +203,9 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
consumerFactory,
assignerFactory,
cursorClient,
topicStatsClient);
topicStatsClient,
cursorServiceClient,
subscriberServiceClient);
} catch (Exception e) {
throw toCanonical(e).underlying;
}
Expand Down
Expand Up @@ -18,14 +18,24 @@

import static com.google.cloud.pubsublite.cloudpubsub.PublisherSettings.DEFAULT_BATCHING_SETTINGS;
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.getCallContext;

import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.wire.*;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings;
import com.google.cloud.pubsublite.internal.wire.PartitionPublisherFactory;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
import org.apache.kafka.clients.producer.Producer;
Expand Down Expand Up @@ -54,31 +64,50 @@ private AdminClient newAdminClient() {
AdminClientSettings.newBuilder().setRegion(topicPath().location().extractRegion()).build());
}

private PublisherServiceClient newServiceClient() throws ApiException {
try {
return PublisherServiceClient.create(
addDefaultSettings(
topicPath().location().extractRegion(), PublisherServiceSettings.newBuilder()));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
}

private PartitionPublisherFactory getPartitionPublisherFactory() {
PublisherServiceClient client = newServiceClient();
return new PartitionPublisherFactory() {
@Override
public Publisher<MessageMetadata> newPublisher(Partition partition) throws ApiException {
SinglePartitionPublisherBuilder.Builder singlePartitionBuilder =
SinglePartitionPublisherBuilder.newBuilder()
.setTopic(topicPath())
.setPartition(partition)
.setBatchingSettings(DEFAULT_BATCHING_SETTINGS)
.setStreamFactory(
responseStream -> {
ApiCallContext context =
getCallContext(
PubsubContext.of(FRAMEWORK),
RoutingMetadata.of(topicPath(), partition));
return client.publishCallable().splitCall(responseStream, context);
});
return singlePartitionBuilder.build();
}

@Override
public void close() {
client.close();
}
};
}

public Producer<byte[], byte[]> instantiate() throws ApiException {
PartitionCountWatchingPublisherSettings publisherSettings =
PartitionCountWatchingPublisherSettings.newBuilder()
.setTopic(topicPath())
.setAdminClient(newAdminClient())
.setPublisherFactory(
partition -> {
try {
return SinglePartitionPublisherBuilder.newBuilder()
.setServiceClient(
PublisherServiceClient.create(
ServiceClients.addDefaultSettings(
topicPath().location().extractRegion(),
ServiceClients.addDefaultMetadata(
PubsubContext.of(FRAMEWORK),
RoutingMetadata.of(topicPath(), partition),
PublisherServiceSettings.newBuilder()))))
.setTopic(topicPath())
.setPartition(partition)
.setBatchingSettings(DEFAULT_BATCHING_SETTINGS)
.build();
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
})
.setPublisherFactory(getPartitionPublisherFactory())
.build();
SharedBehavior shared = new SharedBehavior(newAdminClient());
return new PubsubLiteProducer(publisherSettings.instantiate(), shared, topicPath());
Expand Down
Expand Up @@ -35,6 +35,7 @@
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
Expand Down Expand Up @@ -79,6 +80,7 @@ class PubsubLiteConsumer implements Consumer<byte[], byte[]> {
private final AssignerFactory assignerFactory;
private final CursorClient cursorClient;
private final TopicStatsClient topicStatsClient;
private final List<AutoCloseable> toClose;
private Optional<Assigner> assigner = Optional.empty();
private Optional<SingleSubscriptionConsumer> consumer = Optional.empty();

Expand All @@ -89,14 +91,22 @@ class PubsubLiteConsumer implements Consumer<byte[], byte[]> {
ConsumerFactory consumerFactory,
AssignerFactory assignerFactory,
CursorClient cursorClient,
TopicStatsClient topicStatsClient) {
TopicStatsClient topicStatsClient,
AutoCloseable... resources) {
this.subscriptionPath = subscriptionPath;
this.topicPath = topicPath;
this.shared = shared;
this.consumerFactory = consumerFactory;
this.assignerFactory = assignerFactory;
this.cursorClient = cursorClient;
this.topicStatsClient = topicStatsClient;
this.toClose =
ImmutableList.<AutoCloseable>builder()
.add(resources)
.add(cursorClient)
.add(topicStatsClient)
.add(shared)
.build();
}

private TopicPartition toTopicPartition(Partition partition) {
Expand Down Expand Up @@ -558,23 +568,15 @@ public void close(long l, TimeUnit timeUnit) {

@Override
public void close(Duration timeout) {
try {
cursorClient.close();
} catch (Exception e) {
logger.atSevere().withCause(e).log("Error closing cursor client during Consumer shutdown.");
}
try {
topicStatsClient.close();
} catch (Exception e) {
logger.atSevere().withCause(e).log(
"Error closing topic stats client during Consumer shutdown.");
}
try {
shared.close();
} catch (Exception e) {
logger.atSevere().withCause(e).log("Error closing admin client during Consumer shutdown.");
}
unsubscribe();
for (AutoCloseable closeable : toClose) {
try {
closeable.close();
} catch (Exception e) {
logger.atSevere().withCause(e).log(
"Error closing %s during Consumer shutdown.", closeable.getClass().getSimpleName());
}
}
}

@Override
Expand Down