Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix: Move Spark constants into their own class. (#127)
  • Loading branch information
jiangmichaellll committed Apr 2, 2021
1 parent 92cfdfd commit faf1ece
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 55 deletions.
6 changes: 5 additions & 1 deletion clirr-ignored-differences.xml
Expand Up @@ -36,5 +36,9 @@
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/spark/PslDataSourceOptions*</className>
</difference>

<difference>
<differenceType>6001</differenceType>
<className>com/google/cloud/pubsublite/spark/Constants</className>
<field>DEFAULT_SCHEMA</field>
</difference>
</differences>
41 changes: 2 additions & 39 deletions src/main/java/com/google/cloud/pubsublite/spark/Constants.java
Expand Up @@ -17,51 +17,14 @@
package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class Constants {
public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK");

public static long DEFAULT_BYTES_OUTSTANDING = 50_000_000;
public static long DEFAULT_MESSAGES_OUTSTANDING = Long.MAX_VALUE;
public static long DEFAULT_MAX_MESSAGES_PER_BATCH = Long.MAX_VALUE;

public static ArrayType ATTRIBUTES_PER_KEY_DATATYPE =
DataTypes.createArrayType(DataTypes.BinaryType);
public static MapType ATTRIBUTES_DATATYPE =
DataTypes.createMapType(DataTypes.StringType, ATTRIBUTES_PER_KEY_DATATYPE);
public static Map<String, DataType> PUBLISH_FIELD_TYPES =
ImmutableMap.of(
"key", DataTypes.BinaryType,
"data", DataTypes.BinaryType,
"attributes", ATTRIBUTES_DATATYPE,
"event_timestamp", DataTypes.TimestampType);
public static StructType DEFAULT_SCHEMA =
new StructType(
new StructField[] {
new StructField("subscription", DataTypes.StringType, false, Metadata.empty()),
new StructField("partition", DataTypes.LongType, false, Metadata.empty()),
new StructField("offset", DataTypes.LongType, false, Metadata.empty()),
new StructField("key", PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()),
new StructField("data", PUBLISH_FIELD_TYPES.get("data"), false, Metadata.empty()),
new StructField("publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()),
new StructField(
"event_timestamp",
PUBLISH_FIELD_TYPES.get("event_timestamp"),
true,
Metadata.empty()),
new StructField(
"attributes", PUBLISH_FIELD_TYPES.get("attributes"), true, Metadata.empty())
});

public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK");

public static String MAX_MESSAGE_PER_BATCH_CONFIG_KEY =
"pubsublite.flowcontrol.maxmessagesperbatch";
public static String BYTES_OUTSTANDING_CONFIG_KEY =
Expand Down
Expand Up @@ -109,7 +109,7 @@ public void stop() {

@Override
public StructType readSchema() {
return Constants.DEFAULT_SCHEMA;
return SparkStructs.DEFAULT_SCHEMA;
}

@Override
Expand Down
Expand Up @@ -127,7 +127,7 @@ public void stop() {

@Override
public StructType readSchema() {
return Constants.DEFAULT_SCHEMA;
return SparkStructs.DEFAULT_SCHEMA;
}

@Override
Expand Down
Expand Up @@ -123,31 +123,31 @@ public static Message toPubSubMessage(StructType inputSchema, InternalRow row) {
inputSchema,
row,
"key",
Constants.PUBLISH_FIELD_TYPES.get("key"),
SparkStructs.PUBLISH_FIELD_TYPES.get("key"),
(byte[] o) -> builder.setKey(ByteString.copyFrom(o)));
extractVal(
inputSchema,
row,
"data",
Constants.PUBLISH_FIELD_TYPES.get("data"),
SparkStructs.PUBLISH_FIELD_TYPES.get("data"),
(byte[] o) -> builder.setData(ByteString.copyFrom(o)));
extractVal(
inputSchema,
row,
"event_timestamp",
Constants.PUBLISH_FIELD_TYPES.get("event_timestamp"),
SparkStructs.PUBLISH_FIELD_TYPES.get("event_timestamp"),
(Long o) -> builder.setEventTime(Timestamps.fromMicros(o)));
extractVal(
inputSchema,
row,
"attributes",
Constants.PUBLISH_FIELD_TYPES.get("attributes"),
SparkStructs.PUBLISH_FIELD_TYPES.get("attributes"),
(MapData o) -> {
ImmutableListMultimap.Builder<String, ByteString> attributeMapBuilder =
ImmutableListMultimap.builder();
o.foreach(
DataTypes.StringType,
Constants.ATTRIBUTES_PER_KEY_DATATYPE,
SparkStructs.ATTRIBUTES_PER_KEY_DATATYPE,
new FromJavaBiConsumer<>(
(k, v) -> {
String key = ((UTF8String) k).toString();
Expand All @@ -170,7 +170,7 @@ public static Message toPubSubMessage(StructType inputSchema, InternalRow row) {
* @throws IllegalArgumentException if any DataType mismatch detected.
*/
public static void verifyWriteInputSchema(StructType inputSchema) {
Constants.PUBLISH_FIELD_TYPES.forEach(
SparkStructs.PUBLISH_FIELD_TYPES.forEach(
(k, v) -> {
Option<Object> idxOr = inputSchema.getFieldIndex(k);
if (!idxOr.isEmpty()) {
Expand Down
58 changes: 58 additions & 0 deletions src/main/java/com/google/cloud/pubsublite/spark/SparkStructs.java
@@ -0,0 +1,58 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Spark SQL data types and schemas shared by the Pub/Sub Lite Spark connector.
 *
 * <p>All members are constants: they are declared {@code final} so that no caller can reassign
 * the shared type definitions or the default schema at runtime.
 */
public class SparkStructs {

  /** Type of the values stored under a single attribute key: an array of binary values. */
  public static final ArrayType ATTRIBUTES_PER_KEY_DATATYPE =
      DataTypes.createArrayType(DataTypes.BinaryType);

  /** Type of the {@code attributes} column: a map from string keys to arrays of binary values. */
  public static final MapType ATTRIBUTES_DATATYPE =
      DataTypes.createMapType(DataTypes.StringType, ATTRIBUTES_PER_KEY_DATATYPE);

  /**
   * Expected Spark data type for each publishable field, keyed by column name ({@code key},
   * {@code data}, {@code attributes}, {@code event_timestamp}). The map is immutable.
   */
  public static final Map<String, DataType> PUBLISH_FIELD_TYPES =
      ImmutableMap.of(
          "key", DataTypes.BinaryType,
          "data", DataTypes.BinaryType,
          "attributes", ATTRIBUTES_DATATYPE,
          "event_timestamp", DataTypes.TimestampType);

  /**
   * Default row schema. The publishable columns reuse the types declared in {@link
   * #PUBLISH_FIELD_TYPES} so the two definitions cannot drift apart; only {@code event_timestamp}
   * and {@code attributes} are nullable.
   */
  public static final StructType DEFAULT_SCHEMA =
      new StructType(
          new StructField[] {
            new StructField("subscription", DataTypes.StringType, false, Metadata.empty()),
            new StructField("partition", DataTypes.LongType, false, Metadata.empty()),
            new StructField("offset", DataTypes.LongType, false, Metadata.empty()),
            new StructField("key", PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()),
            new StructField("data", PUBLISH_FIELD_TYPES.get("data"), false, Metadata.empty()),
            new StructField("publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()),
            new StructField(
                "event_timestamp",
                PUBLISH_FIELD_TYPES.get("event_timestamp"),
                true,
                Metadata.empty()),
            new StructField(
                "attributes", PUBLISH_FIELD_TYPES.get("attributes"), true, Metadata.empty())
          });
}
Expand Up @@ -51,7 +51,7 @@ public class PslDataWriterTest {
new StructType(
new StructField[] {
new StructField(
"key", Constants.PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()),
"key", SparkStructs.PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()),
});

private final PslDataWriter writer = new PslDataWriter(1L, 2L, 3L, keyOnly, publisherFactory);
Expand Down
Expand Up @@ -145,7 +145,8 @@ public void testToPubSubMessage() {
new StructField[] {
new StructField("key", DataTypes.BinaryType, false, Metadata.empty()),
new StructField("data", DataTypes.BinaryType, false, Metadata.empty()),
new StructField("attributes", Constants.ATTRIBUTES_DATATYPE, true, Metadata.empty()),
new StructField(
"attributes", SparkStructs.ATTRIBUTES_DATATYPE, true, Metadata.empty()),
new StructField("event_timestamp", DataTypes.TimestampType, true, Metadata.empty()),
new StructField("random_extra_field", DataTypes.BinaryType, false, Metadata.empty())
});
Expand All @@ -171,19 +172,19 @@ public void testToPubSubMessageLongForEventTimestamp() {

@Test
public void testVerifyWriteInputSchema() {
PslSparkUtils.verifyWriteInputSchema(Constants.DEFAULT_SCHEMA);
PslSparkUtils.verifyWriteInputSchema(SparkStructs.DEFAULT_SCHEMA);

StructType goodThoughMissing =
new StructType(
new StructField[] {
new StructField("offset", DataTypes.LongType, false, Metadata.empty()),
new StructField(
"key", Constants.PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()),
"key", SparkStructs.PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()),
new StructField(
"publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()),
new StructField(
"attributes",
Constants.PUBLISH_FIELD_TYPES.get("attributes"),
SparkStructs.PUBLISH_FIELD_TYPES.get("attributes"),
true,
Metadata.empty())
});
Expand All @@ -199,7 +200,7 @@ public void testVerifyWriteInputSchema() {
"publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()),
new StructField(
"attributes",
Constants.PUBLISH_FIELD_TYPES.get("attributes"),
SparkStructs.PUBLISH_FIELD_TYPES.get("attributes"),
true,
Metadata.empty())
});
Expand Down
Expand Up @@ -24,7 +24,7 @@ public class PslStreamWriterTest {

private final PslStreamWriter writer =
new PslStreamWriter(
Constants.DEFAULT_SCHEMA,
SparkStructs.DEFAULT_SCHEMA,
PslWriteDataSourceOptions.builder()
.setTopicPath(UnitTestExamples.exampleTopicPath())
.build());
Expand Down

0 comments on commit faf1ece

Please sign in to comment.