Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a new purge-uploads sub command #4075

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
74 changes: 11 additions & 63 deletions registry/handlers/app.go
Expand Up @@ -30,6 +30,7 @@ import (
registrymiddleware "github.com/distribution/distribution/v3/registry/middleware/registry"
repositorymiddleware "github.com/distribution/distribution/v3/registry/middleware/repository"
"github.com/distribution/distribution/v3/registry/proxy"
"github.com/distribution/distribution/v3/registry/purge"
"github.com/distribution/distribution/v3/registry/storage"
memorycache "github.com/distribution/distribution/v3/registry/storage/cache/memory"
rediscache "github.com/distribution/distribution/v3/registry/storage/cache/redis"
Expand Down Expand Up @@ -124,7 +125,7 @@ func NewApp(ctx context.Context, config *configuration.Configuration) *App {
panic(err)
}

purgeConfig := uploadPurgeDefaultConfig()
purgeConfig := purge.UploadPurgeDefaultConfig()
if mc, ok := config.Storage["maintenance"]; ok {
if v, ok := mc["uploadpurging"]; ok {
purgeConfig, ok = v.(map[interface{}]interface{})
Expand Down Expand Up @@ -949,70 +950,17 @@ func applyStorageMiddleware(driver storagedriver.StorageDriver, middlewares []co
return driver, nil
}

// uploadPurgeDefaultConfig provides a default configuration for upload
// purging to be used in the absence of configuration in the
// configuration file
func uploadPurgeDefaultConfig() map[interface{}]interface{} {
config := map[interface{}]interface{}{}
config["enabled"] = true
config["age"] = "168h"
config["interval"] = "24h"
config["dryrun"] = false
return config
}

func badPurgeUploadConfig(reason string) {
panic(fmt.Sprintf("Unable to parse upload purge configuration: %s", reason))
}

// startUploadPurger schedules a goroutine which will periodically
// check upload directories for old files and delete them
func startUploadPurger(ctx context.Context, storageDriver storagedriver.StorageDriver, log dcontext.Logger, config map[interface{}]interface{}) {
if config["enabled"] == false {
return
}

var purgeAgeDuration time.Duration
var err error
purgeAge, ok := config["age"]
if ok {
ageStr, ok := purgeAge.(string)
if !ok {
badPurgeUploadConfig("age is not a string")
}
purgeAgeDuration, err = time.ParseDuration(ageStr)
if err != nil {
badPurgeUploadConfig(fmt.Sprintf("Cannot parse duration: %s", err.Error()))
}
} else {
badPurgeUploadConfig("age missing")
}

var intervalDuration time.Duration
interval, ok := config["interval"]
if ok {
intervalStr, ok := interval.(string)
if !ok {
badPurgeUploadConfig("interval is not a string")
}

intervalDuration, err = time.ParseDuration(intervalStr)
if err != nil {
badPurgeUploadConfig(fmt.Sprintf("Cannot parse interval: %s", err.Error()))
}
} else {
badPurgeUploadConfig("interval missing")
purgeOption, err := purge.ParseConfig(config)
if err != nil {
// log an error and skip purging
log.Errorf("Unable to parse upload purge configuration: %s", err.Error())
}

var dryRunBool bool
dryRun, ok := config["dryrun"]
if ok {
dryRunBool, ok = dryRun.(bool)
if !ok {
badPurgeUploadConfig("cannot parse dryrun")
}
} else {
badPurgeUploadConfig("dryrun missing")
if !purgeOption.Enabled {
return
}

go func() {
Expand All @@ -1027,9 +975,9 @@ func startUploadPurger(ctx context.Context, storageDriver storagedriver.StorageD
time.Sleep(jitter)

for {
storage.PurgeUploads(ctx, storageDriver, time.Now().Add(-purgeAgeDuration), !dryRunBool)
log.Infof("Starting upload purge in %s", intervalDuration)
time.Sleep(intervalDuration)
storage.PurgeUploads(ctx, storageDriver, time.Now().Add(-purgeOption.Age), !purgeOption.DryRun)
log.Infof("Starting upload purge in %s", purgeOption.Interval)
time.Sleep(purgeOption.Interval)
}
}()
}
93 changes: 93 additions & 0 deletions registry/purge/purgeuploads.go
@@ -0,0 +1,93 @@
package purge

import (
"fmt"
"time"
)

// PurgeOption contains options for purging uploads
type PurgeOption struct {
Enabled bool
Age time.Duration
Interval time.Duration
DryRun bool
}

func (po *PurgeOption) String() string {
return fmt.Sprintf(`
Purge Option:
Enabled: %t
DryRun: %t
Age: %s
Interval: %s
`, po.Enabled, po.DryRun, po.Age, po.Interval)
}

// UploadPurgeDefaultConfig provides a default configuration for upload
// purging to be used in the absence of configuration in the
// configuration file
func UploadPurgeDefaultConfig() map[interface{}]interface{} {
config := map[interface{}]interface{}{}
config["enabled"] = true
config["age"] = "168h"
config["interval"] = "24h"
config["dryrun"] = false
return config
}

func badPurgeUploadConfig(reason string) (*PurgeOption, error) {
return nil, fmt.Errorf("Unable to parse upload purge configuration: %s", reason)
}

// ParseConfig will parse purge uploads configs and set default values if not set
func ParseConfig(config map[interface{}]interface{}) (*PurgeOption, error) {
po := &PurgeOption{}

if config["enabled"] == true {
po.Enabled = true
}

var err error
purgeAgeDuration := 168 * time.Hour
purgeAge, ok := config["age"]
if ok {
ageStr, ok := purgeAge.(string)
if !ok {
return badPurgeUploadConfig("age is not a string")
}
purgeAgeDuration, err = time.ParseDuration(ageStr)
if err != nil {
return badPurgeUploadConfig(fmt.Sprintf("cannot parse age: %s", err.Error()))
}
}
po.Age = purgeAgeDuration

intervalDuration := 24 * time.Hour
interval, ok := config["interval"]
if ok {
intervalStr, ok := interval.(string)
if !ok {
return badPurgeUploadConfig("interval is not a string")
}

intervalDuration, err = time.ParseDuration(intervalStr)
if err != nil {
return badPurgeUploadConfig(fmt.Sprintf("cannot parse interval: %s", err.Error()))
}
}
po.Interval = intervalDuration

var dryRunBool bool
dryRun, ok := config["dryrun"]
if ok {
dryRunBool, ok = dryRun.(bool)
if !ok {
return badPurgeUploadConfig("cannot parse dryrun")
}
po.DryRun = dryRunBool
} else {
po.DryRun = false
}

return po, nil
}
102 changes: 102 additions & 0 deletions registry/purge/purgeuploads_test.go
@@ -0,0 +1,102 @@
package purge

import (
"reflect"
"strings"
"testing"
"time"

"github.com/distribution/distribution/v3/configuration"
)

func TestParseConfig(t *testing.T) {
tests := []struct {
config string
want *PurgeOption
err string
}{
{
config: `
version: 0.1
storage:
s3:
maintenance:
uploadpurging:
enabled: true
age: 120h
interval: 48h
dryrun: false`,
want: &PurgeOption{
Enabled: true,
DryRun: false,
Age: 120 * time.Hour,
Interval: 48 * time.Hour,
},
err: "",
},
{
config: `
version: 0.1
storage:
s3:
maintenance:
uploadpurging:
dryrun: false`,
want: &PurgeOption{
Enabled: false,
DryRun: false,
Age: 168 * time.Hour,
Interval: 24 * time.Hour,
},
err: "",
},
{
config: `
version: 0.1
storage:
s3:
maintenance:
uploadpurging:
enabled: true
age: aaaa`,
want: &PurgeOption{},
err: "age",
},
{
config: `
version: 0.1
storage:
s3:
maintenance:
uploadpurging:
enabled: true
interval: aaaa`,
want: &PurgeOption{},
err: "interval",
},
}

for _, tc := range tests {
fp := strings.NewReader(tc.config)
config, err := configuration.Parse(fp)
if err != nil {
t.Fatalf("failed to parse config file: %s: %s is not a valid config file", err, tc.config)
}

// here we are sure that the config data is valid and we can get the storage.maintenance.uploadpurging value and call type assertion.
purgeConfig := config.Storage["maintenance"]["uploadpurging"].(map[interface{}]interface{})
got, err := ParseConfig(purgeConfig)
if err != nil && tc.err == "" {
// got en unexception error
t.Fatalf("failed to parse %+v: %+v", purgeConfig, err)
} else if tc.err != "" && err == nil {
t.Fatalf("should get a parse error for %+v", purgeConfig)
} else if err != nil && tc.err != "" && strings.Index(err.Error(), tc.err) < 0 {
t.Fatalf("error should contain string %s, but can't fint the string for error %s", tc.err, err.Error())
}

if tc.err == "" && !reflect.DeepEqual(tc.want, got) {
t.Fatalf("expected: %v, got: %v", tc.want, got)
}
}
}