Improve ttl and add mark and sweep #4266

Draft · wants to merge 2 commits into base: main
24 changes: 12 additions & 12 deletions registry/proxy/proxyblobstore.go
@@ -132,18 +132,18 @@ func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter,
return err
}

blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
if err != nil {
dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err)
return err
}

if pbs.scheduler != nil && pbs.ttl != nil {
if err := pbs.scheduler.AddBlob(blobRef, *pbs.ttl); err != nil {
dcontext.GetLogger(ctx).Errorf("Error adding blob: %s", err)
return err
}
}
// blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
// if err != nil {
// dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err)
// return err
// }

// if pbs.scheduler != nil && pbs.ttl != nil {
// if err := pbs.scheduler.AddBlob(blobRef, *pbs.ttl); err != nil {
// dcontext.GetLogger(ctx).Errorf("Error adding blob: %s", err)
// return err
// }
// }

return nil
}
37 changes: 25 additions & 12 deletions registry/proxy/proxymanifeststore.go
@@ -55,6 +55,19 @@ func (pms proxyManifestStore) Get(ctx context.Context, dgst digest.Digest, optio
fromRemote = true
}

repoBlob, err := reference.WithDigest(pms.repositoryName, dgst)
if err != nil {
dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err)
return nil, err
}

if pms.scheduler != nil && pms.ttl != nil {
if err := pms.scheduler.AddManifest(repoBlob, *pms.ttl); err != nil {
dcontext.GetLogger(ctx).Errorf("Error adding manifest: %s", err)
return nil, err
}
}

_, payload, err := manifest.Payload()
if err != nil {
return nil, err
@@ -70,18 +83,18 @@ func (pms proxyManifestStore) Get(ctx context.Context, dgst digest.Digest, optio
}

// Schedule the manifest blob for removal
repoBlob, err := reference.WithDigest(pms.repositoryName, dgst)
if err != nil {
dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err)
return nil, err
}

if pms.scheduler != nil && pms.ttl != nil {
if err := pms.scheduler.AddManifest(repoBlob, *pms.ttl); err != nil {
dcontext.GetLogger(ctx).Errorf("Error adding manifest: %s", err)
return nil, err
}
}
// repoBlob, err := reference.WithDigest(pms.repositoryName, dgst)
// if err != nil {
// dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err)
// return nil, err
// }

// if pms.scheduler != nil && pms.ttl != nil {
// if err := pms.scheduler.AddManifest(repoBlob, *pms.ttl); err != nil {
// dcontext.GetLogger(ctx).Errorf("Error adding manifest: %s", err)
// return nil, err
// }
// }

// Ensure the manifest blob is cleaned up
// pms.scheduler.AddBlob(blobRef, repositoryTTL)
59 changes: 31 additions & 28 deletions registry/proxy/proxyregistry.go
@@ -39,7 +39,7 @@ func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Name
return nil, err
}

v := storage.NewVacuum(ctx, driver)
// v := storage.NewVacuum(ctx, driver)

var s *scheduler.TTLExpirationScheduler
var ttl *time.Duration
@@ -54,34 +54,37 @@ }
}

if ttl != nil {
s = scheduler.New(ctx, driver, "/scheduler-state.json")
s.OnBlobExpire(func(ref reference.Reference) error {
var r reference.Canonical
var ok bool
if r, ok = ref.(reference.Canonical); !ok {
return fmt.Errorf("unexpected reference type : %T", ref)
}

repo, err := registry.Repository(ctx, r)
if err != nil {
return err
}

blobs := repo.Blobs(ctx)

// Clear the repository reference and descriptor caches
err = blobs.Delete(ctx, r.Digest())
if err != nil {
return err
}

err = v.RemoveBlob(r.Digest().String())
if err != nil {
return err
}

return nil
s = scheduler.New(ctx, driver, "/scheduler-state.json", ttl, registry, storage.GCOpts{
DryRun: true,
RemoveUntagged: false,
})
// s.OnBlobExpire(func(ref reference.Reference) error {
// var r reference.Canonical
// var ok bool
// if r, ok = ref.(reference.Canonical); !ok {
// return fmt.Errorf("unexpected reference type : %T", ref)
// }

// repo, err := registry.Repository(ctx, r)
// if err != nil {
// return err
// }

// blobs := repo.Blobs(ctx)

// // Clear the repository reference and descriptor caches
// err = blobs.Delete(ctx, r.Digest())
// if err != nil {
// return err
// }

// err = v.RemoveBlob(r.Digest().String())
// if err != nil {
// return err
// }

// return nil
// })

s.OnManifestExpire(func(ref reference.Reference) error {
var r reference.Canonical
135 changes: 122 additions & 13 deletions registry/proxy/scheduler/scheduler.go
@@ -7,9 +7,12 @@ import (
"sync"
"time"

"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/dcontext"
"github.com/distribution/distribution/v3/registry/storage"
"github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/reference"
"github.com/opencontainers/go-digest"
)

// onTTLExpiryFunc is called when a repository's TTL expires
@@ -18,7 +21,8 @@ type expiryFunc func(reference.Reference) error
const (
entryTypeBlob = iota
entryTypeManifest
indexSaveFrequency = 5 * time.Second
indexSaveFrequency = 5 * time.Second
garbageCollectFrequency = 1 * time.Minute
)

// schedulerEntry represents an entry in the scheduler
@@ -31,16 +35,24 @@ type schedulerEntry struct {
timer *time.Timer
}

func (se schedulerEntry) String() string {
return fmt.Sprintf("Expiry: %s, EntryType: %d", se.Expiry, se.EntryType)
}

// New returns a new instance of the scheduler
func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpirationScheduler {
func New(ctx context.Context, driver driver.StorageDriver, path string, ttl *time.Duration, registry distribution.Namespace, opts storage.GCOpts) *TTLExpirationScheduler {
return &TTLExpirationScheduler{
entries: make(map[string]*schedulerEntry),
driver: driver,
pathToStateFile: path,
ctx: ctx,
stopped: true,
doneChan: make(chan struct{}),
saveTimer: time.NewTicker(indexSaveFrequency),
entries: make(map[string]*schedulerEntry),
driver: driver,
pathToStateFile: path,
ttl: ttl,
registry: registry,
opts: opts,
ctx: ctx,
stopped: true,
doneChan: make(chan struct{}),
saveTimer: time.NewTicker(indexSaveFrequency),
garbageCollectTimer: time.NewTicker(garbageCollectFrequency),
}
}
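
For review convenience, a minimal sketch of the new call shape from a caller's perspective, assuming a driver, a registry namespace, and a parsed TTL are already in hand (values here are illustrative; the real call site is in proxyregistry.go above):

	ttl := 24 * time.Hour
	s := scheduler.New(ctx, driver, "/scheduler-state.json", &ttl, registry, storage.GCOpts{
		DryRun:         false, // set true to log what would be deleted without deleting it
		RemoveUntagged: false,
	})
	if err := s.Start(); err != nil {
		// handle scheduler startup failure
	}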

@@ -54,15 +66,19 @@ type TTLExpirationScheduler struct {
driver driver.StorageDriver
ctx context.Context
pathToStateFile string
ttl *time.Duration
registry distribution.Namespace
opts storage.GCOpts

stopped bool

onBlobExpire expiryFunc
onManifestExpire expiryFunc

indexDirty bool
saveTimer *time.Ticker
doneChan chan struct{}
indexDirty bool
saveTimer *time.Ticker
garbageCollectTimer *time.Ticker
doneChan chan struct{}
}

// OnBlobExpire is called when a scheduled blob's TTL expires
@@ -121,14 +137,19 @@ func (ttles *TTLExpirationScheduler) Start() error {
return fmt.Errorf("scheduler already started")
}

dcontext.GetLogger(ttles.ctx).Infof("Starting cached object TTL expiration scheduler...")
dcontext.GetLogger(ttles.ctx).Infof("Starting cached object TTL (cameron changed) expiration scheduler...")
ttles.stopped = false

// Start timer for each deserialized entry
for _, entry := range ttles.entries {
entry.timer = ttles.startTimer(entry, time.Until(entry.Expiry))
}

err = ttles.BackfillManifests()
if err != nil {
return fmt.Errorf("failed to backfill manifests: %w", err)
}

// Start a ticker to periodically save the entries index

go func() {
@@ -140,6 +161,7 @@ func (ttles *TTLExpirationScheduler) Start() error {
ttles.Unlock()
continue
}
dcontext.GetLogger(ttles.ctx).Debugf("Current state: \n %+v", ttles.entries)

err := ttles.writeState()
if err != nil {
@@ -155,9 +177,25 @@ }
}
}()

// This could be parallelized, but only if runs are guaranteed not to overlap.
go ttles.GarbageCollect()

return nil
}

func (ttles *TTLExpirationScheduler) GarbageCollect() {
for {
select {
case <-ttles.garbageCollectTimer.C:

if err := storage.MarkAndSweep(ttles.ctx, ttles.driver, ttles.registry, ttles.opts); err != nil {
dcontext.GetLogger(ttles.ctx).Errorf("Error running mark and sweep: %s", err)
}

Author comment: This can be run as a separate process if desired; it is resilient to items that have already been deleted. (See the sketch after this function.)

case <-ttles.doneChan:
return
}
}
}
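
As the author comment above suggests, the sweep could also run out of process. A minimal sketch of such a standalone pass, assuming the same storage driver and registry namespace are wired up elsewhere (runStandaloneSweep is a hypothetical name, not part of this PR):

	// Hypothetical standalone sweeper. MarkAndSweep tolerates items that are
	// already gone, which is what makes repeated or out-of-process runs safe.
	func runStandaloneSweep(ctx context.Context, d driver.StorageDriver, reg distribution.Namespace) error {
		return storage.MarkAndSweep(ctx, d, reg, storage.GCOpts{
			DryRun:         false,
			RemoveUntagged: false,
		})
	}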

func (ttles *TTLExpirationScheduler) add(r reference.Reference, ttl time.Duration, eType int) {
entry := &schedulerEntry{
Key: r.String(),
@@ -256,5 +294,76 @@ func (ttles *TTLExpirationScheduler) readState() error {
if err != nil {
return err
}
dcontext.GetLogger(ttles.ctx).Infof("Start state: \n %+v", ttles.entries)

return nil
}

func (ttles *TTLExpirationScheduler) BackfillManifests() error {
repositoryEnumerator, ok := ttles.registry.(distribution.RepositoryEnumerator)
if !ok {
return fmt.Errorf("unable to convert Namespace to RepositoryEnumerator")
}
emit("backfilling manifests")

// mark
err := repositoryEnumerator.Enumerate(ttles.ctx, func(repoName string) error {
emit("backfill for " + repoName)

var err error
named, err := reference.WithName(repoName)
if err != nil {
return fmt.Errorf("failed to parse repo name %s: %v", repoName, err)
}
repository, err := ttles.registry.Repository(ttles.ctx, named)
if err != nil {
return fmt.Errorf("failed to construct repository: %v", err)
}

manifestService, err := repository.Manifests(ttles.ctx)
if err != nil {
return fmt.Errorf("failed to construct manifest service: %v", err)
}

manifestEnumerator, ok := manifestService.(distribution.ManifestEnumerator)
if !ok {
return fmt.Errorf("unable to convert ManifestService into ManifestEnumerator")
}

err = manifestEnumerator.Enumerate(ttles.ctx, func(dgst digest.Digest) error {
// Record a TTL entry for this manifest
emit("backfill for %s: adding TTL entry for manifest %s", repoName, dgst)

// Skip if TTL exists for manifest
key := dgst.String()
if _, ok := ttles.entries[key]; !ok {
ttles.entries[key] = &schedulerEntry{
Key: key,

// TODO: the file's creation time would be a better expiry baseline than time.Now()
Expiry: time.Now().Add(*ttles.ttl),
EntryType: entryTypeManifest,
}
}

return nil
})

// In certain situations such as unfinished uploads, deleting all
// tags in S3 or removing the _manifests folder manually, this
// error may be of type PathNotFound.
//
// In these cases we can continue marking other manifests safely.
if _, ok := err.(driver.PathNotFoundError); ok {
return nil
}

return err
})

return err
}

// emit writes backfill progress to stdout.
func emit(format string, a ...interface{}) {
fmt.Printf(format+"\n", a...)
}
3 changes: 3 additions & 0 deletions tests/conf-e2e-cloud-storage.yml
@@ -26,6 +26,9 @@ redis:
idletimeout: 60s
maxactive: 64
maxidle: 16
proxy:
remoteurl: "https://registry-1.docker.io"
ttl: 1m
storage:
redirect:
disable: true
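
The proxy block above turns this e2e configuration into a pull-through cache of Docker Hub with a deliberately short one-minute TTL. For orientation, the ttl string is ultimately parsed into the *time.Duration consumed by NewRegistryPullThroughCache; a rough sketch of the equivalent parsing, with the wiring assumed rather than taken from the configuration package:

	d, err := time.ParseDuration("1m") // the proxy.ttl value
	if err != nil {
		// reject the configuration
	}
	ttl := &d // the *time.Duration handed to the scheduler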
4 changes: 2 additions & 2 deletions tests/docker-compose-e2e-cloud-storage.yml
@@ -55,7 +55,7 @@ services:
minio-init:
condition: service_completed_successfully
ports:
- "5000:5000"
- "5001:5001"
- "5004:5000"
- "5005:5001"
volumes:
- ./conf-e2e-cloud-storage.yml:/etc/docker/registry/config-test.yml