Skip to content

Commit

Permalink
Add waitForStateExecutionCompletion API (#207)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng committed Oct 28, 2023
1 parent d2ae843 commit b87c388
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 12 deletions.
2 changes: 1 addition & 1 deletion iwf-idl
2 changes: 1 addition & 1 deletion script/docker-compose-init.sh
Expand Up @@ -33,7 +33,7 @@ for run in {1..120}; do
sleep 0.1
tctl search-attribute create -name CustomStringField -type text -y

if checkExists "IwfWorkflowType" ] && checkExists "IwfGlobalWorkflowVersion" && checkExists "IwfExecutingStateIds" && checkExists "CustomKeywordField" && checkExists "CustomIntField" && checkExists "CustomBoolField" && checkExists "CustomDoubleField" && checkExists "CustomDatetimeField" && checkExists "CustomStringField" ] ; then
if checkExists "IwfWorkflowType" && checkExists "IwfGlobalWorkflowVersion" && checkExists "IwfExecutingStateIds" && checkExists "CustomKeywordField" && checkExists "CustomIntField" && checkExists "CustomBoolField" && checkExists "CustomDoubleField" && checkExists "CustomDatetimeField" && checkExists "CustomStringField"; then
echo "All search attributes are registered"
break
fi
Expand Down
9 changes: 8 additions & 1 deletion src/main/java/io/iworkflow/core/Client.java
@@ -1,6 +1,5 @@
package io.iworkflow.core;

import feign.FeignException;
import io.iworkflow.core.persistence.PersistenceOptions;
import io.iworkflow.gen.models.KeyValue;
import io.iworkflow.gen.models.SearchAttribute;
Expand Down Expand Up @@ -128,6 +127,7 @@ public String startWorkflow(
unregisterWorkflowOptions.cronSchedule(options.getCronSchedule());
unregisterWorkflowOptions.workflowRetryPolicy(options.getWorkflowRetryPolicy());
unregisterWorkflowOptions.workflowConfigOverride(options.getWorkflowConfigOverride());
unregisterWorkflowOptions.waitForCompletionStateExecutionIds(options.getWaitForCompletionStateExecutionIds());

final Map<String, SearchAttributeValueType> saTypes = registry.getSearchAttributeKeyToTypeMap(wfType);
final List<SearchAttribute> convertedSAs = convertToSearchAttributeList(saTypes, options.getInitialSearchAttribute());
Expand Down Expand Up @@ -910,4 +910,11 @@ public void skipTimer(
final int timerCommandIndex) {
unregisteredClient.skipTimer(workflowId, workflowRunId, workflowStateId, stateExecutionNumber, timerCommandIndex);
}

public <T> T waitForStateExecutionCompletion(
final Class<T> valueClass,
final String workflowId,
final String stateExecutionId) {
return unregisteredClient.waitForStateExecutionCompletion(valueClass, workflowId, stateExecutionId);
}
}
24 changes: 24 additions & 0 deletions src/main/java/io/iworkflow/core/UnregisteredClient.java
Expand Up @@ -31,6 +31,8 @@
import io.iworkflow.gen.models.WorkflowStartResponse;
import io.iworkflow.gen.models.WorkflowStatus;
import io.iworkflow.gen.models.WorkflowStopRequest;
import io.iworkflow.gen.models.WorkflowWaitForStateCompletionRequest;
import io.iworkflow.gen.models.WorkflowWaitForStateCompletionResponse;

import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -152,6 +154,8 @@ public String startWorkflow(
}

request.workflowStartOptions(startOptions);

request.waitForCompletionStateExecutionIds(options.getWaitForCompletionStateExecutionIds());
}

try {
Expand Down Expand Up @@ -341,6 +345,26 @@ private List<StateCompletionOutput> getWorkflowResults(
return results;
}

public <T> T waitForStateExecutionCompletion(
final Class<T> valueClass,
final String workflowId,
final String stateExecutionId) {
final WorkflowWaitForStateCompletionRequest request = new WorkflowWaitForStateCompletionRequest()
.stateExecutionId(stateExecutionId)
.workflowId(workflowId);
final WorkflowWaitForStateCompletionResponse response;
try {
response = defaultApi.apiV1WorkflowWaitForStateCompletionPost(request);
} catch (final FeignException.FeignClientException exp) {
throw IwfHttpException.fromFeignException(clientOptions.getObjectEncoder(), exp);
}

if (response.getStateCompletionOutput() == null) {
return null;
}
return clientOptions.getObjectEncoder().decode(response.getStateCompletionOutput().getCompletedStateOutput(), valueClass);
}

private void throwUncompletedException(final WorkflowGetResponse workflowGetResponse) {
throw new WorkflowUncompletedException(
workflowGetResponse.getWorkflowRunId(),
Expand Down
Expand Up @@ -25,4 +25,6 @@ public abstract class UnregisteredWorkflowOptions {
public abstract Optional<WorkflowConfig> getWorkflowConfigOverride();

public abstract Optional<Boolean> getUsingMemoForDataAttributes();

public abstract List<String> getWaitForCompletionStateExecutionIds();
}
3 changes: 3 additions & 0 deletions src/main/java/io/iworkflow/core/WorkflowOptions.java
Expand Up @@ -5,6 +5,7 @@
import io.iworkflow.gen.models.WorkflowRetryPolicy;
import org.immutables.value.Value;

import java.util.List;
import java.util.Map;
import java.util.Optional;

Expand All @@ -19,4 +20,6 @@ public abstract class WorkflowOptions {
public abstract Map<String, Object> getInitialSearchAttribute();

public abstract Optional<WorkflowConfig> getWorkflowConfigOverride();

public abstract List<String> getWaitForCompletionStateExecutionIds();
}
4 changes: 2 additions & 2 deletions src/test/java/io/iworkflow/integ/RpcTest.java
Expand Up @@ -60,7 +60,7 @@ public void testRPCLocking() throws InterruptedException, ExecutionException {

ExecutorService executor = Executors.newFixedThreadPool(10);
final ArrayList<Future<String>> futures = new ArrayList<>();
int total = 1000;
int total = 100;
for (int i = 0; i < total; i++) {

final Future<String> future = executor.submit(() -> {
Expand Down Expand Up @@ -107,7 +107,7 @@ public void testRPCWorkflowFunc1() throws InterruptedException {
final String runId = client.startWorkflow(
RpcWorkflow.class, wfId, 10, 999);

final RpcWorkflow rpcStub = client.newRpcStub(RpcWorkflow.class, wfId, "");
final RpcWorkflow rpcStub = client.newRpcStub(RpcWorkflow.class, wfId, "" );

client.invokeRPC(rpcStub::testRpcSetDataAttribute, "test-value");
String value = client.invokeRPC(rpcStub::testRpcGetDataAttribute);
Expand Down
5 changes: 4 additions & 1 deletion src/test/java/io/iworkflow/integ/TimerTest.java
Expand Up @@ -2,6 +2,7 @@

import io.iworkflow.core.Client;
import io.iworkflow.core.ClientOptions;
import io.iworkflow.core.ImmutableWorkflowOptions;
import io.iworkflow.integ.timer.BasicTimerWorkflow;
import io.iworkflow.spring.TestSingletonWorkerService;
import io.iworkflow.spring.controller.WorkflowRegistry;
Expand All @@ -26,8 +27,10 @@ public void testBasicTimerWorkflow() throws InterruptedException {
final Integer input = 5;

client.startWorkflow(
BasicTimerWorkflow.class, wfId, 10, input);
BasicTimerWorkflow.class, wfId, 10, input,
ImmutableWorkflowOptions.builder().addWaitForCompletionStateExecutionIds("BasicTimerWorkflowState1-1").build());

client.waitForStateExecutionCompletion(Void.class, wfId, "BasicTimerWorkflowState1-1");
client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
final long elapsed = System.currentTimeMillis() - startTs;
Assertions.assertTrue(elapsed >= 4000 && elapsed <= 7000, String.format("actual duration: %d", elapsed));
Expand Down
Expand Up @@ -12,14 +12,8 @@
import java.time.Duration;

public class BasicTimerWorkflowState1 implements WorkflowState<Integer> {
public static final String STATE_ID = "timer-s1";
public static final String COMMAND_ID = "test-timer-id";

@Override
public String getStateId() {
return STATE_ID;
}

@Override
public Class<Integer> getInputType() {
return Integer.class;
Expand Down

0 comments on commit b87c388

Please sign in to comment.