-
Notifications
You must be signed in to change notification settings - Fork 432
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat: add support of XPENDING #2190
base: unstable
Are you sure you want to change the base?
Conversation
} | ||
|
||
StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value); | ||
consumer_metadata.pending_number -= 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Yangsx-1 , It seems that we should also update pending_number
in StreamConsumerMetadata
, when xack
id, currently, we only update pending_number
in StreamConsumerGroupMetadata
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! You're right.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Has this patch about add support of XPENDING
?
If not can you move the change to a new PR? @LiuYuHui
} | ||
|
||
StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value); | ||
consumer_metadata.pending_number -= 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! You're right.
StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value); | ||
consumer_metadata.pending_number -= 1; | ||
|
||
batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_metadata)); | ||
batch->Delete(stream_cf_handle_, entry_key); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this line should be put after *acknowledged += 1
, because the entry_key may not be delete due to the early return.
return s; | ||
} | ||
if (s.IsNotFound()) { | ||
return rocksdb::Status::OK(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's a return here. The above line is the same. It should not affect other consumers and the final result.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you! Pretty good.
const std::string &group_name = options.group_name; | ||
std::string ns_key = AppendNamespacePrefix(stream_name); | ||
|
||
LockGuard guard(storage_->GetLockManager(), ns_key); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rocksdb::Status GetPendingEntries(StreamPendingOptions &options, StreamGetPendingEntryResult &pending_infos, | ||
std::vector<StreamGetExtPendingEntryResult> &ext_results); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
options
here can be const.
The signature of this function is different from what we are accustomed to.
- We are accustomed to using pointers instead of references to represent return values.
- In general, we usually have only one return value.
- We usually place the key name(here is stream_name), group_name, consumer_name outside instead of inside the options.
@@ -23,11 +23,14 @@ | |||
#include <rocksdb/status.h> | |||
|
|||
#include <memory> | |||
#include <unordered_set> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this included?
} | ||
|
||
StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value); | ||
consumer_metadata.pending_number -= 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Has this patch about add support of XPENDING
?
If not can you move the change to a new PR? @LiuYuHui
Quality Gate passedIssues Measures |
@LiuYuHui Hi, how's it going now? |
closes #1734