Skip to content

Commit

Permalink
[INLONG-7893][Manager] Support field description when parsing field b…
Browse files Browse the repository at this point in the history
…y JSON (#7894)
  • Loading branch information
featzhang committed Apr 24, 2023
1 parent d5bf178 commit fc19b4f
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 100 deletions.
Expand Up @@ -168,4 +168,19 @@ public class InlongConstants {
public static final Set<String> STREAM_FIELD_TYPES =
Sets.newHashSet("string", "int", "long", "float", "double", "date", "timestamp");

/**
* The name prop when batch parsing fields in JSON mode
*/
public static final String BATCH_PARSING_FILED_JSON_NAME_PROP = "name";

/**
* The type prop when batch parsing fields in JSON mode
*/
public static final String BATCH_PARSING_FILED_JSON_TYPE_PROP = "type";

/**
* The comment prop when batch parsing fields in JSON mode
*/
public static final String BATCH_PARSING_FILED_JSON_COMMENT_PROP = "desc";

}
Expand Up @@ -76,15 +76,19 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET;
import static org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NORMAL_CHARACTERS;
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_CSV;
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_COMMENT_PROP;
import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_NAME_PROP;
import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_TYPE_PROP;

/**
* Implementation of sink service interface
Expand All @@ -93,7 +97,7 @@
public class StreamSinkServiceImpl implements StreamSinkService {

private static final Logger LOGGER = LoggerFactory.getLogger(StreamSinkServiceImpl.class);
private static final String PARSE_FIELD_CSV_SPLITTER = "\t|\\s|,";
private static final Pattern PARSE_FIELD_CSV_SPLITTER = Pattern.compile("[\t\\s,]");
private static final int PARSE_FIELD_CSV_MAX_COLUMNS = 3;
private static final int PARSE_FIELD_CSV_MIN_COLUMNS = 2;

Expand Down Expand Up @@ -291,7 +295,8 @@ public List<SinkBriefInfo> listBrief(String groupId, String streamId) {
Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);

List<SinkBriefInfo> summaryList = sinkMapper.selectSummary(groupId, streamId);
LOGGER.debug("success to list sink summary by groupId=" + groupId + ", streamId=" + streamId);
LOGGER.debug("success to list sink summary by groupId={}, streamId={}", groupId, streamId);

return summaryList;
}

Expand Down Expand Up @@ -710,25 +715,21 @@ public List<SinkField> parseFields(ParseFieldRequest parseFieldRequest) {
String method = parseFieldRequest.getMethod();
String statement = parseFieldRequest.getStatement();

Map<String, String> fieldsMap;
if (STATEMENT_TYPE_JSON.equals(method)) {
fieldsMap = parseFieldsByJson(statement);
} else if (STATEMENT_TYPE_SQL.equals(method)) {
return parseFieldsBySql(statement);
} else {
return parseFieldsByCsv(statement);
switch (method) {
case STATEMENT_TYPE_JSON:
return parseFieldsByJson(statement);
case STATEMENT_TYPE_SQL:
return parseFieldsBySql(statement);
case STATEMENT_TYPE_CSV:
return parseFieldsByCsv(statement);
default:
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
String.format("Unsupported parse mode: %s", method));
}
return fieldsMap.entrySet().stream().map(entry -> {
SinkField field = new SinkField();
field.setFieldName(entry.getKey());
field.setFieldType(entry.getValue());
return field;
}).collect(Collectors.toList());

} catch (Exception e) {
LOGGER.error("parse sink fields error", e);
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
String.format("parse sink fields error : %s", e.getMessage()));
String.format("parse sink fields error: %s", e.getMessage()));
}
}

Expand All @@ -741,7 +742,7 @@ private List<SinkField> parseFieldsByCsv(String statement) {
continue;
}

String[] cols = line.split(PARSE_FIELD_CSV_SPLITTER, PARSE_FIELD_CSV_MAX_COLUMNS);
String[] cols = PARSE_FIELD_CSV_SPLITTER.split(line, PARSE_FIELD_CSV_MAX_COLUMNS);
if (cols.length < PARSE_FIELD_CSV_MIN_COLUMNS) {
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
"At least two fields are required, line number is " + (i + 1));
Expand Down Expand Up @@ -771,21 +772,28 @@ private List<SinkField> parseFieldsBySql(String sql) throws JSQLParserException
CCJSqlParserManager pm = new CCJSqlParserManager();
Statement statement = pm.parse(new StringReader(sql));
List<SinkField> fields = new ArrayList<>();
if (statement instanceof CreateTable) {
CreateTable createTable = (CreateTable) statement;
List<ColumnDefinition> columnDefinitions = createTable.getColumnDefinitions();
// get column definition
for (ColumnDefinition definition : columnDefinitions) {
// get field name
String columnName = definition.getColumnName();
ColDataType colDataType = definition.getColDataType();
String sqlDataType = colDataType.getDataType();
// get field type
String realDataType = StringUtils.substringBefore(sqlDataType, LEFT_BRACKET).toLowerCase();
// get field comment
List<String> columnSpecs = definition.getColumnSpecs();
if (!(statement instanceof CreateTable)) {
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
"The SQL statement must be a table creation statement");
}
CreateTable createTable = (CreateTable) statement;
List<ColumnDefinition> columnDefinitions = createTable.getColumnDefinitions();
// get column definition
for (ColumnDefinition definition : columnDefinitions) {
// get field name
String columnName = definition.getColumnName();
ColDataType colDataType = definition.getColDataType();
String sqlDataType = colDataType.getDataType();
SinkField sinkField = new SinkField();
sinkField.setFieldName(columnName);
// get field type
String realDataType = StringUtils.substringBefore(sqlDataType, LEFT_BRACKET).toLowerCase();
sinkField.setFieldType(realDataType);
// get field comment
List<String> columnSpecs = definition.getColumnSpecs();
if (CollectionUtils.isNotEmpty(columnSpecs)) {
int commentIndex = -1;
for (int csIndex = 0; columnSpecs != null && csIndex < columnSpecs.size(); csIndex++) {
for (int csIndex = 0; csIndex < columnSpecs.size(); csIndex++) {
String spec = columnSpecs.get(csIndex);
if (spec.toUpperCase().startsWith("COMMENT")) {
commentIndex = csIndex;
Expand All @@ -796,25 +804,26 @@ private List<SinkField> parseFieldsBySql(String sql) throws JSQLParserException
if (-1 != commentIndex && columnSpecs.size() > commentIndex + 1) {
comment = columnSpecs.get(commentIndex + 1).replaceAll("['\"]", "");
}

SinkField sinkField = new SinkField();
sinkField.setFieldName(columnName);
sinkField.setFieldType(realDataType);
sinkField.setFieldComment(comment);
fields.add(sinkField);
}
} else {
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
"The SQL statement must be a table creation statement");

fields.add(sinkField);
}
return fields;
}

private Map<String, String> parseFieldsByJson(String statement) throws JsonProcessingException {
// Use LinkedHashMap deserialization to keep the order of the fields
return objectMapper.readValue(statement,
new TypeReference<LinkedHashMap<String, String>>() {
});
/**
 * Parse sink fields from a JSON statement.
 * <p>
 * The statement is expected to be a JSON array of objects, each carrying the
 * {@code name}, {@code type} and optional {@code desc} props, e.g.
 * {@code [{"name":"id","type":"int","desc":"primary key"}]}.
 * Deserializing into a {@code List} preserves the declared order of the fields.
 *
 * @param statement JSON array describing the fields
 * @return the parsed sink fields, in declaration order
 * @throws JsonProcessingException if the statement is not valid JSON of the expected shape
 * @throws BusinessException if any entry is missing a non-empty name or type prop
 */
private List<SinkField> parseFieldsByJson(String statement) throws JsonProcessingException {
    return objectMapper.readValue(statement, new TypeReference<List<Map<String, String>>>() {
    }).stream().map(line -> {
        String name = line.get(BATCH_PARSING_FILED_JSON_NAME_PROP);
        String type = line.get(BATCH_PARSING_FILED_JSON_TYPE_PROP);
        // the comment prop is optional; a missing key simply yields a null comment
        String desc = line.get(BATCH_PARSING_FILED_JSON_COMMENT_PROP);
        // fail fast on malformed entries instead of silently creating fields
        // with a null name or type, which would only surface much later
        if (name == null || name.isEmpty() || type == null || type.isEmpty()) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                    String.format("each field must contain non-empty '%s' and '%s' props, invalid entry: %s",
                            BATCH_PARSING_FILED_JSON_NAME_PROP, BATCH_PARSING_FILED_JSON_TYPE_PROP, line));
        }
        SinkField sinkField = new SinkField();
        sinkField.setFieldName(name);
        sinkField.setFieldType(type);
        sinkField.setFieldComment(desc);
        return sinkField;
    }).collect(Collectors.toList());
}

private void checkSinkRequestParams(SinkRequest request) {
Expand Down
Expand Up @@ -81,16 +81,19 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NORMAL_CHARACTERS;
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_CSV;
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
import static org.apache.inlong.manager.common.consts.InlongConstants.STREAM_FIELD_TYPES;
import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_COMMENT_PROP;
import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_NAME_PROP;
import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_TYPE_PROP;
import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.packExtParams;
import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams;

Expand Down Expand Up @@ -740,21 +743,17 @@ public List<StreamField> parseFields(ParseFieldRequest parseFieldRequest) {
String method = parseFieldRequest.getMethod();
String statement = parseFieldRequest.getStatement();

Map<String, String> fieldsMap;
if (STATEMENT_TYPE_JSON.equals(method)) {
fieldsMap = parseFieldsByJson(statement);
} else if (STATEMENT_TYPE_SQL.equals(method)) {
return parseFieldsBySql(statement);
} else {
return parseFieldsByCsv(statement);
switch (method) {
case STATEMENT_TYPE_JSON:
return parseFieldsByJson(statement);
case STATEMENT_TYPE_SQL:
return parseFieldsBySql(statement);
case STATEMENT_TYPE_CSV:
return parseFieldsByCsv(statement);
default:
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
String.format("Unsupported parse field mode: %s", method));
}
return fieldsMap.entrySet().stream().map(entry -> {
StreamField field = new StreamField();
field.setFieldName(entry.getKey());
field.setFieldType(entry.getValue());
return field;
}).collect(Collectors.toList());

} catch (Exception e) {
LOGGER.error("parse inlong stream fields error", e);
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
Expand Down Expand Up @@ -831,27 +830,35 @@ private List<StreamField> parseFieldsBySql(String sql) throws JSQLParserExceptio
CCJSqlParserManager pm = new CCJSqlParserManager();
Statement statement = pm.parse(new StringReader(sql));
List<StreamField> fields = new ArrayList<>();
if (statement instanceof CreateTable) {
CreateTable createTable = (CreateTable) statement;
List<ColumnDefinition> columnDefinitions = createTable.getColumnDefinitions();
// get column definition
for (int i = 0; i < columnDefinitions.size(); i++) {
ColumnDefinition definition = columnDefinitions.get(i);
// get field name
String columnName = definition.getColumnName();
ColDataType colDataType = definition.getColDataType();
String sqlDataType = colDataType.getDataType();
// convert SQL type to Java type
Class<?> clazz = FieldInfoUtils.sqlTypeToJavaType(sqlDataType);
if (clazz == Object.class) {
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
"Unrecognized SQL field type, line: " + (i + 1) + ", type: " + sqlDataType);
}
String type = clazz.getSimpleName().toLowerCase();
// get field comment
List<String> columnSpecs = definition.getColumnSpecs();
if (!(statement instanceof CreateTable)) {
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
"The SQL statement must be a table creation statement");
}
CreateTable createTable = (CreateTable) statement;
List<ColumnDefinition> columnDefinitions = createTable.getColumnDefinitions();
// get column definition
for (int i = 0; i < columnDefinitions.size(); i++) {
ColumnDefinition definition = columnDefinitions.get(i);
StreamField streamField = new StreamField();
// get field name
String columnName = definition.getColumnName();
streamField.setFieldName(columnName);

ColDataType colDataType = definition.getColDataType();
String sqlDataType = colDataType.getDataType();
// convert SQL type to Java type
Class<?> clazz = FieldInfoUtils.sqlTypeToJavaType(sqlDataType);
if (clazz == Object.class) {
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
"Unrecognized SQL field type, line: " + (i + 1) + ", type: " + sqlDataType);
}
String type = clazz.getSimpleName().toLowerCase();
streamField.setFieldType(type);
// get field comment
List<String> columnSpecs = definition.getColumnSpecs();
if (CollectionUtils.isNotEmpty(columnSpecs)) {
int commentIndex = -1;
for (int csIndex = 0; columnSpecs != null && csIndex < columnSpecs.size(); csIndex++) {
for (int csIndex = 0; csIndex < columnSpecs.size(); csIndex++) {
String spec = columnSpecs.get(csIndex);
if (spec.toUpperCase().startsWith("COMMENT")) {
commentIndex = csIndex;
Expand All @@ -862,25 +869,25 @@ private List<StreamField> parseFieldsBySql(String sql) throws JSQLParserExceptio
if (-1 != commentIndex && columnSpecs.size() > commentIndex + 1) {
comment = columnSpecs.get(commentIndex + 1).replaceAll("['\"]", "");
}

StreamField streamField = new StreamField();
streamField.setFieldName(columnName);
streamField.setFieldType(type);
streamField.setFieldComment(comment);
fields.add(streamField);
}
} else {
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
"The SQL statement must be a table creation statement");
fields.add(streamField);
}
return fields;
}

private Map<String, String> parseFieldsByJson(String statement) throws JsonProcessingException {
// Use LinkedHashMap deserialization to keep the order of the fields
return objectMapper.readValue(statement,
new TypeReference<LinkedHashMap<String, String>>() {
});
/**
 * Parse stream fields from a JSON statement.
 * <p>
 * The statement is expected to be a JSON array of objects, each carrying the
 * {@code name}, {@code type} and optional {@code desc} props, e.g.
 * {@code [{"name":"id","type":"int","desc":"primary key"}]}.
 * Deserializing into a {@code List} preserves the declared order of the fields.
 *
 * @param statement JSON array describing the fields
 * @return the parsed stream fields, in declaration order
 * @throws JsonProcessingException if the statement is not valid JSON of the expected shape
 * @throws BusinessException if any entry is missing a non-empty name or type prop
 */
private List<StreamField> parseFieldsByJson(String statement) throws JsonProcessingException {
    return objectMapper.readValue(statement, new TypeReference<List<Map<String, String>>>() {
    }).stream().map(line -> {
        String name = line.get(BATCH_PARSING_FILED_JSON_NAME_PROP);
        String type = line.get(BATCH_PARSING_FILED_JSON_TYPE_PROP);
        // the comment prop is optional; a missing key simply yields a null comment
        String desc = line.get(BATCH_PARSING_FILED_JSON_COMMENT_PROP);
        // fail fast on malformed entries instead of silently creating fields
        // with a null name or type, which would only surface much later
        if (name == null || name.isEmpty() || type == null || type.isEmpty()) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                    String.format("each field must contain non-empty '%s' and '%s' props, invalid entry: %s",
                            BATCH_PARSING_FILED_JSON_NAME_PROP, BATCH_PARSING_FILED_JSON_TYPE_PROP, line));
        }
        StreamField streamField = new StreamField();
        streamField.setFieldName(name);
        streamField.setFieldType(type);
        streamField.setFieldComment(desc);
        return streamField;
    }).collect(Collectors.toList());
}

/**
Expand Down

0 comments on commit fc19b4f

Please sign in to comment.