From ac48f12c296752aef067d83d0f6c06a119692ff0 Mon Sep 17 00:00:00 2001 From: jiangmichaellll <40044148+jiangmichaellll@users.noreply.github.com> Date: Fri, 5 Feb 2021 18:12:20 -0500 Subject: [PATCH] feat: Close clients gracefully (#56) * update * update * update * update * update * update --- .../pubsublite/spark/PslContinuousReader.java | 2 +- .../cloud/pubsublite/spark/PslDataSource.java | 24 +++++++++---------- .../PslMicroBatchInputPartitionReader.java | 6 +---- .../pubsublite/spark/PslMicroBatchReader.java | 2 ++ 4 files changed, 16 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java b/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java index ff77d6c7..ba6452b7 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java @@ -98,8 +98,8 @@ public void commit(Offset end) { @Override public void stop() { - cursorClient.shutdown(); committer.close(); + cursorClient.close(); } @Override diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java b/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java index b07122c4..3f436ddd 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java @@ -16,13 +16,14 @@ package com.google.cloud.pubsublite.spark; +import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; + import com.github.benmanes.caffeine.cache.Ticker; import com.google.auto.service.AutoService; import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.PartitionLookupUtils; import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.TopicPath; -import com.google.cloud.pubsublite.internal.CursorClient; import java.util.Objects; import java.util.Optional; import org.apache.spark.sql.sources.DataSourceRegister; @@ -53,12 +54,13 @@ public ContinuousReader createContinuousReader( PslDataSourceOptions pslDataSourceOptions = PslDataSourceOptions.fromSparkDataSourceOptions(options); - CursorClient cursorClient = pslDataSourceOptions.newCursorClient(); - AdminClient adminClient = pslDataSourceOptions.newAdminClient(); SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath(); - long topicPartitionCount = PartitionLookupUtils.numPartitions(subscriptionPath, adminClient); + long topicPartitionCount; + try (AdminClient adminClient = pslDataSourceOptions.newAdminClient()) { + topicPartitionCount = PartitionLookupUtils.numPartitions(subscriptionPath, adminClient); + } return new PslContinuousReader( - cursorClient, + pslDataSourceOptions.newCursorClient(), pslDataSourceOptions.newMultiPartitionCommitter(topicPartitionCount), pslDataSourceOptions.getSubscriberFactory(), subscriptionPath, @@ -76,19 +78,17 @@ public MicroBatchReader createMicroBatchReader( PslDataSourceOptions pslDataSourceOptions = PslDataSourceOptions.fromSparkDataSourceOptions(options); - CursorClient cursorClient = pslDataSourceOptions.newCursorClient(); - AdminClient adminClient = pslDataSourceOptions.newAdminClient(); SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath(); TopicPath topicPath; - try { + long topicPartitionCount; + try (AdminClient adminClient = pslDataSourceOptions.newAdminClient()) { topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic()); + topicPartitionCount = PartitionLookupUtils.numPartitions(topicPath, adminClient); } catch (Throwable t) { - throw new IllegalStateException( - "Unable to get topic for subscription " + subscriptionPath, t); + throw toCanonical(t).underlying; } - long topicPartitionCount = PartitionLookupUtils.numPartitions(topicPath, adminClient); return new PslMicroBatchReader( - cursorClient, + pslDataSourceOptions.newCursorClient(), pslDataSourceOptions.newMultiPartitionCommitter(topicPartitionCount), pslDataSourceOptions.getSubscriberFactory(), new LimitingHeadOffsetReader( diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReader.java b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReader.java index 2c1b0816..43e62f92 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReader.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReader.java @@ -97,10 +97,6 @@ public InternalRow get() { @Override public void close() { - try { - subscriber.close(); - } catch (Exception e) { - log.atWarning().log("Subscriber failed to close."); - } + subscriber.close(); } } diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java index 0c2533f8..3ae0d91d 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java @@ -115,6 +115,8 @@ public void commit(Offset end) { @Override public void stop() { committer.close(); + cursorClient.close(); + headOffsetReader.close(); } @Override