Skip to content

Commit

Permalink
[controller] Fix multiple AdminExecutionTasks working on the same sto…
Browse files Browse the repository at this point in the history
…re at the same time. (#918)

Current state: when we create an AdminExecutionTask, we do not check whether an AdminExecutionTask for the same store is already in flight. So if one AdminExecutionTask is waiting for a lock (e.g. an update-store operation waiting for the store-level write lock while that lock is unavailable), AdminConsumptionTask can keep creating additional AdminExecutionTasks for that single store until they occupy every thread in the ExecutorService's thread pool, all of them waiting for the same store-level lock.

Besides, executorService.invokeAll(tasks, processingCycleTimeoutInMs, TimeUnit.MILLISECONDS)
will cancel every submitted task that has not completed when the timeout expires, including tasks that never obtained a thread from the pool. After the timeout, ExecutorService will try to cancel the AdminExecutionTask that is waiting for a lock, but that will not terminate its thread (lock acquisition through AutoCloseableLock is not interruptible). That AdminExecutionTask will therefore keep occupying one thread from the ExecutorService's thread pool until the lock is released.

This PR has the following changes to address the issue:

Add a check for whether an AdminExecutionTask is already running for a store before scheduling a new one.
Add an integration test that simulates a held lock blocking one thread from the pool, and verifies recovery once the lock is released.


Co-authored-by: Hao Xu <xhao@xhao-mn3.linkedin.biz>
  • Loading branch information
haoxu07 and Hao Xu committed May 8, 2024
1 parent d7997d7 commit 4b5e4ff
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 39 deletions.
@@ -1,10 +1,16 @@
package com.linkedin.venice.controller.kafka.consumer;

import static com.linkedin.venice.ConfigKeys.ADMIN_CONSUMPTION_CYCLE_TIMEOUT_MS;
import static com.linkedin.venice.ConfigKeys.ADMIN_CONSUMPTION_MAX_WORKER_THREAD_POOL_SIZE;
import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.STANDALONE_REGION_NAME;
import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_OPERATION_TIMEOUT_MS_DEFAULT_VALUE;

import com.linkedin.venice.controller.VeniceHelixAdmin;
import com.linkedin.venice.controller.kafka.AdminTopicUtils;
import com.linkedin.venice.controller.kafka.protocol.admin.AdminOperation;
import com.linkedin.venice.controller.kafka.protocol.admin.DeleteStore;
import com.linkedin.venice.controller.kafka.protocol.admin.DisableStoreRead;
import com.linkedin.venice.controller.kafka.protocol.admin.PauseStore;
import com.linkedin.venice.controller.kafka.protocol.admin.SchemaMeta;
import com.linkedin.venice.controller.kafka.protocol.admin.StoreCreation;
import com.linkedin.venice.controller.kafka.protocol.enums.AdminMessageType;
Expand All @@ -25,9 +31,12 @@
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
Expand All @@ -48,6 +57,7 @@ public class AdminConsumptionTaskIntegrationTest {
private static final String keySchema = "\"string\"";
private static final String valueSchema = "\"string\"";

private Properties extraProperties = new Properties();
private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

/**
Expand Down Expand Up @@ -101,6 +111,128 @@ public void testSkipMessageEndToEnd() throws ExecutionException, InterruptedExce
}
}

/**
 * Verifies that admin operations for one store, whose processing is blocked on that store's write lock,
 * do not prevent admin operations for a different store from being processed, and that the blocked store
 * makes progress again once the lock is released.
 *
 * <p>Flow: create a store, start a helper thread that holds the store write lock, publish several
 * operations for the locked store (one per consumption cycle), then create a second store and expect it
 * to appear while the first store is still locked. Finally release the lock and expect a store deletion
 * for the first store to go through.
 */
@Test(timeOut = TIMEOUT)
public void testParallelAdminExecutionTasks() throws IOException, InterruptedException {
  try (ZkServerWrapper zkServer = ServiceFactory.getZkServer();
      PubSubBrokerWrapper pubSubBrokerWrapper = ServiceFactory.getPubSubBroker(
          new PubSubBrokerConfigs.Builder().setZkWrapper(zkServer).setRegionName(STANDALONE_REGION_NAME).build());
      TopicManager topicManager =
          IntegrationTestPushUtils
              .getTopicManagerRepo(
                  PUBSUB_OPERATION_TIMEOUT_MS_DEFAULT_VALUE,
                  100,
                  0L,
                  pubSubBrokerWrapper,
                  pubSubTopicRepository)
              .getLocalTopicManager()) {
    PubSubTopic adminTopic = pubSubTopicRepository.getTopic(AdminTopicUtils.getTopicNameFromClusterName(clusterName));
    topicManager.createTopic(adminTopic, 1, 1, true);
    String storeName = "test-store";
    int adminConsumptionMaxWorkerPoolSize = 3;
    extraProperties.put(ADMIN_CONSUMPTION_MAX_WORKER_THREAD_POOL_SIZE, adminConsumptionMaxWorkerPoolSize);
    extraProperties.put(ADMIN_CONSUMPTION_CYCLE_TIMEOUT_MS, 3000);
    try (
        VeniceControllerWrapper controller = ServiceFactory.getVeniceController(
            new VeniceControllerCreateOptions.Builder(clusterName, zkServer, pubSubBrokerWrapper)
                .regionName(STANDALONE_REGION_NAME)
                .extraProperties(extraProperties)
                .build());
        PubSubProducerAdapterFactory pubSubProducerAdapterFactory =
            pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory();
        VeniceWriter<byte[], byte[], byte[]> writer =
            IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory)
                .createVeniceWriter(new VeniceWriterOptions.Builder(adminTopic.getName()).build())) {
      int executionId = 1;
      byte[] goodMessage =
          getStoreCreationMessage(clusterName, storeName, owner, keySchema, valueSchema, executionId);
      writer.put(new byte[0], goodMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

      TestUtils.waitForNonDeterministicAssertion(TIMEOUT, TimeUnit.MILLISECONDS, () -> {
        Assert.assertTrue(controller.getVeniceAdmin().hasStore(clusterName, storeName));
      });

      // Spin up a thread to occupy the store write lock to simulate the blocking admin execution task thread.
      CountDownLatch lockOccupyThreadStartedSignal = new CountDownLatch(1);
      Runnable infiniteLockOccupy = getRunnable(controller, storeName, lockOccupyThreadStartedSignal);
      Thread infiniteLockThread = new Thread(infiniteLockOccupy, "infiniteLockOccupy: " + storeName);
      infiniteLockThread.start();
      try {
        Assert.assertTrue(lockOccupyThreadStartedSignal.await(5, TimeUnit.SECONDS));

        // Wait before sending each operation so that every consumption cycle sees at most one new admin
        // operation for this store: the 5 second wait is longer than ADMIN_CONSUMPTION_CYCLE_TIMEOUT_MS.
        for (int i = 0; i < adminConsumptionMaxWorkerPoolSize; i++) {
          Utils.sleep(5000);
          executionId++;
          byte[] disableWriteMessage = getDisableWrite(clusterName, storeName, executionId);
          writer.put(new byte[0], disableWriteMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        }

        // Store deletion need to disable read.
        Utils.sleep(5000);
        executionId++;
        byte[] disableReadMessage = getDisableRead(clusterName, storeName, executionId);
        writer.put(new byte[0], disableReadMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

        // Create a new store to see if it is blocked by previous messages.
        String otherStoreName = "other-test-store";
        executionId++;
        byte[] otherStoreMessage =
            getStoreCreationMessage(clusterName, otherStoreName, owner, keySchema, valueSchema, executionId);
        writer.put(new byte[0], otherStoreMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

        TestUtils.waitForNonDeterministicAssertion(TIMEOUT, TimeUnit.MILLISECONDS, () -> {
          Assert.assertTrue(controller.getVeniceAdmin().hasStore(clusterName, otherStoreName));
        });

        infiniteLockThread.interrupt(); // This will release the lock
        // Check this store is unblocked or not.
        executionId++;
        byte[] storeDeletionMessage = getStoreDeletionMessage(clusterName, storeName, executionId);
        writer.put(new byte[0], storeDeletionMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        TestUtils.waitForNonDeterministicAssertion(TIMEOUT, TimeUnit.MILLISECONDS, () -> {
          Assert.assertFalse(controller.getVeniceAdmin().hasStore(clusterName, storeName));
        });
      } finally {
        // Always stop the lock-holding thread, even when an assertion above fails; otherwise the
        // non-daemon thread would keep sleeping (and holding the lock) after the test has ended.
        // Interrupting a thread that has already terminated is a no-op.
        infiniteLockThread.interrupt();
      }
    }
  }
}

/**
 * Builds a task that obtains the store write lock for {@code storeName} from the controller's cluster
 * lock manager and then keeps it open until the executing thread is interrupted.
 *
 * @param controller controller whose {@link VeniceHelixAdmin} provides the cluster lock manager
 * @param storeName store whose write lock is requested via {@code createStoreWriteLock}
 * @param latch counted down once, after the lock object has been obtained, so the caller can wait until
 *        the lock is in place
 * @return a runnable that sleeps in a loop inside the try-with-resources scope of the lock; interrupting
 *         its thread ends the loop and closes the lock
 */
private Runnable getRunnable(VeniceControllerWrapper controller, String storeName, CountDownLatch latch) {
  VeniceHelixAdmin admin = controller.getVeniceHelixAdmin();
  return () -> {
    try (AutoCloseableLock ignore =
        admin.getHelixVeniceClusterResources(clusterName).getClusterLockManager().createStoreWriteLock(storeName)) {
      latch.countDown();
      while (true) {
        Thread.sleep(10000);
      }
    } catch (InterruptedException e) {
      // Interruption is the expected way to stop this task, so it is not reported as an error.
      // Restore the interrupt status so code running later on this thread can still observe it.
      Thread.currentThread().interrupt();
    }
  };
}

/**
 * Builds a serialized {@code DISABLE_STORE_READ} admin operation.
 *
 * @param clusterName cluster name written into the payload
 * @param storeName store name written into the payload
 * @param executionId execution id assigned to the admin operation
 * @return the admin operation as serialized by {@code adminOperationSerializer}
 */
private byte[] getDisableRead(String clusterName, String storeName, long executionId) {
  AdminMessageType messageType = AdminMessageType.DISABLE_STORE_READ;

  DisableStoreRead payload = (DisableStoreRead) messageType.getNewInstance();
  payload.storeName = storeName;
  payload.clusterName = clusterName;

  AdminOperation operation = new AdminOperation();
  operation.executionId = executionId;
  operation.payloadUnion = payload;
  operation.operationType = messageType.getValue();

  return adminOperationSerializer.serialize(operation);
}

/**
 * Builds a serialized {@code DISABLE_STORE_WRITE} admin operation, whose payload type is
 * {@link PauseStore}.
 *
 * @param clusterName cluster name written into the payload
 * @param storeName store name written into the payload
 * @param executionId execution id assigned to the admin operation
 * @return the admin operation as serialized by {@code adminOperationSerializer}
 */
private byte[] getDisableWrite(String clusterName, String storeName, long executionId) {
  AdminMessageType messageType = AdminMessageType.DISABLE_STORE_WRITE;

  PauseStore payload = (PauseStore) messageType.getNewInstance();
  payload.storeName = storeName;
  payload.clusterName = clusterName;

  AdminOperation operation = new AdminOperation();
  operation.executionId = executionId;
  operation.payloadUnion = payload;
  operation.operationType = messageType.getValue();

  return adminOperationSerializer.serialize(operation);
}

private byte[] getStoreCreationMessage(
String clusterName,
String storeName,
Expand All @@ -124,4 +256,17 @@ private byte[] getStoreCreationMessage(
adminMessage.executionId = executionId;
return adminOperationSerializer.serialize(adminMessage);
}

/**
 * Builds a serialized {@code DELETE_STORE} admin operation.
 *
 * @param clusterName cluster name written into the payload
 * @param storeName store name written into the payload
 * @param executionId execution id assigned to the admin operation
 * @return the admin operation as serialized by {@code adminOperationSerializer}
 */
private byte[] getStoreDeletionMessage(String clusterName, String storeName, long executionId) {
  AdminMessageType messageType = AdminMessageType.DELETE_STORE;

  DeleteStore payload = (DeleteStore) messageType.getNewInstance();
  payload.storeName = storeName;
  payload.clusterName = clusterName;
  // Tell each prod colo the largest used version number in corp to make it consistent.
  payload.largestUsedVersionNumber = 0;

  AdminOperation operation = new AdminOperation();
  operation.executionId = executionId;
  operation.payloadUnion = payload;
  operation.operationType = messageType.getValue();

  return adminOperationSerializer.serialize(operation);
}
}
Expand Up @@ -158,6 +158,9 @@ public String toString() {
* operation is also attached as the first element of the {@link Pair}.
*/
private final Map<String, Queue<AdminOperationWrapper>> storeAdminOperationsMapWithOffset;

private final ConcurrentHashMap<String, AdminExecutionTask> storeToScheduledTask;

/**
* Map of store names that have encountered some sort of exception during consumption to {@link AdminErrorInfo}
* that has the details about the exception and the offset of the problematic admin message.
Expand Down Expand Up @@ -283,6 +286,7 @@ public AdminConsumptionTask(

this.storeAdminOperationsMapWithOffset = new ConcurrentHashMap<>();
this.problematicStores = new ConcurrentHashMap<>();
this.storeToScheduledTask = new ConcurrentHashMap<>();
// since we use an unbounded queue the core pool size is really the max pool size
this.executorService = new ThreadPoolExecutor(
maxWorkerThreadPoolSize,
Expand Down Expand Up @@ -457,6 +461,7 @@ private void unSubscribe() {
storeAdminOperationsMapWithOffset.clear();
problematicStores.clear();
undelegatedRecords.clear();
storeToScheduledTask.clear();
failingOffset = UNASSIGNED_VALUE;
offsetToSkip = UNASSIGNED_VALUE;
offsetToSkipDIV = UNASSIGNED_VALUE;
Expand Down Expand Up @@ -497,20 +502,24 @@ private void executeMessagesAndCollectResults() throws InterruptedException {
entry.getValue().remove();
skipOffsetCommandHasBeenProcessed = true;
}
tasks.add(
new AdminExecutionTask(
LOGGER,
clusterName,
entry.getKey(),
lastSucceededExecutionIdMap,
lastPersistedExecutionId,
entry.getValue(),
admin,
executionIdAccessor,
isParentController,
stats,
regionName));
stores.add(entry.getKey());
AdminExecutionTask newTask = new AdminExecutionTask(
LOGGER,
clusterName,
entry.getKey(),
lastSucceededExecutionIdMap,
lastPersistedExecutionId,
entry.getValue(),
admin,
executionIdAccessor,
isParentController,
stats,
regionName,
storeToScheduledTask);
// Check if there is previously created scheduled task still occupying one thread from the pool.
if (storeToScheduledTask.putIfAbsent(entry.getKey(), newTask) == null) {
tasks.add(newTask);
stores.add(entry.getKey());
}
}
}
if (skipOffsetCommandHasBeenProcessed) {
Expand Down
Expand Up @@ -84,6 +84,8 @@ public class AdminExecutionTask implements Callable<Void> {
private final ConcurrentHashMap<String, Long> lastSucceededExecutionIdMap;
private final long lastPersistedExecutionId;

private final Map<String, AdminExecutionTask> storeToScheduledTask;

AdminExecutionTask(
Logger LOGGER,
String clusterName,
Expand All @@ -95,7 +97,8 @@ public class AdminExecutionTask implements Callable<Void> {
ExecutionIdAccessor executionIdAccessor,
boolean isParentController,
AdminConsumptionStats stats,
String regionName) {
String regionName,
Map<String, AdminExecutionTask> storeToScheduledTask) {
this.LOGGER = LOGGER;
this.clusterName = clusterName;
this.storeName = storeName;
Expand All @@ -107,18 +110,19 @@ public class AdminExecutionTask implements Callable<Void> {
this.isParentController = isParentController;
this.stats = stats;
this.regionName = regionName;
this.storeToScheduledTask = storeToScheduledTask;
}

@Override
public Void call() {
while (!internalTopic.isEmpty()) {
if (!admin.isLeaderControllerFor(clusterName)) {
throw new VeniceRetriableException(
"This controller is no longer the leader of: " + clusterName
+ ". The consumption task should unsubscribe soon");
}
AdminOperationWrapper adminOperationWrapper = internalTopic.peek();
try {
try {
while (!internalTopic.isEmpty()) {
if (!admin.isLeaderControllerFor(clusterName)) {
throw new VeniceRetriableException(
"This controller is no longer the leader of: " + clusterName
+ ". The consumption task should unsubscribe soon");
}
AdminOperationWrapper adminOperationWrapper = internalTopic.peek();
if (adminOperationWrapper.getStartProcessingTimestamp() == null) {
adminOperationWrapper.setStartProcessingTimestamp(System.currentTimeMillis());
stats.recordAdminMessageStartProcessingLatency(
Expand All @@ -138,23 +142,25 @@ public Void call() {
stats.recordAdminMessageTotalLatency(
Math.max(0, completionTimestamp - adminOperationWrapper.getProducerTimestamp()));
internalTopic.remove();
} catch (Exception e) {
// Retry of the admin operation is handled automatically by keeping the failed admin operation inside the queue.
// The queue with the problematic operation will be delegated and retried by the worker thread in the next
// cycle.
String logMessage =
"when processing admin message for store " + storeName + " with offset " + adminOperationWrapper.getOffset()
+ " and execution id " + adminOperationWrapper.getAdminOperation().executionId;
if (e instanceof VeniceRetriableException) {
// These retriable exceptions are expected, therefore logging at the info level should be sufficient.
stats.recordFailedRetriableAdminConsumption();
LOGGER.info("Retriable exception thrown {}", logMessage, e);
} else {
stats.recordFailedAdminConsumption();
LOGGER.error("Error {}", logMessage, e);
}
throw e;
}
} catch (Exception e) {
// Retry of the admin operation is handled automatically by keeping the failed admin operation inside the queue.
// The queue with the problematic operation will be delegated and retried by the worker thread in the next cycle.
AdminOperationWrapper adminOperationWrapper = internalTopic.peek();
String logMessage =
"when processing admin message for store " + storeName + " with offset " + adminOperationWrapper.getOffset()
+ " and execution id " + adminOperationWrapper.getAdminOperation().executionId;
if (e instanceof VeniceRetriableException) {
// These retriable exceptions are expected, therefore logging at the info level should be sufficient.
stats.recordFailedRetriableAdminConsumption();
LOGGER.info("Retriable exception thrown {}", logMessage, e);
} else {
stats.recordFailedAdminConsumption();
LOGGER.error("Error {}", logMessage, e);
}
throw e;
} finally {
storeToScheduledTask.remove(storeName);
}
return null;
}
Expand Down

0 comments on commit 4b5e4ff

Please sign in to comment.