[server][WIP] Add threadsafe mode to venice-server which adjusts message processing order #910

Open · wants to merge 7 commits into base: main
Changes from 4 commits
@@ -73,6 +73,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_ISOLATION_SERVICE_PORT;
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_MODE;
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_TASK_MAX_IDLE_COUNT;
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_TASK_THREAD_SAFE_MODE;
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_CONSUMER_OFFSET_COLLECTION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_MAX_POLL_RECORDS;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_ENABLED;
@@ -445,6 +446,8 @@ public class VeniceServerConfig extends VeniceClusterConfig {

private final int ingestionTaskMaxIdleCount;

private final boolean threadSafeMode;

private final long metaStoreWriterCloseTimeoutInMS;
private final int metaStoreWriterCloseConcurrency;

@@ -727,6 +730,7 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
pubSubClientsFactory = new PubSubClientsFactory(serverProperties);
routerPrincipalName = serverProperties.getString(ROUTER_PRINCIPAL_NAME, "CN=venice-router");
ingestionTaskMaxIdleCount = serverProperties.getInt(SERVER_INGESTION_TASK_MAX_IDLE_COUNT, 10000);
threadSafeMode = serverProperties.getBoolean(SERVER_INGESTION_TASK_THREAD_SAFE_MODE, false);
metaStoreWriterCloseTimeoutInMS = serverProperties.getLong(META_STORE_WRITER_CLOSE_TIMEOUT_MS, 300000L);
metaStoreWriterCloseConcurrency = serverProperties.getInt(META_STORE_WRITER_CLOSE_CONCURRENCY, -1);
ingestionHeartbeatIntervalMs =
@@ -1280,6 +1284,10 @@ public int getIngestionTaskMaxIdleCount() {
return ingestionTaskMaxIdleCount;
}

public boolean isThreadSafeMode() {
return threadSafeMode;
}

public boolean isKMERegistrationFromMessageHeaderEnabled() {
return isKMERegistrationFromMessageHeaderEnabled;
}
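
For illustration, a minimal, self-contained sketch of the config plumbing these hunks add. The literal key string below is an assumption; the diff only shows the ConfigKeys constant, not its value, and plain java.util.Properties stands in for VeniceProperties:

```java
import java.util.Properties;

// Hedged sketch of the new flag's read path, not the actual Venice config code.
public class ThreadSafeModeConfigSketch {
  private static final String SERVER_INGESTION_TASK_THREAD_SAFE_MODE =
      "server.ingestion.task.thread.safe.mode"; // hypothetical key string

  public static void main(String[] args) {
    Properties serverProperties = new Properties();
    serverProperties.setProperty(SERVER_INGESTION_TASK_THREAD_SAFE_MODE, "true");
    // Mirrors serverProperties.getBoolean(SERVER_INGESTION_TASK_THREAD_SAFE_MODE, false)
    boolean threadSafeMode = Boolean.parseBoolean(
        serverProperties.getProperty(SERVER_INGESTION_TASK_THREAD_SAFE_MODE, "false"));
    System.out.println("threadSafeMode = " + threadSafeMode); // prints: threadSafeMode = true
  }
}
```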
@@ -21,15 +21,17 @@ public ActiveActiveProducerCallback(
LeaderProducedRecordContext leaderProducedRecordContext,
int subPartition,
String kafkaUrl,
long beforeProcessingRecordTimestamp) {
long beforeProcessingRecordTimestamp,
boolean syncOffsetsOnlyAfterProducing) {
super(
ingestionTask,
sourceConsumerRecord,
partitionConsumptionState,
leaderProducedRecordContext,
subPartition,
kafkaUrl,
beforeProcessingRecordTimestamp);
beforeProcessingRecordTimestamp,
syncOffsetsOnlyAfterProducing);
}

@Override
@@ -178,7 +178,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
beforeProcessingBatchRecordsTimestampMs);
} else {
/**
* The below flow must be executed in a critical session for the same key:
* The below flow must be executed in a critical section for the same key:
* Read existing value/RMD from transient record cache/disk -> perform DCR and decide incoming value wins
* -> update transient record cache -> produce to VT (just call send, no need to wait for the produce future in the critical section)
*
@@ -499,7 +499,6 @@ protected void processMessageAndMaybeProduceToKafka(
.updateLatestIgnoredUpstreamRTOffset(kafkaClusterIdToUrlMap.get(kafkaClusterId), sourceOffset);
} else {
validatePostOperationResultsAndRecord(mergeConflictResult, offsetSumPreOperation, recordTimestampsPreOperation);

// Apply this update to any views for this store
// TODO: It'd be good to be able to do this in LeaderFollowerStoreIngestionTask instead, however, AA currently is
// the
@@ -1572,14 +1571,16 @@ protected LeaderProducerCallback createProducerCallback(
LeaderProducedRecordContext leaderProducedRecordContext,
int subPartition,
String kafkaUrl,
long beforeProcessingRecordTimestampNs) {
long beforeProcessingRecordTimestampNs,
boolean syncOffsetsOnlyAfterProducing) {
return new ActiveActiveProducerCallback(
this,
consumerRecord,
partitionConsumptionState,
leaderProducedRecordContext,
subPartition,
kafkaUrl,
beforeProcessingRecordTimestampNs);
beforeProcessingRecordTimestampNs,
syncOffsetsOnlyAfterProducing);
}
}
@@ -1602,13 +1602,27 @@ protected void produceToLocalKafka(
String kafkaUrl,
int kafkaClusterId,
long beforeProcessingRecordTimestampNs) {

if (this.runInThreadSafeMode) {
// Write to rocksdb. At the time of writing, this is the last step after a huge amount of processing and compression
// and whatnot. At this stage we do not sync the offset, instead doing that after a successful produce.
this.processConsumerRecord(
consumerRecord,
leaderProducedRecordContext,
subPartition,
kafkaUrl,
beforeProcessingRecordTimestampNs,
false);
}

Contributor commented on the processConsumerRecord call above:

I just suddenly thought about this issue and left this comment, so it might be totally wrong...

The issue I believe we have in the CC processing logic today is: in order to get good processing throughput, we use another thread to do view processing and producing, and we do not block the consumer thread from fetching the next record from the same partition, which might be for the same key. If it is the same key, then I feel the problem is still there. Even if we disable TR and do the drainer work directly here, the upstream async logic is unchanged, so the race condition remains. Say the next record is for the same key: it will fetch from storage directly, and if VW producing is a bit slow and the commit has not happened yet, you lose the first update you just processed and directly apply the second update to the original record...

Contributor replied:

I think I am wrong; given we don't play around with TR, it should be good. Let me re-think about this.

LeaderProducerCallback callback = createProducerCallback(
consumerRecord,
partitionConsumptionState,
leaderProducedRecordContext,
subPartition,
kafkaUrl,
beforeProcessingRecordTimestampNs);
beforeProcessingRecordTimestampNs,
this.runInThreadSafeMode);
long sourceTopicOffset = consumerRecord.getOffset();
LeaderMetadataWrapper leaderMetadataWrapper = new LeaderMetadataWrapper(sourceTopicOffset, kafkaClusterId);
partitionConsumptionState.setLastLeaderPersistFuture(leaderProducedRecordContext.getPersistedToDBFuture());
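
To make the reordering concrete, here is a small, self-contained sketch contrasting the default pipeline with the threadsafe-mode pipeline; all names are hypothetical, and Runnable stands in for the real Venice steps named in the diff (processConsumerRecord, produce to VT, offset sync, drainer queue):

```java
// Simplified model of the two processing orders, not the PR's actual call graph.
public class ProcessingOrderSketch {
  static void handleRecord(boolean runInThreadSafeMode, Runnable persistToRocksDb,
      Runnable produceToVersionTopic, Runnable syncOffsets, Runnable queueToDrainer) {
    if (runInThreadSafeMode) {
      persistToRocksDb.run();       // 1. commit to local storage before producing
      produceToVersionTopic.run();  // 2. produce to the version topic
      syncOffsets.run();            // 3. sync offsets only after a successful produce
    } else {
      produceToVersionTopic.run();  // 1. produce to the version topic first
      queueToDrainer.run();         // 2. drainer persists to RocksDB asynchronously
    }
  }

  public static void main(String[] args) {
    handleRecord(true,
        () -> System.out.println("persist"),
        () -> System.out.println("produce"),
        () -> System.out.println("sync offsets"),
        () -> System.out.println("queue to drainer"));
  }
}
```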
@@ -2093,7 +2107,8 @@ private void propagateHeartbeatFromUpstreamTopicToLocalVersionTopic(
leaderProducedRecordContext,
partition,
kafkaUrl,
beforeProcessingRecordTimestampNs);
beforeProcessingRecordTimestampNs,
this.runInThreadSafeMode);
LeaderMetadataWrapper leaderMetadataWrapper = new LeaderMetadataWrapper(consumerRecord.getOffset(), kafkaClusterId);
List<Integer> subPartitions =
PartitionUtils.getSubPartitions(partitionConsumptionState.getUserPartition(), amplificationFactor);
@@ -2160,7 +2175,7 @@ protected void recordHeartbeatReceived(
* This function should be called as one of the first steps in processing pipeline for all messages consumed from any kafka topic.
*
* The caller of this function should only process this {@param consumerRecord} further if the return is
* {@link DelegateConsumerRecordResult#QUEUED_TO_DRAINER}.
* {@link DelegateConsumerRecordResult#QUEUE_TO_DRAINER}.
*
* This function assumes {@link #shouldProcessRecord(PubSubMessage, int)} has been called which happens in
* {@link StoreIngestionTask#produceToStoreBufferServiceOrKafka(Iterable, PubSubTopicPartition, String, int)}
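
The surrounding hunks rename QUEUED_TO_DRAINER to QUEUE_TO_DRAINER and collapse SKIPPED_MESSAGE, DUPLICATE_MESSAGE, and PRODUCED_TO_KAFKA into a single END_PROCESSING result. A hedged sketch of the resulting enum, inferred from the return statements in this diff (only the constant names come from the PR; the javadoc wording is assumed):

```java
// Sketch of DelegateConsumerRecordResult after this PR, not copied from the source file.
enum DelegateConsumerRecordResult {
  /** The caller should hand the record to the drainer queue for persistence. */
  QUEUE_TO_DRAINER,
  /** Processing ends here: the record was skipped, a duplicate, or already produced. */
  END_PROCESSING
}
```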
@@ -2182,7 +2197,6 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
int kafkaClusterId,
long beforeProcessingPerRecordTimestampNs,
long beforeProcessingBatchRecordsTimestampMs) {
boolean produceToLocalKafka = false;
try {
KafkaKey kafkaKey = consumerRecord.getKey();
KafkaMessageEnvelope kafkaValue = consumerRecord.getValue();
@@ -2198,9 +2212,9 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
PartitionConsumptionState partitionConsumptionState = partitionConsumptionStateMap.get(subPartition);
if (partitionConsumptionState == null) {
// The partition is likely unsubscribed, will skip these messages.
return DelegateConsumerRecordResult.SKIPPED_MESSAGE;
return DelegateConsumerRecordResult.END_PROCESSING;
}
produceToLocalKafka = shouldProduceToVersionTopic(partitionConsumptionState);
boolean produceToLocalKafka = shouldProduceToVersionTopic(partitionConsumptionState);
// UPDATE message is only expected in LEADER which must be produced to kafka.
MessageType msgType = MessageType.valueOf(kafkaValue);
if (msgType == MessageType.UPDATE && !produceToLocalKafka) {
@@ -2222,7 +2236,10 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
* (i) it's a follower or (ii) leader is consuming from VT
*/
if (!produceToLocalKafka) {
return DelegateConsumerRecordResult.QUEUED_TO_DRAINER;
// TODO: The next step will put the record in the drainer queue. When threadsafe mode is enabled, we skip
// the drainer during RT consumption and commit straight to RocksDB. To remove the drainer completely,
// we should do the same here.
return DelegateConsumerRecordResult.QUEUE_TO_DRAINER;
}

// If we are here the message must be produced to local kafka or silently consumed.
@@ -2271,7 +2288,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
*/
divErrorMetricCallback.accept(e);
LOGGER.debug("{} : Skipping a duplicate record at offset: {}", ingestionTaskName, consumerRecord.getOffset());
return DelegateConsumerRecordResult.DUPLICATE_MESSAGE;
return DelegateConsumerRecordResult.END_PROCESSING;
}

if (kafkaKey.isControlMessage()) {
@@ -2427,7 +2444,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
if (isDataRecovery && !partitionConsumptionState.isBatchOnly()) {
// Ignore remote VT's TS message since we might need to consume more RT or incremental push data from VT
// that's no longer in the local/remote RT due to retention.
return DelegateConsumerRecordResult.SKIPPED_MESSAGE;
return DelegateConsumerRecordResult.END_PROCESSING;
}
leaderProducedRecordContext =
LeaderProducedRecordContext.newControlMessageRecord(kafkaKey.getKey(), controlMessage);
@@ -2448,7 +2465,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
beforeProcessingPerRecordTimestampNs);
break;
case VERSION_SWAP:
return DelegateConsumerRecordResult.QUEUED_TO_DRAINER;
return DelegateConsumerRecordResult.QUEUE_TO_DRAINER;
default:
// do nothing
break;
@@ -2478,7 +2495,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
beforeProcessingPerRecordTimestampNs,
beforeProcessingBatchRecordsTimestampMs);
}
return DelegateConsumerRecordResult.PRODUCED_TO_KAFKA;
return DelegateConsumerRecordResult.END_PROCESSING;
} catch (Exception e) {
throw new VeniceException(
ingestionTaskName + " hasProducedToKafka: exception for message received from: "
@@ -3418,15 +3435,17 @@ protected LeaderProducerCallback createProducerCallback(
LeaderProducedRecordContext leaderProducedRecordContext,
int subPartition,
String kafkaUrl,
long beforeProcessingRecordTimestampNs) {
long beforeProcessingRecordTimestampNs,
boolean syncOffsetsOnlyAfterProducing) {
return new LeaderProducerCallback(
this,
consumerRecord,
partitionConsumptionState,
leaderProducedRecordContext,
subPartition,
kafkaUrl,
beforeProcessingRecordTimestampNs);
beforeProcessingRecordTimestampNs,
syncOffsetsOnlyAfterProducing);
}

protected Lazy<VeniceWriter<byte[], byte[], byte[]>> getVeniceWriter() {
@@ -55,14 +55,17 @@ public class LeaderProducerCallback implements ChunkAwareCallback {
protected ChunkedValueManifest oldValueManifest = null;
protected ChunkedValueManifest oldRmdManifest = null;

private final boolean syncOffsetsOnlyAfterProducing;

public LeaderProducerCallback(
LeaderFollowerStoreIngestionTask ingestionTask,
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> sourceConsumerRecord,
PartitionConsumptionState partitionConsumptionState,
LeaderProducedRecordContext leaderProducedRecordContext,
int subPartition,
String kafkaUrl,
long beforeProcessingRecordTimestampNs) {
long beforeProcessingRecordTimestampNs,
boolean syncOffsetsOnlyAfterProducing) {
this.ingestionTask = ingestionTask;
this.sourceConsumerRecord = sourceConsumerRecord;
this.partitionConsumptionState = partitionConsumptionState;
@@ -71,6 +74,7 @@ public LeaderProducerCallback(
this.leaderProducedRecordContext = leaderProducedRecordContext;
this.produceTimeNs = ingestionTask.isUserSystemStore() ? 0 : System.nanoTime();
this.beforeProcessingRecordTimestampNs = beforeProcessingRecordTimestampNs;
this.syncOffsetsOnlyAfterProducing = syncOffsetsOnlyAfterProducing;
}

@Override
@@ -156,7 +160,7 @@ public void onCompletion(PubSubProduceResult produceResult, Exception e) {
*/
if (chunkedValueManifest == null) {
leaderProducedRecordContext.setProducedOffset(produceResult.getOffset());
ingestionTask.produceToStoreBufferService(
produceToStoreBufferService(
sourceConsumerRecord,
leaderProducedRecordContext,
subPartition,
@@ -194,7 +198,7 @@ public void onCompletion(PubSubProduceResult produceResult, Exception e) {
manifestPut,
leaderProducedRecordContext.getPersistedToDBFuture());
producedRecordForManifest.setProducedOffset(produceResult.getOffset());
ingestionTask.produceToStoreBufferService(
produceToStoreBufferService(
sourceConsumerRecord,
producedRecordForManifest,
subPartition,
@@ -321,7 +325,7 @@ private long produceChunksToStoreBufferService(
LeaderProducedRecordContext producedRecordForChunk =
LeaderProducedRecordContext.newChunkPutRecord(ByteUtils.extractByteArray(chunkKey), chunkPut);
producedRecordForChunk.setProducedOffset(-1);
ingestionTask.produceToStoreBufferService(
produceToStoreBufferService(
sourceConsumerRecord,
producedRecordForChunk,
subPartition,
@@ -347,7 +351,7 @@ void produceDeprecatedChunkDeletionToStoreBufferService(ChunkedValueManifest man
LeaderProducedRecordContext producedRecordForChunk =
LeaderProducedRecordContext.newChunkDeleteRecord(ByteUtils.extractByteArray(chunkKey), chunkDelete);
producedRecordForChunk.setProducedOffset(-1);
ingestionTask.produceToStoreBufferService(
produceToStoreBufferService(
sourceConsumerRecord,
producedRecordForChunk,
subPartition,
@@ -357,6 +361,28 @@ void produceDeprecatedChunkDeletionToStoreBufferService(ChunkedValueManifest man
}
}

protected void produceToStoreBufferService(
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumedRecord,
LeaderProducedRecordContext leaderProducedRecordContext,
int subPartition,
String kafkaUrl,
long beforeProcessingRecordTimestampNs,
long currentTimeForMetricsMs) throws InterruptedException {
if (this.syncOffsetsOnlyAfterProducing) {
// Sync offsets only; in threadsafe mode the record was already persisted to storage before producing.
ingestionTask
.maybeSyncOffsets(consumedRecord, leaderProducedRecordContext, partitionConsumptionState, subPartition);
} else {
ingestionTask.produceToStoreBufferService(
consumedRecord,
leaderProducedRecordContext,
subPartition,
kafkaUrl,
beforeProcessingRecordTimestampNs,
currentTimeForMetricsMs);
}
}
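
The override above either syncs offsets directly or falls back to the drainer path. As a toy illustration of the threadsafe-mode guarantee, here is an assumption-laden model (not Venice's actual callback wiring, which goes through maybeSyncOffsets and the PubSub produce result) in which the checkpointed offset advances only once the produce future completes:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Toy model of "sync offsets only after producing": the local store is updated before
// the produce, and the checkpointed offset moves only once the broker acks.
public class SyncAfterProduceSketch {
  private final AtomicLong checkpointedOffset = new AtomicLong(-1);

  CompletableFuture<Void> process(long offset, Runnable applyToLocalStore,
      CompletableFuture<Void> produceFuture) {
    applyToLocalStore.run(); // in threadsafe mode this happened before producing
    return produceFuture.thenRun(() -> checkpointedOffset.set(offset));
  }

  public static void main(String[] args) {
    SyncAfterProduceSketch sketch = new SyncAfterProduceSketch();
    CompletableFuture<Void> produce = new CompletableFuture<>();
    sketch.process(42L, () -> System.out.println("applied to local store"), produce);
    System.out.println("before ack: " + sketch.checkpointedOffset.get()); // -1
    produce.complete(null); // broker acknowledges the produce
    System.out.println("after ack: " + sketch.checkpointedOffset.get()); // 42
  }
}
```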

// Visible for VeniceWriter unit test.
public PartitionConsumptionState getPartitionConsumptionState() {
return partitionConsumptionState;
@@ -223,7 +223,14 @@ public class PartitionConsumptionState {
*/
private boolean firstHeartBeatSOSReceived;

public PartitionConsumptionState(int partition, int amplificationFactor, OffsetRecord offsetRecord, boolean hybrid) {
private boolean threadSafeMode;

public PartitionConsumptionState(
int partition,
int amplificationFactor,
OffsetRecord offsetRecord,
boolean hybrid,
boolean threadSafeMode) {
this.partition = partition;
this.amplificationFactor = amplificationFactor;
this.userPartition = PartitionUtils.getUserPartition(partition, amplificationFactor);
@@ -237,6 +244,7 @@ public PartitionConsumptionState(int partition, int amplificationFactor, OffsetR
this.processedRecordSizeSinceLastSync = 0;
this.leaderFollowerState = LeaderFollowerStateType.STANDBY;
this.expectedSSTFileChecksum = null;
this.threadSafeMode = threadSafeMode;
/**
* Initialize the latest consumed time with current time; otherwise, it's 0 by default
* and leader will be promoted immediately.
@@ -565,6 +573,10 @@ public void setTransientRecord(
int valueLen,
int valueSchemaId,
GenericRecord replicationMetadataRecord) {
if (this.threadSafeMode) {
// NoOp: in threadsafe mode writes are committed to storage synchronously, so the transient record cache is not needed.
return;
}
TransientRecord transientRecord =
new TransientRecord(value, valueOffset, valueLen, valueSchemaId, kafkaClusterId, kafkaConsumedOffset);
if (replicationMetadataRecord != null) {
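
Why the no-op above is safe is easier to see with a toy model. In the default mode, a leader's write sits in the transient record (TR) cache until the drainer flushes it to storage, so a follow-up read for the same key must consult the cache first; in threadsafe mode the write is already committed before the next record is processed. A sketch under those assumptions, with all names hypothetical and a HashMap standing in for RocksDB:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the transient record cache's role; not the actual Venice data path.
class TransientRecordCacheSketch {
  final Map<String, String> storage = new HashMap<>();        // stands in for RocksDB
  final Map<String, String> transientCache = new HashMap<>(); // stands in for the TR cache
  final boolean threadSafeMode;

  TransientRecordCacheSketch(boolean threadSafeMode) {
    this.threadSafeMode = threadSafeMode;
  }

  void write(String key, String value) {
    if (threadSafeMode) {
      storage.put(key, value); // committed synchronously, so no cache entry is needed
    } else {
      transientCache.put(key, value); // visible to reads until the drainer flushes it
      // ...the drainer later applies the write to storage and evicts this entry
    }
  }

  String read(String key) {
    String cached = transientCache.get(key);
    return cached != null ? cached : storage.get(key);
  }

  public static void main(String[] args) {
    TransientRecordCacheSketch defaultMode = new TransientRecordCacheSketch(false);
    defaultMode.write("k", "v1");
    System.out.println(defaultMode.read("k")); // v1, served from the transient cache
    TransientRecordCacheSketch threadSafe = new TransientRecordCacheSketch(true);
    threadSafe.write("k", "v1");
    System.out.println(threadSafe.read("k")); // v1, served directly from storage
  }
}
```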