JOIN task is made async (#3284)
apanicker-nflx committed Oct 14, 2022
1 parent fd2816d commit ef57cd8
Showing 15 changed files with 295 additions and 52 deletions.
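
Context for the diff below: in Conductor, a WorkflowSystemTask that reports isAsync() == true is not completed inline while the workflow is decided; it is pushed onto its task-type queue and executed later by a background system-task worker. This commit applies that contract to JOIN. The following sketch is illustrative only — Join, WorkflowSystemTask, TaskType.TASK_TYPE_JOIN and isAsync() all appear in the changed files, but the constructor wiring shown here is an assumption rather than a copy of the implementation.

// Illustrative sketch, not part of this commit's diff: the async system-task
// contract the change relies on. A task type whose WorkflowSystemTask reports
// isAsync() == true is queued and executed by a system-task worker instead of
// being evaluated inline by the decider.
public class Join extends WorkflowSystemTask {

    public Join() {
        super(TaskType.TASK_TYPE_JOIN); // assumed wiring: registers this handler for JOIN tasks
    }

    @Override
    public boolean isAsync() {
        return true; // the actual change this commit makes to the JOIN system task
    }
}

Because the JOIN now completes asynchronously, the WorkflowExecutor changes below also re-queue it via addTaskToQueue(task) whenever a retry or a sub-workflow state change flips a JOIN back to IN_PROGRESS.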
@@ -101,7 +101,7 @@
* <li>UPDATE conductor.workflows SET payload=? WHERE workflow_id=? AND shard_id=1 AND
* entity='workflow' AND task_id='';
* <li>UPDATE conductor.workflows SET total_tasks=? WHERE workflow_id=? AND shard_id=?;
* <li>UPDATE conductor.workflows SET * total_partitions=?,total_tasks=? WHERE workflow_id=? AND
* <li>UPDATE conductor.workflows SET total_partitions=?,total_tasks=? WHERE workflow_id=? AND
* shard_id=1;
* <li>UPDATE conductor.task_lookup SET workflow_id=? WHERE task_id=?;
* <li>UPDATE conductor.task_def_limit SET workflow_id=? WHERE task_def_name=? AND task_id=?;
@@ -66,7 +66,13 @@ public class WorkflowExecutor {

private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowExecutor.class);
private static final int EXPEDITED_PRIORITY = 10;

private static final String CLASS_NAME = WorkflowExecutor.class.getSimpleName();
private static final Predicate<TaskModel> UNSUCCESSFUL_TERMINAL_TASK =
task -> !task.getStatus().isSuccessful() && task.getStatus().isTerminal();
private static final Predicate<TaskModel> UNSUCCESSFUL_JOIN_TASK =
UNSUCCESSFUL_TERMINAL_TASK.and(t -> TaskType.TASK_TYPE_JOIN.equals(t.getTaskType()));
private static final Predicate<TaskModel> NON_TERMINAL_TASK =
task -> !task.getStatus().isTerminal();
private final MetadataDAO metadataDAO;
private final QueueDAO queueDAO;
private final DeciderService deciderService;
@@ -78,20 +84,9 @@ public class WorkflowExecutor {
private final WorkflowStatusListener workflowStatusListener;
private final SystemTaskRegistry systemTaskRegistry;
private final ApplicationEventPublisher eventPublisher;

private long activeWorkerLastPollMs;
private static final String CLASS_NAME = WorkflowExecutor.class.getSimpleName();
private final ExecutionLockService executionLockService;

private static final Predicate<TaskModel> UNSUCCESSFUL_TERMINAL_TASK =
task -> !task.getStatus().isSuccessful() && task.getStatus().isTerminal();

private static final Predicate<TaskModel> UNSUCCESSFUL_JOIN_TASK =
UNSUCCESSFUL_TERMINAL_TASK.and(t -> TaskType.TASK_TYPE_JOIN.equals(t.getTaskType()));

private static final Predicate<TaskModel> NON_TERMINAL_TASK =
task -> !task.getStatus().isTerminal();

private final Predicate<PollData> validateLastPolledTime =
pollData ->
pollData.getLastPollTime()
@@ -352,6 +347,7 @@ private void retry(WorkflowModel workflow) {
if (task.getTaskType().equalsIgnoreCase(TaskType.JOIN.toString())
|| task.getTaskType().equalsIgnoreCase(TaskType.DO_WHILE.toString())) {
task.setStatus(IN_PROGRESS);
addTaskToQueue(task);
// Task doesn't have to be updated yet. Will be updated along with other
// Workflow tasks downstream.
} else {
@@ -870,10 +866,7 @@ public void updateTask(TaskResult taskResult) {
task.getTaskDefName(), lastDuration, false, task.getStatus());
}

// sync evaluate workflow only if the task is not within a forked branch
if (isLazyEvaluateWorkflow(workflowInstance.getWorkflowDefinition(), task)) {
expediteLazyWorkflowEvaluation(workflowId);
} else {
if (!isLazyEvaluateWorkflow(workflowInstance.getWorkflowDefinition(), task)) {
decide(workflowId);
}
}
@@ -1094,7 +1087,11 @@ private void adjustStateIfSubWorkflowChanged(WorkflowModel workflow) {
// and the JOIN task(s) needs to be evaluated again, set them to IN_PROGRESS
workflow.getTasks().stream()
.filter(UNSUCCESSFUL_JOIN_TASK)
.peek(t -> t.setStatus(TaskModel.Status.IN_PROGRESS))
.peek(
task -> {
task.setStatus(TaskModel.Status.IN_PROGRESS);
addTaskToQueue(task);
})
.forEach(executionDAOFacade::updateTask);
}
}
@@ -1305,6 +1302,7 @@ public void skipTaskFromWorkflow(
taskToBeSkipped.setWorkflowInstanceId(workflowId);
taskToBeSkipped.setWorkflowPriority(workflow.getPriority());
taskToBeSkipped.setStatus(SKIPPED);
taskToBeSkipped.setEndTime(System.currentTimeMillis());
taskToBeSkipped.setTaskType(workflowTask.getName());
taskToBeSkipped.setCorrelationId(workflow.getCorrelationId());
if (skipTaskRequest != null) {
@@ -97,4 +97,8 @@ public boolean execute(
}
return false;
}

public boolean isAsync() {
return true;
}
}
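
With the JOIN now asynchronous, the integration tests further down can no longer rely on it completing as a side effect of polling the forked tasks; each spec captures the JOIN's task id up front and then drives it explicitly. The pattern, condensed from the Groovy specs below into plain Java-style calls (joinTask is the @Autowired Join bean; asyncSystemTaskExecutor is assumed to be provided by the test base class, since the specs use it without declaring it):

// Condensed from the updated specs; identifiers are taken from the diff.
String joinTaskId = workflowExecutionService
        .getExecutionStatus(workflowInstanceId, true)
        .getTaskByRefName("joinTask")   // "join__1", "join__2" or "join" in the DoWhile specs
        .getTaskId();

// ... poll and complete the forked / loop-over worker tasks as before ...

// Explicitly execute the now-async JOIN so the workflow can progress to COMPLETED.
asyncSystemTaskExecutor.execute(joinTask, joinTaskId);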
@@ -190,10 +190,8 @@ public <T> void verifyAndUpload(T entity, PayloadType payloadType) {
break;
}
}
} catch (TransientException te) {
} catch (TransientException | TerminateWorkflowException te) {
throw te;
} catch (TerminateWorkflowException twe) {
throw twe;
} catch (Exception e) {
LOGGER.error(
"Unable to upload payload to external storage for workflow: {}", workflowId, e);
@@ -223,7 +221,6 @@ void failTask(TaskModel task, PayloadType payloadType, String errorMsg) {
} else {
task.setOutputData(new HashMap<>());
}
throw new TerminateWorkflowException(errorMsg, WorkflowModel.Status.FAILED, task);
}

@VisibleForTesting
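
For context (an inference, not stated in the diff): dropping the throw from failTask, together with the ExternalPayloadStorageUtilsTest changes below — which no longer expect a TerminateWorkflowException and instead assert FAILED_WITH_TERMINAL_ERROR — suggests the helper now marks the task terminally failed in place. A rough sketch of that shape; apart from the names visible in the hunks above and the assertions below, everything here is assumed:

// Sketch only — assumed shape of the new failTask behavior, inferred from the
// removed throw above and the updated test assertions below; not the verbatim
// Conductor implementation.
void failTask(TaskModel task, PayloadType payloadType, String errorMsg) {
    task.setReasonForIncompletion(errorMsg);
    task.setStatus(TaskModel.Status.FAILED_WITH_TERMINAL_ERROR);
    // Clear the offending payload so the failed task can still be persisted.
    if (PayloadType.TASK_INPUT == payloadType) {
        task.setInputData(new HashMap<>());
    } else {
        task.setOutputData(new HashMap<>());
    }
    // Previously the method ended with:
    // throw new TerminateWorkflowException(errorMsg, WorkflowModel.Status.FAILED, task);
}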
@@ -41,6 +41,8 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import static com.netflix.conductor.model.TaskModel.Status.FAILED_WITH_TERMINAL_ERROR;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
@@ -199,23 +201,23 @@ public void testFailTaskWithInputPayload() {
TaskModel task = new TaskModel();
task.setInputData(new HashMap<>());

expectedException.expect(TerminateWorkflowException.class);
externalPayloadStorageUtils.failTask(
task, ExternalPayloadStorage.PayloadType.TASK_INPUT, "error");
assertNotNull(task);
assertTrue(task.getInputData().isEmpty());
assertEquals(FAILED_WITH_TERMINAL_ERROR, task.getStatus());
}

@Test
public void testFailTaskWithOutputPayload() {
TaskModel task = new TaskModel();
task.setOutputData(new HashMap<>());

expectedException.expect(TerminateWorkflowException.class);
externalPayloadStorageUtils.failTask(
task, ExternalPayloadStorage.PayloadType.TASK_OUTPUT, "error");
assertNotNull(task);
assertTrue(task.getOutputData().isEmpty());
assertEquals(FAILED_WITH_TERMINAL_ERROR, task.getStatus());
}

@Test
@@ -12,8 +12,12 @@
*/
package com.netflix.conductor.test.integration

import org.springframework.beans.factory.annotation.Autowired

import com.netflix.conductor.common.metadata.tasks.Task
import com.netflix.conductor.common.run.Workflow
import com.netflix.conductor.core.execution.tasks.Join
import com.netflix.conductor.dao.QueueDAO
import com.netflix.conductor.test.base.AbstractSpecification

import spock.lang.Shared
@@ -23,6 +27,9 @@ import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAc

class DecisionTaskSpec extends AbstractSpecification {

@Autowired
Join joinTask

@Shared
def DECISION_WF = "DecisionWorkflow"

@@ -139,6 +146,7 @@
}

when: "the tasks 'integration_task_1' and 'integration_task_10' are polled and completed"
def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("joinTask").taskId
def polledAndCompletedTask1Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker')
def polledAndCompletedTask10Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_10', 'task1.integration.worker')

@@ -189,7 +197,10 @@
then: "verify that the task is completed and acknowledged"
verifyPolledAndAcknowledgedTask(polledAndCompletedTask20Try1)

and: "verify that the 'integration_task_2' is COMPLETED and the workflow has progressed"
when: "JOIN task is polled and executed"
asyncSystemTaskExecutor.execute(joinTask, joinTaskId)

then: "verify that JOIN is COMPLETED and the workflow has progressed"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.COMPLETED
tasks.size() == 7
@@ -18,26 +18,30 @@ import com.netflix.conductor.common.metadata.tasks.Task
import com.netflix.conductor.common.metadata.tasks.TaskDef
import com.netflix.conductor.common.run.Workflow
import com.netflix.conductor.common.utils.TaskUtils
import com.netflix.conductor.core.execution.tasks.Join
import com.netflix.conductor.core.execution.tasks.SubWorkflow
import com.netflix.conductor.test.base.AbstractSpecification

import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAcknowledgedTask

class DoWhileSpec extends AbstractSpecification {

@Autowired
Join joinTask

@Autowired
SubWorkflow subWorkflowTask

def setup() {
workflowTestUtil.registerWorkflows("do_while_integration_test.json",
"do_while_multiple_integration_test.json",
"do_while_as_subtask_integration_test.json",
workflowTestUtil.registerWorkflows('do_while_integration_test.json',
'do_while_multiple_integration_test.json',
'do_while_as_subtask_integration_test.json',
'simple_one_task_sub_workflow_integration_test.json',
'do_while_iteration_fix_test.json',
"do_while_sub_workflow_integration_test.json",
"do_while_five_loop_over_integration_test.json",
"do_while_system_tasks.json",
"do_while_set_variable_fix.json")
'do_while_sub_workflow_integration_test.json',
'do_while_five_loop_over_integration_test.json',
'do_while_system_tasks.json',
'do_while_set_variable_fix.json')
}

def "Test workflow with 2 iterations of five tasks"() {
@@ -275,6 +279,7 @@
}

when: "Polling and completing second task"
def joinId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join__1").taskId
Tuple polledAndCompletedTask1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'integration.test.worker')

then: "Verify that the task was polled and acknowledged and workflow is in running state"
@@ -300,6 +305,9 @@
when: "Polling and completing third task"
Tuple polledAndCompletedTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker')

and: "JOIN task is executed"
asyncSystemTaskExecutor.execute(joinTask, joinId)

then: "Verify that the task was polled and acknowledged and workflow is in completed state"
verifyPolledAndAcknowledgedTask(polledAndCompletedTask2)
verifyTaskIteration(polledAndCompletedTask2[0] as Task, 1)
@@ -363,6 +371,7 @@
}

when: "Polling and completing second task"
def joinId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join__1").taskId
Tuple polledAndCompletedTask1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'integration.test.worker')

then: "Verify that the task was polled and acknowledged and workflow is in running state"
@@ -388,6 +397,9 @@
when: "Polling and completing third task"
Tuple polledAndCompletedTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker')

and: "JOIN task is executed"
asyncSystemTaskExecutor.execute(joinTask, joinId)

then: "Verify that the task was polled and acknowledged and workflow is in completed state"
verifyPolledAndAcknowledgedTask(polledAndCompletedTask2)
verifyTaskIteration(polledAndCompletedTask2[0] as Task, 1)
@@ -528,6 +540,7 @@
}

when: "Polling and completing second task"
def join1Id = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join__1").taskId
Tuple polledAndCompletedTask1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'integration.test.worker')

then: "Verify that the task was polled and acknowledged and workflow is in running state"
@@ -553,6 +566,9 @@
when: "Polling and completing third task"
Tuple polledAndCompletedTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker')

and: "JOIN task is executed"
asyncSystemTaskExecutor.execute(joinTask, join1Id)

then: "Verify that the task was polled and acknowledged and workflow is in running state"
verifyPolledAndAcknowledgedTask(polledAndCompletedTask2)
verifyTaskIteration(polledAndCompletedTask2[0] as Task, 1)
@@ -609,6 +625,7 @@
}

when: "Polling and completing second iteration of second task"
def join2Id = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join__2").taskId
Tuple polledAndCompletedSecondIterationTask1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'integration.test.worker')

then: "Verify that the task was polled and acknowledged and workflow is in running state"
@@ -644,6 +661,9 @@
when: "Polling and completing second iteration of third task"
Tuple polledAndCompletedSecondIterationTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker')

and: "JOIN task is executed"
asyncSystemTaskExecutor.execute(joinTask, join2Id)

then: "Verify that the task was polled and acknowledged and workflow is in running state"
verifyPolledAndAcknowledgedTask(polledAndCompletedSecondIterationTask2)
verifyTaskIteration(polledAndCompletedSecondIterationTask2[0] as Task, 2)
@@ -796,6 +816,7 @@
}

when: "Polling and completing second task"
def joinId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join__1").taskId
Tuple polledAndCompletedTask1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'integration.test.worker')

then: "Verify that the task was polled and acknowledged and workflow is in running state"
@@ -823,6 +844,9 @@
when: "Polling and completing third task"
Tuple polledAndCompletedTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker')

and: "JOIN task is executed"
asyncSystemTaskExecutor.execute(joinTask, joinId)

then: "Verify that the task was polled and acknowledged and workflow is in completed state"
verifyPolledAndAcknowledgedTask(polledAndCompletedTask2)
verifyTaskIteration(polledAndCompletedTask2[0] as Task, 1)
@@ -920,6 +944,7 @@
}

when: "Polling and completing second task"
def joinId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join__1").taskId
Tuple polledAndCompletedTask1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'integration.test.worker')

then: "Verify that the task was polled and acknowledged and workflow is in running state"
@@ -947,6 +972,9 @@
when: "Polling and completing third task"
Tuple polledAndCompletedTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker')

and: "JOIN task is executed"
asyncSystemTaskExecutor.execute(joinTask, joinId)

then: "Verify that the task was polled and acknowledged and workflow is in completed state"
verifyPolledAndAcknowledgedTask(polledAndCompletedTask2)
verifyTaskIteration(polledAndCompletedTask2[0] as Task, 1)
@@ -998,6 +1026,7 @@
}

when: "Polling and completing first task in DO While"
def joinId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join").taskId
Tuple polledAndCompletedTask0 = workflowTestUtil.pollAndCompleteTask('integration_task_0', 'integration.test.worker')

then: "Verify that the task was polled and acknowledged and workflow is in running state"
@@ -1049,6 +1078,9 @@
and: "the workflow is evaluated"
sweep(workflowInstanceId)

and: "JOIN task is executed"
asyncSystemTaskExecutor.execute(joinTask, joinId)

then: "Verify that the task was polled and acknowledged and workflow is in completed state"
verifyPolledAndAcknowledgedTask(polledAndCompletedTask2)
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {