diff --git a/core/src/main/java/com/flowci/core/agent/controller/AgentController.java b/core/src/main/java/com/flowci/core/agent/controller/AgentController.java index 278777c59..b0cb01b1c 100644 --- a/core/src/main/java/com/flowci/core/agent/controller/AgentController.java +++ b/core/src/main/java/com/flowci/core/agent/controller/AgentController.java @@ -18,7 +18,7 @@ import com.flowci.core.agent.domain.Agent; import com.flowci.core.agent.domain.AgentAction; -import com.flowci.core.agent.domain.CreateOrUpdateAgent; +import com.flowci.core.agent.domain.AgentOption; import com.flowci.core.agent.domain.DeleteAgent; import com.flowci.core.agent.service.AgentService; import com.flowci.core.auth.annotation.Action; @@ -28,7 +28,6 @@ import org.springframework.web.bind.annotation.*; import java.util.List; -import java.util.Optional; /** * @author yang @@ -55,12 +54,12 @@ public List list() { @PostMapping() @Action(AgentAction.CREATE_UPDATE) - public Agent createOrUpdate(@Validated @RequestBody CreateOrUpdateAgent body) { + public Agent createOrUpdate(@Validated @RequestBody AgentOption body) { if (body.hasToken()) { - return agentService.update(body.getToken(), body.getName(), body.getTags()); + return agentService.update(body); } - return agentService.create(body.getName(), body.getTags(), Optional.empty()); + return agentService.create(body); } @DeleteMapping() diff --git a/core/src/main/java/com/flowci/core/agent/controller/AgentHostController.java b/core/src/main/java/com/flowci/core/agent/controller/AgentHostController.java index 6b65ca406..781dda4d4 100644 --- a/core/src/main/java/com/flowci/core/agent/controller/AgentHostController.java +++ b/core/src/main/java/com/flowci/core/agent/controller/AgentHostController.java @@ -18,7 +18,7 @@ import com.flowci.core.agent.domain.AgentHost; import com.flowci.core.agent.domain.AgentHostAction; -import com.flowci.core.agent.domain.SaveAgentHost; +import com.flowci.core.agent.domain.AgentHostOption; import com.flowci.core.agent.service.AgentHostService; import com.flowci.core.auth.annotation.Action; import lombok.extern.log4j.Log4j2; @@ -56,7 +56,7 @@ public AgentHost deleteByName(@PathVariable String name) { @PostMapping @Action(AgentHostAction.CREATE_UPDATE) - public AgentHost createOrUpdate(@RequestBody @Validated SaveAgentHost body) { + public AgentHost createOrUpdate(@RequestBody @Validated AgentHostOption body) { AgentHost host = body.toObj(); return agentHostService.createOrUpdate(host); } diff --git a/core/src/main/java/com/flowci/core/agent/dao/AgentProfileDao.java b/core/src/main/java/com/flowci/core/agent/dao/AgentProfileDao.java new file mode 100644 index 000000000..93b23710b --- /dev/null +++ b/core/src/main/java/com/flowci/core/agent/dao/AgentProfileDao.java @@ -0,0 +1,9 @@ +package com.flowci.core.agent.dao; + +import com.flowci.core.agent.domain.AgentProfile; +import org.springframework.data.mongodb.repository.MongoRepository; +import org.springframework.stereotype.Repository; + +@Repository +public interface AgentProfileDao extends MongoRepository { +} diff --git a/core/src/main/java/com/flowci/core/agent/domain/Agent.java b/core/src/main/java/com/flowci/core/agent/domain/Agent.java index f83bc15fb..6680c34ce 100644 --- a/core/src/main/java/com/flowci/core/agent/domain/Agent.java +++ b/core/src/main/java/com/flowci/core/agent/domain/Agent.java @@ -22,14 +22,15 @@ import com.flowci.domain.SimpleKeyPair; import com.flowci.tree.Selector; import com.google.common.base.Strings; -import lombok.*; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; import lombok.experimental.Accessors; import org.springframework.data.mongodb.core.index.Indexed; import org.springframework.data.mongodb.core.mapping.Document; import java.time.Instant; import java.util.Collections; -import java.util.Date; import java.util.HashSet; import java.util.Set; @@ -64,22 +65,6 @@ public static Status fromBytes(byte[] bytes) { } } - @Getter - @Setter - @Accessors(chain = true) - public static class Resource { - - private int cpu; - - private int totalMemory; // in MB - - private int freeMemory; // in MB - - private int totalDisk; // in MB - - private int freeDisk; // in MB - } - @Indexed(name = "index_agent_name", unique = true) private String name; @@ -97,13 +82,15 @@ public static class Resource { private OS os = OS.UNKNOWN; - private Resource resource = new Resource(); - private Set tags = Collections.emptySet(); + private int exitOnIdle; + private Status status = Status.OFFLINE; - private Date statusUpdatedAt; + private Instant statusUpdatedAt; + + private Instant connectedAt; private String jobId; @@ -121,9 +108,14 @@ public Agent(String name, Set tags) { this.setTags(tags); } + @JsonIgnore + public AgentConfig getConfig() { + return new AgentConfig().setExitOnIdle(exitOnIdle); + } + public void setStatus(Status status) { this.status = status; - this.statusUpdatedAt = new Date(); + this.statusUpdatedAt = Instant.now(); } public void setTags(Set tags) { diff --git a/core/src/main/java/com/flowci/core/agent/domain/AgentConfig.java b/core/src/main/java/com/flowci/core/agent/domain/AgentConfig.java new file mode 100644 index 000000000..2e30a9561 --- /dev/null +++ b/core/src/main/java/com/flowci/core/agent/domain/AgentConfig.java @@ -0,0 +1,13 @@ +package com.flowci.core.agent.domain; + +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; + +@Getter +@Setter +@Accessors(chain = true) +public class AgentConfig { + + private int ExitOnIdle; +} diff --git a/core/src/main/java/com/flowci/core/agent/domain/AgentHost.java b/core/src/main/java/com/flowci/core/agent/domain/AgentHost.java index e3cf81221..e08205019 100644 --- a/core/src/main/java/com/flowci/core/agent/domain/AgentHost.java +++ b/core/src/main/java/com/flowci/core/agent/domain/AgentHost.java @@ -80,6 +80,11 @@ public enum Type { */ private Set tags = new HashSet<>(); + /** + * Ref to Agent.exitOnIdle + */ + private int exitOnIdle; + /** * Error message if connection fail */ diff --git a/core/src/main/java/com/flowci/core/agent/domain/SaveAgentHost.java b/core/src/main/java/com/flowci/core/agent/domain/AgentHostOption.java similarity index 92% rename from core/src/main/java/com/flowci/core/agent/domain/SaveAgentHost.java rename to core/src/main/java/com/flowci/core/agent/domain/AgentHostOption.java index 46ec6fd98..b34465b31 100644 --- a/core/src/main/java/com/flowci/core/agent/domain/SaveAgentHost.java +++ b/core/src/main/java/com/flowci/core/agent/domain/AgentHostOption.java @@ -29,12 +29,14 @@ @Getter @Setter -public class SaveAgentHost { +public class AgentHostOption { private String id; private Set tags = new HashSet<>(); + private int exitOnIdle; + @NotNull private AgentHost.Type type; @@ -68,6 +70,7 @@ public AgentHost toObj() { host.setIp(ip); host.setMaxSize(maxSize); host.setPort(port); + host.setExitOnIdle(exitOnIdle); return host; } @@ -77,6 +80,7 @@ public AgentHost toObj() { host.setName(name); host.setTags(tags); host.setMaxSize(maxSize); + host.setExitOnIdle(exitOnIdle); return host; } @@ -88,6 +92,7 @@ public AgentHost toObj() { host.setSecret(secret); host.setNamespace(namespace); host.setMaxSize(maxSize); + host.setExitOnIdle(exitOnIdle); return host; } diff --git a/core/src/main/java/com/flowci/core/agent/domain/AgentInit.java b/core/src/main/java/com/flowci/core/agent/domain/AgentInit.java index a4fbafa31..2ded1fcb3 100644 --- a/core/src/main/java/com/flowci/core/agent/domain/AgentInit.java +++ b/core/src/main/java/com/flowci/core/agent/domain/AgentInit.java @@ -38,6 +38,4 @@ public class AgentInit { private Common.OS os; private Agent.Status status; - - private Agent.Resource resource = new Agent.Resource(); } diff --git a/core/src/main/java/com/flowci/core/agent/domain/CreateOrUpdateAgent.java b/core/src/main/java/com/flowci/core/agent/domain/AgentOption.java similarity index 86% rename from core/src/main/java/com/flowci/core/agent/domain/CreateOrUpdateAgent.java rename to core/src/main/java/com/flowci/core/agent/domain/AgentOption.java index b9f609b32..e42938848 100644 --- a/core/src/main/java/com/flowci/core/agent/domain/CreateOrUpdateAgent.java +++ b/core/src/main/java/com/flowci/core/agent/domain/AgentOption.java @@ -17,15 +17,18 @@ package com.flowci.core.agent.domain; import com.google.common.base.Strings; -import java.util.Set; -import javax.validation.constraints.NotEmpty; import lombok.Data; +import lombok.experimental.Accessors; + +import javax.validation.constraints.NotEmpty; +import java.util.Set; /** * @author yang */ @Data -public class CreateOrUpdateAgent { +@Accessors(chain = true) +public class AgentOption { @NotEmpty private String name; @@ -34,6 +37,10 @@ public class CreateOrUpdateAgent { private String token; + private int exitOnIdle; + + private String hostId; + public boolean hasToken() { return !Strings.isNullOrEmpty(token); } diff --git a/core/src/main/java/com/flowci/core/agent/domain/AgentProfile.java b/core/src/main/java/com/flowci/core/agent/domain/AgentProfile.java new file mode 100644 index 000000000..09655e883 --- /dev/null +++ b/core/src/main/java/com/flowci/core/agent/domain/AgentProfile.java @@ -0,0 +1,29 @@ +package com.flowci.core.agent.domain; + +import com.flowci.core.common.domain.Mongoable; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; + +/** + * Id is agent token + */ +@Getter +@Setter +@Accessors(chain = true) +public class AgentProfile extends Mongoable { + + public static final AgentProfile EMPTY = new AgentProfile(); + + private int cpuNum; + + private double cpuUsage; + + private int totalMemory; // in MB + + private int freeMemory; // in MB + + private int totalDisk; // in MB + + private int freeDisk; // in MB +} diff --git a/core/src/main/java/com/flowci/core/agent/domain/ShellIn.java b/core/src/main/java/com/flowci/core/agent/domain/ShellIn.java index 8ad851a73..d1b8bcfed 100644 --- a/core/src/main/java/com/flowci/core/agent/domain/ShellIn.java +++ b/core/src/main/java/com/flowci/core/agent/domain/ShellIn.java @@ -50,6 +50,8 @@ public enum ShellType { private Set envFilters; + private Set secrets; + public ShellIn() { super(Type.SHELL); } diff --git a/core/src/main/java/com/flowci/core/agent/domain/ShellOut.java b/core/src/main/java/com/flowci/core/agent/domain/ShellOut.java index 9e7485e41..9e1a47e9b 100644 --- a/core/src/main/java/com/flowci/core/agent/domain/ShellOut.java +++ b/core/src/main/java/com/flowci/core/agent/domain/ShellOut.java @@ -5,11 +5,13 @@ import com.flowci.domain.Vars; import lombok.Getter; import lombok.Setter; +import lombok.experimental.Accessors; import java.util.Date; @Getter @Setter +@Accessors(chain = true) public final class ShellOut implements Executed, CmdOut { private String id; diff --git a/core/src/main/java/com/flowci/core/agent/event/OnAgentProfileEvent.java b/core/src/main/java/com/flowci/core/agent/event/OnAgentProfileEvent.java new file mode 100644 index 000000000..1618d7b50 --- /dev/null +++ b/core/src/main/java/com/flowci/core/agent/event/OnAgentProfileEvent.java @@ -0,0 +1,15 @@ +package com.flowci.core.agent.event; + +import com.flowci.core.agent.domain.AgentProfile; +import lombok.Getter; + +@Getter +public class OnAgentProfileEvent extends EventFromClient { + + private final AgentProfile profile; + + public OnAgentProfileEvent(Object source, AgentProfile profile) { + super(source, null, null); + this.profile = profile; + } +} diff --git a/core/src/main/java/com/flowci/core/agent/event/OnConnectedEvent.java b/core/src/main/java/com/flowci/core/agent/event/OnConnectedEvent.java index bacd20758..6a6695e04 100644 --- a/core/src/main/java/com/flowci/core/agent/event/OnConnectedEvent.java +++ b/core/src/main/java/com/flowci/core/agent/event/OnConnectedEvent.java @@ -1,14 +1,19 @@ package com.flowci.core.agent.event; +import com.flowci.core.agent.domain.Agent; import com.flowci.core.agent.domain.AgentInit; import lombok.Getter; +import lombok.Setter; import org.springframework.web.socket.WebSocketSession; @Getter +@Setter public class OnConnectedEvent extends EventFromClient { private final AgentInit init; + private Agent agent; + public OnConnectedEvent(Object source, String token, WebSocketSession session, AgentInit init) { super(source, token, session); this.init = init; diff --git a/core/src/main/java/com/flowci/core/agent/manager/AgentEventManager.java b/core/src/main/java/com/flowci/core/agent/manager/AgentEventManager.java index 2b137f1ee..9601747b2 100644 --- a/core/src/main/java/com/flowci/core/agent/manager/AgentEventManager.java +++ b/core/src/main/java/com/flowci/core/agent/manager/AgentEventManager.java @@ -1,9 +1,7 @@ package com.flowci.core.agent.manager; import com.fasterxml.jackson.databind.ObjectMapper; -import com.flowci.core.agent.domain.AgentInit; -import com.flowci.core.agent.domain.ShellLog; -import com.flowci.core.agent.domain.TtyCmd; +import com.flowci.core.agent.domain.*; import com.flowci.core.agent.event.*; import com.flowci.core.common.domain.StatusCode; import com.flowci.core.common.domain.http.ResponseMessage; @@ -33,6 +31,8 @@ public class AgentEventManager extends BinaryWebSocketHandler { private final static String EventConnect = "connect___"; + private final static String EventProfile = "profile___"; + private final static String EventCmdOut = "cmd_out___"; private final static String EventShellLog = "slog______"; @@ -104,6 +104,11 @@ protected void handleBinaryMessage(WebSocketSession session, BinaryMessage messa if (EventTTYLog.equals(event)) { onTtyLog(body); + return; + } + + if (EventProfile.equals(event)) { + onProfile(token, body); } } @@ -144,11 +149,14 @@ private void onConnected(WebSocketSession session, String token, byte[] body) { Objects.requireNonNull(init.getStatus(), "Agent status is missing"); init.setToken(token); - init.setIp(session.getRemoteAddress() == null ? null : session.getRemoteAddress().toString()); + init.setIp(session.getRemoteAddress() == null ? null : session.getRemoteAddress().getAddress().toString()); - eventManager.publish(new OnConnectedEvent(this, token, session, init)); + OnConnectedEvent event = new OnConnectedEvent(this, token, session, init); + eventManager.publish(event); agentSessionStore.put(token, session); - writeMessage(token, new ResponseMessage(StatusCode.OK, null)); + + Agent agent = event.getAgent(); + writeMessage(token, new ResponseMessage<>(StatusCode.OK, agent.getConfig())); log.debug("Agent {} is connected with status {}", token, init.getStatus()); } catch (Exception e) { log.warn(e); @@ -158,8 +166,8 @@ private void onConnected(WebSocketSession session, String token, byte[] body) { private void onCmdOut(String token, byte[] body) { try { - eventManager.publish(new OnCmdOutEvent(this, body)); log.debug("Agent {} got cmd back: {}", token, new String(body)); + eventManager.publish(new OnCmdOutEvent(this, body)); } catch (Exception e) { log.warn(e); } @@ -183,6 +191,16 @@ private void onTtyLog(byte[] body) { } } + private void onProfile(String token, byte[] body) { + try { + AgentProfile profile = objectMapper.readValue(body, AgentProfile.class); + profile.setId(token); + eventManager.publish(new OnAgentProfileEvent(this, profile)); + } catch (IOException e) { + log.warn(e); + } + } + private static String getToken(WebSocketSession session) { return session.getHandshakeHeaders().get(HeaderToken).get(0); } diff --git a/core/src/main/java/com/flowci/core/agent/service/AgentHostServiceImpl.java b/core/src/main/java/com/flowci/core/agent/service/AgentHostServiceImpl.java index 04f41d95d..488bfe9f7 100644 --- a/core/src/main/java/com/flowci/core/agent/service/AgentHostServiceImpl.java +++ b/core/src/main/java/com/flowci/core/agent/service/AgentHostServiceImpl.java @@ -267,7 +267,12 @@ public boolean start(AgentHost host) { Agent agent = null; try { - agent = agentService.create(name, host.getTags(), Optional.of(host.getId())); + agent = agentService.create(new AgentOption() + .setName(name) + .setTags(host.getTags()) + .setExitOnIdle(host.getExitOnIdle()) + .setHostId(host.getId()) + ); StartOption startOption = mapping.get(host.getClass()).buildStartOption(host, agent); String cid = cm.start(startOption); diff --git a/core/src/main/java/com/flowci/core/agent/service/AgentService.java b/core/src/main/java/com/flowci/core/agent/service/AgentService.java index e78796b70..0d53bf0c2 100644 --- a/core/src/main/java/com/flowci/core/agent/service/AgentService.java +++ b/core/src/main/java/com/flowci/core/agent/service/AgentService.java @@ -17,13 +17,14 @@ package com.flowci.core.agent.service; import com.flowci.core.agent.domain.Agent; +import com.flowci.core.agent.domain.AgentProfile; import com.flowci.core.agent.domain.CmdIn; +import com.flowci.core.agent.domain.AgentOption; import com.flowci.tree.Selector; import java.util.Collection; import java.util.List; import java.util.Optional; -import java.util.Set; /** * @author yang @@ -35,6 +36,11 @@ public interface AgentService { */ Agent get(String id); + /** + * Get agent profile by token + */ + AgentProfile getProfile(String token); + /** * Get agent by name */ @@ -88,17 +94,12 @@ public interface AgentService { /** * Create agent by name and tags */ - Agent create(String name, Set tags, Optional hostId); + Agent create(AgentOption option); /** * Update agent name or and tags */ - Agent update(String token, String name, Set tags); - - /** - * Update agent resource - */ - Agent update(String token, Agent.Resource resource); + Agent update(AgentOption option); /** * Update agent status diff --git a/core/src/main/java/com/flowci/core/agent/service/AgentServiceImpl.java b/core/src/main/java/com/flowci/core/agent/service/AgentServiceImpl.java index 57d4aad43..5479fc4cf 100644 --- a/core/src/main/java/com/flowci/core/agent/service/AgentServiceImpl.java +++ b/core/src/main/java/com/flowci/core/agent/service/AgentServiceImpl.java @@ -18,16 +18,16 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.flowci.core.agent.dao.AgentDao; -import com.flowci.core.agent.domain.Agent; +import com.flowci.core.agent.dao.AgentProfileDao; +import com.flowci.core.agent.domain.*; import com.flowci.core.agent.domain.Agent.Status; -import com.flowci.core.agent.domain.AgentInit; -import com.flowci.core.agent.domain.CmdIn; -import com.flowci.core.agent.domain.Util; import com.flowci.core.agent.event.*; import com.flowci.core.agent.manager.AgentEventManager; import com.flowci.core.common.config.AppProperties; +import com.flowci.core.common.domain.PushEvent; import com.flowci.core.common.helper.CipherHelper; import com.flowci.core.common.helper.ThreadHelper; +import com.flowci.core.common.manager.SocketPushManager; import com.flowci.core.common.manager.SpringEventManager; import com.flowci.core.common.manager.SpringTaskManager; import com.flowci.core.common.rabbit.RabbitOperations; @@ -48,6 +48,7 @@ import org.springframework.stereotype.Service; import java.io.IOException; +import java.time.Instant; import java.util.*; import static com.flowci.core.agent.domain.Agent.Status.*; @@ -71,6 +72,9 @@ public class AgentServiceImpl implements AgentService { private static final int MaxIdleAgentPushBack = 10; // seconds + @Autowired + private String topicForAgentProfile; + @Autowired private AppProperties.Zookeeper zkProperties; @@ -80,6 +84,9 @@ public class AgentServiceImpl implements AgentService { @Autowired private AgentDao agentDao; + @Autowired + private AgentProfileDao agentProfileDao; + @Autowired private SpringEventManager eventManager; @@ -98,6 +105,9 @@ public class AgentServiceImpl implements AgentService { @Autowired private RabbitOperations idleAgentQueueManager; + @Autowired + private SocketPushManager socketPushManager; + @EventListener(ContextRefreshedEvent.class) public void initAgentStatus() { taskManager.run("init-agent-status", true, () -> { @@ -118,6 +128,12 @@ public void subscribeIdleAgentQueue() throws IOException { String agentId = new String(body); log.debug("Got an idle agent {}", agentId); + Agent agent = get(agentId); + if (!agent.isIdle()) { + log.debug("Agent {} is not idle", agentId); + return true; + } + try { IdleAgentEvent event = new IdleAgentEvent(this, agentId); eventManager.publish(event); @@ -149,6 +165,12 @@ public Agent get(String id) { return optional.get(); } + @Override + public AgentProfile getProfile(String token) { + Optional optional = agentProfileDao.findById(token); + return optional.orElse(AgentProfile.EMPTY); + } + @Override public Agent getByName(String name) { Agent agent = agentDao.findByName(name); @@ -278,7 +300,9 @@ public void release(Collection ids) { } @Override - public Agent create(String name, Set tags, Optional hostId) { + public Agent create(AgentOption option) { + String name = option.getName(); + Agent exist = agentDao.findByName(name); if (exist != null) { throw new DuplicateException("Agent name {0} is already defined", name); @@ -286,9 +310,10 @@ public Agent create(String name, Set tags, Optional hostId) { try { // create agent - Agent agent = new Agent(name, tags); + Agent agent = new Agent(name, option.getTags()); agent.setToken(UUID.randomUUID().toString()); - hostId.ifPresent(agent::setHostId); + agent.setHostId(option.getHostId()); + agent.setExitOnIdle(option.getExitOnIdle()); String dummyEmailForAgent = "agent." + name + "@flow.ci"; agent.setRsa(CipherHelper.RSA.gen(dummyEmailForAgent)); @@ -302,26 +327,19 @@ public Agent create(String name, Set tags, Optional hostId) { } @Override - public Agent update(String token, String name, Set tags) { - Agent agent = getByToken(token); - agent.setName(name); - agent.setTags(tags); + public Agent update(AgentOption option) { + Agent agent = getByToken(option.getToken()); + agent.setName(option.getName()); + agent.setTags(option.getTags()); + agent.setExitOnIdle(option.getExitOnIdle()); try { return agentDao.save(agent); } catch (DuplicateKeyException e) { - throw new DuplicateException("Agent name {0} is already defined", name); + throw new DuplicateException("Agent name {0} is already defined", option.getName()); } } - @Override - public Agent update(String token, Agent.Resource resource) { - Agent agent = getByToken(token); - agent.setResource(resource); - agentDao.save(agent); - return agent; - } - @Override public Agent update(Agent agent, Status status) { if (agent.getStatus() == status) { @@ -382,18 +400,26 @@ public void onConnected(OnConnectedEvent event) { target.setK8sCluster(init.getK8sCluster()); target.setUrl("http://" + init.getIp() + ":" + init.getPort()); target.setOs(init.getOs()); - target.setResource(init.getResource()); + target.setConnectedAt(Instant.now()); update(target, init.getStatus()); if (target.isIdle()) { idleAgentQueueManager.send(idleAgentQueue, target.getId().getBytes()); } + + event.setAgent(target); } finally { unlock(lock.get()); } } + @EventListener + public void onProfileReceived(OnAgentProfileEvent event) { + agentProfileDao.save(event.getProfile()); + socketPushManager.push(topicForAgentProfile, PushEvent.STATUS_CHANGE, event.getProfile()); + } + @EventListener public void onDisconnected(OnDisconnectedEvent event) { Optional lock = lock(); diff --git a/core/src/main/java/com/flowci/core/api/OpenRestController.java b/core/src/main/java/com/flowci/core/api/OpenRestController.java index dcde64886..0ebddbc7e 100644 --- a/core/src/main/java/com/flowci/core/api/OpenRestController.java +++ b/core/src/main/java/com/flowci/core/api/OpenRestController.java @@ -17,9 +17,7 @@ package com.flowci.core.api; -import com.flowci.core.agent.domain.Agent; import com.flowci.core.agent.service.AgentService; -import com.flowci.core.api.adviser.ApiAuth; import com.flowci.core.api.domain.AddStatsItem; import com.flowci.core.api.domain.CreateJobArtifact; import com.flowci.core.api.domain.CreateJobReport; @@ -133,12 +131,6 @@ public void uploadJobArtifact(@PathVariable String name, openRestService.saveJobArtifact(name, number, meta, file); } - @PostMapping("/profile") - public void profile(@RequestHeader(ApiAuth.HeaderAgentToken) String token, - @RequestBody Agent.Resource resource) { - agentService.update(token, resource); - } - @PostMapping("/logs/upload") public void upload(@RequestPart("file") MultipartFile file) { try (InputStream stream = file.getInputStream()) { diff --git a/core/src/main/java/com/flowci/core/common/config/WebSocketConfig.java b/core/src/main/java/com/flowci/core/common/config/WebSocketConfig.java index 74396c8ec..b629acecb 100644 --- a/core/src/main/java/com/flowci/core/common/config/WebSocketConfig.java +++ b/core/src/main/java/com/flowci/core/common/config/WebSocketConfig.java @@ -108,6 +108,11 @@ public String topicForAgents() { return "/topic/agents"; } + @Bean("topicForAgentProfile") + public String topicForAgentProfile() { + return "/topic/agent_profile"; + } + /** * To subscribe agent host update */ diff --git a/core/src/main/java/com/flowci/core/common/helper/DateHelper.java b/core/src/main/java/com/flowci/core/common/helper/DateHelper.java index 39948bedc..11079d3c0 100644 --- a/core/src/main/java/com/flowci/core/common/helper/DateHelper.java +++ b/core/src/main/java/com/flowci/core/common/helper/DateHelper.java @@ -16,10 +16,20 @@ package com.flowci.core.common.helper; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; import com.flowci.exception.ArgumentException; + +import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; import java.util.Date; /** @@ -29,6 +39,26 @@ public abstract class DateHelper { private static final SimpleDateFormat intDayFormatter = new SimpleDateFormat("yyyyMMdd"); + private static final DateTimeFormatter utaDateFormat = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC); + + public static class InstantUTCSerializer extends JsonSerializer { + + @Override + public void serialize(Instant instant, JsonGenerator gen, SerializerProvider provider) throws IOException { + String str = utaDateFormat.format(instant); + gen.writeString(str); + } + } + + public static class InstantUTCDeserializer extends JsonDeserializer { + + @Override + public Instant deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + return Instant.from(utaDateFormat.parse(p.getText())); + } + } + public static synchronized int toIntDay(Date date) { return Integer.parseInt(intDayFormatter.format(date)); } @@ -37,7 +67,7 @@ public static synchronized Instant toInstant(int day) { try { Date date = intDayFormatter.parse("" + day); return date.toInstant(); - } catch (ParseException e) { + } catch (ParseException e) { throw new ArgumentException("Invalid day format"); } } diff --git a/core/src/main/java/com/flowci/core/common/helper/JacksonHelper.java b/core/src/main/java/com/flowci/core/common/helper/JacksonHelper.java index d249aec19..4f875be16 100644 --- a/core/src/main/java/com/flowci/core/common/helper/JacksonHelper.java +++ b/core/src/main/java/com/flowci/core/common/helper/JacksonHelper.java @@ -23,6 +23,8 @@ import com.flowci.core.common.domain.JsonablePage; import org.springframework.data.domain.Pageable; +import java.time.Instant; + /** * @author yang */ @@ -35,6 +37,10 @@ public static ObjectMapper create() { SimpleModule module = new SimpleModule(); module.addDeserializer(Pageable.class, new JsonablePage.PageableDeserializer()); + + module.addSerializer(Instant.class, new DateHelper.InstantUTCSerializer()); + module.addDeserializer(Instant.class, new DateHelper.InstantUTCDeserializer()); + mapper.registerModule(module); return mapper; diff --git a/core/src/main/java/com/flowci/core/flow/service/YmlServiceImpl.java b/core/src/main/java/com/flowci/core/flow/service/YmlServiceImpl.java index 583108047..040415ea1 100644 --- a/core/src/main/java/com/flowci/core/flow/service/YmlServiceImpl.java +++ b/core/src/main/java/com/flowci/core/flow/service/YmlServiceImpl.java @@ -23,6 +23,7 @@ import com.flowci.core.flow.domain.Flow; import com.flowci.core.flow.domain.Yml; import com.flowci.core.plugin.event.GetPluginEvent; +import com.flowci.core.secret.event.GetSecretEvent; import com.flowci.domain.Vars; import com.flowci.exception.ArgumentException; import com.flowci.exception.DuplicateException; @@ -32,13 +33,15 @@ import com.flowci.tree.YmlParser; import com.flowci.util.StringHelper; import com.github.benmanes.caffeine.cache.Cache; +import com.google.common.collect.ImmutableList; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Service; import java.util.List; import java.util.Optional; -import java.util.Set; +import java.util.function.Function; + /** * @author yang @@ -46,6 +49,12 @@ @Service public class YmlServiceImpl implements YmlService { + private final List elementCheckers = ImmutableList.of( + new ConditionChecker(), + new PluginChecker(), + new SecretChecker() + ); + @Autowired private Cache flowTreeCache; @@ -102,18 +111,14 @@ public Yml saveYml(Flow flow, String name, String ymlInB64) { FlowNode root = YmlParser.load(yaml); NodeTree tree = NodeTree.create(root); - Optional hasErr = verifyPlugins(tree.getPlugins()); - if (hasErr.isPresent()) { - throw hasErr.get(); - } - - hasErr = verifyConditions(tree.getConditions()); - if (hasErr.isPresent()) { - throw hasErr.get(); + for (NodeElementChecker checker : elementCheckers) { + Optional exception = checker.apply(tree); + if (exception.isPresent()) { + throw exception.get(); + } } Yml ymlObj = getOrCreate(flow.getId(), name, ymlInB64); - try { ymlDao.save(ymlObj); } catch (DuplicateKeyException e) { @@ -155,36 +160,50 @@ private Yml getOrCreate(String flowId, String name, String ymlInB64) { return new Yml(flowId, name, ymlInB64); } - /** - * Check conditions are existed - * - * @param conditions list of condition script - * @return Optional exception - */ - private Optional verifyConditions(Set conditions) { - try { - for (String condition : conditions) { - conditionManager.verify(condition); + private interface NodeElementChecker extends Function> { + + } + + private class ConditionChecker implements NodeElementChecker { + + @Override + public Optional apply(NodeTree tree) { + try { + for (String c : tree.getConditions()) { + conditionManager.verify(c); + } + return Optional.empty(); + } catch (Throwable e) { + return Optional.of(new RuntimeException(e.getMessage())); + } + } + } + + private class PluginChecker implements NodeElementChecker { + + @Override + public Optional apply(NodeTree tree) { + for (String p : tree.getPlugins()) { + GetPluginEvent event = eventManager.publish(new GetPluginEvent(this, p)); + if (event.hasError()) { + return Optional.of(event.getError()); + } } return Optional.empty(); - } catch (Throwable e) { - return Optional.of(new RuntimeException(e.getMessage())); } } - /** - * Check plugins are existed - * - * @param plugins input plugin name list - * @return Optional exception - */ - private Optional verifyPlugins(Set plugins) { - for (String plugin : plugins) { - GetPluginEvent event = eventManager.publish(new GetPluginEvent(this, plugin)); - if (event.hasError()) { - return Optional.of(event.getError()); + private class SecretChecker implements NodeElementChecker { + + @Override + public Optional apply(NodeTree tree) { + for (String s : tree.getSecrets()) { + GetSecretEvent event = eventManager.publish(new GetSecretEvent(this, s)); + if (event.hasError()) { + return Optional.of(event.getError()); + } } + return Optional.empty(); } - return Optional.empty(); } } diff --git a/core/src/main/java/com/flowci/core/job/controller/JobController.java b/core/src/main/java/com/flowci/core/job/controller/JobController.java index aa93a6ee4..d01af78a6 100644 --- a/core/src/main/java/com/flowci/core/job/controller/JobController.java +++ b/core/src/main/java/com/flowci/core/job/controller/JobController.java @@ -139,6 +139,12 @@ public void createAndStart(@Validated @RequestBody CreateJob body) { public void rerun(@Validated @RequestBody RerunJob body) { Job job = jobService.get(body.getJobId()); Flow flow = flowService.getById(job.getFlowId()); + + if (body.isFromFailureStep()) { + jobService.rerunFromFailureStep(flow, job); + return; + } + jobService.rerun(flow, job); } diff --git a/core/src/main/java/com/flowci/core/job/dao/ExecutedCmdDao.java b/core/src/main/java/com/flowci/core/job/dao/ExecutedCmdDao.java index c3ca25e3c..f91841235 100644 --- a/core/src/main/java/com/flowci/core/job/dao/ExecutedCmdDao.java +++ b/core/src/main/java/com/flowci/core/job/dao/ExecutedCmdDao.java @@ -37,7 +37,9 @@ public interface ExecutedCmdDao extends MongoRepository { Optional findByJobIdAndNodePath(String jobId, String nodePath); - List findByFlowIdAndBuildNumber(String flowId, long buildNumber); + List findAllByJobIdAndNodePathIn(String jobId, Collection nodePaths); - List findByJobIdAndStatusIn(String jobId, Collection statuses); + List findAllByFlowIdAndBuildNumber(String flowId, long buildNumber); + + List findAllByJobIdAndStatusIn(String jobId, Collection statuses); } diff --git a/core/src/main/java/com/flowci/core/job/domain/Executed.java b/core/src/main/java/com/flowci/core/job/domain/Executed.java index ad3fd870d..e1f917dd9 100644 --- a/core/src/main/java/com/flowci/core/job/domain/Executed.java +++ b/core/src/main/java/com/flowci/core/job/domain/Executed.java @@ -15,7 +15,6 @@ public interface Executed { ); Set OngoingStatus = ImmutableSet.of( - Status.PENDING, Status.WAITING_AGENT, Status.RUNNING ); @@ -46,6 +45,8 @@ enum Status { RUNNING(1), + KILLING(1), + SUCCESS(2), SKIPPED(2), diff --git a/core/src/main/java/com/flowci/core/job/domain/Job.java b/core/src/main/java/com/flowci/core/job/domain/Job.java index cb0948042..1d582f376 100644 --- a/core/src/main/java/com/flowci/core/job/domain/Job.java +++ b/core/src/main/java/com/flowci/core/job/domain/Job.java @@ -18,14 +18,15 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.flowci.core.agent.domain.Agent; +import com.flowci.core.agent.domain.AgentProfile; import com.flowci.core.common.domain.Mongoable; import com.flowci.core.common.domain.Variables; import com.flowci.domain.StringVars; import com.flowci.domain.Vars; import com.flowci.store.Pathable; -import com.flowci.tree.Node; import com.flowci.util.StringHelper; import com.google.common.collect.ImmutableSet; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; import org.springframework.data.mongodb.core.index.CompoundIndex; @@ -43,6 +44,7 @@ @Getter @Setter @Document(collection = "job") +@EqualsAndHashCode(callSuper = true) @CompoundIndex( name = "index_job_flowid_and_buildnum", def = "{'flowId': 1, 'buildNumber': 1}", @@ -159,7 +161,9 @@ public static class AgentSnapshot { private String os; - private int cpu; + private int cpuNum; + + private double cpuUsage; private int totalMemory; @@ -185,9 +189,9 @@ public static Pathable path(Long buildNumber) { private static final SimpleDateFormat DateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); - private static final Integer MinPriority = 1; + public static final Integer MinPriority = 1; - private static final Integer MaxPriority = 255; + public static final Integer MaxPriority = 255; /** * Job key is generated from {flow id}-{build number} @@ -206,9 +210,12 @@ public static Pathable path(Long buildNumber) { private Status status = Status.PENDING; + private boolean onPostSteps = false; + // agent id : info private Map snapshots = new HashMap<>(); + // current running steps private Set currentPath = new HashSet<>(); private Vars context = new StringVars(); @@ -269,6 +276,11 @@ public boolean isDone() { return FINISH_STATUS.contains(status); } + @JsonIgnore + public boolean isFailure() { + return status == Status.FAILURE; + } + @JsonIgnore public String getQueueName() { return "flow.q." + flowId + ".job"; @@ -323,15 +335,16 @@ public boolean isExpired() { return Instant.now().compareTo(expireAt) > 0; } - public void addAgentSnapshot(Agent agent) { + public void addAgentSnapshot(Agent agent, AgentProfile profile) { AgentSnapshot s = new AgentSnapshot(); s.name = agent.getName(); s.os = agent.getOs().name(); - s.cpu = agent.getResource().getCpu(); - s.totalMemory = agent.getResource().getTotalMemory(); - s.freeMemory = agent.getResource().getFreeMemory(); - s.totalDisk = agent.getResource().getTotalDisk(); - s.freeDisk = agent.getResource().getFreeDisk(); + s.cpuNum = profile.getCpuNum(); + s.cpuUsage = profile.getCpuUsage(); + s.totalMemory = profile.getTotalMemory(); + s.freeMemory = profile.getFreeMemory(); + s.totalDisk = profile.getTotalDisk(); + s.freeDisk = profile.getFreeDisk(); this.snapshots.put(agent.getId(), s); } @@ -340,25 +353,16 @@ public void setErrorToContext(String err) { context.put(Variables.Job.Error, err); } - public void setCurrentPathFromNodes(List nodes) { + public Job resetCurrentPath() { this.currentPath.clear(); - for (Node n : nodes) { - this.currentPath.add(n.getPathAsString()); - } + return this; } - public void setCurrentPathFromNodes(Node node) { - this.currentPath.clear(); - this.currentPath.add(node.getPathAsString()); + public void addToCurrentPath(Step step) { + this.currentPath.add(step.getNodePath()); } - @Override - public boolean equals(Object o) { - return super.equals(o); - } - - @Override - public int hashCode() { - return super.hashCode(); + public void removeFromCurrentPath(Step step) { + this.currentPath.remove(step.getNodePath()); } } diff --git a/core/src/main/java/com/flowci/core/job/domain/JobAgent.java b/core/src/main/java/com/flowci/core/job/domain/JobAgent.java index f5f4cfd8c..0752e104e 100644 --- a/core/src/main/java/com/flowci/core/job/domain/JobAgent.java +++ b/core/src/main/java/com/flowci/core/job/domain/JobAgent.java @@ -39,16 +39,6 @@ public Collection all() { return agents.keySet(); } - public Collection allBusyAgents() { - List busy = new LinkedList<>(); - this.agents.forEach((k, v) -> { - if (!v.isEmpty()) { - busy.add(k); - } - }); - return busy; - } - public boolean isOccupiedByFlow(String agentId) { Set flows = agents.get(agentId); return flows != null && !flows.isEmpty(); @@ -85,7 +75,7 @@ public void remove(String agentId) { * Get agent if an agent was occupied within same flow */ public Optional getAgent(Node node) { - FlowNode flow = node.getParentFlowNode(); + FlowNode flow = node.getParent(FlowNode.class); for (Map.Entry> entry : agents.entrySet()) { String agentId = entry.getKey(); Set set = entry.getValue(); @@ -102,7 +92,7 @@ public Optional getAgent(Node node) { * find candidate agents within job, but still need to check Selector */ public List getCandidates(Node node) { - FlowNode flow = node.getParentFlowNode(); + FlowNode flow = node.getParent(FlowNode.class); int flowDepth = flow.getPath().depth(); List candidates = new ArrayList<>(agents.size()); diff --git a/core/src/main/java/com/flowci/core/job/domain/JobSmContext.java b/core/src/main/java/com/flowci/core/job/domain/JobSmContext.java index 427afcca4..8e807b44d 100644 --- a/core/src/main/java/com/flowci/core/job/domain/JobSmContext.java +++ b/core/src/main/java/com/flowci/core/job/domain/JobSmContext.java @@ -2,19 +2,31 @@ import com.flowci.sm.Context; import com.flowci.zookeeper.InterLock; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +@Getter +@Setter +@RequiredArgsConstructor public class JobSmContext extends Context { - public Job job; + private final String jobId; - public String yml; + private Job job; - public Step step; + private String yml; - public InterLock lock; + private Step step; + + private InterLock lock; public Job.Status getTargetToJobStatus() { String name = this.to.getName(); return Job.Status.valueOf(name); } + + public boolean hasLock() { + return lock != null; + } } diff --git a/core/src/main/java/com/flowci/core/job/domain/RerunJob.java b/core/src/main/java/com/flowci/core/job/domain/RerunJob.java index 924090f51..9bbed481d 100644 --- a/core/src/main/java/com/flowci/core/job/domain/RerunJob.java +++ b/core/src/main/java/com/flowci/core/job/domain/RerunJob.java @@ -25,4 +25,6 @@ public class RerunJob { @NotEmpty private String jobId; + + private boolean fromFailureStep; } diff --git a/core/src/main/java/com/flowci/core/job/domain/Step.java b/core/src/main/java/com/flowci/core/job/domain/Step.java index a704a337f..f6427ac91 100644 --- a/core/src/main/java/com/flowci/core/job/domain/Step.java +++ b/core/src/main/java/com/flowci/core/job/domain/Step.java @@ -80,6 +80,8 @@ public enum Type { private boolean allowFailure; + private boolean post; + private String plugin; private List dockers; @@ -166,6 +168,16 @@ public boolean isOngoing() { return OngoingStatus.contains(status); } + @JsonIgnore + public boolean isKilling() { + return status == Status.KILLING; + } + + @JsonIgnore + public boolean isFinished() { + return FinishStatus.contains(status); + } + public void setFrom(ShellOut out) { this.processId = out.getProcessId(); this.containerId = out.getContainerId(); diff --git a/core/src/main/java/com/flowci/core/job/manager/CmdManagerImpl.java b/core/src/main/java/com/flowci/core/job/manager/CmdManagerImpl.java index 4011d23ab..781f1f169 100644 --- a/core/src/main/java/com/flowci/core/job/manager/CmdManagerImpl.java +++ b/core/src/main/java/com/flowci/core/job/manager/CmdManagerImpl.java @@ -64,6 +64,7 @@ public ShellIn createShellCmd(Job job, Step step, Node node) { .setInputs(r.fetchEnvs().merge(job.getContext(), false)) .setTimeout(r.fetchTimeout(job.getTimeout())) .setRetry(r.fetchRetry(0)) + .setSecrets(r.getSecrets()) .setCache(r.getCache()); if (r.hasPlugin()) { diff --git a/core/src/main/java/com/flowci/core/job/manager/JobActionManager.java b/core/src/main/java/com/flowci/core/job/manager/JobActionManager.java deleted file mode 100644 index 63b1bc65b..000000000 --- a/core/src/main/java/com/flowci/core/job/manager/JobActionManager.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.flowci.core.job.manager; - -import com.flowci.core.job.domain.Step; -import com.flowci.core.job.domain.Job; - -public interface JobActionManager { - - void toLoading(Job job); - - void toCreated(Job job, String yml); - - void toStart(Job job); - - void toRun(Job job); - - void toContinue(Job job, Step step); - - void toCancelled(Job job, String reason); - - void toTimeout(Job job); -} diff --git a/core/src/main/java/com/flowci/core/job/service/JobActionService.java b/core/src/main/java/com/flowci/core/job/service/JobActionService.java new file mode 100644 index 000000000..8574ff484 --- /dev/null +++ b/core/src/main/java/com/flowci/core/job/service/JobActionService.java @@ -0,0 +1,21 @@ +package com.flowci.core.job.service; + +import com.flowci.core.agent.domain.ShellOut; +import com.flowci.exception.CIException; + +public interface JobActionService { + + void toLoading(String jobId); + + void toCreated(String jobId, String yml); + + void toStart(String jobId); + + void toRun(String jobId); + + void toContinue(String jobId, ShellOut shellOut); + + void toCancelled(String jobId, CIException exception); + + void toTimeout(String jobId); +} diff --git a/core/src/main/java/com/flowci/core/job/manager/JobActionManagerImpl.java b/core/src/main/java/com/flowci/core/job/service/JobActionServiceImpl.java similarity index 66% rename from core/src/main/java/com/flowci/core/job/manager/JobActionManagerImpl.java rename to core/src/main/java/com/flowci/core/job/service/JobActionServiceImpl.java index 3547bbe8d..13d659e0c 100644 --- a/core/src/main/java/com/flowci/core/job/manager/JobActionManagerImpl.java +++ b/core/src/main/java/com/flowci/core/job/service/JobActionServiceImpl.java @@ -14,11 +14,9 @@ * limitations under the License. */ -package com.flowci.core.job.manager; +package com.flowci.core.job.service; -import com.flowci.core.agent.domain.Agent; -import com.flowci.core.agent.domain.CmdIn; -import com.flowci.core.agent.domain.ShellIn; +import com.flowci.core.agent.domain.*; import com.flowci.core.agent.event.IdleAgentEvent; import com.flowci.core.agent.service.AgentService; import com.flowci.core.common.domain.Variables; @@ -33,8 +31,9 @@ import com.flowci.core.job.domain.*; import com.flowci.core.job.event.JobReceivedEvent; import com.flowci.core.job.event.JobStatusChangeEvent; -import com.flowci.core.job.service.LocalTaskService; -import com.flowci.core.job.service.StepService; +import com.flowci.core.job.manager.CmdManager; +import com.flowci.core.job.manager.YmlManager; +import com.flowci.core.job.util.Errors; import com.flowci.core.job.util.StatusHelper; import com.flowci.core.secret.domain.Secret; import com.flowci.core.secret.service.SecretService; @@ -48,7 +47,6 @@ import com.flowci.util.ObjectsHelper; import com.flowci.util.StringHelper; import com.flowci.zookeeper.InterLock; -import com.flowci.zookeeper.ZookeeperClient; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -57,7 +55,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.event.EventListener; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import java.io.IOException; @@ -65,12 +62,14 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; +import static com.flowci.core.job.domain.Executed.Status.RUNNING; +import static com.flowci.core.job.domain.Executed.Status.WAITING_AGENT; + @Log4j2 @Service -public class JobActionManagerImpl implements JobActionManager { +public class JobActionServiceImpl implements JobActionService { private static final Status Pending = new Status(Job.Status.PENDING.name()); private static final Status Created = new Status(Job.Status.CREATED.name()); @@ -107,7 +106,7 @@ public class JobActionManagerImpl implements JobActionManager { private static final Transition RunningToRunning = new Transition(Running, Running); private static final Transition RunningToSuccess = new Transition(Running, Success); private static final Transition RunningToCancelling = new Transition(Running, Cancelling); - private static final Transition RunningToCanceled = new Transition(Running, Cancelled); + private static final Transition RunningToCancelled = new Transition(Running, Cancelled); private static final Transition RunningToTimeout = new Transition(Running, Timeout); private static final Transition RunningToFailure = new Transition(Running, Failure); @@ -116,17 +115,12 @@ public class JobActionManagerImpl implements JobActionManager { private static final long RetryInterval = 10 * 1000; // 10 seconds - private static final int DefaultJobLockTimeout = 20; // seconds - @Autowired private Path repoDir; @Autowired private Path tmpDir; - @Autowired - private ZookeeperClient zk; - @Autowired private JobDao jobDao; @@ -154,6 +148,9 @@ public class JobActionManagerImpl implements JobActionManager { @Autowired private LocalTaskService localTaskService; + @Autowired + private JobService jobService; + @Autowired private AgentService agentService; @@ -166,9 +163,6 @@ public class JobActionManagerImpl implements JobActionManager { @Autowired private StateMachine sm; - // job node execute thread pool - private final Map pool = new ConcurrentHashMap<>(); - @EventListener public void init(ContextRefreshedEvent ignore) { try { @@ -207,9 +201,9 @@ public void onIdleAgent(IdleAgentEvent event) { continue; } - Optional lock = lock(job.getId(), "LockJobFromIdleAgent"); + Optional lock = jobService.lock(job.getId()); if (!lock.isPresent()) { - toFailureStatus(job, null, new CIException("Fail to lock job")); + toFailureStatus(job, new CIException("Fail to lock job")); continue; } @@ -222,79 +216,88 @@ public void onIdleAgent(IdleAgentEvent event) { return; } } catch (Exception e) { - toFailureStatus(job, null, new CIException(e.getMessage())); + toFailureStatus(job, new CIException(e.getMessage())); } finally { - unlock(lock.get(), "LockJobFromIdleAgent"); + jobService.unlock(lock.get(), job.getId()); } } } @Override - public void toLoading(Job job) { - on(job, Job.Status.LOADING, null); + public void toLoading(String jobId) { + onTransition(jobId, Loading, null); } @Override - public void toCreated(Job job, String yml) { - on(job, Job.Status.CREATED, context -> { - context.yml = yml; + public void toCreated(String jobId, String yml) { + onTransition(jobId, Created, context -> { + context.setYml(yml); }); } @Override - public void toStart(Job job) { - on(job, Job.Status.QUEUED, null); + public void toStart(String jobId) { + onTransition(jobId, Queued, null); } @Override - public void toRun(Job job) { - if (job.isDone()) { - return; - } - on(job, Job.Status.RUNNING, null); + public void toRun(String jobId) { + onTransition(jobId, Running, null); } @Override - public void toContinue(Job job, Step step) { - if (job.isCancelling()) { - on(job, Job.Status.CANCELLED, (context) -> context.step = step); - return; - } - - on(job, Job.Status.RUNNING, (context) -> context.step = step); + public void toContinue(String jobId, ShellOut so) { + onTransition(jobId, Running, c -> { + Step step = stepService.get(so.getId()); + step.setFrom(so); + stepService.resultUpdate(step); + log.info("[Callback]: {}-{} = {}", step.getJobId(), step.getNodePath(), step.getStatus()); + + c.setStep(step); + Job job = c.getJob(); + log.debug("---- Job Status {} {} {} {}", job.isOnPostSteps(), step.getNodePath(), job.getStatus(), job.getStatusFromContext()); + + if (job.isCancelling()) { + c.setTo(Cancelled); + } + }); } @Override - public void toCancelled(Job job, String reason) { - on(job, Job.Status.CANCELLED, context -> { - context.setError(new CIException(reason)); + public void toCancelled(String jobId, CIException exception) { + onTransition(jobId, Cancelled, context -> { + context.setError(exception); + + Job job = context.getJob(); + Set currentPath = job.getCurrentPath(); + + if (!currentPath.isEmpty()) { + String nodePath = currentPath.iterator().next(); + context.setStep(stepService.get(job.getId(), nodePath)); + } }); } @Override - public void toTimeout(Job job) { - on(job, Job.Status.TIMEOUT, null); + public void toTimeout(String jobId) { + onTransition(jobId, Timeout, null); } private void fromPending() { - sm.add(PendingToCreated, new Action() { + sm.add(PendingToCreated, new JobActionBase() { @Override public void accept(JobSmContext context) { - Job job = context.job; - String yml = context.yml; - - setupJobYamlAndSteps(job, yml); - setJobStatusAndSave(job, Job.Status.CREATED, StringHelper.EMPTY); + doFromXToCreated(context); } }); - sm.add(PendingToLoading, new Action() { + sm.add(PendingToLoading, new JobActionBase() { @Override public void accept(JobSmContext context) { - Job job = context.job; + Job job = context.getJob(); setJobStatusAndSave(job, Job.Status.LOADING, null); - context.yml = fetchYamlFromGit(job); + context.setYml(fetchYamlFromGit(job)); sm.execute(Loading, Created, context); } @@ -305,7 +308,7 @@ public void onException(Throwable e, JobSmContext context) { } }); - sm.add(PendingToCancelled, new Action() { + sm.add(PendingToCancelled, new JobActionBase() { @Override public void accept(JobSmContext context) { context.setError(new Exception("cancelled while pending")); @@ -314,46 +317,50 @@ public void accept(JobSmContext context) { } private void fromLoading() { - sm.add(LoadingToFailure, new Action() { + sm.add(LoadingToFailure, new JobActionBase() { @Override public void accept(JobSmContext context) { // handled on ActionOnFinishStatus } }); - sm.add(LoadingToCreated, new Action() { + sm.add(LoadingToCreated, new JobActionBase() { @Override public void accept(JobSmContext context) { - Job job = context.job; - String yml = context.yml; - - setupJobYamlAndSteps(job, yml); - setJobStatusAndSave(job, Job.Status.CREATED, StringHelper.EMPTY); + doFromXToCreated(context); } }); } + private void doFromXToCreated(JobSmContext context) { + Job job = context.getJob(); + String yml = context.getYml(); + + setupJobYamlAndSteps(job, yml); + setJobStatusAndSave(job, Job.Status.CREATED, StringHelper.EMPTY); + } + private void fromCreated() { - sm.add(CreatedToTimeout, new Action() { + sm.add(CreatedToTimeout, new JobActionBase() { @Override public void accept(JobSmContext context) { - Job job = context.job; + Job job = context.getJob(); context.setError(new Exception("expired before enqueue")); log.debug("[Job: Timeout] {} has expired", job.getKey()); } }); - sm.add(CreatedToFailure, new Action() { + sm.add(CreatedToFailure, new JobActionBase() { @Override public void accept(JobSmContext context) { // handled on ActionOnFinishStatus } }); - sm.add(CreatedToQueued, new Action() { + sm.add(CreatedToQueued, new JobActionBase() { @Override public void accept(JobSmContext context) { - Job job = context.job; + Job job = context.getJob(); setJobStatusAndSave(job, Job.Status.QUEUED, null); String queue = job.getQueueName(); @@ -372,14 +379,14 @@ public void onException(Throwable e, JobSmContext context) { } private void fromQueued() { - sm.add(QueuedToTimeout, new Action() { + sm.add(QueuedToTimeout, new JobActionBase() { @Override public void accept(JobSmContext context) { // handled on ActionOnFinishStatus } }); - sm.add(QueuedToCancelled, new Action() { + sm.add(QueuedToCancelled, new JobActionBase() { @Override public void accept(JobSmContext context) { context.setError(new Exception("cancelled from queue")); @@ -387,28 +394,40 @@ public void accept(JobSmContext context) { } }); - sm.add(QueuedToRunning, new Action() { + sm.add(QueuedToRunning, new JobActionBase() { + @Override public boolean canRun(JobSmContext context) { - return lockJobBefore(context); + return !context.getJob().isDone(); } @Override public void accept(JobSmContext context) throws Exception { - Job job = context.job; + Job job = context.getJob(); eventManager.publish(new JobReceivedEvent(this, job)); jobPriorityDao.addJob(job.getFlowId(), job.getBuildNumber()); - if (!waitIfJobNotOnTopPriority(job)) { + if (!waitIfJobNotOnTopPriority(context)) { return; } job.setStartAt(new Date()); setJobStatusAndSave(job, Job.Status.RUNNING, null); - // start from root path, and block current thread since don't send ack back to queue NodeTree tree = ymlManager.getTree(job); - executeJob(job, Lists.newArrayList(tree.getRoot())); + + // start job from job's current step path + List nodesToStart = Lists.newLinkedList(); + for (String p : job.getCurrentPath()) { + nodesToStart.add(tree.get(p)); + } + + if (nodesToStart.isEmpty()) { + nodesToStart.add(tree.getRoot()); + } + + logInfo(job, "QueuedToRunning: start from nodes " + nodesToStart.toString()); + executeJob(job, nodesToStart); } @Override @@ -416,17 +435,12 @@ public void onException(Throwable e, JobSmContext context) { context.setError(e); sm.execute(Queued, Failure, context); } - - @Override - public void onFinally(JobSmContext context) { - unlockJobAfter(context); - } }); - sm.add(QueuedToFailure, new Action() { + sm.add(QueuedToFailure, new JobActionBase() { @Override public void accept(JobSmContext context) { - Job job = context.job; + Job job = context.getJob(); // set current step to exception for (String path : job.getCurrentPath()) { @@ -438,41 +452,31 @@ public void accept(JobSmContext context) { } private void fromRunning() { - sm.add(RunningToRunning, new Action() { + sm.add(RunningToRunning, new JobActionBase() { @Override public boolean canRun(JobSmContext context) { - return lockJobBefore(context); + return !context.getJob().isDone(); } @Override public void accept(JobSmContext context) throws Exception { - Job job = context.job; - Step step = context.step; + Job job = context.getJob(); + Step step = context.getStep(); - stepService.resultUpdate(step); updateJobContextAndLatestStatus(job, step); - log.debug("Step {} been recorded", step.getNodePath()); + setJobStatusAndSave(job, Job.Status.RUNNING, null); + log.debug("Step {} {} been recorded", step.getNodePath(), step.getStatus()); if (!step.isSuccess()) { toFinishStatus(context); return; } - setJobStatusAndSave(job, Job.Status.RUNNING, null); - NodeTree tree = ymlManager.getTree(job); - Node node = tree.get(step.getNodePath()); - - if (node.isLastChildOfParent()) { - String agentId = releaseAgentFromJob(context.job, node, step); - - if (assignAgentToWaitingStep(agentId, job, tree, false)) { - return; - } - - releaseAgentToPool(context.job, node, step); + if (releaseAgentOrAssignToWaitingStep(job, step)) { + return; } - if (toNextStep(context.job, context.step)) { + if (toNextStep(job, step)) { return; } @@ -484,61 +488,54 @@ public void onException(Throwable e, JobSmContext context) { context.setError(e); sm.execute(context.getCurrent(), Failure, context); } - - @Override - public void onFinally(JobSmContext context) { - unlockJobAfter(context); - } }); // do not lock job since it will be called from RunningToRunning status - sm.add(RunningToSuccess, new Action() { + sm.add(RunningToSuccess, new JobActionBase() { @Override public void accept(JobSmContext context) { - Job job = context.job; + Job job = context.getJob(); logInfo(job, "finished with status {}", Success); } }); - sm.add(RunningToTimeout, new Action() { + sm.add(RunningToTimeout, new JobActionBase() { @Override public void accept(JobSmContext context) { - Job job = context.job; - setOngingStepsToSkipped(job); + Job job = context.getJob(); + killOngoingSteps(job, job.isOnPostSteps()); } @Override public void onException(Throwable e, JobSmContext context) { - Job job = context.job; + Job job = context.getJob(); setJobStatusAndSave(job, Job.Status.TIMEOUT, null); } }); // failure from job end or exception // do not lock job since it will be called from RunningToRunning status - sm.add(RunningToFailure, new Action() { + sm.add(RunningToFailure, new JobActionBase() { @Override - public void accept(JobSmContext context) { - Job job = context.job; - Step step = context.step; - stepService.toStatus(step, Step.Status.EXCEPTION, null, false); - setOngingStepsToSkipped(job); - logInfo(job, "finished with status {}", Failure); + public void accept(JobSmContext context) throws ScriptException { + Job job = context.getJob(); + killOngoingSteps(job, job.isOnPostSteps()); + runPostStepsIfNeeded(context); } @Override public void onException(Throwable e, JobSmContext context) { - Job job = context.job; + Job job = context.getJob(); setJobStatusAndSave(job, Job.Status.FAILURE, e.getMessage()); } }); - sm.add(RunningToCancelling, new Action() { + sm.add(RunningToCancelling, new JobActionBase() { @Override public void accept(JobSmContext context) { - Job job = context.job; + Job job = context.getJob(); setJobStatusAndSave(job, Job.Status.CANCELLING, null); - setOngingStepsToSkipped(job); + killOngoingSteps(job, job.isOnPostSteps()); } @Override @@ -547,19 +544,20 @@ public void onException(Throwable e, JobSmContext context) { } }); - sm.add(RunningToCanceled, new Action() { + sm.add(RunningToCancelled, new JobActionBase() { @Override - public boolean canRun(JobSmContext context) { - return lockJobBefore(context); - } + public void accept(JobSmContext context) throws ScriptException { + Job job = context.getJob(); + + List steps = stepService.list(job, Sets.newHashSet(WAITING_AGENT)); + stepService.toStatus(steps, Step.Status.SKIPPED, null); - @Override - public void accept(JobSmContext context) { - Job job = context.job; JobAgent jobAgent = getJobAgent(job.getId()); + steps = stepService.list(job, Sets.newHashSet(RUNNING)); - if (jobAgent.allBusyAgents().isEmpty()) { - setOngingStepsToSkipped(job); + // no busy agents, run post steps directly if needed + if (getBusyAgents(jobAgent, steps).isEmpty()) { + runPostStepsIfNeeded(context); return; } @@ -568,65 +566,51 @@ public void accept(JobSmContext context) { @Override public void onException(Throwable e, JobSmContext context) { - Job job = context.job; - setOngingStepsToSkipped(job); + Job job = context.getJob(); + killOngoingSteps(job, job.isOnPostSteps()); setJobStatusAndSave(job, Job.Status.CANCELLED, e.getMessage()); } - - @Override - public void onFinally(JobSmContext context) { - unlockJobAfter(context); - } }); } private void fromCancelling() { - sm.add(CancellingToCancelled, new Action() { - @Override - public boolean canRun(JobSmContext context) { - return lockJobBefore(context); - } - - @Override - public void accept(JobSmContext context) { - Job job = context.job; - setOngingStepsToSkipped(job); - setJobStatusAndSave(job, Job.Status.CANCELLED, null); - } - + sm.add(CancellingToCancelled, new JobActionBase() { @Override - public void onFinally(JobSmContext context) { - unlockJobAfter(context); + public void accept(JobSmContext context) throws ScriptException { + Job job = context.getJob(); + if (!runPostStepsIfNeeded(context)) { + setJobStatusAndSave(job, Job.Status.CANCELLED, null); + } } }); } - private boolean lockJobBefore(JobSmContext context) { - Job job = context.job; - - Optional lock = lock(job.getId(), "LockJobBefore"); + /** + * Release agent from job, and try to assign to waiting step, or release to pool + * + * @return true = agent assigned to other waiting step, false = released + */ + private boolean releaseAgentOrAssignToWaitingStep(Job job, Step step) { + NodeTree tree = ymlManager.getTree(job); + Node node = tree.get(step.getNodePath()); - if (!lock.isPresent()) { - toFailureStatus(context.job, context.step, new CIException("Fail to lock job")); - return false; + if (node.isLastChildOfParent()) { + String agentId = releaseAgentFromJob(job, node, step); + if (assignAgentToWaitingStep(agentId, job, tree, false)) { + return true; + } + releaseAgentToPool(job, node, step); } - - context.lock = lock.get(); - context.job = reload(job.getId()); - log.debug("Job {} is locked", job.getId()); - return true; + return false; } - private void unlockJobAfter(JobSmContext context) { - Job job = context.job; - InterLock lock = context.lock; - unlock(lock, job.getId()); - } + private boolean waitIfJobNotOnTopPriority(JobSmContext context) { + Job job = context.getJob(); - private boolean waitIfJobNotOnTopPriority(Job job) { while (true) { if (job.isExpired()) { - on(job, Job.Status.TIMEOUT, (c) -> c.setError(new Exception("time out while queueing"))); + context.setError(new Exception("time out while queueing")); + sm.execute(context.getCurrent(), Timeout, context); return false; } @@ -640,13 +624,40 @@ private boolean waitIfJobNotOnTopPriority(Job job) { } ThreadHelper.sleep(RetryInterval); - job = reload(job.getId()); + job = getJob(job.getId()); log.debug("Job {}/{} wait since not on top priority", job.getFlowName(), job.getBuildNumber()); } } + /** + * All busy agents, which are occupied by flow and assigned to step + */ + private Collection getBusyAgents(JobAgent jobAgent, Collection ongoingSteps) { + Map> agents = jobAgent.getAgents(); + Set busy = new HashSet<>(agents.size()); + + agents.forEach((agentId, v) -> { + if (v.isEmpty()) { + return; + } + + Agent agent = agentService.get(agentId); + if (!agent.isBusy()) { + return; + } + + for (Step s : ongoingSteps) { + if (s.hasAgent() && s.getAgentId().equals(agentId)) { + busy.add(agent); + return; + } + } + }); + return busy; + } + private Optional fetchAgentFromJob(Job job, Node node) { - FlowNode flow = node.getParentFlowNode(); + FlowNode flow = node.getParent(FlowNode.class); Selector selector = flow.fetchSelector(); JobAgent agents = getJobAgent(job.getId()); @@ -674,14 +685,15 @@ private Optional fetchAgentFromJob(Job job, Node node) { } private Optional fetchAgentFromPool(Job job, Node node) { - FlowNode flow = node.getParentFlowNode(); + FlowNode flow = node.getParent(FlowNode.class); Selector selector = flow.fetchSelector(); // find agent outside job, blocking thread Optional optional = agentService.acquire(job.getId(), selector); if (optional.isPresent()) { Agent agent = optional.get(); - job.addAgentSnapshot(agent); + AgentProfile profile = agentService.getProfile(agent.getToken()); + job.addAgentSnapshot(agent, profile); jobAgentDao.addFlowToAgent(job.getId(), agent.getId(), flow.getPathAsString()); setJobStatusAndSave(job, job.getStatus(), null); return optional; @@ -691,7 +703,7 @@ private Optional fetchAgentFromPool(Job job, Node node) { } private boolean assignAgentToWaitingStep(String agentId, Job job, NodeTree tree, boolean shouldIdle) { - List steps = stepService.list(job, Lists.newArrayList(Executed.Status.WAITING_AGENT)); + List steps = stepService.list(job, Lists.newArrayList(WAITING_AGENT)); if (steps.isEmpty()) { return false; } @@ -702,14 +714,14 @@ private boolean assignAgentToWaitingStep(String agentId, Job job, NodeTree tree, } Node n = tree.get(waitingForAgentStep.getNodePath()); - FlowNode f = n.getParentFlowNode(); + FlowNode f = n.getParent(FlowNode.class); Selector s = f.fetchSelector(); Optional acquired = agentService.acquire(job.getId(), s, agentId, shouldIdle); if (acquired.isPresent()) { Agent agent = acquired.get(); - - job.addAgentSnapshot(agent); + AgentProfile profile = agentService.getProfile(agent.getToken()); + job.addAgentSnapshot(agent, profile); setJobStatusAndSave(job, job.getStatus(), null); jobAgentDao.addFlowToAgent(job.getId(), agent.getId(), f.getPathAsString()); @@ -723,7 +735,7 @@ private boolean assignAgentToWaitingStep(String agentId, Job job, NodeTree tree, private String releaseAgentFromJob(Job job, Node node, Step step) { String agentId = step.getAgentId(); - FlowNode flow = node.getParentFlowNode(); + FlowNode flow = node.getParent(FlowNode.class); jobAgentDao.removeFlowFromAgent(job.getId(), agentId, flow.getPathAsString()); return agentId; } @@ -737,14 +749,14 @@ private void releaseAgentToPool(Job job, Node node, Step step) { } NodeTree tree = ymlManager.getTree(job); - Selector currentSelector = node.getParentFlowNode().fetchSelector(); + Selector currentSelector = node.getParent(FlowNode.class).fetchSelector(); // find selectors of pending steps List notStartedSteps = stepService.list(job, Executed.WaitingStatus); Set selectors = new HashSet<>(tree.getSelectors().size()); for (Step s : notStartedSteps) { Node n = tree.get(s.getNodePath()); - Selector selector = n.getParentFlowNode().getSelector(); + Selector selector = n.getParent(FlowNode.class).getSelector(); selectors.add(selector); } @@ -757,11 +769,13 @@ private void releaseAgentToPool(Job job, Node node, Step step) { jobAgentDao.removeAgent(job.getId(), agentId); } - private void toFailureStatus(Job job, Step step, CIException e) { - on(job, Job.Status.FAILURE, (c) -> { - c.step = step; - c.setError(e); - }); + private void toFailureStatus(Job job, CIException e) { + JobSmContext context = new JobSmContext(job.getId()); + context.setJob(job); + context.setCurrent(new Status(job.getStatus().name())); + context.setTo(Failure); + context.setError(e); + sm.execute(context); } private void setupJobYamlAndSteps(Job job, String yml) { @@ -770,39 +784,60 @@ private void setupJobYamlAndSteps(Job job, String yml) { localTaskService.init(job); FlowNode root = ymlManager.parse(yml); - - job.setCurrentPathFromNodes(root); job.getContext().merge(root.getEnvironments(), false); } - private void setOngingStepsToSkipped(Job job) { - List steps = stepService.list(job, Executed.OngoingStatus); - for (Step step : steps) { - if (!step.hasAgent()) { + private void killOngoingSteps(Job job, boolean includePost) { + List steps = stepService.list(job, Sets.newHashSet(WAITING_AGENT)); + stepService.toStatus(steps, Step.Status.SKIPPED, null); + + steps = stepService.list(job, Sets.newHashSet(RUNNING)); + Iterator iter = steps.iterator(); + + while (iter.hasNext()) { + Step step = iter.next(); + + if (!includePost && step.isPost()) { + iter.remove(); continue; } - Agent agent = agentService.get(step.getAgentId()); - if (agent.isBusy()) { - CmdIn killCmd = cmdManager.createKillCmd(); - agentService.dispatch(killCmd, agent); + if (step.hasAgent()) { + Agent agent = agentService.get(step.getAgentId()); + if (agent.isBusy()) { + CmdIn killCmd = cmdManager.createKillCmd(); + agentService.dispatch(killCmd, agent); + iter.remove(); // update step status from callback + } } } - stepService.toStatus(steps, Step.Status.SKIPPED, null); + + stepService.toStatus(steps, Step.Status.KILLING, null); } - private void on(Job job, Job.Status target, Consumer configContext) { - Status current = new Status(job.getStatus().name()); - Status to = new Status(target.name()); + private void onTransition(String jobId, Status to, Consumer onContext) { + Optional lock = jobService.lock(jobId); - JobSmContext context = new JobSmContext(); - context.job = job; + if (!lock.isPresent()) { + Job job = getJob(jobId); + toFailureStatus(job, new CIException("Fail to lock job")); + return; + } + + log.debug("Job {} is locked", jobId); + Job job = getJob(jobId); + + JobSmContext context = new JobSmContext(jobId); + context.setLock(lock.get()); + context.setJob(job); + context.setCurrent(new Status(job.getStatus().name())); + context.setTo(to); - if (configContext != null) { - configContext.accept(context); + if (onContext != null) { + onContext.accept(context); } - sm.execute(current, to, context); + sm.execute(context); } private String fetchYamlFromGit(Job job) { @@ -856,6 +891,8 @@ private SimpleSecret getSimpleSecret(String credentialName) { /** * Dispatch next step to agent, job will be saved on final function of Running status * + * @param job current job + * @param step current step * @return true if next step dispatched or have to wait for previous steps, false if no more steps or failure */ private boolean toNextStep(Job job, Step step) throws ScriptException { @@ -863,6 +900,10 @@ private boolean toNextStep(Job job, Step step) throws ScriptException { Node node = tree.get(NodePath.create(step.getNodePath())); // current node List next = node.getNext(); + if (job.isOnPostSteps()) { + next = tree.post(step.getNodePath()); + } + if (next.isEmpty()) { Collection ends = Sets.newHashSet(tree.ends()); ends.remove(node); // do not check current node @@ -872,15 +913,18 @@ private boolean toNextStep(Job job, Step step) throws ScriptException { } // check prev steps status - Set previous = getStepsStatus(job, tree.prevs(next)); + Collection prevs = tree.prevs(next, job.isOnPostSteps()); + Set previous = getStepsStatus(job, prevs); boolean hasFailure = !Collections.disjoint(previous, Executed.FailureStatus); - boolean hasOngoing = !Collections.disjoint(previous, Executed.OngoingStatus); if (hasFailure) { return false; } + boolean hasOngoing = !Collections.disjoint(previous, Executed.OngoingStatus); + boolean hasWaiting = !Collections.disjoint(previous, Executed.WaitingStatus); + // do not execute next - if (hasOngoing) { + if (hasOngoing || hasWaiting) { return true; } @@ -909,7 +953,6 @@ private Set getStepsStatus(Job job, Collection nodes) { } private void executeJob(Job job, List nodes) throws ScriptException { - job.setCurrentPathFromNodes(nodes); setJobStatusAndSave(job, job.getStatus(), null); NodeTree tree = ymlManager.getTree(job); @@ -933,6 +976,10 @@ private void executeJob(Job job, List nodes) throws ScriptException { continue; } + // add dispatchable step + job.addToCurrentPath(step); + jobDao.save(job); + Optional optionalFromJob = fetchAgentFromJob(job, node); if (optionalFromJob.isPresent()) { dispatch(job, node, step, optionalFromJob.get()); @@ -946,7 +993,7 @@ private void executeJob(Job job, List nodes) throws ScriptException { continue; } - stepService.toStatus(step, Executed.Status.WAITING_AGENT, null, false); + stepService.toStatus(step, WAITING_AGENT, null, false); } } @@ -955,6 +1002,7 @@ private void executeJob(Job job, List nodes) throws ScriptException { */ private boolean runCondition(Job job, Node node) throws ScriptException { boolean shouldRun = true; + if (job.getTrigger() == Job.Trigger.MANUAL || job.getTrigger() == Job.Trigger.API) { if (node.getPath().isRoot()) { shouldRun = false; @@ -971,7 +1019,7 @@ private boolean runCondition(Job job, Node node) throws ScriptException { private void dispatch(Job job, Node node, Step step, Agent agent) { step.setAgentId(agent.getId()); - stepService.toStatus(step, Step.Status.RUNNING, null, false); + stepService.toStatus(step, RUNNING, null, false); ShellIn cmd = cmdManager.createShellCmd(job, step, node); agentService.dispatch(cmd, agent); @@ -988,7 +1036,7 @@ private void setSkipStatusToStep(Step step) { } private void toFinishStatus(JobSmContext context) { - Job job = context.job; + Job job = context.getJob(); Job.Status statusFromContext = job.getStatusFromContext(); String error = job.getErrorFromContext(); @@ -997,6 +1045,65 @@ private void toFinishStatus(JobSmContext context) { sm.execute(context.getCurrent(), new Status(statusFromContext.name()), context); } + /** + * To running post status, skip current state machine action and switch job status to Running + * Return `true` if post step found + */ + private boolean runPostStepsIfNeeded(JobSmContext context) throws ScriptException { + if (context.getError() == Errors.AgentOffline) { + return false; + } + + Job job = context.getJob(); + Step step = context.getStep(); + Objects.requireNonNull(step, "The step not defined when running post steps"); + + if (job.isOnPostSteps()) { + return true; + } + + NodeTree tree = ymlManager.getTree(job); + List nextPostSteps = tree.post(step.getNodePath()); + if (nextPostSteps.isEmpty()) { + return false; + } + + // remove running or finished post steps + Iterator iterator = nextPostSteps.iterator(); + while(iterator.hasNext()) { + Node postNode = iterator.next(); + Step postStep = stepService.get(job.getId(), postNode.getPathAsString()); + if (postStep.isOngoing() || postStep.isKilling() || postStep.isFinished()) { + iterator.remove(); + } + } + + // check current all steps that should be in finish status + List steps = stepService.listByPath(job, job.getCurrentPath()); + for (Step s : steps) { + log.debug("Step {} status ------------- {}", s.getNodePath(), s.getStatus()); + + if (s.isOngoing() || s.isKilling()) { + context.setSkip(true); + log.debug("Step ({} = {}) is ongoing or killing status", s.getNodePath(), s.getStatus()); + return true; + } + } + + log.debug("All current steps finished, will run post steps"); + context.setSkip(true); + + job.setOnPostSteps(true); + job.resetCurrentPath(); + job.setStatus(Job.Status.RUNNING); + job.setStatusToContext(Job.Status.valueOf(context.getTo().getName())); + + log.debug("Run post steps: {}", nextPostSteps); + log.debug("---- Job Status Before Post {} {}", job.getStatus(), job.getStatusFromContext()); + executeJob(job, nextPostSteps); + return true; + } + private void setJobStatusAndSave(Job job, Job.Status newStatus, String message) { // check status order, just save job if new status is downgrade if (job.getStatus().getOrder() >= newStatus.getOrder()) { @@ -1018,6 +1125,11 @@ private void setJobStatusAndSave(Job job, Job.Status newStatus, String message) private void updateJobContextAndLatestStatus(Job job, Step step) { job.setFinishAt(step.getFinishAt()); + // remove step path if success, keep failure for rerun later + if (step.isSuccess()) { + job.removeFromCurrentPath(step); + } + // merge output to job context Vars context = job.getContext(); context.merge(step.getOutput()); @@ -1026,6 +1138,11 @@ private void updateJobContextAndLatestStatus(Job job, Step step) { context.put(Variables.Job.FinishAt, job.finishAtInStr()); context.put(Variables.Job.Steps, stepService.toVarString(job, step)); + // DO NOT update job status from post step + if (step.isPost()) { + return; + } + // DO NOT update job status from context job.setStatusToContext(StatusHelper.convert(step)); job.setErrorToContext(step.getError()); @@ -1043,38 +1160,21 @@ private void logInfo(Job job, String message, Object... params) { log.info("[Job] " + job.getKey() + " " + message, params); } - private Job reload(String jobId) { + private Job getJob(String jobId) { return jobDao.findById(jobId).get(); } - private Optional lock(String key, String message) { - String path = zk.makePath("/job-locks", key); - Optional lock = zk.lock(path, DefaultJobLockTimeout); - lock.ifPresent(interLock -> log.debug("Lock: {} - {}", key, message)); - return lock; - } - - private void unlock(InterLock lock, String key) { - try { - zk.release(lock); - log.debug("Unlock: {}", key); - } catch (Exception warn) { - log.warn(warn); - } - } - private class ActionOnFinishStatus implements Consumer { @Override public void accept(JobSmContext context) { - Job job = context.job; + Job job = context.getJob(); // save job with status Throwable error = context.getError(); String message = error == null ? "" : error.getMessage(); setJobStatusAndSave(job, context.getTargetToJobStatus(), message); jobPriorityDao.removeJob(job.getFlowId(), job.getBuildNumber()); - pool.remove(job.getId()); JobAgent agents = getJobAgent(job.getId()); agentService.release(agents.all()); @@ -1082,4 +1182,19 @@ public void accept(JobSmContext context) { localTaskService.executeAsync(job); } } + + private abstract class JobActionBase extends Action { + + @Override + public void onFinally(JobSmContext context) { + InterLock lock = context.getLock(); + if (lock == null) { + return; + } + + Job job = context.getJob(); + jobService.unlock(lock, job.getId()); + context.setLock(null); + } + } } diff --git a/core/src/main/java/com/flowci/core/job/service/JobEventService.java b/core/src/main/java/com/flowci/core/job/service/JobEventService.java index d0f03b17b..5be4c719c 100644 --- a/core/src/main/java/com/flowci/core/job/service/JobEventService.java +++ b/core/src/main/java/com/flowci/core/job/service/JobEventService.java @@ -16,12 +16,12 @@ package com.flowci.core.job.service; -import com.flowci.core.job.domain.Step; +import com.flowci.core.agent.domain.ShellOut; public interface JobEventService { /** * Handle the executed cmd form callback queue */ - void handleCallback(Step execCmd); + void handleCallback(ShellOut shellOut); } diff --git a/core/src/main/java/com/flowci/core/job/service/JobEventServiceImpl.java b/core/src/main/java/com/flowci/core/job/service/JobEventServiceImpl.java index e554230b8..f9f738883 100644 --- a/core/src/main/java/com/flowci/core/job/service/JobEventServiceImpl.java +++ b/core/src/main/java/com/flowci/core/job/service/JobEventServiceImpl.java @@ -32,11 +32,10 @@ import com.flowci.core.flow.event.FlowDeletedEvent; import com.flowci.core.flow.event.FlowInitEvent; import com.flowci.core.job.domain.Job; -import com.flowci.core.job.domain.Step; import com.flowci.core.job.event.CreateNewJobEvent; import com.flowci.core.job.event.TtyStatusUpdateEvent; -import com.flowci.core.job.manager.JobActionManager; import com.flowci.core.job.manager.YmlManager; +import com.flowci.core.job.util.Errors; import com.flowci.tree.FlowNode; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; @@ -65,7 +64,7 @@ public class JobEventServiceImpl implements JobEventService { private RabbitOperations jobsQueueManager; @Autowired - private JobActionManager jobActionManager; + private JobActionService jobActionService; @Autowired private ConditionManager conditionManager; @@ -144,8 +143,7 @@ public void updateJobAndStepWhenOffline(AgentStatusEvent event) { return; } - Job job = jobService.get(agent.getJobId()); - jobActionManager.toCancelled(job, "Agent unexpected offline"); + jobActionService.toCancelled(agent.getJobId(), Errors.AgentOffline); } @EventListener @@ -158,11 +156,7 @@ public void handleCmdOutFromAgent(OnCmdOutEvent event) { switch (ind) { case CmdOut.ShellOutInd: ShellOut shellOut = objectMapper.readValue(body, ShellOut.class); - Step step = stepService.get(shellOut.getId()); - step.setFrom(shellOut); - - log.info("[Callback]: {}-{} = {}", step.getJobId(), step.getNodePath(), step.getStatus()); - handleCallback(step); + handleCallback(shellOut); break; case CmdOut.TtyOutInd: @@ -179,9 +173,9 @@ public void handleCmdOutFromAgent(OnCmdOutEvent event) { } @Override - public void handleCallback(Step step) { - Job job = jobService.get(step.getJobId()); - jobActionManager.toContinue(job, step); + public void handleCallback(ShellOut so) { + String jobId = stepService.get(so.getId()).getJobId(); + jobActionService.toContinue(jobId, so); } //==================================================================== @@ -194,8 +188,7 @@ public void startJobDeadLetterConsumer() throws IOException { jobsQueueManager.startConsumer(deadLetterQueue, true, (header, body, envelope) -> { String jobId = new String(body); try { - Job job = jobService.get(jobId); - jobActionManager.toTimeout(job); + jobActionService.toTimeout(jobId); } catch (Exception e) { log.warn(e); } @@ -207,21 +200,15 @@ public void startJobDeadLetterConsumer() throws IOException { // %% Utils //==================================================================== - private void logInfo(Job job, String message, Object... params) { - log.info("[Job] " + job.getKey() + " " + message, params); - } - private void declareJobQueueAndStartConsumer(Flow flow) { try { final String queue = flow.getQueueName(); jobsQueueManager.declare(queue, true, 255, rabbitProperties.getJobDlExchange()); jobsQueueManager.startConsumer(queue, false, (header, body, envelope) -> { - String jobId = new String(body); try { - Job job = jobService.get(jobId); - logInfo(job, "received from queue"); - jobActionManager.toRun(job); + String jobId = new String(body); + jobActionService.toRun(jobId); } catch (Exception e) { log.warn(e); } diff --git a/core/src/main/java/com/flowci/core/job/service/JobService.java b/core/src/main/java/com/flowci/core/job/service/JobService.java index a0c22ceb0..203730e20 100644 --- a/core/src/main/java/com/flowci/core/job/service/JobService.java +++ b/core/src/main/java/com/flowci/core/job/service/JobService.java @@ -23,9 +23,11 @@ import com.flowci.core.job.domain.JobItem; import com.flowci.core.job.domain.JobYml; import com.flowci.domain.StringVars; +import com.flowci.zookeeper.InterLock; import org.springframework.data.domain.Page; import javax.annotation.Nullable; +import java.util.Optional; /** * @author yang @@ -88,9 +90,23 @@ public interface JobService { */ Job rerun(Flow flow, Job job); + /** + * Restart job from failure step + */ + Job rerunFromFailureStep(Flow flow, Job job); + /** * Delete all jobs of the flow within an executor */ void delete(Flow flow); + + /** + * Lock job by id + * + * @param jobId + */ + Optional lock(String jobId); + + void unlock(InterLock lock, String jobId); } diff --git a/core/src/main/java/com/flowci/core/job/service/JobServiceImpl.java b/core/src/main/java/com/flowci/core/job/service/JobServiceImpl.java index 5ada4c2af..f2087e766 100644 --- a/core/src/main/java/com/flowci/core/job/service/JobServiceImpl.java +++ b/core/src/main/java/com/flowci/core/job/service/JobServiceImpl.java @@ -26,7 +26,6 @@ import com.flowci.core.job.domain.Job.Trigger; import com.flowci.core.job.event.JobCreatedEvent; import com.flowci.core.job.event.JobDeletedEvent; -import com.flowci.core.job.manager.JobActionManager; import com.flowci.core.job.manager.YmlManager; import com.flowci.core.user.domain.User; import com.flowci.domain.StringVars; @@ -37,6 +36,8 @@ import com.flowci.store.FileManager; import com.flowci.tree.FlowNode; import com.flowci.util.StringHelper; +import com.flowci.zookeeper.InterLock; +import com.flowci.zookeeper.ZookeeperClient; import com.google.common.collect.Maps; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; @@ -66,6 +67,8 @@ public class JobServiceImpl implements JobService { private static final Sort SortByBuildNumber = Sort.by(Direction.DESC, "buildNumber"); + private static final int DefaultJobLockTimeout = 20; // seconds + //==================================================================== // %% Spring injection //==================================================================== @@ -105,7 +108,7 @@ public class JobServiceImpl implements JobService { private FileManager fileManager; @Autowired - private JobActionManager jobActionManager; + private JobActionService jobActionService; @Autowired private StepService stepService; @@ -116,6 +119,9 @@ public class JobServiceImpl implements JobService { @Autowired private SettingService settingService; + @Autowired + private ZookeeperClient zk; + //==================================================================== // %% Public functions //==================================================================== @@ -179,7 +185,7 @@ public Job create(Flow flow, String yml, Trigger trigger, StringVars input) { eventManager.publish(new JobCreatedEvent(this, job)); if (job.isYamlFromRepo()) { - jobActionManager.toLoading(job); + jobActionService.toLoading(job.getId()); return job; } @@ -187,18 +193,18 @@ public Job create(Flow flow, String yml, Trigger trigger, StringVars input) { throw new ArgumentException("YAML config is required to start a job"); } - jobActionManager.toCreated(job, yml); - return job; + jobActionService.toCreated(job.getId(), yml); + return get(job.getId()); } @Override public void start(Job job) { - jobActionManager.toStart(job); + jobActionService.toStart(job.getId()); } @Override public void cancel(Job job) { - jobActionManager.toCancelled(job, StringHelper.EMPTY); + jobActionService.toCancelled(job.getId(), null); } @Override @@ -220,7 +226,8 @@ public Job rerun(Flow flow, Job job) { job.setSnapshots(Maps.newHashMap()); job.setStatus(Job.Status.PENDING); job.setTrigger(Trigger.MANUAL); - job.setCurrentPathFromNodes(root); + job.resetCurrentPath(); + job.setPriority(Job.MaxPriority); job.setCreatedBy(sessionManager.getUserEmail()); // re-init job context @@ -232,14 +239,45 @@ public Job rerun(Flow flow, Job job) { context.put(GIT_COMMIT_ID, lastCommitId); context.put(Variables.Job.TriggerBy, sessionManager.get().getEmail()); context.merge(root.getEnvironments(), false); + jobDao.save(job); + + // reset job agent + jobAgentDao.save(new JobAgent(job.getId(), flow.getId())); // cleanup stepService.delete(job); localTaskService.delete(job); ymlManager.delete(job); - jobActionManager.toCreated(job, yml.getRaw()); - jobActionManager.toStart(job); + jobActionService.toCreated(job.getId(), yml.getRaw()); + jobActionService.toStart(job.getId()); + return get(job.getId()); + } + + @Override + public Job rerunFromFailureStep(Flow flow, Job job) { + if (!job.isDone()) { + throw new StatusException("Job not finished, cannot re-start"); + } + + if (!job.isFailure()) { + throw new StatusException("Job is not failure status, cannot re-start from failure step"); + } + + // reset job properties + job.setFinishAt(null); + job.setStartAt(null); + job.setExpire(flow.getStepTimeout()); + job.setSnapshots(Maps.newHashMap()); + job.setPriority(Job.MaxPriority); + job.setStatus(Job.Status.CREATED); + job.setTrigger(Trigger.MANUAL); + job.setCreatedBy(sessionManager.getUserEmail()); + + // reset job agent + jobAgentDao.save(new JobAgent(job.getId(), flow.getId())); + + jobActionService.toStart(job.getId()); return job; } @@ -268,6 +306,24 @@ public void delete(Flow flow) { }); } + @Override + public Optional lock(String jobId) { + String path = zk.makePath("/job-locks", jobId); + Optional lock = zk.lock(path, DefaultJobLockTimeout); + lock.ifPresent(interLock -> log.debug("Lock: {}", jobId)); + return lock; + } + + @Override + public void unlock(InterLock lock, String jobId) { + try { + zk.release(lock); + log.debug("Unlock: {}", jobId); + } catch (Exception warn) { + log.warn(warn); + } + } + //==================================================================== // %% Utils //==================================================================== diff --git a/core/src/main/java/com/flowci/core/job/service/StepService.java b/core/src/main/java/com/flowci/core/job/service/StepService.java index b07c8a07e..24f2a4d8d 100644 --- a/core/src/main/java/com/flowci/core/job/service/StepService.java +++ b/core/src/main/java/com/flowci/core/job/service/StepService.java @@ -54,6 +54,11 @@ public interface StepService { */ List list(Job job, Collection status); + /** + * List steps by given paths + */ + List listByPath(Job job, Collection paths); + /** * Get step list in string, {name}={stats};{name}={stats} * No steps after current node @@ -74,7 +79,7 @@ public interface StepService { /** * To update properties are related with cmd executed result */ - void resultUpdate(Step result); + void resultUpdate(Step stepFromAgent); /** * Delete steps by flow id diff --git a/core/src/main/java/com/flowci/core/job/service/StepServiceImpl.java b/core/src/main/java/com/flowci/core/job/service/StepServiceImpl.java index f6dd29ed3..fa09ba727 100644 --- a/core/src/main/java/com/flowci/core/job/service/StepServiceImpl.java +++ b/core/src/main/java/com/flowci/core/job/service/StepServiceImpl.java @@ -34,8 +34,6 @@ import java.util.*; -import static com.google.common.base.Preconditions.checkNotNull; - /** * ExecutedCmd == Step * @@ -100,7 +98,12 @@ public List list(Job job) { @Override public List list(Job job, Collection status) { - return executedCmdDao.findByJobIdAndStatusIn(job.getId(), status); + return executedCmdDao.findAllByJobIdAndStatusIn(job.getId(), status); + } + + @Override + public List listByPath(Job job, Collection paths) { + return executedCmdDao.findAllByJobIdAndNodePathIn(job.getId(), paths); } @Override @@ -151,19 +154,23 @@ public Collection toStatus(Collection steps, Executed.Status status, public Step toStatus(Step entity, Executed.Status status, String err, boolean allChildren) { saveStatus(entity, status, err); - Step parent = getWithNullReturn(entity.getJobId(), entity.getParent()); - updateAllParents(parent, entity); + // update parent status if not post step + if (!entity.isPost()) { + Step parent = getWithNullReturn(entity.getJobId(), entity.getParent()); + updateAllParents(parent, entity); + } if (allChildren) { NodeTree tree = ymlManager.getTree(entity.getJobId()); NodePath path = NodePath.create(entity.getNodePath()); Node node = tree.get(path); - node.forEachChildren((childNode) -> { - Step childStep = get(entity.getJobId(), childNode.getPathAsString()); + + for (Node child : node.getChildren()) { + Step childStep = get(entity.getJobId(), child.getPathAsString()); childStep.setStartAt(entity.getStartAt()); childStep.setFinishAt(entity.getFinishAt()); saveStatus(childStep, status, err); - }); + } } String jobId = entity.getJobId(); @@ -175,20 +182,9 @@ public Step toStatus(Step entity, Executed.Status status, String err, boolean al } @Override - public void resultUpdate(Step cmd) { - checkNotNull(cmd.getId()); - Step entity = get(cmd.getId()); - - // only update properties should from agent - entity.setProcessId(cmd.getProcessId()); - entity.setCode(cmd.getCode()); - entity.setStartAt(cmd.getStartAt()); - entity.setFinishAt(cmd.getFinishAt()); - entity.setLogSize(cmd.getLogSize()); - entity.setOutput(cmd.getOutput()); - + public void resultUpdate(Step stepFromAgent) { // change status and save - toStatus(entity, cmd.getStatus(), cmd.getError(), false); + toStatus(stepFromAgent, stepFromAgent.getStatus(), stepFromAgent.getError(), false); } @Override @@ -230,19 +226,15 @@ private void updateAllParents(Step parent, Step current) { } private Step saveStatus(Step step, Step.Status status, String error) { - if (status == step.getStatus()) { - return step; - } - - step.setStatus(status); step.setError(error); + step.setStatus(status); executedCmdDao.save(step); return step; } private List list(String jobId, String flowId, long buildNumber) { return jobStepCache.get(jobId, - s -> executedCmdDao.findByFlowIdAndBuildNumber(flowId, buildNumber)); + s -> executedCmdDao.findAllByFlowIdAndBuildNumber(flowId, buildNumber)); } private static Step newInstance(Job job, Node node) { @@ -268,6 +260,7 @@ private static Step newInstance(Job job, Node node) { step.setAllowFailure(r.isAllowFailure()); step.setPlugin(r.getPlugin()); step.setType(Step.Type.STEP); + step.setPost(r.isPost()); if (r.hasChildren()) { step.setType(Step.Type.STAGE); diff --git a/core/src/main/java/com/flowci/core/job/util/Errors.java b/core/src/main/java/com/flowci/core/job/util/Errors.java new file mode 100644 index 000000000..9d547d72c --- /dev/null +++ b/core/src/main/java/com/flowci/core/job/util/Errors.java @@ -0,0 +1,9 @@ +package com.flowci.core.job.util; + +import com.flowci.exception.CIException; +import com.flowci.exception.StatusException; + +public abstract class Errors { + + public final static CIException AgentOffline = new StatusException("Agent unexpected offline"); +} diff --git a/core/src/main/java/com/flowci/core/secret/event/GetSecretEvent.java b/core/src/main/java/com/flowci/core/secret/event/GetSecretEvent.java index b617698f5..1830de538 100644 --- a/core/src/main/java/com/flowci/core/secret/event/GetSecretEvent.java +++ b/core/src/main/java/com/flowci/core/secret/event/GetSecretEvent.java @@ -30,8 +30,8 @@ public class GetSecretEvent extends AbstractEvent { private final String name; - public GetSecretEvent(Object source, String credentialName) { + public GetSecretEvent(Object source, String secretName) { super(source); - this.name = credentialName; + this.name = secretName; } } diff --git a/core/src/main/resources/application.properties b/core/src/main/resources/application.properties index 596116916..ed15459a8 100644 --- a/core/src/main/resources/application.properties +++ b/core/src/main/resources/application.properties @@ -2,7 +2,7 @@ logging.level.org.apache.zookeeper=ERROR logging.level.org.apache.curator.framework=ERROR logging.level.com.flowci.core=${FLOWCI_LOG_LEVEL:INFO} -info.app.version=0.21.05 +info.app.version=0.21.21 info.app.name=flow.ci server.port=${FLOWCI_SERVER_PORT:8080} diff --git a/core/src/test/java/com/flowci/core/test/agent/AgentControllerTest.java b/core/src/test/java/com/flowci/core/test/agent/AgentControllerTest.java index 8c292cdf3..156726b58 100644 --- a/core/src/test/java/com/flowci/core/test/agent/AgentControllerTest.java +++ b/core/src/test/java/com/flowci/core/test/agent/AgentControllerTest.java @@ -19,7 +19,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.flowci.core.agent.domain.Agent; -import com.flowci.core.agent.domain.CreateOrUpdateAgent; +import com.flowci.core.agent.domain.AgentOption; import com.flowci.core.agent.domain.DeleteAgent; import com.flowci.core.common.config.AppProperties; import com.flowci.core.common.domain.StatusCode; @@ -110,7 +110,7 @@ public void should_delete_agent() throws Throwable { } private Agent createAgent(String name, Set tags, Integer code) throws Exception { - CreateOrUpdateAgent create = new CreateOrUpdateAgent(); + AgentOption create = new AgentOption(); create.setName(name); create.setTags(tags); diff --git a/core/src/test/java/com/flowci/core/test/agent/AgentServiceTest.java b/core/src/test/java/com/flowci/core/test/agent/AgentServiceTest.java index f172c2e41..0771a3995 100644 --- a/core/src/test/java/com/flowci/core/test/agent/AgentServiceTest.java +++ b/core/src/test/java/com/flowci/core/test/agent/AgentServiceTest.java @@ -19,6 +19,7 @@ import com.flowci.core.agent.domain.Agent; import com.flowci.core.agent.domain.Agent.Status; import com.flowci.core.agent.domain.CmdIn; +import com.flowci.core.agent.domain.AgentOption; import com.flowci.core.agent.domain.ShellIn; import com.flowci.core.agent.event.CmdSentEvent; import com.flowci.core.agent.service.AgentService; @@ -31,7 +32,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; -import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -56,7 +56,10 @@ public void should_init_root_node() { @Test public void should_create_agent_in_db() { - Agent agent = agentService.create("hello.test", ImmutableSet.of("local", "android"), Optional.empty()); + Agent agent = agentService.create(new AgentOption() + .setName("hello.test") + .setTags(ImmutableSet.of("local", "android")) + ); Assert.assertNotNull(agent); Assert.assertEquals(agent, agentService.get(agent.getId())); } @@ -64,7 +67,10 @@ public void should_create_agent_in_db() { @Test public void should_make_agent_online() throws InterruptedException { // init: - Agent agent = agentService.create("hello.test", ImmutableSet.of("local", "android"), Optional.empty()); + Agent agent = agentService.create(new AgentOption() + .setName("hello.test") + .setTags(ImmutableSet.of("local", "android")) + ); // when: Agent online = mockAgentOnline(agent.getToken()); @@ -78,7 +84,7 @@ public void should_make_agent_online() throws InterruptedException { public void should_dispatch_cmd_to_agent() throws InterruptedException { // init: CmdIn cmd = new ShellIn(); - Agent agent = agentService.create("hello.agent", null, Optional.empty()); + Agent agent = agentService.create(new AgentOption().setName("hello.agent")); // when: CountDownLatch counter = new CountDownLatch(1); diff --git a/core/src/test/java/com/flowci/core/test/job/JobServiceTest.java b/core/src/test/java/com/flowci/core/test/job/JobServiceTest.java index 049a48c36..48c9906f8 100644 --- a/core/src/test/java/com/flowci/core/test/job/JobServiceTest.java +++ b/core/src/test/java/com/flowci/core/test/job/JobServiceTest.java @@ -17,9 +17,7 @@ package com.flowci.core.test.job; import com.flowci.core.agent.dao.AgentDao; -import com.flowci.core.agent.domain.Agent; -import com.flowci.core.agent.domain.CmdIn; -import com.flowci.core.agent.domain.ShellIn; +import com.flowci.core.agent.domain.*; import com.flowci.core.agent.event.AgentStatusEvent; import com.flowci.core.agent.event.CmdSentEvent; import com.flowci.core.agent.service.AgentService; @@ -32,21 +30,23 @@ import com.flowci.core.job.dao.ExecutedCmdDao; import com.flowci.core.job.dao.JobAgentDao; import com.flowci.core.job.dao.JobDao; -import com.flowci.core.job.domain.Step; import com.flowci.core.job.domain.Job; import com.flowci.core.job.domain.Job.Status; import com.flowci.core.job.domain.Job.Trigger; +import com.flowci.core.job.domain.Step; import com.flowci.core.job.event.JobReceivedEvent; import com.flowci.core.job.event.JobStatusChangeEvent; import com.flowci.core.job.event.StartAsyncLocalTaskEvent; -import com.flowci.core.job.manager.JobActionManager; import com.flowci.core.job.manager.YmlManager; import com.flowci.core.job.service.*; import com.flowci.core.plugin.dao.PluginDao; import com.flowci.core.plugin.domain.Plugin; import com.flowci.core.test.ZookeeperScenario; import com.flowci.domain.*; -import com.flowci.tree.*; +import com.flowci.tree.FlowNode; +import com.flowci.tree.Node; +import com.flowci.tree.NodeTree; +import com.flowci.tree.YmlParser; import com.flowci.util.StringHelper; import lombok.extern.log4j.Log4j2; import org.junit.Assert; @@ -63,7 +63,6 @@ import java.io.IOException; import java.util.Date; import java.util.List; -import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -111,7 +110,7 @@ public class JobServiceTest extends ZookeeperScenario { private AgentService agentService; @Autowired - private JobActionManager jobActionManager; + private JobActionService jobActionService; @Autowired private YmlManager ymlManager; @@ -193,12 +192,13 @@ public void should_start_new_job() throws Throwable { // when: create and start job Job job = jobService.create(flow, yml.getRaw(), Trigger.MANUAL, StringVars.EMPTY); - NodeTree tree = ymlManager.getTree(job); + job = jobService.get(job.getId()); Assert.assertEquals(Status.CREATED, job.getStatus()); - Assert.assertTrue(job.getCurrentPath().contains(tree.getRoot().getPathAsString())); + Assert.assertTrue(job.getCurrentPath().isEmpty()); - jobActionManager.toStart(job); + jobActionService.toStart(job.getId()); + job = jobService.get(job.getId()); Assert.assertEquals(Status.QUEUED, job.getStatus()); Assert.assertNotNull(job); @@ -227,7 +227,7 @@ public void should_finish_whole_job() throws InterruptedException, IOException { String yaml = StringHelper.toString(load("flow-with-notify.yml")); yml = ymlService.saveYml(flow, Yml.DEFAULT_NAME, StringHelper.toBase64(yaml)); - Agent agent = agentService.create("hello.agent", null, Optional.empty()); + Agent agent = agentService.create(new AgentOption().setName("hello.agent")); mockAgentOnline(agent.getToken()); Job job = jobService.create(flow, yml.getRaw(), Trigger.MANUAL, StringVars.EMPTY); @@ -283,7 +283,7 @@ public void should_finish_whole_job() throws InterruptedException, IOException { } }); - jobActionManager.toStart(job); + jobActionService.toStart(job.getId()); Assert.assertTrue(counterForStep1.await(10, TimeUnit.SECONDS)); // then: verify step 1 agent @@ -309,7 +309,7 @@ public void should_finish_whole_job() throws InterruptedException, IOException { firstStep.setFinishAt(new Date()); executedCmdDao.save(firstStep); - jobEventService.handleCallback(firstStep); + jobEventService.handleCallback(getShellOutFromStep(firstStep)); // then: verify step 2 agent Assert.assertTrue(counterForStep2.await(10, TimeUnit.SECONDS)); @@ -334,7 +334,7 @@ public void should_finish_whole_job() throws InterruptedException, IOException { secondStep.setFinishAt(new Date()); executedCmdDao.save(secondStep); - jobEventService.handleCallback(secondStep); + jobEventService.handleCallback(getShellOutFromStep(secondStep)); // // then: should job with SUCCESS status and sent notification task Assert.assertEquals(Status.SUCCESS, jobService.get(job.getId()).getStatus()); @@ -344,7 +344,7 @@ public void should_finish_whole_job() throws InterruptedException, IOException { @Test public void should_handle_cmd_callback_for_success_status() { // init: agent and job - Agent agent = agentService.create("hello.agent", null, Optional.empty()); + Agent agent = agentService.create(new AgentOption().setName("hello.agent")); Job job = prepareJobForRunningStatus(agent); NodeTree tree = ymlManager.getTree(job); @@ -357,8 +357,9 @@ public void should_handle_cmd_callback_for_success_status() { firstStep.setStatus(Step.Status.SUCCESS); firstStep.setOutput(output); + executedCmdDao.save(firstStep); - jobEventService.handleCallback(firstStep); + jobEventService.handleCallback(getShellOutFromStep(firstStep)); // then: job context should be updated job = jobDao.findById(job.getId()).get(); @@ -375,7 +376,7 @@ public void should_handle_cmd_callback_for_success_status() { secondStep.setStatus(Step.Status.SUCCESS); secondStep.setOutput(output); executedCmdDao.save(secondStep); - jobEventService.handleCallback(secondStep); + jobEventService.handleCallback(getShellOutFromStep(secondStep)); // then: job context should be updated job = jobDao.findById(job.getId()).get(); @@ -389,7 +390,7 @@ public void should_handle_cmd_callback_for_failure_status() throws IOException { // init: agent and job String yaml = StringHelper.toString(load("flow-with-failure.yml")); yml = ymlService.saveYml(flow, Yml.DEFAULT_NAME, StringHelper.toBase64(yaml)); - Agent agent = agentService.create("hello.agent", null, Optional.empty()); + Agent agent = agentService.create(new AgentOption().setName("hello.agent")); Job job = prepareJobForRunningStatus(agent); NodeTree tree = ymlManager.getTree(job); @@ -399,7 +400,7 @@ public void should_handle_cmd_callback_for_failure_status() throws IOException { // when: cmd of first node with failure firstStep.setStatus(Step.Status.EXCEPTION); executedCmdDao.save(firstStep); - jobEventService.handleCallback(firstStep); + jobEventService.handleCallback(getShellOutFromStep(firstStep)); // then: job should be failure job = jobDao.findById(job.getId()).get(); @@ -412,7 +413,7 @@ public void should_handle_cmd_callback_for_failure_status_but_allow_failure() th // init: agent and job String yaml = StringHelper.toString(load("flow-all-failure.yml")); yml = ymlService.saveYml(flow, Yml.DEFAULT_NAME, StringHelper.toBase64(yaml)); - Agent agent = agentService.create("hello.agent", null, Optional.empty()); + Agent agent = agentService.create(new AgentOption().setName("hello.agent")); Job job = prepareJobForRunningStatus(agent); NodeTree tree = ymlManager.getTree(job); @@ -426,7 +427,7 @@ public void should_handle_cmd_callback_for_failure_status_but_allow_failure() th firstStep.setStatus(Step.Status.EXCEPTION); firstStep.setOutput(output); executedCmdDao.save(firstStep); - jobEventService.handleCallback(firstStep); + jobEventService.handleCallback(getShellOutFromStep(firstStep)); // then: job status should be running and current path should be change to second node job = jobDao.findById(job.getId()).get(); @@ -444,7 +445,7 @@ public void should_handle_cmd_callback_for_failure_status_but_allow_failure() th secondCmd.setStatus(Step.Status.TIMEOUT); secondCmd.setOutput(output); executedCmdDao.save(secondCmd); - jobEventService.handleCallback(secondCmd); + jobEventService.handleCallback(getShellOutFromStep(secondCmd)); // then: job should be timeout with error message job = jobDao.findById(job.getId()).get(); @@ -459,7 +460,7 @@ public void should_cancel_job_if_agent_offline() throws IOException, Interrupted yml = ymlService.saveYml(flow, Yml.DEFAULT_NAME, StringHelper.toBase64(yaml)); // mock agent online - Agent agent = agentService.create("hello.agent.2", null, Optional.empty()); + Agent agent = agentService.create(new AgentOption().setName("hello.agent.2")); mockAgentOnline(agent.getToken()); // given: start job and wait for running @@ -485,8 +486,9 @@ public void should_cancel_job_if_agent_offline() throws IOException, Interrupted Assert.assertEquals(Status.CANCELLED, job.getStatus()); // then: step should be skipped - for (Step cmd : stepService.list(job)) { - Assert.assertEquals(Step.Status.SKIPPED, cmd.getStatus()); + List steps = stepService.list(job); + for (Step step : steps) { + Assert.assertEquals(Step.Status.PENDING, step.getStatus()); } } @@ -512,7 +514,7 @@ public void should_rerun_job() { Assert.assertNotNull(context.get(Variables.Job.TriggerBy)); // then: verify job properties - Assert.assertTrue(job.getCurrentPath().contains(FlowNode.DEFAULT_ROOT_NAME)); + Assert.assertTrue(job.getCurrentPath().isEmpty()); Assert.assertFalse(job.isExpired()); Assert.assertNotNull(job.getCreatedAt()); Assert.assertNotNull(job.getCreatedBy()); @@ -532,7 +534,7 @@ private Job prepareJobForRunningStatus(Agent agent) { jobAgentDao.addFlowToAgent(job.getId(), agent.getId(), tree.getRoot().getPathAsString()); - job.setCurrentPathFromNodes(firstNode); + job.resetCurrentPath().getCurrentPath().add(firstNode.getPathAsString()); job.setStatus(Status.RUNNING); job.setStatusToContext(Status.RUNNING); @@ -541,4 +543,12 @@ private Job prepareJobForRunningStatus(Agent agent) { return jobDao.save(job); } + + private ShellOut getShellOutFromStep(Step step) { + return new ShellOut() + .setId(step.getId()) + .setStatus(step.getStatus()) + .setOutput(step.getOutput()) + .setFinishAt(new Date()); + } } \ No newline at end of file diff --git a/sm/pom.xml b/sm/pom.xml index 0ad86adea..8a14934ed 100644 --- a/sm/pom.xml +++ b/sm/pom.xml @@ -10,4 +10,12 @@ 4.0.0 sm + + + + junit + junit + test + + \ No newline at end of file diff --git a/sm/src/main/java/com/flowci/sm/Context.java b/sm/src/main/java/com/flowci/sm/Context.java index 02426d682..9683fb127 100644 --- a/sm/src/main/java/com/flowci/sm/Context.java +++ b/sm/src/main/java/com/flowci/sm/Context.java @@ -13,4 +13,5 @@ public abstract class Context { protected Throwable error; + protected boolean skip; } diff --git a/sm/src/main/java/com/flowci/sm/StateMachine.java b/sm/src/main/java/com/flowci/sm/StateMachine.java index 4f9474f91..fb8a88063 100644 --- a/sm/src/main/java/com/flowci/sm/StateMachine.java +++ b/sm/src/main/java/com/flowci/sm/StateMachine.java @@ -40,13 +40,17 @@ public void add(Transition t, Action action) { map.put(t.getTo(), action); } - public void executeInExecutor(Status current, Status target, T context) { - executor.execute(() -> execute(current, target, context)); - } - public void execute(Status current, Status target, T context) { context.setCurrent(current); context.setTo(target); + execute(context); + } + + public void execute(T context) { + Status current = context.getCurrent(); + Status target = context.getTo(); + Objects.requireNonNull(current, "SM current status is missing"); + Objects.requireNonNull(target, "SM target status is missing"); Map> actionMap = actions.get(current); if (Objects.isNull(actionMap)) { @@ -67,6 +71,10 @@ public void execute(Status current, Status target, T context) { try { action.accept(context); + if (!isOnSameContext(current, target, context) || context.skip) { + return; + } + // execute target hook hooksOnTargetStatus.computeIfPresent(target, (status, hooks) -> { for (Consumer hook : hooks) { @@ -82,4 +90,8 @@ public void execute(Status current, Status target, T context) { action.onFinally(context); } } + + private boolean isOnSameContext(Status current, Status target, T context) { + return context.getTo() == target && context.getCurrent() == current; + } } diff --git a/sm/src/test/java/com/flowci/sm/test/StateMachineTest.java b/sm/src/test/java/com/flowci/sm/test/StateMachineTest.java new file mode 100644 index 000000000..43adce7ea --- /dev/null +++ b/sm/src/test/java/com/flowci/sm/test/StateMachineTest.java @@ -0,0 +1,82 @@ +package com.flowci.sm.test; + +import com.flowci.sm.*; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class StateMachineTest { + + private final Status A = new Status("A"); + private final Status B = new Status("B"); + private final Status C = new Status("C"); + + private final Transition AtoB = new Transition(A, B); + private final Transition AtoC = new Transition(A, C); + private final Transition BtoC = new Transition(B, C); + + private final StateMachine sm = new StateMachine<>("TEST", null); + + private static class TestContext extends Context { + + } + + @Test + public void should_call_hook_only_once_when_current_state_changed() { + sm.add(AtoC, new Action() { + @Override + public void accept(TestContext ctx) throws Exception { + sm.execute(B, C, ctx); + } + }); + + AtomicReference shouldExecute = new AtomicReference<>(Boolean.FALSE); + sm.add(BtoC, new Action() { + @Override + public void accept(TestContext ctx) throws Exception { + shouldExecute.set(Boolean.TRUE); + } + }); + + AtomicInteger shouldCallOnceOnly = new AtomicInteger(0); + sm.addHookActionOnTargetStatus(testContext -> shouldCallOnceOnly.getAndIncrement(), C); + + sm.execute(A, C, new TestContext()); + + Assert.assertTrue(shouldExecute.get()); + Assert.assertEquals(1, shouldCallOnceOnly.get()); + } + + @Test + public void should_skip_target_hook_after_event_if_target_state_changed() { + sm.add(AtoB, new Action() { + @Override + public void accept(TestContext ctx) throws Exception { + // switch to C + sm.execute(A, C, ctx); + } + }); + + AtomicReference shouldExecute = new AtomicReference<>(Boolean.FALSE); + sm.add(AtoC, new Action() { + @Override + public void accept(TestContext ctx) throws Exception { + shouldExecute.set(Boolean.TRUE); + } + }); + + AtomicReference shouldNotExecute = new AtomicReference<>(Boolean.TRUE); + sm.addHookActionOnTargetStatus(testContext -> shouldNotExecute.set(Boolean.FALSE), B); + + AtomicReference shouldExecuteOnHook = new AtomicReference<>(Boolean.FALSE); + sm.addHookActionOnTargetStatus(testContext -> shouldExecuteOnHook.set(Boolean.TRUE), C); + + sm.execute(A, B, new TestContext()); + + Assert.assertTrue(shouldExecute.get()); + Assert.assertTrue(shouldNotExecute.get()); + Assert.assertTrue(shouldExecuteOnHook.get()); + } +} diff --git a/tree/src/main/java/com/flowci/tree/Node.java b/tree/src/main/java/com/flowci/tree/Node.java index 8223a8042..4c4251929 100644 --- a/tree/src/main/java/com/flowci/tree/Node.java +++ b/tree/src/main/java/com/flowci/tree/Node.java @@ -31,7 +31,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Objects; -import java.util.function.Consumer; import java.util.function.Function; /** @@ -123,11 +122,11 @@ public String getEnv(String name) { } @JsonIgnore - public FlowNode getParentFlowNode() { - ObjectWrapper wrapper = new ObjectWrapper<>(); + public T getParent(Class klass) { + ObjectWrapper wrapper = new ObjectWrapper<>(); this.forEachBottomUp(this, (n) -> { - if (n instanceof FlowNode) { - wrapper.setValue((FlowNode) n); + if (klass.isInstance(n)) { + wrapper.setValue((T) n); return false; } return true; @@ -135,11 +134,6 @@ public FlowNode getParentFlowNode() { return wrapper.getValue(); } - @JsonIgnore - public boolean hasParent() { - return parent != null; - } - @JsonIgnore public boolean hasChildren() { return !getChildren().isEmpty(); @@ -182,24 +176,6 @@ public List fetchDockerOptions() { return wrapper.getValue(); } - public void forEachNext(Node node, Consumer onNext) { - for (Node next : node.next) { - onNext.accept(next); - forEachNext(next, onNext); - } - } - - public void forEachChildren(Consumer onChild) { - forEachChildren(this, onChild); - } - - private void forEachChildren(Node current, Consumer onChild) { - for (Node child : current.getChildren()) { - onChild.accept(child); - forEachChildren(child, onChild); - } - } - protected final void forEachBottomUp(Node node, Function onNode) { Boolean canContinue = onNode.apply(node); if (!canContinue) { diff --git a/tree/src/main/java/com/flowci/tree/NodeTree.java b/tree/src/main/java/com/flowci/tree/NodeTree.java index d5da99a21..45f9056cc 100644 --- a/tree/src/main/java/com/flowci/tree/NodeTree.java +++ b/tree/src/main/java/com/flowci/tree/NodeTree.java @@ -18,6 +18,7 @@ import com.flowci.exception.ArgumentException; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import lombok.Getter; import java.util.*; @@ -30,6 +31,8 @@ public final class NodeTree { private static final int DefaultSize = 20; + private static final int DefaultSizeForPrev = 5; + /** * Create node tree from FlowNode object */ @@ -52,6 +55,8 @@ public static NodeTree create(FlowNode root) { private final Set plugins = new HashSet<>(DefaultSize); + private final Set secrets = new HashSet<>(DefaultSize); + private int maxHeight = 1; public NodeTree(FlowNode root) { @@ -67,21 +72,32 @@ public int numOfNode() { /** * Get all last steps + * * @return */ public Collection ends() { return ends; } - /** - * Get all previous node list from path - */ - public Collection prevs(List nodes) { - List list = new LinkedList<>(); - for (Node n : nodes) { - list.addAll(n.getPrev()); + public Collection prevs(Collection nodes, boolean post) { + Collection ps = prevs(nodes); + + if (!post) { + return ps; } - return list; + + Set prevPost = new HashSet<>(DefaultSizeForPrev); + for (Node p : ps) { + if (isPostStep(p)) { + prevPost.add(p); + } + } + + if (prevPost.isEmpty()) { + return prevs(ps, true); + } + + return prevPost; } /** @@ -112,6 +128,32 @@ public List skip(NodePath current) { return Lists.newArrayList(nextWithSameParent); } + /** + * Find next post step + */ + public List post(NodePath path) { + Node n = get(path); + + // check if step in parallel + if (!isPostStep(n)) { + ParallelStepNode parent = n.getParent(ParallelStepNode.class); + if (parent != null) { + return Lists.newArrayList(findPostSteps(parent)); + } + } + + Collection post = new HashSet<>(); + for (Node next : n.next) { + post.addAll(findNextPost(next)); + } + + return Lists.newArrayList(post); + } + + public List post(String path) { + return post(NodePath.create(path)); + } + public Node get(NodePath path) { Node node = flatted.get(path); if (node == null) { @@ -124,6 +166,17 @@ public Node get(String nodePath) { return get(NodePath.create(nodePath)); } + /** + * Get all previous node list from path + */ + private Collection prevs(Collection nodes) { + Set list = new HashSet<>(DefaultSizeForPrev); + for (Node n : nodes) { + list.addAll(n.getPrev()); + } + return list; + } + private Node findNextWithSameParent(Node node, Node parent) { for (Node next : node.next) { if (parent.equals((next.parent))) { @@ -150,6 +203,10 @@ private void buildMetaData() { if (r.hasPlugin()) { plugins.add(r.getPlugin()); } + + if (r.hasSecrets()) { + secrets.addAll(r.getSecrets()); + } } if (node instanceof FlowNode) { @@ -212,4 +269,70 @@ private void buildEndNodes() { } }); } + + private Collection findNextPost(Node node) { + if (node instanceof ParallelStepNode) { + Collection output = findPostSteps((ParallelStepNode) node); + if (!output.isEmpty()) { + return output; + } + } + + if (node instanceof FlowNode) { + Collection output = findPostSteps((FlowNode) node); + if (!output.isEmpty()) { + return output; + } + } + + if (node instanceof RegularStepNode) { + Collection output = findPostSteps((RegularStepNode) node); + if (!output.isEmpty()) { + return output; + } + } + + Collection post = new HashSet<>(); + for (Node next : node.next) { + post.addAll(findNextPost(next)); + } + return post; + } + + private Collection findPostSteps(ParallelStepNode p) { + Collection post = new HashSet<>(); + for (Node child : p.getChildren()) { + FlowNode f = (FlowNode) child; + post.addAll(findPostSteps(f)); + } + return post; + } + + private Collection findPostSteps(FlowNode f) { + Collection post = new HashSet<>(); + for (Node child : f.getChildren()) { + if (child instanceof RegularStepNode) { + post.addAll(findPostSteps((RegularStepNode) child)); + } + + if (child instanceof ParallelStepNode) { + post.addAll(findPostSteps((ParallelStepNode) child)); + } + } + return post; + } + + private Collection findPostSteps(RegularStepNode r) { + if (r.isPost()) { + return Sets.newHashSet(r); + } + return Collections.emptyList(); + } + + private static boolean isPostStep(Node n) { + if (n instanceof RegularStepNode) { + return ((RegularStepNode) n).isPost(); + } + return false; + } } diff --git a/tree/src/main/java/com/flowci/tree/RegularStepNode.java b/tree/src/main/java/com/flowci/tree/RegularStepNode.java index 23c14cdfc..bf8300003 100644 --- a/tree/src/main/java/com/flowci/tree/RegularStepNode.java +++ b/tree/src/main/java/com/flowci/tree/RegularStepNode.java @@ -17,6 +17,11 @@ public final class RegularStepNode extends Node { public static final boolean ALLOW_FAILURE_DEFAULT = false; + /** + * Indicate the step is post step + */ + private boolean post; + /** * bash script */ @@ -47,6 +52,11 @@ public final class RegularStepNode extends Node { */ private Set exports = new HashSet<>(0); + /** + * Included secret name in the step + */ + private Set secrets = new HashSet<>(0); + /** * Is allow failure */ @@ -76,6 +86,11 @@ public boolean hasCondition() { return !Strings.isNullOrEmpty(condition); } + @JsonIgnore + public boolean hasSecrets() { + return !secrets.isEmpty(); + } + @JsonIgnore public boolean hasTimeout() { return timeout != null; diff --git a/tree/src/main/java/com/flowci/tree/yml/FlowYml.java b/tree/src/main/java/com/flowci/tree/yml/FlowYml.java index 7e1dd080b..8f98d1ecd 100644 --- a/tree/src/main/java/com/flowci/tree/yml/FlowYml.java +++ b/tree/src/main/java/com/flowci/tree/yml/FlowYml.java @@ -43,6 +43,9 @@ public class FlowYml extends YmlBase { private List notifications = new LinkedList<>(); + // post steps + private List post = new LinkedList<>(); + public FlowNode toNode(Node parent) { if (!NodePath.validate(name)) { throw new YmlException("Invalid name {0}", name); @@ -60,7 +63,9 @@ public FlowNode toNode(Node parent) { throw new YmlException("The 'steps' section must be defined"); } - setStepsToNode(node); + Set uniqueNames = new HashSet<>(steps.size() + post.size()); + setStepsToParent(node, steps, false, uniqueNames); + setStepsToParent(node, post, true, uniqueNames); return node; } diff --git a/tree/src/main/java/com/flowci/tree/yml/StepYml.java b/tree/src/main/java/com/flowci/tree/yml/StepYml.java index fce9c38ec..34855189a 100644 --- a/tree/src/main/java/com/flowci/tree/yml/StepYml.java +++ b/tree/src/main/java/com/flowci/tree/yml/StepYml.java @@ -58,12 +58,14 @@ public class StepYml extends YmlBase { private Integer timeout; // timeout in seconds - private List exports = new LinkedList<>(); - private Boolean allow_failure; private Cache cache; + private List exports = new LinkedList<>(); + + private List secrets = new LinkedList<>(); + /** * Only for parallel step, other fields will not valid */ @@ -111,6 +113,7 @@ public Node toNode(Node parent, int index) { step.setExports(Sets.newHashSet(exports)); step.setAllowFailure(allow_failure != null && allow_failure); step.setEnvironments(getVariableMap()); + step.setSecrets(Sets.newHashSet(secrets)); setCacheToNode(step); setDockerToNode(step); @@ -123,8 +126,7 @@ public Node toNode(Node parent, int index) { if (step.hasPlugin()) { throw new YmlException("The plugin section is not allowed on the step with sub steps"); } - - setStepsToNode(step); + setStepsToParent(step, steps, false, new HashSet<>(steps.size())); } // backward compatible, set script to bash diff --git a/tree/src/main/java/com/flowci/tree/yml/YmlBase.java b/tree/src/main/java/com/flowci/tree/yml/YmlBase.java index 22bf208fe..361a4717d 100644 --- a/tree/src/main/java/com/flowci/tree/yml/YmlBase.java +++ b/tree/src/main/java/com/flowci/tree/yml/YmlBase.java @@ -60,24 +60,6 @@ protected StringVars getVariableMap() { return variables; } - protected void setStepsToNode(T parent) { - int index = 1; - Set uniqueName = new HashSet<>(steps.size()); - - for (StepYml child : steps) { - Node step = child.toNode(parent, index++); - - if (step instanceof RegularStepNode) { - String stepName = step.getName(); - if (!uniqueName.add(stepName)) { - throw new YmlException("Duplicate name {0} in step", stepName); - } - } - - parent.getChildren().add(step); - } - } - void setDockerToNode(T node) { if (hasDocker() && hasDockers()) { throw new YmlException("Only accept either 'docker' or 'dockers' section"); @@ -129,4 +111,21 @@ private boolean hasDocker() { private boolean hasDockers() { return dockers != null && dockers.size() > 0; } + + protected static void setStepsToParent(T parent, List steps, boolean post, Set nameSet) { + int index = 1; + for (StepYml child : steps) { + Node step = child.toNode(parent, index++); + + if (step instanceof RegularStepNode) { + String stepName = step.getName(); + if (!nameSet.add(stepName)) { + throw new YmlException("Duplicate name {0} in step", stepName); + } + ((RegularStepNode) step).setPost(post); + } + + parent.getChildren().add(step); + } + } } diff --git a/tree/src/test/java/com/flowci/tree/test/YmlParserTest.java b/tree/src/test/java/com/flowci/tree/test/YmlParserTest.java index adfd3d1ae..f73d9b321 100644 --- a/tree/src/test/java/com/flowci/tree/test/YmlParserTest.java +++ b/tree/src/test/java/com/flowci/tree/test/YmlParserTest.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.util.Collection; import java.util.List; import static com.flowci.tree.FlowNode.DEFAULT_ROOT_NAME; @@ -82,6 +83,8 @@ public void should_get_node_from_yml() { Assert.assertEquals("echo step", step1.getEnv("FLOW_WORKSPACE")); Assert.assertEquals("echo step version", step1.getEnv("FLOW_VERSION")); Assert.assertEquals(3600, step1.getTimeout().intValue()); + Assert.assertEquals(1, step1.getSecrets().size()); + Assert.assertTrue(step1.getSecrets().contains("my-secret")); Assert.assertTrue(step1.isAllowFailure()); Assert.assertEquals("println(FLOW_WORKSPACE)\ntrue\n", step1.getCondition()); @@ -319,6 +322,55 @@ public void should_load_parallel_step_yaml() throws IOException { Assert.assertEquals(step2Path, tree.skip(subflowB_APath).get(0).getPath()); } + @Test + public void should_load_and_get_post_steps() throws IOException { + content = loadContent("flow-with-post.yml"); + FlowNode root = YmlParser.load(content); + Assert.assertNotNull(root); + + NodeTree tree = NodeTree.create(root); + Assert.assertNotNull(tree); + + NodePath postOfSubA = NodePath.create("flow/parallel-2/subflow-A/subA-post-1"); + NodePath postOfSubC = NodePath.create("flow/parallel-3/subflow-C/Post-C"); + NodePath postOfSubD = NodePath.create("flow/parallel-3/subflow-D/Post-D"); + NodePath post1OfRoot = NodePath.create("flow/post-1"); + NodePath post2OfRoot = NodePath.create("flow/post-2"); + + List nextFromRoot = tree.post(NodePath.create("flow")); + Assert.assertEquals(1, nextFromRoot.size()); + Assert.assertEquals(postOfSubA, nextFromRoot.get(0).getPath()); + + List nextFromSubflowC = tree.post(NodePath.create("flow/parallel-3/subflow-C/C")); + Assert.assertEquals(2, nextFromSubflowC.size()); + Assert.assertEquals(postOfSubC, nextFromSubflowC.get(0).getPath()); + Assert.assertEquals(postOfSubD, nextFromSubflowC.get(1).getPath()); + + List nextFromSubA = tree.post(postOfSubA); + Assert.assertEquals(2, nextFromSubA.size()); + Assert.assertEquals(postOfSubC, nextFromSubA.get(0).getPath()); + Assert.assertEquals(postOfSubD, nextFromSubA.get(1).getPath()); + + List nextFromSubC = tree.post(postOfSubC); + Assert.assertEquals(1, nextFromSubC.size()); + Assert.assertEquals(post1OfRoot, nextFromSubC.get(0).getPath()); + + List nextFromSubD = tree.post(postOfSubD); + Assert.assertEquals(1, nextFromSubD.size()); + Assert.assertEquals(post1OfRoot, nextFromSubD.get(0).getPath()); + + List nextPostFromRootPost1 = tree.post(post1OfRoot); + Assert.assertEquals(1, nextPostFromRootPost1.size()); + Assert.assertEquals(post2OfRoot, nextPostFromRootPost1.get(0).getPath()); + + // get prev post step + Collection prevsOfPost1 = tree.prevs(nextFromSubD, false); + Assert.assertEquals(1, prevsOfPost1.size()); + + prevsOfPost1 = tree.prevs(nextFromSubD, true); + Assert.assertEquals(2, prevsOfPost1.size()); + } + private String loadContent(String resource) throws IOException { ClassLoader classLoader = YmlParserTest.class.getClassLoader(); URL url = classLoader.getResource(resource); diff --git a/tree/src/test/resources/flow-with-post.yml b/tree/src/test/resources/flow-with-post.yml new file mode 100644 index 000000000..a87e22565 --- /dev/null +++ b/tree/src/test/resources/flow-with-post.yml @@ -0,0 +1,106 @@ +name: root +envs: + FLOW_WORKSPACE: "echo hello" + FLOW_VERSION: "echo version" + +condition: | + return $FLOWCI_GIT_BRANCH == "develop" || $FLOWCI_GIT_BRANCH == "master"; + +docker: + image: "helloworld:0.1" + +selector: + label: + - ios + - local + +notifications: + - plugin: 'email-notify' + envs: + FLOWCI_SMTP_CONFIG: 'test-config' + +steps: + - name: clone + bash: | + echo "git clone" + + - parallel: + subflow-A: + selector: + label: [ "linux" ] + steps: + - name: A + plugin: 'A-plugin' + script: | + echo "A" + - name: B + condition: | + return true + script: | + echo "B" + post: + - name: subA-post-1 + bash: "echo sub A post-1" + + subflow-B: + selector: + label: [ "linux" ] + steps: + - name: A + plugin: 'B-plugin' + script: | + echo "A" + + + - parallel: + subflow-C: + steps: + - name: C + bash: | + echo "Sub-C" + post: + - name: Post-C + bash: | + echo "Post-C" + + subflow-D: + steps: + - name: D + bash: | + echo "Sub-D" + post: + - name: Post-D + bash: | + echo "Post-D" + + - name: step3 + steps: + - name: step-3-1 + bash: | + echo "step-3-1" + + - name: step-3-2 + bash: | + echo "step-3-2" + + - name: step4 + allow_failure: false + docker: + image: "ubuntu:18.04" + ports: + - "6400:6400" + - "2700:2700" + entrypoint: [ "/bin/sh" ] + network: host + bash: "echo 2" + pwsh: "echo powershell" + + +post: + - name: post-1 + bash: "echo post-1" + + - name: post-2 + condition: | + return ${FLOWCI_STATUS} == "SUCCESS" + bash: "echo post-2" \ No newline at end of file diff --git a/tree/src/test/resources/flow.yml b/tree/src/test/resources/flow.yml index 23ed14775..4818c7d07 100644 --- a/tree/src/test/resources/flow.yml +++ b/tree/src/test/resources/flow.yml @@ -26,6 +26,8 @@ steps: envs: FLOW_WORKSPACE: "echo step" FLOW_VERSION: "echo step version" + secrets: + - "my-secret" cache: key: mycache paths: