Skip to content

Commit

Permalink
Fix filling stateOptions for failure proceeding state (#211)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng committed Nov 28, 2023
1 parent b87c388 commit db7f7c7
Show file tree
Hide file tree
Showing 13 changed files with 143 additions and 16 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Expand Up @@ -161,7 +161,7 @@ signing {
}

group = "io.iworkflow"
version = "2.4.0"
version = "2.5.1"

nexusPublishing {
repositories {
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/iworkflow/core/Client.java
@@ -1,6 +1,8 @@
package io.iworkflow.core;

import io.iworkflow.core.mapper.StateMovementMapper;
import io.iworkflow.core.persistence.PersistenceOptions;
import io.iworkflow.gen.models.ExecuteApiFailurePolicy;
import io.iworkflow.gen.models.KeyValue;
import io.iworkflow.gen.models.SearchAttribute;
import io.iworkflow.gen.models.SearchAttributeKeyAndType;
Expand Down Expand Up @@ -153,6 +155,9 @@ public String startWorkflow(
stateOptions.skipWaitUntil(true);
}
}

StateMovementMapper.autoFillFailureProceedingStateOptions(stateOptions, wfType, registry);

if (stateOptions != null) {
unregisterWorkflowOptions.startStateOptions(stateOptions);
}
Expand Down
32 changes: 32 additions & 0 deletions src/main/java/io/iworkflow/core/mapper/StateMovementMapper.java
Expand Up @@ -3,6 +3,8 @@
import io.iworkflow.core.ObjectEncoder;
import io.iworkflow.core.Registry;
import io.iworkflow.core.StateDef;
import io.iworkflow.core.WorkflowDefinitionException;
import io.iworkflow.gen.models.ExecuteApiFailurePolicy;
import io.iworkflow.gen.models.StateMovement;
import io.iworkflow.gen.models.WorkflowStateOptions;

Expand Down Expand Up @@ -36,10 +38,40 @@ public static StateMovement toGenerated(final io.iworkflow.core.StateMovement st
stateOptions.skipWaitUntil(true);
}

autoFillFailureProceedingStateOptions(stateOptions, workflowType, registry);

if (stateOptions != null) {
movement.stateOptions(stateOptions);
}
}
return movement;
}

public static void autoFillFailureProceedingStateOptions(WorkflowStateOptions stateOptions, final String workflowType, final Registry registry) {
if (stateOptions == null) {
return;
}
if (stateOptions.getExecuteApiFailurePolicy() == ExecuteApiFailurePolicy.PROCEED_TO_CONFIGURED_STATE
&& stateOptions.getExecuteApiFailureProceedStateOptions() == null) {

// fill the state options for the proceeding state
String proceedStateId = stateOptions.getExecuteApiFailureProceedStateId();
final StateDef proceedStatDef = registry.getWorkflowState(workflowType, proceedStateId);
WorkflowStateOptions proceedStateOptions = proceedStatDef.getWorkflowState().getStateOptions();
if (proceedStateOptions != null &&
proceedStateOptions.getExecuteApiFailurePolicy() == ExecuteApiFailurePolicy.PROCEED_TO_CONFIGURED_STATE) {
throw new WorkflowDefinitionException("nested failure handling is not supported. You cannot set a failure proceeding state on top of another failure proceeding state.");
}

if (shouldSkipWaitUntil(proceedStatDef.getWorkflowState())) {
if (proceedStateOptions == null) {
proceedStateOptions = new WorkflowStateOptions().skipWaitUntil(true);
} else {
proceedStateOptions.skipWaitUntil(true);
}
}

stateOptions.executeApiFailureProceedStateOptions(proceedStateOptions);
}
}
}
21 changes: 18 additions & 3 deletions src/test/java/io/iworkflow/integ/StateRecoveryTest.java
Expand Up @@ -2,7 +2,8 @@

import io.iworkflow.core.Client;
import io.iworkflow.core.ClientOptions;
import io.iworkflow.integ.stateapifail.ExecuteApiFailProceedWorkflow;
import io.iworkflow.integ.stateapifail.WorkflowStateFailProceedToRecover;
import io.iworkflow.integ.stateapifail.WorkflowStateFailProceedToRecoverNoWaitUntil;
import io.iworkflow.spring.TestSingletonWorkerService;
import io.iworkflow.spring.controller.WorkflowRegistry;
import org.junit.jupiter.api.Assertions;
Expand All @@ -26,10 +27,24 @@ public void testStateApiFailAndRecoveryWorkflow() throws InterruptedException {
final Integer input = 5;

client.startWorkflow(
ExecuteApiFailProceedWorkflow.class, wfId, 10, input);
WorkflowStateFailProceedToRecover.class, wfId, 10, input);

Integer output = client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
Assertions.assertEquals(5, output);
Assertions.assertEquals(10, output);
}

@Test
public void testStateApiFailAndRecoveryNoWaitUntilWorkflow() throws InterruptedException {
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
final long startTs = System.currentTimeMillis();
final String wfId = "testStateApiFailAndRecoveryNoWaitUntilWorkflow" + startTs / 1000;
final Integer input = 5;

client.startWorkflow(
WorkflowStateFailProceedToRecoverNoWaitUntil.class, wfId, 10, input);

Integer output = client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
Assertions.assertEquals(10, output);
}

}
4 changes: 2 additions & 2 deletions src/test/java/io/iworkflow/integ/WorkflowUncompletedTest.java
Expand Up @@ -7,7 +7,7 @@
import io.iworkflow.gen.models.WorkflowStopType;
import io.iworkflow.integ.forcefail.ForceFailWorkflow;
import io.iworkflow.integ.signal.BasicSignalWorkflow;
import io.iworkflow.integ.stateapifail.StateApiFailWorkflow;
import io.iworkflow.integ.stateapifail.WorkflowBasicStateFail;
import io.iworkflow.integ.stateapitimeout.StateApiTimeoutFailWorkflow;
import io.iworkflow.spring.TestSingletonWorkerService;
import io.iworkflow.spring.controller.WorkflowRegistry;
Expand Down Expand Up @@ -203,7 +203,7 @@ public void testStateApiFailWorkflow() throws InterruptedException {
final Integer input = 5;

final String runId = client.startWorkflow(
StateApiFailWorkflow.class, wfId, 10, input);
WorkflowBasicStateFail.class, wfId, 10, input);

try {
client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
Expand Down
Expand Up @@ -10,7 +10,7 @@
import io.iworkflow.gen.models.RetryPolicy;
import io.iworkflow.gen.models.WorkflowStateOptions;

public class StateApiFailWorkflowState1 implements WorkflowState<Integer> {
public class StateFailBasic implements WorkflowState<Integer> {

@Override
public Class<Integer> getInputType() {
Expand Down
Expand Up @@ -4,11 +4,11 @@
import io.iworkflow.gen.models.RetryPolicy;
import io.iworkflow.gen.models.WorkflowStateOptions;

public class StateApiFailProceedState extends StateApiFailWorkflowState1 {
public class StateFailProceedToRecoverBasic extends StateFailBasic {
@Override
public WorkflowStateOptions getStateOptions() {
return new WorkflowStateOptionsExtension()
.setProceedOnExecuteFailure(StateApiRecoverState.class)
.setProceedOnExecuteFailure(StateRecoverBasic.class)
.executeApiRetryPolicy(
new RetryPolicy()
.maximumAttempts(1)
Expand Down
@@ -0,0 +1,18 @@
package io.iworkflow.integ.stateapifail;

import io.iworkflow.core.WorkflowStateOptionsExtension;
import io.iworkflow.gen.models.RetryPolicy;
import io.iworkflow.gen.models.WorkflowStateOptions;

public class StateFailProceedToRecoverNoWaitUntil extends StateFailBasic {
@Override
public WorkflowStateOptions getStateOptions() {
return new WorkflowStateOptionsExtension()
.setProceedOnExecuteFailure(StateRecoverNoWaitUntil.class)
.executeApiRetryPolicy(
new RetryPolicy()
.maximumAttempts(1)
.backoffCoefficient(2f)
);
}
}
Expand Up @@ -8,7 +8,7 @@
import io.iworkflow.core.communication.Communication;
import io.iworkflow.core.persistence.Persistence;

public class StateApiRecoverState implements WorkflowState<Integer> {
public class StateRecoverBasic implements WorkflowState<Integer> {

@Override
public Class<Integer> getInputType() {
Expand All @@ -31,6 +31,12 @@ public StateDecision execute(
CommandResults commandResults,
Persistence persistence,
final Communication communication) {
return StateDecision.gracefulCompleteWorkflow(input);
if(input == 10){
return StateDecision.gracefulCompleteWorkflow(input);
}else if(input == 5){
return StateDecision.singleNextState(StateFailProceedToRecoverBasic.class, input * 2);
}else{
return StateDecision.forceFailWorkflow("unexpected input "+input);
}
}
}
@@ -0,0 +1,32 @@
package io.iworkflow.integ.stateapifail;

import io.iworkflow.core.Context;
import io.iworkflow.core.StateDecision;
import io.iworkflow.core.WorkflowState;
import io.iworkflow.core.command.CommandResults;
import io.iworkflow.core.communication.Communication;
import io.iworkflow.core.persistence.Persistence;

public class StateRecoverNoWaitUntil implements WorkflowState<Integer> {

@Override
public Class<Integer> getInputType() {
return Integer.class;
}

@Override
public StateDecision execute(
Context context,
Integer input,
CommandResults commandResults,
Persistence persistence,
final Communication communication) {
if(input == 10){
return StateDecision.gracefulCompleteWorkflow(input);
}else if(input == 5){
return StateDecision.singleNextState(StateFailProceedToRecoverNoWaitUntil.class, input * 2);
}else{
return StateDecision.forceFailWorkflow("unexpected input "+input);
}
}
}
Expand Up @@ -8,11 +8,11 @@
import java.util.List;

@Component
public class StateApiFailWorkflow implements ObjectWorkflow {
public class WorkflowBasicStateFail implements ObjectWorkflow {
@Override
public List<StateDef> getWorkflowStates() {
return Arrays.asList(
StateDef.startingState(new StateApiFailWorkflowState1())
StateDef.startingState(new StateFailBasic())
);
}
}
Expand Up @@ -8,12 +8,12 @@
import java.util.List;

@Component
public class ExecuteApiFailProceedWorkflow implements ObjectWorkflow {
public class WorkflowStateFailProceedToRecover implements ObjectWorkflow {
@Override
public List<StateDef> getWorkflowStates() {
return Arrays.asList(
StateDef.startingState(new StateApiFailProceedState()),
StateDef.nonStartingState(new StateApiRecoverState())
StateDef.startingState(new StateFailProceedToRecoverBasic()),
StateDef.nonStartingState(new StateRecoverBasic())
);
}
}
@@ -0,0 +1,19 @@
package io.iworkflow.integ.stateapifail;

import io.iworkflow.core.ObjectWorkflow;
import io.iworkflow.core.StateDef;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;

@Component
public class WorkflowStateFailProceedToRecoverNoWaitUntil implements ObjectWorkflow {
@Override
public List<StateDef> getWorkflowStates() {
return Arrays.asList(
StateDef.startingState(new StateFailProceedToRecoverNoWaitUntil()),
StateDef.nonStartingState(new StateRecoverNoWaitUntil())
);
}
}

0 comments on commit db7f7c7

Please sign in to comment.