[server][WIP] Add threadsafe mode to venice-server which adjusts message processing order #910
base: main
Conversation
Thanks for the change! Overall looks great; I left some comments, especially about TR (the transient record cache).
clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java
...inci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java
...ts/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
...ts/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
if (fieldTimestampObj instanceof GenericRecord) {
  int oldColoId =
      (int) ((GenericRecord) fieldTimestampObj).get(CollectionRmdTimestamp.TOP_LEVEL_COLO_ID_FIELD_NAME);
  return oldColoId <= newValueColoId;
Thanks for putting this up. It looks like a correct fix to me... but why didn't we have this implemented in the first place... let me also try to think of any argument about this.
Thanks for the update. Now I am not sure I totally follow the logic, as I posted my concern in one of the replies... we should probably chat a bit offline to walk through the code change.
Also, we probably need some test coverage to prove that the old mode can fail in the race condition case and the new mode is correct, even if my concern is eventually proved invalid...
@@ -127,6 +127,7 @@ public void setUp() {
    Properties serverProperties = new Properties();
    serverProperties.setProperty(ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1));
    serverProperties.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, false);
    serverProperties.put(SERVER_INGESTION_TASK_THREAD_SAFE_MODE, false);
Since this mode defaults to false, can we make some of the more sophisticated AAWC tests run with both true and false?
...ts/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
if (this.runInThreadSafeMode) {
  // Write to rocksdb. At time of writing, this is the last step after a huge amount of processing and compression
  // and whatnot. At this stage we do not sync the offset, instead doing that after a successful produce.
  this.processConsumerRecord(
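The ordering the snippet above commits to can be sketched roughly as follows. This is a minimal illustration, not the actual StoreIngestionTask API; all interface and method names here are hypothetical stand-ins:

```java
// Hypothetical sketch of threadsafe-mode ordering: persist to local storage
// first, produce to Kafka second, and sync the offset only after the produce
// succeeds. A crash between steps 1 and 2 leaves leftover local state, which
// is exactly what the new DCR tie-break mode is designed to tolerate.
public class ThreadSafeOrderingSketch {
  interface Storage { void put(String key, String value); }
  interface Producer { void produce(String key, String value, Runnable onSuccess); }

  static void handleRecord(
      String key, String value, Storage storage, Producer producer, long offset, long[] syncedOffset) {
    // 1. Commit to local storage (RocksDB in the real server) BEFORE producing.
    storage.put(key, value);
    // 2. Produce to Kafka; 3. only on success, sync the offset.
    producer.produce(key, value, () -> syncedOffset[0] = offset);
  }

  public static void main(String[] args) {
    java.util.Map<String, String> store = new java.util.HashMap<>();
    long[] synced = { -1 };
    // Toy producer that succeeds immediately.
    handleRecord("k", "v", store::put, (k, v, cb) -> cb.run(), 42, synced);
    System.out.println(store.get("k"));
    System.out.println(synced[0]);
  }
}
```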
I just suddenly thought about this issue and left this comment, so it might be totally wrong...
The issue I believe we have today in the CC processing logic is:
In order to have good processing throughput, we use another thread to do the View processing and producing, and we do not block the consumer thread from fetching the next record from the same partition, which might be for the same key.
If it is the same key, then I feel like the problem is still there. Even if we disable TR and do the drainer work directly here, since the upstream async logic is unchanged, the race condition remains. Say the next record is for the same key: it will fetch from storage directly, and assuming VW producing is a bit slow so the commit has not happened yet, you lose the first update you just processed and directly apply the 2nd update to the original record...
I think I am wrong; given we don't play around with TR, it should be good. Let me re-think about this.
[server][WIP] Add threadsafe mode to venice-server which adjusts message processing order
This is an initial-phase PR. It contains the minimal set of changes needed to add a mode where writes on the leader are committed to RocksDB prior to producing. This change in ordering has the following impacts:
Drainer is skipped on leaders:
In a later refactor it might be prudent to remove the drainer entirely. However, to best accommodate that, it would likely make sense to execute some kind of batching logic when flushing to RocksDB. We do not attempt to make that change in this PR.
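The batching logic alluded to above could look roughly like the following sketch. This is purely illustrative: the class name and shape are hypothetical, and a plain Map stands in for RocksDB (a real implementation would commit a single RocksDB WriteBatch on flush):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: buffer leader writes and flush them to the storage
// engine in one batch once a size threshold is reached.
public class WriteBatcher {
  private final List<Map.Entry<String, String>> buffer = new ArrayList<>();
  private final int batchSize;
  private final Map<String, String> storage; // stand-in for RocksDB

  public WriteBatcher(int batchSize, Map<String, String> storage) {
    this.batchSize = batchSize;
    this.storage = storage;
  }

  public void put(String key, String value) {
    buffer.add(Map.entry(key, value));
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  public void flush() {
    // In the real server this would be one RocksDB WriteBatch commit.
    for (Map.Entry<String, String> e : buffer) {
      storage.put(e.getKey(), e.getValue());
    }
    buffer.clear();
  }

  public static void main(String[] args) {
    Map<String, String> store = new HashMap<>();
    WriteBatcher batcher = new WriteBatcher(2, store);
    batcher.put("k1", "v1");
    System.out.println(store.size()); // still buffered
    batcher.put("k2", "v2");
    System.out.println(store.size()); // threshold reached, flushed
  }
}
```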
DCR logic must change:
Since writes are persisted to RocksDB prior to producing to Kafka, we must now accommodate the possibility of leftover state on a leader. To address this, we add a new mode to the merge conflict resolution logic where, upon a perfect tie (on both value and timestamp), we resolve to produce the repeated record. The intention is to be certain that a write which was persisted to RocksDB on the leader but never produced doesn't end up getting lost due to failing DCR.
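The tie-break described above can be sketched as follows. This is a simplified illustration under assumed semantics, not the actual Venice merge conflict resolver; all names are hypothetical:

```java
import java.util.Arrays;

// Hypothetical sketch of the perfect-tie resolution: in the new threadsafe
// mode, a tie on BOTH timestamp and value resolves to re-producing the record,
// so a write persisted to RocksDB but never produced is not silently dropped.
public class TieBreakSketch {
  enum Resolution { KEEP_OLD, PRODUCE_NEW }

  static Resolution resolve(
      long oldTs, byte[] oldValue, long newTs, byte[] newValue, boolean threadSafeMode) {
    if (newTs > oldTs) return Resolution.PRODUCE_NEW;
    if (newTs < oldTs) return Resolution.KEEP_OLD;
    // Timestamps are equal: compare values.
    if (Arrays.equals(oldValue, newValue)) {
      // Perfect tie: the old mode drops the duplicate; the new mode produces it.
      return threadSafeMode ? Resolution.PRODUCE_NEW : Resolution.KEEP_OLD;
    }
    // Value mismatch at equal timestamps: deterministic fallback (illustrative).
    return Resolution.KEEP_OLD;
  }

  public static void main(String[] args) {
    byte[] v = new byte[] { 1, 2 };
    System.out.println(resolve(5L, v, 5L, v.clone(), false));
    System.out.println(resolve(5L, v, 5L, v.clone(), true));
  }
}
```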
Transient Record is disabled:
The transient record cache is disabled for those ingestion tasks which enable this mode. This was itself one of the goals, but we should proceed with some validation. Most clusters in production see a pretty low hit rate on the transient record cache; however, there is at least one use case that gets as high as a 20% hit rate. Theoretically, we may be able to avoid taking too much of a hit here, since we can give the memory savings to the RocksDB cache, but this needs vetting. If that doesn't work, we will need to replace the transient record cache with a simple size/time-based cache.
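The "simple size/time-based cache" floated above could be sketched like this, built on `LinkedHashMap`'s `removeEldestEntry` hook for the size bound plus a per-entry TTL check. The class and its shape are hypothetical, not an actual Venice component:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of a size- and time-bounded cache: LRU eviction once the
// size bound is exceeded, and entries older than the TTL are dropped on read.
public class SizeTimeCache<K, V> {
  private static final class Entry<V> {
    final V value;
    final long insertedAtMs;
    Entry(V value, long insertedAtMs) { this.value = value; this.insertedAtMs = insertedAtMs; }
  }

  private final long ttlMs;
  private final LinkedHashMap<K, Entry<V>> map;

  public SizeTimeCache(int maxSize, long ttlMs) {
    this.ttlMs = ttlMs;
    // Access-order LRU; removeEldestEntry enforces the size bound.
    this.map = new LinkedHashMap<K, Entry<V>>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<K, Entry<V>> eldest) {
        return size() > maxSize;
      }
    };
  }

  public synchronized void put(K key, V value) {
    map.put(key, new Entry<>(value, System.currentTimeMillis()));
  }

  public synchronized V get(K key) {
    Entry<V> e = map.get(key);
    if (e == null) return null;
    if (System.currentTimeMillis() - e.insertedAtMs > ttlMs) {
      map.remove(key); // expired
      return null;
    }
    return e.value;
  }

  public static void main(String[] args) {
    SizeTimeCache<String, String> cache = new SizeTimeCache<>(2, 60_000);
    cache.put("a", "1");
    cache.put("b", "2");
    cache.put("c", "3"); // size bound evicts the eldest entry ("a")
    System.out.println(cache.get("a"));
    System.out.println(cache.get("c"));
  }
}
```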
There are also some cleanups here and there: getting rid of code paths we no longer need and tidying up others.
NOTE: Integration tests haven't been completely added to this PR yet, partly because some of the existing integration tests fail when switched to this mode. That needs more diagnosis; hence the WIP tag.
Resolves #XXX
How was this PR tested?
Does this PR introduce any user-facing changes?