Skip to content

Commit

Permalink
CBG-3241: 3.0.8.1 Backport: CBG-3235: Query resync loop (#6355)
Browse files Browse the repository at this point in the history
Co-authored-by: Mohammed Madi <66013188+mohammed-madi@users.noreply.github.com>
  • Loading branch information
bbrks and mohammed-madi committed Aug 2, 2023
1 parent 1d2b3a9 commit 3b4d8e6
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 8 deletions.
2 changes: 1 addition & 1 deletion db/background_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (b *BackgroundManager) GetRunState(t *testing.T) BackgroundProcessState {

// For test use only
// Returns empty string if background process is not cluster aware
func (b *BackgroundManager) GetHeartbeatDocID(t *testing.T) string {
func (b *BackgroundManager) GetHeartbeatDocID(t testing.TB) string {
if b.isClusterAware() {
return b.clusterAwareOptions.HeartbeatDocID()
}
Expand Down
24 changes: 20 additions & 4 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1295,18 +1295,34 @@ func (db *Database) UpdateAllDocChannels(regenerateSequences bool, callback upda
defer callback(&docsProcessed, &docsChanged)

var unusedSequences []uint64

highSeq := uint64(0)
for {
results, err := db.QueryResync(queryLimit, startSeq, endSeq)
if err != nil {
return 0, err
}

queryRowCount := 0
highSeq := uint64(0)

var importRow QueryIdRow
for results.Next(&importRow) {
for {
var found bool
if db.UseViews() {
var viewRow channelsViewRow
found = results.Next(&viewRow)
if !found {
break
}
importRow = QueryIdRow{
Seq: uint64(viewRow.Key[1].(float64)),
Id: viewRow.ID,
}
} else {
found = results.Next(&importRow)
if !found {
break
}
}
select {
case <-terminator.Done():
base.Infof(base.KeyAll, "Resync was stopped before the operation could be completed. System "+
Expand All @@ -1324,7 +1340,6 @@ func (db *Database) UpdateAllDocChannels(regenerateSequences bool, callback upda
queryRowCount++
docsProcessed++
documentUpdateFunc := func(doc *Document) (updatedDoc *Document, shouldUpdate bool, updatedExpiry *uint32, err error) {
highSeq = doc.Sequence
forceUpdate := false
if !doc.HasValidSyncData() {
// This is a document not known to the sync gateway. Ignore it:
Expand Down Expand Up @@ -1448,6 +1463,7 @@ func (db *Database) UpdateAllDocChannels(regenerateSequences bool, callback upda
} else if err != base.ErrUpdateCancel {
base.Warnf("Error updating doc %q: %v", base.UD(docid), err)
}
highSeq = importRow.Seq
}

callback(&docsProcessed, &docsChanged)
Expand Down
3 changes: 2 additions & 1 deletion db/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import (

// Used for queries that only return doc id
type QueryIdRow struct {
Id string
Id string
Seq uint64
}

const (
Expand Down
20 changes: 20 additions & 0 deletions rest/attachment_compaction_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,26 @@ func (rt *RestTester) WaitForAttachmentCompactionStatus(t *testing.T, state db.B
return response
}

func (rt *RestTester) WaitForResyncStatus(status db.BackgroundProcessState) db.ResyncManagerResponse {
var resyncStatus db.ResyncManagerResponse
successFunc := func() bool {
response := rt.SendAdminRequest("GET", fmt.Sprintf("/%s/_resync", rt.GetDatabase().Name), "")
err := base.JSONUnmarshal(response.BodyBytes(), &resyncStatus)
require.NoError(rt.tb, err)

var val interface{}
_, err = rt.Bucket().Get(rt.GetDatabase().ResyncManager.GetHeartbeatDocID(rt.tb), &val)

if status == db.BackgroundProcessStateCompleted {
return resyncStatus.State == status && base.IsDocNotFoundError(err)
} else {
return resyncStatus.State == status
}
}
require.NoError(rt.tb, rt.WaitForCondition(successFunc), "Expected status: %s, actual status: %s", status, resyncStatus.State)
return resyncStatus
}

func CreateLegacyAttachmentDoc(t *testing.T, testDB *db.Database, docID string, body []byte, attID string, attBody []byte) string {
if !base.TestUseXattrs() {
t.Skip("Requires xattrs")
Expand Down
51 changes: 51 additions & 0 deletions rest/changes_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3846,6 +3846,57 @@ func TestCacheCompactDuringChangesWait(t *testing.T) {
longpollWg.Wait()
}

func TestResyncAllTombstones(t *testing.T) {
if base.UnitTestUrlIsWalrus() {
t.Skip("Walrus does not support Xattrs")
}

if !base.TestUseXattrs() {
t.Skip("If running with no xattrs compact acts as a no-op")
}

const queryPaginationLimit = 5
tests := []int{
0,
queryPaginationLimit - 1,
queryPaginationLimit,
queryPaginationLimit + 1,
(queryPaginationLimit * 2) - 1,
(queryPaginationLimit * 2),
(queryPaginationLimit * 2) + 1,
}

for _, numTombstones := range tests {
t.Run(fmt.Sprintf("limit:%d-numTombstones:%d", queryPaginationLimit, numTombstones), func(t *testing.T) {
rt := NewRestTester(t, &RestTesterConfig{
DatabaseConfig: &DatabaseConfig{DbConfig: DbConfig{
QueryPaginationLimit: base.IntPtr(queryPaginationLimit),
}},
})
rt.GetDatabase().PurgeInterval = 0
defer rt.Close()

for i := 0; i < numTombstones; i++ {
docID := fmt.Sprintf("doc%d", i)
resp := rt.putDoc(docID, `{"foo":"bar"}`)
require.True(t, resp.Ok)
rt.deleteDoc(docID, resp.Rev)
}

resp := rt.SendAdminRequest(http.MethodPost, fmt.Sprintf("/%s/_offline", rt.GetDatabase().Name), "")
RequireStatus(t, resp, http.StatusOK)
require.NoError(t, rt.WaitForDBState(db.RunStateString[db.DBOffline]))

resp = rt.SendAdminRequest(http.MethodPost, fmt.Sprintf("/%s/_resync?action=start", rt.GetDatabase().Name), "")
RequireStatus(t, resp, http.StatusOK)

status := rt.WaitForResyncStatus(db.BackgroundProcessStateCompleted)
assert.Equal(t, numTombstones, status.DocsProcessed)
assert.Equal(t, 0, status.DocsChanged)
})
}
}

func TestTombstoneCompaction(t *testing.T) {
defer base.SetUpTestLogging(base.LevelDebug, base.KeyAll)()

Expand Down
4 changes: 2 additions & 2 deletions rest/utilities_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,10 +666,10 @@ func (rt *RestTester) GetDBState() string {
}

func (rt *RestTester) WaitForDBOnline() (err error) {
return rt.waitForDBState("Online")
return rt.WaitForDBState("Online")
}

func (rt *RestTester) waitForDBState(stateWant string) (err error) {
func (rt *RestTester) WaitForDBState(stateWant string) (err error) {
var stateCurr string
maxTries := 20

Expand Down

0 comments on commit 3b4d8e6

Please sign in to comment.