diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java index ddd942dc4ed..b1dd5595c39 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java @@ -47,12 +47,13 @@ public class InlongConsumeProcessService { */ public WorkflowResult startProcess(Integer id, String operator) { consumeService.updateStatus(id, ConsumeStatus.TO_BE_APPROVAL.getCode(), operator); - return workflowService.start(ProcessName.APPLY_CONSUME_PROCESS, operator, genApplyConsumeProcessForm(id)); + return workflowService.start(ProcessName.APPLY_CONSUME_PROCESS, operator, + genApplyConsumeProcessForm(id, operator)); } - private ApplyConsumeProcessForm genApplyConsumeProcessForm(Integer id) { + private ApplyConsumeProcessForm genApplyConsumeProcessForm(Integer id, String operator) { ApplyConsumeProcessForm form = new ApplyConsumeProcessForm(); - form.setConsumeInfo(consumeService.get(id)); + form.setConsumeInfo(consumeService.get(id, operator)); return form; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java index cb295cb22c0..72764878fd8 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java @@ -53,9 +53,10 @@ public interface InlongConsumeService { * Get inlong consume info based on ID * * @param id inlong consume id + * @param currentUser currentUser * @return detail of inlong group */ - InlongConsumeInfo get(Integer id); + InlongConsumeInfo get(Integer id, String currentUser); /** * Check whether the consumer group exists or not diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java index f22cb174405..bb0b7608b61 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java @@ -37,6 +37,7 @@ import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest; import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; +import org.apache.inlong.manager.service.user.UserService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -64,6 +65,8 @@ public class InlongConsumeServiceImpl implements InlongConsumeService { private InlongConsumeEntityMapper consumeMapper; @Autowired private InlongConsumeOperatorFactory consumeOperatorFactory; + @Autowired + private UserService userService; @Override public Integer save(InlongConsumeRequest request, String operator) { @@ -131,13 +134,15 @@ public boolean consumerGroupExists(String consumerGroup, Integer excludeSelfId) } @Override - public InlongConsumeInfo get(Integer id) { + public InlongConsumeInfo get(Integer id, String currentUser) { Preconditions.expectNotNull(id, "inlong consume id cannot be null"); InlongConsumeEntity entity = consumeMapper.selectById(id); if (entity == null) { LOGGER.error("inlong consume not found with id={}", id); throw new BusinessException(ErrorCodeEnum.CONSUME_NOT_FOUND); } + userService.checkUser(entity.getInCharges(), currentUser, + "Current user does not have permission to get inlong consume"); InlongConsumeOperator consumeOperator = consumeOperatorFactory.getInstance(entity.getMqType()); InlongConsumeInfo consumeInfo = consumeOperator.getFromEntity(entity); @@ -194,8 +199,8 @@ public Integer update(InlongConsumeRequest request, String operator) { Integer consumeId = request.getId(); InlongConsumeEntity existEntity = consumeMapper.selectById(consumeId); Preconditions.expectNotNull(existEntity, "inlong consume not exist with id " + consumeId); - Preconditions.expectTrue(existEntity.getInCharges().contains(operator), - "operator" + operator + " has no privilege for the inlong consume"); + userService.checkUser(existEntity.getInCharges(), operator, + "Current user does not have permission to update inlong consume"); if (!Objects.equals(existEntity.getVersion(), request.getVersion())) { LOGGER.error(String.format("inlong consume has already updated, id=%s, curVersion=%s", @@ -244,6 +249,8 @@ public Boolean delete(Integer id, String operator) { Preconditions.expectNotNull(id, "inlong consume id cannot be null"); InlongConsumeEntity entity = consumeMapper.selectById(id); Preconditions.expectNotNull(entity, "inlong consume not exist with id " + id); + userService.checkUser(entity.getInCharges(), operator, + "Current user does not have permission to delete inlong consume"); entity.setIsDeleted(id); entity.setStatus(ConsumeStatus.DELETED.getCode()); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java index bfaebf883be..17cd18f2faa 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java @@ -116,9 +116,10 @@ public interface StreamSinkService { * Paging query stream sink info based on conditions. * * @param request paging request + * @param operator operator * @return sink page list */ - PageResult listByCondition(SinkPageRequest request); + PageResult listByCondition(SinkPageRequest request, String operator); /** * Paging query stream sink info based on conditions. diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java index f369d22140f..7a4180dffd7 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java @@ -24,6 +24,12 @@ import com.github.pagehelper.PageHelper; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import net.sf.jsqlparser.JSQLParserException; +import net.sf.jsqlparser.parser.CCJSqlParserManager; +import net.sf.jsqlparser.statement.Statement; +import net.sf.jsqlparser.statement.create.table.ColDataType; +import net.sf.jsqlparser.statement.create.table.ColumnDefinition; +import net.sf.jsqlparser.statement.create.table.CreateTable; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.inlong.manager.common.consts.InlongConstants; @@ -65,12 +71,6 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import net.sf.jsqlparser.JSQLParserException; -import net.sf.jsqlparser.parser.CCJSqlParserManager; -import net.sf.jsqlparser.statement.Statement; -import net.sf.jsqlparser.statement.create.table.ColDataType; -import net.sf.jsqlparser.statement.create.table.ColumnDefinition; -import net.sf.jsqlparser.statement.create.table.CreateTable; import java.io.StringReader; import java.util.ArrayList; import java.util.Arrays; @@ -81,14 +81,14 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; +import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_COMMENT_PROP; +import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_NAME_PROP; +import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_TYPE_PROP; import static org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET; import static org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NORMAL_CHARACTERS; import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_CSV; import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON; import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL; -import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_COMMENT_PROP; -import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_NAME_PROP; -import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_TYPE_PROP; /** * Implementation of sink service interface @@ -315,15 +315,28 @@ public Map> getSinksMap(InlongGroupInfo groupInfo, List } @Override - public PageResult listByCondition(SinkPageRequest request) { + public PageResult listByCondition(SinkPageRequest request, String operator) { Preconditions.expectNotBlank(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY); - + UserInfo userInfo = userService.getByName(operator); + boolean isAdmin = UserTypeEnum.ADMIN.getCode().equals(userInfo.getAccountType()); PageHelper.startPage(request.getPageNum(), request.getPageSize()); OrderFieldEnum.checkOrderField(request); OrderTypeEnum.checkOrderType(request); List entityPage = sinkMapper.selectByCondition(request); Map> sinkMap = Maps.newHashMap(); for (StreamSinkEntity streamSink : entityPage) { + InlongGroupEntity groupEntity = + groupMapper.selectByGroupId(streamSink.getInlongGroupId()); + if (groupEntity == null) { + continue; + } + // only the person in charges can query + if (!isAdmin) { + List inCharges = Arrays.asList(groupEntity.getInCharges().split(InlongConstants.COMMA)); + if (!inCharges.contains(operator)) { + continue; + } + } sinkMap.computeIfAbsent(streamSink.getSinkType(), k -> new Page<>()).add(streamSink); } List responseList = Lists.newArrayList(); diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java index 3742aaaac0a..f6f847ca87d 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java @@ -73,7 +73,7 @@ public void testAll() { Assertions.assertNotNull(consumeId); // test get operation - InlongConsumeInfo consumeInfo = this.testGet(consumeId); + InlongConsumeInfo consumeInfo = this.testGet(consumeId, GLOBAL_OPERATOR); Assertions.assertEquals(consumeInfo.getId(), consumeId); // test list operation @@ -113,8 +113,8 @@ private Integer testSave() { return consumeService.save(request, GLOBAL_OPERATOR); } - private InlongConsumeInfo testGet(Integer id) { - return consumeService.get(id); + private InlongConsumeInfo testGet(Integer id, String operator) { + return consumeService.get(id, operator); } private PageResult testList() { diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java index 0dafa47657b..98e1686b95e 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java @@ -71,7 +71,7 @@ public Response save(@RequestBody InlongConsumeRequest request) { @ApiOperation(value = "Get inlong consume") @ApiImplicitParam(name = "id", value = "Inlong consume ID", dataTypeClass = Integer.class, required = true) public Response get(@PathVariable(name = "id") Integer id) { - return Response.success(consumeService.get(id)); + return Response.success(consumeService.get(id, LoginUserUtils.getLoginUser().getName())); } @GetMapping(value = "/consume/countStatus") diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java index 6faf550104d..b761a9221f3 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java @@ -74,7 +74,7 @@ public Response get(@PathVariable Integer id) { @RequestMapping(value = "/sink/list", method = RequestMethod.POST) @ApiOperation(value = "List stream sinks by paginating") public Response> listByCondition(@RequestBody SinkPageRequest request) { - return Response.success(sinkService.listByCondition(request)); + return Response.success(sinkService.listByCondition(request, LoginUserUtils.getLoginUser().getName())); } @RequestMapping(value = "/sink/update", method = RequestMethod.POST)