Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] Fix multiple AdminExecutionTasks working on the same store at the same time. #918

Merged
merged 5 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package com.linkedin.venice.controller.kafka.consumer;

import static com.linkedin.venice.ConfigKeys.ADMIN_CONSUMPTION_CYCLE_TIMEOUT_MS;
import static com.linkedin.venice.ConfigKeys.ADMIN_CONSUMPTION_MAX_WORKER_THREAD_POOL_SIZE;
import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.STANDALONE_REGION_NAME;
import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_OPERATION_TIMEOUT_MS_DEFAULT_VALUE;

import com.linkedin.venice.controller.VeniceHelixAdmin;
import com.linkedin.venice.controller.kafka.AdminTopicUtils;
import com.linkedin.venice.controller.kafka.protocol.admin.AdminOperation;
import com.linkedin.venice.controller.kafka.protocol.admin.DeleteStore;
import com.linkedin.venice.controller.kafka.protocol.admin.DisableStoreRead;
import com.linkedin.venice.controller.kafka.protocol.admin.PauseStore;
import com.linkedin.venice.controller.kafka.protocol.admin.SchemaMeta;
import com.linkedin.venice.controller.kafka.protocol.admin.StoreCreation;
import com.linkedin.venice.controller.kafka.protocol.enums.AdminMessageType;
Expand All @@ -25,9 +31,12 @@
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
Expand All @@ -48,6 +57,7 @@ public class AdminConsumptionTaskIntegrationTest {
private static final String keySchema = "\"string\"";
private static final String valueSchema = "\"string\"";

private Properties extraProperties = new Properties();
private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

/**
Expand Down Expand Up @@ -101,6 +111,128 @@ public void testSkipMessageEndToEnd() throws ExecutionException, InterruptedExce
}
}

@Test(timeOut = TIMEOUT)
public void testParallelAdminExecutionTasks() throws IOException, InterruptedException {
try (ZkServerWrapper zkServer = ServiceFactory.getZkServer();
PubSubBrokerWrapper pubSubBrokerWrapper = ServiceFactory.getPubSubBroker(
new PubSubBrokerConfigs.Builder().setZkWrapper(zkServer).setRegionName(STANDALONE_REGION_NAME).build());
TopicManager topicManager =
IntegrationTestPushUtils
.getTopicManagerRepo(
PUBSUB_OPERATION_TIMEOUT_MS_DEFAULT_VALUE,
100,
0l,
pubSubBrokerWrapper,
pubSubTopicRepository)
.getLocalTopicManager()) {
PubSubTopic adminTopic = pubSubTopicRepository.getTopic(AdminTopicUtils.getTopicNameFromClusterName(clusterName));
topicManager.createTopic(adminTopic, 1, 1, true);
String storeName = "test-store";
int adminConsumptionMaxWorkerPoolSize = 3;
extraProperties.put(ADMIN_CONSUMPTION_MAX_WORKER_THREAD_POOL_SIZE, adminConsumptionMaxWorkerPoolSize);
extraProperties.put(ADMIN_CONSUMPTION_CYCLE_TIMEOUT_MS, 3000);
try (
VeniceControllerWrapper controller = ServiceFactory.getVeniceController(
new VeniceControllerCreateOptions.Builder(clusterName, zkServer, pubSubBrokerWrapper)
.regionName(STANDALONE_REGION_NAME)
.extraProperties(extraProperties)
.build());
PubSubProducerAdapterFactory pubSubProducerAdapterFactory =
pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory();
VeniceWriter<byte[], byte[], byte[]> writer =
IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory)
.createVeniceWriter(new VeniceWriterOptions.Builder(adminTopic.getName()).build())) {
int executionId = 1;
byte[] goodMessage =
getStoreCreationMessage(clusterName, storeName, owner, keySchema, valueSchema, executionId);
writer.put(new byte[0], goodMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

TestUtils.waitForNonDeterministicAssertion(TIMEOUT, TimeUnit.MILLISECONDS, () -> {
Assert.assertTrue(controller.getVeniceAdmin().hasStore(clusterName, storeName));
});

// Spin up a thread to occupy the store write lock to simulate the blocking admin execution task thread.
CountDownLatch lockOccupyThreadStartedSignal = new CountDownLatch(1);
Runnable infiniteLockOccupy = getRunnable(controller, storeName, lockOccupyThreadStartedSignal);
Thread infiniteLockThread = new Thread(infiniteLockOccupy, "infiniteLockOccupy: " + storeName);
infiniteLockThread.start();
lluwm marked this conversation as resolved.
Show resolved Hide resolved
Assert.assertTrue(lockOccupyThreadStartedSignal.await(5, TimeUnit.SECONDS));

// Here we wait here to send every operation to let each consumer pool has at most one admin operation from
// this store, as the waiting time of 5 seconds > ADMIN_CONSUMPTION_CYCLE_TIMEOUT_MS setting.
for (int i = 0; i < adminConsumptionMaxWorkerPoolSize; i++) {
Utils.sleep(5000);
lluwm marked this conversation as resolved.
Show resolved Hide resolved
executionId++;
byte[] valueSchemaMessage = getDisableWrite(clusterName, storeName, executionId);
writer.put(new byte[0], valueSchemaMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
}

// Store deletion need to disable read.
Utils.sleep(5000);
executionId++;
byte[] valueSchemaMessage = getDisableRead(clusterName, storeName, executionId);
writer.put(new byte[0], valueSchemaMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

// Create a new store to see if it is blocked by previous messages.
String otherStoreName = "other-test-store";
executionId++;
byte[] otherStoreMessage =
getStoreCreationMessage(clusterName, otherStoreName, owner, keySchema, valueSchema, executionId);
writer.put(new byte[0], otherStoreMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

TestUtils.waitForNonDeterministicAssertion(TIMEOUT, TimeUnit.MILLISECONDS, () -> {
Assert.assertTrue(controller.getVeniceAdmin().hasStore(clusterName, otherStoreName));
});

infiniteLockThread.interrupt(); // This will release the lock
// Check this store is unblocked or not.
executionId++;
byte[] storeDeletionMessage = getStoreDeletionMessage(clusterName, storeName, executionId);
writer.put(new byte[0], storeDeletionMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
TestUtils.waitForNonDeterministicAssertion(TIMEOUT, TimeUnit.MILLISECONDS, () -> {
Assert.assertFalse(controller.getVeniceAdmin().hasStore(clusterName, storeName));
});
}
}
}

/**
 * Builds a task that takes the write lock of {@code storeName} in the test cluster and keeps holding it
 * until the executing thread is interrupted.
 *
 * @param controller controller wrapper whose {@link VeniceHelixAdmin} provides the cluster lock manager
 * @param storeName name of the store whose write lock should be occupied
 * @param latch counted down once the lock has been acquired, so the caller can wait for that moment
 * @return a runnable that blocks (sleeping in a loop) while holding the lock; the lock is closed by the
 *         try-with-resources statement when the thread is interrupted
 */
private Runnable getRunnable(VeniceControllerWrapper controller, String storeName, CountDownLatch latch) {
  VeniceHelixAdmin admin = controller.getVeniceHelixAdmin();
  return () -> {
    try (AutoCloseableLock ignore =
        admin.getHelixVeniceClusterResources(clusterName).getClusterLockManager().createStoreWriteLock(storeName)) {
      latch.countDown();
      while (true) {
        Thread.sleep(10000);
      }
    } catch (InterruptedException e) {
      // Interruption is the expected way to stop this task, so there is nothing to report. Restore the
      // interrupt status instead of swallowing it so that the owning thread can still observe it.
      Thread.currentThread().interrupt();
    }
  };
}

/**
 * Builds and serializes a {@code DISABLE_STORE_READ} admin operation.
 *
 * @param clusterName cluster name written into the payload
 * @param storeName store name written into the payload
 * @param executionId execution id assigned to the admin operation
 * @return the admin operation serialized by {@code adminOperationSerializer}
 */
private byte[] getDisableRead(String clusterName, String storeName, long executionId) {
  AdminMessageType messageType = AdminMessageType.DISABLE_STORE_READ;

  DisableStoreRead payload = (DisableStoreRead) messageType.getNewInstance();
  payload.clusterName = clusterName;
  payload.storeName = storeName;

  AdminOperation operation = new AdminOperation();
  operation.operationType = messageType.getValue();
  operation.payloadUnion = payload;
  operation.executionId = executionId;

  return adminOperationSerializer.serialize(operation);
}

/**
 * Builds and serializes a {@code DISABLE_STORE_WRITE} admin operation, whose payload type is
 * {@link PauseStore}.
 *
 * @param clusterName cluster name written into the payload
 * @param storeName store name written into the payload
 * @param executionId execution id assigned to the admin operation
 * @return the admin operation serialized by {@code adminOperationSerializer}
 */
private byte[] getDisableWrite(String clusterName, String storeName, long executionId) {
  AdminMessageType messageType = AdminMessageType.DISABLE_STORE_WRITE;

  PauseStore payload = (PauseStore) messageType.getNewInstance();
  payload.clusterName = clusterName;
  payload.storeName = storeName;

  AdminOperation operation = new AdminOperation();
  operation.operationType = messageType.getValue();
  operation.payloadUnion = payload;
  operation.executionId = executionId;

  return adminOperationSerializer.serialize(operation);
}

private byte[] getStoreCreationMessage(
String clusterName,
String storeName,
Expand All @@ -124,4 +256,17 @@ private byte[] getStoreCreationMessage(
adminMessage.executionId = executionId;
return adminOperationSerializer.serialize(adminMessage);
}

/**
 * Builds and serializes a {@code DELETE_STORE} admin operation with the largest used version number set to 0.
 *
 * @param clusterName cluster name written into the payload
 * @param storeName store name written into the payload
 * @param executionId execution id assigned to the admin operation
 * @return the admin operation serialized by {@code adminOperationSerializer}
 */
private byte[] getStoreDeletionMessage(String clusterName, String storeName, long executionId) {
  AdminMessageType messageType = AdminMessageType.DELETE_STORE;

  DeleteStore payload = (DeleteStore) messageType.getNewInstance();
  payload.clusterName = clusterName;
  payload.storeName = storeName;
  // Tell each prod colo the largest used version number in corp so that it stays consistent.
  payload.largestUsedVersionNumber = 0;

  AdminOperation operation = new AdminOperation();
  operation.operationType = messageType.getValue();
  operation.payloadUnion = payload;
  operation.executionId = executionId;

  return adminOperationSerializer.serialize(operation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ public String toString() {
* operation is also attached as the first element of the {@link Pair}.
*/
private final Map<String, Queue<AdminOperationWrapper>> storeAdminOperationsMapWithOffset;

private final ConcurrentHashMap<String, AdminExecutionTask> storeToScheduledTask;

/**
* Map of store names that have encountered some sort of exception during consumption to {@link AdminErrorInfo}
* that has the details about the exception and the offset of the problematic admin message.
Expand Down Expand Up @@ -283,6 +286,7 @@ public AdminConsumptionTask(

this.storeAdminOperationsMapWithOffset = new ConcurrentHashMap<>();
this.problematicStores = new ConcurrentHashMap<>();
this.storeToScheduledTask = new ConcurrentHashMap<>();
// since we use an unbounded queue the core pool size is really the max pool size
this.executorService = new ThreadPoolExecutor(
maxWorkerThreadPoolSize,
Expand Down Expand Up @@ -457,6 +461,7 @@ private void unSubscribe() {
storeAdminOperationsMapWithOffset.clear();
problematicStores.clear();
undelegatedRecords.clear();
storeToScheduledTask.clear();
failingOffset = UNASSIGNED_VALUE;
offsetToSkip = UNASSIGNED_VALUE;
offsetToSkipDIV = UNASSIGNED_VALUE;
Expand Down Expand Up @@ -497,20 +502,24 @@ private void executeMessagesAndCollectResults() throws InterruptedException {
entry.getValue().remove();
skipOffsetCommandHasBeenProcessed = true;
}
tasks.add(
new AdminExecutionTask(
LOGGER,
clusterName,
entry.getKey(),
lastSucceededExecutionIdMap,
lastPersistedExecutionId,
entry.getValue(),
admin,
executionIdAccessor,
isParentController,
stats,
regionName));
stores.add(entry.getKey());
AdminExecutionTask newTask = new AdminExecutionTask(
LOGGER,
clusterName,
entry.getKey(),
lastSucceededExecutionIdMap,
lastPersistedExecutionId,
entry.getValue(),
admin,
executionIdAccessor,
isParentController,
stats,
regionName,
storeToScheduledTask);
// Check if there is previously created scheduled task still occupying one thread from the pool.
if (storeToScheduledTask.putIfAbsent(entry.getKey(), newTask) == null) {
tasks.add(newTask);
stores.add(entry.getKey());
}
}
}
if (skipOffsetCommandHasBeenProcessed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public class AdminExecutionTask implements Callable<Void> {
private final ConcurrentHashMap<String, Long> lastSucceededExecutionIdMap;
private final long lastPersistedExecutionId;

private final Map<String, AdminExecutionTask> storeToScheduledTask;

AdminExecutionTask(
Logger LOGGER,
String clusterName,
Expand All @@ -95,7 +97,8 @@ public class AdminExecutionTask implements Callable<Void> {
ExecutionIdAccessor executionIdAccessor,
boolean isParentController,
AdminConsumptionStats stats,
String regionName) {
String regionName,
Map<String, AdminExecutionTask> storeToScheduledTask) {
this.LOGGER = LOGGER;
this.clusterName = clusterName;
this.storeName = storeName;
Expand All @@ -107,18 +110,19 @@ public class AdminExecutionTask implements Callable<Void> {
this.isParentController = isParentController;
this.stats = stats;
this.regionName = regionName;
this.storeToScheduledTask = storeToScheduledTask;
}

@Override
public Void call() {
while (!internalTopic.isEmpty()) {
if (!admin.isLeaderControllerFor(clusterName)) {
throw new VeniceRetriableException(
"This controller is no longer the leader of: " + clusterName
+ ". The consumption task should unsubscribe soon");
}
AdminOperationWrapper adminOperationWrapper = internalTopic.peek();
try {
try {
while (!internalTopic.isEmpty()) {
if (!admin.isLeaderControllerFor(clusterName)) {
throw new VeniceRetriableException(
"This controller is no longer the leader of: " + clusterName
+ ". The consumption task should unsubscribe soon");
}
AdminOperationWrapper adminOperationWrapper = internalTopic.peek();
if (adminOperationWrapper.getStartProcessingTimestamp() == null) {
adminOperationWrapper.setStartProcessingTimestamp(System.currentTimeMillis());
stats.recordAdminMessageStartProcessingLatency(
Expand All @@ -138,23 +142,25 @@ public Void call() {
stats.recordAdminMessageTotalLatency(
Math.max(0, completionTimestamp - adminOperationWrapper.getProducerTimestamp()));
internalTopic.remove();
} catch (Exception e) {
// Retry of the admin operation is handled automatically by keeping the failed admin operation inside the queue.
// The queue with the problematic operation will be delegated and retried by the worker thread in the next
// cycle.
String logMessage =
"when processing admin message for store " + storeName + " with offset " + adminOperationWrapper.getOffset()
+ " and execution id " + adminOperationWrapper.getAdminOperation().executionId;
if (e instanceof VeniceRetriableException) {
// These retriable exceptions are expected, therefore logging at the info level should be sufficient.
stats.recordFailedRetriableAdminConsumption();
LOGGER.info("Retriable exception thrown {}", logMessage, e);
} else {
stats.recordFailedAdminConsumption();
LOGGER.error("Error {}", logMessage, e);
}
throw e;
}
} catch (Exception e) {
// Retry of the admin operation is handled automatically by keeping the failed admin operation inside the queue.
// The queue with the problematic operation will be delegated and retried by the worker thread in the next cycle.
AdminOperationWrapper adminOperationWrapper = internalTopic.peek();
String logMessage =
"when processing admin message for store " + storeName + " with offset " + adminOperationWrapper.getOffset()
+ " and execution id " + adminOperationWrapper.getAdminOperation().executionId;
if (e instanceof VeniceRetriableException) {
// These retriable exceptions are expected, therefore logging at the info level should be sufficient.
stats.recordFailedRetriableAdminConsumption();
LOGGER.info("Retriable exception thrown {}", logMessage, e);
} else {
stats.recordFailedAdminConsumption();
LOGGER.error("Error {}", logMessage, e);
}
throw e;
} finally {
storeToScheduledTask.remove(storeName);
}
return null;
}
Expand Down