Skip to content

Commit

Permalink
Add utils functions to convert protobuf schema to pinot schema with support for complex type schema
Browse files Browse the repository at this point in the history
  • Loading branch information
rseetham committed May 10, 2024
1 parent a17bd9c commit 003bf3f
Show file tree
Hide file tree
Showing 5 changed files with 516 additions and 0 deletions.
@@ -0,0 +1,162 @@
package org.apache.pinot.plugin.inputformat.protobuf;

import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;

public class ProtoBufSchemaUtils {

  private ProtoBufSchemaUtils() {
  }

  /**
   * Given a Protobuf message descriptor, flattens/unnests the complex types based on the config, maps each resulting
   * column to a Pinot field type (and time unit for DATE_TIME columns), and returns the equivalent Pinot schema.
   *
   * @param protoSchema Protobuf message descriptor to derive the schema from
   * @param fieldTypeMap Map from column name to Pinot field type; when {@code null}, every column is a dimension
   * @param timeUnit Time unit used to build format/granularity of DATE_TIME columns; must be non-null when
   *                 {@code fieldTypeMap} maps any column to DATE_TIME
   * @param fieldsToUnnest the repeated-message fields to unnest (one Pinot column per nested field)
   * @param delimiter the delimiter to separate components in nested structure
   *
   * @return Pinot schema
   */
  // NOTE(review): the name says "FromPinotSchema" but the input is a Protobuf descriptor; consider renaming to
  // getPinotSchemaFromProtoSchemaWithComplexTypeHandling in a follow-up (kept as-is for caller compatibility).
  public static Schema getPinotSchemaFromPinotSchemaWithComplexTypeHandling(Descriptors.Descriptor protoSchema,
      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit,
      List<String> fieldsToUnnest, String delimiter) {
    Schema pinotSchema = new Schema();

    for (Descriptors.FieldDescriptor field : protoSchema.getFields()) {
      extractSchemaWithComplexTypeHandling(field, fieldsToUnnest, delimiter, field.getName(), pinotSchema,
          fieldTypeMap, timeUnit);
    }
    return pinotSchema;
  }

  /**
   * Recursively adds the Pinot column(s) derived from a single Protobuf field:
   * <ul>
   *   <li>Repeated primitive: one multi-value column.</li>
   *   <li>Repeated message whose path is in {@code fieldsToUnnest}: recurse into each nested field, joining the
   *       column name components with {@code delimiter}.</li>
   *   <li>Repeated message not unnested: one single-value STRING column.</li>
   *   <li>Map field: ignored (not supported when complex type handling is enabled).</li>
   *   <li>Non-repeated message: always recurse into the nested fields.</li>
   *   <li>Scalar: one single-value column of the mapped Pinot type.</li>
   * </ul>
   */
  static void extractSchemaWithComplexTypeHandling(
      Descriptors.FieldDescriptor fieldSchema,
      List<String> fieldsToUnnest,
      String delimiter,
      String path,
      Schema pinotSchema,
      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap,
      @Nullable TimeUnit timeUnit) {
    Descriptors.FieldDescriptor.Type fieldType = fieldSchema.getType();
    if (fieldSchema.isRepeated()) {
      if (isPrimitiveType(fieldType)) {
        // Repeated primitives become a multi-value column.
        addFieldToPinotSchema(pinotSchema, valueOf(fieldType), path, false, fieldTypeMap, timeUnit);
      } else if (fieldsToUnnest.contains(path) && !fieldSchema.isMapField()) {
        for (Descriptors.FieldDescriptor innerField : fieldSchema.getMessageType().getFields()) {
          extractSchemaWithComplexTypeHandling(innerField, fieldsToUnnest, delimiter,
              String.join(delimiter, path, innerField.getName()), pinotSchema, fieldTypeMap, timeUnit);
        }
      } else if (!fieldSchema.isMapField()) {
        // Repeated message that is not unnested is stored as a single STRING value.
        addFieldToPinotSchema(pinotSchema, FieldSpec.DataType.STRING, path, true, fieldTypeMap, timeUnit);
      }
      // Ignores Map type since it's not supported when complex type handling is enabled
    } else if (fieldType == Descriptors.FieldDescriptor.Type.MESSAGE) {
      for (Descriptors.FieldDescriptor innerField : fieldSchema.getMessageType().getFields()) {
        extractSchemaWithComplexTypeHandling(innerField, fieldsToUnnest, delimiter,
            String.join(delimiter, path, innerField.getName()), pinotSchema, fieldTypeMap, timeUnit);
      }
    } else {
      FieldSpec.DataType dataType = valueOf(fieldType);
      addFieldToPinotSchema(pinotSchema, dataType, path, true, fieldTypeMap, timeUnit);
    }
  }

  /**
   * Maps a Protobuf scalar type to the corresponding Pinot data type.
   * Note: unsigned 32/64-bit Protobuf types map to signed INT/LONG, so values above the signed max wrap around.
   *
   * @param protoType Protobuf field type
   * @return the equivalent Pinot data type
   * @throws UnsupportedOperationException for types without a Pinot equivalent (e.g. MESSAGE, GROUP)
   */
  public static FieldSpec.DataType valueOf(Descriptors.FieldDescriptor.Type protoType) {
    switch (protoType) {
      case INT32:
      case UINT32:
      case SINT32:
      case FIXED32:
      case SFIXED32:
        return FieldSpec.DataType.INT;
      case INT64:
      case UINT64:
      case FIXED64:
      case SINT64:
      case SFIXED64:
        return FieldSpec.DataType.LONG;
      case DOUBLE:
        return FieldSpec.DataType.DOUBLE;
      case FLOAT:
        return FieldSpec.DataType.FLOAT;
      case BOOL:
        return FieldSpec.DataType.BOOLEAN;
      case BYTES:
        return FieldSpec.DataType.BYTES;
      case STRING:
      case ENUM:
        return FieldSpec.DataType.STRING;
      default:
        throw new UnsupportedOperationException("Unsupported ProtoBuf type: " + protoType);
    }
  }

  /**
   * @return whether the given Protobuf type is a primitive (scalar) type, i.e. has a direct Pinot data type mapping
   *         via {@link #valueOf}.
   */
  public static boolean isPrimitiveType(Descriptors.FieldDescriptor.Type protoType) {
    switch (protoType) {
      case INT32:
      case INT64:
      case UINT32:
      case UINT64:
      case SINT32:
      case SINT64:
      case FIXED64:
      case FIXED32:
      case SFIXED64:
      case DOUBLE:
      case FLOAT:
      case BOOL:
      case BYTES:
      case STRING:
      case ENUM:
        return true;
      default:
        return false;
    }
  }

  /**
   * Adds one column to the Pinot schema as a dimension, metric or date-time field, per {@code fieldTypeMap}.
   * Metric and DATE_TIME columns must be single-valued; DATE_TIME additionally requires {@code timeUnit}.
   */
  private static void addFieldToPinotSchema(Schema pinotSchema, FieldSpec.DataType dataType, String name,
      boolean isSingleValueField, @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap,
      @Nullable TimeUnit timeUnit) {
    if (fieldTypeMap == null) {
      pinotSchema.addField(new DimensionFieldSpec(name, dataType, isSingleValueField));
    } else {
      // getOrDefault still yields null when the map explicitly contains a null value for this key,
      // so the null check below is not dead code.
      FieldSpec.FieldType fieldType = fieldTypeMap.getOrDefault(name, FieldSpec.FieldType.DIMENSION);
      Preconditions.checkNotNull(fieldType, "Field type not specified for field: %s", name);
      switch (fieldType) {
        case DIMENSION:
          pinotSchema.addField(new DimensionFieldSpec(name, dataType, isSingleValueField));
          break;
        case METRIC:
          Preconditions.checkState(isSingleValueField, "Metric field: %s cannot be multi-valued", name);
          pinotSchema.addField(new MetricFieldSpec(name, dataType));
          break;
        case DATE_TIME:
          Preconditions.checkState(isSingleValueField, "Time field: %s cannot be multi-valued", name);
          Preconditions.checkNotNull(timeUnit, "Time unit cannot be null");
          // TODO: Switch to new format after releasing 0.11.0
          // "EPOCH|" + timeUnit.name()
          String format = "1:" + timeUnit.name() + ":EPOCH";
          String granularity = "1:" + timeUnit.name();
          pinotSchema.addField(new DateTimeFieldSpec(name, dataType, format, granularity));
          break;
        default:
          throw new UnsupportedOperationException("Unsupported field type: " + fieldType + " for field: " + name);
      }
    }
  }
}
Expand Up @@ -164,6 +164,7 @@ void generateDecodeCodeForAMessage(Map<String, MessageDecoderMethod> msgDecodeCo
case FIXED64:
case FIXED32:
case UINT32:
case SFIXED32:
case SFIXED64:
case SINT32:
case SINT64:
Expand Down
@@ -0,0 +1,211 @@
package org.apache.pinot.plugin.inputformat.protobuf;

import com.google.protobuf.Descriptors;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.apache.pinot.plugin.inputformat.protobuf.ProtoBufTestDataGenerator.*;
import static org.junit.Assert.assertEquals;

public class ProtoBufSchemaUtilsTest {

@DataProvider(name = "scalarCases")
public Object[][] scalarCases() {
return new Object[][]{
new Object[] {STRING_FIELD, FieldSpec.DataType.STRING, true},
new Object[] {NULLABLE_STRING_FIELD, FieldSpec.DataType.STRING, true},
new Object[] {REPEATED_STRINGS, FieldSpec.DataType.STRING, false},

new Object[] {INT_FIELD, FieldSpec.DataType.INT, true},
new Object[] {NULLABLE_INT_FIELD, FieldSpec.DataType.INT, true},
new Object[] {REPEATED_INTS, FieldSpec.DataType.INT, false},

new Object[] {LONG_FIELD, FieldSpec.DataType.LONG, true},
new Object[] {NULLABLE_LONG_FIELD, FieldSpec.DataType.LONG, true},
new Object[] {REPEATED_LONGS, FieldSpec.DataType.LONG, false},

new Object[] {DOUBLE_FIELD, FieldSpec.DataType.DOUBLE, true},
new Object[] {NULLABLE_DOUBLE_FIELD, FieldSpec.DataType.DOUBLE, true},
new Object[] {REPEATED_DOUBLES, FieldSpec.DataType.DOUBLE, false},

new Object[] {FLOAT_FIELD, FieldSpec.DataType.FLOAT, true},
new Object[] {NULLABLE_FLOAT_FIELD, FieldSpec.DataType.FLOAT, true},
new Object[] {REPEATED_FLOATS, FieldSpec.DataType.FLOAT, false},

new Object[] {BYTES_FIELD, FieldSpec.DataType.BYTES, true},
new Object[] {NULLABLE_BYTES_FIELD, FieldSpec.DataType.BYTES, true},
new Object[] {REPEATED_BYTES, FieldSpec.DataType.BYTES, false},

new Object[] {BOOL_FIELD, FieldSpec.DataType.BOOLEAN, true},
new Object[] {NULLABLE_BOOL_FIELD, FieldSpec.DataType.BOOLEAN, true},
new Object[] {REPEATED_BOOLS, FieldSpec.DataType.BOOLEAN, false},

new Object[] {ENUM_FIELD, FieldSpec.DataType.STRING, true},
new Object[] {REPEATED_ENUMS, FieldSpec.DataType.STRING, false},
};
}
@Test(dataProvider = "scalarCases")
public void testExtractSchemaWithComplexTypeHandling(
String fieldName, FieldSpec.DataType type, boolean isSingleValue) {
Descriptors.FieldDescriptor desc = ComplexTypes.TestMessage.getDescriptor().findFieldByName(fieldName);
Schema schema = new Schema();
ProtoBufSchemaUtils.extractSchemaWithComplexTypeHandling(
desc,
Collections.emptyList(),
".",
desc.getName(),
schema,
new HashMap<>(),
TimeUnit.SECONDS);
Schema expectedSchema;
if (isSingleValue) {
expectedSchema = new Schema.SchemaBuilder()
.addSingleValueDimension(fieldName, type)
.build();
} else {
expectedSchema = new Schema.SchemaBuilder()
.addMultiValueDimension(fieldName, type)
.build();
}
assertEquals(expectedSchema, schema);
}

@Test
public void testExtractSchemaWithComplexTypeHandlingNestedMessage() {
Descriptors.FieldDescriptor desc = ComplexTypes.TestMessage.getDescriptor().findFieldByName(NESTED_MESSAGE);
Schema schema = new Schema();
ProtoBufSchemaUtils.extractSchemaWithComplexTypeHandling(
desc,
Collections.emptyList(),
".",
desc.getName(),
schema,
new HashMap<>(),
TimeUnit.SECONDS);
Schema expectedSchema = new Schema.SchemaBuilder()
.addSingleValueDimension("nested_message.nested_string_field", FieldSpec.DataType.STRING)
.addSingleValueDimension("nested_message.nested_int_field", FieldSpec.DataType.INT)
.build();
assertEquals(expectedSchema, schema);

schema = new Schema();
ProtoBufSchemaUtils.extractSchemaWithComplexTypeHandling(
desc,
Collections.emptyList(),
"__",
desc.getName(),
schema,
new HashMap<>(),
TimeUnit.SECONDS);
expectedSchema = new Schema.SchemaBuilder()
.addSingleValueDimension("nested_message__nested_string_field", FieldSpec.DataType.STRING)
.addSingleValueDimension("nested_message__nested_int_field", FieldSpec.DataType.INT)
.build();
assertEquals(expectedSchema, schema);


desc = ComplexTypes.TestMessage.getDescriptor().findFieldByName(REPEATED_NESTED_MESSAGES);
schema = new Schema();
ProtoBufSchemaUtils.extractSchemaWithComplexTypeHandling(
desc,
Collections.emptyList(),
"__",
desc.getName(),
schema,
new HashMap<>(),
TimeUnit.SECONDS);
expectedSchema = new Schema.SchemaBuilder()
.addSingleValueDimension(REPEATED_NESTED_MESSAGES, FieldSpec.DataType.STRING)
.build();
assertEquals(expectedSchema, schema);

schema = new Schema();
ProtoBufSchemaUtils.extractSchemaWithComplexTypeHandling(
desc,
Collections.singletonList(REPEATED_NESTED_MESSAGES),
"__",
desc.getName(),
schema,
new HashMap<>(),
TimeUnit.SECONDS);
expectedSchema = new Schema.SchemaBuilder()
.addSingleValueDimension("repeated_nested_messages__nested_string_field", FieldSpec.DataType.STRING)
.addSingleValueDimension("repeated_nested_messages__nested_int_field", FieldSpec.DataType.INT)
.build();
assertEquals(expectedSchema, schema);
}

@Test(dataProvider = "scalarCases")
public void testExtractSchemaWithCompositeTypeHandling(
String fieldName, FieldSpec.DataType type, boolean isSingleValue) {
Descriptors.Descriptor desc = CompositeTypes.CompositeMessage.getDescriptor();
Map<String, FieldSpec.FieldType> fieldTypeMap = new HashMap<>();
fieldTypeMap.put("test_message.long_field", FieldSpec.FieldType.DATE_TIME);
FieldSpec schema = ProtoBufSchemaUtils.getPinotSchemaFromPinotSchemaWithComplexTypeHandling(
desc,
fieldTypeMap,
TimeUnit.SECONDS,
Collections.emptyList(),
".").getFieldSpecFor("test_message." + fieldName);
FieldSpec expectedSchema = new DimensionFieldSpec("test_message." + fieldName, type, isSingleValue);
assertEquals(expectedSchema, schema);
}

@Test
public void testExtractSchemaWithCompositeTypeHandlingWithTimeColumnAndFieldSpec() {
Descriptors.Descriptor desc = CompositeTypes.CompositeMessage.getDescriptor();
Map<String, FieldSpec.FieldType> fieldTypeMap = new HashMap<>();
fieldTypeMap.put("test_message.long_field", FieldSpec.FieldType.DATE_TIME);
fieldTypeMap.put("test_message.int_field", FieldSpec.FieldType.METRIC);
fieldTypeMap.put("test_message.double_field", FieldSpec.FieldType.METRIC);
Schema schema = ProtoBufSchemaUtils.getPinotSchemaFromPinotSchemaWithComplexTypeHandling(
desc,
fieldTypeMap,
TimeUnit.SECONDS,
Collections.emptyList(),
".");
FieldSpec fieldSpec = schema.getFieldSpecFor("test_message.long_field");
FieldSpec expectedSchema = new DateTimeFieldSpec("test_message.long_field", FieldSpec.DataType.LONG,
"1:SECONDS:EPOCH", "1:SECONDS");
assertEquals(expectedSchema, fieldSpec);

fieldSpec = schema.getFieldSpecFor("test_message.int_field");
expectedSchema = new MetricFieldSpec("test_message.int_field", FieldSpec.DataType.INT);
assertEquals(expectedSchema, fieldSpec);

fieldSpec = schema.getFieldSpecFor("test_message.double_field");
expectedSchema = new MetricFieldSpec("test_message.double_field", FieldSpec.DataType.DOUBLE);
assertEquals(expectedSchema, fieldSpec);
}

@Test
public void testGetPinotSchemaFromPinotSchemaWithComplexTypeHandling()
throws URISyntaxException, IOException {
Descriptors.Descriptor desc = CompositeTypes.CompositeMessage.getDescriptor();
Map<String, FieldSpec.FieldType> fieldTypeMap = new HashMap<>();
fieldTypeMap.put("test_message.long_field", FieldSpec.FieldType.DATE_TIME);
Schema schema = ProtoBufSchemaUtils.getPinotSchemaFromPinotSchemaWithComplexTypeHandling(
desc,
fieldTypeMap,
TimeUnit.MILLISECONDS,
Collections.emptyList(),
".");
URL resource = getClass().getClassLoader().getResource("complex_type_schema.json");
Schema expectedSchema = Schema.fromString(new String(Files.readAllBytes(Paths.get(resource.toURI()))));
assertEquals(expectedSchema, schema);
}
}
Expand Up @@ -64,6 +64,7 @@ private ProtoBufTestDataGenerator() {
public static final String REPEATED_FLOATS = "repeated_floats";
public static final String REPEATED_BOOLS = "repeated_bools";
public static final String REPEATED_BYTES = "repeated_bytes";
public static final String REPEATED_ENUMS = "repeated_enums";

public static ComplexTypes.TestMessage getComplexTypeObject(Map<String, Object> inputRecord)
throws IOException {
Expand Down

0 comments on commit 003bf3f

Please sign in to comment.