diff --git a/inlong-common/pom.xml b/inlong-common/pom.xml
index c2ed3ba4043..882f33f023b 100644
--- a/inlong-common/pom.xml
+++ b/inlong-common/pom.xml
@@ -84,6 +84,10 @@
commons-collections
commons-collections
+
+ com.github.jsqlparser
+ jsqlparser
+
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/Utils/ColumnUtils.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/Utils/ColumnUtils.java
new file mode 100644
index 00000000000..84f74d5674f
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/Utils/ColumnUtils.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.Utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import net.sf.jsqlparser.statement.create.table.ColDataType;
+import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column.ColumnBuilder;
+import org.apache.inlong.sort.protocol.ddl.expressions.Position;
+import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
+
+/**
+ * Utils for parse from statement in sqlParser to a column object.
+ */
+public class ColumnUtils {
+
+ public static final String DEFAULT = "default";
+ public static final String NULL = "null";
+ public static final String NOT = "not";
+ public static final String COMMENT = "comment";
+ public static final String AFTER = "after";
+
+ /**
+ * parse column definition to a Column object
+ * this method is used for alter operation where a first flag is passed
+ * to determine whether the column is in the first position of one table.
+ */
+ public static Column parseColumnWithPosition(boolean isFirst,
+ Map sqlType,
+ ColumnDefinition columnDefinition) {
+
+ ColDataType colDataType = columnDefinition.getColDataType();
+
+ List definitions = new ArrayList<>();
+ if (colDataType.getArgumentsStringList() != null) {
+ definitions.addAll(colDataType.getArgumentsStringList());
+ }
+
+ List columnSpecs = columnDefinition.getColumnSpecs();
+
+ ColumnBuilder columnBuilder = Column.builder();
+ String columnName = reformatName(columnDefinition.getColumnName());
+ columnBuilder.name(columnName)
+ .definition(definitions).isNullable(parseNullable(columnSpecs))
+ .defaultValue(parseDefaultValue(columnSpecs))
+ .jdbcType(sqlType.get(columnName))
+ .comment(parseComment(columnSpecs));
+
+ if (isFirst) {
+ // the column is in the first position of one table
+ columnBuilder.position(new Position(PositionType.FIRST, null));
+ } else {
+ columnBuilder.position(parsePosition(columnSpecs));
+ }
+
+ return columnBuilder.build();
+ }
+
+ /**
+ * parse column definitions to Column list.
+ * this method is used for createTable operation.
+ * @param sqlType the sql type map
+ * @param columnDefinitions the column definition list
+ * @return the column list
+ */
+ public static List parseColumns(Map sqlType,
+ List columnDefinitions) {
+ List columns = new ArrayList<>();
+ columnDefinitions.forEach(columnDefinition -> {
+ columns.add(parseColumnWithPosition(false, sqlType, columnDefinition));
+ });
+ return columns;
+ }
+
+ public static String parseDefaultValue(List specs) {
+ return removeContinuousQuotes(parseAdjacentString(specs, DEFAULT, false));
+ }
+
+ public static boolean parseNullable(List specs) {
+ return !parseAdjacentString(specs, NULL, true).equalsIgnoreCase(NOT);
+ }
+
+ public static String parseComment(List specs) {
+ return removeContinuousQuotes(parseAdjacentString(specs, COMMENT, false));
+ }
+
+ public static Position parsePosition(List specs) {
+ String afterColumn = reformatName(parseAdjacentString(specs, AFTER, false));
+ if (!afterColumn.isEmpty()) {
+ return new Position(PositionType.AFTER, afterColumn);
+ }
+ return null;
+ }
+
+ /**
+ * get the string before or after the specific string in a list
+ * @param stringList the string list
+ * @param specificString the specific string
+ * @param front is front of the specific string
+ * @return the string before or after the specific string
+ */
+ public static String parseAdjacentString(List stringList,
+ String specificString, boolean front) {
+
+ if (stringList == null || stringList.isEmpty()) {
+ return "";
+ }
+
+ for (int i = 0; i < stringList.size(); i++) {
+ if (stringList.get(i).equalsIgnoreCase(specificString)) {
+ if (front && i > 0) {
+ return stringList.get(i - 1);
+ } else if (i < stringList.size() - 1) {
+ return stringList.get(i + 1);
+ }
+ }
+ }
+ return "";
+
+ }
+
+ /**
+ * remove the continuous char in the string from both sides.
+ * @param str the input string, target the char to be removed
+ * @return the string without continuous chars from both sides
+ */
+ public static String removeContinuousChar(String str, char target) {
+ if (str == null || str.length() < 2) {
+ return str;
+ }
+ int start = 0;
+ int end = str.length() - 1;
+ while (start <= end && str.charAt(start) == target) {
+ start++;
+ }
+ while (end >= start && str.charAt(end) == target) {
+ end--;
+ }
+ return str.substring(start, end + 1);
+ }
+
+ public static String removeContinuousQuotes(String str) {
+ return removeContinuousChar(str, '\'');
+ }
+
+ public static String reformatName(String str) {
+ return removeContinuousChar(str, '`');
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java
new file mode 100644
index 00000000000..c6813b39988
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.enums;
+
+/**
+ * Alter type for alter column operation
+ */
+public enum AlterType {
+
+ RENAME_COLUMN,
+ ADD_COLUMN,
+ DROP_COLUMN,
+ MODIFY_COLUMN,
+ CHANGE_COLUMN
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/IndexType.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/IndexType.java
new file mode 100644
index 00000000000..c140e5b8d8a
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/IndexType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.enums;
+
+/**
+ * Index type for create table operation
+ * only support normal index and primary key
+ */
+public enum IndexType {
+
+ NORMAL_INDEX,
+ PRIMARY_KEY
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/OperationType.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/OperationType.java
new file mode 100644
index 00000000000..b5b6dee8235
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/OperationType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.enums;
+
+/**
+ * Operation type for ddl operation
+ */
+public enum OperationType {
+
+ CREATE,
+ ALTER,
+ DROP,
+ RENAME,
+ TRUNCATE,
+ INSERT,
+ UPDATE,
+ DELETE,
+ OTHER
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/PositionType.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/PositionType.java
new file mode 100644
index 00000000000..1eb15a59f99
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/PositionType.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.enums;
+
+/**
+ * Position type for add column operation
+ */
+public enum PositionType {
+
+ /**
+ * add column to first position of a table
+ */
+ FIRST,
+
+ /**
+ * add column after a certain column
+ */
+ AFTER
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java
new file mode 100644
index 00000000000..23b3e1b7e80
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.expressions;
+
+import lombok.Data;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.inlong.sort.protocol.ddl.enums.AlterType;
+
+/**
+ * Alter column expression.
+ */
+@JsonInclude(Include.NON_NULL)
+@Data
+public class AlterColumn {
+
+ @JsonProperty("alterType")
+ private AlterType alterType;
+
+ @JsonProperty("newColumn")
+ private Column newColumn;
+
+ @JsonProperty("oldColumn")
+ private Column oldColumn;
+
+ @JsonCreator
+ public AlterColumn(@JsonProperty("alterType") AlterType alterType,
+ @JsonProperty("newColumn") Column newColumn,
+ @JsonProperty("oldColumn") Column oldColumn) {
+ this.alterType = alterType;
+ this.newColumn = newColumn;
+ this.oldColumn = oldColumn;
+ }
+
+ public AlterColumn(@JsonProperty("alterType") AlterType alterType) {
+ this.alterType = alterType;
+ }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/Column.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/Column.java
new file mode 100644
index 00000000000..612e0f7e973
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/Column.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.expressions;
+
+import java.util.List;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Column represents a column in a table.
+ */
+@Data
+@Builder
+@JsonInclude(Include.NON_NULL)
+public class Column {
+
+ @JsonProperty("name")
+ private String name;
+ @JsonProperty("definition")
+ private List definition;
+ @JsonProperty("jdbcType")
+ private int jdbcType;
+ @JsonProperty("position")
+ private Position position;
+ @JsonProperty("isNullable")
+ private boolean isNullable;
+ @JsonProperty("defaultValue")
+ private String defaultValue;
+ @JsonProperty("comment")
+ private String comment;
+
+ @JsonCreator
+ public Column(@JsonProperty("name") String name, @JsonProperty("definition") List definition,
+ @JsonProperty("jdbcType") int jdbcType, @JsonProperty("position") Position position,
+ @JsonProperty("isNullable") boolean isNullable, @JsonProperty("defaultValue") String defaultValue,
+ @JsonProperty("comment") String comment) {
+ this.name = name;
+ this.definition = definition;
+ this.jdbcType = jdbcType;
+ this.position = position;
+ this.defaultValue = defaultValue;
+ this.comment = comment;
+ this.isNullable = isNullable;
+ }
+
+ public Column(@JsonProperty("name") String name) {
+ this.name = name;
+ }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/Position.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/Position.java
new file mode 100644
index 00000000000..d07cdff853a
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/Position.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.expressions;
+
+import lombok.Data;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
+
+/**
+ * Position represents the position of a column in a table.
+ * It can be either before or after a specific column.
+ */
+@JsonInclude(Include.NON_NULL)
+@Data
+public class Position {
+
+ @JsonProperty("positionType")
+ private PositionType positionType;
+
+ @JsonProperty("columnName")
+ private String columnName;
+
+ @JsonCreator
+ public Position(@JsonProperty("positionType") PositionType positionType,
+ @JsonProperty("columnName") String columnName) {
+ this.positionType = positionType;
+ this.columnName = columnName;
+ }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/indexes/Index.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/indexes/Index.java
new file mode 100644
index 00000000000..e518865b1b1
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/indexes/Index.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.indexes;
+
+import java.util.List;
+import lombok.Data;
+import org.apache.inlong.sort.protocol.ddl.enums.IndexType;
+
+/**
+ * Index for create table operation
+ */
+@Data
+public class Index {
+
+ private IndexType indexType;
+
+ private String indexName;
+
+ private List indexColumns;
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/AlterOperation.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/AlterOperation.java
new file mode 100644
index 00000000000..a04bb4c4a69
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/AlterOperation.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.operations;
+
+import java.util.List;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+
+/**
+ * Alter operation which contains alter columns.
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("alterOperation")
+@JsonInclude(Include.NON_NULL)
+@Data
+public class AlterOperation extends Operation {
+
+ @JsonProperty("alterColumns")
+ private List alterColumns;
+
+ @JsonCreator
+ public AlterOperation(@JsonProperty("alterColumns") List alterColumns) {
+ super(OperationType.ALTER);
+ this.alterColumns = alterColumns;
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/CreateTableOperation.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/CreateTableOperation.java
new file mode 100644
index 00000000000..63db1cd8cc1
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/CreateTableOperation.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.operations;
+
+import java.util.List;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
+import org.apache.inlong.sort.protocol.ddl.indexes.Index;
+
+/**
+ * CreateTableOperation represents a create table operation
+ * it can be "create table like" or "create table with columns and indexes"
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("createTableOperation")
+@JsonInclude(Include.NON_NULL)
+@Data
+public class CreateTableOperation extends Operation {
+
+ @JsonProperty("columns")
+ private List columns;
+
+ @JsonProperty("indexes")
+ private List indexes;
+
+ @JsonProperty("likeTable")
+ private String likeTable;
+
+ @JsonProperty("comment")
+ private String comment;
+
+ @JsonCreator
+ public CreateTableOperation(@JsonProperty("columns") List columns,
+ @JsonProperty("indexes") List indexes,
+ @JsonProperty("likeTable") String likeTable,
+ @JsonProperty("comment") String comment) {
+ super(OperationType.CREATE);
+ this.columns = columns;
+ this.indexes = indexes;
+ this.likeTable = likeTable;
+ this.comment = comment;
+ }
+
+ public CreateTableOperation() {
+ super(OperationType.CREATE);
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/DropTableOperation.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/DropTableOperation.java
new file mode 100644
index 00000000000..34921a18893
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/DropTableOperation.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.operations;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
+
+/**
+ * DropTableOperation represents a drop operation on table
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("dropTableOperation")
+@JsonInclude(Include.NON_NULL)
+@Data
+public class DropTableOperation extends Operation {
+
+ @JsonCreator
+ public DropTableOperation() {
+ super(OperationType.DROP);
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
new file mode 100644
index 00000000000..3d44a242cef
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.operations;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
+
+/**
+ * Operation represents a ddl operation.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
+@JsonSubTypes({
+ @JsonSubTypes.Type(value = AlterOperation.class, name = "alterOperation"),
+ @JsonSubTypes.Type(value = CreateTableOperation.class, name = "createTableOperation"),
+ @JsonSubTypes.Type(value = DropTableOperation.class, name = "dropTableOperation"),
+ @JsonSubTypes.Type(value = TruncateTableOperation.class, name = "truncateTableOperation"),
+ @JsonSubTypes.Type(value = RenameTableOperation.class, name = "renameTableOperation")
+})
+@Data
+@NoArgsConstructor
+public abstract class Operation {
+
+ @JsonProperty("operationType")
+ private OperationType operationType;
+
+ public Operation(@JsonProperty("operationType") OperationType type) {
+ this.operationType = type;
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/RenameTableOperation.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/RenameTableOperation.java
new file mode 100644
index 00000000000..7dc7205bc22
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/RenameTableOperation.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.operations;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
+
+/**
+ * RenameTableOperation represents a rename operation on table
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("renameTableOperation")
+@JsonInclude(Include.NON_NULL)
+@Data
+public class RenameTableOperation extends Operation {
+
+ @JsonCreator
+ public RenameTableOperation() {
+ super(OperationType.RENAME);
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/TruncateTableOperation.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/TruncateTableOperation.java
new file mode 100644
index 00000000000..b30af9b07a9
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/TruncateTableOperation.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.operations;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
+
+/**
+ * TruncateTableOperation represents a truncate operation on table
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("truncateTableOperation")
+@JsonInclude(Include.NON_NULL)
+@Data
+public class TruncateTableOperation extends Operation {
+
+ @JsonCreator
+ public TruncateTableOperation() {
+ super(OperationType.TRUNCATE);
+ }
+
+}
diff --git a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
index 536ffb57e4a..9e26d58c432 100644
--- a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
+++ b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -746,6 +746,7 @@ private void extractDdlRecord(SourceRecord record, Collector out, Table
emit(record, insert, tableSchema, out);
} catch (Exception e) {
LOG.error("Failed to extract DDL record {}", record, e);
+ throw new RuntimeException(e);
}
}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index 94f7b2c609b..695c4363d7a 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -27,8 +27,15 @@
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;
import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.Map.Entry;
+import java.util.Set;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.schema.Table;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.alter.RenameTableStatement;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.util.Collector;
import org.apache.inlong.sort.base.enums.ReadPhase;
import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
@@ -46,6 +53,8 @@
import java.util.Map;
+import static org.apache.inlong.sort.base.Constants.DDL_FIELD_NAME;
+import static org.apache.inlong.sort.cdc.base.relational.JdbcSourceEventDispatcher.HISTORY_RECORD_FIELD;
import static org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getBinlogPosition;
import static org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getFetchTimestamp;
import static org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getHistoryRecord;
@@ -57,6 +66,7 @@
import static org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.isSchemaChangeEvent;
import static org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.isWatermarkEvent;
import static org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.toSnapshotRecord;
+import static org.apache.inlong.sort.protocol.ddl.Utils.ColumnUtils.reformatName;
/**
* The {@link RecordEmitter} implementation for {@link MySqlSourceReader}.
@@ -87,6 +97,7 @@ public final class MySqlRecordEmitter
private volatile long snapProcessTime = 0L;
private boolean includeIncremental;
+ public final ObjectMapper objectMapper = new ObjectMapper();
public MySqlRecordEmitter(
DebeziumDeserializationSchema debeziumDeserializationSchema,
@@ -126,8 +137,9 @@ public void emitRecord(SourceRecord element, SourceOutput output, MySqlSplitS
if (tableChanges.isEmpty()) {
TableId tableId = RecordUtils.getTableId(element);
- // if this table is one of the captured tables, output the ddl element
- if (splitState.getMySQLSplit().getTableSchemas().containsKey(tableId)) {
+ // if this table is one of the captured tables, output the ddl element.
+ if (splitState.getMySQLSplit().getTableSchemas().containsKey(tableId)
+ || shouldOutputRenameDdl(element, tableId)) {
outputDdlElement(element, output, splitState, null);
}
}
@@ -185,6 +197,34 @@ public void close() {
}
}
+ /**
+ * if rename operation is "rename a to b" where a is the captured table
+ * this method extract table names a and b, if any of table name is the captured table
+ * we should output ddl element
+ */
+ private boolean shouldOutputRenameDdl(SourceRecord element, TableId tableId) {
+ try {
+ String ddl = objectMapper.readTree(((Struct) element.value()).get(HISTORY_RECORD_FIELD).toString())
+ .get(DDL_FIELD_NAME).asText();
+ Statement statement = CCJSqlParserUtil.parse(ddl);
+ if (statement instanceof RenameTableStatement) {
+ RenameTableStatement renameTableStatement = (RenameTableStatement) statement;
+ Set> tableNames = renameTableStatement.getTableNames();
+ for (Entry entry : tableNames) {
+ Table oldTable = entry.getKey();
+ Table newTable = entry.getValue();
+ if (reformatName(oldTable.getName()).equals(tableId.table()) ||
+ reformatName(newTable.getName()).equals(tableId.table())) {
+ return true;
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("parse ddl error {}", element, e);
+ }
+ return false;
+ }
+
private void updateSnapshotRecord(SourceRecord element, MySqlSplitState splitState) {
if (splitState.isSnapshotSplitState() && includeIncremental) {
toSnapshotRecord(element);
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
index a4499389157..498244e4575 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
@@ -17,17 +17,16 @@
package org.apache.inlong.sort.cdc.mysql.table;
-import static org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.isSnapshotRecord;
-
-import static org.apache.inlong.sort.base.Constants.DDL_FIELD_NAME;
+import static org.apache.inlong.sort.cdc.mysql.utils.MetaDataUtils.getCanalData;
+import static org.apache.inlong.sort.cdc.mysql.utils.MetaDataUtils.getDebeziumData;
+import static org.apache.inlong.sort.cdc.mysql.utils.MetaDataUtils.getMetaData;
+import static org.apache.inlong.sort.cdc.mysql.utils.MetaDataUtils.getOpType;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import io.debezium.data.Envelope.FieldName;
import io.debezium.relational.Table;
import io.debezium.relational.history.TableChanges;
-import io.debezium.relational.history.TableChanges.TableChange;
-import java.util.LinkedHashMap;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericArrayData;
@@ -38,17 +37,11 @@
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.inlong.sort.cdc.base.debezium.table.MetadataConverter;
-import org.apache.inlong.sort.cdc.base.util.RecordUtils;
-import org.apache.inlong.sort.formats.json.canal.CanalJson;
-import org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
-import org.apache.inlong.sort.formats.json.debezium.DebeziumJson.Source;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import javax.annotation.Nullable;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
/**
@@ -163,28 +156,7 @@ public Object read(SourceRecord record) {
@Override
public Object read(SourceRecord record,
@Nullable TableChanges.TableChange tableSchema, RowData rowData) {
- // construct debezium json
- Struct messageStruct = (Struct) record.value();
- Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
- GenericRowData data = (GenericRowData) rowData;
- Map field = (Map) data.getField(0);
-
- Source source = Source.builder().db(getMetaData(record, AbstractSourceInfo.DATABASE_NAME_KEY))
- .table(getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY))
- .name(sourceStruct.getString(AbstractSourceInfo.SERVER_NAME_KEY))
- .sqlType(getSqlType(tableSchema))
- .pkNames(getPkNames(tableSchema))
- .mysqlType(getMysqlType(tableSchema))
- .build();
- DebeziumJson debeziumJson = DebeziumJson.builder().after(field).source(source)
- .tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(data))
- .tableChange(tableSchema).incremental(isSnapshotRecord(sourceStruct)).build();
-
- try {
- return StringData.fromString(OBJECT_MAPPER.writeValueAsString(debeziumJson));
- } catch (Exception e) {
- throw new IllegalStateException("exception occurs when get meta data", e);
- }
+ return getDebeziumData(record, tableSchema, (GenericRowData) rowData);
}
}),
@@ -436,46 +408,6 @@ public Object read(SourceRecord record) {
}
});
- private static StringData getCanalData(SourceRecord record, GenericRowData rowData,
- TableChange tableSchema) {
- Struct messageStruct = (Struct) record.value();
- Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
- // tableName
- String tableName = getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY);
- // databaseName
- String databaseName = getMetaData(record, AbstractSourceInfo.DATABASE_NAME_KEY);
- // opTs
- long opTs = (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY);
- // actual data
- GenericRowData data = rowData;
- Map field = (Map) data.getField(0);
- List