diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java index 58ae09f1..ab4facc4 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java @@ -511,6 +511,11 @@ public void close(Duration timeout) { } catch (Exception e) { logger.atSevere().withCause(e).log("Error closing cursor client during Consumer shutdown."); } + try { + shared.close(); + } catch (Exception e) { + logger.atSevere().withCause(e).log("Error closing admin client during Consumer shutdown."); + } unsubscribe(); } diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java index 9697840a..966c71ab 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java @@ -191,6 +191,11 @@ public void close() { @Override public void close(Duration duration) { + try { + shared.close(); + } catch (Exception e) { + logger.atSevere().withCause(e).log("Error closing admin client during Producer shutdown."); + } try { publisher.stopAsync().awaitTerminated(duration.toMillis(), MILLISECONDS); } catch (TimeoutException e) { diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/SharedBehavior.java b/src/main/java/com/google/cloud/pubsublite/kafka/SharedBehavior.java index 0945e73d..d0ea3c9c 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/SharedBehavior.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/SharedBehavior.java @@ -28,8 +28,8 @@ import org.apache.kafka.common.PartitionInfo; /** Shared behavior for producer and consumer. */ -final class SharedBehavior { - AdminClient client; +final class SharedBehavior implements AutoCloseable { + private final AdminClient client; SharedBehavior(AdminClient client) { this.client = client; @@ -57,4 +57,9 @@ List partitionsFor(TopicPath topic, Duration timeout) { throw toKafka(t); } } + + @Override + public void close() { + client.close(); + } } diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java index 8ee19539..b9c9bad9 100644 --- a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java +++ b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java @@ -30,7 +30,14 @@ import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; -import com.google.cloud.pubsublite.*; +import com.google.cloud.pubsublite.AdminClient; +import com.google.cloud.pubsublite.CloudZone; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.ProjectNumber; +import com.google.cloud.pubsublite.SubscriptionPath; +import com.google.cloud.pubsublite.TopicName; +import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.CursorClient; import com.google.cloud.pubsublite.internal.testing.UnitTestExamples; import com.google.cloud.pubsublite.internal.wire.Assigner; @@ -460,4 +467,11 @@ public void partitionsFor() { List info = consumer.partitionsFor(example(TopicPath.class).toString()); assertThat(info.size()).isEqualTo(2L); } + + @Test + public void close() { + consumer.close(); + verify(adminClient).close(); + verify(cursorClient).close(); + } } diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java index be4bc41f..4c55aab2 100644 --- a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java +++ b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java @@ -26,7 +26,13 @@ import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.StatusCode.Code; -import com.google.cloud.pubsublite.*; +import com.google.cloud.pubsublite.AdminClient; +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.PublishMetadata; +import com.google.cloud.pubsublite.TopicName; +import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.internal.testing.FakeApiService; @@ -209,6 +215,7 @@ public void flush() throws Exception { @Test public void close() throws Exception { producer.close(); + verify(adminClient).close(); verify(underlying).stopAsync(); verify(underlying).awaitTerminated(Long.MAX_VALUE, TimeUnit.MILLISECONDS); } diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/SharedBehaviorTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/SharedBehaviorTest.java index 7c972563..9ef468a0 100644 --- a/src/test/java/com/google/cloud/pubsublite/kafka/SharedBehaviorTest.java +++ b/src/test/java/com/google/cloud/pubsublite/kafka/SharedBehaviorTest.java @@ -19,6 +19,7 @@ import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.example; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; @@ -73,4 +74,10 @@ public void partitionsForFailure() { BrokerNotAvailableException.class, () -> shared.partitionsFor(example(TopicPath.class), Duration.ofMillis(10))); } + + @Test + public void close() { + shared.close(); + verify(adminClient).close(); + } }