Skip to content

Commit

Permalink
fix: Version bump Pub/Sub Lite and make fixes to not create many chan…
Browse files Browse the repository at this point in the history
…nels (#263)

* fix: Version bump Pub/Sub Lite and make fixes to not create many channels

* fix: Version bump Pub/Sub Lite and make fixes to not create many channels

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
dpcollins-google and gcf-owl-bot[bot] committed Dec 29, 2021
1 parent d7414cf commit e21ff5d
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 52 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite-parent</artifactId>
<version>1.4.6</version>
<version>1.4.7</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.cloud</groupId>
Expand Down
Expand Up @@ -17,7 +17,10 @@
package com.google.cloud.pubsublite.kafka;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.getCallContext;

import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
Expand Down Expand Up @@ -127,6 +130,9 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
throw toCanonical(t).underlying;
}
};
SubscriberServiceClient subscriberServiceClient =
SubscriberServiceClient.create(
ServiceClients.addDefaultSettings(region, SubscriberServiceSettings.newBuilder()));
PullSubscriberFactory pullSubscriberFactory =
(partition, initialSeek, resetHandler) -> {
SubscriberFactory subscriberFactory =
Expand All @@ -136,14 +142,16 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
.setPartition(partition)
.setSubscriptionPath(subscriptionPath())
.setMessageConsumer(consumer)
.setServiceClient(
SubscriberServiceClient.create(
ServiceClients.addDefaultSettings(
region,
ServiceClients.addDefaultMetadata(
PubsubContext.of(FRAMEWORK),
RoutingMetadata.of(subscriptionPath(), partition),
SubscriberServiceSettings.newBuilder()))))
.setStreamFactory(
responseStream -> {
ApiCallContext context =
getCallContext(
PubsubContext.of(FRAMEWORK),
RoutingMetadata.of(subscriptionPath(), partition));
return subscriberServiceClient
.subscribeCallable()
.splitCall(responseStream, context);
})
.setInitialLocation(initialSeek)
.setResetHandler(resetHandler)
.build();
Expand All @@ -154,16 +162,22 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
return new BlockingPullSubscriberImpl(
subscriberFactory, perPartitionFlowControlSettings());
};
CursorServiceClient cursorServiceClient =
CursorServiceClient.create(
addDefaultSettings(
subscriptionPath().location().extractRegion(),
CursorServiceSettings.newBuilder()));
CommitterFactory committerFactory =
partition -> {
try {
return CommitterSettings.newBuilder()
.setSubscriptionPath(subscriptionPath())
.setPartition(partition)
.setServiceClient(
CursorServiceClient.create(
ServiceClients.addDefaultSettings(
region, CursorServiceSettings.newBuilder())))
.setStreamFactory(
responseStream ->
cursorServiceClient
.streamingCommitCursorCallable()
.splitCall(responseStream))
.build()
.instantiate();
} catch (Throwable t) {
Expand All @@ -189,7 +203,9 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
consumerFactory,
assignerFactory,
cursorClient,
topicStatsClient);
topicStatsClient,
cursorServiceClient,
subscriberServiceClient);
} catch (Exception e) {
throw toCanonical(e).underlying;
}
Expand Down
Expand Up @@ -18,14 +18,24 @@

import static com.google.cloud.pubsublite.cloudpubsub.PublisherSettings.DEFAULT_BATCHING_SETTINGS;
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.getCallContext;

import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.wire.*;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings;
import com.google.cloud.pubsublite.internal.wire.PartitionPublisherFactory;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
import org.apache.kafka.clients.producer.Producer;
Expand Down Expand Up @@ -54,31 +64,50 @@ private AdminClient newAdminClient() {
AdminClientSettings.newBuilder().setRegion(topicPath().location().extractRegion()).build());
}

private PublisherServiceClient newServiceClient() throws ApiException {
try {
return PublisherServiceClient.create(
addDefaultSettings(
topicPath().location().extractRegion(), PublisherServiceSettings.newBuilder()));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
}

private PartitionPublisherFactory getPartitionPublisherFactory() {
PublisherServiceClient client = newServiceClient();
return new PartitionPublisherFactory() {
@Override
public Publisher<MessageMetadata> newPublisher(Partition partition) throws ApiException {
SinglePartitionPublisherBuilder.Builder singlePartitionBuilder =
SinglePartitionPublisherBuilder.newBuilder()
.setTopic(topicPath())
.setPartition(partition)
.setBatchingSettings(DEFAULT_BATCHING_SETTINGS)
.setStreamFactory(
responseStream -> {
ApiCallContext context =
getCallContext(
PubsubContext.of(FRAMEWORK),
RoutingMetadata.of(topicPath(), partition));
return client.publishCallable().splitCall(responseStream, context);
});
return singlePartitionBuilder.build();
}

@Override
public void close() {
client.close();
}
};
}

public Producer<byte[], byte[]> instantiate() throws ApiException {
PartitionCountWatchingPublisherSettings publisherSettings =
PartitionCountWatchingPublisherSettings.newBuilder()
.setTopic(topicPath())
.setAdminClient(newAdminClient())
.setPublisherFactory(
partition -> {
try {
return SinglePartitionPublisherBuilder.newBuilder()
.setServiceClient(
PublisherServiceClient.create(
ServiceClients.addDefaultSettings(
topicPath().location().extractRegion(),
ServiceClients.addDefaultMetadata(
PubsubContext.of(FRAMEWORK),
RoutingMetadata.of(topicPath(), partition),
PublisherServiceSettings.newBuilder()))))
.setTopic(topicPath())
.setPartition(partition)
.setBatchingSettings(DEFAULT_BATCHING_SETTINGS)
.build();
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
})
.setPublisherFactory(getPartitionPublisherFactory())
.build();
SharedBehavior shared = new SharedBehavior(newAdminClient());
return new PubsubLiteProducer(publisherSettings.instantiate(), shared, topicPath());
Expand Down
Expand Up @@ -35,6 +35,7 @@
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
Expand Down Expand Up @@ -79,6 +80,7 @@ class PubsubLiteConsumer implements Consumer<byte[], byte[]> {
private final AssignerFactory assignerFactory;
private final CursorClient cursorClient;
private final TopicStatsClient topicStatsClient;
private final List<AutoCloseable> toClose;
private Optional<Assigner> assigner = Optional.empty();
private Optional<SingleSubscriptionConsumer> consumer = Optional.empty();

Expand All @@ -89,14 +91,22 @@ class PubsubLiteConsumer implements Consumer<byte[], byte[]> {
ConsumerFactory consumerFactory,
AssignerFactory assignerFactory,
CursorClient cursorClient,
TopicStatsClient topicStatsClient) {
TopicStatsClient topicStatsClient,
AutoCloseable... resources) {
this.subscriptionPath = subscriptionPath;
this.topicPath = topicPath;
this.shared = shared;
this.consumerFactory = consumerFactory;
this.assignerFactory = assignerFactory;
this.cursorClient = cursorClient;
this.topicStatsClient = topicStatsClient;
this.toClose =
ImmutableList.<AutoCloseable>builder()
.add(resources)
.add(cursorClient)
.add(topicStatsClient)
.add(shared)
.build();
}

private TopicPartition toTopicPartition(Partition partition) {
Expand Down Expand Up @@ -558,23 +568,15 @@ public void close(long l, TimeUnit timeUnit) {

@Override
public void close(Duration timeout) {
try {
cursorClient.close();
} catch (Exception e) {
logger.atSevere().withCause(e).log("Error closing cursor client during Consumer shutdown.");
}
try {
topicStatsClient.close();
} catch (Exception e) {
logger.atSevere().withCause(e).log(
"Error closing topic stats client during Consumer shutdown.");
}
try {
shared.close();
} catch (Exception e) {
logger.atSevere().withCause(e).log("Error closing admin client during Consumer shutdown.");
}
unsubscribe();
for (AutoCloseable closeable : toClose) {
try {
closeable.close();
} catch (Exception e) {
logger.atSevere().withCause(e).log(
"Error closing %s during Consumer shutdown.", closeable.getClass().getSimpleName());
}
}
}

@Override
Expand Down

0 comments on commit e21ff5d

Please sign in to comment.