Skip to content

Commit

Permalink
(3.0.8 backport) CBG-2855 Allow one-shot replications to wait for DCP…
Browse files Browse the repository at this point in the history
… to catch up on changes feed (#6284)

* CBG-2853 Allow one-shot replications to wait for DCP to catch up on changes feed (#6243)

* CBG-2853 Add requestPlus option for changes feeds

Adds requestPlus option for changes feeds.  When set, changes feeds will loop until the cached sequence (via DCP) is greater than the database sequence at the time the changes request was issued.

requestPlus can be enabled for non-continuous changes requests in one of three ways:
- by setting request_plus=true on a REST API changes call
- by setting the requestPlus property to "true" on a subChanges message
- by setting "changes_request_plus":true in the database config (default=false)

The request setting is given priority - if not set on a request, the value will fall back to the database config value.

Required minor refactoring of how options.Wait was used in changes.go, to support use of requestPlus and longpoll together.  No functional changes to longpoll if requestPlus is not set.

* Update docs for request_plus changes parameter.

* lint fixes

* Add fix for race condition

* Fix another race condition

* Fixes from merge

---------

Co-authored-by: Adam Fraser <adam.fraser@couchbase.com>
  • Loading branch information
torcolvin and adamcfraser committed Jun 7, 2023
1 parent c39824f commit e3d9fac
Show file tree
Hide file tree
Showing 13 changed files with 613 additions and 44 deletions.
34 changes: 26 additions & 8 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,23 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error {
}

continuous := subChangesParams.continuous()

// used for stats tracking
bh.continuous = continuous

requestPlusSeq := uint64(0)
// If non-continuous, check whether requestPlus handling is set for request or via database config
if continuous == false {
useRequestPlus := subChangesParams.requestPlus(bh.db.Options.ChangesRequestPlus)
if useRequestPlus {
seq, requestPlusErr := bh.db.GetRequestPlusSequence()
if requestPlusErr != nil {
return base.HTTPErrorf(http.StatusServiceUnavailable, "Unable to retrieve current sequence for requestPlus=true: %v", requestPlusErr)
}
requestPlusSeq = seq
}
}

// Start asynchronous changes goroutine
go func() {
// Pull replication stats by type
Expand Down Expand Up @@ -249,6 +264,7 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error {
revocations: subChangesParams.revocations(),
clientType: clientType,
ignoreNoConflicts: clientType == clientTypeSGR2, // force this side to accept a "changes" message, even in no conflicts mode for SGR2.
requestPlusSeq: requestPlusSeq,
})
base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Type:%s --> Time:%v", bh.serialNumber, rq.Profile(), time.Since(startTime))
}()
Expand All @@ -273,6 +289,7 @@ type sendChangesOptions struct {
clientType clientType
revocations bool
ignoreNoConflicts bool
requestPlusSeq uint64
}

type changesDeletedFlag uint
Expand All @@ -299,14 +316,15 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions
base.InfofCtx(bh.loggingCtx, base.KeySync, "Sending changes since %v", opts.since)

options := ChangesOptions{
Since: opts.since,
Conflicts: false, // CBL 2.0/BLIP don't support branched rev trees (LiteCore #437)
Continuous: opts.continuous,
ActiveOnly: opts.activeOnly,
Revocations: opts.revocations,
Terminator: bh.BlipSyncContext.terminator,
Ctx: bh.loggingCtx,
clientType: opts.clientType,
Since: opts.since,
Conflicts: false, // CBL 2.0/BLIP don't support branched rev trees (LiteCore #437)
Continuous: opts.continuous,
ActiveOnly: opts.activeOnly,
Revocations: opts.revocations,
Terminator: bh.BlipSyncContext.terminator,
Ctx: bh.loggingCtx,
clientType: opts.clientType,
RequestPlusSeq: opts.requestPlusSeq,
}

channelSet := opts.channels
Expand Down
9 changes: 9 additions & 0 deletions db/blip_sync_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const (
SubChangesContinuous = "continuous"
SubChangesBatch = "batch"
SubChangesRevocations = "revocations"
SubChangesRequestPlus = "requestPlus"

// rev message properties
RevMessageId = "id"
Expand Down Expand Up @@ -199,6 +200,14 @@ func (s *SubChangesParams) activeOnly() bool {
return (s.rq.Properties[SubChangesActiveOnly] == "true")
}

func (s *SubChangesParams) requestPlus(defaultValue bool) (value bool) {
propertyValue, isDefined := s.rq.Properties[SubChangesRequestPlus]
if !isDefined {
return defaultValue
}
return propertyValue == "true"
}

func (s *SubChangesParams) filter() string {
return s.rq.Properties[SubChangesFilter]
}
Expand Down
55 changes: 32 additions & 23 deletions db/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,20 @@ import (
// Options for changes-feeds. ChangesOptions must not contain any mutable pointer references, as
// changes processing currently assumes a deep copy when doing chanOpts := changesOptions.
type ChangesOptions struct {
Since SequenceID // sequence # to start _after_
Limit int // Max number of changes to return, if nonzero
Conflicts bool // Show all conflicting revision IDs, not just winning one?
IncludeDocs bool // Include doc body of each change?
Wait bool // Wait for results, instead of immediately returning empty result?
Continuous bool // Run continuously until terminated?
Terminator chan bool // Caller can close this channel to terminate the feed
HeartbeatMs uint64 // How often to send a heartbeat to the client
TimeoutMs uint64 // After this amount of time, close the longpoll connection
ActiveOnly bool // If true, only return information on non-deleted, non-removed revisions
Revocations bool // Specifies whether revocation messages should be sent on the changes feed
clientType clientType // Can be used to determine if the replication is being started from a CBL 2.x or SGR2 client
Ctx context.Context // Used for adding context to logs
Since SequenceID // sequence # to start _after_
Limit int // Max number of changes to return, if nonzero
Conflicts bool // Show all conflicting revision IDs, not just winning one?
IncludeDocs bool // Include doc body of each change?
Wait bool // Wait for results, instead of immediately returning empty result?
Continuous bool // Run continuously until terminated?
Terminator chan bool // Caller can close this channel to terminate the feed
HeartbeatMs uint64 // How often to send a heartbeat to the client
TimeoutMs uint64 // After this amount of time, close the longpoll connection
ActiveOnly bool // If true, only return information on non-deleted, non-removed revisions
Revocations bool // Specifies whether revocation messages should be sent on the changes feed
clientType clientType // Can be used to determine if the replication is being started from a CBL 2.x or SGR2 client
Ctx context.Context // Used for adding context to logs
RequestPlusSeq uint64 // Do not stop changes before cached sequence catches up with requestPlusSeq
}

// A changes entry; Database.GetChanges returns an array of these.
Expand Down Expand Up @@ -626,11 +627,13 @@ func (db *Database) SimpleMultiChangesFeed(chans base.Set, options ChangesOption
var userChanged bool // Whether the user document has changed in a given iteration loop
var deferredBackfill bool // Whether there's a backfill identified in the user doc that's deferred while the SG cache catches up

var useLateSequenceFeeds bool // LateSequence feeds are only used for continuous, or one-shot where options.RequestPlusSeq > currentCachedSequence
// Retrieve the current max cached sequence - ensures there isn't a race between the subsequent channel cache queries
currentCachedSequence = db.changeCache.getChannelCache().GetHighCacheSequence()
if options.Wait {
options.Wait = false
changeWaiter = db.startChangeWaiter(base.Set{}) // Waiter is updated with the actual channel set (post-user reload) at the start of the outer changes loop

// If changes feed requires more than one ChangesLoop iteration, initialize changeWaiter
if options.Wait || options.RequestPlusSeq > currentCachedSequence {
changeWaiter = db.startChangeWaiter(nil) // Waiter is updated with the actual channel set (post-user reload) at the start of the outer changes loop
userCounter = changeWaiter.CurrentUserCount()
// Reload user to pick up user changes that happened between auth and the change waiter
// initialization. Without this, notification for user doc changes in that window (a) won't be
Expand Down Expand Up @@ -665,7 +668,8 @@ func (db *Database) SimpleMultiChangesFeed(chans base.Set, options ChangesOption

// For a continuous feed, initialise the lateSequenceFeeds that track late-arriving sequences
// to the channel caches.
if options.Continuous {
if options.Continuous || options.RequestPlusSeq > currentCachedSequence {
useLateSequenceFeeds = true
lateSequenceFeeds = make(map[string]*lateSequenceFeed)
defer db.closeLateFeeds(lateSequenceFeeds)
}
Expand Down Expand Up @@ -723,7 +727,7 @@ func (db *Database) SimpleMultiChangesFeed(chans base.Set, options ChangesOption
// Handles previously skipped sequences prior to options.Since that
// have arrived in the channel cache since this changes request started. Only needed for
// continuous feeds - one-off changes requests only require the standard channel cache.
if options.Continuous {
if useLateSequenceFeeds {
lateSequenceFeedHandler := lateSequenceFeeds[name]
if lateSequenceFeedHandler != nil {
latefeed, err := db.getLateFeed(lateSequenceFeedHandler, singleChannelCache)
Expand Down Expand Up @@ -940,14 +944,18 @@ func (db *Database) SimpleMultiChangesFeed(chans base.Set, options ChangesOption
}
}

if !options.Continuous && (sentSomething || changeWaiter == nil) {
break
// Check whether non-continuous changes feeds that aren't waiting to reach requestPlus sequence can exit
if !options.Continuous && currentCachedSequence >= options.RequestPlusSeq {
// If non-longpoll, or longpoll has sent something, can exit
if !options.Wait || sentSomething {
break
}
}

// For longpoll requests that didn't send any results, reset low sequence to the original since value,
// as the system low sequence may change before the longpoll request wakes up, and longpoll feeds don't
// use lateSequenceFeeds.
if !options.Continuous {
if !useLateSequenceFeeds {
options.Since.LowSeq = requestLowSeq
}

Expand All @@ -964,6 +972,7 @@ func (db *Database) SimpleMultiChangesFeed(chans base.Set, options ChangesOption

waitForChanges:
for {
db.DbStats.CBLReplicationPull().NumPullReplTotalCaughtUp.Add(1)
// If we're in a deferred Backfill, the user may not get notification when the cache catches up to the backfill (e.g. when the granting doc isn't
// visible to the user), and so ChangeWaiter.Wait() would block until the next user-visible doc arrives. Use a hardcoded wait instead
// Similar handling for when we see sequences later than the stable sequence.
Expand All @@ -975,7 +984,6 @@ func (db *Database) SimpleMultiChangesFeed(chans base.Set, options ChangesOption
break waitForChanges
}

db.DbStats.CBLReplicationPull().NumPullReplTotalCaughtUp.Add(1)
db.DbStats.CBLReplicationPull().NumPullReplCaughtUp.Add(1)
waitResponse := changeWaiter.Wait()
db.DbStats.CBLReplicationPull().NumPullReplCaughtUp.Add(-1)
Expand Down Expand Up @@ -1288,7 +1296,7 @@ func createChangesEntry(docid string, db *Database, options ChangesOptions) *Cha

func (options ChangesOptions) String() string {
return fmt.Sprintf(
`{Since: %s, Limit: %d, Conflicts: %t, IncludeDocs: %t, Wait: %t, Continuous: %t, HeartbeatMs: %d, TimeoutMs: %d, ActiveOnly: %t}`,
`{Since: %s, Limit: %d, Conflicts: %t, IncludeDocs: %t, Wait: %t, Continuous: %t, HeartbeatMs: %d, TimeoutMs: %d, ActiveOnly: %t, RequestPlusSeq: %d}`,
options.Since,
options.Limit,
options.Conflicts,
Expand All @@ -1298,6 +1306,7 @@ func (options ChangesOptions) String() string {
options.HeartbeatMs,
options.TimeoutMs,
options.ActiveOnly,
options.RequestPlusSeq,
)
}

Expand Down
8 changes: 8 additions & 0 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ type DatabaseContextOptions struct {
ClientPartitionWindow time.Duration
BcryptCost int
GroupID string
ChangesRequestPlus bool // Sets the default value for request_plus, for non-continuous changes feeds
}

type SGReplicateOptions struct {
Expand Down Expand Up @@ -1711,3 +1712,10 @@ func (context *DatabaseContext) IsGuestReadOnly() bool {
return context.Options.UnsupportedOptions != nil && context.Options.UnsupportedOptions.GuestReadOnly

}

// GetRequestPlusSequence fetches the current value of the sequence counter for the database.
// Uses getSequence (instead of lastSequence) as it's intended to be up to date with allocations
// across all nodes, while lastSequence is just the latest allocation from this node
func (dbc *DatabaseContext) GetRequestPlusSequence() (uint64, error) {
return dbc.sequences.getSequence()
}
21 changes: 21 additions & 0 deletions db/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,17 @@ func (db *DatabaseContext) WaitForCaughtUp(targetCount int64) error {
return errors.New("WaitForCaughtUp didn't catch up")
}

func (db *DatabaseContext) WaitForTotalCaughtUp(targetCount int64) error {
for i := 0; i < 100; i++ {
caughtUpCount := db.DbStats.CBLReplicationPull().NumPullReplTotalCaughtUp.Value()
if caughtUpCount >= targetCount {
return nil
}
time.Sleep(100 * time.Millisecond)
}
return errors.New("WaitForCaughtUp didn't catch up")
}

type StatWaiter struct {
initCount int64 // Document cached count when NewStatWaiter is called
targetCount int64 // Target count used when Wait is called
Expand Down Expand Up @@ -368,3 +379,13 @@ func SuspendSequenceBatching() func() {
MaxSequenceIncrFrequency = 0 * time.Millisecond
return func() { MaxSequenceIncrFrequency = oldFrequency }
}

// AllocateTestSequence allocates a sequence via the sequenceAllocator. For use by non-db tests
func AllocateTestSequence(database *DatabaseContext) (uint64, error) {
return database.sequences.incrementSequence(1)
}

// ReleaseTestSequence releases a sequence via the sequenceAllocator. For use by non-db tests
func ReleaseTestSequence(database *DatabaseContext, sequence uint64) error {
return database.sequences.releaseSequence(sequence)
}

0 comments on commit e3d9fac

Please sign in to comment.