Skip to content

Commit

Permalink
Support overriding state options (#193)
Browse files Browse the repository at this point in the history
Co-authored-by: Kaili Zhu <kzhu@indeed.com>
  • Loading branch information
zklgame and zklgame committed Jul 27, 2023
1 parent 6f1ffd6 commit 757eed4
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 46 deletions.
83 changes: 60 additions & 23 deletions src/main/java/io/iworkflow/core/StateDecision.java
@@ -1,6 +1,7 @@
package io.iworkflow.core;

import io.iworkflow.gen.models.WorkflowConditionalCloseType;
import io.iworkflow.gen.models.WorkflowStateOptions;
import org.immutables.value.Value;

import java.util.ArrayList;
Expand Down Expand Up @@ -145,54 +146,79 @@ public static StateDecision forceCompleteIfSignalChannelEmptyOrElse(final Object
.build();
}

public static StateDecision singleNextState(final Class<? extends WorkflowState> stateClass) {
return singleNextState(stateClass.getSimpleName());
/**
*
* @param stateClass required
* @param stateInput optional, can be null
* @param stateOptionsOverride optional, can be null. It is used to override the defined one in the State class
* @return state decision
*/
public static StateDecision singleNextState(final Class<? extends WorkflowState> stateClass, final Object stateInput, final WorkflowStateOptions stateOptionsOverride) {
return singleNextState(stateClass.getSimpleName(), stateInput, stateOptionsOverride);
}

/**
* use the other one with WorkflowState class param if the StateId is provided by default, to make your code cleaner
*
* @param stateId stateId
* @param stateClass required
* @param stateInput optional, can be null
* @return state decision
*/
public static StateDecision singleNextState(final String stateId) {
return ImmutableStateDecision.builder().nextStates(Arrays.asList(
StateMovement.create(stateId)
)).build();
public static StateDecision singleNextState(final Class<? extends WorkflowState> stateClass, final Object stateInput) {
return singleNextState(stateClass, stateInput, null);
}

public static StateDecision singleNextState(final Class<? extends WorkflowState> stateClass, final Object stateInput) {
return singleNextState(stateClass.getSimpleName(), stateInput);
/**
*
* @param stateClass required
* @return state decision
*/
public static StateDecision singleNextState(final Class<? extends WorkflowState> stateClass) {
return singleNextState(stateClass, null, null);
}

/**
* use the other one with WorkflowState class param if the StateId is provided by default, to make your code cleaner
* @param stateId stateId of next state
* @param stateInput input for next state
* use the other one with WorkflowState class param if the stateId is provided by default, to make your code cleaner
* @param stateId required. StateId of next state
* @param stateInput optional, can be null. Input for next state
* @param stateOptionsOverride optional, can be null. It is used to override the defined one in the State class
* @return state decision
*/
public static StateDecision singleNextState(final String stateId, final Object stateInput) {
public static StateDecision singleNextState(final String stateId, final Object stateInput, final WorkflowStateOptions stateOptionsOverride) {
return ImmutableStateDecision.builder().nextStates(Arrays.asList(
StateMovement.create(stateId, stateInput)
StateMovement.create(stateId, stateInput, stateOptionsOverride)
)).build();
}

public static StateDecision multiNextStates(final StateMovement... stateMovements) {
return ImmutableStateDecision.builder().nextStates(Arrays.asList(stateMovements)).build();
/**
* use the other one with WorkflowState class param if the stateId is provided by default, to make your code cleaner
* @param stateId stateId of next state
* @return state decision
*/
public static StateDecision singleNextState(final String stateId) {
return singleNextState(stateId, null, null);
}

/**
*
* @param stateMovements required
* @return state decision
*/
public static StateDecision multiNextStates(final List<StateMovement> stateMovements) {
return ImmutableStateDecision.builder().nextStates(stateMovements).build();
}

public static StateDecision multiNextStates(final Class<? extends WorkflowState>... states) {
List<String> stateIds = new ArrayList<>();
Arrays.stream(states).forEach(s -> stateIds.add(s.getSimpleName()));
return multiNextStates(stateIds.toArray(new String[0]));
/**
*
* @param stateMovements required
* @return state decision
*/
public static StateDecision multiNextStates(final StateMovement... stateMovements) {
return multiNextStates(Arrays.asList(stateMovements));
}

/**
* use the other one with WorkflowState class param if the StateId is provided by default, to make your code cleaner
* use the other one with WorkflowState class param if the stateId is provided by default, to make your code cleaner
* or use other ones with a list of StateMovement to enable the WorkflowStateOptions overriding
* @param stateIds stateIds of next states
* @return state decision
*/
Expand All @@ -201,6 +227,17 @@ public static StateDecision multiNextStates(final String... stateIds) {
Arrays.stream(stateIds).forEach(id -> {
stateMovements.add(StateMovement.create(id));
});
return ImmutableStateDecision.builder().nextStates(stateMovements).build();
return multiNextStates(stateMovements);
}

/**
* use other ones with a list of StateMovement to enable the WorkflowStateOptions overriding
* @param states required
* @return state decision
*/
public static StateDecision multiNextStates(final Class<? extends WorkflowState>... states) {
List<String> stateIds = new ArrayList<>();
Arrays.stream(states).forEach(s -> stateIds.add(s.getSimpleName()));
return multiNextStates(stateIds.toArray(new String[0]));
}
}
66 changes: 48 additions & 18 deletions src/main/java/io/iworkflow/core/StateMovement.java
@@ -1,5 +1,6 @@
package io.iworkflow.core;

import io.iworkflow.gen.models.WorkflowStateOptions;
import org.immutables.value.Value;

import java.util.Optional;
Expand All @@ -10,6 +11,7 @@ public abstract class StateMovement {
public abstract String getStateId();

public abstract Optional<Object> getStateInput();
public abstract Optional<WorkflowStateOptions> getStateOptionsOverride();

public final static String RESERVED_STATE_ID_PREFIX = "_SYS_";
private final static String GRACEFUL_COMPLETING_WORKFLOW_STATE_ID = "_SYS_GRACEFUL_COMPLETING_WORKFLOW";
Expand Down Expand Up @@ -50,40 +52,68 @@ public static StateMovement forceFailWorkflow(final Object output) {
.build();
}

public static StateMovement create(final Class<? extends WorkflowState> stateClass, final Object stateInput) {
return create(stateClass.getSimpleName(), stateInput);
/**
*
* @param stateClass required
* @param stateInput optional, can be null
* @param stateOptionsOverride optional, can be null. It is used to override the defined one in the State class
* @return state movement
*/
public static StateMovement create(final Class<? extends WorkflowState> stateClass, final Object stateInput, final WorkflowStateOptions stateOptionsOverride) {
return create(stateClass.getSimpleName(), stateInput, stateOptionsOverride);
}

/**
* use the other one with WorkflowState class param if the StateId is provided by default, to make your code cleaner
*
* @param stateId stateId
* @param stateInput input
* @param stateClass required
* @param stateInput optional, can be null
* @return state movement
*/
public static StateMovement create(final String stateId, final Object stateInput) {
if (stateId.startsWith(RESERVED_STATE_ID_PREFIX)) {
throw new WorkflowDefinitionException("Cannot use reserved stateId prefix for your stateId");
}
return ImmutableStateMovement.builder().stateId(stateId)
.stateInput(stateInput)
.build();
public static StateMovement create(final Class<? extends WorkflowState> stateClass, final Object stateInput) {
return create(stateClass, stateInput, null);
}

/**
*
* @param stateClass required
* @return state movement
*/
public static StateMovement create(final Class<? extends WorkflowState> stateClass) {
return create(stateClass.getSimpleName());
return create(stateClass, null, null);
}

/**
* use the other one with WorkflowState class param if the StateId is provided by default, to make your code cleaner
* @param stateId stateId
* use the other one with WorkflowState class param if the stateId is provided by default, to make your code cleaner
* @param stateId required
* @param stateInput optional, can be null
* @param stateOptionsOverride optional, can be null. It is used to override the defined one in the State class
* @return state movement
*/
public static StateMovement create(final String stateId) {
public static StateMovement create(final String stateId, final Object stateInput, final WorkflowStateOptions stateOptionsOverride) {
if (stateId.startsWith(RESERVED_STATE_ID_PREFIX)) {
throw new WorkflowDefinitionException("Cannot use reserved stateId prefix for your stateId");
}
return ImmutableStateMovement.builder().stateId(stateId)
.build();

final ImmutableStateMovement.Builder builder = ImmutableStateMovement.builder()
.stateId(stateId);

if (stateInput != null) {
builder.stateInput(stateInput);
}

if (stateOptionsOverride != null) {
builder.stateOptionsOverride(stateOptionsOverride);
}

return builder.build();
}

/**
* use the other one with WorkflowState class param if the stateId is provided by default, to make your code cleaner
* @param stateId stateId
* @return state movement
*/
public static StateMovement create(final String stateId) {
return create(stateId, null, null);
}
}
17 changes: 12 additions & 5 deletions src/main/java/io/iworkflow/core/mapper/StateMovementMapper.java
Expand Up @@ -11,7 +11,7 @@

public class StateMovementMapper {

public static StateMovement toGenerated(io.iworkflow.core.StateMovement stateMovement, final String workflowType, final Registry registry, final ObjectEncoder objectEncoder) {
public static StateMovement toGenerated(final io.iworkflow.core.StateMovement stateMovement, final String workflowType, final Registry registry, final ObjectEncoder objectEncoder) {
final Object input = stateMovement.getStateInput().orElse(null);
final StateMovement movement = new StateMovement()
.stateId(stateMovement.getStateId())
Expand All @@ -21,14 +21,21 @@ public static StateMovement toGenerated(io.iworkflow.core.StateMovement stateMov
if(stateDef == null){
throw new IllegalArgumentException("state "+stateMovement.getStateId() +" is not registered in the workflow "+workflowType);
}
WorkflowStateOptions stateOptions = stateDef.getWorkflowState().getStateOptions();

// Try to get the overrode stateOptions, if it's null, get the stateOptions from stateDef
WorkflowStateOptions stateOptions = stateMovement.getStateOptionsOverride().orElse(null);
if (stateOptions == null) {
stateOptions = stateDef.getWorkflowState().getStateOptions();
}

if (shouldSkipWaitUntil(stateDef.getWorkflowState())) {
if (stateOptions == null) {
stateOptions = new WorkflowStateOptions().skipWaitUntil(true);
} else {
stateOptions.skipWaitUntil(true);
stateOptions = new WorkflowStateOptions();
}

stateOptions.skipWaitUntil(true);
}

if (stateOptions != null) {
movement.stateOptions(stateOptions);
}
Expand Down
35 changes: 35 additions & 0 deletions src/test/java/io/iworkflow/integ/StateOptionsOverrideTest.java
@@ -0,0 +1,35 @@
package io.iworkflow.integ;

import io.iworkflow.core.Client;
import io.iworkflow.core.ClientOptions;
import io.iworkflow.core.ImmutableWorkflowOptions;
import io.iworkflow.core.WorkflowOptions;
import io.iworkflow.gen.models.IDReusePolicy;
import io.iworkflow.integ.stateoptionsoverride.StateOptionsOverrideWorkflow;
import io.iworkflow.spring.TestSingletonWorkerService;
import io.iworkflow.spring.controller.WorkflowRegistry;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.concurrent.ExecutionException;

public class StateOptionsOverrideTest {
@BeforeEach
public void setup() throws ExecutionException, InterruptedException {
TestSingletonWorkerService.startWorkerIfNotUp();
}
@Test
public void testStateOptionsOverrideWorkflow() throws InterruptedException {
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
final String wfId = "state-options-override-test-id" + System.currentTimeMillis() / 1000;
final WorkflowOptions startOptions = ImmutableWorkflowOptions.builder()
.workflowIdReusePolicy(IDReusePolicy.DISALLOW_REUSE)
.build();
final String input = "input";
client.startWorkflow(StateOptionsOverrideWorkflow.class, wfId, 10, input, startOptions);
// wait for workflow to finish
final String output = client.getSimpleWorkflowResultWithWait(String.class, wfId);
Assertions.assertEquals("input_state1_start_state1_decide_state2_start_state2_decide", output);
}
}
@@ -0,0 +1,84 @@
package io.iworkflow.integ.stateoptionsoverride;

import io.iworkflow.core.Context;
import io.iworkflow.core.ObjectWorkflow;
import io.iworkflow.core.StateDecision;
import io.iworkflow.core.StateDef;
import io.iworkflow.core.WorkflowState;
import io.iworkflow.core.command.CommandRequest;
import io.iworkflow.core.command.CommandResults;
import io.iworkflow.core.communication.Communication;
import io.iworkflow.core.persistence.Persistence;
import io.iworkflow.gen.models.RetryPolicy;
import io.iworkflow.gen.models.WaitUntilApiFailurePolicy;
import io.iworkflow.gen.models.WorkflowStateOptions;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;

@Component
public class StateOptionsOverrideWorkflow implements ObjectWorkflow {

@Override
public List<StateDef> getWorkflowStates() {
return Arrays.asList(
StateDef.startingState(new StateOptionsOverrideWorkflowState1()),
StateDef.nonStartingState(new StateOptionsOverrideWorkflowState2())
);
}
}

class StateOptionsOverrideWorkflowState1 implements WorkflowState<String> {
private String output = "";

@Override
public Class<String> getInputType() {
return String.class;
}

@Override
public CommandRequest waitUntil(Context context, String input, Persistence persistence, Communication communication) {
output = input + "_state1_start";
return CommandRequest.empty;
}

@Override
public StateDecision execute(Context context, String input, CommandResults commandResults, Persistence persistence, Communication communication) {
output = output + "_state1_decide";
return StateDecision.singleNextState(
StateOptionsOverrideWorkflowState2.class, output,
new WorkflowStateOptions()
.waitUntilApiRetryPolicy(new RetryPolicy().maximumAttempts(2))
.waitUntilApiFailurePolicy(WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE)
);
}
}

class StateOptionsOverrideWorkflowState2 implements WorkflowState<String> {
private String output = "";

@Override
public Class<String> getInputType() {
return String.class;
}

@Override
public CommandRequest waitUntil(Context context, String input, Persistence persistence, Communication communication) {
output = input + "_state2_start";
throw new RuntimeException("");
}

@Override
public StateDecision execute(Context context, String input, CommandResults commandResults, Persistence persistence, Communication communication) {
output = output + "_state2_decide";
return StateDecision.gracefulCompleteWorkflow(output);
}

@Override
public WorkflowStateOptions getStateOptions() {
return new WorkflowStateOptions()
.waitUntilApiRetryPolicy(new RetryPolicy().maximumAttempts(1))
.waitUntilApiFailurePolicy(WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE);
}
}

0 comments on commit 757eed4

Please sign in to comment.