[Controller][all] Do not truncate Kafka topic immediately for fatal data validation error before EOP (#937)

Today, when the ingestion of a new store version fails, we truncate the Kafka topic of the store version by updating its retention time to a small value (15 seconds), specified by DEPRECATED_TOPIC_RETENTION_MS.

This is fine for regular failures, but fatal data validation errors indicate critical issues during the batch push period (before EOP), and truncating the topic too early can prevent us from finding the root cause, because the Kafka data is gone.

This change raises the Kafka topic retention time to 2 days when a fatal data validation error is identified, so that we have enough time to investigate.
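
A minimal sketch of that retention decision, assuming hypothetical class and method names (only the two retention defaults, the new config key, and the hasFatalDataValidationError signal come from this change):

```java
import java.util.concurrent.TimeUnit;

// Illustrative sketch, not the actual controller code: on push failure,
// pick the retention to apply to the failed version topic instead of
// always truncating to 15 seconds.
public class FailedPushRetentionPolicy {
  // Default for the new fatal.data.validation.failure.topic.retention.ms config: 2 days.
  static final long FATAL_DATA_VALIDATION_FAILURE_RETENTION_MS = TimeUnit.DAYS.toMillis(2);
  // Existing deprecated.topic.retention.ms default: 15 seconds.
  static final long DEPRECATED_TOPIC_RETENTION_MS = TimeUnit.SECONDS.toMillis(15);

  /** Picks the retention to apply to a failed version topic. */
  static long retentionForFailedPush(boolean hasFatalDataValidationError) {
    // Keep the topic long enough to investigate fatal DIV errors;
    // regular failures are still truncated quickly, as before.
    return hasFatalDataValidationError
        ? FATAL_DATA_VALIDATION_FAILURE_RETENTION_MS
        : DEPRECATED_TOPIC_RETENTION_MS;
  }
}
```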

Meanwhile, additional logic is added to TopicCleanupService: even if a topic's retention time is greater than DEPRECATED_TOPIC_MAX_RETENTION_MS, the topic can still be considered for deletion when all of the following hold (see the sketch after this list):

- The topic's retention is the 2-day fatal data validation failure retention (FATAL_DATA_VALIDATION_FAILURE_TOPIC_RETENTION_MS).
- The topic is a version topic.
- The topic's creation time (fetched from venice_system_store_push_job_details_store) is more than 2 days in the past. If all checks pass, the topic is deleted.
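
A minimal sketch of that extra eligibility check, assuming hypothetical helper and parameter names (the real TopicCleanupService code differs; only the 2-day window mirrors this change):

```java
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the extra TopicCleanupService check; names are
// hypothetical, only the 2-day retention window comes from this change.
public class FatalDvFailureTopicCleanupCheck {
  static final long FATAL_DATA_VALIDATION_FAILURE_RETENTION_MS = TimeUnit.DAYS.toMillis(2);

  /**
   * A topic whose retention exceeds DEPRECATED_TOPIC_MAX_RETENTION_MS can still
   * be deleted once it is a version topic carrying the fatal data validation
   * failure retention and has outlived that 2-day window.
   */
  static boolean isDeletableDespiteHighRetention(
      boolean isVersionTopic,
      long topicRetentionMs,
      long topicCreationTimeMs, // looked up from venice_system_store_push_job_details_store
      long nowMs) {
    return isVersionTopic
        && topicRetentionMs == FATAL_DATA_VALIDATION_FAILURE_RETENTION_MS
        && nowMs - topicCreationTimeMs > FATAL_DATA_VALIDATION_FAILURE_RETENTION_MS;
  }
}
```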
lluwm committed May 6, 2024
1 parent 40ed887 commit d7997d7
Showing 16 changed files with 601 additions and 10 deletions.
@@ -9,6 +9,7 @@
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.LogMessages.KILLED_JOB_MESSAGE;
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT;
import static com.linkedin.venice.utils.Utils.FATAL_DATA_VALIDATION_ERROR;
import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
@@ -2182,11 +2183,11 @@ public void processConsumerRecord(
int faultyPartition = record.getTopicPartition().getPartitionNumber();
String errorMessage;
if (amplificationFactor != 1 && record.getTopicPartition().getPubSubTopic().isRealTime()) {
errorMessage = "Fatal data validation problem with in RT topic partition " + faultyPartition + ", offset "
errorMessage = FATAL_DATA_VALIDATION_ERROR + " with in RT topic partition " + faultyPartition + ", offset "
+ record.getOffset() + ", leaderSubPartition: " + subPartition;
} else {
errorMessage =
"Fatal data validation problem with partition " + faultyPartition + ", offset " + record.getOffset();
FATAL_DATA_VALIDATION_ERROR + " with partition " + faultyPartition + ", offset " + record.getOffset();
}
// TODO need a way to safeguard DIV errors from backup version that have once been current (but not anymore)
// during re-balancing
@@ -217,4 +217,8 @@ public static boolean isError(String errorString) {
return false;
}
}

public static boolean isError(ExecutionStatus status) {
return status == ERROR;
}
}
@@ -31,6 +31,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.testng.Assert;
import org.testng.annotations.Test;


@@ -183,4 +184,15 @@ public void testRootStatus() {
assertEquals(status.getRootStatus(), rootStatusMap.getOrDefault(status, status));
}
}

@Test
public void testErrorExecutionStatus() {
for (ExecutionStatus status: ExecutionStatus.values()) {
if (status == ERROR) {
Assert.assertTrue(ExecutionStatus.isError(status));
} else {
Assert.assertFalse(ExecutionStatus.isError(status));
}
}
}
}
@@ -242,6 +242,9 @@ private ConfigKeys() {
*/
public static final String DEPRECATED_TOPIC_RETENTION_MS = "deprecated.topic.retention.ms";

public static final String FATAL_DATA_VALIDATION_FAILURE_TOPIC_RETENTION_MS =
"fatal.data.validation.failure.topic.retention.ms";

/**
* This config is to indicate the max retention policy we have setup for deprecated jobs currently and in the past.
* And this is used to decide whether the topic is deprecated or not during topic cleanup.
@@ -281,6 +281,19 @@ public String getLatestIncrementalPushVersion(PartitionAssignment partitionAssig
return latestIncrementalPushVersion;
}

public boolean hasFatalDataValidationError() {
return partitionIdToStatus.values().stream().anyMatch(partitionStatus -> {
if (partitionStatus.hasFatalDataValidationError()) {
LOGGER.warn(
"Fatal data validation error found in topic: {}, partition: {}",
kafkaTopic,
partitionStatus.getPartitionId());
return true;
}
return false;
});
}

public String getKafkaTopic() {
return kafkaTopic;
}
@@ -1,6 +1,7 @@
package com.linkedin.venice.pushmonitor;

import static com.linkedin.venice.pushmonitor.ExecutionStatus.NOT_CREATED;
import static com.linkedin.venice.utils.Utils.*;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -86,6 +87,16 @@ public List<StatusSnapshot> getReplicaHistoricStatusList(String instanceId) {
return replicaStatus.getStatusHistory();
}

public boolean hasFatalDataValidationError() {
for (ReplicaStatus replicaStatus: replicaStatusMap.values()) {
if (ExecutionStatus.isError(replicaStatus.getCurrentStatus())
&& replicaStatus.getIncrementalPushVersion().contains(FATAL_DATA_VALIDATION_ERROR)) {
return true;
}
}
return false;
}

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -76,6 +76,8 @@ public class Utils {
public static final String NEW_LINE_CHAR = System.lineSeparator();
public static final AtomicBoolean SUPPRESS_SYSTEM_EXIT = new AtomicBoolean();

public static final String FATAL_DATA_VALIDATION_ERROR = "fatal data validation problem";

/**
* Print an error and exit with error code 1
*
@@ -22,6 +22,7 @@
import static com.linkedin.venice.pushmonitor.ExecutionStatus.TOPIC_SWITCH_RECEIVED;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.UNKNOWN;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.WARNING;
import static com.linkedin.venice.utils.Utils.FATAL_DATA_VALIDATION_ERROR;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
@@ -30,6 +31,7 @@
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.OfflinePushStrategy;
import com.linkedin.venice.meta.PartitionAssignment;
import com.linkedin.venice.utils.DataProviderUtils;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
@@ -272,6 +274,46 @@ public void testGetStatusUpdateTimestamp() {
assertEquals(statusUpdateTimestamp, Long.valueOf(startOfProgress.toEpochSecond(ZoneOffset.UTC)));
}

@Test(dataProviderClass = DataProviderUtils.class, dataProvider = "True-and-False")
public void testHasFatalDataValidationError(boolean hasFatalDataValidationError) {
String kafkaTopic = "testTopic";
int numberOfPartition = 3;
int replicationFactor = 2;
OfflinePushStrategy strategy = OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION;

// Create an OfflinePushStatus object
OfflinePushStatus offlinePushStatus =
new OfflinePushStatus(kafkaTopic, numberOfPartition, replicationFactor, strategy);

// Create a PartitionStatus
PartitionStatus partitionStatus = new PartitionStatus(1);

// Create a ReplicaStatus with an error ExecutionStatus and an incrementalPushVersion that contains "Fatal data
// validation problem"
ReplicaStatus replicaStatus = new ReplicaStatus("instance1", true);
if (hasFatalDataValidationError) {
replicaStatus.updateStatus(ExecutionStatus.ERROR);
replicaStatus.setIncrementalPushVersion(
FATAL_DATA_VALIDATION_ERROR + " with partition 1, offset 1096534. Consumption will be halted.");
} else {
replicaStatus.updateStatus(ExecutionStatus.COMPLETED);
}

// Add the ReplicaStatus to the replicaStatusMap of the PartitionStatus
partitionStatus.setReplicaStatuses(Collections.singleton(replicaStatus));

// Set the PartitionStatus in the OfflinePushStatus
offlinePushStatus.setPartitionStatus(partitionStatus);

if (hasFatalDataValidationError) {
// Assert that hasFatalDataValidationError returns true
Assert.assertTrue(offlinePushStatus.hasFatalDataValidationError());
} else {
// Assert that hasFatalDataValidationError returns false
Assert.assertFalse(offlinePushStatus.hasFatalDataValidationError());
}
}

private void testValidTargetStatus(ExecutionStatus from, ExecutionStatus to) {
OfflinePushStatus offlinePushStatus =
new OfflinePushStatus(kafkaTopic, numberOfPartition, replicationFactor, strategy);