Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Controller][all] Do not truncate Kafka topic immediately for fatal d… #937

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.LogMessages.KILLED_JOB_MESSAGE;
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT;
import static com.linkedin.venice.utils.Utils.FATAL_DATA_VALIDATION_ERROR;
import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
Expand Down Expand Up @@ -2182,11 +2183,11 @@ public void processConsumerRecord(
int faultyPartition = record.getTopicPartition().getPartitionNumber();
String errorMessage;
if (amplificationFactor != 1 && record.getTopicPartition().getPubSubTopic().isRealTime()) {
errorMessage = "Fatal data validation problem with in RT topic partition " + faultyPartition + ", offset "
errorMessage = FATAL_DATA_VALIDATION_ERROR + " with in RT topic partition " + faultyPartition + ", offset "
+ record.getOffset() + ", leaderSubPartition: " + subPartition;
} else {
errorMessage =
"Fatal data validation problem with partition " + faultyPartition + ", offset " + record.getOffset();
FATAL_DATA_VALIDATION_ERROR + " with partition " + faultyPartition + ", offset " + record.getOffset();
}
// TODO need a way to safeguard DIV errors from backup version that have once been current (but not anymore)
// during re-balancing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,4 +217,8 @@ public static boolean isError(String errorString) {
return false;
}
}

/**
 * Checks whether the given execution status represents an error state.
 *
 * @param status the execution status to inspect
 * @return {@code true} if {@code status} is {@code ERROR}, {@code false} otherwise
 */
public static boolean isError(ExecutionStatus status) {
  return ERROR.equals(status);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.testng.Assert;
import org.testng.annotations.Test;


Expand Down Expand Up @@ -183,4 +184,15 @@ public void testRootStatus() {
assertEquals(status.getRootStatus(), rootStatusMap.getOrDefault(status, status));
}
}

@Test
@Test
public void testErrorExecutionStatus() {
  // ERROR must be the one and only status classified as an error by isError().
  for (ExecutionStatus executionStatus: ExecutionStatus.values()) {
    Assert.assertEquals(ExecutionStatus.isError(executionStatus), executionStatus == ERROR);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ private ConfigKeys() {
*/
public static final String DEPRECATED_TOPIC_RETENTION_MS = "deprecated.topic.retention.ms";

public static final String FATAL_DATA_VALIDATION_FAILURE_TOPIC_RETENTION_MS =
"fatal.data.validation.failure.topic.retention.ms";

/**
* This config is to indicate the max retention policy we have setup for deprecated jobs currently and in the past.
* And this is used to decide whether the topic is deprecated or not during topic cleanup.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,19 @@ public String getLatestIncrementalPushVersion(PartitionAssignment partitionAssig
return latestIncrementalPushVersion;
}

/**
 * Checks whether any partition of this push reports a fatal data validation error.
 * A warning is logged for the first offending partition found.
 *
 * @return {@code true} if at least one partition has a fatal data validation error
 */
public boolean hasFatalDataValidationError() {
  return partitionIdToStatus.values().stream().anyMatch(status -> {
    if (!status.hasFatalDataValidationError()) {
      return false;
    }
    LOGGER.warn(
        "Fatal data validation error found in topic: {}, partition: {}",
        kafkaTopic,
        status.getPartitionId());
    return true;
  });
}

/**
 * @return the name of the Kafka topic this push status tracks
 */
public String getKafkaTopic() {
  return this.kafkaTopic;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.venice.pushmonitor;

import static com.linkedin.venice.pushmonitor.ExecutionStatus.NOT_CREATED;
import static com.linkedin.venice.utils.Utils.*;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -86,6 +87,16 @@ public List<StatusSnapshot> getReplicaHistoricStatusList(String instanceId) {
return replicaStatus.getStatusHistory();
}

/**
 * Checks whether any replica of this partition failed with a fatal data validation error.
 * A replica is considered fatally corrupted when its current status is an error AND its
 * incremental push version field carries the fatal-DIV marker text (the ingestion path
 * stores the error message there).
 *
 * @return {@code true} if at least one replica reports a fatal data validation error
 */
public boolean hasFatalDataValidationError() {
  for (ReplicaStatus replicaStatus: replicaStatusMap.values()) {
    if (!ExecutionStatus.isError(replicaStatus.getCurrentStatus())) {
      continue;
    }
    // Guard against a null incremental push version to avoid an NPE; the marker can
    // only be present when the field is populated.
    String incrementalPushVersion = replicaStatus.getIncrementalPushVersion();
    if (incrementalPushVersion != null && incrementalPushVersion.contains(FATAL_DATA_VALIDATION_ERROR)) {
      return true;
    }
  }
  return false;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public class Utils {
public static final String NEW_LINE_CHAR = System.lineSeparator();
public static final AtomicBoolean SUPPRESS_SYSTEM_EXIT = new AtomicBoolean();

public static final String FATAL_DATA_VALIDATION_ERROR = "fatal data validation problem";

/**
* Print an error and exit with error code 1
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static com.linkedin.venice.pushmonitor.ExecutionStatus.TOPIC_SWITCH_RECEIVED;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.UNKNOWN;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.WARNING;
import static com.linkedin.venice.utils.Utils.FATAL_DATA_VALIDATION_ERROR;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
Expand All @@ -30,6 +31,7 @@
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.OfflinePushStrategy;
import com.linkedin.venice.meta.PartitionAssignment;
import com.linkedin.venice.utils.DataProviderUtils;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
Expand Down Expand Up @@ -272,6 +274,46 @@ public void testGetStatusUpdateTimestamp() {
assertEquals(statusUpdateTimestamp, Long.valueOf(startOfProgress.toEpochSecond(ZoneOffset.UTC)));
}

@Test(dataProviderClass = DataProviderUtils.class, dataProvider = "True-and-False")
public void testHasFatalDataValidationError(boolean hasFatalDataValidationError) {
  // Build a push status with 3 partitions and replication factor 2.
  OfflinePushStatus offlinePushStatus = new OfflinePushStatus(
      "testTopic",
      3,
      2,
      OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION);

  // A fatal DIV error surfaces as an ERROR status whose incremental push version
  // field carries the fatal-data-validation marker text; otherwise the replica completes.
  ReplicaStatus replicaStatus = new ReplicaStatus("instance1", true);
  if (hasFatalDataValidationError) {
    replicaStatus.updateStatus(ExecutionStatus.ERROR);
    replicaStatus.setIncrementalPushVersion(
        FATAL_DATA_VALIDATION_ERROR + " with partition 1, offset 1096534. Consumption will be halted.");
  } else {
    replicaStatus.updateStatus(ExecutionStatus.COMPLETED);
  }

  // Attach the replica to partition 1 and install it into the push status.
  PartitionStatus partitionStatus = new PartitionStatus(1);
  partitionStatus.setReplicaStatuses(Collections.singleton(replicaStatus));
  offlinePushStatus.setPartitionStatus(partitionStatus);

  // The push-level flag must mirror whether the fatal error was injected.
  Assert.assertEquals(offlinePushStatus.hasFatalDataValidationError(), hasFatalDataValidationError);
}

private void testValidTargetStatus(ExecutionStatus from, ExecutionStatus to) {
OfflinePushStatus offlinePushStatus =
new OfflinePushStatus(kafkaTopic, numberOfPartition, replicationFactor, strategy);
Expand Down