Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Followup fix and improvements on SchemaConformingTransformerV2 #12932

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ public class SchemaConformingTransformerV2 implements RecordTransformer {
private static final int MAXIMUM_LUCENE_DOCUMENT_SIZE = 32766;
private static final String MIN_DOCUMENT_LENGTH_DESCRIPTION =
"key length + `:` + shingle index overlap length + one non-overlap char";
private static final List<String> MERGED_TEXT_INDEX_SUFFIX_TO_EXCLUDE = Arrays.asList("_logtype", "_dictionaryVars",
"_encodedVars");

private final boolean _continueOnError;
private final SchemaConformingTransformerV2Config _transformerConfig;
Expand Down Expand Up @@ -190,6 +192,20 @@ public static void validateSchema(@Nonnull Schema schema,
SchemaConformingTransformer.getAndValidateExtrasFieldType(schema, indexableExtrasFieldName);
}

Map<String, String> columnNameToJsonKeyPathMap = transformerConfig.getColumnNameToJsonKeyPathMap();
for (Map.Entry<String, String> entry : columnNameToJsonKeyPathMap.entrySet()) {
String columnName = entry.getKey();
FieldSpec fieldSpec = schema.getFieldSpecFor(entry.getKey());
Preconditions.checkState(null != fieldSpec, "Field '%s' doesn't exist in schema", columnName);
}
Set<String> preserveFieldNames = transformerConfig.getFieldPathsToPreserveInput();
for (String preserveFieldName : preserveFieldNames) {
Preconditions.checkState(
columnNameToJsonKeyPathMap.values().contains(preserveFieldName)
|| schema.getFieldSpecFor(preserveFieldName) != null,
"Preserved path '%s' doesn't exist in columnNameToJsonKeyPathMap or schema", preserveFieldName);
}

validateSchemaAndCreateTree(schema, transformerConfig);
}

Expand Down Expand Up @@ -265,7 +281,7 @@ private static SchemaTreeNode validateSchemaAndCreateTree(@Nonnull Schema schema
currentNode = childNode;
}
}
currentNode.setColumn(jsonKeyPathToColumnNameMap.get(field));
currentNode.setColumn(jsonKeyPathToColumnNameMap.get(field), schema);
}

return rootNode;
Expand Down Expand Up @@ -382,17 +398,21 @@ private ExtraFieldsContainer processField(SchemaTreeNode parentNode, Deque<Strin
}

String keyJsonPath = String.join(".", jsonPath);
if (_transformerConfig.getFieldPathsToPreserveInput().contains(keyJsonPath)) {
outputRecord.putValue(keyJsonPath, value);
return extraFieldsContainer;
}

Set<String> fieldPathsToDrop = _transformerConfig.getFieldPathsToDrop();
if (null != fieldPathsToDrop && fieldPathsToDrop.contains(keyJsonPath)) {
return extraFieldsContainer;
}

SchemaTreeNode currentNode = parentNode == null ? null : parentNode.getChild(key);
if (_transformerConfig.getFieldPathsToPreserveInput().contains(keyJsonPath)) {
if (currentNode != null) {
outputRecord.putValue(currentNode.getColumnName(), currentNode.getValue(value));
} else {
outputRecord.putValue(keyJsonPath, value);
}
return extraFieldsContainer;
}
String unindexableFieldSuffix = _transformerConfig.getUnindexableFieldSuffix();
isIndexable = isIndexable && (null == unindexableFieldSuffix || !key.endsWith(unindexableFieldSuffix));
if (!(value instanceof Map)) {
Expand All @@ -406,7 +426,7 @@ private ExtraFieldsContainer processField(SchemaTreeNode parentNode, Deque<Strin
if (_transformerConfig.getFieldsToDoubleIngest().contains(keyJsonPath)) {
extraFieldsContainer.addIndexableEntry(key, value);
}
mergedTextIndexMap.put(keyJsonPath, value);
mergedTextIndexMap.put(currentNode.getColumnName(), value);
} else {
// Out of schema
if (storeIndexableExtras) {
Expand Down Expand Up @@ -609,7 +629,7 @@ private List<String> getLuceneDocumentsFromMergedTextIndexMap(Map<String, Object
.filter(kv -> !_transformerConfig.getMergedTextIndexPathToExclude().contains(kv.getKey())).filter(
kv -> !base64ValueFilter(kv.getValue().toString().getBytes(),
_transformerConfig.getMergedTextIndexBinaryDocumentDetectionMinLength())).filter(
kv -> _transformerConfig.getMergedTextIndexSuffixToExclude().stream()
kv -> MERGED_TEXT_INDEX_SUFFIX_TO_EXCLUDE.stream()
.anyMatch(suffix -> !kv.getKey().endsWith(suffix))).forEach(kv -> {
if (null == mergedTextIndexShinglingOverlapLength) {
generateTextIndexLuceneDocument(kv, luceneDocuments, mergedTextIndexDocumentMaxLength);
Expand Down Expand Up @@ -660,11 +680,12 @@ public boolean isColumn() {
return _isColumn;
}

public void setColumn(String columnName) {
/**
 * Marks this tree node as a concrete schema column.
 *
 * @param columnName the physical column name mapped to this node via
 *     columnNameToJsonKeyPathMap, or {@code null} when the node has no explicit
 *     mapping — in that case the flattened JSON key path is used as the column name
 * @param schema the table schema, used to resolve the node's field spec when an
 *     explicit column name is given
 */
public void setColumn(String columnName, Schema schema) {
if (columnName == null) {
_columnName = getJsonKeyPath();
} else {
_columnName = columnName;
// NOTE(review): assumes validateSchema already checked that columnName exists in
// the schema; otherwise _fieldSpec silently becomes null — confirm upstream guard.
_fieldSpec = schema.getFieldSpecFor(columnName);
}
_isColumn = true;
}
Expand Down Expand Up @@ -711,6 +732,9 @@ public Object getValue(Object value) {
if (value instanceof Object[]) {
return JsonUtils.objectToString(Arrays.asList((Object[]) value));
}
if (value instanceof Map) {
return JsonUtils.objectToString(value);
}
} catch (JsonProcessingException e) {
return value.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,21 @@ private static TableConfig createDefaultBasicTableConfig() {
IngestionConfig ingestionConfig = new IngestionConfig();
SchemaConformingTransformerV2Config schemaConformingTransformerV2Config =
new SchemaConformingTransformerV2Config(true, INDEXABLE_EXTRAS_FIELD_NAME, true, UNINDEXABLE_EXTRAS_FIELD_NAME,
UNINDEXABLE_FIELD_SUFFIX, null, null, null, null, null, null, null, null);
UNINDEXABLE_FIELD_SUFFIX, null, null, null, null, null, null, null, null, null, null);
ingestionConfig.setSchemaConformingTransformerV2Config(schemaConformingTransformerV2Config);
return new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig)
.build();
}

private static TableConfig createDefaultTableConfig(String indexableExtrasField, String unindexableExtrasField,
String unindexableFieldSuffix, Set<String> fieldPathsToDrop, Set<String> fieldPathsToPreserve,
String mergedTextIndexField) {
String unindexableFieldSuffix, Set<String> fieldPathsToDrop, Set<String> fieldPathsToPreserve, Map<String,
String> columnNameToJsonKeyPathMap, String mergedTextIndexField) {
IngestionConfig ingestionConfig = new IngestionConfig();
SchemaConformingTransformerV2Config schemaConformingTransformerV2Config =
new SchemaConformingTransformerV2Config(indexableExtrasField != null, indexableExtrasField,
unindexableExtrasField != null, unindexableExtrasField, unindexableFieldSuffix, fieldPathsToDrop,
fieldPathsToPreserve, mergedTextIndexField, null, null, null, null, null);
fieldPathsToPreserve, columnNameToJsonKeyPathMap, mergedTextIndexField, null, null, null, null, null,
null);
ingestionConfig.setSchemaConformingTransformerV2Config(schemaConformingTransformerV2Config);
return new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig)
.build();
Expand Down Expand Up @@ -630,19 +631,24 @@ public void testKeyValueTransformation() {
CustomObjectNode expectedJsonNodeWithMergedTextIndex;
Schema.SchemaBuilder schemaBuilder;

String destColumnName = "someMeaningfulName";
String destStrColumnName = "myStringName";
String destMapColumnName = "myMapName";
// make array field as single value STRING, test the conversion function
// ignore the column nestedFields
// drop the column nestedFields.mapFields
// preserve the entire mapField value
// preserve the nestedFields.arrayField value and test the conversion function
// map the column someMeaningfulName to nestedFields.stringField
// abandon the json_data extra field
// mergedTextIndex should contain columns who are not in preserved or dropped list
schemaBuilder = createDefaultSchemaBuilder().addSingleValueDimension("arrayField", DataType.STRING)
.addSingleValueDimension(TEST_JSON_MAP_FIELD_NAME, DataType.STRING)
.addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME, DataType.JSON)
.addSingleValueDimension(destColumnName, DataType.STRING);
.addSingleValueDimension(destMapColumnName, DataType.STRING)
.addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_ARRAY_FIELD_NAME, DataType.STRING)
.addSingleValueDimension(destStrColumnName, DataType.STRING);

Map<String, String> keyMapping = new HashMap<>() {
{
put(destColumnName, TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_STRING_FIELD_NAME);
put(destStrColumnName, TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_STRING_FIELD_NAME);
put(destMapColumnName, TEST_JSON_MAP_FIELD_NAME);
}
};
Set<String> pathToDrop = new HashSet<>() {
Expand All @@ -653,27 +659,21 @@ public void testKeyValueTransformation() {
Set<String> pathToPreserve = new HashSet<>() {
{
add(TEST_JSON_MAP_FIELD_NAME);
add(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_ARRAY_FIELD_NAME);
}
};

/*
{
"arrayField":[0,1,2,3],
"nestedFields.stringField":"a",
"mapField":{
"myStringName":"a",
"nestedFields.arrayField":[0,1,2,3],
"myMapName":{
"arrayField":[0,1,2,3],
"nullField":null,
"stringField":"a",
"intField_noIndex":9,
"string_noIndex":"z"
}
"indexableExtras":{
"nullField":null,
"stringField":"a",
"nestedFields":{
"arrayField":[0, 1, 2, 3],
"nullField":null,
}
},
"unindexableExtras":{
"intField_noIndex":9,
Expand All @@ -689,21 +689,18 @@ public void testKeyValueTransformation() {
}
},
__mergedTextIndex: [
"[0, 1, 2, 3]:arrayField", "a:stringField",
"[0, 1, 2, 3]:nestedFields.arrayField", "a:nestedFields.stringField",
"[0, 1, 2, 3]:arrayField", "a:nestedFields.stringField",
]
}
*/
expectedJsonNode = CustomObjectNode.create()
.set(TEST_JSON_ARRAY_FIELD_NAME, N.textNode("[0,1,2,3]"))
.set(destColumnName, TEST_JSON_STRING_NODE)
.set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE_WITH_NO_IDX)
.set(INDEXABLE_EXTRAS_FIELD_NAME,
CustomObjectNode.create().set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE)
.set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE)
.set(TEST_JSON_NESTED_MAP_FIELD_NAME,
CustomObjectNode.create().set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE)
.set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE)))
.set(destStrColumnName, TEST_JSON_STRING_NODE)
// For single value field, it would serialize the value whose format is slightly different
.set(destMapColumnName,
N.textNode("{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\",\"intField_noIndex\":9,"
+ "\"stringField_noIndex\":\"z\"}"))
.set(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_ARRAY_FIELD_NAME, N.textNode("[0,1,2,3]"))

.set(UNINDEXABLE_EXTRAS_FIELD_NAME,
CustomObjectNode.create().set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE)
Expand All @@ -713,9 +710,9 @@ public void testKeyValueTransformation() {
.set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE)));

expectedJsonNodeWithMergedTextIndex = expectedJsonNode.deepCopy().set(MERGED_TEXT_INDEX_FIELD_NAME,
N.arrayNode().add("[0,1,2,3]:arrayField").add("a:stringField").add("[0,1,2,3]:nestedFields.arrayField").add(
"a:nestedFields.stringField"));
transformKeyValueTransformation(
N.arrayNode().add("[0,1,2,3]:arrayField").add("a:" + destStrColumnName));
// test with no json_data
transformKeyValueTransformation(null, UNINDEXABLE_EXTRAS_FIELD_NAME, MERGED_TEXT_INDEX_FIELD_NAME,
schemaBuilder.addMultiValueDimension(MERGED_TEXT_INDEX_FIELD_NAME, DataType.STRING).build(), keyMapping,
pathToDrop, pathToPreserve, inputJsonNode, expectedJsonNodeWithMergedTextIndex);
}
Expand All @@ -731,10 +728,10 @@ private void transformWithUnIndexableFieldsAndMergedTextIndex(Schema schema, Jso
null, null, null, inputRecordJsonNode.toString(), ouputRecordJsonNode.toString());
}

private void transformKeyValueTransformation(Schema schema, Map<String, String> keyMapping,
Set<String> fieldPathsToDrop, Set<String> fieldPathsToPreserve, JsonNode inputRecordJsonNode,
JsonNode ouputRecordJsonNode) {
testTransform(INDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_EXTRAS_FIELD_NAME, MERGED_TEXT_INDEX_FIELD_NAME, schema,
/**
 * Convenience wrapper for key/value-transformation test cases: delegates to
 * {@code testTransform} after serializing the input and expected-output JSON nodes.
 *
 * @param indexableExtrasField name of the indexable-extras column, or null to disable it
 * @param unindexableExtrasField name of the unindexable-extras column, or null to disable it
 * @param mergedTextIndexField name of the merged text index column
 * @param schema table schema used for the transformation
 * @param keyMapping column name -> JSON key path mapping
 * @param fieldPathsToDrop JSON key paths to drop from the output record
 * @param fieldPathsToPreserve JSON key paths to preserve verbatim
 * @param inputRecordJsonNode input record as a JSON tree
 * @param outputRecordJsonNode expected output record as a JSON tree
 */
private void transformKeyValueTransformation(String indexableExtrasField, String unindexableExtrasField,
    String mergedTextIndexField, Schema schema, Map<String, String> keyMapping, Set<String> fieldPathsToDrop,
    Set<String> fieldPathsToPreserve, JsonNode inputRecordJsonNode, JsonNode outputRecordJsonNode) {
  testTransform(indexableExtrasField, unindexableExtrasField, mergedTextIndexField, schema,
      keyMapping, fieldPathsToDrop, fieldPathsToPreserve, inputRecordJsonNode.toString(),
      outputRecordJsonNode.toString());
}
Expand All @@ -745,8 +742,7 @@ private void testTransform(String indexableExtrasField, String unindexableExtras
String expectedOutputRecordJSONString) {
TableConfig tableConfig =
createDefaultTableConfig(indexableExtrasField, unindexableExtrasField, UNINDEXABLE_FIELD_SUFFIX,
fieldPathsToDrop, fieldPathsToPreserve, mergedTextIndexField);
tableConfig.getIngestionConfig().getSchemaConformingTransformerV2Config().setColumnNameToJsonKeyPathMap(keyMapping);
fieldPathsToDrop, fieldPathsToPreserve, keyMapping, mergedTextIndexField);
GenericRow outputRecord = transformRow(tableConfig, schema, inputRecordJSONString);
Map<String, Object> expectedOutputRecordMap = jsonStringToMap(expectedOutputRecordJSONString);

Expand Down Expand Up @@ -809,8 +805,8 @@ public void testOverlappingSchemaFields() {
Schema schema = createDefaultSchemaBuilder().addSingleValueDimension("a.b", DataType.STRING)
.addSingleValueDimension("a.b.c", DataType.INT).build();
SchemaConformingTransformerV2.validateSchema(schema,
new SchemaConformingTransformerV2Config(null, INDEXABLE_EXTRAS_FIELD_NAME, null, null, null, null, null, null,
null, null, null, null, null));
new SchemaConformingTransformerV2Config(null, INDEXABLE_EXTRAS_FIELD_NAME, null, null, null, null, null,
null, null, null, null, null, null, null, null));
} catch (Exception ex) {
fail("Should not have thrown any exception when overlapping schema occurs");
}
Expand All @@ -820,8 +816,8 @@ public void testOverlappingSchemaFields() {
Schema schema = createDefaultSchemaBuilder().addSingleValueDimension("a.b.c", DataType.INT)
.addSingleValueDimension("a.b", DataType.STRING).build();
SchemaConformingTransformerV2.validateSchema(schema,
new SchemaConformingTransformerV2Config(null, INDEXABLE_EXTRAS_FIELD_NAME, null, null, null, null, null, null,
null, null, null, null, null));
new SchemaConformingTransformerV2Config(null, INDEXABLE_EXTRAS_FIELD_NAME, null, null, null, null, null,
null, null, null, null, null, null, null, null));
} catch (Exception ex) {
fail("Should not have thrown any exception when overlapping schema occurs");
}
Expand Down