feat: Supports topic partition increase. (#115)

jiangmichaellll committed Mar 19, 2021
1 parent b7237f8 commit 20f3366
Showing 13 changed files with 488 additions and 120 deletions.
15 changes: 15 additions & 0 deletions clirr-ignored-differences.xml
@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
+<differences>
+  <difference>
+    <differenceType>7004</differenceType>
+    <className>com/google/cloud/pubsublite/spark/*Reader</className>
+    <method>*</method>
+  </difference>
+  <difference>
+    <differenceType>7005</differenceType>
+    <className>com/google/cloud/pubsublite/spark/*Reader</className>
+    <method>*</method>
+    <to>*</to>
+  </difference>
+</differences>
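These ignore rules are needed because this commit changes public constructor signatures on the *Reader classes, and clirr (the binary-compatibility checker wired into the build) reports changed method argument lists as difference types 7004 (argument count changed) and 7005 (argument type changed). A hypothetical before/after sketch of the kind of signature change being suppressed, not code from this commit:

public class SignatureChangeSketch {
  interface PartitionCountReader {} // stand-in for the real interface added below

  static class ReaderBefore {
    ReaderBefore(long topicPartitionCount) {} // old shape: fixed count argument
  }

  static class ReaderAfter {
    ReaderAfter(PartitionCountReader reader) {} // 7004/7005-style change clirr would flag
  }
}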
47 changes: 47 additions & 0 deletions src/main/java/com/google/cloud/pubsublite/spark/CachedPartitionCountReader.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import com.google.cloud.pubsublite.AdminClient;
+import com.google.cloud.pubsublite.PartitionLookupUtils;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.ThreadSafe;
+
+@ThreadSafe
+public class CachedPartitionCountReader implements PartitionCountReader {
+  private final AdminClient adminClient;
+  private final Supplier<Integer> supplier;
+
+  public CachedPartitionCountReader(AdminClient adminClient, TopicPath topicPath) {
+    this.adminClient = adminClient;
+    this.supplier =
+        Suppliers.memoizeWithExpiration(
+            () -> PartitionLookupUtils.numPartitions(topicPath, adminClient), 1, TimeUnit.MINUTES);
+  }
+
+  @Override
+  public void close() {
+    adminClient.close();
+  }
+
+  public int getPartitionCount() {
+    return supplier.get();
+  }
+}
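The class above is the commit's single source of truth for partition counts: Guava's Suppliers.memoizeWithExpiration caches the admin lookup for one minute, so hot paths can poll the count freely without issuing an RPC each time. A minimal sketch of that caching behavior, with a fake counter standing in for the admin call (illustrative only):

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MemoizeSketch {
  public static void main(String[] args) {
    AtomicInteger adminCalls = new AtomicInteger(); // stands in for the admin RPC
    Supplier<Integer> cachedCount =
        Suppliers.memoizeWithExpiration(adminCalls::incrementAndGet, 1, TimeUnit.MINUTES);

    cachedCount.get(); // first call hits the "RPC"
    cachedCount.get(); // served from cache for the next minute
    System.out.println(adminCalls.get()); // prints 1
  }
}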
src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java
@@ -27,7 +27,9 @@
 import com.google.cloud.pubsublite.internal.TopicStatsClient;
 import com.google.cloud.pubsublite.proto.Cursor;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.GoogleLogger;
 import com.google.common.util.concurrent.MoreExecutors;
+import java.io.Closeable;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -40,18 +42,22 @@
  * offsets for the topic at most once per minute.
  */
 public class LimitingHeadOffsetReader implements PerTopicHeadOffsetReader {
+  private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
 
   private final TopicStatsClient topicStatsClient;
   private final TopicPath topic;
-  private final long topicPartitionCount;
+  private final PartitionCountReader partitionCountReader;
   private final AsyncLoadingCache<Partition, Offset> cachedHeadOffsets;
 
   @VisibleForTesting
   public LimitingHeadOffsetReader(
-      TopicStatsClient topicStatsClient, TopicPath topic, long topicPartitionCount, Ticker ticker) {
+      TopicStatsClient topicStatsClient,
+      TopicPath topic,
+      PartitionCountReader partitionCountReader,
+      Ticker ticker) {
     this.topicStatsClient = topicStatsClient;
     this.topic = topic;
-    this.topicPartitionCount = topicPartitionCount;
+    this.partitionCountReader = partitionCountReader;
     this.cachedHeadOffsets =
         Caffeine.newBuilder()
            .ticker(ticker)
@@ -82,7 +88,7 @@ public void onSuccess(Cursor c) {
   @Override
   public PslSourceOffset getHeadOffset() {
     Set<Partition> keySet = new HashSet<>();
-    for (int i = 0; i < topicPartitionCount; i++) {
+    for (int i = 0; i < partitionCountReader.getPartitionCount(); i++) {
       keySet.add(Partition.of(i));
     }
     CompletableFuture<Map<Partition, Offset>> future = cachedHeadOffsets.getAll(keySet);
@@ -95,6 +101,10 @@ public PslSourceOffset getHeadOffset() {
 
   @Override
   public void close() {
-    topicStatsClient.close();
+    try (AutoCloseable a = topicStatsClient;
+        Closeable b = partitionCountReader) {
+    } catch (Exception e) {
+      log.atWarning().withCause(e).log("Unable to close LimitingHeadOffsetReader.");
+    }
   }
 }
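The rewritten close() uses an empty try-with-resources block so that both the stats client and the partition count reader get closed even if one of them throws: the other close() still runs, and any failure surfaces once through the catch. The idiom in isolation, as a standalone sketch rather than the connector's code:

public class CloseBothSketch {
  public static void main(String[] args) {
    // Resources close in reverse declaration order: second, then first.
    try (AutoCloseable first = () -> System.out.println("closing first");
        AutoCloseable second = () -> System.out.println("closing second")) {
      // Empty body: the block exists only to trigger both close() calls.
    } catch (Exception e) {
      System.err.println("close failed: " + e);
    }
  }
}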
src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImpl.java
@@ -25,30 +25,95 @@
+import com.google.common.flogger.GoogleLogger;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
 
+/**
+ * A {@link MultiPartitionCommitter} that lazily adjusts for partition changes when {@link
+ * MultiPartitionCommitter#commit(PslSourceOffset)} is called.
+ */
 public class MultiPartitionCommitterImpl implements MultiPartitionCommitter {
+  private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
+
+  private final CommitterFactory committerFactory;
+
+  @GuardedBy("this")
   private final Map<Partition, Committer> committerMap = new HashMap<>();
 
+  @GuardedBy("this")
+  private final Set<Partition> partitionsCleanUp = new HashSet<>();
+
+  public MultiPartitionCommitterImpl(long topicPartitionCount, CommitterFactory committerFactory) {
+    this(
+        topicPartitionCount,
+        committerFactory,
+        MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1)));
+  }
+
   @VisibleForTesting
-  MultiPartitionCommitterImpl(long topicPartitionCount, CommitterFactory committerFactory) {
+  MultiPartitionCommitterImpl(
+      long topicPartitionCount,
+      CommitterFactory committerFactory,
+      ScheduledExecutorService executorService) {
+    this.committerFactory = committerFactory;
     for (int i = 0; i < topicPartitionCount; i++) {
       Partition p = Partition.of(i);
-      Committer committer = committerFactory.newCommitter(p);
-      committer.startAsync().awaitRunning();
-      committerMap.put(p, committer);
+      committerMap.put(p, createCommitter(p));
     }
+    executorService.scheduleWithFixedDelay(this::cleanUpCommitterMap, 10, 10, TimeUnit.MINUTES);
   }
 
   @Override
   public synchronized void close() {
     committerMap.values().forEach(c -> c.stopAsync().awaitTerminated());
   }
 
+  /** Adjust committerMap based on the partitions that need to be committed. */
+  private synchronized void updateCommitterMap(PslSourceOffset offset) {
+    int currentPartitions = committerMap.size();
+    int newPartitions = offset.partitionOffsetMap().size();
+
+    if (currentPartitions == newPartitions) {
+      return;
+    }
+    if (currentPartitions < newPartitions) {
+      for (int i = currentPartitions; i < newPartitions; i++) {
+        Partition p = Partition.of(i);
+        if (!committerMap.containsKey(p)) {
+          committerMap.put(p, createCommitter(p));
+        }
+        partitionsCleanUp.remove(p);
+      }
+      return;
+    }
+    partitionsCleanUp.clear();
+    for (int i = newPartitions; i < currentPartitions; i++) {
+      partitionsCleanUp.add(Partition.of(i));
+    }
+  }
+
+  private synchronized Committer createCommitter(Partition p) {
+    Committer committer = committerFactory.newCommitter(p);
+    committer.startAsync().awaitRunning();
+    return committer;
+  }
+
+  private synchronized void cleanUpCommitterMap() {
+    for (Partition p : partitionsCleanUp) {
+      committerMap.get(p).stopAsync();
+      committerMap.remove(p);
+    }
+    partitionsCleanUp.clear();
+  }
+
   @Override
   public synchronized void commit(PslSourceOffset offset) {
+    updateCommitterMap(offset);
     offset
         .partitionOffsetMap()
         .forEach(
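Note the clean-up choreography in the class above: when the offset map shrinks, updateCommitterMap only marks the orphaned partitions, and the scheduled ten-minute sweep (cleanUpCommitterMap) actually stops them, so nothing is torn down the instant a commit happens to cover fewer partitions; if a partition reappears before the sweep, partitionsCleanUp.remove(p) rescues its committer. A stripped-down sketch of that defer-then-sweep pattern, with toy types rather than the connector's:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DeferThenSweepSketch {
  private final Map<Integer, String> active = new HashMap<>(); // value stands in for a Committer
  private final Set<Integer> pendingCleanup = new HashSet<>();

  synchronized void shrinkTo(int newCount) {
    for (int i = newCount; i < active.size(); i++) {
      pendingCleanup.add(i); // deferred: the entry keeps working until the sweep
    }
  }

  synchronized void growTo(int newCount) {
    for (int i = 0; i < newCount; i++) {
      active.putIfAbsent(i, "committer-" + i);
      pendingCleanup.remove(i); // a reappearing partition is rescued
    }
  }

  synchronized void sweep() { // scheduled periodically in the real class
    for (Integer i : pendingCleanup) {
      active.remove(i); // the real class also stops the committer here
    }
    pendingCleanup.clear();
  }
}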
26 changes: 26 additions & 0 deletions src/main/java/com/google/cloud/pubsublite/spark/PartitionCountReader.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import java.io.Closeable;
+
+public interface PartitionCountReader extends Closeable {
+  int getPartitionCount();
+
+  @Override
+  void close();
+}
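The redeclared close() here overrides Closeable.close() and drops its "throws IOException" clause, so callers can close a PartitionCountReader without handling a checked exception. A self-contained illustration of that narrowing (the inline implementation is a stand-in, not the commit's):

import java.io.Closeable;

public class NarrowedCloseSketch {
  interface PartitionCountReader extends Closeable {
    int getPartitionCount();

    @Override
    void close(); // overrides Closeable.close() and drops "throws IOException"
  }

  public static void main(String[] args) {
    PartitionCountReader reader =
        new PartitionCountReader() {
          @Override
          public int getPartitionCount() {
            return 4;
          }

          @Override
          public void close() {}
        };
    System.out.println(reader.getPartitionCount());
    reader.close(); // no try/catch needed thanks to the narrowed signature
  }
}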
src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java
@@ -41,8 +41,9 @@ public class PslContinuousReader implements ContinuousReader {
   private final PartitionSubscriberFactory partitionSubscriberFactory;
   private final SubscriptionPath subscriptionPath;
   private final FlowControlSettings flowControlSettings;
-  private final long topicPartitionCount;
   private SparkSourceOffset startOffset;
+  private final PartitionCountReader partitionCountReader;
+  private final long topicPartitionCount;
 
   @VisibleForTesting
   public PslContinuousReader(
@@ -51,13 +52,14 @@ public PslContinuousReader(
       PartitionSubscriberFactory partitionSubscriberFactory,
       SubscriptionPath subscriptionPath,
       FlowControlSettings flowControlSettings,
-      long topicPartitionCount) {
+      PartitionCountReader partitionCountReader) {
     this.cursorClient = cursorClient;
     this.committer = committer;
     this.partitionSubscriberFactory = partitionSubscriberFactory;
     this.subscriptionPath = subscriptionPath;
     this.flowControlSettings = flowControlSettings;
-    this.topicPartitionCount = topicPartitionCount;
+    this.partitionCountReader = partitionCountReader;
+    this.topicPartitionCount = partitionCountReader.getPartitionCount();
   }
 
   @Override
@@ -126,4 +128,9 @@ public List<InputPartition<InternalRow>> planInputPartitions() {
     }
     return list;
   }
+
+  @Override
+  public boolean needsReconfiguration() {
+    return partitionCountReader.getPartitionCount() != topicPartitionCount;
+  }
 }
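needsReconfiguration() is the hook Spark's continuous-processing engine polls to decide whether to stop and re-plan the stream: the reader snapshots the partition count at construction, and once the cached live count drifts from that snapshot, Spark rebuilds the reader so planInputPartitions() can emit one input partition per new topic partition. The capture-then-compare shape in isolation, as a simplified stand-in rather than the connector's classes:

import java.util.function.IntSupplier;

final class ReconfigurationSketch {
  private final IntSupplier liveCount;   // e.g. backed by a cached admin lookup
  private final int countAtConstruction; // snapshot used when planning partitions

  ReconfigurationSketch(IntSupplier liveCount) {
    this.liveCount = liveCount;
    this.countAtConstruction = liveCount.getAsInt();
  }

  boolean needsReconfiguration() {
    // True once the topic has gained (or lost) partitions since planning.
    return liveCount.getAsInt() != countAtConstruction;
  }
}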
24 changes: 13 additions & 11 deletions src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java
@@ -21,7 +21,6 @@
 import com.github.benmanes.caffeine.cache.Ticker;
 import com.google.auto.service.AutoService;
 import com.google.cloud.pubsublite.AdminClient;
-import com.google.cloud.pubsublite.PartitionLookupUtils;
 import com.google.cloud.pubsublite.SubscriptionPath;
 import com.google.cloud.pubsublite.TopicPath;
 import java.util.Objects;
@@ -55,17 +54,21 @@ public ContinuousReader createContinuousReader(
     PslDataSourceOptions pslDataSourceOptions =
         PslDataSourceOptions.fromSparkDataSourceOptions(options);
     SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
-    long topicPartitionCount;
+    TopicPath topicPath;
     try (AdminClient adminClient = pslDataSourceOptions.newAdminClient()) {
-      topicPartitionCount = PartitionLookupUtils.numPartitions(subscriptionPath, adminClient);
+      topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic());
     } catch (Throwable t) {
       throw toCanonical(t).underlying;
     }
+    PartitionCountReader partitionCountReader =
+        new CachedPartitionCountReader(pslDataSourceOptions.newAdminClient(), topicPath);
     return new PslContinuousReader(
         pslDataSourceOptions.newCursorClient(),
-        pslDataSourceOptions.newMultiPartitionCommitter(topicPartitionCount),
+        pslDataSourceOptions.newMultiPartitionCommitter(partitionCountReader.getPartitionCount()),
         pslDataSourceOptions.getSubscriberFactory(),
         subscriptionPath,
         Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
-        topicPartitionCount);
+        partitionCountReader);
   }
 
   @Override
@@ -80,25 +83,24 @@ public MicroBatchReader createMicroBatchReader(
     PslDataSourceOptions pslDataSourceOptions =
         PslDataSourceOptions.fromSparkDataSourceOptions(options);
     SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
     TopicPath topicPath;
-    long topicPartitionCount;
     try (AdminClient adminClient = pslDataSourceOptions.newAdminClient()) {
       topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic());
-      topicPartitionCount = PartitionLookupUtils.numPartitions(topicPath, adminClient);
     } catch (Throwable t) {
       throw toCanonical(t).underlying;
     }
+    PartitionCountReader partitionCountReader =
+        new CachedPartitionCountReader(pslDataSourceOptions.newAdminClient(), topicPath);
     return new PslMicroBatchReader(
         pslDataSourceOptions.newCursorClient(),
-        pslDataSourceOptions.newMultiPartitionCommitter(topicPartitionCount),
+        pslDataSourceOptions.newMultiPartitionCommitter(partitionCountReader.getPartitionCount()),
         pslDataSourceOptions.getSubscriberFactory(),
         new LimitingHeadOffsetReader(
             pslDataSourceOptions.newTopicStatsClient(),
             topicPath,
-            topicPartitionCount,
+            partitionCountReader,
             Ticker.systemTicker()),
         subscriptionPath,
         Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
-        pslDataSourceOptions.maxMessagesPerBatch(),
-        topicPartitionCount);
+        pslDataSourceOptions.maxMessagesPerBatch());
   }
 }
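One subtlety in both factory methods: the AdminClient opened in the try-with-resources block is closed as soon as the topic path is resolved, so CachedPartitionCountReader is handed a second, fresh client from newAdminClient() and owns it for the reader's whole lifetime (its close() closes that client, as seen earlier). A small sketch of this hand-off-ownership pattern, with toy types of my own rather than the connector's:

public class OwnershipSketch {
  interface Client extends AutoCloseable {
    @Override
    void close();
  }

  static class CountReader implements AutoCloseable {
    private final Client client;

    CountReader(Client client) {
      this.client = client; // takes ownership: nobody else may close it
    }

    @Override
    public void close() {
      client.close(); // the owner releases the resource
    }
  }

  public static void main(String[] args) {
    // Short-lived client: used for one lookup, closed immediately.
    try (Client lookup = () -> System.out.println("lookup client closed")) {
      // ... resolve the topic path here ...
    }
    // Long-lived client: ownership transferred to the reader.
    CountReader reader = new CountReader(() -> System.out.println("owned client closed"));
    reader.close();
  }
}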
}
