Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-9867][Manager] Unified configuration process for standalone and sortflink #9868

Merged
merged 12 commits into from
May 6, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig;

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;
import java.util.List;

@Data
@Builder
public class SortClusterConfig implements Serializable {

private String clusterTag;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import org.apache.inlong.common.pojo.sort.node.NodeConfig;

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;
import java.util.List;

@Data
@Builder
public class SortTaskConfig implements Serializable {

private String sortTaskName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;
import java.util.Map;

@Data
@Builder
public class DataFlowConfig implements Serializable {

private String dataflowId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.common.pojo.sort.dataflow;

import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
import org.apache.inlong.common.pojo.sort.dataflow.deserialization.DeserializationConfig;
import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;

Expand All @@ -32,5 +33,6 @@ public class SourceConfig implements Serializable {
private String subscription;
private String encodingType;
private DeserializationConfig deserializationConfig;
private DataTypeConfig dataTypeConfig;
private List<FieldConfig> fieldConfigs;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
* limitations under the License.
*/

package org.apache.inlong.common.pojo.sort.dataflow.deserialization;
package org.apache.inlong.common.pojo.sort.dataflow.dataType;

import lombok.Data;

@Data
public class CsvDeserializationConfig implements DeserializationConfig {
public class CsvConfig implements DataTypeConfig {

private char delimiter;
private Character escapeChar;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.common.pojo.sort.dataflow.dataType;

import org.apache.inlong.common.constant.DeserializationType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;

import java.io.Serializable;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = CsvConfig.class, name = DeserializationType.CSV),
@JsonSubTypes.Type(value = KvConfig.class, name = DeserializationType.KV),
})
public interface DataTypeConfig extends Serializable {

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
* limitations under the License.
*/

package org.apache.inlong.common.pojo.sort.dataflow.deserialization;
package org.apache.inlong.common.pojo.sort.dataflow.dataType;

import lombok.Data;

@Data
public class KvDeserializationConfig implements DeserializationConfig {
public class KvConfig implements DataTypeConfig {

private char entrySplitter;
private char kvSplitter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
@JsonSubTypes({
@JsonSubTypes.Type(value = InlongMsgDeserializationConfig.class, name = DeserializationType.INLONG_MSG),
@JsonSubTypes.Type(value = InlongMsgPbDeserialiationConfig.class, name = DeserializationType.INLONG_MSG_PB),
@JsonSubTypes.Type(value = CsvDeserializationConfig.class, name = DeserializationType.CSV),
@JsonSubTypes.Type(value = KvDeserializationConfig.class, name = DeserializationType.KV),
})
public interface DeserializationConfig extends Serializable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,4 @@
public class InlongMsgDeserializationConfig implements DeserializationConfig {

private String streamId;
private DeserializationConfig innerDeserializationConfig;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,5 @@
@Data
public class InlongMsgPbDeserialiationConfig implements DeserializationConfig {

private final String compressionType;
private DeserializationConfig innerDeserializationConfig;
private String compressionType;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.common.pojo.sortstandalone;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SortConfigResponse {

public static final int SUCC = 0;
public static final int NO_UPDATE = 1;
public static final int FAIL = -1;
public static final int REQUEST_PARAMS_ERROR = -101;

String msg;
int code;
String md5;
String data;

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ public enum ProcessName {
/**
* Delete inlong stream process
*/
DELETE_STREAM_RESOURCE("Delete Stream");
DELETE_STREAM_RESOURCE("Delete Stream"),

/**
* Create cluster resource process
*/
CREATE_CLUSTER_RESOURCE("Create Cluster");
fuweng11 marked this conversation as resolved.
Show resolved Hide resolved

private final String displayName;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.dao.entity;

import lombok.Data;

import java.io.Serializable;
import java.util.Date;

/**
* Cluster config entity, including cluster tag, etc.
*/
@Data
public class ClusterConfigEntity implements Serializable {

private static final long serialVersionUID = 1L;
private Integer id;
private String clusterTag;
private String clusterType;
private String configParams;
private Integer isDeleted;
private String creator;
private String modifier;
private Date createTime;
private Date modifyTime;
private Integer version;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.dao.entity;

import lombok.Data;

import java.io.Serializable;
import java.util.Date;

/**
* Sort config entity, including sink type, sink id, etc.
*/
@Data
public class SortConfigEntity implements Serializable {

private static final long serialVersionUID = 1L;
private Integer id;
private Integer sinkId;
private String sinkType;
private String inlongClusterTag;
private String inlongClusterName;
private String sortTaskName;
private String dataNodeName;
private String configParams;
private Integer isDeleted;
private String creator;
private String modifier;
private Date createTime;
private Date modifyTime;
private Integer version;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.dao.mapper;

import org.apache.inlong.manager.common.tenant.MultiTenantQuery;
import org.apache.inlong.manager.dao.entity.ClusterConfigEntity;

import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.cursor.Cursor;
import org.apache.ibatis.mapping.ResultSetType;
import org.springframework.stereotype.Repository;

@Repository
public interface ClusterConfigEntityMapper {

int insert(ClusterConfigEntity record);

ClusterConfigEntity selectByPrimaryKey(Integer id);

ClusterConfigEntity selectByClusterTag(@Param("clusterTag") String clusterTag);

int updateByIdSelective(ClusterConfigEntity record);

@MultiTenantQuery(with = false)
@Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE)
Cursor<ClusterConfigEntity> selectAllClusterConfigs();

boolean deleteByClusterTag(@Param("clusterTag") String clusterTag);

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,13 @@ List<Map<String, Object>> countGroupByUser(@Param(value = "username") String use

List<InlongGroupEntity> selectByClusterTag(@Param(value = "inlongClusterTag") String inlongClusterTag);

@MultiTenantQuery(with = false)
List<InlongGroupEntity> selectByClusterTagWithoutTenant(@Param(value = "inlongClusterTag") String inlongClusterTag);

List<InlongGroupEntity> selectByTopicRequest(InlongGroupTopicRequest request);

List<InlongGroupEntity> selectByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);

/**
* Select all group info for sort sdk.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ InlongGroupExtEntity selectByUniqueKey(
@Param("inlongGroupId") String inlongGroupId,
@Param("keyName") String keyName);

List<String> selectGroupIdByKeyNameAndValue(@Param("keyName") String keyName, @Param("keyValue") String keyValue);

@Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE)
Cursor<InlongGroupExtEntity> selectByKeyName(@Param("keyName") String keyName);

Expand Down