feat: Spark micro batch processing (#426)
jiangmichaellll committed Dec 16, 2020
1 parent ff05b1a commit 86aecc9
Showing 11 changed files with 669 additions and 37 deletions.
@@ -19,9 +19,10 @@
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.util.function.Consumer;

public interface SubscriberFactory {
public interface SubscriberFactory extends Serializable {
Subscriber newSubscriber(Consumer<ImmutableList<SequencedMessage>> message_consumer)
throws ApiException;
}
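
The only change above is that SubscriberFactory now extends Serializable: Spark serializes InputPartition objects, and the new PslMicroBatchInputPartition further down carries a SubscriberFactory with it to the executors. A minimal sketch of what that permits, assuming a lambda that captures only serializable state (the body is a hypothetical placeholder, not the connector's wiring):

import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;

// Sketch only: a lambda implementation is serializable because its target type now extends
// Serializable, provided everything it captures is serializable too. The body below is a
// placeholder, not the connector's subscriber wiring.
class SerializableFactorySketch {
  static SubscriberFactory placeholderFactory() {
    return messageConsumer -> {
      throw new UnsupportedOperationException("construct and return a real Subscriber here");
    };
  }
}
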
@@ -0,0 +1,30 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import java.io.Closeable;

public interface HeadOffsetReader extends Closeable {

// Gets the head offsets for all partitions in a topic. Blocks.
PslSourceOffset getHeadOffset(TopicPath topic) throws CheckedApiException;

@Override
void close();
}
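
HeadOffsetReader supplies the per-partition head offsets that bound each micro-batch; PslDataSource below converts the result with PslSparkUtils.toSparkSourceOffset. A hedged consumption sketch, assuming the com.google.cloud.pubsublite.spark package (for PslSparkUtils and SparkSourceOffset) and an existing implementation passed in as reader:

import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CheckedApiException;

// Sketch, not connector code: convert a topic's head offsets into the Spark offset that
// bounds a micro-batch, the same conversion PslDataSource performs further down.
class HeadOffsetSketch {
  static SparkSourceOffset headAsSparkOffset(HeadOffsetReader reader, TopicPath topic)
      throws CheckedApiException {
    // close() is overridden not to throw, so try-with-resources needs no IOException handling.
    try (HeadOffsetReader r = reader) {
      return PslSparkUtils.toSparkSourceOffset(r.getHeadOffset(topic));
    }
  }
}
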
@@ -16,17 +16,13 @@

package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.common.annotations.VisibleForTesting;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
@@ -84,23 +80,8 @@ public void setStartOffset(Optional<Offset> start) {
startOffset = (SparkSourceOffset) start.get();
return;
}
try {
Map<Partition, com.google.cloud.pubsublite.Offset> pslSourceOffsetMap = new HashMap<>();
for (int i = 0; i < topicPartitionCount; i++) {
pslSourceOffsetMap.put(Partition.of(i), com.google.cloud.pubsublite.Offset.of(0));
}
cursorClient
.listPartitionCursors(subscriptionPath)
.get()
.entrySet()
.forEach((e) -> pslSourceOffsetMap.replace(e.getKey(), e.getValue()));
startOffset =
PslSparkUtils.toSparkSourceOffset(
PslSourceOffset.builder().partitionOffsetMap(pslSourceOffsetMap).build());
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(
"Failed to get information from PSL and construct startOffset", e);
}
startOffset =
PslSparkUtils.getSparkStartOffset(cursorClient, subscriptionPath, topicPartitionCount);
}

@Override
@@ -123,13 +104,13 @@ public StructType readSchema() {

@Override
public List<InputPartition<InternalRow>> planInputPartitions() {
return startOffset.getPartitionOffsetMap().entrySet().stream()
return startOffset.getPartitionOffsetMap().values().stream()
.map(
e ->
v ->
new PslContinuousInputPartition(
SparkPartitionOffset.builder()
.partition(e.getKey())
.offset(e.getValue().offset())
.partition(v.partition())
.offset(v.offset())
.build(),
subscriptionPath,
flowControlSettings))
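
The block removed from setStartOffset above was factored out into PslSparkUtils.getSparkStartOffset. The helper itself is not part of this diff; the sketch below reconstructs what it is expected to do from the removed lines: default every partition to offset 0, then overwrite with whatever cursors have been committed.

import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.CursorClient;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

// Reconstruction sketch of PslSparkUtils.getSparkStartOffset, based on the removed block;
// the real helper is not shown in this diff. Assumes the com.google.cloud.pubsublite.spark
// package (for PslSourceOffset, SparkSourceOffset, PslSparkUtils).
class StartOffsetSketch {
  static SparkSourceOffset getSparkStartOffset(
      CursorClient cursorClient, SubscriptionPath subscriptionPath, long topicPartitionCount) {
    try {
      // Default every partition to offset 0, then overwrite with committed cursors.
      Map<Partition, com.google.cloud.pubsublite.Offset> pslSourceOffsetMap = new HashMap<>();
      for (int i = 0; i < topicPartitionCount; i++) {
        pslSourceOffsetMap.put(Partition.of(i), com.google.cloud.pubsublite.Offset.of(0));
      }
      cursorClient
          .listPartitionCursors(subscriptionPath)
          .get()
          .forEach(pslSourceOffsetMap::replace);
      return PslSparkUtils.toSparkSourceOffset(
          PslSourceOffset.builder().partitionOffsetMap(pslSourceOffsetMap).build());
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(
          "Failed to get information from PSL and construct startOffset", e);
    }
  }
}
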
@@ -17,22 +17,27 @@
package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PartitionLookupUtils;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.wire.CommitterBuilder;
import com.google.cloud.pubsublite.proto.Subscription;
import com.google.common.collect.ImmutableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.types.StructType;

public class PslDataSource implements DataSourceV2, ContinuousReadSupport, DataSourceRegister {
public class PslDataSource
implements DataSourceV2, ContinuousReadSupport, MicroBatchReadSupport, DataSourceRegister {

@Override
public String shortName() {
@@ -52,16 +57,45 @@ public ContinuousReader createContinuousReader(
CursorClient cursorClient = pslDataSourceOptions.newCursorClient();
AdminClient adminClient = pslDataSourceOptions.newAdminClient();
SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
long topicPartitionCount;
long topicPartitionCount = PartitionLookupUtils.numPartitions(subscriptionPath, adminClient);
MultiPartitionCommitter committer =
new MultiPartitionCommitterImpl(
topicPartitionCount,
(partition) ->
CommitterBuilder.newBuilder()
.setSubscriptionPath(subscriptionPath)
.setPartition(partition)
.setServiceClient(pslDataSourceOptions.newCursorServiceClient())
.build());
return new PslContinuousReader(
cursorClient,
committer,
subscriptionPath,
Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
topicPartitionCount);
}

@Override
public MicroBatchReader createMicroBatchReader(
Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
if (schema.isPresent()) {
throw new IllegalArgumentException(
"PubSub Lite uses fixed schema and custom schema is not allowed");
}

PslDataSourceOptions pslDataSourceOptions =
PslDataSourceOptions.fromSparkDataSourceOptions(options);
CursorClient cursorClient = pslDataSourceOptions.newCursorClient();
AdminClient adminClient = pslDataSourceOptions.newAdminClient();
SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
TopicPath topicPath;
try {
Subscription sub = adminClient.getSubscription(subscriptionPath).get();
topicPartitionCount =
adminClient.getTopicPartitionCount(TopicPath.parse(sub.getTopic())).get();
} catch (InterruptedException | ExecutionException e) {
topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic());
} catch (Throwable t) {
throw new IllegalStateException(
"Failed to get information of subscription " + pslDataSourceOptions.subscriptionPath(),
e);
"Unable to get topic for subscription " + subscriptionPath, t);
}
long topicPartitionCount = PartitionLookupUtils.numPartitions(topicPath, adminClient);
MultiPartitionCommitter committer =
new MultiPartitionCommitterImpl(
topicPartitionCount,
@@ -71,11 +105,37 @@
.setPartition(partition)
.setServiceClient(pslDataSourceOptions.newCursorServiceClient())
.build());
return new PslContinuousReader(

return new PslMicroBatchReader(
cursorClient,
committer,
subscriptionPath,
PslSparkUtils.toSparkSourceOffset(getHeadOffset(topicPath)),
Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
topicPartitionCount);
}

private static PslSourceOffset getHeadOffset(TopicPath topicPath) {
// TODO(jiangmichael): Replace it with real implementation.
HeadOffsetReader headOffsetReader =
new HeadOffsetReader() {
@Override
public PslSourceOffset getHeadOffset(TopicPath topic) {
return PslSourceOffset.builder()
.partitionOffsetMap(
ImmutableMap.of(
Partition.of(0), com.google.cloud.pubsublite.Offset.of(50),
Partition.of(1), com.google.cloud.pubsublite.Offset.of(50)))
.build();
}

@Override
public void close() {}
};
try {
return headOffsetReader.getHeadOffset(topicPath);
} catch (CheckedApiException e) {
throw new IllegalStateException("Unable to get head offset for topic " + topicPath, e);
}
}
}
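
With MicroBatchReadSupport implemented, the source can be consumed from Structured Streaming in micro-batch mode. A hypothetical usage sketch; the "pubsublite" format name and the "pubsublite.subscription" option key are assumptions, since shortName() and the option parsing are not shown in this diff:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Hypothetical usage sketch: the format name and option key below are assumptions, not
// values defined in this diff.
public class PslMicroBatchUsageSketch {
  public static Dataset<Row> readStream(SparkSession spark) {
    return spark
        .readStream()
        .format("pubsublite")
        .option(
            "pubsublite.subscription",
            "projects/my-project/locations/us-central1-a/subscriptions/my-subscription")
        .load();
  }
}
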
@@ -0,0 +1,66 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SeekRequest;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;

public class PslMicroBatchInputPartition implements InputPartition<InternalRow> {

private final SubscriberFactory subscriberFactory;
private final SparkPartitionOffset endOffset;
private final SubscriptionPath subscriptionPath;
private final FlowControlSettings flowControlSettings;

public PslMicroBatchInputPartition(
SubscriptionPath subscriptionPath,
FlowControlSettings flowControlSettings,
SparkPartitionOffset endOffset,
SubscriberFactory subscriberFactory) {
this.endOffset = endOffset;
this.subscriptionPath = subscriptionPath;
this.flowControlSettings = flowControlSettings;
this.subscriberFactory = subscriberFactory;
}

@Override
public InputPartitionReader<InternalRow> createPartitionReader() {
BlockingPullSubscriber subscriber;
try {
subscriber =
new BlockingPullSubscriberImpl(
subscriberFactory,
flowControlSettings,
SeekRequest.newBuilder()
.setCursor(Cursor.newBuilder().setOffset(endOffset.offset()).build())
.build());
} catch (CheckedApiException e) {
throw new IllegalStateException(
"Unable to create PSL subscriber for " + endOffset.partition(), e);
}
return new PslMicroBatchInputPartitionReader(subscriptionPath, endOffset, subscriber);
}
}
@@ -0,0 +1,98 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.flogger.GoogleLogger;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;

public class PslMicroBatchInputPartitionReader implements InputPartitionReader<InternalRow> {
private static final GoogleLogger log = GoogleLogger.forEnclosingClass();

private static final Duration SUBSCRIBER_PULL_TIMEOUT = Duration.ofSeconds(10);

private final SubscriptionPath subscriptionPath;
private final SparkPartitionOffset endOffset;
private final BlockingPullSubscriber subscriber;
@Nullable private SequencedMessage currentMsg = null;
private boolean batchFulfilled = false;

@VisibleForTesting
PslMicroBatchInputPartitionReader(
SubscriptionPath subscriptionPath,
SparkPartitionOffset endOffset,
BlockingPullSubscriber subscriber) {
this.subscriptionPath = subscriptionPath;
this.subscriber = subscriber;
this.endOffset = endOffset;
}

@Override
public boolean next() {
if (batchFulfilled) {
return false;
}
Optional<SequencedMessage> msg;
while (true) {
try {
subscriber.onData().get(SUBSCRIBER_PULL_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
msg = subscriber.messageIfAvailable();
break;
} catch (TimeoutException e) {
log.atWarning().log("Unable to get any messages in last " + SUBSCRIBER_PULL_TIMEOUT);
} catch (Throwable t) {
throw new IllegalStateException("Failed to retrieve messages.", t);
}
}
// since next() is only called on one thread at a time, we are sure that the message is
// available to this thread.
assert msg.isPresent();
currentMsg = msg.get();
if (currentMsg.offset().value() == endOffset.offset()) {
// this is the last msg for the batch.
batchFulfilled = true;
} else if (currentMsg.offset().value() > endOffset.offset()) {
batchFulfilled = true;
return false;
}
return true;
}

@Override
public InternalRow get() {
assert currentMsg != null;
return PslSparkUtils.toInternalRow(currentMsg, subscriptionPath, endOffset.partition());
}

@Override
public void close() {
try {
subscriber.close();
} catch (Exception e) {
log.atWarning().log("Subscriber failed to close.");
}
}
}
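
Spark drives an InputPartitionReader with a next()/get()/close() loop on the executor. The sketch below is only an illustration of that contract as exercised by PslMicroBatchInputPartitionReader; the actual loop lives inside Spark, not in this connector.

import java.io.IOException;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;

// Illustration only: call next() until it returns false, read each row with get(),
// then close the reader.
class ReaderLoopSketch {
  static long drain(InputPartitionReader<InternalRow> reader) throws IOException {
    long rows = 0;
    try {
      while (reader.next()) {
        InternalRow row = reader.get(); // one Pub/Sub Lite message per row
        rows++;
      }
    } finally {
      reader.close();
    }
    return rows;
  }
}
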
