Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add bounded concurrency for tag lookup and untag #4329

Merged
merged 2 commits into from Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/registry/config-cache.yml
Expand Up @@ -12,6 +12,8 @@ storage:
maintenance:
uploadpurging:
enabled: false
tag:
concurrencylimit: 8
http:
addr: :5000
secret: asecretforlocaldevelopment
Expand Down
2 changes: 2 additions & 0 deletions cmd/registry/config-dev.yml
Expand Up @@ -14,6 +14,8 @@ storage:
maintenance:
uploadpurging:
enabled: false
tag:
concurrencylimit: 8
http:
addr: :5000
debug:
Expand Down
2 changes: 2 additions & 0 deletions cmd/registry/config-example.yml
Expand Up @@ -7,6 +7,8 @@ storage:
blobdescriptor: inmemory
filesystem:
rootdirectory: /var/lib/registry
tag:
concurrencylimit: 8
http:
addr: :5000
headers:
Expand Down
17 changes: 17 additions & 0 deletions configuration/configuration.go
Expand Up @@ -441,6 +441,8 @@ func (storage Storage) Type() string {
// allow configuration of delete
case "redirect":
// allow configuration of redirect
case "tag":
// allow configuration of tag
default:
storageType = append(storageType, k)
}
Expand All @@ -454,6 +456,19 @@ func (storage Storage) Type() string {
return ""
}

// TagParameters looks up the "tag" entry of the Storage configuration and
// returns its Parameters map. The result is nil when the configuration has
// no "tag" entry.
func (storage Storage) TagParameters() Parameters {
	tagParams := storage["tag"]
	return tagParams
}

// setTagParameter stores value under key inside the "tag" entry of the
// Storage configuration, creating that entry first when it does not exist.
func (storage Storage) setTagParameter(key string, value interface{}) {
	tagParams, present := storage["tag"]
	if !present {
		// No "tag" section yet: allocate one so the write below has a target.
		tagParams = make(Parameters)
		storage["tag"] = tagParams
	}
	tagParams[key] = value
}

// Parameters returns the Parameters map for a Storage configuration
func (storage Storage) Parameters() Parameters {
return storage[storage.Type()]
Expand Down Expand Up @@ -482,6 +497,8 @@ func (storage *Storage) UnmarshalYAML(unmarshal func(interface{}) error) error {
// allow configuration of delete
case "redirect":
// allow configuration of redirect
case "tag":
// allow configuration of tag
default:
types = append(types, k)
}
Expand Down
8 changes: 8 additions & 0 deletions configuration/configuration_test.go
Expand Up @@ -39,6 +39,9 @@ var configStruct = Configuration{
"url1": "https://foo.example.com",
"path1": "/some-path",
},
"tag": Parameters{
"concurrencylimit": 10,
},
},
Auth: Auth{
"silly": Parameters{
Expand Down Expand Up @@ -167,6 +170,8 @@ storage:
int1: 42
url1: "https://foo.example.com"
path1: "/some-path"
tag:
concurrencylimit: 10
auth:
silly:
realm: silly
Expand Down Expand Up @@ -542,6 +547,9 @@ func copyConfig(config Configuration) *Configuration {
for k, v := range config.Storage.Parameters() {
configCopy.Storage.setParameter(k, v)
}
for k, v := range config.Storage.TagParameters() {
configCopy.Storage.setTagParameter(k, v)
}

configCopy.Auth = Auth{config.Auth.Type(): Parameters{}}
for k, v := range config.Auth.Parameters() {
Expand Down
22 changes: 22 additions & 0 deletions docs/content/about/configuration.md
Expand Up @@ -141,6 +141,8 @@ storage:
usedualstack: false
loglevel: debug
inmemory: # This driver takes no parameters
tag:
concurrencylimit: 8
delete:
enabled: false
redirect:
Expand Down Expand Up @@ -521,6 +523,26 @@ parameter sets a limit on the number of descriptors to store in the cache.
The default value is 10000. If this parameter is set to 0, the cache is allowed
to grow with no size limit.

### `tag`

The `tag` subsection provides configuration to set the concurrency limit for tag lookup.
When a user calls into the registry to delete a manifest, the registry in turn does a
lookup for all tags that reference the deleted manifest. To find the tag references,
the registry will iterate over every tag in the repository and read its link file to check
if it matches the deleted manifest (i.e. to see if it uses the same sha256 digest).
So, the more tags in the repository, the worse the performance will be (as there will
be more S3 API calls occurring for the tag directory lookups and tag file reads if
using the S3 storage driver).

To optimize tag lookup performance, the `tag` section accepts a single parameter,
`concurrencylimit`, which sets the concurrency limit for tag lookups. When the value
is not provided or is equal to 0, `GOMAXPROCS` will be used. A negative value is
rejected as a configuration error.

```yaml
tag:
concurrencylimit: 8
```

### `redirect`

The `redirect` subsection provides configuration for managing redirects from
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -88,7 +88,7 @@ require (
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.21.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sync v0.3.0
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
Expand Down
15 changes: 15 additions & 0 deletions registry/handlers/app.go
Expand Up @@ -188,6 +188,21 @@ func NewApp(ctx context.Context, config *configuration.Configuration) *App {
}
}

// configure tag lookup concurrency limit
if p := config.Storage.TagParameters(); p != nil {
l, ok := p["concurrencylimit"]
if ok {
limit, ok := l.(int)
if !ok {
panic("tag lookup concurrency limit config key must have a integer value")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the semantics of an explicit concurrency limit of 0? -1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

During the initialization of the tagStore, a judgment is made, and if it is greater than 0, it will be set; otherwise, the concurrency limit is set to the default value.

limit := DefaultConcurrencyLimit
if repo.tagLookupConcurrencyLimit > 0 {
	limit = repo.tagLookupConcurrencyLimit
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should document that in the docs FWIW

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should document that in the docs FWIW

done

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defining all non-positive integers to mean GOMAXPROCS does not leave any values to signify the other possible special case: unbounded concurrency / no limit. To be clear, I don't think an option for unbounded concurrency should be added as part of this PR. I simply want to reserve the negative numbers for future use by having it be an error to set concurrencylimit to a negative value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @corhere, I've added a check that directly throws an error if it's set to a negative value. However, I'm not quite clear on how you suggest handling unbounded / no limit concurrency. Should we maintain the current logic? Or are you saying that even if the user sets it to unbounded, we should treat it with a certain upper limit, such as GOMAXPROCS?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest that we do not handle unbounded concurrency. At least not as part of this PR. What I am suggesting is the following:

concurrencylimit value Result
(unset) Tag lookups are limited to GOMAXPROCS concurrency (the default)
0 Tag lookups are limited to GOMAXPROCS concurrency (the default)
Positive integer, e.g. 3 Tag lookups are limited to the value, e.g. 3
Negative integer, e.g. -1 Configuration error

Making negative numbers an error today guarantees that no valid registry configuration can have a concurrencylimit option set to a negative number. That gives us the freedom to redefine negative values to mean something valid in the future as a non-breaking change. One potential future definition of negative concurrencylimit values is to configure unbounded concurrency. But we don't need that today.

Note that setting concurrencylimit: 0 should be allowed. It is very useful to have an "explicit default" value for distribution configuration as it allows the user to be explicit about their intent to use GOMAXPROCS as the concurrency limit in their configuration, or to override e.g. concurrencylimit: 4 in the YAML config by setting the environment variable REGISTRY_TAG_CONCURRENCYLIMIT=0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your nice explanation, I have updated it, PTAL.

}
if limit < 0 {
panic("tag lookup concurrency limit should be a non-negative integer value")
}
options = append(options, storage.TagLookupConcurrencyLimit(limit))
}
}

// configure redirects
var redirectDisabled bool
if redirectConfig, ok := config.Storage["redirect"]; ok {
Expand Down
25 changes: 21 additions & 4 deletions registry/handlers/manifests.go
Expand Up @@ -6,18 +6,21 @@ import (
"mime"
"net/http"
"strings"
"sync"

"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/dcontext"
"github.com/distribution/distribution/v3/manifest/manifestlist"
"github.com/distribution/distribution/v3/manifest/ocischema"
"github.com/distribution/distribution/v3/manifest/schema2"
"github.com/distribution/distribution/v3/registry/api/errcode"
"github.com/distribution/distribution/v3/registry/storage"
"github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/reference"
"github.com/gorilla/handlers"
"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/sync/errgroup"
)

const (
Expand Down Expand Up @@ -481,12 +484,26 @@ func (imh *manifestHandler) DeleteManifest(w http.ResponseWriter, r *http.Reques
return
}

var (
errs []error
mu sync.Mutex
)
g := errgroup.Group{}
g.SetLimit(storage.DefaultConcurrencyLimit)
for _, tag := range referencedTags {
if err := tagService.Untag(imh, tag); err != nil {
imh.Errors = append(imh.Errors, err)
return
}
tag := tag

g.Go(func() error {
if err := tagService.Untag(imh, tag); err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
return nil
})
}
_ = g.Wait() // imh will record all errors, so ignore the error of Wait()
imh.Errors = errs

w.WriteHeader(http.StatusAccepted)
}
22 changes: 20 additions & 2 deletions registry/storage/registry.go
Expand Up @@ -3,13 +3,18 @@ package storage
import (
"context"
"regexp"
"runtime"

"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/registry/storage/cache"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/reference"
)

// DefaultConcurrencyLimit is the concurrency limit used when no positive
// limit has been configured. It is initialized from runtime.GOMAXPROCS(0),
// which reports the current GOMAXPROCS setting without changing it.
var DefaultConcurrencyLimit = runtime.GOMAXPROCS(0)

// registry is the top-level implementation of Registry for use in the storage
// package. All instances should descend from this object.
type registry struct {
Expand All @@ -18,6 +23,7 @@ type registry struct {
statter *blobStatter // global statter service.
blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider
deleteEnabled bool
tagLookupConcurrencyLimit int
resumableDigestEnabled bool
blobDescriptorServiceFactory distribution.BlobDescriptorServiceFactory
manifestURLs manifestURLs
Expand All @@ -40,6 +46,13 @@ func EnableRedirect(registry *registry) error {
return nil
}

// TagLookupConcurrencyLimit returns a RegistryOption that records
// concurrencyLimit on the registry as its tag lookup concurrency limit.
// The value is stored as given; a value that is not greater than zero makes
// the repository's tag service fall back to DefaultConcurrencyLimit.
func TagLookupConcurrencyLimit(concurrencyLimit int) RegistryOption {
	applyLimit := func(r *registry) error {
		r.tagLookupConcurrencyLimit = concurrencyLimit
		return nil
	}
	return applyLimit
}

// EnableDelete is a functional option for NewRegistry. It enables deletion on
// the registry.
func EnableDelete(registry *registry) error {
Expand Down Expand Up @@ -184,9 +197,14 @@ func (repo *repository) Named() reference.Named {
}

func (repo *repository) Tags(ctx context.Context) distribution.TagService {
limit := DefaultConcurrencyLimit
if repo.tagLookupConcurrencyLimit > 0 {
limit = repo.tagLookupConcurrencyLimit
}
tags := &tagStore{
repository: repo,
blobStore: repo.registry.blobStore,
repository: repo,
blobStore: repo.registry.blobStore,
concurrencyLimit: limit,
}

return tags
Expand Down
62 changes: 44 additions & 18 deletions registry/storage/tagstore.go
Expand Up @@ -4,10 +4,13 @@ import (
"context"
"path"
"sort"
"sync"

"github.com/opencontainers/go-digest"
"golang.org/x/sync/errgroup"

"github.com/distribution/distribution/v3"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/opencontainers/go-digest"
)

var _ distribution.TagService = &tagStore{}
Expand All @@ -18,8 +21,9 @@ var _ distribution.TagService = &tagStore{}
// which only makes use of the Digest field of the returned distribution.Descriptor
// but does not enable full roundtripping of Descriptor objects
type tagStore struct {
repository *repository
blobStore *blobStore
repository *repository
blobStore *blobStore
concurrencyLimit int
}

// All returns all tags
Expand Down Expand Up @@ -145,26 +149,48 @@ func (ts *tagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([
return nil, err
}

var tags []string
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(ts.concurrencyLimit)

var (
tags []string
mu sync.Mutex
)
for _, tag := range allTags {
tagLinkPathSpec := manifestTagCurrentPathSpec{
name: ts.repository.Named().Name(),
tag: tag,
if ctx.Err() != nil {
break
}
tag := tag

tagLinkPath, _ := pathFor(tagLinkPathSpec)
tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath)
if err != nil {
switch err.(type) {
case storagedriver.PathNotFoundError:
continue
g.Go(func() error {
tagLinkPathSpec := manifestTagCurrentPathSpec{
name: ts.repository.Named().Name(),
tag: tag,
}
return nil, err
}

if tagDigest == desc.Digest {
tags = append(tags, tag)
}
tagLinkPath, _ := pathFor(tagLinkPathSpec)
tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath)
if err != nil {
switch err.(type) {
case storagedriver.PathNotFoundError:
return nil
}
return err
}

if tagDigest == desc.Digest {
mu.Lock()
tags = append(tags, tag)
mu.Unlock()
}

return nil
})
}

err = g.Wait()
if err != nil {
return nil, err
}

return tags, nil
Expand Down