Skip to content

Commit

Permalink
Order CRD/CR interactions
Browse files Browse the repository at this point in the history
  • Loading branch information
Jordan Olshevski committed May 13, 2024
1 parent 43d6a9e commit ecb7508
Show file tree
Hide file tree
Showing 9 changed files with 308 additions and 0 deletions.
26 changes: 26 additions & 0 deletions internal/controllers/reconciliation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,32 @@ func (c *Controller) Reconcile(ctx context.Context, req *reconstitution.Request)
}
}

// CRDs must be reconciled before any CRs that use the type they define.
// For initial creation just failing and retrying will eventually converge but updates are tricky
// since unknown properties sent from clients are ignored by apiserver.
// i.e. ordering is necessary to handle adding a new property and populating it in the same synthesis.
crdResource, ok := c.resourceClient.GetDefiningCRD(ctx, synRef, resource.GVK.GroupKind())
if ok {
slice := &apiv1.ResourceSlice{}
err = c.client.Get(ctx, crdResource.ManifestRef.Slice, slice)
if err != nil {
return ctrl.Result{}, fmt.Errorf("getting resource slice: %w", err)
}
status := crdResource.FindStatus(slice)
if status == nil || status.Ready == nil {
logger.V(1).Info("skipping because the CRD that defines this resource type isn't ready")
return ctrl.Result{}, nil
}

// apiserver doesn't "close the loop" on CRD loading, so there is no way to know
// when CRDs are actually ready. This normally only takes a couple of milliseconds
// but we round up to a full second here to be safe.
if delta := time.Second - time.Since(status.Ready.Time); delta > 0 {
logger.V(1).Info("deferring until the defining CRD has been ready for 1 second")
return ctrl.Result{RequeueAfter: delta}, nil
}
}

// Fetch the current resource
current, hasChanged, err := c.getCurrent(ctx, resource)
if client.IgnoreNotFound(err) != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: runtimetests.enotest.azure.io
spec:
group: enotest.azure.io
names:
kind: RuntimeTest
listKind: RuntimeTestList
plural: runtimetests
singular: runtimetest
scope: Namespaced
versions:
- name: v1
schema:
openAPIV3Schema:
properties:
apiVersion:
description: |-
APIVersion defines the versioned schema of this representation of an object.
Servers should convert recognized schemas to the latest internal value, and
may reject unrecognized values.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
type: string
kind:
description: |-
Kind is a string value representing the REST resource this object represents.
Servers may infer this from the endpoint the client submits requests to.
Cannot be updated.
In CamelCase.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
metadata:
type: object
spec:
properties:
addedValue:
type: integer
values:
items:
properties:
int:
type: integer
type: object
type: array
type: object
status:
type: object
type: object
served: true
storage: true
subresources:
status: {}
51 changes: 51 additions & 0 deletions internal/controllers/reconciliation/fixtures/crd-runtimetest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: runtimetests.enotest.azure.io
spec:
group: enotest.azure.io
names:
kind: RuntimeTest
listKind: RuntimeTestList
plural: runtimetests
singular: runtimetest
scope: Namespaced
versions:
- name: v1
schema:
openAPIV3Schema:
properties:
apiVersion:
description: |-
APIVersion defines the versioned schema of this representation of an object.
Servers should convert recognized schemas to the latest internal value, and
may reject unrecognized values.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
type: string
kind:
description: |-
Kind is a string value representing the REST resource this object represents.
Servers may infer this from the endpoint the client submits requests to.
Cannot be updated.
In CamelCase.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
metadata:
type: object
spec:
properties:
values:
items:
properties:
int:
type: integer
type: object
type: array
type: object
status:
type: object
type: object
served: true
storage: true
subresources:
status: {}
91 changes: 91 additions & 0 deletions internal/controllers/reconciliation/ordering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package reconciliation

import (
"fmt"
"os"
"slices"
"strconv"
"testing"
Expand All @@ -13,11 +14,14 @@ import (
"github.com/Azure/eno/internal/controllers/rollout"
"github.com/Azure/eno/internal/controllers/synthesis"
"github.com/Azure/eno/internal/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -142,3 +146,90 @@ func TestReadinessGroups(t *testing.T) {
return errors.IsNotFound(err)
})
}

func TestCRDOrdering(t *testing.T) {
scheme := runtime.NewScheme()
corev1.SchemeBuilder.AddToScheme(scheme)
testv1.SchemeBuilder.AddToScheme(scheme)

ctx := testutil.NewContext(t)
mgr := testutil.NewManager(t)
upstream := mgr.GetClient()

// Register supporting controllers
require.NoError(t, rollout.NewController(mgr.Manager, time.Millisecond))
require.NoError(t, synthesis.NewStatusController(mgr.Manager))
require.NoError(t, aggregation.NewSliceController(mgr.Manager))
require.NoError(t, synthesis.NewPodLifecycleController(mgr.Manager, defaultConf))
require.NoError(t, synthesis.NewSliceCleanupController(mgr.Manager))
require.NoError(t, synthesis.NewExecController(mgr.Manager, defaultConf, &testutil.ExecConn{Hook: func(s *apiv1.Synthesizer) []client.Object {
crdFixture := "fixtures/crd-runtimetest.yaml"
if s.Spec.Image == "updated" {
crdFixture = "fixtures/crd-runtimetest-extra-property.yaml"
}

crd := &unstructured.Unstructured{}
crdBytes, err := os.ReadFile(crdFixture)
require.NoError(t, err)
require.NoError(t, yaml.Unmarshal(crdBytes, &crd.Object))

cr := &unstructured.Unstructured{}
cr.SetName("test-obj")
cr.SetNamespace("default")
cr.SetKind("RuntimeTest")
cr.SetAPIVersion("enotest.azure.io/v1")
cr.Object["spec"] = map[string]any{"values": []map[string]any{{"int": 123}}}

if s.Spec.Image == "updated" {
cr.Object["spec"].(map[string]any)["addedValue"] = 234
}

return []client.Object{cr, crd}
}}))

// Test subject
setupTestSubject(t, mgr)
mgr.Start(t)

syn := &apiv1.Synthesizer{}
syn.Name = "test-syn"
syn.Spec.Image = "create"
require.NoError(t, upstream.Create(ctx, syn))

comp := &apiv1.Composition{}
comp.Name = "test-comp"
comp.Namespace = "default"
comp.Spec.Synthesizer.Name = syn.Name
require.NoError(t, upstream.Create(ctx, comp))

// Wait for the initial creation
testutil.Eventually(t, func() bool {
err := upstream.Get(ctx, client.ObjectKeyFromObject(comp), comp)
return err == nil && comp.Status.CurrentSynthesis != nil && comp.Status.CurrentSynthesis.Reconciled != nil && comp.Status.CurrentSynthesis.ObservedSynthesizerGeneration == syn.Generation
})

// Update the CR and CRD to add a new property - it should exist after the next reconciliation
// If we didn't order the writes correctly the CR update would succeed with a warning without populating the new (not yet existing) property.
err := retry.RetryOnConflict(testutil.Backoff, func() error {
upstream.Get(ctx, client.ObjectKeyFromObject(syn), syn)
syn.Spec.Image = "updated"
return upstream.Update(ctx, syn)
})
require.NoError(t, err)

testutil.Eventually(t, func() bool {
err := upstream.Get(ctx, client.ObjectKeyFromObject(comp), comp)
return err == nil && comp.Status.CurrentSynthesis != nil && comp.Status.CurrentSynthesis.Reconciled != nil && comp.Status.CurrentSynthesis.ObservedSynthesizerGeneration == syn.Generation
})

cr := &unstructured.Unstructured{}
cr.SetName("test-obj")
cr.SetNamespace("default")
cr.SetKind("RuntimeTest")
cr.SetAPIVersion("enotest.azure.io/v1")
err = mgr.DownstreamClient.Get(ctx, client.ObjectKeyFromObject(cr), cr)
require.NoError(t, err)

val, _, _ := unstructured.NestedInt64(cr.Object, "spec", "addedValue")
assert.Equal(t, int64(234), val)
}
39 changes: 39 additions & 0 deletions internal/reconstitution/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"sync"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -33,6 +34,8 @@ type Cache struct {
type resources struct {
ByRef map[resource.Ref]*Resource
ByReadinessGroup *redblacktree.Tree[uint, []*Resource]
ByGroupKind map[schema.GroupKind][]*Resource
CrdsByGroupKind map[schema.GroupKind]*Resource
}

type sliceIndex struct {
Expand Down Expand Up @@ -114,6 +117,23 @@ func (c *Cache) RangeByReadinessGroup(ctx context.Context, comp *SynthesisRef, g
return node.Value
}

func (c *Cache) GetDefiningCRD(ctx context.Context, syn *SynthesisRef, gk schema.GroupKind) (*Resource, bool) {
c.mut.Lock()
defer c.mut.Unlock()

resources, ok := c.resources[*syn]
if !ok {
return nil, false
}

res, ok := resources.CrdsByGroupKind[gk]
if !ok {
return nil, false
}

return res, true
}

func (c *Cache) getByIndex(idx *sliceIndex) (*Resource, bool) {
c.mut.Lock()
defer c.mut.Unlock()
Expand All @@ -126,6 +146,18 @@ func (c *Cache) getByIndex(idx *sliceIndex) (*Resource, bool) {
return res, ok
}

func (c *Cache) getByGK(syn *SynthesisRef, gk schema.GroupKind) []*Resource {
c.mut.Lock()
defer c.mut.Unlock()

res, ok := c.resources[*syn]
if !ok {
return nil
}

return res.ByGroupKind[gk]
}

// hasSynthesis returns true when the cache contains the resulting resources of the given synthesis.
// This should be called before Fill to determine if filling is necessary.
func (c *Cache) hasSynthesis(comp *apiv1.Composition, synthesis *apiv1.Synthesis) bool {
Expand Down Expand Up @@ -169,6 +201,8 @@ func (c *Cache) buildResources(ctx context.Context, comp *apiv1.Composition, ite
resources := &resources{
ByRef: map[resource.Ref]*Resource{},
ByReadinessGroup: redblacktree.New[uint, []*Resource](),
ByGroupKind: map[schema.GroupKind][]*resource.Resource{},
CrdsByGroupKind: map[schema.GroupKind]*resource.Resource{},
}
requests := []*Request{}
for _, slice := range items {
Expand All @@ -184,6 +218,7 @@ func (c *Cache) buildResources(ctx context.Context, comp *apiv1.Composition, ite
}
resources.ByRef[res.Ref] = res
c.byIndex[sliceIndex{Index: i, SliceName: slice.Name, Namespace: slice.Namespace}] = res
resources.ByGroupKind[res.GVK.GroupKind()] = append(resources.ByGroupKind[res.GVK.GroupKind()], res)

current, _ := resources.ByReadinessGroup.Get(res.ReadinessGroup)
resources.ByReadinessGroup.Put(res.ReadinessGroup, append(current, res))
Expand All @@ -192,6 +227,10 @@ func (c *Cache) buildResources(ctx context.Context, comp *apiv1.Composition, ite
Resource: res.Ref,
Composition: types.NamespacedName{Name: comp.Name, Namespace: comp.Namespace},
})

if res.DefinedGroupKind != nil {
resources.CrdsByGroupKind[*res.DefinedGroupKind] = res
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions internal/reconstitution/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ func (r *controller) HandleReadinessTransition(ctx context.Context, req ctrl.Req

synRef := &SynthesisRef{CompositionName: owner.Name, Namespace: req.Namespace, UUID: slice.Spec.SynthesisUUID}
resources := r.Cache.RangeByReadinessGroup(ctx, synRef, res.ReadinessGroup, RangeAsc)
if res.DefinedGroupKind != nil {
resources = append(resources, r.Cache.getByGK(synRef, *res.DefinedGroupKind)...)
}
for _, res := range resources {
// TODO: This can be optimized by skipping the Add call if `res` is already ready
r.queue.Add(Request{
Expand Down
2 changes: 2 additions & 0 deletions internal/reconstitution/reconstitution.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

apiv1 "github.com/Azure/eno/api/v1"
"github.com/Azure/eno/internal/resource"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
)
Expand All @@ -20,6 +21,7 @@ type Reconciler interface {
type Client interface {
Get(ctx context.Context, syn *SynthesisRef, res *resource.Ref) (*resource.Resource, bool)
RangeByReadinessGroup(ctx context.Context, syn *SynthesisRef, group uint, dir RangeDirection) []*Resource
GetDefiningCRD(ctx context.Context, syn *SynthesisRef, gk schema.GroupKind) (*Resource, bool)
}

type RangeDirection bool
Expand Down
9 changes: 9 additions & 0 deletions internal/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type Resource struct {
Patch jsonpatch.Patch
DisableUpdates bool
ReadinessGroup uint

// DefinedGroupKind is set on CRDs to represent the resource type they define.
DefinedGroupKind *schema.GroupKind
}

func (r *Resource) Deleted() bool {
Expand Down Expand Up @@ -167,6 +170,12 @@ func NewResource(ctx context.Context, renv *readiness.Env, slice *apiv1.Resource
res.Patch = obj.Patch.Ops
}

if res.GVK.Group == "apiextensions.k8s.io" && res.GVK.Kind == "CustomResourceDefinition" {
res.DefinedGroupKind = &schema.GroupKind{}
res.DefinedGroupKind.Group, _, _ = unstructured.NestedString(parsed.Object, "spec", "group")
res.DefinedGroupKind.Kind, _, _ = unstructured.NestedString(parsed.Object, "spec", "names", "kind")
}

anno := parsed.GetAnnotations()
if anno == nil {
return res, nil
Expand Down

0 comments on commit ecb7508

Please sign in to comment.