Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use blob digest as cleanup and cache key for refs in registry proxy mode #4236

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 20 additions & 9 deletions registry/proxy/scheduler/scheduler.go
Expand Up @@ -158,17 +158,22 @@ func (ttles *TTLExpirationScheduler) Start() error {
return nil
}

func (ttles *TTLExpirationScheduler) add(r reference.Reference, ttl time.Duration, eType int) {
func (ttles *TTLExpirationScheduler) add(r reference.Canonical, ttl time.Duration, eType int) {
// Use the raw digest as the scheduler entry key so that common blob TTLs can be properly extended as they are
// pulled with other manifests. Note that schedulerEntry.Key remains as the full reference format to avoid info loss
// when recovering the schedule from disk.
entryKey := r.Digest().String()
entry := &schedulerEntry{
Key: r.String(),
Expiry: time.Now().Add(ttl),
EntryType: eType,
}
dcontext.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, time.Until(entry.Expiry))
if oldEntry, present := ttles.entries[entry.Key]; present && oldEntry.timer != nil {
if oldEntry, present := ttles.entries[entryKey]; present && oldEntry.timer != nil {
oldEntry.timer.Stop()
dcontext.GetLogger(ttles.ctx).Infof("Replacing existing scheduler entry for common blob %s", entryKey)
}
ttles.entries[entry.Key] = entry
ttles.entries[entryKey] = entry
entry.timer = ttles.startTimer(entry, ttl)
ttles.indexDirty = true
}
Expand All @@ -192,15 +197,21 @@ func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time.
}

ref, err := reference.Parse(entry.Key)
if err == nil {
if err := f(ref); err != nil {
dcontext.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", entry.Key, err)
}
} else {
if err != nil {
dcontext.GetLogger(ttles.ctx).Errorf("Error unpacking reference: %s", err)
return
}
cRef, ok := ref.(reference.Canonical)
if !ok {
dcontext.GetLogger(ttles.ctx).Errorf("Scheduler expected canonicalReference, but got : %T", ref)
return
}

if err := f(ref); err != nil {
dcontext.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", entry.Key, err)
}

delete(ttles.entries, entry.Key)
delete(ttles.entries, cRef.Digest().String())
ttles.indexDirty = true
})
}
Expand Down
154 changes: 120 additions & 34 deletions registry/proxy/scheduler/scheduler_test.go
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/distribution/reference"
)

func testRefs(t *testing.T) (reference.Reference, reference.Reference, reference.Reference) {
func testRefs(t *testing.T) (reference.Canonical, reference.Canonical, reference.Canonical) {
ref1, err := reference.Parse("testrepo@sha256:aaaaeaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
if err != nil {
t.Fatalf("could not parse reference: %v", err)
Expand All @@ -27,31 +27,37 @@ func testRefs(t *testing.T) (reference.Reference, reference.Reference, reference
t.Fatalf("could not parse reference: %v", err)
}

return ref1, ref2, ref3
return ref1.(reference.Canonical), ref2.(reference.Canonical), ref3.(reference.Canonical)
}

func TestSchedule(t *testing.T) {
ref1, ref2, ref3 := testRefs(t)
timeUnit := time.Millisecond
remainingRepos := map[string]bool{
ref1.String(): true,
ref2.String(): true,
ref3.String(): true,
remainingRefs := map[string]bool{
ref1.Digest().String(): true,
ref2.Digest().String(): true,
ref3.Digest().String(): true,
}

var mu sync.Mutex
s := New(dcontext.Background(), inmemory.New(), "/ttl")
deleteFunc := func(repoName reference.Reference) error {
if len(remainingRepos) == 0 {
deleteFunc := func(ref reference.Reference) error {
cRef, ok := ref.(reference.Canonical)
if !ok {
t.Fatalf("reference is not cannonical (includes name & digest): %v", ref)
}
refKey := cRef.Digest().String()

if len(remainingRefs) == 0 {
t.Fatalf("Incorrect expiry count")
}
_, ok := remainingRepos[repoName.String()]
_, ok = remainingRefs[refKey]
if !ok {
t.Fatalf("Trying to remove nonexistent repo: %s", repoName)
t.Fatalf("Trying to remove nonexistent ref: %s", refKey)
}
t.Log("removing", repoName)
t.Log("removing", refKey)
mu.Lock()
delete(remainingRepos, repoName.String())
delete(remainingRefs, refKey)
mu.Unlock()

return nil
Expand All @@ -71,49 +77,56 @@ func TestSchedule(t *testing.T) {
s.Unlock()
}()

// Ensure all repos are deleted
// Ensure all refs are deleted
<-time.After(50 * timeUnit)

mu.Lock()
defer mu.Unlock()
if len(remainingRepos) != 0 {
t.Fatalf("Repositories remaining: %#v", remainingRepos)
if len(remainingRefs) != 0 {
t.Fatalf("Refs remaining: %#v", remainingRefs)
}
}

func TestRestoreOld(t *testing.T) {
ref1, ref2, _ := testRefs(t)
remainingRepos := map[string]bool{
ref1.String(): true,
ref2.String(): true,
remainingRefs := map[string]bool{
ref1.Digest().String(): true,
ref2.Digest().String(): true,
}

var wg sync.WaitGroup
wg.Add(len(remainingRepos))
wg.Add(len(remainingRefs))
var mu sync.Mutex
deleteFunc := func(r reference.Reference) error {
deleteFunc := func(ref reference.Reference) error {
mu.Lock()
defer mu.Unlock()
if r.String() == ref1.String() && len(remainingRepos) == 2 {

cRef, ok := ref.(reference.Canonical)
if !ok {
t.Fatalf("reference is not cannonical (includes name & digest): %v", ref)
}
refKey := cRef.Digest().String()

if cRef.(reference.Canonical).Digest() == ref1.Digest() && len(remainingRefs) == 2 {
t.Errorf("ref1 should not be removed first")
}
_, ok := remainingRepos[r.String()]
_, ok = remainingRefs[refKey]
if !ok {
t.Fatalf("Trying to remove nonexistent repo: %s", r)
t.Fatalf("Trying to remove nonexistent ref: %s", refKey)
}
delete(remainingRepos, r.String())
delete(remainingRefs, refKey)
wg.Done()
return nil
}

timeUnit := time.Millisecond
serialized, err := json.Marshal(&map[string]schedulerEntry{
ref1.String(): {
ref1.Digest().String(): {
Expiry: time.Now().Add(10 * timeUnit),
Key: ref1.String(),
EntryType: 0,
},
ref2.String(): {
ref2.Digest().String(): {
Expiry: time.Now().Add(-3 * timeUnit), // TTL passed, should be removed first
Key: ref2.String(),
EntryType: 0,
Expand Down Expand Up @@ -141,24 +154,24 @@ func TestRestoreOld(t *testing.T) {
wg.Wait()
mu.Lock()
defer mu.Unlock()
if len(remainingRepos) != 0 {
t.Fatalf("Repositories remaining: %#v", remainingRepos)
if len(remainingRefs) != 0 {
t.Fatalf("Refs remaining: %#v", remainingRefs)
}
}

func TestStopRestore(t *testing.T) {
ref1, ref2, _ := testRefs(t)

timeUnit := time.Millisecond
remainingRepos := map[string]bool{
ref1.String(): true,
ref2.String(): true,
remainingRefs := map[string]bool{
ref1.Digest().String(): true,
ref2.Digest().String(): true,
}

var mu sync.Mutex
deleteFunc := func(r reference.Reference) error {
mu.Lock()
delete(remainingRepos, r.String())
delete(remainingRefs, r.(reference.Canonical).Digest().String())
mu.Unlock()
return nil
}
Expand Down Expand Up @@ -191,8 +204,8 @@ func TestStopRestore(t *testing.T) {
<-time.After(500 * timeUnit)
mu.Lock()
defer mu.Unlock()
if len(remainingRepos) != 0 {
t.Fatalf("Repositories remaining: %#v", remainingRepos)
if len(remainingRefs) != 0 {
t.Fatalf("Refs remaining: %#v", remainingRefs)
}
}

Expand All @@ -207,3 +220,76 @@ func TestDoubleStart(t *testing.T) {
t.Fatalf("Scheduler started twice without error")
}
}

func TestCommonRef(t *testing.T) {
ref1, ref2, ref3 := testRefs(t)

timeUnit := time.Millisecond

// Create a shared blob reference for ref3
ref3Copy, err := reference.Parse("anothertestrepo@" + ref3.Digest().String())
if err != nil {
t.Fatalf("could not parse reference: %v", err)
}
cRef3Copy := ref3Copy.(reference.Canonical)

remainingRefs := map[string]bool{
ref1.Digest().String(): true,
ref2.Digest().String(): true,
ref3.Digest().String(): true,
}

var mu sync.Mutex
s := New(dcontext.Background(), inmemory.New(), "/ttl")
deleteFunc := func(ref reference.Reference) error {
cRef, ok := ref.(reference.Canonical)
if !ok {
t.Fatalf("reference is not cannonical (includes name & digest): %v", ref)
}
refKey := cRef.Digest().String()

if len(remainingRefs) == 0 {
t.Fatalf("Incorrect expiry count")
}
_, ok = remainingRefs[refKey]
if !ok {
t.Fatalf("Trying to remove nonexistent ref: %s", refKey)
}
t.Log("removing", refKey)
mu.Lock()
delete(remainingRefs, refKey)
mu.Unlock()

return nil
}
s.onBlobExpire = deleteFunc
err = s.Start()
if err != nil {
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
}

s.add(ref1, 3*timeUnit, entryTypeBlob)
s.add(ref2, 1*timeUnit, entryTypeBlob)

func() {
s.Lock()
s.add(ref3, 1*timeUnit, entryTypeBlob)
// This should override the existing expiry of ref3
s.add(cRef3Copy, 60000*timeUnit, entryTypeBlob)
s.Unlock()
}()

// Wait for refs to be deleted
<-time.After(50 * timeUnit)

mu.Lock()
defer mu.Unlock()

// Only the common blob should be reminaing
if len(remainingRefs) != 1 {
t.Fatalf("Expected 1 ref remaining, but got: %#v", remainingRefs)
}
if _, ok := remainingRefs[ref3.Digest().String()]; !ok {
t.Fatalf("Expected ref3 to be remaining, but got: %#v", remainingRefs)
}
}
2 changes: 1 addition & 1 deletion registry/storage/cache/memory/memory.go
Expand Up @@ -57,7 +57,7 @@ func (imbdcp *inMemoryBlobDescriptorCacheProvider) RepositoryScoped(repo string)
}

return &repositoryScopedInMemoryBlobDescriptorCache{
repo: repo,
repo: "", // todo: removed repo scope to handle common blob invalidation
parent: imbdcp,
}, nil
}
Expand Down