[server][WIP] Add threadsafe mode to venice-server which adjusts message processing order #910

Open
wants to merge 7 commits into base: main
Conversation

ZacAttack (Contributor)


This is an initial phase PR: the minimal set of changes needed to add a mode where writes on the leader are committed to RocksDB prior to producing. This change in ordering has the following impacts:

Drainer is skipped on leaders:
In a later refactor it might be prudent to remove the drainer entirely. However, to best accommodate that, it would likely make sense to execute some kind of batching logic when flushing to RocksDB. We do not attempt to make that change in this PR.
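For illustration only (not code in this PR): a minimal sketch of the kind of batching a direct flush to RocksDB on the leader could use, written against the plain RocksDB Java API. The helper class and the KeyValue holder are hypothetical.

import java.util.List;

import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

// Hypothetical helper: groups several pending leader writes into one atomic RocksDB write,
// recovering most of the batching the drainer gives us today.
public final class LeaderFlushBatchSketch {
  public static void flush(RocksDB db, List<KeyValue> pending) throws RocksDBException {
    try (WriteBatch batch = new WriteBatch(); WriteOptions options = new WriteOptions()) {
      for (KeyValue kv: pending) {
        batch.put(kv.key, kv.value); // accumulate serialized key/value pairs in memory
      }
      db.write(options, batch); // one storage-engine write for the whole batch
    }
  }

  // Hypothetical container for a serialized key/value pair.
  public static final class KeyValue {
    final byte[] key;
    final byte[] value;

    KeyValue(byte[] key, byte[] value) {
      this.key = key;
      this.value = value;
    }
  }
}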

DCR logic must change:
Since writes are persisted to RocksDB prior to producing to Kafka, we must now accommodate the possibility of leftover state on a leader. To address this, we add a new mode to the merge conflict resolution logic where, upon a perfect tie (on value and timestamp), we resolve to produce the repeated record. The intention is to be certain that a write which was persisted to RocksDB on the leader but never produced does not get lost due to failing DCR.
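For illustration only (not the actual MergeConflictResolver code): a sketch of the tie rule described above, where a perfect tie on both value and timestamp resolves to re-producing the record rather than dropping it. All names below are made up.

import java.util.Arrays;

// Hypothetical sketch of the tie handling; non-tie cases would fall through to the
// normal DCR comparison, which is not shown here.
public final class TieRuleSketch {
  enum TieResolution {
    DROP_DUPLICATE, // existing behavior: a perfect tie is treated as "already applied"
    PRODUCE_DUPLICATE // threadsafe mode: re-produce so a persisted-but-unproduced write is not lost
  }

  static TieResolution resolvePerfectTie(
      long oldTimestamp,
      long newTimestamp,
      byte[] oldValue,
      byte[] newValue,
      boolean threadSafeMode) {
    boolean perfectTie = oldTimestamp == newTimestamp && Arrays.equals(oldValue, newValue);
    if (perfectTie && threadSafeMode) {
      return TieResolution.PRODUCE_DUPLICATE;
    }
    return TieResolution.DROP_DUPLICATE;
  }
}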

Transient Record is disabled:
The transient record cache is disabled for ingestion tasks that enable this mode. This was itself one of the goals, but we should proceed here with some validation. Most production clusters see a fairly low hit rate on the transient record cache; however, there is at least one use case that gets as high as a 20% hit rate. Theoretically, we may be able to avoid taking too much of a hit here, since we can give the memory savings to the RocksDB cache, but this needs vetting. If that doesn't work, we will need to replace the transient record cache with a simple size/time-based cache.
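For reference, a minimal sketch of the "simple size/time based cache" fallback mentioned above, built only on the JDK. The class name and bounds are made up; this is not code in this PR.

import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical size- and time-bounded cache: LRU eviction once the size bound is hit,
// plus a per-entry TTL check on read.
public final class SizeTimeBoundedCacheSketch<K, V> {
  private final int maxEntries;
  private final long ttlMillis;
  private final Map<K, Entry<V>> map;

  public SizeTimeBoundedCacheSketch(int maxEntries, long ttlMillis) {
    this.maxEntries = maxEntries;
    this.ttlMillis = ttlMillis;
    // Access-order LinkedHashMap gives LRU ordering; removeEldestEntry enforces the size bound.
    this.map = new LinkedHashMap<K, Entry<V>>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<K, Entry<V>> eldest) {
        return size() > SizeTimeBoundedCacheSketch.this.maxEntries;
      }
    };
  }

  public synchronized V get(K key) {
    Entry<V> entry = map.get(key);
    if (entry == null) {
      return null;
    }
    if (System.currentTimeMillis() - entry.writeTimeMillis > ttlMillis) {
      map.remove(key); // expired: treat as a miss
      return null;
    }
    return entry.value;
  }

  public synchronized void put(K key, V value) {
    map.put(key, new Entry<>(value, System.currentTimeMillis()));
  }

  private static final class Entry<V> {
    final V value;
    final long writeTimeMillis;

    Entry(V value, long writeTimeMillis) {
      this.value = value;
      this.writeTimeMillis = writeTimeMillis;
    }
  }
}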

There are also some cleanups here and there: getting rid of code paths we no longer need and tidying up others.

NOTE: Integration tests haven't been fully added to this PR yet, in part because some existing integration tests fail when switched to this mode. This needs more diagnosis, hence the WIP tag.

Resolves #XXX

How was this PR tested?

Does this PR introduce any user-facing changes?

  • No. You can skip the rest of this section.
  • Yes. Make sure to explain your proposed changes and call out the behavior change.


@sixpluszero sixpluszero left a comment


Thanks for the change! Overall it looks great; I left some comments, especially about TR.

if (fieldTimestampObj instanceof GenericRecord) {
  int oldColoId =
      (int) ((GenericRecord) fieldTimestampObj).get(CollectionRmdTimestamp.TOP_LEVEL_COLO_ID_FIELD_NAME);
  return oldColoId <= newValueColoId;


Thanks for putting this up. It looks like a correct fix to me... but why didn't we have this implemented in the first place? Let me also try to think through any arguments about this.


@sixpluszero sixpluszero left a comment


Thanks for the update. Now I am not sure I totally follow the logic, as I posted my concern in one of the replies... we should probably chat a bit offline to chew over the code change.
Also, we probably need some test coverage to prove that the old mode can fail in the race-condition case and that the new mode is correct, even if my concern is eventually proved invalid...

@@ -127,6 +127,7 @@ public void setUp() {
    Properties serverProperties = new Properties();
    serverProperties.setProperty(ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1));
    serverProperties.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, false);
    serverProperties.put(SERVER_INGESTION_TASK_THREAD_SAFE_MODE, false);


Since this mode defaults to false, can we make it run with both true and false for some of the more sophisticated AAWC tests?
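One possible shape for this, as a sketch: a TestNG data provider that feeds the flag into the existing setup. The class, provider, and test names here are made up and are not the actual tests in this PR.

import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

// Hypothetical matrix test: runs the same AAWC assertions with the threadsafe mode on and off.
public class ThreadSafeModeMatrixSketch {
  @DataProvider(name = "threadSafeMode")
  public static Object[][] threadSafeMode() {
    return new Object[][] { { true }, { false } };
  }

  @Test(dataProvider = "threadSafeMode")
  public void testActiveActiveWriteCompute(boolean threadSafeMode) {
    // serverProperties.put(SERVER_INGESTION_TASK_THREAD_SAFE_MODE, threadSafeMode);
    // ... set up the cluster and run the existing AAWC assertions ...
  }
}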

if (this.runInThreadSafeMode) {
  // Write to rocksdb. At time of writing, this is the last step after a huge amount of processing and compression
  // and whatnot. At this stage we do not sync the offset, instead doing that after successfully producing.
  this.processConsumerRecord(


This issue just suddenly occurred to me and I left this comment, so it might be totally wrong...

The issue I believe we have today in the CC processing logic is:
In order to have good processing throughput, we use another thread to do view processing and producing, and we do not block the consumer thread from fetching the next record from the same partition, which might be for the same key.
If it is the same key, then I feel the problem is still there. Even if we disable TR and do the drainer work directly here, since the upstream async logic is unchanged, the race condition remains. Say the next record is for the same key: it will read from storage directly, and if VW producing is a bit slow and the commit hasn't happened yet, then you lose the first update you just processed and directly apply the 2nd update to the original record...



I think I am wrong; given we don't play around with TR, it should be good. Let me rethink this.
