[Improvement][Monitor] Show master && worker Busy Or Normal Status and Show Commands table list (#15978)

* update

* test

* add monitor enhance ui

* update

* update

* update doc

* fix spotless

* update

* update

* Update dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataAnalysisController.java

Co-authored-by: Wenjun Ruan <wenjun@apache.org>

* Update dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataAnalysisController.java

Co-authored-by: Wenjun Ruan <wenjun@apache.org>

* Update dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapper.java

Co-authored-by: Wenjun Ruan <wenjun@apache.org>

* Update dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapper.xml

Co-authored-by: Wenjun Ruan <wenjun@apache.org>

* Update dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java

Co-authored-by: Wenjun Ruan <wenjun@apache.org>

* Update dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapper.xml

Co-authored-by: Wenjun Ruan <wenjun@apache.org>

* update

* fix spotless

* update

---------

Co-authored-by: Wenjun Ruan <wenjun@apache.org>
qingwli and ruanwenjun committed May 15, 2024
1 parent 4f4a13d commit 0e5cb66
Showing 49 changed files with 1,268 additions and 188 deletions.
21 changes: 13 additions & 8 deletions docs/docs/en/guide/monitor.md
@@ -16,6 +16,12 @@

![worker](../../../img/new_ui/dev/monitor/worker.png)

### Alert Server

- Mainly related to alert server information.

![alert-server](../../../img/new_ui/dev/monitor/alert-server.png)

### Database

- Mainly the health status of the DB.
@@ -26,18 +32,17 @@

### Statistics

![statistics](../../../img/new_ui/dev/monitor/statistics.png)
![Command Statistics List](../../../img/new_ui/dev/monitor/command-list.png)

Shows the command list in the system. Data is from the `t_ds_command` table.

![Failure Command Statistics List](../../../img/new_ui/dev/monitor/failure-command-list.png)

| **Parameter** | **Description** |
|----------------------------------------|----------------------------------------------------|
| Number of commands wait to be executed | Statistics of the `t_ds_command` table data. |
| The number of failed commands | Statistics of the `t_ds_error_command` table data. |
| Number of tasks wait to run | Count the data of `task_queue` in the ZooKeeper. |
| Number of tasks wait to be killed | Count the data of `task_kill` in the ZooKeeper. |
Shows the failure command list in the system. Data is from the `t_ds_error_command` table.

### Audit Log

The audit log records who accesses the system, which operations they perform, and when,
which strengthens the security and maintainability of the system.

![audit-log](../../../img/new_ui/dev/monitor/audit-log.jpg)
![audit-log](../../../img/new_ui/dev/monitor/audit-log.png)
21 changes: 14 additions & 7 deletions docs/docs/zh/guide/monitor.md
@@ -16,6 +16,12 @@

![worker](../../../img/new_ui/dev/monitor/worker.png)

### Alert Server

- Mainly related to alert server information.

![alert-server](../../../img/new_ui/dev/monitor/alert-server.png)

### Database

- Mainly the health status of the DB.
@@ -26,15 +32,16 @@

### Statistics

![statistics](../../../img/new_ui/dev/monitor/statistics.png)
![Command Statistics List](../../../img/new_ui/dev/monitor/command-list.png)

Shows the command list in the system. Data is from the `t_ds_command` table.

![Failure Command Statistics List](../../../img/new_ui/dev/monitor/failure-command-list.png)

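- Number of commands waiting to be executed: counts the data in the t_ds_command table
- Number of failed commands: counts the data in the t_ds_error_command table
- Number of tasks waiting to run: counts the data of task_queue in ZooKeeper
- Number of tasks waiting to be killed: counts the data of task_kill in ZooKeeper
Shows the failure command list in the system. Data is from the `t_ds_error_command` table.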

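### Audit Log

Audit log records provide information about who accessed the system and which operations he or she performed during a given period of time, and it is useful for maintaining security
Audit log records provide information about who accessed the system and which operations he or she performed during a given period of time, which is useful for maintaining security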

![audit-log](../../../img/new_ui/dev/monitor/audit-log.jpg)
![audit-log](../../../img/new_ui/dev/monitor/audit-log.png)
Binary file added docs/img/new_ui/dev/monitor/alert-server.png
Binary file removed docs/img/new_ui/dev/monitor/audit-log.jpg
Binary file not shown.
Binary file added docs/img/new_ui/dev/monitor/audit-log.png
Binary file added docs/img/new_ui/dev/monitor/command-list.png
Binary file modified docs/img/new_ui/dev/monitor/master.png
Binary file removed docs/img/new_ui/dev/monitor/statistics.png
Binary file not shown.
Binary file modified docs/img/new_ui/dev/monitor/worker.png
Original file line number Diff line number Diff line change
@@ -20,17 +20,21 @@
import static org.apache.dolphinscheduler.api.enums.Status.COMMAND_STATE_COUNT_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.COUNT_PROCESS_DEFINITION_USER_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.COUNT_PROCESS_INSTANCE_STATE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.LIST_PAGING_ALERT_GROUP_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUEUE_COUNT_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.TASK_INSTANCE_STATE_COUNT_ERROR;

import org.apache.dolphinscheduler.api.dto.CommandStateCount;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.DataAnalysisService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.api.vo.TaskInstanceCountVO;
import org.apache.dolphinscheduler.api.vo.WorkflowDefinitionCountVO;
import org.apache.dolphinscheduler.api.vo.WorkflowInstanceCountVO;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
import org.apache.dolphinscheduler.dao.entity.User;

import java.util.List;
@@ -148,4 +152,54 @@ public Result<Map<String, Integer>> countQueueState(@Parameter(hidden = true) @R
Map<String, Integer> stringIntegerMap = dataAnalysisService.countQueueState(loginUser);
return Result.success(stringIntegerMap);
}

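/**
* list pending commands
*
* @param loginUser login user
* @param projectCode project code (optional filter)
* @param pageNo page number
* @param pageSize page size
* @return paged list of pending commands
*/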
@Operation(summary = "listPendingCommands", description = "LIST_PENDING_COMMANDS")
@Parameters({
@Parameter(name = "searchVal", description = "SEARCH_VAL", schema = @Schema(implementation = String.class)),
@Parameter(name = "pageNo", description = "PAGE_NO", required = true, schema = @Schema(implementation = int.class, example = "1")),
@Parameter(name = "pageSize", description = "PAGE_SIZE", required = true, schema = @Schema(implementation = int.class, example = "20"))
})
@GetMapping("/listCommand")
@ResponseStatus(HttpStatus.OK)
@ApiException(LIST_PAGING_ALERT_GROUP_ERROR)
public Result<PageInfo<Command>> listPaging(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "projectCode", required = false) Long projectCode,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
checkPageParams(pageNo, pageSize);
PageInfo<Command> commandPageInfo =
dataAnalysisService.listPendingCommands(loginUser, projectCode, pageNo, pageSize);
return Result.success(commandPageInfo);
}

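/**
* list error commands
*
* @param loginUser login user
* @param projectCode project code (optional filter)
* @param pageNo page number
* @param pageSize page size
* @return paged list of error commands
*/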
@Operation(summary = "listErrorCommand", description = "LIST_ERROR_COMMAND_LIST_PAGING_NOTES")
@Parameters({
@Parameter(name = "searchVal", description = "SEARCH_VAL", schema = @Schema(implementation = String.class)),
@Parameter(name = "pageNo", description = "PAGE_NO", required = true, schema = @Schema(implementation = int.class, example = "1")),
@Parameter(name = "pageSize", description = "PAGE_SIZE", required = true, schema = @Schema(implementation = int.class, example = "20"))
})
@GetMapping("/listErrorCommand")
@ResponseStatus(HttpStatus.OK)
@ApiException(LIST_PAGING_ALERT_GROUP_ERROR)
public Result<PageInfo<ErrorCommand>> listErrorCommand(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "projectCode", required = false) Long projectCode,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
checkPageParams(pageNo, pageSize);
PageInfo<ErrorCommand> errorCommandPageInfo =
dataAnalysisService.listErrorCommand(loginUser, projectCode, pageNo, pageSize);
return Result.success(errorCommandPageInfo);
}
}
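
For readers who want to exercise the two new endpoints, here is a minimal sketch of how a client might build the paged request URL. Only the `listCommand` and `listErrorCommand` paths and the `projectCode`, `pageNo`, and `pageSize` parameters come from the controller above. The `CommandListRequest` helper, the base URL, and the positivity check (which only approximates the intent of `checkPageParams`, whose rules are not shown in this diff) are assumptions made for illustration.

```java
import java.net.URI;

/** Hypothetical helper (not part of this commit) that builds query URLs for the new endpoints. */
final class CommandListRequest {

    private CommandListRequest() {
    }

    /**
     * @param baseUrl     assumed API root, including whatever prefix the controller is mapped under
     * @param endpoint    "listCommand" or "listErrorCommand"
     * @param projectCode optional filter; left out of the query string when null
     * @param pageNo      1-based page number
     * @param pageSize    number of rows per page
     */
    static URI build(String baseUrl, String endpoint, Long projectCode, int pageNo, int pageSize) {
        // Assumption: the server rejects non-positive paging values, so fail early on the client.
        if (pageNo < 1 || pageSize < 1) {
            throw new IllegalArgumentException("pageNo and pageSize must be positive");
        }
        StringBuilder url = new StringBuilder(baseUrl)
                .append('/').append(endpoint)
                .append("?pageNo=").append(pageNo)
                .append("&pageSize=").append(pageSize);
        if (projectCode != null) {
            url.append("&projectCode=").append(projectCode);
        }
        return URI.create(url.toString());
    }
}
```

Because `projectCode` is declared with `required = false` in both controller methods, the sketch omits it entirely when no project filter is wanted.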
Original file line number Diff line number Diff line change
@@ -18,23 +18,23 @@
package org.apache.dolphinscheduler.api.controller;

import static org.apache.dolphinscheduler.api.enums.Status.LIST_MASTERS_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.LIST_WORKERS_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DATABASE_STATE_ERROR;

import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.WorkerServerModel;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.plugin.api.monitor.DatabaseMetrics;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;
@@ -56,35 +56,19 @@ public class MonitorController extends BaseController {
private MonitorService monitorService;

/**
* master list
* server list
*
* @param loginUser login user
* @return master list
* @return server list
*/
@Operation(summary = "listMaster", description = "MASTER_LIST_NOTES")
@GetMapping(value = "/masters")
@Operation(summary = "listServer", description = "SERVER_LIST_NOTES")
@GetMapping(value = "/{nodeType}")
@ResponseStatus(HttpStatus.OK)
@ApiException(LIST_MASTERS_ERROR)
public Result<List<Server>> listMaster(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser) {
List<Server> servers = monitorService.queryMaster(loginUser);
public Result<List<Server>> listServer(@PathVariable("nodeType") RegistryNodeType nodeType) {
List<Server> servers = monitorService.listServer(nodeType);
return Result.success(servers);
}

/**
* worker list
*
* @param loginUser login user
* @return worker information list
*/
@Operation(summary = "listWorker", description = "WORKER_LIST_NOTES")
@GetMapping(value = "/workers")
@ResponseStatus(HttpStatus.OK)
@ApiException(LIST_WORKERS_ERROR)
public Result<List<WorkerServerModel>> listWorker(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser) {
List<WorkerServerModel> workerServerModels = monitorService.queryWorker(loginUser);
return Result.success(workerServerModels);
}

/**
* query database state
*
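
The unified `/{nodeType}` mapping binds the path segment to a `RegistryNodeType`. The sketch below shows how a plain `Enum.valueOf` lookup treats such a segment, which is how Spring's default string-to-enum conversion behaves as far as I know, assuming no custom converter is registered. `NodeType` is a stand-in enum: only the `MASTER` constant is confirmed by this diff, and `WORKER`, `ALERT_SERVER`, and the `NodeTypeBinding` helper are hypothetical.

```java
import java.util.Optional;

/** Stand-in for RegistryNodeType; only MASTER appears in this diff, the rest are assumed. */
enum NodeType {
    MASTER, WORKER, ALERT_SERVER
}

/** Hypothetical helper illustrating name-based enum binding of a path segment. */
final class NodeTypeBinding {

    private NodeTypeBinding() {
    }

    /** Resolves a path segment by exact constant name; returns empty when no constant matches. */
    static Optional<NodeType> fromPathSegment(String segment) {
        try {
            return Optional.of(NodeType.valueOf(segment.trim()));
        } catch (IllegalArgumentException e) {
            // Enum.valueOf is case-sensitive and does not accept plural or lowercase forms.
            return Optional.empty();
        }
    }
}
```

Under that assumption, a segment such as `masters` (the path of the removed endpoint) would not match any constant by name, so callers of the old `/masters` and `/workers` routes would need to switch to the constant names.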
Original file line number Diff line number Diff line change
@@ -21,9 +21,12 @@
import org.apache.dolphinscheduler.api.dto.DefineUserDto;
import org.apache.dolphinscheduler.api.dto.TaskCountDto;
import org.apache.dolphinscheduler.api.dto.project.StatisticsStateRequest;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.vo.TaskInstanceCountVO;
import org.apache.dolphinscheduler.api.vo.WorkflowDefinitionCountVO;
import org.apache.dolphinscheduler.api.vo.WorkflowInstanceCountVO;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
import org.apache.dolphinscheduler.dao.entity.User;

import java.util.List;
@@ -117,4 +120,7 @@ TaskCountDto countWorkflowStates(User loginUser,
*/
TaskCountDto countOneTaskStates(User loginUser, Long taskCode);

PageInfo<Command> listPendingCommands(User loginUser, Long projectCode, Integer pageNo, Integer pageSize);

PageInfo<ErrorCommand> listErrorCommand(User loginUser, Long projectCode, Integer pageNo, Integer pageSize);
}
Original file line number Diff line number Diff line change
@@ -18,9 +18,9 @@
package org.apache.dolphinscheduler.api.service;

import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.WorkerServerModel;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.plugin.api.monitor.DatabaseMetrics;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;

import java.util.List;

@@ -38,20 +38,10 @@ public interface MonitorService {
List<DatabaseMetrics> queryDatabaseState(User loginUser);

/**
* query master list
* query server list
*
* @param loginUser login user
* @return master information list
* @param nodeType RegistryNodeType
* @return server information list
*/
List<Server> queryMaster(User loginUser);

/**
* query worker list
*
* @param loginUser login user
* @return worker information list
*/
List<WorkerServerModel> queryWorker(User loginUser);

List<Server> getServerListFromRegistry(boolean isMaster);
List<Server> listServer(RegistryNodeType nodeType);
}
Original file line number Diff line number Diff line change
@@ -27,14 +27,18 @@
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.DataAnalysisService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.vo.TaskInstanceCountVO;
import org.apache.dolphinscheduler.api.vo.WorkflowDefinitionCountVO;
import org.apache.dolphinscheduler.api.vo.WorkflowInstanceCountVO;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.CommandCount;
import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
@@ -71,6 +75,8 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;

/**
@@ -380,6 +386,66 @@ public TaskCountDto countOneTaskStates(User loginUser, Long taskCode) {
return new TaskCountDto(executeStatusCounts);
}

@Override
public PageInfo<Command> listPendingCommands(User loginUser, Long projectCode, Integer pageNo, Integer pageSize) {
Page<Command> page = new Page<>(pageNo, pageSize);
if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
IPage<Command> commandIPage = commandMapper.queryCommandPage(page);
return PageInfo.of(commandIPage);
}

List<Long> workflowDefinitionCodes = getAuthDefinitionCodes(loginUser, projectCode);

if (workflowDefinitionCodes.isEmpty()) {
return PageInfo.of(pageNo, pageSize);
}

IPage<Command> commandIPage =
commandMapper.queryCommandPageByIds(page, new ArrayList<>(workflowDefinitionCodes));
return PageInfo.of(commandIPage);
}

@Override
public PageInfo<ErrorCommand> listErrorCommand(User loginUser, Long projectCode, Integer pageNo, Integer pageSize) {
Page<ErrorCommand> page = new Page<>(pageNo, pageSize);
if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
IPage<ErrorCommand> commandIPage = errorCommandMapper.queryErrorCommandPage(page);
return PageInfo.of(commandIPage);
}

List<Long> workflowDefinitionCodes = getAuthDefinitionCodes(loginUser, projectCode);

if (workflowDefinitionCodes.isEmpty()) {
return PageInfo.of(pageNo, pageSize);
}

IPage<ErrorCommand> commandIPage =
errorCommandMapper.queryErrorCommandPageByIds(page, new ArrayList<>(workflowDefinitionCodes));
return PageInfo.of(commandIPage);
}

private List<Long> getAuthDefinitionCodes(User loginUser, Long projectCode) {
Set<Integer> projectIds = resourcePermissionCheckService
.userOwnedResourceIdsAcquisition(AuthorizationType.PROJECTS, loginUser.getId(), log);
if (CollectionUtils.isEmpty(projectIds)) {
return Collections.emptyList();
}
List<Long> projectCodes = projectMapper.selectBatchIds(projectIds)
.stream()
.map(Project::getCode)
.collect(Collectors.toList());

if (projectCode != null) {
if (!projectCodes.contains(projectCode)) {
return Collections.emptyList();
}

projectCodes = Collections.singletonList(projectCode);
}

return processDefinitionMapper.queryDefinitionCodeListByProjectCodes(projectCodes);
}

/**
* statistics the process definition quantities of a certain person
* <p>
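
The project filtering done in `getAuthDefinitionCodes` can be read in isolation as a small pure function. The following is a sketch under stated assumptions rather than the committed code: `ProjectScope.resolve` is a hypothetical name, and the mapper and permission-service calls are replaced by a plain list of project codes the user is already known to be authorized for.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical, dependency-free restatement of the project-scope decision in the diff above. */
final class ProjectScope {

    private ProjectScope() {
    }

    /**
     * @param authorizedProjectCodes project codes the user may access (assumed already loaded)
     * @param requestedProjectCode   optional filter from the request, may be null
     * @return project codes to query; empty when the user has no access to the requested project
     */
    static List<Long> resolve(List<Long> authorizedProjectCodes, Long requestedProjectCode) {
        if (authorizedProjectCodes == null || authorizedProjectCodes.isEmpty()) {
            return Collections.emptyList();
        }
        if (requestedProjectCode == null) {
            // No filter requested: query across every authorized project.
            return authorizedProjectCodes;
        }
        // A filter narrows the scope to one project, but only if the user is authorized for it.
        return authorizedProjectCodes.contains(requestedProjectCode)
                ? Collections.singletonList(requestedProjectCode)
                : Collections.emptyList();
    }
}
```

In the service methods above, this scope only applies to non-admin users: the `ADMIN_USER` branch returns the unfiltered page before `getAuthDefinitionCodes` is called, so `projectCode` has no effect for administrators.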
Original file line number Diff line number Diff line change
@@ -90,6 +90,7 @@
import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerResponse;
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
@@ -289,7 +290,7 @@ public Map<String, Object> execProcessInstance(User loginUser, long projectCode,

private void checkMasterExists() {
// check master server exists
List<Server> masterServers = monitorService.getServerListFromRegistry(true);
List<Server> masterServers = monitorService.listServer(RegistryNodeType.MASTER);

// no master
if (masterServers.isEmpty()) {
@@ -1142,7 +1143,7 @@ public void execStreamTaskInstance(User loginUser, long projectCode, long taskDe
checkValidTenant(tenantCode);
checkMasterExists();
// todo dispatch improvement
List<Server> masterServerList = monitorService.getServerListFromRegistry(true);
List<Server> masterServerList = monitorService.listServer(RegistryNodeType.MASTER);
Server server = masterServerList.get(0);

StreamingTaskTriggerRequest taskExecuteStartMessage = new StreamingTaskTriggerRequest();