Skip to content

Commit

Permalink
[3.0.7] CBG-2903: Cherry pick: CBG-2282: timeout/retry for gocbv2 boo…
Browse files Browse the repository at this point in the history
…tstrap (#6211)

* CBG-2903 - Cherry pick: CBG-2282: Added timeout and change retry handling for GoCB v2 ops (#5807)

* CBG-2282: Added timeout and change retry handling for GoCB v2 ops

* Added fast fail when getting collection from cluster

* Backport GetCollectionFromCluster waitUntilReady/fastFail changes from #5832

---------

Co-authored-by: Isaac Lambat <isaac.lambat@couchbase.com>
  • Loading branch information
bbrks and IsaacLambat committed Apr 25, 2023
1 parent ee58f36 commit 9ce91b6
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 12 deletions.
3 changes: 2 additions & 1 deletion base/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func NewCouchbaseCluster(server, username, password,
clusterOptions := gocb.ClusterOptions{
Authenticator: authenticatorConfig,
SecurityConfig: securityConfig,
RetryStrategy: &goCBv2FailFastRetryStrategy{},
TimeoutsConfig: GoCBv2TimeoutsConfig(nil, nil),
RetryStrategy: gocb.NewBestEffortRetryStrategy(nil),
}

var configPersistence ConfigPersistence
Expand Down
17 changes: 13 additions & 4 deletions base/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func GetCouchbaseCollection(spec BucketSpec) (*Collection, error) {
Authenticator: authenticator,
SecurityConfig: securityConfig,
TimeoutsConfig: timeoutsConfig,
RetryStrategy: &goCBv2FailFastRetryStrategy{},
RetryStrategy: gocb.NewBestEffortRetryStrategy(nil),
}

if spec.KvPoolSize > 0 {
Expand All @@ -87,15 +87,24 @@ func GetCouchbaseCollection(spec BucketSpec) (*Collection, error) {
return nil, err
}

return GetCollectionFromCluster(cluster, spec, 30)
return GetCollectionFromCluster(cluster, spec, time.Second*30, true)

}

func GetCollectionFromCluster(cluster *gocb.Cluster, spec BucketSpec, waitUntilReadySeconds int) (*Collection, error) {
func GetCollectionFromCluster(cluster *gocb.Cluster, spec BucketSpec, waitUntilReady time.Duration, failFast bool) (*Collection, error) {

// Connect to bucket
bucket := cluster.Bucket(spec.BucketName)
err := bucket.WaitUntilReady(time.Duration(waitUntilReadySeconds)*time.Second, nil)

var retryStrategy gocb.RetryStrategy
if failFast {
retryStrategy = &goCBv2FailFastRetryStrategy{}
} else {
retryStrategy = gocb.NewBestEffortRetryStrategy(nil)
}
err := bucket.WaitUntilReady(waitUntilReady, &gocb.WaitUntilReadyOptions{
RetryStrategy: retryStrategy,
})
if err != nil {
_ = cluster.Close(&gocb.ClusterCloseOptions{})
if errors.Is(err, gocb.ErrAuthenticationFailure) {
Expand Down
4 changes: 2 additions & 2 deletions base/main_test_bucket_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func (tbp *TestBucketPool) createTestBuckets(numBuckets int, bucketQuotaMB int,
FatalfCtx(ctx, "Couldn't create test bucket: %v", err)
}

b, err := tbp.cluster.openTestBucket(tbpBucketName(bucketName), 10*numBuckets)
b, err := tbp.cluster.openTestBucket(tbpBucketName(bucketName), waitForReadyBucketTimeout)
if err != nil {
FatalfCtx(ctx, "Timed out trying to open new bucket: %v", err)
}
Expand Down Expand Up @@ -544,7 +544,7 @@ loop:
defer tbp.bucketReadierWaitGroup.Done()

start := time.Now()
b, err := tbp.cluster.openTestBucket(testBucketName, 5)
b, err := tbp.cluster.openTestBucket(testBucketName, waitForReadyBucketTimeout)
if err != nil {
tbp.Logf(ctx, "Couldn't open bucket to get ready, got error: %v", err)
return
Expand Down
10 changes: 5 additions & 5 deletions base/main_test_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type tbpCluster interface {
getBucketNames() ([]string, error)
insertBucket(name string, quotaMB int) error
removeBucket(name string) error
openTestBucket(name tbpBucketName, waitUntilReadySeconds int) (Bucket, error)
openTestBucket(name tbpBucketName, waitUntilReady time.Duration) (Bucket, error)
close() error
}

Expand Down Expand Up @@ -110,9 +110,9 @@ func (c *tbpClusterV1) removeBucket(name string) error {
}

// openTestBucket opens the bucket of the given name for the gocb cluster in the given TestBucketPool.
func (c *tbpClusterV1) openTestBucket(testBucketName tbpBucketName, waitUntilReadySeconds int) (Bucket, error) {
func (c *tbpClusterV1) openTestBucket(testBucketName tbpBucketName, waitUntilReady time.Duration) (Bucket, error) {

sleeper := CreateSleeperFunc(waitUntilReadySeconds, 1000)
sleeper := CreateSleeperFunc(int(waitUntilReady.Seconds()), 1000)
ctx := bucketNameCtx(context.Background(), string(testBucketName))

bucketSpec := getBucketSpec(testBucketName)
Expand Down Expand Up @@ -253,12 +253,12 @@ func (c *tbpClusterV2) removeBucket(name string) error {
}

// openTestBucket opens the bucket of the given name for the gocb cluster in the given TestBucketPool.
func (c *tbpClusterV2) openTestBucket(testBucketName tbpBucketName, waitUntilReadySeconds int) (Bucket, error) {
func (c *tbpClusterV2) openTestBucket(testBucketName tbpBucketName, waitUntilReady time.Duration) (Bucket, error) {

cluster := getCluster(c.server)
bucketSpec := getBucketSpec(testBucketName)

return GetCollectionFromCluster(cluster, bucketSpec, waitUntilReadySeconds)
return GetCollectionFromCluster(cluster, bucketSpec, waitUntilReady, false)
}

func (c *tbpClusterV2) close() error {
Expand Down

0 comments on commit 9ce91b6

Please sign in to comment.