Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add support of XPENDING #2190

Open
wants to merge 8 commits into
base: unstable
Choose a base branch
from
Open

Conversation

LiuYuHui
Copy link
Contributor

@LiuYuHui LiuYuHui commented Mar 21, 2024

closes #1734

}

StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
consumer_metadata.pending_number -= 1;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Yangsx-1 , It seems that we should also update pending_number in StreamConsumerMetadata, when xack id, currently, we only update pending_number in StreamConsumerGroupMetadata.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! You're right.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has this patch about add support of XPENDING?
If not can you move the change to a new PR? @LiuYuHui

@LiuYuHui LiuYuHui marked this pull request as ready for review March 31, 2024 14:53
}

StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
consumer_metadata.pending_number -= 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! You're right.

StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
consumer_metadata.pending_number -= 1;

batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_metadata));
batch->Delete(stream_cf_handle_, entry_key);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this line should be put after *acknowledged += 1 , because the entry_key may not be delete due to the early return.

return s;
}
if (s.IsNotFound()) {
return rocksdb::Status::OK();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's a return here. The above line is the same. It should not affect other consumers and the final result.

Copy link
Member

@jihuayu jihuayu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! Pretty good.

const std::string &group_name = options.group_name;
std::string ns_key = AppendNamespacePrefix(stream_name);

LockGuard guard(storage_->GetLockManager(), ns_key);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the snapshot instead of the lock? @mapleFU
#2174

Comment on lines +69 to +70
rocksdb::Status GetPendingEntries(StreamPendingOptions &options, StreamGetPendingEntryResult &pending_infos,
std::vector<StreamGetExtPendingEntryResult> &ext_results);
Copy link
Member

@jihuayu jihuayu Apr 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

options here can be const.

The signature of this function is different from what we are accustomed to.

  1. We are accustomed to using pointers instead of references to represent return values.
  2. In general, we usually have only one return value.
  3. We usually place the key name(here is stream_name), group_name, consumer_name outside instead of inside the options.

@@ -23,11 +23,14 @@
#include <rocksdb/status.h>

#include <memory>
#include <unordered_set>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this included?

}

StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
consumer_metadata.pending_number -= 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has this patch about add support of XPENDING?
If not can you move the change to a new PR? @LiuYuHui

Copy link

sonarcloud bot commented Apr 3, 2024

@Yangsx-1
Copy link
Contributor

@LiuYuHui Hi, how's it going now?

@LiuYuHui
Copy link
Contributor Author

@LiuYuHui Hi, how's it going now?

@Yangsx-1 Sorry for the long delay, will pick it up this weekend.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add support of the XPENDING command
3 participants