Skip to content

Commit

Permalink
KAFKA-10199: Remove queue-based remove from state updater
Browse files Browse the repository at this point in the history
Removes the unused remove operation from the state updater
that asynchronously removed tasks and put them into an
output queue.
  • Loading branch information
cadonna committed May 8, 2024
1 parent b306901 commit ab9d4e2
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 544 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@ private void verifyStateFor(final Task task) {
}

@Override
public CompletableFuture<RemovedTaskResult> removeWithFuture(final TaskId taskId) {
public CompletableFuture<RemovedTaskResult> remove(final TaskId taskId) {
final CompletableFuture<RemovedTaskResult> future = new CompletableFuture<>();
tasksAndActionsLock.lock();
try {
Expand All @@ -864,17 +864,6 @@ public CompletableFuture<RemovedTaskResult> removeWithFuture(final TaskId taskId
return future;
}

@Override
public void remove(final TaskId taskId) {
tasksAndActionsLock.lock();
try {
tasksAndActions.add(TaskAndAction.createRemoveTask(taskId));
tasksAndActionsCondition.signalAll();
} finally {
tasksAndActionsLock.unlock();
}
}

@Override
public void signalResume() {
tasksAndActionsLock.lock();
Expand Down Expand Up @@ -914,17 +903,6 @@ public Set<StreamTask> drainRestoredActiveTasks(final Duration timeout) {
return result;
}

@Override
public Set<Task> drainRemovedTasks() {
final List<Task> result = new ArrayList<>();
removedTasks.drainTo(result);
return new HashSet<>(result);
}

public boolean hasRemovedTasks() {
return !removedTasks.isEmpty();
}

@Override
public List<ExceptionAndTask> drainExceptionsAndFailedTasks() {
final List<ExceptionAndTask> result = new ArrayList<>();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,6 @@ public String toString() {
*/
void add(final Task task);

/**
* Removes a task (active or standby) from the state updater and adds the removed task to the removed tasks.
*
* This method does not block until the removed task is removed from the state updater.
*
* The task to be removed is not removed from the restored active tasks and the failed tasks.
* Stateless tasks will never be added to the removed tasks since they are immediately added to the
* restored active tasks.
*
* @param taskId ID of the task to remove
*/
void remove(final TaskId taskId);

/**
* Removes a task (active or standby) from the state updater.
*
Expand All @@ -158,7 +145,7 @@ public String toString() {
*
* @param taskId ID of the task to remove
*/
CompletableFuture<RemovedTaskResult> removeWithFuture(final TaskId taskId);
CompletableFuture<RemovedTaskResult> remove(final TaskId taskId);

/**
* Wakes up the state updater if it is currently dormant, to check if a paused task should be resumed.
Expand All @@ -178,27 +165,6 @@ public String toString() {
*/
Set<StreamTask> drainRestoredActiveTasks(final Duration timeout);


/**
* Drains the removed tasks (active and standbys) from the state updater.
*
* Removed tasks returned by this method are tasks extraordinarily removed from the state updater. These do not
* include restored or failed tasks.
*
* The returned removed tasks are removed from the state updater
*
* @return set of tasks removed from the state updater
*/
Set<Task> drainRemovedTasks();

/**
* Checks if the state updater has any tasks that should be removed and returned to the StreamThread
* using `drainRemovedTasks`.
*
* @return true if a subsequent call to `drainRemovedTasks` would return a non-empty collection.
*/
boolean hasRemovedTasks();

/**
* Drains the failed tasks and the corresponding exceptions.
*
Expand All @@ -223,9 +189,8 @@ public String toString() {
* not been removed from the state updater with one of the following methods:
* <ul>
* <li>{@link StateUpdater#drainRestoredActiveTasks(Duration)}</li>
* <li>{@link StateUpdater#drainRemovedTasks()}</li>
* <li>{@link StateUpdater#drainExceptionsAndFailedTasks()}</li>
* <li>{@link StateUpdater#removeWithFuture(org.apache.kafka.streams.processor.TaskId)}</li>
* <li>{@link StateUpdater#remove(org.apache.kafka.streams.processor.TaskId)}</li>
* </ul>
*
* @return set of all tasks managed by the state updater
Expand All @@ -236,8 +201,8 @@ public String toString() {
* Gets all tasks that are currently being restored inside the state updater.
*
* Tasks that have just being added into the state updater via {@link StateUpdater#add(Task)}
* or have restored completely or removed will not be returned; similarly tasks that have just being
* removed via {@link StateUpdater#remove(TaskId)} maybe returned still.
* or have restored completely or removed will not be returned; tasks that have just being
* removed via {@link StateUpdater#remove(TaskId)} may still be returned.
*
* @return set of all updating tasks inside the state updater
*/
Expand All @@ -250,9 +215,8 @@ public String toString() {
* and the task was not removed from the state updater with one of the following methods:
* <ul>
* <li>{@link StateUpdater#drainRestoredActiveTasks(Duration)}</li>
* <li>{@link StateUpdater#drainRemovedTasks()}</li>
* <li>{@link StateUpdater#drainExceptionsAndFailedTasks()}</li>
* <li>{@link StateUpdater#removeWithFuture(org.apache.kafka.streams.processor.TaskId)}</li>
* <li>{@link StateUpdater#remove(org.apache.kafka.streams.processor.TaskId)}</li>
* </ul>
*
* @return {@code true} if the state updater restores active tasks, {@code false} otherwise
Expand All @@ -269,7 +233,6 @@ public String toString() {
* The state updater manages all standby tasks that were added with the {@link StateUpdater#add(Task)} and that have
* not been removed from the state updater with one of the following methods:
* <ul>
* <li>{@link StateUpdater#drainRemovedTasks()}</li>
* <li>{@link StateUpdater#drainExceptionsAndFailedTasks()}</li>
* </ul>
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,25 +602,25 @@ private void handleTasksInStateUpdater(final Map<TaskId, Set<TopicPartition>> ac
if (activeTasksToCreate.containsKey(taskId)) {
if (task.isActive()) {
if (!task.inputPartitions().equals(activeTasksToCreate.get(taskId))) {
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(taskId);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(taskId);
futuresForUpdatingInputPartitions.put(taskId, future);
newInputPartitions.put(taskId, activeTasksToCreate.get(taskId));
}
} else {
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(taskId);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(taskId);
futuresForStandbyTasksToRecycle.put(taskId, future);
activeInputPartitions.put(taskId, activeTasksToCreate.get(taskId));
}
activeTasksToCreate.remove(taskId);
} else if (standbyTasksToCreate.containsKey(taskId)) {
if (task.isActive()) {
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(taskId);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(taskId);
futuresForActiveTasksToRecycle.put(taskId, future);
standbyInputPartitions.put(taskId, standbyTasksToCreate.get(taskId));
}
standbyTasksToCreate.remove(taskId);
} else {
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(taskId);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(taskId);
futuresForTasksToClose.put(taskId, future);
}
}
Expand Down Expand Up @@ -1177,7 +1177,7 @@ private void revokeTasksInStateUpdater(final Set<TopicPartition> remainingRevoke
for (final Task restoringTask : stateUpdater.getTasks()) {
if (restoringTask.isActive()) {
if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
futures.put(restoringTask.id(), stateUpdater.removeWithFuture(restoringTask.id()));
futures.put(restoringTask.id(), stateUpdater.remove(restoringTask.id()));
remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());
}
}
Expand Down Expand Up @@ -1251,7 +1251,7 @@ private void removeLostActiveTasksFromStateUpdater() {
final Set<Task> tasksToCloseDirty = new HashSet<>();
for (final Task restoringTask : stateUpdater.getTasks()) {
if (restoringTask.isActive()) {
futures.put(restoringTask.id(), stateUpdater.removeWithFuture(restoringTask.id()));
futures.put(restoringTask.id(), stateUpdater.remove(restoringTask.id()));
}
}

Expand Down Expand Up @@ -1494,7 +1494,7 @@ private void shutdownStateUpdater() {
if (stateUpdater != null) {
final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new LinkedHashMap<>();
for (final Task task : stateUpdater.getTasks()) {
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(task.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(task.id());
futures.put(task.id(), future);
}
final Set<Task> tasksToCloseClean = new HashSet<>();
Expand Down

0 comments on commit ab9d4e2

Please sign in to comment.