Skip to content

Commit

Permalink
Address comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hao Xu committed May 7, 2024
1 parent c168adc commit 25310f8
Showing 1 changed file with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
Expand Down Expand Up @@ -111,7 +112,7 @@ public void testSkipMessageEndToEnd() throws ExecutionException, InterruptedExce
}

@Test(timeOut = TIMEOUT)
public void testParallelAdminExecutionTasks() throws IOException {
public void testParallelAdminExecutionTasks() throws IOException, InterruptedException {
try (ZkServerWrapper zkServer = ServiceFactory.getZkServer();
PubSubBrokerWrapper pubSubBrokerWrapper = ServiceFactory.getPubSubBroker(
new PubSubBrokerConfigs.Builder().setZkWrapper(zkServer).setRegionName(STANDALONE_REGION_NAME).build());
Expand Down Expand Up @@ -146,14 +147,16 @@ public void testParallelAdminExecutionTasks() throws IOException {
getStoreCreationMessage(clusterName, storeName, owner, keySchema, valueSchema, executionId);
writer.put(new byte[0], goodMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

TestUtils.waitForNonDeterministicAssertion(TIMEOUT * 3, TimeUnit.MILLISECONDS, () -> {
TestUtils.waitForNonDeterministicAssertion(TIMEOUT, TimeUnit.MILLISECONDS, () -> {
Assert.assertTrue(controller.getVeniceAdmin().hasStore(clusterName, storeName));
});

// Spin up a thread to occupy the store write lock to simulate the blocking admin execution task thread.
Runnable infiniteLockOccupy = getRunnable(controller, storeName);
CountDownLatch delayedLockOccupyThreadStartedSignal = new CountDownLatch(1);
Runnable infiniteLockOccupy = getRunnable(controller, storeName, delayedLockOccupyThreadStartedSignal);
Thread infiniteLockThread = new Thread(infiniteLockOccupy, "infiniteLockOccupy: " + storeName);
infiniteLockThread.start();
delayedLockOccupyThreadStartedSignal.await(5, TimeUnit.SECONDS);

// Here we wait here to send every operation to let each consumer pool has at most one admin operation from
// this store, as the waiting time of 5 seconds > ADMIN_CONSUMPTION_CYCLE_TIMEOUT_MS setting.
Expand All @@ -177,7 +180,7 @@ public void testParallelAdminExecutionTasks() throws IOException {
getStoreCreationMessage(clusterName, otherStoreName, owner, keySchema, valueSchema, executionId);
writer.put(new byte[0], otherStoreMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

TestUtils.waitForNonDeterministicAssertion(TIMEOUT * 3, TimeUnit.MILLISECONDS, () -> {
TestUtils.waitForNonDeterministicAssertion(TIMEOUT, TimeUnit.MILLISECONDS, () -> {
Assert.assertTrue(controller.getVeniceAdmin().hasStore(clusterName, otherStoreName));
});

Expand All @@ -186,18 +189,19 @@ public void testParallelAdminExecutionTasks() throws IOException {
executionId++;
byte[] storeDeletionMessage = getStoreDeletionMessage(clusterName, storeName, executionId);
writer.put(new byte[0], storeDeletionMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
TestUtils.waitForNonDeterministicAssertion(TIMEOUT * 3, TimeUnit.MILLISECONDS, () -> {
TestUtils.waitForNonDeterministicAssertion(TIMEOUT, TimeUnit.MILLISECONDS, () -> {
Assert.assertFalse(controller.getVeniceAdmin().hasStore(clusterName, storeName));
});
}
}
}

private Runnable getRunnable(VeniceControllerWrapper controller, String storeName) {
private Runnable getRunnable(VeniceControllerWrapper controller, String storeName, CountDownLatch latch) {
VeniceHelixAdmin admin = controller.getVeniceHelixAdmin();
return () -> {
try (AutoCloseableLock ignore =
admin.getHelixVeniceClusterResources(clusterName).getClusterLockManager().createStoreWriteLock(storeName)) {
latch.countDown();
while (true) {
Thread.sleep(10000);
}
Expand Down

0 comments on commit 25310f8

Please sign in to comment.