Skip to content

Commit

Permalink
[controller] Fix the admin stuck.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hao Xu committed Mar 29, 2024
1 parent 945ed58 commit a853b62
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.linkedin.venice.controller.kafka.consumer;

import static com.linkedin.venice.ConfigKeys.ADMIN_CONSUMPTION_CYCLE_TIMEOUT_MS;
import static com.linkedin.venice.ConfigKeys.ADMIN_CONSUMPTION_MAX_WORKER_THREAD_POOL_SIZE;
import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.STANDALONE_REGION_NAME;
import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_OPERATION_TIMEOUT_MS_DEFAULT_VALUE;

import com.linkedin.venice.controller.VeniceHelixAdmin;
import com.linkedin.venice.controller.kafka.AdminTopicUtils;
import com.linkedin.venice.controller.kafka.protocol.admin.AdminOperation;
import com.linkedin.venice.controller.kafka.protocol.admin.SchemaMeta;
import com.linkedin.venice.controller.kafka.protocol.admin.StoreCreation;
import com.linkedin.venice.controller.kafka.protocol.admin.UpdateStore;
import com.linkedin.venice.controller.kafka.protocol.enums.AdminMessageType;
import com.linkedin.venice.controller.kafka.protocol.enums.SchemaType;
import com.linkedin.venice.controller.kafka.protocol.serializer.AdminOperationSerializer;
Expand All @@ -25,9 +29,12 @@
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
Expand All @@ -48,6 +55,7 @@ public class AdminConsumptionTaskIntegrationTest {
private static final String keySchema = "\"string\"";
private static final String valueSchema = "\"string\"";

private Properties extraProperties = new Properties();
private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

/**
Expand Down Expand Up @@ -101,6 +109,103 @@ public void testSkipMessageEndToEnd() throws ExecutionException, InterruptedExce
}
}

/**
 * Verifies that admin operations for one store, blocked on that store's write lock, do not prevent an admin
 * operation for a different store from being processed.
 *
 * <p>The test creates {@code test-store}, starts a thread that holds its store write lock, writes one update-store
 * message per configured worker thread for it, then writes a store-creation message for {@code other-test-store}
 * and waits for that second store to appear.
 *
 * @throws IOException declared for the resources managed by the try-with-resources blocks
 */
@Test(timeOut = TIMEOUT)
public void testParallelAdminExecutionTasks() throws IOException {
  try (ZkServerWrapper zkServer = ServiceFactory.getZkServer();
      PubSubBrokerWrapper pubSubBrokerWrapper = ServiceFactory.getPubSubBroker(
          new PubSubBrokerConfigs.Builder().setZkWrapper(zkServer).setRegionName(STANDALONE_REGION_NAME).build());
      TopicManager topicManager =
          IntegrationTestPushUtils
              .getTopicManagerRepo(
                  PUBSUB_OPERATION_TIMEOUT_MS_DEFAULT_VALUE,
                  100,
                  0L,
                  pubSubBrokerWrapper,
                  pubSubTopicRepository)
              .getLocalTopicManager()) {
    PubSubTopic adminTopic = pubSubTopicRepository.getTopic(AdminTopicUtils.getTopicNameFromClusterName(clusterName));
    topicManager.createTopic(adminTopic, 1, 1, true);
    String storeName = "test-store";
    int adminConsumptionMaxWorkerPoolSize = 3;
    extraProperties.put(ADMIN_CONSUMPTION_MAX_WORKER_THREAD_POOL_SIZE, adminConsumptionMaxWorkerPoolSize);
    extraProperties.put(ADMIN_CONSUMPTION_CYCLE_TIMEOUT_MS, 3000);
    try (
        VeniceControllerWrapper controller = ServiceFactory.getVeniceController(
            new VeniceControllerCreateOptions.Builder(clusterName, zkServer, pubSubBrokerWrapper)
                .regionName(STANDALONE_REGION_NAME)
                .extraProperties(extraProperties)
                .build());
        PubSubProducerAdapterFactory pubSubProducerAdapterFactory =
            pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory();
        VeniceWriter<byte[], byte[], byte[]> writer =
            IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory)
                .createVeniceWriter(new VeniceWriterOptions.Builder(adminTopic.getName()).build())) {
      int executionId = 1;
      byte[] goodMessage =
          getStoreCreationMessage(clusterName, storeName, owner, keySchema, valueSchema, executionId);
      writer.put(new byte[0], goodMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

      // Spin up a thread that holds the store write lock, simulating an admin execution task that is blocked.
      Runnable infiniteLockOccupy = getRunnable(controller, storeName);
      Thread infiniteLockThread = new Thread(infiniteLockOccupy, "infiniteLockOccupy: " + storeName);
      infiniteLockThread.start();

      try {
        for (int i = 0; i < adminConsumptionMaxWorkerPoolSize; i++) {
          // Pause before sending each operation (5s, longer than the 3s cycle timeout configured above) so that,
          // as intended by the original author, each worker ends up with at most one admin operation from this
          // store.
          Utils.sleep(5000);
          executionId++;
          byte[] valueSchemaMessage = getUpdateStoreMessage(clusterName, storeName, i + 1, executionId);
          writer.put(new byte[0], valueSchemaMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        }

        // Create a different store to check that it is not blocked by the previous messages.
        String otherStoreName = "other-test-store";
        executionId++;
        byte[] otherStoreMessage =
            getStoreCreationMessage(clusterName, otherStoreName, owner, keySchema, valueSchema, executionId);
        writer.put(new byte[0], otherStoreMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

        // NOTE(review): this inner wait allows TIMEOUT * 3 while the test itself is capped at TIMEOUT - confirm
        // that the intended bound is the test-level one.
        TestUtils.waitForNonDeterministicAssertion(TIMEOUT * 3, TimeUnit.MILLISECONDS, () -> {
          Assert.assertTrue(controller.getVeniceAdmin().hasStore(clusterName, otherStoreName));
        });
      } finally {
        // Always interrupt the lock holder so the store write lock is released, even when an assertion or a write
        // above fails; otherwise the thread would keep running (and holding the lock) after the test ends.
        infiniteLockThread.interrupt();
      }
    }
  }
}

/**
 * Builds a task that acquires the write lock of {@code storeName} and holds it until the running thread is
 * interrupted.
 *
 * @param controller controller wrapper whose {@link VeniceHelixAdmin} provides the cluster lock manager
 * @param storeName name of the store whose write lock is held
 * @return a {@link Runnable} that blocks while holding the lock; interrupting its thread releases the lock
 */
private Runnable getRunnable(VeniceControllerWrapper controller, String storeName) {
  VeniceHelixAdmin admin = controller.getVeniceHelixAdmin();
  return () -> {
    try (AutoCloseableLock ignored =
        admin.getHelixVeniceClusterResources(clusterName).getClusterLockManager().createStoreWriteLock(storeName)) {
      // Hold the lock until interrupted; the try-with-resources block closes the lock on the way out.
      while (true) {
        Thread.sleep(10000);
      }
    } catch (InterruptedException e) {
      // Interruption is the expected signal to release the lock, so do not print a stack trace.
      // Restore the interrupt status instead of swallowing it, then let the thread finish.
      Thread.currentThread().interrupt();
    }
  };
}

/**
 * Builds and serializes an {@code UPDATE_STORE} admin operation for the given store.
 *
 * @param clusterName cluster name written into the payload
 * @param storeName store name written into the payload
 * @param partitionNum partition count written into the payload
 * @param executionId execution id assigned to the enclosing {@link AdminOperation}
 * @return the admin operation serialized with {@code adminOperationSerializer}
 */
private byte[] getUpdateStoreMessage(String clusterName, String storeName, int partitionNum, long executionId) {
  AdminMessageType messageType = AdminMessageType.UPDATE_STORE;

  // Payload: identify the store first, then fill in the remaining settings.
  UpdateStore payload = (UpdateStore) messageType.getNewInstance();
  payload.clusterName = clusterName;
  payload.storeName = storeName;
  payload.owner = owner;
  payload.partitionNum = partitionNum;
  payload.currentVersion = 1;
  payload.enableReads = true;
  payload.enableWrites = true;
  payload.replicateAllConfigs = true;
  payload.updatedConfigsList = Collections.emptyList();

  // Envelope: wrap the payload in an admin operation carrying the execution id.
  AdminOperation operation = new AdminOperation();
  operation.operationType = messageType.getValue();
  operation.payloadUnion = payload;
  operation.executionId = executionId;

  return adminOperationSerializer.serialize(operation);
}

private byte[] getStoreCreationMessage(
String clusterName,
String storeName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -53,6 +55,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.avro.generic.GenericRecord;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -158,6 +161,9 @@ public String toString() {
* operation is also attached as the first element of the {@link Pair}.
*/
private final Map<String, Queue<AdminOperationWrapper>> storeAdminOperationsMapWithOffset;

private final Set<String> storeHasScheduledTask;

/**
* Map of store names that have encountered some sort of exception during consumption to {@link AdminErrorInfo}
* that has the details about the exception and the offset of the problematic admin message.
Expand Down Expand Up @@ -283,6 +289,7 @@ public AdminConsumptionTask(

this.storeAdminOperationsMapWithOffset = new ConcurrentHashMap<>();
this.problematicStores = new ConcurrentHashMap<>();
this.storeHasScheduledTask = new HashSet<>();
// since we use an unbounded queue the core pool size is really the max pool size
this.executorService = new ThreadPoolExecutor(
maxWorkerThreadPoolSize,
Expand Down Expand Up @@ -457,6 +464,7 @@ private void unSubscribe() {
storeAdminOperationsMapWithOffset.clear();
problematicStores.clear();
undelegatedRecords.clear();
storeHasScheduledTask.clear();
failingOffset = UNASSIGNED_VALUE;
offsetToSkip = UNASSIGNED_VALUE;
offsetToSkipDIV = UNASSIGNED_VALUE;
Expand Down Expand Up @@ -492,7 +500,7 @@ private void executeMessagesAndCollectResults() throws InterruptedException {
// Create a task for each store that has admin messages pending to be processed.
boolean skipOffsetCommandHasBeenProcessed = false;
for (Map.Entry<String, Queue<AdminOperationWrapper>> entry: storeAdminOperationsMapWithOffset.entrySet()) {
if (!entry.getValue().isEmpty()) {
if (!entry.getValue().isEmpty() && !storeHasScheduledTask.contains(entry.getKey())) {
if (checkOffsetToSkip(entry.getValue().peek().getOffset(), false)) {
entry.getValue().remove();
skipOffsetCommandHasBeenProcessed = true;
Expand All @@ -510,6 +518,7 @@ private void executeMessagesAndCollectResults() throws InterruptedException {
isParentController,
stats,
regionName));
storeHasScheduledTask.add(entry.getKey());
stores.add(entry.getKey());
}
}
Expand All @@ -522,9 +531,11 @@ private void executeMessagesAndCollectResults() throws InterruptedException {
int pendingAdminMessagesCount = 0;
int storesWithPendingAdminMessagesCount = 0;
long adminExecutionTasksInvokeTime = System.currentTimeMillis();
// Wait for the worker threads to finish processing the internal admin topics.
List<Future<Void>> results =
executorService.invokeAll(tasks, processingCycleTimeoutInMs, TimeUnit.MILLISECONDS);
// Wait for the worker threads to finish processing the internal admin topics. Here each task is submitted
// individually, and its result is awaited below with a per-task timeout.
List<Future<Void>> results = new ArrayList<>();
for (int i = 0; i < tasks.size(); i++) {
results.add(executorService.submit(tasks.get(i)));
}
stats.recordAdminConsumptionCycleDurationMs(System.currentTimeMillis() - adminExecutionTasksInvokeTime);
Map<String, Long> newLastSucceededExecutionIdMap =
executionIdAccessor.getLastSucceededExecutionIdMap(clusterName);
Expand All @@ -533,19 +544,19 @@ private void executeMessagesAndCollectResults() throws InterruptedException {
String storeName = stores.get(i);
Future<Void> result = results.get(i);
try {
result.get();
result.get(processingCycleTimeoutInMs, TimeUnit.MILLISECONDS);
problematicStores.remove(storeName);
if (internalQueuesEmptied && storeAdminOperationsMapWithOffset.containsKey(storeName)
&& !storeAdminOperationsMapWithOffset.get(storeName).isEmpty()) {
internalQueuesEmptied = false;
}
} catch (ExecutionException | CancellationException e) {
} catch (ExecutionException | CancellationException | TimeoutException e) {
internalQueuesEmptied = false;
AdminErrorInfo errorInfo = new AdminErrorInfo();
int perStorePendingMessagesCount = storeAdminOperationsMapWithOffset.get(storeName).size();
pendingAdminMessagesCount += perStorePendingMessagesCount;
storesWithPendingAdminMessagesCount += perStorePendingMessagesCount > 0 ? 1 : 0;
if (e instanceof CancellationException) {
if (e instanceof CancellationException || e instanceof TimeoutException) {
long lastSucceededId = lastSucceededExecutionIdMap.getOrDefault(storeName, -1L);
long newLastSucceededId = newLastSucceededExecutionIdMap.getOrDefault(storeName, -1L);

Expand All @@ -566,6 +577,12 @@ private void executeMessagesAndCollectResults() throws InterruptedException {
errorInfo.offset = storeAdminOperationsMapWithOffset.get(storeName).peek().getOffset();
problematicStores.put(storeName, errorInfo);
}
} finally {
// Only remove the store from the scheduled set once its task is done. A timed-out task that is still waiting
// for the (non-interruptible) lock stays in the set, so no new task is created for that store.
if (result.isDone()) {
storeHasScheduledTask.remove(storeName);
}
}
}
if (problematicStores.isEmpty() && internalQueuesEmptied) {
Expand Down

0 comments on commit a853b62

Please sign in to comment.