diff --git a/clirr-ignored-differences.xml b/clirr-ignored-differences.xml index 6aa9dcf9..2921d7b6 100644 --- a/clirr-ignored-differences.xml +++ b/clirr-ignored-differences.xml @@ -36,5 +36,9 @@ 8001 com/google/cloud/pubsublite/spark/PslDataSourceOptions* - + + 6001 + com/google/cloud/pubsublite/spark/Constants + DEFAULT_SCHEMA + \ No newline at end of file diff --git a/src/main/java/com/google/cloud/pubsublite/spark/Constants.java b/src/main/java/com/google/cloud/pubsublite/spark/Constants.java index 9ad29b23..9587ed23 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/Constants.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/Constants.java @@ -17,51 +17,14 @@ package com.google.cloud.pubsublite.spark; import com.google.cloud.pubsublite.internal.wire.PubsubContext; -import com.google.common.collect.ImmutableMap; -import java.util.Map; -import org.apache.spark.sql.types.ArrayType; -import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.MapType; -import org.apache.spark.sql.types.Metadata; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; public class Constants { + public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK"); + public static long DEFAULT_BYTES_OUTSTANDING = 50_000_000; public static long DEFAULT_MESSAGES_OUTSTANDING = Long.MAX_VALUE; public static long DEFAULT_MAX_MESSAGES_PER_BATCH = Long.MAX_VALUE; - public static ArrayType ATTRIBUTES_PER_KEY_DATATYPE = - DataTypes.createArrayType(DataTypes.BinaryType); - public static MapType ATTRIBUTES_DATATYPE = - DataTypes.createMapType(DataTypes.StringType, ATTRIBUTES_PER_KEY_DATATYPE); - public static Map PUBLISH_FIELD_TYPES = - ImmutableMap.of( - "key", DataTypes.BinaryType, - "data", DataTypes.BinaryType, - "attributes", ATTRIBUTES_DATATYPE, - "event_timestamp", DataTypes.TimestampType); - public static StructType DEFAULT_SCHEMA = - new StructType( - new StructField[] { - new StructField("subscription", DataTypes.StringType, false, Metadata.empty()), - new StructField("partition", DataTypes.LongType, false, Metadata.empty()), - new StructField("offset", DataTypes.LongType, false, Metadata.empty()), - new StructField("key", PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()), - new StructField("data", PUBLISH_FIELD_TYPES.get("data"), false, Metadata.empty()), - new StructField("publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()), - new StructField( - "event_timestamp", - PUBLISH_FIELD_TYPES.get("event_timestamp"), - true, - Metadata.empty()), - new StructField( - "attributes", PUBLISH_FIELD_TYPES.get("attributes"), true, Metadata.empty()) - }); - - public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK"); - public static String MAX_MESSAGE_PER_BATCH_CONFIG_KEY = "pubsublite.flowcontrol.maxmessagesperbatch"; public static String BYTES_OUTSTANDING_CONFIG_KEY = diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java b/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java index ad2ca3da..d984b174 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java @@ -109,7 +109,7 @@ public void stop() { @Override public StructType readSchema() { - return Constants.DEFAULT_SCHEMA; + return SparkStructs.DEFAULT_SCHEMA; } @Override diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java index a0f0dfee..d526526a 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java @@ -127,7 +127,7 @@ public void stop() { @Override public StructType readSchema() { - return Constants.DEFAULT_SCHEMA; + return SparkStructs.DEFAULT_SCHEMA; } @Override diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java index 2510315a..cf336a35 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java @@ -123,31 +123,31 @@ public static Message toPubSubMessage(StructType inputSchema, InternalRow row) { inputSchema, row, "key", - Constants.PUBLISH_FIELD_TYPES.get("key"), + SparkStructs.PUBLISH_FIELD_TYPES.get("key"), (byte[] o) -> builder.setKey(ByteString.copyFrom(o))); extractVal( inputSchema, row, "data", - Constants.PUBLISH_FIELD_TYPES.get("data"), + SparkStructs.PUBLISH_FIELD_TYPES.get("data"), (byte[] o) -> builder.setData(ByteString.copyFrom(o))); extractVal( inputSchema, row, "event_timestamp", - Constants.PUBLISH_FIELD_TYPES.get("event_timestamp"), + SparkStructs.PUBLISH_FIELD_TYPES.get("event_timestamp"), (Long o) -> builder.setEventTime(Timestamps.fromMicros(o))); extractVal( inputSchema, row, "attributes", - Constants.PUBLISH_FIELD_TYPES.get("attributes"), + SparkStructs.PUBLISH_FIELD_TYPES.get("attributes"), (MapData o) -> { ImmutableListMultimap.Builder attributeMapBuilder = ImmutableListMultimap.builder(); o.foreach( DataTypes.StringType, - Constants.ATTRIBUTES_PER_KEY_DATATYPE, + SparkStructs.ATTRIBUTES_PER_KEY_DATATYPE, new FromJavaBiConsumer<>( (k, v) -> { String key = ((UTF8String) k).toString(); @@ -170,7 +170,7 @@ public static Message toPubSubMessage(StructType inputSchema, InternalRow row) { * @throws IllegalArgumentException if any DataType mismatch detected. */ public static void verifyWriteInputSchema(StructType inputSchema) { - Constants.PUBLISH_FIELD_TYPES.forEach( + SparkStructs.PUBLISH_FIELD_TYPES.forEach( (k, v) -> { Option idxOr = inputSchema.getFieldIndex(k); if (!idxOr.isEmpty()) { diff --git a/src/main/java/com/google/cloud/pubsublite/spark/SparkStructs.java b/src/main/java/com/google/cloud/pubsublite/spark/SparkStructs.java new file mode 100644 index 00000000..329ffbf7 --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/spark/SparkStructs.java @@ -0,0 +1,58 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.spark; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.MapType; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +public class SparkStructs { + + public static ArrayType ATTRIBUTES_PER_KEY_DATATYPE = + DataTypes.createArrayType(DataTypes.BinaryType); + public static MapType ATTRIBUTES_DATATYPE = + DataTypes.createMapType(DataTypes.StringType, ATTRIBUTES_PER_KEY_DATATYPE); + public static Map PUBLISH_FIELD_TYPES = + ImmutableMap.of( + "key", DataTypes.BinaryType, + "data", DataTypes.BinaryType, + "attributes", ATTRIBUTES_DATATYPE, + "event_timestamp", DataTypes.TimestampType); + public static StructType DEFAULT_SCHEMA = + new StructType( + new StructField[] { + new StructField("subscription", DataTypes.StringType, false, Metadata.empty()), + new StructField("partition", DataTypes.LongType, false, Metadata.empty()), + new StructField("offset", DataTypes.LongType, false, Metadata.empty()), + new StructField("key", PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()), + new StructField("data", PUBLISH_FIELD_TYPES.get("data"), false, Metadata.empty()), + new StructField("publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()), + new StructField( + "event_timestamp", + PUBLISH_FIELD_TYPES.get("event_timestamp"), + true, + Metadata.empty()), + new StructField( + "attributes", PUBLISH_FIELD_TYPES.get("attributes"), true, Metadata.empty()) + }); +} diff --git a/src/test/java/com/google/cloud/pubsublite/spark/PslDataWriterTest.java b/src/test/java/com/google/cloud/pubsublite/spark/PslDataWriterTest.java index a3f6f1a8..137cec63 100644 --- a/src/test/java/com/google/cloud/pubsublite/spark/PslDataWriterTest.java +++ b/src/test/java/com/google/cloud/pubsublite/spark/PslDataWriterTest.java @@ -51,7 +51,7 @@ public class PslDataWriterTest { new StructType( new StructField[] { new StructField( - "key", Constants.PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()), + "key", SparkStructs.PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()), }); private final PslDataWriter writer = new PslDataWriter(1L, 2L, 3L, keyOnly, publisherFactory); diff --git a/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java b/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java index 7081082f..e7928915 100644 --- a/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java +++ b/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java @@ -145,7 +145,8 @@ public void testToPubSubMessage() { new StructField[] { new StructField("key", DataTypes.BinaryType, false, Metadata.empty()), new StructField("data", DataTypes.BinaryType, false, Metadata.empty()), - new StructField("attributes", Constants.ATTRIBUTES_DATATYPE, true, Metadata.empty()), + new StructField( + "attributes", SparkStructs.ATTRIBUTES_DATATYPE, true, Metadata.empty()), new StructField("event_timestamp", DataTypes.TimestampType, true, Metadata.empty()), new StructField("random_extra_field", DataTypes.BinaryType, false, Metadata.empty()) }); @@ -171,19 +172,19 @@ public void testToPubSubMessageLongForEventTimestamp() { @Test public void testVerifyWriteInputSchema() { - PslSparkUtils.verifyWriteInputSchema(Constants.DEFAULT_SCHEMA); + PslSparkUtils.verifyWriteInputSchema(SparkStructs.DEFAULT_SCHEMA); StructType goodThoughMissing = new StructType( new StructField[] { new StructField("offset", DataTypes.LongType, false, Metadata.empty()), new StructField( - "key", Constants.PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()), + "key", SparkStructs.PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()), new StructField( "publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()), new StructField( "attributes", - Constants.PUBLISH_FIELD_TYPES.get("attributes"), + SparkStructs.PUBLISH_FIELD_TYPES.get("attributes"), true, Metadata.empty()) }); @@ -199,7 +200,7 @@ public void testVerifyWriteInputSchema() { "publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()), new StructField( "attributes", - Constants.PUBLISH_FIELD_TYPES.get("attributes"), + SparkStructs.PUBLISH_FIELD_TYPES.get("attributes"), true, Metadata.empty()) }); diff --git a/src/test/java/com/google/cloud/pubsublite/spark/PslStreamWriterTest.java b/src/test/java/com/google/cloud/pubsublite/spark/PslStreamWriterTest.java index 35b525d7..48f1c77b 100644 --- a/src/test/java/com/google/cloud/pubsublite/spark/PslStreamWriterTest.java +++ b/src/test/java/com/google/cloud/pubsublite/spark/PslStreamWriterTest.java @@ -24,7 +24,7 @@ public class PslStreamWriterTest { private final PslStreamWriter writer = new PslStreamWriter( - Constants.DEFAULT_SCHEMA, + SparkStructs.DEFAULT_SCHEMA, PslWriteDataSourceOptions.builder() .setTopicPath(UnitTestExamples.exampleTopicPath()) .build());