diff --git a/pom.xml b/pom.xml
index d5d8d4a6..e06ad049 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
com.google.cloud
google-cloud-pubsublite-parent
- 1.4.6
+ 1.4.7
4.0.0
com.google.cloud
diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java
index 498e69ed..95d3b6c3 100644
--- a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java
+++ b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java
@@ -17,7 +17,10 @@
package com.google.cloud.pubsublite.kafka;
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
+import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
+import static com.google.cloud.pubsublite.internal.wire.ServiceClients.getCallContext;
+import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
@@ -127,6 +130,9 @@ public Consumer instantiate() throws ApiException {
throw toCanonical(t).underlying;
}
};
+ SubscriberServiceClient subscriberServiceClient =
+ SubscriberServiceClient.create(
+ ServiceClients.addDefaultSettings(region, SubscriberServiceSettings.newBuilder()));
PullSubscriberFactory pullSubscriberFactory =
(partition, initialSeek, resetHandler) -> {
SubscriberFactory subscriberFactory =
@@ -136,14 +142,16 @@ public Consumer instantiate() throws ApiException {
.setPartition(partition)
.setSubscriptionPath(subscriptionPath())
.setMessageConsumer(consumer)
- .setServiceClient(
- SubscriberServiceClient.create(
- ServiceClients.addDefaultSettings(
- region,
- ServiceClients.addDefaultMetadata(
- PubsubContext.of(FRAMEWORK),
- RoutingMetadata.of(subscriptionPath(), partition),
- SubscriberServiceSettings.newBuilder()))))
+ .setStreamFactory(
+ responseStream -> {
+ ApiCallContext context =
+ getCallContext(
+ PubsubContext.of(FRAMEWORK),
+ RoutingMetadata.of(subscriptionPath(), partition));
+ return subscriberServiceClient
+ .subscribeCallable()
+ .splitCall(responseStream, context);
+ })
.setInitialLocation(initialSeek)
.setResetHandler(resetHandler)
.build();
@@ -154,16 +162,22 @@ public Consumer instantiate() throws ApiException {
return new BlockingPullSubscriberImpl(
subscriberFactory, perPartitionFlowControlSettings());
};
+ CursorServiceClient cursorServiceClient =
+ CursorServiceClient.create(
+ addDefaultSettings(
+ subscriptionPath().location().extractRegion(),
+ CursorServiceSettings.newBuilder()));
CommitterFactory committerFactory =
partition -> {
try {
return CommitterSettings.newBuilder()
.setSubscriptionPath(subscriptionPath())
.setPartition(partition)
- .setServiceClient(
- CursorServiceClient.create(
- ServiceClients.addDefaultSettings(
- region, CursorServiceSettings.newBuilder())))
+ .setStreamFactory(
+ responseStream ->
+ cursorServiceClient
+ .streamingCommitCursorCallable()
+ .splitCall(responseStream))
.build()
.instantiate();
} catch (Throwable t) {
@@ -189,7 +203,9 @@ public Consumer instantiate() throws ApiException {
consumerFactory,
assignerFactory,
cursorClient,
- topicStatsClient);
+ topicStatsClient,
+ cursorServiceClient,
+ subscriberServiceClient);
} catch (Exception e) {
throw toCanonical(e).underlying;
}
diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java b/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java
index b1572e43..e9a14f0c 100644
--- a/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java
+++ b/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java
@@ -18,14 +18,24 @@
import static com.google.cloud.pubsublite.cloudpubsub.PublisherSettings.DEFAULT_BATCHING_SETTINGS;
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
+import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
+import static com.google.cloud.pubsublite.internal.wire.ServiceClients.getCallContext;
+import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
+import com.google.cloud.pubsublite.MessageMetadata;
+import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
-import com.google.cloud.pubsublite.internal.wire.*;
+import com.google.cloud.pubsublite.internal.Publisher;
+import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings;
+import com.google.cloud.pubsublite.internal.wire.PartitionPublisherFactory;
+import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
+import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
+import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
import org.apache.kafka.clients.producer.Producer;
@@ -54,31 +64,50 @@ private AdminClient newAdminClient() {
AdminClientSettings.newBuilder().setRegion(topicPath().location().extractRegion()).build());
}
+ private PublisherServiceClient newServiceClient() throws ApiException {
+ try {
+ return PublisherServiceClient.create(
+ addDefaultSettings(
+ topicPath().location().extractRegion(), PublisherServiceSettings.newBuilder()));
+ } catch (Throwable t) {
+ throw toCanonical(t).underlying;
+ }
+ }
+
+ private PartitionPublisherFactory getPartitionPublisherFactory() {
+ PublisherServiceClient client = newServiceClient();
+ return new PartitionPublisherFactory() {
+ @Override
+ public Publisher newPublisher(Partition partition) throws ApiException {
+ SinglePartitionPublisherBuilder.Builder singlePartitionBuilder =
+ SinglePartitionPublisherBuilder.newBuilder()
+ .setTopic(topicPath())
+ .setPartition(partition)
+ .setBatchingSettings(DEFAULT_BATCHING_SETTINGS)
+ .setStreamFactory(
+ responseStream -> {
+ ApiCallContext context =
+ getCallContext(
+ PubsubContext.of(FRAMEWORK),
+ RoutingMetadata.of(topicPath(), partition));
+ return client.publishCallable().splitCall(responseStream, context);
+ });
+ return singlePartitionBuilder.build();
+ }
+
+ @Override
+ public void close() {
+ client.close();
+ }
+ };
+ }
+
public Producer instantiate() throws ApiException {
PartitionCountWatchingPublisherSettings publisherSettings =
PartitionCountWatchingPublisherSettings.newBuilder()
.setTopic(topicPath())
.setAdminClient(newAdminClient())
- .setPublisherFactory(
- partition -> {
- try {
- return SinglePartitionPublisherBuilder.newBuilder()
- .setServiceClient(
- PublisherServiceClient.create(
- ServiceClients.addDefaultSettings(
- topicPath().location().extractRegion(),
- ServiceClients.addDefaultMetadata(
- PubsubContext.of(FRAMEWORK),
- RoutingMetadata.of(topicPath(), partition),
- PublisherServiceSettings.newBuilder()))))
- .setTopic(topicPath())
- .setPartition(partition)
- .setBatchingSettings(DEFAULT_BATCHING_SETTINGS)
- .build();
- } catch (Throwable t) {
- throw toCanonical(t).underlying;
- }
- })
+ .setPublisherFactory(getPartitionPublisherFactory())
.build();
SharedBehavior shared = new SharedBehavior(newAdminClient());
return new PubsubLiteProducer(publisherSettings.instantiate(), shared, topicPath());
diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java
index 1868976c..75e269d7 100644
--- a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java
+++ b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java
@@ -35,6 +35,7 @@
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
@@ -79,6 +80,7 @@ class PubsubLiteConsumer implements Consumer {
private final AssignerFactory assignerFactory;
private final CursorClient cursorClient;
private final TopicStatsClient topicStatsClient;
+ private final List toClose;
private Optional assigner = Optional.empty();
private Optional consumer = Optional.empty();
@@ -89,7 +91,8 @@ class PubsubLiteConsumer implements Consumer {
ConsumerFactory consumerFactory,
AssignerFactory assignerFactory,
CursorClient cursorClient,
- TopicStatsClient topicStatsClient) {
+ TopicStatsClient topicStatsClient,
+ AutoCloseable... resources) {
this.subscriptionPath = subscriptionPath;
this.topicPath = topicPath;
this.shared = shared;
@@ -97,6 +100,13 @@ class PubsubLiteConsumer implements Consumer {
this.assignerFactory = assignerFactory;
this.cursorClient = cursorClient;
this.topicStatsClient = topicStatsClient;
+ this.toClose =
+ ImmutableList.builder()
+ .add(resources)
+ .add(cursorClient)
+ .add(topicStatsClient)
+ .add(shared)
+ .build();
}
private TopicPartition toTopicPartition(Partition partition) {
@@ -558,23 +568,15 @@ public void close(long l, TimeUnit timeUnit) {
@Override
public void close(Duration timeout) {
- try {
- cursorClient.close();
- } catch (Exception e) {
- logger.atSevere().withCause(e).log("Error closing cursor client during Consumer shutdown.");
- }
- try {
- topicStatsClient.close();
- } catch (Exception e) {
- logger.atSevere().withCause(e).log(
- "Error closing topic stats client during Consumer shutdown.");
- }
- try {
- shared.close();
- } catch (Exception e) {
- logger.atSevere().withCause(e).log("Error closing admin client during Consumer shutdown.");
- }
unsubscribe();
+ for (AutoCloseable closeable : toClose) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ logger.atSevere().withCause(e).log(
+ "Error closing %s during Consumer shutdown.", closeable.getClass().getSimpleName());
+ }
+ }
}
@Override