Skip to content

Commit

Permalink
[INLONG-8564][Manager] Fix unable to issue tasks when modifying data …
Browse files Browse the repository at this point in the history
…node info (#8565)

* [INLONG-8564][Manager] Fix modify dataNode information unable to issue task

* [INLONG-8564][Manager] Improve the code readability

---------

Co-authored-by: healchow <healchow@gmail.com>
(cherry picked from commit f8039b6)
  • Loading branch information
fuweng11 authored and vernedeng committed Jul 19, 2023
1 parent ccf040e commit 91368c4
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public Boolean testConnection(DataNodeRequest request) {
}

@Override
public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity entity, String operator) {
public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity oldEntity, String operator) {
LOGGER.info("do nothing for the data node type ={}", request.getType());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ public interface DataNodeOperator {
* Update related stream source.
*
* @param request data node request
* @param entity data node entity
* @param oldEntity old data node entity
* @param operator operator
*/
void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity entity, String operator);
void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity oldEntity, String operator);

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
Expand Down Expand Up @@ -184,68 +185,35 @@ public List<DataNodeInfo> list(DataNodePageRequest request, UserInfo opInfo) {
@Transactional(rollbackFor = Throwable.class)
public Boolean update(DataNodeRequest request, String operator) {
LOGGER.info("begin to update data node by id: {}", request);
// check whether record existed
// check whether the record existed
DataNodeEntity curEntity = dataNodeMapper.selectById(request.getId());
if (curEntity == null) {
throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND,
String.format("data node record not found by id=%d", request.getId()));
}
userService.checkUser(curEntity.getInCharges(), operator,
"Current user does not have permission to update data node info");

// check whether modify unmodifiable parameters
chkUnmodifiableParams(curEntity, request);
// Check whether the data node name exists with the same name and type
if (request.getName() != null) {
if (StringUtils.isBlank(request.getName())) {
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
"the name changed of data node is blank!");
}
DataNodeEntity existEntity =
dataNodeMapper.selectByUniqueKey(request.getName(), request.getType());
if (existEntity != null && !existEntity.getId().equals(request.getId())) {
throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
String.format("data node already exist for name=%s, type=%s, required id=%s, exist id=%s",
request.getName(), request.getType(), request.getId(), existEntity.getId()));
}
}

// after the update operation, `curEntity` will be updated to the latest info by the MyBatis cache mechanism,
// so we need to get an `oldEntity` by copying `curEntity` before the update operation.
DataNodeEntity oldEntity = CommonBeanUtils.copyProperties(curEntity, DataNodeEntity::new);
DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType());
dataNodeOperator.updateOpt(request, operator);
dataNodeOperator.updateRelatedStreamSource(request, curEntity, operator);

// update the related stream sources if the request and old entity ha
dataNodeOperator.updateRelatedStreamSource(request, oldEntity, operator);
LOGGER.info("success to update data node={}", request);
return true;
}

@Override
@Transactional(rollbackFor = Throwable.class)
public Boolean update(DataNodeRequest request, UserInfo opInfo) {
// check the record existed
DataNodeEntity curEntity = dataNodeMapper.selectById(request.getId());
if (curEntity == null) {
throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND,
String.format("data node record not found by id=%d", request.getId()));
}
userService.checkUser(curEntity.getInCharges(), opInfo.getName(),
"Current user does not have permission to update data node info");
// check whether modify unmodifiable parameters
chkUnmodifiableParams(curEntity, request);
// Check whether the data node name exists with the same name and type
if (request.getName() != null) {
if (StringUtils.isBlank(request.getName())) {
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
"the name changed of data node is blank!");
}
DataNodeEntity existEntity =
dataNodeMapper.selectByUniqueKey(request.getName(), request.getType());
if (existEntity != null && !existEntity.getId().equals(request.getId())) {
throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
String.format("data node already exist for name=%s, type=%s, required id=%s, exist id=%s",
request.getName(), request.getType(), request.getId(), existEntity.getId()));
}
}
DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType());
dataNodeOperator.updateOpt(request, opInfo.getName());
dataNodeOperator.updateRelatedStreamSource(request, curEntity, opInfo.getName());
return true;
String operator = opInfo.getName();
return this.update(request, operator);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,13 @@ public Boolean testConnection(DataNodeRequest request) {
}

@Override
public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity entity, String operator) {
MySQLDataNodeRequest mySQLDataNodeRequest = (MySQLDataNodeRequest) request;
MySQLDataNodeInfo mySQLDataNodeInfo = (MySQLDataNodeInfo) this.getFromEntity(entity);
boolean changed = !Objects.equals(mySQLDataNodeRequest.getUrl(), mySQLDataNodeInfo.getUrl())
|| !Objects.equals(mySQLDataNodeRequest.getBackupUrl(), mySQLDataNodeInfo.getBackupUrl())
|| !Objects.equals(mySQLDataNodeRequest.getUsername(), mySQLDataNodeInfo.getUsername())
|| !Objects.equals(mySQLDataNodeRequest.getToken(), mySQLDataNodeInfo.getToken());
public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity oldEntity, String operator) {
MySQLDataNodeRequest nodeRequest = (MySQLDataNodeRequest) request;
MySQLDataNodeInfo nodeInfo = (MySQLDataNodeInfo) this.getFromEntity(oldEntity);
boolean changed = !Objects.equals(nodeRequest.getUrl(), nodeInfo.getUrl())
|| !Objects.equals(nodeRequest.getBackupUrl(), nodeInfo.getBackupUrl())
|| !Objects.equals(nodeRequest.getUsername(), nodeInfo.getUsername())
|| !Objects.equals(nodeRequest.getToken(), nodeInfo.getToken());
if (changed) {
retryStreamSourceByDataNodeNameAndType(request.getName(), SourceType.MYSQL_SQL, operator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,8 @@ public void login(UserLoginRequest req) {
public void checkUser(String inCharges, String user, String errMsg) {
UserEntity userEntity = userMapper.selectByName(user);
boolean isInCharge = Preconditions.inSeparatedString(user, inCharges, InlongConstants.COMMA);
Preconditions.expectTrue(
isInCharge || TenantUserTypeEnum.TENANT_ADMIN.getCode().equals(userEntity.getAccountType()),
errMsg);
Preconditions.expectTrue(isInCharge
|| TenantUserTypeEnum.TENANT_ADMIN.getCode().equals(userEntity.getAccountType()), errMsg);
}

public void removeInChargeForGroup(String user, String operator) {
Expand Down

0 comments on commit 91368c4

Please sign in to comment.