Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Add jitter to workflowoffset time (#3305)
Browse files Browse the repository at this point in the history
* add jitter to workflowoffset time

* Add comment about jitter ranges

* Instrument unack method

Co-authored-by: Surafel Korse <skorse@netflix.com>
  • Loading branch information
skorse and Surafel Korse committed Oct 18, 2022
1 parent 145ab53 commit 4ee0203
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
*/
package com.netflix.conductor.core.reconciliation;

import java.time.Instant;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
Expand Down Expand Up @@ -95,19 +97,23 @@ public void sweep(String workflowId) {
Monitors.error(CLASS_NAME, "sweep");
LOGGER.error("Error running sweep for " + workflowId, e);
}
long workflowOffsetTimeout =
workflowOffsetWithJitter(properties.getWorkflowOffsetTimeout().getSeconds());
if (workflow != null) {
unack(workflow);
long startTime = Instant.now().toEpochMilli();
unack(workflow, workflowOffsetTimeout);
long endTime = Instant.now().toEpochMilli();
Monitors.recordUnackTime(workflow.getWorkflowName(), endTime - startTime);
} else {
LOGGER.warn(
"Workflow with {} id can not be found. Attempting to unack using the id",
workflowId);
queueDAO.setUnackTimeout(
DECIDER_QUEUE, workflowId, properties.getWorkflowOffsetTimeout().toMillis());
queueDAO.setUnackTimeout(DECIDER_QUEUE, workflowId, workflowOffsetTimeout * 1000);
}
}

@VisibleForTesting
void unack(WorkflowModel workflowModel) {
void unack(WorkflowModel workflowModel, long workflowOffsetTimeout) {
long postponeDurationSeconds = 0;
for (TaskModel taskModel : workflowModel.getTasks()) {
if (taskModel.getStatus() == Status.IN_PROGRESS) {
Expand All @@ -116,16 +122,15 @@ void unack(WorkflowModel workflowModel) {
postponeDurationSeconds =
(taskModel.getWaitTimeout() != 0)
? taskModel.getWaitTimeout() + 1
: properties.getWorkflowOffsetTimeout().getSeconds();
: workflowOffsetTimeout;
} else {
postponeDurationSeconds =
(taskModel.getResponseTimeoutSeconds() != 0)
? taskModel.getResponseTimeoutSeconds() + 1
: properties.getWorkflowOffsetTimeout().getSeconds();
: workflowOffsetTimeout;
}
break;
}
if (taskModel.getStatus() == Status.SCHEDULED) {
} else if (taskModel.getStatus() == Status.SCHEDULED) {
Optional<TaskDef> taskDefinition = taskModel.getTaskDefinition();
if (taskDefinition.isPresent()) {
TaskDef taskDef = taskDefinition.get();
Expand All @@ -137,18 +142,32 @@ void unack(WorkflowModel workflowModel) {
(workflowModel.getWorkflowDefinition().getTimeoutSeconds() != 0)
? workflowModel.getWorkflowDefinition().getTimeoutSeconds()
+ 1
: properties.getWorkflowOffsetTimeout().getSeconds();
: workflowOffsetTimeout;
}
} else {
postponeDurationSeconds =
(workflowModel.getWorkflowDefinition().getTimeoutSeconds() != 0)
? workflowModel.getWorkflowDefinition().getTimeoutSeconds() + 1
: properties.getWorkflowOffsetTimeout().getSeconds();
: workflowOffsetTimeout;
}
break;
}
}
queueDAO.setUnackTimeout(
DECIDER_QUEUE, workflowModel.getWorkflowId(), postponeDurationSeconds * 1000);
}

/**
* jitter will be +- (1/3) workflowOffsetTimeout for example, if workflowOffsetTimeout is 45
* seconds, this function returns values between [30-60] seconds
*
* @param workflowOffsetTimeout
* @return
*/
@VisibleForTesting
long workflowOffsetWithJitter(long workflowOffsetTimeout) {
long range = workflowOffsetTimeout / 3;
long jitter = new Random().nextInt((int) (2 * range + 1)) - range;
return workflowOffsetTimeout + jitter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,11 @@ public static void recordWorkflowCompletion(
.record(duration, TimeUnit.MILLISECONDS);
}

public static void recordUnackTime(String workflowType, long duration) {
getTimer(classQualifier, "workflow_unack", "workflowName", workflowType)
.record(duration, TimeUnit.MILLISECONDS);
}

public static void recordTaskRateLimited(String taskDefName, int limit) {
gauge(classQualifier, "task_rate_limited", limit, "taskType", taskDefName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import static com.netflix.conductor.core.utils.Utils.DECIDER_QUEUE;

import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -67,7 +68,7 @@ public void testPostponeDurationForWaitTaskType() {
workflowModel.setTasks(List.of(taskModel));
when(properties.getWorkflowOffsetTimeout())
.thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds));
workflowSweeper.unack(workflowModel);
workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds);
verify(queueDAO)
.setUnackTimeout(
DECIDER_QUEUE,
Expand All @@ -88,7 +89,7 @@ public void testPostponeDurationForWaitTaskTypeWithWaitTime() {
workflowModel.setTasks(List.of(taskModel));
when(properties.getWorkflowOffsetTimeout())
.thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds));
workflowSweeper.unack(workflowModel);
workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds);
verify(queueDAO)
.setUnackTimeout(
DECIDER_QUEUE, workflowModel.getWorkflowId(), (waitTimeout + 1) * 1000);
Expand All @@ -105,7 +106,7 @@ public void testPostponeDurationForTaskInProgress() {
workflowModel.setTasks(List.of(taskModel));
when(properties.getWorkflowOffsetTimeout())
.thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds));
workflowSweeper.unack(workflowModel);
workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds);
verify(queueDAO)
.setUnackTimeout(
DECIDER_QUEUE,
Expand All @@ -126,7 +127,7 @@ public void testPostponeDurationForTaskInProgressWithResponseTimeoutSet() {
workflowModel.setTasks(List.of(taskModel));
when(properties.getWorkflowOffsetTimeout())
.thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds));
workflowSweeper.unack(workflowModel);
workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds);
verify(queueDAO)
.setUnackTimeout(
DECIDER_QUEUE, workflowModel.getWorkflowId(), (responseTimeout + 1) * 1000);
Expand All @@ -146,7 +147,7 @@ public void testPostponeDurationForTaskInScheduled() {
workflowModel.setTasks(List.of(taskModel));
when(properties.getWorkflowOffsetTimeout())
.thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds));
workflowSweeper.unack(workflowModel);
workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds);
verify(queueDAO)
.setUnackTimeout(
DECIDER_QUEUE,
Expand All @@ -169,7 +170,7 @@ public void testPostponeDurationForTaskInScheduledWithWorkflowTimeoutSet() {
workflowModel.setTasks(List.of(taskModel));
when(properties.getWorkflowOffsetTimeout())
.thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds));
workflowSweeper.unack(workflowModel);
workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds);
verify(queueDAO)
.setUnackTimeout(
DECIDER_QUEUE, workflowModel.getWorkflowId(), (workflowTimeout + 1) * 1000);
Expand All @@ -190,7 +191,7 @@ public void testPostponeDurationForTaskInScheduledWithWorkflowTimeoutSetAndNoPol
when(taskModel.getStatus()).thenReturn(Status.SCHEDULED);
when(properties.getWorkflowOffsetTimeout())
.thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds));
workflowSweeper.unack(workflowModel);
workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds);
verify(queueDAO)
.setUnackTimeout(
DECIDER_QUEUE, workflowModel.getWorkflowId(), (workflowTimeout + 1) * 1000);
Expand All @@ -209,7 +210,7 @@ public void testPostponeDurationForTaskInScheduledWithNoWorkflowTimeoutSetAndNoP
when(taskModel.getStatus()).thenReturn(Status.SCHEDULED);
when(properties.getWorkflowOffsetTimeout())
.thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds));
workflowSweeper.unack(workflowModel);
workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds);
verify(queueDAO)
.setUnackTimeout(
DECIDER_QUEUE,
Expand All @@ -230,7 +231,7 @@ public void testPostponeDurationForTaskInScheduledWithNoPollTimeoutSet() {
when(taskModel.getTaskDefinition()).thenReturn(Optional.of(taskDef));
when(properties.getWorkflowOffsetTimeout())
.thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds));
workflowSweeper.unack(workflowModel);
workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds);
verify(queueDAO)
.setUnackTimeout(
DECIDER_QUEUE,
Expand All @@ -252,9 +253,19 @@ public void testPostponeDurationForTaskInScheduledWithPollTimeoutSet() {
when(taskModel.getTaskDefinition()).thenReturn(Optional.of(taskDef));
when(properties.getWorkflowOffsetTimeout())
.thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds));
workflowSweeper.unack(workflowModel);
workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds);
verify(queueDAO)
.setUnackTimeout(
DECIDER_QUEUE, workflowModel.getWorkflowId(), (pollTimeout + 1) * 1000);
}

@Test
public void testWorkflowOffsetJitter() {
long offset = 45;
for (int i = 0; i < 10; i++) {
long offsetWithJitter = workflowSweeper.workflowOffsetWithJitter(offset);
assertTrue(offsetWithJitter >= 30);
assertTrue(offsetWithJitter <= 60);
}
}
}

0 comments on commit 4ee0203

Please sign in to comment.