Skip to content

Commit

Permalink
[INLONG-7948][Manager] Add user authentication when operate inlong co…
Browse files Browse the repository at this point in the history
…nsume (#7949)
  • Loading branch information
fuweng11 committed May 1, 2023
1 parent d0eb134 commit 7214e0b
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 24 deletions.
Expand Up @@ -47,12 +47,13 @@ public class InlongConsumeProcessService {
*/
public WorkflowResult startProcess(Integer id, String operator) {
consumeService.updateStatus(id, ConsumeStatus.TO_BE_APPROVAL.getCode(), operator);
return workflowService.start(ProcessName.APPLY_CONSUME_PROCESS, operator, genApplyConsumeProcessForm(id));
return workflowService.start(ProcessName.APPLY_CONSUME_PROCESS, operator,
genApplyConsumeProcessForm(id, operator));
}

private ApplyConsumeProcessForm genApplyConsumeProcessForm(Integer id) {
private ApplyConsumeProcessForm genApplyConsumeProcessForm(Integer id, String operator) {
ApplyConsumeProcessForm form = new ApplyConsumeProcessForm();
form.setConsumeInfo(consumeService.get(id));
form.setConsumeInfo(consumeService.get(id, operator));
return form;
}

Expand Down
Expand Up @@ -53,9 +53,10 @@ public interface InlongConsumeService {
* Get inlong consume info based on ID
*
* @param id inlong consume id
* @param currentUser currentUser
* @return detail of inlong group
*/
InlongConsumeInfo get(Integer id);
InlongConsumeInfo get(Integer id, String currentUser);

/**
* Check whether the consumer group exists or not
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.service.user.UserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -64,6 +65,8 @@ public class InlongConsumeServiceImpl implements InlongConsumeService {
private InlongConsumeEntityMapper consumeMapper;
@Autowired
private InlongConsumeOperatorFactory consumeOperatorFactory;
@Autowired
private UserService userService;

@Override
public Integer save(InlongConsumeRequest request, String operator) {
Expand Down Expand Up @@ -131,13 +134,15 @@ public boolean consumerGroupExists(String consumerGroup, Integer excludeSelfId)
}

@Override
public InlongConsumeInfo get(Integer id) {
public InlongConsumeInfo get(Integer id, String currentUser) {
Preconditions.expectNotNull(id, "inlong consume id cannot be null");
InlongConsumeEntity entity = consumeMapper.selectById(id);
if (entity == null) {
LOGGER.error("inlong consume not found with id={}", id);
throw new BusinessException(ErrorCodeEnum.CONSUME_NOT_FOUND);
}
userService.checkUser(entity.getInCharges(), currentUser,
"Current user does not have permission to get inlong consume");

InlongConsumeOperator consumeOperator = consumeOperatorFactory.getInstance(entity.getMqType());
InlongConsumeInfo consumeInfo = consumeOperator.getFromEntity(entity);
Expand Down Expand Up @@ -194,8 +199,8 @@ public Integer update(InlongConsumeRequest request, String operator) {
Integer consumeId = request.getId();
InlongConsumeEntity existEntity = consumeMapper.selectById(consumeId);
Preconditions.expectNotNull(existEntity, "inlong consume not exist with id " + consumeId);
Preconditions.expectTrue(existEntity.getInCharges().contains(operator),
"operator" + operator + " has no privilege for the inlong consume");
userService.checkUser(existEntity.getInCharges(), operator,
"Current user does not have permission to update inlong consume");

if (!Objects.equals(existEntity.getVersion(), request.getVersion())) {
LOGGER.error(String.format("inlong consume has already updated, id=%s, curVersion=%s",
Expand Down Expand Up @@ -244,6 +249,8 @@ public Boolean delete(Integer id, String operator) {
Preconditions.expectNotNull(id, "inlong consume id cannot be null");
InlongConsumeEntity entity = consumeMapper.selectById(id);
Preconditions.expectNotNull(entity, "inlong consume not exist with id " + id);
userService.checkUser(entity.getInCharges(), operator,
"Current user does not have permission to delete inlong consume");

entity.setIsDeleted(id);
entity.setStatus(ConsumeStatus.DELETED.getCode());
Expand Down
Expand Up @@ -116,9 +116,10 @@ public interface StreamSinkService {
* Paging query stream sink info based on conditions.
*
* @param request paging request
* @param operator operator
* @return sink page list
*/
PageResult<? extends StreamSink> listByCondition(SinkPageRequest request);
PageResult<? extends StreamSink> listByCondition(SinkPageRequest request, String operator);

/**
* Paging query stream sink info based on conditions.
Expand Down
Expand Up @@ -24,6 +24,12 @@
import com.github.pagehelper.PageHelper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserManager;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.create.table.ColDataType;
import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
import net.sf.jsqlparser.statement.create.table.CreateTable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
Expand Down Expand Up @@ -65,12 +71,6 @@
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserManager;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.create.table.ColDataType;
import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
import net.sf.jsqlparser.statement.create.table.CreateTable;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -81,14 +81,14 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_COMMENT_PROP;
import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_NAME_PROP;
import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_TYPE_PROP;
import static org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET;
import static org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NORMAL_CHARACTERS;
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_CSV;
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_COMMENT_PROP;
import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_NAME_PROP;
import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_TYPE_PROP;

/**
* Implementation of sink service interface
Expand Down Expand Up @@ -315,15 +315,28 @@ public Map<String, List<StreamSink>> getSinksMap(InlongGroupInfo groupInfo, List
}

@Override
public PageResult<? extends StreamSink> listByCondition(SinkPageRequest request) {
public PageResult<? extends StreamSink> listByCondition(SinkPageRequest request, String operator) {
Preconditions.expectNotBlank(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY);

UserInfo userInfo = userService.getByName(operator);
boolean isAdmin = UserTypeEnum.ADMIN.getCode().equals(userInfo.getAccountType());
PageHelper.startPage(request.getPageNum(), request.getPageSize());
OrderFieldEnum.checkOrderField(request);
OrderTypeEnum.checkOrderType(request);
List<StreamSinkEntity> entityPage = sinkMapper.selectByCondition(request);
Map<String, Page<StreamSinkEntity>> sinkMap = Maps.newHashMap();
for (StreamSinkEntity streamSink : entityPage) {
InlongGroupEntity groupEntity =
groupMapper.selectByGroupId(streamSink.getInlongGroupId());
if (groupEntity == null) {
continue;
}
// only the person in charges can query
if (!isAdmin) {
List<String> inCharges = Arrays.asList(groupEntity.getInCharges().split(InlongConstants.COMMA));
if (!inCharges.contains(operator)) {
continue;
}
}
sinkMap.computeIfAbsent(streamSink.getSinkType(), k -> new Page<>()).add(streamSink);
}
List<StreamSink> responseList = Lists.newArrayList();
Expand Down
Expand Up @@ -73,7 +73,7 @@ public void testAll() {
Assertions.assertNotNull(consumeId);

// test get operation
InlongConsumeInfo consumeInfo = this.testGet(consumeId);
InlongConsumeInfo consumeInfo = this.testGet(consumeId, GLOBAL_OPERATOR);
Assertions.assertEquals(consumeInfo.getId(), consumeId);

// test list operation
Expand Down Expand Up @@ -113,8 +113,8 @@ private Integer testSave() {
return consumeService.save(request, GLOBAL_OPERATOR);
}

private InlongConsumeInfo testGet(Integer id) {
return consumeService.get(id);
private InlongConsumeInfo testGet(Integer id, String operator) {
return consumeService.get(id, operator);
}

private PageResult<InlongConsumeBriefInfo> testList() {
Expand Down
Expand Up @@ -71,7 +71,7 @@ public Response<Integer> save(@RequestBody InlongConsumeRequest request) {
@ApiOperation(value = "Get inlong consume")
@ApiImplicitParam(name = "id", value = "Inlong consume ID", dataTypeClass = Integer.class, required = true)
public Response<InlongConsumeInfo> get(@PathVariable(name = "id") Integer id) {
return Response.success(consumeService.get(id));
return Response.success(consumeService.get(id, LoginUserUtils.getLoginUser().getName()));
}

@GetMapping(value = "/consume/countStatus")
Expand Down
Expand Up @@ -74,7 +74,7 @@ public Response<StreamSink> get(@PathVariable Integer id) {
@RequestMapping(value = "/sink/list", method = RequestMethod.POST)
@ApiOperation(value = "List stream sinks by paginating")
public Response<PageResult<? extends StreamSink>> listByCondition(@RequestBody SinkPageRequest request) {
return Response.success(sinkService.listByCondition(request));
return Response.success(sinkService.listByCondition(request, LoginUserUtils.getLoginUser().getName()));
}

@RequestMapping(value = "/sink/update", method = RequestMethod.POST)
Expand Down

0 comments on commit 7214e0b

Please sign in to comment.