Skip to content

Commit

Permalink
[INLONG-7774][Manager] Add permission verification for StreamSource (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
fuweng11 committed Apr 4, 2023
1 parent fa68889 commit 231dbd5
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 4 deletions.
Expand Up @@ -101,7 +101,8 @@ public Integer save(SourceRequest request, String operator) {
// Check if it can be added
String groupId = request.getInlongGroupId();
InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator);

// only the person in charges can query
userService.checkUser(groupEntity.getInCharges(), operator, ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage());
String streamId = request.getInlongStreamId();
String sourceName = request.getSourceName();
List<StreamSourceEntity> existList = sourceMapper.selectByRelatedId(groupId, streamId, sourceName);
Expand Down Expand Up @@ -336,7 +337,12 @@ public Boolean update(SourceRequest request, String operator) {
// Check if it can be modified
String groupId = request.getInlongGroupId();
InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator);

if (groupEntity == null) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
String.format("InlongGroup does not exist with InlongGroupId=%s", groupEntity.getInlongGroupId()));
}
// only the person in charges can query
userService.checkUser(groupEntity.getInCharges(), operator, ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage());
StreamSourceOperator sourceOperator = operatorFactory.getInstance(request.getSourceType());
// Remove id in sourceField when save
List<StreamField> streamFields = request.getFieldList();
Expand Down Expand Up @@ -399,6 +405,15 @@ public Boolean delete(Integer id, String operator) {
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
boolean isTemplateSource = CollectionUtils.isNotEmpty(sourceMapper.selectByTemplateId(id));

// Check if it can be delete
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(entity.getInlongGroupId());
if (groupEntity == null) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
String.format("InlongGroup does not exist with InlongGroupId=%s", entity.getInlongGroupId()));
}
// only the person in charges can query
userService.checkUser(groupEntity.getInCharges(), operator, ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage());

SourceStatus curStatus = SourceStatus.forCode(entity.getStatus());
SourceStatus nextStatus = SourceStatus.TO_BE_ISSUED_DELETE;
// if source is frozen|failed|new, or if it is a template source or auto push source, delete directly
Expand Down Expand Up @@ -435,7 +450,7 @@ public Boolean delete(Integer id, UserInfo opInfo) {
Preconditions.expectNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());

// Check if it can be added
// Check if it can be delete
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(entity.getInlongGroupId());
if (groupEntity == null) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
Expand Down Expand Up @@ -495,6 +510,13 @@ public Boolean restart(Integer id, String operator) {
LOGGER.info("begin to restart source by id={}", id);
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.expectNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(entity.getInlongGroupId());
if (groupEntity == null) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
String.format("InlongGroup does not exist with InlongGroupId=%s", entity.getInlongGroupId()));
}
// only the person in charges can query
userService.checkUser(groupEntity.getInCharges(), operator, ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage());

StreamSourceOperator sourceOperator = operatorFactory.getInstance(entity.getSourceType());
SourceRequest sourceRequest = new SourceRequest();
Expand All @@ -511,6 +533,13 @@ public Boolean stop(Integer id, String operator) {
LOGGER.info("begin to stop source by id={}", id);
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.expectNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(entity.getInlongGroupId());
if (groupEntity == null) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
String.format("InlongGroup does not exist with InlongGroupId=%s", entity.getInlongGroupId()));
}
// only the person in charges can query
userService.checkUser(groupEntity.getInCharges(), operator, ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage());

StreamSourceOperator sourceOperator = operatorFactory.getInstance(entity.getSourceType());
SourceRequest sourceRequest = new SourceRequest();
Expand Down
Expand Up @@ -63,7 +63,7 @@ public Response<Integer> save(@Validated(SaveValidation.class) @RequestBody Sour
@ApiOperation(value = "Get stream source")
@ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = true)
public Response<StreamSource> get(@PathVariable Integer id) {
return Response.success(sourceService.get(id));
return Response.success(sourceService.get(id, LoginUserUtils.getLoginUser()));
}

@RequestMapping(value = "/source/list", method = RequestMethod.POST)
Expand Down

0 comments on commit 231dbd5

Please sign in to comment.