Skip to content

Commit

Permalink
[INLONG-7660][Sort] Support DDL model for MySQL connector when runnin…
Browse files Browse the repository at this point in the history
…g in all migrate mode (#7846)
  • Loading branch information
EMsnap committed Apr 19, 2023
1 parent 9038d9e commit ab170aa
Show file tree
Hide file tree
Showing 29 changed files with 1,658 additions and 177 deletions.
4 changes: 4 additions & 0 deletions inlong-common/pom.xml
Expand Up @@ -84,6 +84,10 @@
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
<dependency>
<groupId>com.github.jsqlparser</groupId>
<artifactId>jsqlparser</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.protocol.ddl.Utils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import net.sf.jsqlparser.statement.create.table.ColDataType;
import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
import org.apache.inlong.sort.protocol.ddl.expressions.Column;
import org.apache.inlong.sort.protocol.ddl.expressions.Column.ColumnBuilder;
import org.apache.inlong.sort.protocol.ddl.expressions.Position;
import org.apache.inlong.sort.protocol.ddl.enums.PositionType;

/**
* Utils for parse from statement in sqlParser to a column object.
*/
public class ColumnUtils {

public static final String DEFAULT = "default";
public static final String NULL = "null";
public static final String NOT = "not";
public static final String COMMENT = "comment";
public static final String AFTER = "after";

/**
* parse column definition to a Column object
* this method is used for alter operation where a first flag is passed
* to determine whether the column is in the first position of one table.
*/
public static Column parseColumnWithPosition(boolean isFirst,
Map<String, Integer> sqlType,
ColumnDefinition columnDefinition) {

ColDataType colDataType = columnDefinition.getColDataType();

List<String> definitions = new ArrayList<>();
if (colDataType.getArgumentsStringList() != null) {
definitions.addAll(colDataType.getArgumentsStringList());
}

List<String> columnSpecs = columnDefinition.getColumnSpecs();

ColumnBuilder columnBuilder = Column.builder();
String columnName = reformatName(columnDefinition.getColumnName());
columnBuilder.name(columnName)
.definition(definitions).isNullable(parseNullable(columnSpecs))
.defaultValue(parseDefaultValue(columnSpecs))
.jdbcType(sqlType.get(columnName))
.comment(parseComment(columnSpecs));

if (isFirst) {
// the column is in the first position of one table
columnBuilder.position(new Position(PositionType.FIRST, null));
} else {
columnBuilder.position(parsePosition(columnSpecs));
}

return columnBuilder.build();
}

/**
* parse column definitions to Column list.
* this method is used for createTable operation.
* @param sqlType the sql type map
* @param columnDefinitions the column definition list
* @return the column list
*/
public static List<Column> parseColumns(Map<String, Integer> sqlType,
List<ColumnDefinition> columnDefinitions) {
List<Column> columns = new ArrayList<>();
columnDefinitions.forEach(columnDefinition -> {
columns.add(parseColumnWithPosition(false, sqlType, columnDefinition));
});
return columns;
}

public static String parseDefaultValue(List<String> specs) {
return removeContinuousQuotes(parseAdjacentString(specs, DEFAULT, false));
}

public static boolean parseNullable(List<String> specs) {
return !parseAdjacentString(specs, NULL, true).equalsIgnoreCase(NOT);
}

public static String parseComment(List<String> specs) {
return removeContinuousQuotes(parseAdjacentString(specs, COMMENT, false));
}

public static Position parsePosition(List<String> specs) {
String afterColumn = reformatName(parseAdjacentString(specs, AFTER, false));
if (!afterColumn.isEmpty()) {
return new Position(PositionType.AFTER, afterColumn);
}
return null;
}

/**
* get the string before or after the specific string in a list
* @param stringList the string list
* @param specificString the specific string
* @param front is front of the specific string
* @return the string before or after the specific string
*/
public static String parseAdjacentString(List<String> stringList,
String specificString, boolean front) {

if (stringList == null || stringList.isEmpty()) {
return "";
}

for (int i = 0; i < stringList.size(); i++) {
if (stringList.get(i).equalsIgnoreCase(specificString)) {
if (front && i > 0) {
return stringList.get(i - 1);
} else if (i < stringList.size() - 1) {
return stringList.get(i + 1);
}
}
}
return "";

}

/**
* remove the continuous char in the string from both sides.
* @param str the input string, target the char to be removed
* @return the string without continuous chars from both sides
*/
public static String removeContinuousChar(String str, char target) {
if (str == null || str.length() < 2) {
return str;
}
int start = 0;
int end = str.length() - 1;
while (start <= end && str.charAt(start) == target) {
start++;
}
while (end >= start && str.charAt(end) == target) {
end--;
}
return str.substring(start, end + 1);
}

public static String removeContinuousQuotes(String str) {
return removeContinuousChar(str, '\'');
}

public static String reformatName(String str) {
return removeContinuousChar(str, '`');
}

}
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.protocol.ddl.enums;

/**
* Alter type for alter column operation
*/
public enum AlterType {

RENAME_COLUMN,
ADD_COLUMN,
DROP_COLUMN,
MODIFY_COLUMN,
CHANGE_COLUMN

}
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.protocol.ddl.enums;

/**
* Index type for create table operation
* only support normal index and primary key
*/
public enum IndexType {

NORMAL_INDEX,
PRIMARY_KEY

}
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.protocol.ddl.enums;

/**
* Operation type for ddl operation
*/
public enum OperationType {

CREATE,
ALTER,
DROP,
RENAME,
TRUNCATE,
INSERT,
UPDATE,
DELETE,
OTHER

}
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.protocol.ddl.enums;

/**
* Position type for add column operation
*/
public enum PositionType {

/**
* add column to first position of a table
*/
FIRST,

/**
* add column after a certain column
*/
AFTER
}
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.protocol.ddl.expressions;

import lombok.Data;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.inlong.sort.protocol.ddl.enums.AlterType;

/**
* Alter column expression.
*/
@JsonInclude(Include.NON_NULL)
@Data
public class AlterColumn {

@JsonProperty("alterType")
private AlterType alterType;

@JsonProperty("newColumn")
private Column newColumn;

@JsonProperty("oldColumn")
private Column oldColumn;

@JsonCreator
public AlterColumn(@JsonProperty("alterType") AlterType alterType,
@JsonProperty("newColumn") Column newColumn,
@JsonProperty("oldColumn") Column oldColumn) {
this.alterType = alterType;
this.newColumn = newColumn;
this.oldColumn = oldColumn;
}

public AlterColumn(@JsonProperty("alterType") AlterType alterType) {
this.alterType = alterType;
}
}

0 comments on commit ab170aa

Please sign in to comment.