Skip to content

Commit

Permalink
Merge branch 'master' into metrics-v3-cluster-webhook
Browse files Browse the repository at this point in the history
  • Loading branch information
anjalshireesh committed Apr 23, 2024
2 parents 5e298f5 + 5ea5ab1 commit f2f1243
Show file tree
Hide file tree
Showing 16 changed files with 427 additions and 232 deletions.
12 changes: 11 additions & 1 deletion cmd/api-errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ const (
ErrInvalidResourceName
ErrInvalidLifecycleQueryParameter
ErrServerNotInitialized
ErrBucketMetadataNotInitialized
ErrRequestTimedout
ErrClientDisconnected
ErrTooManyRequests
Expand Down Expand Up @@ -1295,7 +1296,12 @@ var errorCodes = errorCodeMap{
},
ErrServerNotInitialized: {
Code: "XMinioServerNotInitialized",
Description: "Server not initialized, please try again.",
Description: "Server not initialized yet, please try again.",
HTTPStatusCode: http.StatusServiceUnavailable,
},
ErrBucketMetadataNotInitialized: {
Code: "XMinioBucketMetadataNotInitialized",
Description: "Bucket metadata not initialized yet, please try again.",
HTTPStatusCode: http.StatusServiceUnavailable,
},
ErrMalformedJSON: {
Expand Down Expand Up @@ -2211,6 +2217,10 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
apiErr = ErrInvalidMaxParts
case ioutil.ErrOverread:
apiErr = ErrExcessData
case errServerNotInitialized:
apiErr = ErrServerNotInitialized
case errBucketMetadataNotInitialized:
apiErr = ErrBucketMetadataNotInitialized
}

// Compression errors
Expand Down
313 changes: 157 additions & 156 deletions cmd/apierrorcode_string.go

Large diffs are not rendered by default.

24 changes: 22 additions & 2 deletions cmd/bucket-metadata-sys.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type BucketMetadataSys struct {
objAPI ObjectLayer

sync.RWMutex
initialized bool
metadataMap map[string]BucketMetadata
}

Expand Down Expand Up @@ -433,6 +434,8 @@ func (sys *BucketMetadataSys) GetConfigFromDisk(ctx context.Context, bucket stri
return loadBucketMetadata(ctx, objAPI, bucket)
}

var errBucketMetadataNotInitialized = errors.New("bucket metadata not initialized yet")

// GetConfig returns a specific configuration from the bucket metadata.
// The returned object may not be modified.
// reloaded will be true if metadata refreshed from disk
Expand All @@ -454,6 +457,10 @@ func (sys *BucketMetadataSys) GetConfig(ctx context.Context, bucket string) (met
}
meta, err = loadBucketMetadata(ctx, objAPI, bucket)
if err != nil {
if !sys.Initialized() {
// bucket metadata not yet initialized
return newBucketMetadata(bucket), reloaded, errBucketMetadataNotInitialized
}
return meta, reloaded, err
}
sys.Lock()
Expand Down Expand Up @@ -498,9 +505,10 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck
}

errs := g.Wait()
for _, err := range errs {
for index, err := range errs {
if err != nil {
internalLogIf(ctx, err, logger.WarningKind)
internalLogOnceIf(ctx, fmt.Errorf("Unable to load bucket metadata, will be retried: %w", err),
"load-bucket-metadata-"+buckets[index].Name, logger.WarningKind)
}
}

Expand Down Expand Up @@ -583,6 +591,14 @@ func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context, fa
}
}

// Initialized indicates if bucket metadata sys is initialized atleast once.
func (sys *BucketMetadataSys) Initialized() bool {
sys.RLock()
defer sys.RUnlock()

return sys.initialized
}

// Loads bucket metadata for all buckets into BucketMetadataSys.
func (sys *BucketMetadataSys) init(ctx context.Context, buckets []BucketInfo) {
count := 100 // load 100 bucket metadata at a time.
Expand All @@ -596,6 +612,10 @@ func (sys *BucketMetadataSys) init(ctx context.Context, buckets []BucketInfo) {
buckets = buckets[count:]
}

sys.Lock()
sys.initialized = true
sys.Unlock()

if globalIsDistErasure {
go sys.refreshBucketsMetadataLoop(ctx, failedBuckets)
}
Expand Down
1 change: 0 additions & 1 deletion cmd/bucket-object-lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func (sys *BucketObjectLockSys) Get(bucketName string) (r objectlock.Retention,
if errors.Is(err, errInvalidArgument) {
return r, err
}
logger.CriticalIf(context.Background(), err)
return r, err
}
return config.ToRetention(), nil
Expand Down
27 changes: 19 additions & 8 deletions cmd/bucket-replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,10 @@ const (
// gets replication config associated to a given bucket name.
func getReplicationConfig(ctx context.Context, bucketName string) (rc *replication.Config, err error) {
rCfg, _, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucketName)
if err != nil {
if errors.Is(err, BucketReplicationConfigNotFound{Bucket: bucketName}) || errors.Is(err, errInvalidArgument) {
return rCfg, err
}
logger.CriticalIf(ctx, err)
if err != nil && !errors.Is(err, BucketReplicationConfigNotFound{Bucket: bucketName}) {
return rCfg, err
}
return rCfg, err
return rCfg, nil
}

// validateReplicationDestination returns error if replication destination bucket missing or not configured
Expand Down Expand Up @@ -261,10 +258,16 @@ func mustReplicate(ctx context.Context, bucket, object string, mopts mustReplica
if mopts.replicationRequest { // incoming replication request on target cluster
return
}

cfg, err := getReplicationConfig(ctx, bucket)
if err != nil {
replLogOnceIf(ctx, err, bucket)
return
}
if cfg == nil {
return
}

opts := replication.ObjectOpts{
Name: object,
SSEC: crypto.SSEC.IsEncrypted(mopts.meta),
Expand Down Expand Up @@ -312,6 +315,7 @@ var standardHeaders = []string{
func hasReplicationRules(ctx context.Context, bucket string, objects []ObjectToDelete) bool {
c, err := getReplicationConfig(ctx, bucket)
if err != nil || c == nil {
replLogOnceIf(ctx, err, bucket)
return false
}
for _, obj := range objects {
Expand All @@ -331,6 +335,7 @@ func isStandardHeader(matchHeaderKey string) bool {
func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, delOpts ObjectOptions, gerr error) (dsc ReplicateDecision) {
rcfg, err := getReplicationConfig(ctx, bucket)
if err != nil || rcfg == nil {
replLogOnceIf(ctx, err, bucket)
return
}
// If incoming request is a replication request, it does not need to be re-replicated.
Expand Down Expand Up @@ -2231,6 +2236,8 @@ func getProxyTargets(ctx context.Context, bucket, object string, opts ObjectOpti
}
cfg, err := getReplicationConfig(ctx, bucket)
if err != nil || cfg == nil {
replLogOnceIf(ctx, err, bucket)

return &madmin.BucketTargets{}
}
topts := replication.ObjectOpts{Name: object}
Expand Down Expand Up @@ -3124,7 +3131,7 @@ func saveResyncStatus(ctx context.Context, bucket string, brs BucketReplicationR
func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string, opts madmin.ReplDiffOpts) (chan madmin.DiffInfo, error) {
cfg, err := getReplicationConfig(ctx, bucket)
if err != nil {
replLogIf(ctx, err)
replLogOnceIf(ctx, err, bucket)
return nil, err
}
tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
Expand Down Expand Up @@ -3217,7 +3224,11 @@ func QueueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, ret
if oi.ModTime.IsZero() {
return
}
rcfg, _ := getReplicationConfig(ctx, bucket)
rcfg, err := getReplicationConfig(ctx, bucket)
if err != nil {
replLogOnceIf(ctx, err, bucket)
return
}
tgts, _ := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
queueReplicationHeal(ctx, bucket, oi, replicationConfig{
Config: rcfg,
Expand Down
2 changes: 1 addition & 1 deletion cmd/bucket-versioning-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (api objectAPIHandlers) PutBucketVersioningHandler(w http.ResponseWriter, r
}, r.URL)
return
}
if _, err := getReplicationConfig(ctx, bucket); err == nil && v.Suspended() {
if rc, _ := getReplicationConfig(ctx, bucket); rc != nil && v.Suspended() {
writeErrorResponse(ctx, w, APIError{
Code: "InvalidBucketState",
Description: "A replication configuration is present on this bucket, bucket wide versioning cannot be suspended.",
Expand Down
34 changes: 29 additions & 5 deletions cmd/data-scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ import (
"github.com/minio/madmin-go/v3"
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/bucket/object/lock"
objectlock "github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/bucket/replication"
"github.com/minio/minio/internal/bucket/versioning"
"github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/config/heal"
"github.com/minio/minio/internal/event"
Expand Down Expand Up @@ -952,10 +954,32 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi Obje
}

versionID := oi.VersionID
vcfg, _ := globalBucketVersioningSys.Get(i.bucket)
rCfg, _ := globalBucketObjectLockSys.Get(i.bucket)
replcfg, _ := getReplicationConfig(ctx, i.bucket)
lcEvt := evalActionFromLifecycle(ctx, *i.lifeCycle, rCfg, replcfg, oi)

var vc *versioning.Versioning
var lr objectlock.Retention
var rcfg *replication.Config
if i.bucket != minioMetaBucket {
vc, err = globalBucketVersioningSys.Get(i.bucket)
if err != nil {
scannerLogOnceIf(ctx, err, i.bucket)
return
}

// Check if bucket is object locked.
lr, err = globalBucketObjectLockSys.Get(i.bucket)
if err != nil {
scannerLogOnceIf(ctx, err, i.bucket)
return
}

rcfg, err = getReplicationConfig(ctx, i.bucket)
if err != nil {
scannerLogOnceIf(ctx, err, i.bucket)
return
}
}

lcEvt := evalActionFromLifecycle(ctx, *i.lifeCycle, lr, rcfg, oi)
if i.debug {
if versionID != "" {
console.Debugf(applyActionsLogPrefix+" lifecycle: %q (version-id=%s), Initial scan: %v\n", i.objectPath(), versionID, lcEvt.Action)
Expand All @@ -973,7 +997,7 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi Obje
size = 0
case lifecycle.DeleteAction:
// On a non-versioned bucket, DeleteObject removes the only version permanently.
if !vcfg.PrefixEnabled(oi.Name) {
if !vc.PrefixEnabled(oi.Name) {
size = 0
}
}
Expand Down
33 changes: 15 additions & 18 deletions cmd/data-usage-cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package cmd

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -36,7 +35,6 @@ import (
"github.com/klauspost/compress/zstd"
"github.com/minio/madmin-go/v3"
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/hash"
"github.com/tinylib/msgp/msgp"
"github.com/valyala/bytebufferpool"
)
Expand Down Expand Up @@ -1005,11 +1003,23 @@ func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, ObjectOptions{NoLock: true})
r, err := store.GetObjectNInfo(ctx, minioMetaBucket, pathJoin(bucketMetaPrefix, name), nil, http.Header{}, ObjectOptions{NoLock: true})
if err != nil {
switch err.(type) {
case ObjectNotFound, BucketNotFound:
return false, nil
r, err = store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, ObjectOptions{NoLock: true})
if err != nil {
switch err.(type) {
case ObjectNotFound, BucketNotFound:
return false, nil
case InsufficientReadQuorum, StorageErr:
return true, nil
}
return false, err
}
err = d.deserialize(r)
r.Close()
return err != nil, nil
case InsufficientReadQuorum, StorageErr:
return true, nil
}
Expand Down Expand Up @@ -1070,24 +1080,11 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string)
}

save := func(name string, timeout time.Duration) error {
hr, err := hash.NewReader(ctx, bytes.NewReader(buf.Bytes()), int64(buf.Len()), "", "", int64(buf.Len()))
if err != nil {
return err
}

// Abandon if more than a minute, so we don't hold up scanner.
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

_, err = store.PutObject(ctx,
dataUsageBucket,
name,
NewPutObjReader(hr),
ObjectOptions{NoLock: true})
if isErrBucketNotFound(err) {
return nil
}
return err
return saveConfig(ctx, store, pathJoin(bucketMetaPrefix, name), buf.Bytes())
}
defer save(name+".bkp", 5*time.Second) // Keep a backup as well

Expand Down
15 changes: 12 additions & 3 deletions cmd/erasure-object.go
Original file line number Diff line number Diff line change
Expand Up @@ -1833,9 +1833,18 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
var replcfg *replication.Config
if opts.Expiration.Expire {
// Check if the current bucket has a configured lifecycle policy
lc, _ = globalLifecycleSys.Get(bucket)
rcfg, _ = globalBucketObjectLockSys.Get(bucket)
replcfg, _ = getReplicationConfig(ctx, bucket)
lc, err = globalLifecycleSys.Get(bucket)
if err != nil && !errors.Is(err, BucketLifecycleNotFound{Bucket: bucket}) {
return objInfo, err
}
rcfg, err = globalBucketObjectLockSys.Get(bucket)
if err != nil {
return objInfo, err
}
replcfg, err = getReplicationConfig(ctx, bucket)
if err != nil {
return objInfo, err
}
}

// expiration attempted on a bucket with no lifecycle
Expand Down

0 comments on commit f2f1243

Please sign in to comment.