Skip to content

Commit

Permalink
updated heuristics
Browse files Browse the repository at this point in the history
  • Loading branch information
harshil-goel committed Apr 18, 2024
1 parent b63be39 commit f52b96e
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 9 deletions.
2 changes: 1 addition & 1 deletion posting/index.go
Expand Up @@ -239,7 +239,7 @@ func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List,
plist.Lock()
defer plist.Unlock()
if hasCountIndex {
countBefore, found, _ = plist.getPostingAndLength(txn.StartTs, 0, edge.ValueId)
countBefore, found, _ = plist.getPostingAndLengthNoSort(txn.StartTs, 0, edge.ValueId)
if countBefore == -1 {
return emptyCountParams, ErrTsTooOld
}
Expand Down
92 changes: 92 additions & 0 deletions posting/list.go
Expand Up @@ -556,6 +556,22 @@ func (l *List) getMutation(startTs uint64) []byte {
return nil
}

func (l *List) setMutationAfterCommit(startTs, commitTs uint64, data []byte) {
pl := new(pb.PostingList)
x.Check(pl.Unmarshal(data))
pl.CommitTs = commitTs

l.Lock()
if l.mutationMap == nil {
l.mutationMap = make(map[uint64]*pb.PostingList)
}
l.mutationMap[startTs] = pl
if pl.CommitTs != 0 {
l.maxTs = x.Max(l.maxTs, pl.CommitTs)
}
l.Unlock()
}

func (l *List) setMutation(startTs uint64, data []byte) {
pl := new(pb.PostingList)
x.Check(pl.Unmarshal(data))
Expand All @@ -565,6 +581,9 @@ func (l *List) setMutation(startTs uint64, data []byte) {
l.mutationMap = make(map[uint64]*pb.PostingList)
}
l.mutationMap[startTs] = pl
if pl.CommitTs != 0 {
l.maxTs = x.Max(l.maxTs, pl.CommitTs)
}
l.Unlock()
}

Expand Down Expand Up @@ -649,11 +668,64 @@ func (l *List) pickPostings(readTs uint64) (uint64, []*pb.Posting) {
return deleteBelowTs, posts
}

func (l *List) iterateNoSort(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error) error {
l.AssertRLock()

effective := func(start, commit uint64) uint64 {
if commit > 0 && commit <= readTs {
// Has been committed and below the readTs.
return commit
}
if start == readTs {
// This mutation is by ME. So, I must be able to read it.
return start
}
return 0
}

// First pick up the postings.
var deleteBelowTs uint64
var posts []*pb.Posting
for startTs, plist := range l.mutationMap {
// Pick up the transactions which are either committed, or the one which is ME.
effectiveTs := effective(startTs, plist.CommitTs)
if effectiveTs > deleteBelowTs {
// We're above the deleteBelowTs marker. We wouldn't reach here if effectiveTs is zero.
for _, mpost := range plist.Postings {
if hasDeleteAll(mpost) {
deleteBelowTs = effectiveTs
continue
}
posts = append(posts, mpost)
}
}
}

if deleteBelowTs > 0 {
// There was a delete all marker. So, trim down the list of postings.
result := posts[:0]
for _, post := range posts {
effectiveTs := effective(post.StartTs, post.CommitTs)
if effectiveTs < deleteBelowTs { // Do pick the posts at effectiveTs == deleteBelowTs.
continue
}
result = append(result, post)
}
posts = result
}

return l.iterateInternal(readTs, afterUid, f, deleteBelowTs, posts)
}

func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error) error {
l.AssertRLock()

// mposts is the list of mutable postings
deleteBelowTs, mposts := l.pickPostings(readTs)
return l.iterateInternal(readTs, afterUid, f, deleteBelowTs, mposts)
}

func (l *List) iterateInternal(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error, deleteBelowTs uint64, mposts []*pb.Posting) error {

Check failure on line 728 in posting/list.go

View workflow job for this annotation

GitHub Actions / lint

line is 145 characters (lll)
if readTs < l.minTs {
return errors.Errorf("readTs: %d less than minTs: %d for key: %q", readTs, l.minTs, l.key)
}
Expand Down Expand Up @@ -762,6 +834,26 @@ func (l *List) IsEmpty(readTs, afterUid uint64) (bool, error) {
return count == 0, nil
}

func (l *List) getPostingAndLengthNoSort(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) {
l.AssertRLock()
var count int
var found bool
var post *pb.Posting
err := l.iterateNoSort(readTs, afterUid, func(p *pb.Posting) error {
if p.Uid == uid {
post = p
found = true
}
count++
return nil
})
if err != nil {
return -1, false, nil
}

return count, found, post
}

func (l *List) getPostingAndLength(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) {
l.AssertRLock()
var count int
Expand Down
15 changes: 13 additions & 2 deletions posting/lists.go
Expand Up @@ -107,7 +107,8 @@ func GetNoStore(key []byte, readTs uint64) (rlist *List, err error) {
type LocalCache struct {
sync.RWMutex

startTs uint64
startTs uint64
commitTs uint64

// The keys for these maps is a string representation of the Badger key for the posting list.
// deltas keep track of the updates made by txn. These must be kept around until written to disk
Expand Down Expand Up @@ -175,6 +176,12 @@ func NoCache(startTs uint64) *LocalCache {
return &LocalCache{startTs: startTs}
}

func (lc *LocalCache) UpdateCommitTs(commitTs uint64) {
lc.Lock()
defer lc.Unlock()
lc.commitTs = commitTs
}

func (lc *LocalCache) Find(pred []byte, filter func([]byte) bool) (uint64, error) {
txn := pstore.NewTransactionAt(lc.startTs, false)
defer txn.Discard()
Expand Down Expand Up @@ -318,7 +325,11 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
// apply it before returning the list.
lc.RLock()
if delta, ok := lc.deltas[skey]; ok && len(delta) > 0 {
pl.setMutation(lc.startTs, delta)
if lc.commitTs == 0 {
pl.setMutation(lc.startTs, delta)
} else {
pl.setMutationAfterCommit(lc.startTs, lc.commitTs, delta)
}
}
lc.RUnlock()
return lc.SetIfAbsent(skey, pl), nil
Expand Down
23 changes: 17 additions & 6 deletions posting/mvcc.go
Expand Up @@ -19,8 +19,6 @@ package posting
import (
"bytes"
"encoding/hex"
"fmt"
"math"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -109,7 +107,7 @@ func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error {
}
}

l, err := GetNoStore(key, math.MaxUint64)
l, err := GetNoStore(key, ts)
if err != nil {
return err
}
Expand All @@ -119,9 +117,18 @@ func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error {
return err
}

if len(kvs) > 0 {
pl := new(pb.PostingList)
x.Check(pl.Unmarshal(kvs[0].Value))
l.Lock()
l.plist = pl
l.mutationMap = nil
l.maxTs = kvs[0].Version
l.Unlock()
}

l.RLock()
fmt.Println("Rolling up")
lCache.Set(key, copyList(l), int64(l.DeepSize()))
lCache.Set(key, l, int64(l.DeepSize()))
l.RUnlock()

// If we do a rollup, we typically won't need to update the key in cache.
Expand Down Expand Up @@ -487,7 +494,10 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
l := val.(*List)

Check failure on line 494 in posting/mvcc.go

View workflow job for this annotation

GitHub Actions / lint

S1034(related information): could eliminate this type assertion (gosimple)
// l.maxTs can be greater than readTs. We might have the latest
// version cached, while readTs is looking for an older version.
if l != nil && l.maxTs >= readTs {
if l != nil {

Check failure on line 497 in posting/mvcc.go

View workflow job for this annotation

GitHub Actions / lint

SA9003: empty branch (staticcheck)
//fmt.Println(l.maxTs, readTs)
}
if l != nil && l.maxTs > 0 {
l.RLock()
lCopy := copyList(l)
l.RUnlock()
Expand Down Expand Up @@ -518,6 +528,7 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
if readTs >= l.maxTs {
l.RLock()
defer l.RUnlock()
//fmt.Println("Setting", l.maxTs)
lCache.Set(key, copyList(l), int64(l.DeepSize()))
}
return l, nil
Expand Down

0 comments on commit f52b96e

Please sign in to comment.