Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] Fix multiple AdminExecutionTasks working on the same store at the same time. #918

Merged
merged 5 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.linkedin.venice.controller.kafka.consumer;

import static com.linkedin.venice.ConfigKeys.ADMIN_CONSUMPTION_CYCLE_TIMEOUT_MS;
import static com.linkedin.venice.ConfigKeys.ADMIN_CONSUMPTION_MAX_WORKER_THREAD_POOL_SIZE;
import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.STANDALONE_REGION_NAME;
import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_OPERATION_TIMEOUT_MS_DEFAULT_VALUE;

import com.linkedin.venice.controller.VeniceHelixAdmin;
import com.linkedin.venice.controller.kafka.AdminTopicUtils;
import com.linkedin.venice.controller.kafka.protocol.admin.AdminOperation;
import com.linkedin.venice.controller.kafka.protocol.admin.SchemaMeta;
import com.linkedin.venice.controller.kafka.protocol.admin.StoreCreation;
import com.linkedin.venice.controller.kafka.protocol.admin.UpdateStore;
import com.linkedin.venice.controller.kafka.protocol.enums.AdminMessageType;
import com.linkedin.venice.controller.kafka.protocol.enums.SchemaType;
import com.linkedin.venice.controller.kafka.protocol.serializer.AdminOperationSerializer;
Expand All @@ -25,9 +29,12 @@
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
Expand All @@ -48,6 +55,7 @@ public class AdminConsumptionTaskIntegrationTest {
private static final String keySchema = "\"string\"";
private static final String valueSchema = "\"string\"";

private Properties extraProperties = new Properties();
private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

/**
Expand Down Expand Up @@ -101,6 +109,103 @@ public void testSkipMessageEndToEnd() throws ExecutionException, InterruptedExce
}
}

@Test(timeOut = TIMEOUT)
public void testParallelAdminExecutionTasks() throws IOException {
try (ZkServerWrapper zkServer = ServiceFactory.getZkServer();
PubSubBrokerWrapper pubSubBrokerWrapper = ServiceFactory.getPubSubBroker(
new PubSubBrokerConfigs.Builder().setZkWrapper(zkServer).setRegionName(STANDALONE_REGION_NAME).build());
TopicManager topicManager =
IntegrationTestPushUtils
.getTopicManagerRepo(
PUBSUB_OPERATION_TIMEOUT_MS_DEFAULT_VALUE,
100,
0l,
pubSubBrokerWrapper,
pubSubTopicRepository)
.getLocalTopicManager()) {
PubSubTopic adminTopic = pubSubTopicRepository.getTopic(AdminTopicUtils.getTopicNameFromClusterName(clusterName));
topicManager.createTopic(adminTopic, 1, 1, true);
String storeName = "test-store";
int adminConsumptionMaxWorkerPoolSize = 3;
extraProperties.put(ADMIN_CONSUMPTION_MAX_WORKER_THREAD_POOL_SIZE, adminConsumptionMaxWorkerPoolSize);
extraProperties.put(ADMIN_CONSUMPTION_CYCLE_TIMEOUT_MS, 3000);
try (
VeniceControllerWrapper controller = ServiceFactory.getVeniceController(
new VeniceControllerCreateOptions.Builder(clusterName, zkServer, pubSubBrokerWrapper)
.regionName(STANDALONE_REGION_NAME)
.extraProperties(extraProperties)
.build());
PubSubProducerAdapterFactory pubSubProducerAdapterFactory =
pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory();
VeniceWriter<byte[], byte[], byte[]> writer =
IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory)
.createVeniceWriter(new VeniceWriterOptions.Builder(adminTopic.getName()).build())) {
int executionId = 1;
byte[] goodMessage =
getStoreCreationMessage(clusterName, storeName, owner, keySchema, valueSchema, executionId);
writer.put(new byte[0], goodMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

// Spin up a thread to occupy the store write lock to simulate the blocking admin execution task thread.
Runnable infiniteLockOccupy = getRunnable(controller, storeName);
Thread infiniteLockThread = new Thread(infiniteLockOccupy, "infiniteLockOccupy: " + storeName);
infiniteLockThread.start();
lluwm marked this conversation as resolved.
Show resolved Hide resolved

for (int i = 0; i < adminConsumptionMaxWorkerPoolSize; i++) {
// Here we wait here to send every operation to let each consumer pool has at most one admin operation from
// this store.
Utils.sleep(5000);
lluwm marked this conversation as resolved.
Show resolved Hide resolved
executionId++;
byte[] valueSchemaMessage = getUpdateStoreMessage(clusterName, storeName, i + 1, executionId);
writer.put(new byte[0], valueSchemaMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
}

// Create a new store to see if it is blocked by previous messages..
String otherStoreName = "other-test-store";
executionId++;
byte[] otherStoreMessage =
getStoreCreationMessage(clusterName, otherStoreName, owner, keySchema, valueSchema, executionId);
writer.put(new byte[0], otherStoreMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

TestUtils.waitForNonDeterministicAssertion(TIMEOUT * 3, TimeUnit.MILLISECONDS, () -> {
Assert.assertTrue(controller.getVeniceAdmin().hasStore(clusterName, otherStoreName));
});
infiniteLockThread.interrupt(); // This will release the lock
}
}
}

private Runnable getRunnable(VeniceControllerWrapper controller, String storeName) {
VeniceHelixAdmin admin = controller.getVeniceHelixAdmin();
return () -> {
try (AutoCloseableLock ignore =
admin.getHelixVeniceClusterResources(clusterName).getClusterLockManager().createStoreWriteLock(storeName)) {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}

private byte[] getUpdateStoreMessage(String clusterName, String storeName, int partitionNum, long executionId) {
UpdateStore updateStore = (UpdateStore) AdminMessageType.UPDATE_STORE.getNewInstance();
updateStore.clusterName = clusterName;
updateStore.storeName = storeName;
updateStore.partitionNum = partitionNum;
updateStore.owner = owner;
updateStore.currentVersion = 1;
updateStore.enableReads = true;
updateStore.enableWrites = true;
updateStore.replicateAllConfigs = true;
updateStore.updatedConfigsList = Collections.emptyList();
AdminOperation adminMessage = new AdminOperation();
adminMessage.operationType = AdminMessageType.UPDATE_STORE.getValue();
adminMessage.payloadUnion = updateStore;
adminMessage.executionId = executionId;
return adminOperationSerializer.serialize(adminMessage);
}

private byte[] getStoreCreationMessage(
String clusterName,
String storeName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -53,6 +55,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.avro.generic.GenericRecord;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -158,6 +161,9 @@ public String toString() {
* operation is also attached as the first element of the {@link Pair}.
*/
private final Map<String, Queue<AdminOperationWrapper>> storeAdminOperationsMapWithOffset;

private final Set<String> storeHasScheduledTask;

/**
* Map of store names that have encountered some sort of exception during consumption to {@link AdminErrorInfo}
* that has the details about the exception and the offset of the problematic admin message.
Expand Down Expand Up @@ -283,6 +289,7 @@ public AdminConsumptionTask(

this.storeAdminOperationsMapWithOffset = new ConcurrentHashMap<>();
this.problematicStores = new ConcurrentHashMap<>();
this.storeHasScheduledTask = new HashSet<>();
// since we use an unbounded queue the core pool size is really the max pool size
this.executorService = new ThreadPoolExecutor(
maxWorkerThreadPoolSize,
Expand Down Expand Up @@ -457,6 +464,7 @@ private void unSubscribe() {
storeAdminOperationsMapWithOffset.clear();
problematicStores.clear();
undelegatedRecords.clear();
storeHasScheduledTask.clear();
failingOffset = UNASSIGNED_VALUE;
offsetToSkip = UNASSIGNED_VALUE;
offsetToSkipDIV = UNASSIGNED_VALUE;
Expand Down Expand Up @@ -492,7 +500,7 @@ private void executeMessagesAndCollectResults() throws InterruptedException {
// Create a task for each store that has admin messages pending to be processed.
boolean skipOffsetCommandHasBeenProcessed = false;
for (Map.Entry<String, Queue<AdminOperationWrapper>> entry: storeAdminOperationsMapWithOffset.entrySet()) {
if (!entry.getValue().isEmpty()) {
if (!entry.getValue().isEmpty() && !storeHasScheduledTask.contains(entry.getKey())) {
if (checkOffsetToSkip(entry.getValue().peek().getOffset(), false)) {
entry.getValue().remove();
skipOffsetCommandHasBeenProcessed = true;
Expand All @@ -510,6 +518,7 @@ private void executeMessagesAndCollectResults() throws InterruptedException {
isParentController,
stats,
regionName));
storeHasScheduledTask.add(entry.getKey());
stores.add(entry.getKey());
}
}
Expand All @@ -522,9 +531,11 @@ private void executeMessagesAndCollectResults() throws InterruptedException {
int pendingAdminMessagesCount = 0;
int storesWithPendingAdminMessagesCount = 0;
long adminExecutionTasksInvokeTime = System.currentTimeMillis();
// Wait for the worker threads to finish processing the internal admin topics.
List<Future<Void>> results =
executorService.invokeAll(tasks, processingCycleTimeoutInMs, TimeUnit.MILLISECONDS);
// Wait for the worker threads to finish processing the internal admin topics. Here
List<Future<Void>> results = new ArrayList<>();
for (int i = 0; i < tasks.size(); i++) {
results.add(executorService.submit(tasks.get(i)));
}
stats.recordAdminConsumptionCycleDurationMs(System.currentTimeMillis() - adminExecutionTasksInvokeTime);
Map<String, Long> newLastSucceededExecutionIdMap =
executionIdAccessor.getLastSucceededExecutionIdMap(clusterName);
Expand All @@ -533,19 +544,19 @@ private void executeMessagesAndCollectResults() throws InterruptedException {
String storeName = stores.get(i);
Future<Void> result = results.get(i);
try {
result.get();
result.get(processingCycleTimeoutInMs, TimeUnit.MILLISECONDS);
lluwm marked this conversation as resolved.
Show resolved Hide resolved
problematicStores.remove(storeName);
if (internalQueuesEmptied && storeAdminOperationsMapWithOffset.containsKey(storeName)
&& !storeAdminOperationsMapWithOffset.get(storeName).isEmpty()) {
internalQueuesEmptied = false;
}
} catch (ExecutionException | CancellationException e) {
} catch (ExecutionException | CancellationException | TimeoutException e) {
internalQueuesEmptied = false;
AdminErrorInfo errorInfo = new AdminErrorInfo();
int perStorePendingMessagesCount = storeAdminOperationsMapWithOffset.get(storeName).size();
pendingAdminMessagesCount += perStorePendingMessagesCount;
storesWithPendingAdminMessagesCount += perStorePendingMessagesCount > 0 ? 1 : 0;
if (e instanceof CancellationException) {
if (e instanceof CancellationException || e instanceof TimeoutException) {
long lastSucceededId = lastSucceededExecutionIdMap.getOrDefault(storeName, -1L);
long newLastSucceededId = newLastSucceededExecutionIdMap.getOrDefault(storeName, -1L);

Expand All @@ -566,6 +577,12 @@ private void executeMessagesAndCollectResults() throws InterruptedException {
errorInfo.offset = storeAdminOperationsMapWithOffset.get(storeName).peek().getOffset();
problematicStores.put(storeName, errorInfo);
}
} finally {
// Here we only remove task is done. For some timeout task waiting for the lock (non-interruptable), we will
// not create new task for it.
if (result.isDone()) {
storeHasScheduledTask.remove(storeName);
}
}
}
if (problematicStores.isEmpty() && internalQueuesEmptied) {
Expand Down