Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Add cache to dgraph.type predicate. #9068

Merged
merged 20 commits into from May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions posting/index.go
Expand Up @@ -1511,11 +1511,13 @@ func rebuildListType(ctx context.Context, rb *IndexRebuild) error {

// DeleteAll deletes all entries in the posting list.
func DeleteAll() error {
ResetCache()
return pstore.DropAll()
}

// DeleteData deletes all data for the namespace but leaves types and schema intact.
func DeleteData(ns uint64) error {
ResetCache()
prefix := make([]byte, 9)
prefix[0] = x.DefaultPrefix
binary.BigEndian.PutUint64(prefix[1:], ns)
Expand All @@ -1525,6 +1527,7 @@ func DeleteData(ns uint64) error {
// DeletePredicate deletes all entries and indices for a given predicate.
func DeletePredicate(ctx context.Context, attr string, ts uint64) error {
glog.Infof("Dropping predicate: [%s]", attr)
ResetCache()
preds := schema.State().PredicatesToDelete(attr)
for _, pred := range preds {
prefix := x.PredicatePrefix(pred)
Expand All @@ -1541,6 +1544,8 @@ func DeletePredicate(ctx context.Context, attr string, ts uint64) error {

// DeleteNamespace bans the namespace and deletes its predicates/types from the schema.
func DeleteNamespace(ns uint64) error {
// TODO: We should only delete cache for certain keys, not all the keys.
ResetCache()
schema.State().DeletePredsForNs(ns)
return pstore.BanNamespace(ns)
}
90 changes: 58 additions & 32 deletions posting/list.go
Expand Up @@ -557,6 +557,27 @@ func (l *List) getMutation(startTs uint64) []byte {
return nil
}

func (l *List) setMutationAfterCommit(startTs, commitTs uint64, data []byte) {
pl := new(pb.PostingList)
x.Check(pl.Unmarshal(data))
pl.CommitTs = commitTs
for _, p := range pl.Postings {
p.CommitTs = commitTs
}

x.AssertTrue(pl.Pack == nil)

l.Lock()
if l.mutationMap == nil {
l.mutationMap = make(map[uint64]*pb.PostingList)
}
l.mutationMap[startTs] = pl
if pl.CommitTs != 0 {
l.maxTs = x.Max(l.maxTs, pl.CommitTs)
}
l.Unlock()
}

func (l *List) setMutation(startTs uint64, data []byte) {
pl := new(pb.PostingList)
x.Check(pl.Unmarshal(data))
Expand All @@ -566,6 +587,9 @@ func (l *List) setMutation(startTs uint64, data []byte) {
l.mutationMap = make(map[uint64]*pb.PostingList)
}
l.mutationMap[startTs] = pl
if pl.CommitTs != 0 {
l.maxTs = x.Max(l.maxTs, pl.CommitTs)
}
l.Unlock()
}

Expand Down Expand Up @@ -783,6 +807,38 @@ func (l *List) IsEmpty(readTs, afterUid uint64) (bool, error) {
return count == 0, nil
}

func (l *List) getPostingAndLengthNoSort(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) {
mangalaman93 marked this conversation as resolved.
Show resolved Hide resolved
l.AssertRLock()

dec := codec.Decoder{Pack: l.plist.Pack}
uids := dec.Seek(uid, codec.SeekStart)
length := codec.ExactLen(l.plist.Pack)
found := len(uids) > 0 && uids[0] == uid

for _, plist := range l.mutationMap {
for _, mpost := range plist.Postings {
if (mpost.CommitTs > 0 && mpost.CommitTs <= readTs) || (mpost.StartTs == readTs) {
if hasDeleteAll(mpost) {
found = false
length = 0
continue
}
if mpost.Uid == uid {
found = (mpost.Op == Set)
}
if mpost.Op == Set {
length += 1
} else {
length -= 1
}

}
}
}

return length, found, nil
}

func (l *List) getPostingAndLength(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) {
l.AssertRLock()
var count int
Expand Down Expand Up @@ -816,38 +872,6 @@ func (l *List) length(readTs, afterUid uint64) int {
return count
}

func (l *List) getPostingAndLengthNoSort(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) {
l.AssertRLock()

dec := codec.Decoder{Pack: l.plist.Pack}
uids := dec.Seek(uid, codec.SeekStart)
length := codec.ExactLen(l.plist.Pack)
found1 := len(uids) > 0 && uids[0] == uid

for _, plist := range l.mutationMap {
for _, mpost := range plist.Postings {
if (mpost.CommitTs > 0 && mpost.CommitTs <= readTs) || (mpost.StartTs == readTs) {
if hasDeleteAll(mpost) {
found1 = false
length = 0
continue
}
if mpost.Uid == uid {
found1 = (mpost.Op == Set)
}
if mpost.Op == Set {
length += 1
} else {
length -= 1
}

}
}
}

return length, found1, nil
}

// Length iterates over the mutation layer and counts number of elements.
func (l *List) Length(readTs, afterUid uint64) int {
l.RLock()
Expand Down Expand Up @@ -1183,6 +1207,8 @@ func (l *List) rollup(readTs uint64, split bool) (*rollupOutput, error) {
}

if len(out.plist.Splits) > 0 || len(l.mutationMap) > 0 {
// In case there were splits, this would read all the splits from
// Badger.
if err := l.encode(out, readTs, split); err != nil {
return nil, errors.Wrapf(err, "while encoding")
}
Expand Down