Skip to content

Commit

Permalink
Use putIfAbsent for better thread safety.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hao Xu committed May 6, 2024
1 parent 6b2e245 commit fe71934
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -160,7 +159,7 @@ public String toString() {
*/
private final Map<String, Queue<AdminOperationWrapper>> storeAdminOperationsMapWithOffset;

private final Set<String> storeHasScheduledTask;
private final ConcurrentHashMap<String, AdminExecutionTask> storeHasScheduledTask;

/**
* Map of store names that have encountered some sort of exception during consumption to {@link AdminErrorInfo}
Expand Down Expand Up @@ -287,7 +286,7 @@ public AdminConsumptionTask(

this.storeAdminOperationsMapWithOffset = new ConcurrentHashMap<>();
this.problematicStores = new ConcurrentHashMap<>();
this.storeHasScheduledTask = ConcurrentHashMap.newKeySet();
this.storeHasScheduledTask = new ConcurrentHashMap<>();
// since we use an unbounded queue the core pool size is really the max pool size
this.executorService = new ThreadPoolExecutor(
maxWorkerThreadPoolSize,
Expand Down Expand Up @@ -498,27 +497,29 @@ private void executeMessagesAndCollectResults() throws InterruptedException {
// Create a task for each store that has admin messages pending to be processed.
boolean skipOffsetCommandHasBeenProcessed = false;
for (Map.Entry<String, Queue<AdminOperationWrapper>> entry: storeAdminOperationsMapWithOffset.entrySet()) {
if (!entry.getValue().isEmpty() && !storeHasScheduledTask.contains(entry.getKey())) {
if (!entry.getValue().isEmpty()) {
if (checkOffsetToSkip(entry.getValue().peek().getOffset(), false)) {
entry.getValue().remove();
skipOffsetCommandHasBeenProcessed = true;
}
storeHasScheduledTask.add(entry.getKey());
tasks.add(
new AdminExecutionTask(
LOGGER,
clusterName,
entry.getKey(),
lastSucceededExecutionIdMap,
lastPersistedExecutionId,
entry.getValue(),
admin,
executionIdAccessor,
isParentController,
stats,
regionName,
storeHasScheduledTask));
stores.add(entry.getKey());
AdminExecutionTask newTask = new AdminExecutionTask(
LOGGER,
clusterName,
entry.getKey(),
lastSucceededExecutionIdMap,
lastPersistedExecutionId,
entry.getValue(),
admin,
executionIdAccessor,
isParentController,
stats,
regionName,
storeHasScheduledTask);
// Check if there is previously created scheduled task still occupying one thread from the pool.
if (storeHasScheduledTask.putIfAbsent(entry.getKey(), newTask) == null) {
tasks.add(newTask);
stores.add(entry.getKey());
}
}
}
if (skipOffsetCommandHasBeenProcessed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class AdminExecutionTask implements Callable<Void> {
private final ConcurrentHashMap<String, Long> lastSucceededExecutionIdMap;
private final long lastPersistedExecutionId;

private final Set<String> storeSet;
private final Map<String, AdminExecutionTask> storeToAdminExecutionTask;

AdminExecutionTask(
Logger LOGGER,
Expand All @@ -98,7 +98,7 @@ public class AdminExecutionTask implements Callable<Void> {
boolean isParentController,
AdminConsumptionStats stats,
String regionName,
Set<String> storeSet) {
Map<String, AdminExecutionTask> storeToAdminExecutionTask) {
this.LOGGER = LOGGER;
this.clusterName = clusterName;
this.storeName = storeName;
Expand All @@ -110,7 +110,7 @@ public class AdminExecutionTask implements Callable<Void> {
this.isParentController = isParentController;
this.stats = stats;
this.regionName = regionName;
this.storeSet = storeSet;
this.storeToAdminExecutionTask = storeToAdminExecutionTask;
}

@Override
Expand Down Expand Up @@ -160,7 +160,7 @@ public Void call() {
}
throw e;
} finally {
storeSet.remove(storeName);
storeToAdminExecutionTask.remove(storeName);
}
return null;
}
Expand Down

0 comments on commit fe71934

Please sign in to comment.