feat: Major classes for Spark continuous streaming (#396)
jiangmichaellll committed Dec 10, 2020
1 parent c7b297d commit 0c0d928
Showing 11 changed files with 935 additions and 21 deletions.
27 changes: 27 additions & 0 deletions pubsublite-spark-sql-streaming/pom.xml
@@ -30,6 +30,11 @@
<artifactId>google-cloud-pubsublite</artifactId>
<version>0.7.1-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsublite:current} -->
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-pubsublite-v1</artifactId>
<version>0.7.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -58,6 +63,22 @@
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
<dependency>
<groupId>com.google.flogger</groupId>
<artifactId>google-extensions</artifactId>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>api-common</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
@@ -92,6 +113,12 @@
<version>1.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
<version>3.6.0</version>
</dependency>
</dependencies>

<dependencyManagement>
32 changes: 32 additions & 0 deletions pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitter.java
@@ -0,0 +1,32 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.wire.Committer;
import java.io.Closeable;

/**
 * Commits cursors for all partitions of a Pub/Sub Lite subscription, fanning out to one {@link
 * Committer} per partition.
 */
public interface MultiPartitionCommitter extends Closeable {

interface CommitterFactory {
Committer newCommitter(Partition partition);
}

void commit(PslSourceOffset offset);

void close();
}
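For orientation, here is a minimal, hypothetical sketch of how this interface composes with the implementation added below. The helper class and its wiring are illustrative only, not code from this commit.

package com.google.cloud.pubsublite.spark;

// Hypothetical caller, for illustration only.
final class MultiPartitionCommitterUsageSketch {
  static void commitAll(
      long topicPartitionCount,
      MultiPartitionCommitter.CommitterFactory factory,
      PslSourceOffset offset) {
    // Starts one Committer per partition, commits every partition's cursor,
    // then stops the committers via close().
    try (MultiPartitionCommitter committer =
        new MultiPartitionCommitterImpl(topicPartitionCount, factory)) {
      committer.commit(offset);
    }
  }
}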
78 changes: 78 additions & 0 deletions pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImpl.java
@@ -0,0 +1,78 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.HashMap;
import java.util.Map;

/** A {@link MultiPartitionCommitter} that maintains one {@link Committer} per topic partition. */
public class MultiPartitionCommitterImpl implements MultiPartitionCommitter {
private static final GoogleLogger log = GoogleLogger.forEnclosingClass();

private final Map<Partition, Committer> committerMap = new HashMap<>();

@VisibleForTesting
MultiPartitionCommitterImpl(long topicPartitionCount, CommitterFactory committerFactory) {
for (int i = 0; i < topicPartitionCount; i++) {
Partition p = Partition.of(i);
Committer committer = committerFactory.newCommitter(p);
committer.startAsync().awaitRunning();
committerMap.put(p, committer);
}
}

@Override
public synchronized void close() {
committerMap.values().forEach(c -> c.stopAsync().awaitTerminated());
}

@Override
public synchronized void commit(PslSourceOffset offset) {
offset
.partitionOffsetMap()
.forEach(
(p, o) -> {
              // We don't need to worry about commit-offset ordering here: the Committer
              // guarantees it. Once commitOffset() returns, the commit has either been sent
              // on the stream already, or is queued to be sent in order when the next
              // connection opens.
ApiFuture<Void> future = committerMap.get(p).commitOffset(o);
ApiFutures.addCallback(
future,
new ApiFutureCallback<Void>() {
@Override
public void onFailure(Throwable t) {
if (!future.isCancelled()) {
                      log.atWarning().withCause(t).log("Failed to commit %s,%s.", p.value(), o.value());
}
}

@Override
public void onSuccess(Void result) {
log.atInfo().log("Committed %s,%s.", p.value(), o.value());
}
},
MoreExecutors.directExecutor());
});
}
}
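This commit also adds mockito-core as a test dependency; a unit test for the commit fan-out might look roughly like the sketch below. The stubbing of PslSourceOffset (assumed non-final and mockable) and the class/test names are assumptions, not code from this commit.

package com.google.cloud.pubsublite.spark;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.common.collect.ImmutableMap;
import org.junit.Test;

public class MultiPartitionCommitterImplSketchTest {

  @Test
  public void commitFansOutToPerPartitionCommitters() {
    Committer committer = mock(Committer.class);
    // startAsync() must return the service itself so awaitRunning() can be chained on the mock.
    when(committer.startAsync()).thenReturn(committer);
    when(committer.commitOffset(any())).thenReturn(ApiFutures.immediateFuture(null));

    MultiPartitionCommitterImpl multiCommitter =
        new MultiPartitionCommitterImpl(1, partition -> committer);

    // Assumed: PslSourceOffset exposes partitionOffsetMap(), as used by commit() above.
    PslSourceOffset offset = mock(PslSourceOffset.class);
    when(offset.partitionOffsetMap())
        .thenReturn(ImmutableMap.of(Partition.of(0), Offset.of(10L)));

    multiCommitter.commit(offset);
    verify(committer).commitOffset(Offset.of(10L));
  }
}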
91 changes: 91 additions & 0 deletions pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartition.java
@@ -0,0 +1,91 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.BufferingPullSubscriber;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SeekRequest;
import java.io.Serializable;
import java.util.concurrent.Executors;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.ContinuousInputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;

/**
 * A {@link ContinuousInputPartition} for a single Pub/Sub Lite partition. Instances are
 * serialized to executors, which create the actual readers.
 */
public class PslContinuousInputPartition
    implements ContinuousInputPartition<InternalRow>, Serializable {

private final SparkPartitionOffset startOffset;
private final SubscriptionPath subscriptionPath;
private final FlowControlSettings flowControlSettings;

public PslContinuousInputPartition(
SparkPartitionOffset startOffset,
SubscriptionPath subscriptionPath,
FlowControlSettings flowControlSettings) {
this.startOffset = startOffset;
this.subscriptionPath = subscriptionPath;
this.flowControlSettings = flowControlSettings;
}

@Override
public InputPartitionReader<InternalRow> createContinuousReader(PartitionOffset offset) {
    checkArgument(
        offset instanceof SparkPartitionOffset, "offset is not an instance of SparkPartitionOffset");

SparkPartitionOffset sparkPartitionOffset = (SparkPartitionOffset) offset;
PslPartitionOffset pslPartitionOffset =
PslSparkUtils.toPslPartitionOffset(sparkPartitionOffset);

BufferingPullSubscriber subscriber;
try {
subscriber =
new BufferingPullSubscriber(
// TODO(jiangmichael): Pass credentials settings here.
(consumer) ->
SubscriberBuilder.newBuilder()
.setSubscriptionPath(subscriptionPath)
.setPartition(pslPartitionOffset.partition())
.setContext(PubsubContext.of(Constants.FRAMEWORK))
.setMessageConsumer(consumer)
.build(),
flowControlSettings,
SeekRequest.newBuilder()
.setCursor(
Cursor.newBuilder().setOffset(pslPartitionOffset.offset().value()).build())
.build());
} catch (CheckedApiException e) {
throw new IllegalStateException(
"Unable to create PSL subscriber for " + startOffset.toString(), e);
}
return new PslContinuousInputPartitionReader(
subscriptionPath,
sparkPartitionOffset,
subscriber,
Executors.newSingleThreadScheduledExecutor());
}

@Override
public InputPartitionReader<InternalRow> createPartitionReader() {
return createContinuousReader(startOffset);
}
}
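For context, a rough sketch of how such a partition might be constructed on the driver side. The subscription path, flow-control limits, and the use of SubscriptionPath.parse are placeholders and assumptions; the planner code that really creates these partitions is not part of this commit.

package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;

// Illustrative wiring only; values are placeholders.
final class PslContinuousInputPartitionSketch {
  static PslContinuousInputPartition createForPartitionZero() {
    SubscriptionPath path =
        SubscriptionPath.parse(
            "projects/sample-project/locations/us-central1-a/subscriptions/sample-sub");
    FlowControlSettings flowControl =
        FlowControlSettings.builder()
            .setMessagesOutstanding(1000L)
            .setBytesOutstanding(10L * 1024 * 1024)
            .build();
    SparkPartitionOffset start =
        SparkPartitionOffset.builder().partition(Partition.of(0)).offset(0L).build();
    return new PslContinuousInputPartition(start, path, flowControl);
  }
}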
98 changes: 98 additions & 0 deletions pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReader.java
@@ -0,0 +1,98 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import static com.google.common.base.Preconditions.checkState;

import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.PullSubscriber;
import com.google.common.flogger.GoogleLogger;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;

/**
 * A {@link ContinuousInputPartitionReader} that pulls messages from a single Pub/Sub Lite
 * partition on a fixed schedule and surfaces them to Spark as {@link InternalRow}s.
 */
public class PslContinuousInputPartitionReader
    implements ContinuousInputPartitionReader<InternalRow> {
private static final GoogleLogger log = GoogleLogger.forEnclosingClass();

private final SubscriptionPath subscriptionPath;
  private final PullSubscriber<SequencedMessage> subscriber;
  private final ScheduledExecutorService pullExecutorService;
private final BlockingDeque<SequencedMessage> messages = new LinkedBlockingDeque<>();
private SparkPartitionOffset currentOffset;
private SequencedMessage currentMsg;

PslContinuousInputPartitionReader(
SubscriptionPath subscriptionPath,
SparkPartitionOffset startOffset,
PullSubscriber<SequencedMessage> subscriber,
ScheduledExecutorService pullExecutorService) {
this.subscriptionPath = subscriptionPath;
this.currentOffset = startOffset;
    this.subscriber = subscriber;
    this.pullExecutorService = pullExecutorService;
this.currentMsg = null;
pullExecutorService.scheduleAtFixedRate(
() -> {
try {
messages.addAll(subscriber.pull());
} catch (CheckedApiException e) {
            log.atWarning().withCause(e).log("Unable to pull from subscriber.");
}
},
0,
50,
TimeUnit.MILLISECONDS);
}

@Override
public PartitionOffset getOffset() {
return currentOffset;
}

@Override
public boolean next() {
try {
currentMsg = messages.takeFirst();
currentOffset =
SparkPartitionOffset.builder()
.partition(currentOffset.partition())
.offset(currentMsg.offset().value())
.build();
return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Retrieving messages interrupted.", e);
}
}

@Override
public InternalRow get() {
    checkState(currentMsg != null, "get() called before next() returned true.");
return PslSparkUtils.toInternalRow(currentMsg, subscriptionPath, currentOffset.partition());
}

@Override
public void close() {
    pullExecutorService.shutdown();
    try {
      subscriber.close();
    } catch (Exception e) {
      log.atWarning().withCause(e).log("Subscriber failed to close.");
    }
}
}
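To make the reader's contract concrete, here is a sketch of the call pattern Spark's continuous engine follows against this class. In practice Spark owns this loop, along with checkpointing and failure handling, so nobody writes it by hand; the helper class is hypothetical.

package com.google.cloud.pubsublite.spark;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;

// Sketch of the consumption contract only.
final class ReaderLoopSketch {
  static void drain(InputPartition<InternalRow> partition) throws Exception {
    InputPartitionReader<InternalRow> reader = partition.createPartitionReader();
    try {
      // next() blocks until the background pull loop enqueues a message,
      // so in continuous mode this loop effectively never ends.
      while (reader.next()) {
        InternalRow row = reader.get(); // row for the message fetched by next()
        // ... hand row to the query; Spark polls getOffset() when checkpointing.
      }
    } finally {
      reader.close();
    }
  }
}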
