Skip to content

Commit

Permalink
feat: Re-fork the beam repo from upstream (#513)
Browse files Browse the repository at this point in the history
The beam version has changes (notably the use of bundle finalizers) which make it impossible to use with dataflow at present.
  • Loading branch information
dpcollins-google committed Feb 23, 2021
1 parent 3d42030 commit 041b3a5
Show file tree
Hide file tree
Showing 46 changed files with 3,883 additions and 11 deletions.
1 change: 1 addition & 0 deletions pom.xml
Expand Up @@ -121,6 +121,7 @@
<module>google-cloud-pubsublite</module>
<module>grpc-google-cloud-pubsublite-v1</module>
<module>proto-google-cloud-pubsublite-v1</module>
<module>pubsublite-beam-io</module>
</modules>

<profiles>
Expand Down
16 changes: 5 additions & 11 deletions pubsublite-beam-io/README.md
Expand Up @@ -3,19 +3,18 @@
1. Add the following to your POM file to download the Pub/Sub Lite I/O.
```xml
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>2.27.0</version>
<groupId>com.google.cloud.pubsublite</groupId>
<artifactId>pubsublite-beam-io</artifactId>
<version>0.11.0</version>
</dependency>
```
1. Create a topic using `gcloud pubsub lite-topics create`
1. Write some messages using:

```java
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
import com.google.cloud.pubsublite.beam.PubsubLiteIO;
import com.google.cloud.pubsublite.beam.PublisherOptions;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.*;

...

Expand All @@ -42,7 +41,6 @@
import com.google.cloud.pubsublite.beam.SubscriberOptions;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;

...

Expand All @@ -59,9 +57,5 @@
.setProject(ProjectNumber.of(PROJECT_NUM))
.setName(SubscriptionName.of("my-sub"))
.build())
.setFlowControlSettings(FlowControlSettings.builder()
.setBytesOutstanding(100_000_000) // 100 MB
.setMessagesOutstanding(Long.MAX_VALUE)
.build())
.build()));
```
79 changes: 79 additions & 0 deletions pubsublite-beam-io/pom.xml
@@ -0,0 +1,79 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite-parent</artifactId>
<version>0.10.1-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsublite:current} -->
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.cloud</groupId>
<artifactId>pubsublite-beam-io</artifactId>
<version>0.10.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Pub/Sub Lite IO</name>
<url>https://github.com/googleapis/java-pubsublite</url>
<description>Beam IO for Google Cloud Pub/Sub Lite</description>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<version>0.10.1-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsublite:current} -->
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-pubsublite-v1</artifactId>
<version>0.10.1-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsublite:current} -->
</dependency>
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<version>1.0-rc7</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>2.28.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-protobuf</artifactId>
<version>2.28.0</version>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
<version>2.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>2.28.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Need to skip clirr since it is referencing the old IO -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>clirr-maven-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,48 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.beam;

import com.google.cloud.pubsublite.proto.AttributeValues;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

/** A transform to add UUIDs to each message to be written to Pub/Sub Lite. */
class AddUuidsTransform extends PTransform<PCollection<PubSubMessage>, PCollection<PubSubMessage>> {
private static PubSubMessage addUuid(PubSubMessage message) {
PubSubMessage.Builder builder = message.toBuilder();
builder.putAttributes(
Uuid.DEFAULT_ATTRIBUTE,
AttributeValues.newBuilder().addValues(Uuid.random().value()).build());
return builder.build();
}

@Override
public PCollection<PubSubMessage> expand(PCollection<PubSubMessage> input) {
PCollection<PubSubMessage> withUuids =
input.apply(
"AddUuids",
MapElements.into(new TypeDescriptor<PubSubMessage>() {})
.via(AddUuidsTransform::addUuid));
// Reshuffle into 1000 buckets to avoid having unit-sized bundles under high throughput.
return withUuids.apply(
"ShuffleToPersist", Reshuffle.<PubSubMessage>viaRandomKey().withNumBuckets(1000));
}
}
@@ -0,0 +1,50 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.beam;

import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer;

import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
* A class providing a conversion validity check between Cloud Pub/Sub and Pub/Sub Lite message
* types.
*/
public final class CloudPubsubChecks {
private CloudPubsubChecks() {}

/**
* Ensure that all messages that pass through can be converted to Cloud Pub/Sub messages using the
* standard transformation methods in the client library.
*
* <p>Will fail the pipeline if a message has multiple attributes per key.
*/
public static PTransform<PCollection<? extends PubSubMessage>, PCollection<PubSubMessage>>
ensureUsableAsCloudPubsub() {
return MapElements.into(TypeDescriptor.of(PubSubMessage.class))
.via(
message -> {
Object unused = toCpsPublishTransformer().transform(Message.fromProto(message));
return message;
});
}
}
@@ -0,0 +1,76 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.beam;

import com.google.auto.service.AutoService;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.AttributeValues;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.protobuf.ByteString;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.schemas.io.Failure;
import org.apache.beam.sdk.schemas.io.GenericDlqProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TypeDescriptor;

@Internal
@AutoService(GenericDlqProvider.class)
public class DlqProvider implements GenericDlqProvider {
@Override
public String identifier() {
return "pubsublite";
}

@Override
public PTransform<PCollection<Failure>, PDone> newDlqTransform(String config) {
return new DlqTransform(TopicPath.parse(config));
}

private static class DlqTransform extends PTransform<PCollection<Failure>, PDone> {
private final TopicPath topic;

DlqTransform(TopicPath topic) {
this.topic = topic;
}

@Override
public PDone expand(PCollection<Failure> input) {
return input
.apply(
"Failure to PubSubMessage",
MapElements.into(TypeDescriptor.of(PubSubMessage.class))
.via(DlqTransform::getMessage))
.apply(
"Write Failures to Pub/Sub Lite",
PubsubLiteIO.write(PublisherOptions.newBuilder().setTopicPath(topic).build()));
}

private static PubSubMessage getMessage(Failure failure) {
PubSubMessage.Builder builder = PubSubMessage.newBuilder();
builder.putAttributes(
"beam-dlq-error",
AttributeValues.newBuilder()
.addValues(ByteString.copyFromUtf8(failure.getError()))
.build());
builder.setData(ByteString.copyFrom(failure.getPayload()));
return builder.build();
}
}
}
@@ -0,0 +1,27 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.beam;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Offset;

interface InitialOffsetReader extends AutoCloseable {
Offset read() throws ApiException;

@Override
void close();
}
@@ -0,0 +1,53 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.beam;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.CursorClient;
import java.util.Map;

class InitialOffsetReaderImpl implements InitialOffsetReader {
private final CursorClient client;
private final SubscriptionPath subscription;
private final Partition partition;

InitialOffsetReaderImpl(CursorClient client, SubscriptionPath subscription, Partition partition) {
this.client = client;
this.subscription = subscription;
this.partition = partition;
}

@Override
public Offset read() throws ApiException {
try {
Map<Partition, Offset> results = client.listPartitionCursors(subscription).get();
return results.getOrDefault(partition, Offset.of(0));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
}

@Override
public void close() {
client.close();
}
}

0 comments on commit 041b3a5

Please sign in to comment.