diff --git a/clirr-ignored-differences.xml b/clirr-ignored-differences.xml
index 6aa9dcf9..2921d7b6 100644
--- a/clirr-ignored-differences.xml
+++ b/clirr-ignored-differences.xml
@@ -36,5 +36,9 @@
8001
com/google/cloud/pubsublite/spark/PslDataSourceOptions*
-
+
+ 6001
+ com/google/cloud/pubsublite/spark/Constants
+ DEFAULT_SCHEMA
+
\ No newline at end of file
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/Constants.java b/src/main/java/com/google/cloud/pubsublite/spark/Constants.java
index 9ad29b23..9587ed23 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/Constants.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/Constants.java
@@ -17,51 +17,14 @@
package com.google.cloud.pubsublite.spark;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
-import com.google.common.collect.ImmutableMap;
-import java.util.Map;
-import org.apache.spark.sql.types.ArrayType;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.MapType;
-import org.apache.spark.sql.types.Metadata;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
public class Constants {
+ public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK");
+
public static long DEFAULT_BYTES_OUTSTANDING = 50_000_000;
public static long DEFAULT_MESSAGES_OUTSTANDING = Long.MAX_VALUE;
public static long DEFAULT_MAX_MESSAGES_PER_BATCH = Long.MAX_VALUE;
- public static ArrayType ATTRIBUTES_PER_KEY_DATATYPE =
- DataTypes.createArrayType(DataTypes.BinaryType);
- public static MapType ATTRIBUTES_DATATYPE =
- DataTypes.createMapType(DataTypes.StringType, ATTRIBUTES_PER_KEY_DATATYPE);
- public static Map PUBLISH_FIELD_TYPES =
- ImmutableMap.of(
- "key", DataTypes.BinaryType,
- "data", DataTypes.BinaryType,
- "attributes", ATTRIBUTES_DATATYPE,
- "event_timestamp", DataTypes.TimestampType);
- public static StructType DEFAULT_SCHEMA =
- new StructType(
- new StructField[] {
- new StructField("subscription", DataTypes.StringType, false, Metadata.empty()),
- new StructField("partition", DataTypes.LongType, false, Metadata.empty()),
- new StructField("offset", DataTypes.LongType, false, Metadata.empty()),
- new StructField("key", PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()),
- new StructField("data", PUBLISH_FIELD_TYPES.get("data"), false, Metadata.empty()),
- new StructField("publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()),
- new StructField(
- "event_timestamp",
- PUBLISH_FIELD_TYPES.get("event_timestamp"),
- true,
- Metadata.empty()),
- new StructField(
- "attributes", PUBLISH_FIELD_TYPES.get("attributes"), true, Metadata.empty())
- });
-
- public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK");
-
public static String MAX_MESSAGE_PER_BATCH_CONFIG_KEY =
"pubsublite.flowcontrol.maxmessagesperbatch";
public static String BYTES_OUTSTANDING_CONFIG_KEY =
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java b/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java
index ad2ca3da..d984b174 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java
@@ -109,7 +109,7 @@ public void stop() {
@Override
public StructType readSchema() {
- return Constants.DEFAULT_SCHEMA;
+ return SparkStructs.DEFAULT_SCHEMA;
}
@Override
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java
index a0f0dfee..d526526a 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java
@@ -127,7 +127,7 @@ public void stop() {
@Override
public StructType readSchema() {
- return Constants.DEFAULT_SCHEMA;
+ return SparkStructs.DEFAULT_SCHEMA;
}
@Override
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java
index 2510315a..cf336a35 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java
@@ -123,31 +123,31 @@ public static Message toPubSubMessage(StructType inputSchema, InternalRow row) {
inputSchema,
row,
"key",
- Constants.PUBLISH_FIELD_TYPES.get("key"),
+ SparkStructs.PUBLISH_FIELD_TYPES.get("key"),
(byte[] o) -> builder.setKey(ByteString.copyFrom(o)));
extractVal(
inputSchema,
row,
"data",
- Constants.PUBLISH_FIELD_TYPES.get("data"),
+ SparkStructs.PUBLISH_FIELD_TYPES.get("data"),
(byte[] o) -> builder.setData(ByteString.copyFrom(o)));
extractVal(
inputSchema,
row,
"event_timestamp",
- Constants.PUBLISH_FIELD_TYPES.get("event_timestamp"),
+ SparkStructs.PUBLISH_FIELD_TYPES.get("event_timestamp"),
(Long o) -> builder.setEventTime(Timestamps.fromMicros(o)));
extractVal(
inputSchema,
row,
"attributes",
- Constants.PUBLISH_FIELD_TYPES.get("attributes"),
+ SparkStructs.PUBLISH_FIELD_TYPES.get("attributes"),
(MapData o) -> {
ImmutableListMultimap.Builder attributeMapBuilder =
ImmutableListMultimap.builder();
o.foreach(
DataTypes.StringType,
- Constants.ATTRIBUTES_PER_KEY_DATATYPE,
+ SparkStructs.ATTRIBUTES_PER_KEY_DATATYPE,
new FromJavaBiConsumer<>(
(k, v) -> {
String key = ((UTF8String) k).toString();
@@ -170,7 +170,7 @@ public static Message toPubSubMessage(StructType inputSchema, InternalRow row) {
* @throws IllegalArgumentException if any DataType mismatch detected.
*/
public static void verifyWriteInputSchema(StructType inputSchema) {
- Constants.PUBLISH_FIELD_TYPES.forEach(
+ SparkStructs.PUBLISH_FIELD_TYPES.forEach(
(k, v) -> {
Option