Add to see the thread recover.
Hao Xu committed May 2, 2024
1 parent a853b62 commit 6b2e245
Showing 3 changed files with 92 additions and 64 deletions.
@@ -8,9 +8,11 @@
import com.linkedin.venice.controller.VeniceHelixAdmin;
import com.linkedin.venice.controller.kafka.AdminTopicUtils;
import com.linkedin.venice.controller.kafka.protocol.admin.AdminOperation;
import com.linkedin.venice.controller.kafka.protocol.admin.DeleteStore;
import com.linkedin.venice.controller.kafka.protocol.admin.DisableStoreRead;
import com.linkedin.venice.controller.kafka.protocol.admin.PauseStore;
import com.linkedin.venice.controller.kafka.protocol.admin.SchemaMeta;
import com.linkedin.venice.controller.kafka.protocol.admin.StoreCreation;
import com.linkedin.venice.controller.kafka.protocol.admin.UpdateStore;
import com.linkedin.venice.controller.kafka.protocol.enums.AdminMessageType;
import com.linkedin.venice.controller.kafka.protocol.enums.SchemaType;
import com.linkedin.venice.controller.kafka.protocol.serializer.AdminOperationSerializer;
@@ -33,7 +35,6 @@
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -150,16 +151,21 @@ public void testParallelAdminExecutionTasks() throws IOException {
Thread infiniteLockThread = new Thread(infiniteLockOccupy, "infiniteLockOccupy: " + storeName);
infiniteLockThread.start();

// Wait before sending each operation so that each consumer pool holds at most one admin operation from
// this store, since the 5-second wait is longer than the ADMIN_CONSUMPTION_CYCLE_TIMEOUT_MS setting.
for (int i = 0; i < adminConsumptionMaxWorkerPoolSize; i++) {
// Wait before sending each operation so that each consumer pool holds at most one admin operation from
// this store.
Utils.sleep(5000);
executionId++;
byte[] valueSchemaMessage = getUpdateStoreMessage(clusterName, storeName, i + 1, executionId);
writer.put(new byte[0], valueSchemaMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
byte[] valueSchemaMessage = getDisableWrite(clusterName, storeName, executionId);
}

// Create a new store to see if it is blocked by previous messages..
// Store deletion requires reads to be disabled first.
Utils.sleep(5000);
executionId++;
byte[] valueSchemaMessage = getDisableRead(clusterName, storeName, executionId);
writer.put(new byte[0], valueSchemaMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

// Create a new store to see if it is blocked by previous messages.
String otherStoreName = "other-test-store";
executionId++;
byte[] otherStoreMessage =
@@ -169,7 +175,15 @@ public void testParallelAdminExecutionTasks() throws IOException {
TestUtils.waitForNonDeterministicAssertion(TIMEOUT * 3, TimeUnit.MILLISECONDS, () -> {
Assert.assertTrue(controller.getVeniceAdmin().hasStore(clusterName, otherStoreName));
});

infiniteLockThread.interrupt(); // This will release the lock.
// Check whether this store is unblocked now.
executionId++;
byte[] storeDeletionMessage = getStoreDeletionMessage(clusterName, storeName, executionId);
writer.put(new byte[0], storeDeletionMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
TestUtils.waitForNonDeterministicAssertion(TIMEOUT * 3, TimeUnit.MILLISECONDS, () -> {
Assert.assertFalse(controller.getVeniceAdmin().hasStore(clusterName, storeName));
});
}
}
}
@@ -188,20 +202,24 @@ private Runnable getRunnable(VeniceControllerWrapper controller, String storeNam
};
}

private byte[] getUpdateStoreMessage(String clusterName, String storeName, int partitionNum, long executionId) {
UpdateStore updateStore = (UpdateStore) AdminMessageType.UPDATE_STORE.getNewInstance();
updateStore.clusterName = clusterName;
updateStore.storeName = storeName;
updateStore.partitionNum = partitionNum;
updateStore.owner = owner;
updateStore.currentVersion = 1;
updateStore.enableReads = true;
updateStore.enableWrites = true;
updateStore.replicateAllConfigs = true;
updateStore.updatedConfigsList = Collections.emptyList();
private byte[] getDisableRead(String clusterName, String storeName, long executionId) {
DisableStoreRead disableStoreRead = (DisableStoreRead) AdminMessageType.DISABLE_STORE_READ.getNewInstance();
disableStoreRead.clusterName = clusterName;
disableStoreRead.storeName = storeName;
AdminOperation adminMessage = new AdminOperation();
adminMessage.operationType = AdminMessageType.DISABLE_STORE_READ.getValue();
adminMessage.payloadUnion = disableStoreRead;
adminMessage.executionId = executionId;
return adminOperationSerializer.serialize(adminMessage);
}

private byte[] getDisableWrite(String clusterName, String storeName, long executionId) {
PauseStore pauseStore = (PauseStore) AdminMessageType.DISABLE_STORE_WRITE.getNewInstance();
pauseStore.clusterName = clusterName;
pauseStore.storeName = storeName;
AdminOperation adminMessage = new AdminOperation();
adminMessage.operationType = AdminMessageType.UPDATE_STORE.getValue();
adminMessage.payloadUnion = updateStore;
adminMessage.operationType = AdminMessageType.DISABLE_STORE_WRITE.getValue();
adminMessage.payloadUnion = pauseStore;
adminMessage.executionId = executionId;
return adminOperationSerializer.serialize(adminMessage);
}
@@ -229,4 +247,17 @@ private byte[] getStoreCreationMessage(
adminMessage.executionId = executionId;
return adminOperationSerializer.serialize(adminMessage);
}

private byte[] getStoreDeletionMessage(String clusterName, String storeName, long executionId) {
DeleteStore deleteStore = (DeleteStore) AdminMessageType.DELETE_STORE.getNewInstance();
deleteStore.clusterName = clusterName;
deleteStore.storeName = storeName;
// Tell each prod colo the largest used version number in corp to make it consistent.
deleteStore.largestUsedVersionNumber = 0;
AdminOperation adminMessage = new AdminOperation();
adminMessage.operationType = AdminMessageType.DELETE_STORE.getValue();
adminMessage.payloadUnion = deleteStore;
adminMessage.executionId = executionId;
return adminOperationSerializer.serialize(adminMessage);
}
}
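
The three message builders above (disable read, disable write, delete store) repeat the same build-and-serialize pattern. A possible consolidation is sketched below; getAdminMessage is a hypothetical helper, not part of this commit, and it assumes the Avro-generated payloadUnion field accepts any admin payload record.

// Hypothetical helper (illustrative sketch only): build any admin message from its
// type, a pre-populated payload record, and an execution id, then serialize it.
private byte[] getAdminMessage(AdminMessageType type, Object payload, long executionId) {
  AdminOperation adminMessage = new AdminOperation();
  adminMessage.operationType = type.getValue();
  adminMessage.payloadUnion = payload; // Avro union field; assumed to accept the payload record
  adminMessage.executionId = executionId;
  return adminOperationSerializer.serialize(adminMessage);
}

Each existing builder would then reduce to populating its payload record (e.g. DeleteStore) and delegating to this helper.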
AdminConsumptionTask.java
@@ -38,7 +38,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -55,7 +54,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.avro.generic.GenericRecord;
import org.apache.logging.log4j.LogManager;
@@ -289,7 +287,7 @@ public AdminConsumptionTask(

this.storeAdminOperationsMapWithOffset = new ConcurrentHashMap<>();
this.problematicStores = new ConcurrentHashMap<>();
this.storeHasScheduledTask = new HashSet<>();
this.storeHasScheduledTask = ConcurrentHashMap.newKeySet();
// since we use an unbounded queue the core pool size is really the max pool size
this.executorService = new ThreadPoolExecutor(
maxWorkerThreadPoolSize,
@@ -505,6 +503,7 @@ private void executeMessagesAndCollectResults() throws InterruptedException {
entry.getValue().remove();
skipOffsetCommandHasBeenProcessed = true;
}
storeHasScheduledTask.add(entry.getKey());
tasks.add(
new AdminExecutionTask(
LOGGER,
@@ -517,8 +516,8 @@
executionIdAccessor,
isParentController,
stats,
regionName));
storeHasScheduledTask.add(entry.getKey());
regionName,
storeHasScheduledTask));
stores.add(entry.getKey());
}
}
@@ -531,11 +530,9 @@
int pendingAdminMessagesCount = 0;
int storesWithPendingAdminMessagesCount = 0;
long adminExecutionTasksInvokeTime = System.currentTimeMillis();
// Wait for the worker threads to finish processing the internal admin topics. Here
List<Future<Void>> results = new ArrayList<>();
for (int i = 0; i < tasks.size(); i++) {
results.add(executorService.submit(tasks.get(i)));
}
// Wait for the worker threads to finish processing the internal admin topics.
List<Future<Void>> results =
executorService.invokeAll(tasks, processingCycleTimeoutInMs, TimeUnit.MILLISECONDS);
stats.recordAdminConsumptionCycleDurationMs(System.currentTimeMillis() - adminExecutionTasksInvokeTime);
Map<String, Long> newLastSucceededExecutionIdMap =
executionIdAccessor.getLastSucceededExecutionIdMap(clusterName);
@@ -544,19 +541,19 @@
String storeName = stores.get(i);
Future<Void> result = results.get(i);
try {
result.get(processingCycleTimeoutInMs, TimeUnit.MILLISECONDS);
result.get();
problematicStores.remove(storeName);
if (internalQueuesEmptied && storeAdminOperationsMapWithOffset.containsKey(storeName)
&& !storeAdminOperationsMapWithOffset.get(storeName).isEmpty()) {
internalQueuesEmptied = false;
}
} catch (ExecutionException | CancellationException | TimeoutException e) {
} catch (ExecutionException | CancellationException e) {
internalQueuesEmptied = false;
AdminErrorInfo errorInfo = new AdminErrorInfo();
int perStorePendingMessagesCount = storeAdminOperationsMapWithOffset.get(storeName).size();
pendingAdminMessagesCount += perStorePendingMessagesCount;
storesWithPendingAdminMessagesCount += perStorePendingMessagesCount > 0 ? 1 : 0;
if (e instanceof CancellationException || e instanceof TimeoutException) {
if (e instanceof CancellationException) {
long lastSucceededId = lastSucceededExecutionIdMap.getOrDefault(storeName, -1L);
long newLastSucceededId = newLastSucceededExecutionIdMap.getOrDefault(storeName, -1L);

@@ -577,12 +574,6 @@
errorInfo.offset = storeAdminOperationsMapWithOffset.get(storeName).peek().getOffset();
problematicStores.put(storeName, errorInfo);
}
} finally {
// Here we only remove task is done. For some timeout task waiting for the lock (non-interruptable), we will
// not create new task for it.
if (result.isDone()) {
storeHasScheduledTask.remove(storeName);
}
}
}
if (problematicStores.isEmpty() && internalQueuesEmptied) {
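
Context for the change above: the previous code submitted each AdminExecutionTask individually and called result.get(processingCycleTimeoutInMs, ...) per future, so a slow task surfaced as a TimeoutException while still occupying a worker thread. ExecutorService.invokeAll(tasks, processingCycleTimeoutInMs, TimeUnit.MILLISECONDS) instead waits for the whole batch up to the cycle timeout and cancels whatever has not finished, so the later Future.get() throws CancellationException, which is why the TimeoutException branch disappears from the catch block. A minimal, self-contained sketch of that behavior (illustrative only, not Venice code):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class InvokeAllTimeoutDemo {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<Callable<Void>> tasks = new ArrayList<>();
    tasks.add(() -> null);                                   // finishes immediately
    tasks.add(() -> { Thread.sleep(10_000); return null; }); // outlives the timeout
    // Blocks until every task finishes or the timeout elapses, then cancels the rest.
    List<Future<Void>> results = pool.invokeAll(tasks, 1, TimeUnit.SECONDS);
    for (Future<Void> f : results) {
      try {
        f.get(); // each future is already done or cancelled, so this never blocks
      } catch (CancellationException e) {
        System.out.println("task was cancelled after the cycle timeout");
      } catch (ExecutionException e) {
        System.out.println("task failed: " + e.getCause());
      }
    }
    pool.shutdownNow();
  }
}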
AdminExecutionTask.java
@@ -84,6 +84,8 @@ public class AdminExecutionTask implements Callable<Void> {
private final ConcurrentHashMap<String, Long> lastSucceededExecutionIdMap;
private final long lastPersistedExecutionId;

private final Set<String> storeSet;

AdminExecutionTask(
Logger LOGGER,
String clusterName,
@@ -95,7 +97,8 @@
ExecutionIdAccessor executionIdAccessor,
boolean isParentController,
AdminConsumptionStats stats,
String regionName) {
String regionName,
Set<String> storeSet) {
this.LOGGER = LOGGER;
this.clusterName = clusterName;
this.storeName = storeName;
@@ -107,18 +110,19 @@
this.isParentController = isParentController;
this.stats = stats;
this.regionName = regionName;
this.storeSet = storeSet;
}

@Override
public Void call() {
while (!internalTopic.isEmpty()) {
if (!admin.isLeaderControllerFor(clusterName)) {
throw new VeniceRetriableException(
"This controller is no longer the leader of: " + clusterName
+ ". The consumption task should unsubscribe soon");
}
AdminOperationWrapper adminOperationWrapper = internalTopic.peek();
try {
try {
while (!internalTopic.isEmpty()) {
if (!admin.isLeaderControllerFor(clusterName)) {
throw new VeniceRetriableException(
"This controller is no longer the leader of: " + clusterName
+ ". The consumption task should unsubscribe soon");
}
AdminOperationWrapper adminOperationWrapper = internalTopic.peek();
if (adminOperationWrapper.getStartProcessingTimestamp() == null) {
adminOperationWrapper.setStartProcessingTimestamp(System.currentTimeMillis());
stats.recordAdminMessageStartProcessingLatency(
@@ -138,23 +142,25 @@ public Void call() {
stats.recordAdminMessageTotalLatency(
Math.max(0, completionTimestamp - adminOperationWrapper.getProducerTimestamp()));
internalTopic.remove();
} catch (Exception e) {
// Retry of the admin operation is handled automatically by keeping the failed admin operation inside the queue.
// The queue with the problematic operation will be delegated and retried by the worker thread in the next
// cycle.
String logMessage =
"when processing admin message for store " + storeName + " with offset " + adminOperationWrapper.getOffset()
+ " and execution id " + adminOperationWrapper.getAdminOperation().executionId;
if (e instanceof VeniceRetriableException) {
// These retriable exceptions are expected, therefore logging at the info level should be sufficient.
stats.recordFailedRetriableAdminConsumption();
LOGGER.info("Retriable exception thrown {}", logMessage, e);
} else {
stats.recordFailedAdminConsumption();
LOGGER.error("Error {}", logMessage, e);
}
throw e;
}
} catch (Exception e) {
// Retry of the admin operation is handled automatically by keeping the failed admin operation inside the queue.
// The queue with the problematic operation will be delegated and retried by the worker thread in the next cycle.
AdminOperationWrapper adminOperationWrapper = internalTopic.peek();
String logMessage =
"when processing admin message for store " + storeName + " with offset " + adminOperationWrapper.getOffset()
+ " and execution id " + adminOperationWrapper.getAdminOperation().executionId;
if (e instanceof VeniceRetriableException) {
// These retriable exceptions are expected, therefore logging at the info level should be sufficient.
stats.recordFailedRetriableAdminConsumption();
LOGGER.info("Retriable exception thrown {}", logMessage, e);
} else {
stats.recordFailedAdminConsumption();
LOGGER.error("Error {}", logMessage, e);
}
throw e;
} finally {
storeSet.remove(storeName);
}
return null;
}
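
Taken together with the AdminConsumptionTask change, the Set<String> handed into AdminExecutionTask forms a scheduled-store handshake: the consumption task marks a store before submitting its task, and the task unmarks it in its own finally block only once it has actually exited, so a store whose previous (possibly cancelled) task is still blocked on a lock will not receive a second concurrent task. A condensed standalone sketch of the pattern (class and method names are illustrative, not the actual Venice classes):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ScheduledStoreHandshake {
  // Concurrent set shared between the dispatcher and the worker threads.
  private final Set<String> storeHasScheduledTask = ConcurrentHashMap.newKeySet();
  private final ExecutorService workers = Executors.newFixedThreadPool(4);

  void dispatch(String storeName, Runnable work) {
    // At most one in-flight task per store: add() returns false if one is already scheduled.
    if (!storeHasScheduledTask.add(storeName)) {
      return;
    }
    workers.submit(() -> {
      try {
        work.run();
      } finally {
        // Unmark only when the task has truly exited (even after failure or cancellation),
        // so the store becomes schedulable again in a later consumption cycle.
        storeHasScheduledTask.remove(storeName);
      }
    });
  }
}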
