Skip to content

Commit

Permalink
[INLONG-8447][Manager] Optimize paging logic (#8448)
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Sep 13, 2023
1 parent 4b5edd5 commit ddbdb22
Show file tree
Hide file tree
Showing 21 changed files with 117 additions and 155 deletions.
4 changes: 4 additions & 0 deletions inlong-manager/manager-pojo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper-spring-boot-starter</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@
package org.apache.inlong.manager.pojo.common;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.github.pagehelper.Page;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
Expand Down Expand Up @@ -68,6 +72,20 @@ public PageResult(List<T> list) {
this.total = (long) list.size();
}

public <R> PageResult<R> map(Function<? super T, ? extends R> mapper) {
List<R> newList = list.stream().map(mapper).collect(Collectors.toList());
return new PageResult<>(newList, total, pageNum, pageSize);
}

public PageResult<T> foreach(Consumer<? super T> action) {
list.forEach(action);
return this;
}

public static <T> PageResult<T> fromPage(Page<T> page) {
return new PageResult<>(page.getResult(), page.getTotal(), page.getPageNum(), page.getPageSize());
}

public static <T> PageResult<T> empty() {
return new PageResult<>(0L);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,9 @@ public PageResult<ClusterTagResponse> listTag(ClusterTagPageRequest request) {
PageHelper.startPage(request.getPageNum(), request.getPageSize());
Page<InlongClusterTagEntity> entityPage = (Page<InlongClusterTagEntity>) clusterTagMapper
.selectByCondition(request);
PageResult<ClusterTagResponse> pageResult = PageResult.fromPage(entityPage)
.map(entity -> CommonBeanUtils.copyProperties(entity, ClusterTagResponse::new));

List<ClusterTagResponse> tagList = CommonBeanUtils.copyListProperties(entityPage, ClusterTagResponse::new);

PageResult<ClusterTagResponse> pageResult = new PageResult<>(tagList,
entityPage.getTotal(), entityPage.getPageNum(), entityPage.getPageSize());
LOGGER.debug("success to list cluster tag by {}", request);
return pageResult;
}
Expand Down Expand Up @@ -567,16 +565,11 @@ public ClusterInfo get(Integer id, UserInfo opInfo) {
public PageResult<ClusterInfo> list(ClusterPageRequest request) {
PageHelper.startPage(request.getPageNum(), request.getPageSize());
Page<InlongClusterEntity> entityPage = (Page<InlongClusterEntity>) clusterMapper.selectByCondition(request);
List<ClusterInfo> list = entityPage.stream()
PageResult<ClusterInfo> pageResult = PageResult.fromPage(entityPage)
.map(entity -> {
InlongClusterOperator instance = clusterOperatorFactory.getInstance(entity.getType());
return instance.getFromEntity(entity);
})
.collect(Collectors.toList());

PageResult<ClusterInfo> pageResult = new PageResult<>(
list, entityPage.getTotal(),
entityPage.getPageNum(), entityPage.getPageSize());
});

LOGGER.debug("success to list inlong cluster by {}", request);
return pageResult;
Expand Down Expand Up @@ -926,10 +919,8 @@ public PageResult<ClusterNodeResponse> listNode(ClusterPageRequest request, Stri
PageHelper.startPage(request.getPageNum(), request.getPageSize());
Page<InlongClusterNodeEntity> entityPage =
(Page<InlongClusterNodeEntity>) clusterNodeMapper.selectByCondition(request);
List<ClusterNodeResponse> nodeList = CommonBeanUtils.copyListProperties(entityPage, ClusterNodeResponse::new);

PageResult<ClusterNodeResponse> pageResult = new PageResult<>(nodeList, entityPage.getTotal(),
entityPage.getPageNum(), entityPage.getPageSize());
PageResult<ClusterNodeResponse> pageResult = PageResult.fromPage(entityPage)
.map(entity -> CommonBeanUtils.copyProperties(entity, ClusterNodeResponse::new));

LOGGER.debug("success to list inlong cluster node by {}", request);
return pageResult;
Expand Down Expand Up @@ -1501,9 +1492,10 @@ public PageResult<TenantClusterTagInfo> listTenantTag(TenantClusterTagPageReques
PageHelper.startPage(request.getPageNum(), request.getPageSize());
Page<TenantClusterTagEntity> entityPage =
(Page<TenantClusterTagEntity>) tenantClusterTagMapper.selectByCondition(request);
List<TenantClusterTagInfo> infoList = CommonBeanUtils.copyListProperties(entityPage, TenantClusterTagInfo::new);
PageResult<TenantClusterTagInfo> pageResult = PageResult.fromPage(entityPage)
.map(entity -> CommonBeanUtils.copyProperties(entity, TenantClusterTagInfo::new));
LOGGER.debug("success to list tenant tag with request={}", request);
return new PageResult<>(infoList, entityPage.getTotal(), entityPage.getPageNum(), entityPage.getPageSize());
return pageResult;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,7 @@ public PageResult<InlongConsumeBriefInfo> list(InlongConsumePageRequest request)
OrderFieldEnum.checkOrderField(request);
OrderTypeEnum.checkOrderType(request);
Page<InlongConsumeBriefInfo> briefInfos = (Page<InlongConsumeBriefInfo>) consumeMapper.selectBriefList(request);
PageResult<InlongConsumeBriefInfo> pageResult = new PageResult<>(briefInfos,
briefInfos.getTotal(), briefInfos.getPageNum(), briefInfos.getPageSize());
PageResult<InlongConsumeBriefInfo> pageResult = PageResult.fromPage(briefInfos);

LOGGER.debug("success to list inlong consume for {}", request);
return pageResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,8 @@ public PageResult<ApproverResponse> listByCondition(ApproverPageRequest request)
PageHelper.startPage(request.getPageNum(), request.getPageSize());

Page<WorkflowApproverEntity> page = (Page<WorkflowApproverEntity>) approverMapper.selectByCondition(request);
List<ApproverResponse> resultList = CommonBeanUtils.copyListProperties(page,
ApproverResponse::new);

return new PageResult<>(resultList, page.getTotal(), page.getPageNum(), page.getPageSize());
return PageResult.fromPage(page)
.map(entity -> CommonBeanUtils.copyProperties(entity, ApproverResponse::new));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

/**
* Workflow event related services
*/
Expand All @@ -56,10 +54,8 @@ public EventLogResponse get(Integer id) {
public PageResult<EventLogResponse> list(EventLogRequest query) {
PageHelper.startPage(query.getPageNum(), query.getPageSize());
Page<WorkflowEventLogEntity> page = (Page<WorkflowEventLogEntity>) queryService.listEventLog(query);

List<EventLogResponse> viewList = CommonBeanUtils.copyListProperties(page, EventLogResponse::new);

return new PageResult<>(viewList, page.getTotal(), page.getPageNum(), page.getPageSize());
return PageResult.fromPage(page)
.map(entity -> CommonBeanUtils.copyProperties(entity, EventLogResponse::new));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,10 @@ public PageResult<InlongGroupBriefInfo> listBrief(InlongGroupPageRequest request
OrderFieldEnum.checkOrderField(request);
OrderTypeEnum.checkOrderType(request);
Page<InlongGroupEntity> entityPage = (Page<InlongGroupEntity>) groupMapper.selectByCondition(request);
PageResult<InlongGroupBriefInfo> pageResult = PageResult.fromPage(entityPage)
.map(entity -> CommonBeanUtils.copyProperties(entity, InlongGroupBriefInfo::new));

List<InlongGroupBriefInfo> briefInfos = CommonBeanUtils.copyListProperties(entityPage,
InlongGroupBriefInfo::new);
List<InlongGroupBriefInfo> briefInfos = pageResult.getList();

// list all related sources
if (request.isListSources() && CollectionUtils.isNotEmpty(briefInfos)) {
Expand All @@ -337,9 +338,6 @@ public PageResult<InlongGroupBriefInfo> listBrief(InlongGroupPageRequest request
});
}

PageResult<InlongGroupBriefInfo> pageResult = new PageResult<>(briefInfos,
entityPage.getTotal(), entityPage.getPageNum(), entityPage.getPageSize());

LOGGER.debug("success to list inlong group for {}", request);
return pageResult;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,31 +278,24 @@ private PageResult<ComponentHeartbeatResponse> listComponentHeartbeatOpt(Heartbe
PageHelper.startPage(request.getPageNum(), request.getPageSize());
Page<ComponentHeartbeatEntity> entityPage =
(Page<ComponentHeartbeatEntity>) componentHeartbeatMapper.selectByCondition(request);
List<ComponentHeartbeatResponse> responseList = CommonBeanUtils.copyListProperties(entityPage,
ComponentHeartbeatResponse::new);

return new PageResult<>(responseList, entityPage.getTotal());
return PageResult.fromPage(entityPage)
.map(entity -> CommonBeanUtils.copyProperties(entity, ComponentHeartbeatResponse::new));
}

private PageResult<GroupHeartbeatResponse> listGroupHeartbeatOpt(HeartbeatPageRequest request) {
PageHelper.startPage(request.getPageNum(), request.getPageSize());
Page<GroupHeartbeatEntity> entityPage = (Page<GroupHeartbeatEntity>) groupHeartbeatMapper.selectByCondition(
request);
List<GroupHeartbeatResponse> responseList = CommonBeanUtils.copyListProperties(entityPage,
GroupHeartbeatResponse::new);

return new PageResult<>(responseList,
entityPage.getTotal(), entityPage.getPageNum(), entityPage.getPageSize());
return PageResult.fromPage(entityPage)
.map(entity -> CommonBeanUtils.copyProperties(entity, GroupHeartbeatResponse::new));
}

private PageResult<StreamHeartbeatResponse> listStreamHeartbeatOpt(HeartbeatPageRequest request) {
PageHelper.startPage(request.getPageNum(), request.getPageSize());
Page<StreamHeartbeatEntity> entityPage =
(Page<StreamHeartbeatEntity>) streamHeartbeatMapper.selectByCondition(request);
List<StreamHeartbeatResponse> responseList = CommonBeanUtils.copyListProperties(entityPage,
StreamHeartbeatResponse::new);

return new PageResult<>(responseList, entityPage.getTotal(), entityPage.getPageNum(), entityPage.getPageSize());
return PageResult.fromPage(entityPage)
.map(entity -> CommonBeanUtils.copyProperties(entity, StreamHeartbeatResponse::new));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,11 @@ public DataNodeInfo get(String name, String type) {
public PageResult<DataNodeInfo> list(DataNodePageRequest request) {
PageHelper.startPage(request.getPageNum(), request.getPageSize());
Page<DataNodeEntity> entityPage = (Page<DataNodeEntity>) dataNodeMapper.selectByCondition(request);
List<DataNodeInfo> list = entityPage.stream()
PageResult<DataNodeInfo> pageResult = PageResult.fromPage(entityPage)
.map(entity -> {
DataNodeOperator dataNodeOperator = operatorFactory.getInstance(entity.getType());
return dataNodeOperator.getFromEntity(entity);
}).collect(Collectors.toList());

PageResult<DataNodeInfo> pageResult = new PageResult<>(list, entityPage.getTotal(),
entityPage.getPageNum(), entityPage.getPageSize());

});
LOGGER.debug("success to list data node by {}", request);
return pageResult;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* Default operation of stream sink.
Expand Down Expand Up @@ -110,13 +109,7 @@ public PageResult<? extends StreamSink> getPageInfo(Page<StreamSinkEntity> entit
if (CollectionUtils.isEmpty(entityPage)) {
return PageResult.empty();
}

List<StreamSink> streamSinks = entityPage.getResult()
.stream()
.map(this::getFromEntity)
.collect(Collectors.toList());

return new PageResult<>(streamSinks, entityPage.getTotal(), entityPage.getPageNum(), entityPage.getPageSize());
return PageResult.fromPage(entityPage).map(this::getFromEntity);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* Default operator of stream source.
Expand Down Expand Up @@ -112,11 +111,7 @@ public PageResult<? extends StreamSource> getPageInfo(Page<StreamSourceEntity> e
if (CollectionUtils.isEmpty(entityPage)) {
return PageResult.empty();
}

List<StreamSource> streamSources = entityPage.stream()
.map(this::getFromEntity)
.collect(Collectors.toList());
return new PageResult<>(streamSources, entityPage.getTotal(), entityPage.getPageNum(), entityPage.size());
return PageResult.fromPage(entityPage).map(this::getFromEntity);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,11 +365,8 @@ public PageResult<InlongStreamBriefInfo> listBrief(InlongStreamPageRequest reque
OrderFieldEnum.checkOrderField(request);
OrderTypeEnum.checkOrderType(request);
Page<InlongStreamEntity> entityPage = (Page<InlongStreamEntity>) streamMapper.selectByCondition(request);
List<InlongStreamBriefInfo> streamList = CommonBeanUtils.copyListProperties(entityPage,
InlongStreamBriefInfo::new);

PageResult<InlongStreamBriefInfo> pageResult = new PageResult<>(streamList,
entityPage.getTotal(), entityPage.getPageNum(), entityPage.getPageSize());
PageResult<InlongStreamBriefInfo> pageResult = PageResult.fromPage(entityPage)
.map(entity -> CommonBeanUtils.copyProperties(entity, InlongStreamBriefInfo::new));

LOGGER.debug("success to list inlong stream info for groupId={}", request.getInlongGroupId());
return pageResult;
Expand Down Expand Up @@ -399,31 +396,28 @@ public PageResult<InlongStreamInfo> listAll(InlongStreamPageRequest request) {
OrderFieldEnum.checkOrderField(request);
OrderTypeEnum.checkOrderType(request);
Page<InlongStreamEntity> page = (Page<InlongStreamEntity>) streamMapper.selectByCondition(request);
List<InlongStreamInfo> streamInfoList = CommonBeanUtils.copyListProperties(page, InlongStreamInfo::new);

// Convert and encapsulate the paged results
for (InlongStreamInfo streamInfo : streamInfoList) {
// Set the field information of the inlong stream
String streamId = streamInfo.getInlongStreamId();
unpackExtParams(streamInfo);
List<StreamField> streamFields = getStreamFields(groupId, streamId);
streamInfo.setFieldList(streamFields);
List<InlongStreamExtEntity> extEntities = streamExtMapper.selectByRelatedId(groupId, streamId);
List<InlongStreamExtInfo> streamExtInfos = CommonBeanUtils.copyListProperties(
extEntities, InlongStreamExtInfo::new);
streamInfo.setExtList(streamExtInfos);

// query all valid stream sources
List<StreamSource> sourceList = sourceService.listSource(groupId, streamId);
streamInfo.setSourceList(sourceList);

// query all valid stream sinks and its extended info, field info
List<StreamSink> sinkList = sinkService.listSink(groupId, streamId);
streamInfo.setSinkList(sinkList);
}

PageResult<InlongStreamInfo> pageResult = new PageResult<>(streamInfoList, page.getTotal(),
page.getPageNum(), page.getPageSize());
PageResult<InlongStreamInfo> pageResult = PageResult.fromPage(page)
.map(entity -> CommonBeanUtils.copyProperties(entity, InlongStreamInfo::new))
.foreach(streamInfo -> {
// Set the field information of the inlong stream
String streamId = streamInfo.getInlongStreamId();
unpackExtParams(streamInfo);
List<StreamField> streamFields = getStreamFields(groupId, streamId);
streamInfo.setFieldList(streamFields);
List<InlongStreamExtEntity> extEntities = streamExtMapper.selectByRelatedId(groupId, streamId);
List<InlongStreamExtInfo> streamExtInfos = CommonBeanUtils.copyListProperties(
extEntities, InlongStreamExtInfo::new);
streamInfo.setExtList(streamExtInfos);

// query all valid stream sources
List<StreamSource> sourceList = sourceService.listSource(groupId, streamId);
streamInfo.setSourceList(sourceList);

// query all valid stream sinks and its extended info, field info
List<StreamSink> sinkList = sinkService.listSink(groupId, streamId);
streamInfo.setSinkList(sinkList);
});

LOGGER.debug("success to list full inlong stream info by {}", request);
return pageResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,9 @@ public PageResult<InlongTenantInfo> listByCondition(InlongTenantPageRequest requ
}

PageHelper.startPage(request.getPageNum(), request.getPageSize());

Page<InlongTenantEntity> entityPage = inlongTenantEntityMapper.selectByCondition(request);

List<InlongTenantInfo> tenantList = CommonBeanUtils.copyListProperties(entityPage, InlongTenantInfo::new);
PageResult<InlongTenantInfo> pageResult = new PageResult<>(tenantList,
entityPage.getTotal(), entityPage.getPageNum(), entityPage.getPageSize());

return pageResult;
return PageResult.fromPage(entityPage)
.map(entity -> CommonBeanUtils.copyProperties(entity, InlongTenantInfo::new));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ public PageResult<TransformResponse> listByCondition(TransformPageRequest reques
String streamId = request.getInlongStreamId();
LOGGER.debug("begin to fetch transform info by groupId={} and streamId={} ", groupId, streamId);
PageHelper.startPage(request.getPageNum(), request.getPageSize());
PageResult<TransformResponse> pageResponse = new PageResult<>();
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
if (groupEntity == null) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
Expand All @@ -151,13 +150,11 @@ public PageResult<TransformResponse> listByCondition(TransformPageRequest reques

// query result
List<StreamTransformEntity> entityList = transformMapper.selectByCondition(request);
if (CollectionUtils.isEmpty(entityList)) {
pageResponse.setList(Collections.emptyList());
return pageResponse;
List<TransformResponse> responses = Collections.emptyList();
if (!CollectionUtils.isEmpty(entityList)) {
responses = getTransformResponse(entityList);
}

pageResponse.setList(getTransformResponse(entityList));
return pageResponse;
return new PageResult<>(responses, (long) responses.size(), request.getPageNum(), request.getPageSize());
}

@Override
Expand Down

0 comments on commit ddbdb22

Please sign in to comment.