Support "readiness groups" i.e. dependencies (#127)
Implements the concept of readiness groups: a resource annotated with eno.azure.io/readiness-group is not created or updated until all resources in lower-valued groups have become ready.

---------

Co-authored-by: Jordan Olshevski <joolshev@microsoft.com>
jveski and Jordan Olshevski committed May 10, 2024
1 parent 0e52b52 commit 43d6a9e
Showing 10 changed files with 293 additions and 39 deletions.
14 changes: 8 additions & 6 deletions api/v1/config/crd/eno.azure.io_synthesizers.yaml
@@ -34,12 +34,14 @@ spec:
resources you expect another user/process to mutate.\n- eno.azure.io/readiness:
CEL expression used to assert that the resource is ready. More details below.\n-
eno.azure.io/readiness-*: Same as above, allows for multiple readiness checks.
All checks must pass for the resource to be considered ready.\n\n\nReadiness
expressions can return either bool or a Kubernetes condition struct.\nIf
a condition is returned it will be used as the resource's readiness time,
otherwise the controller will use wallclock time at the first moment it
noticed the truthy value.\nWhen possible, match on a timestamp to preserve
accuracy.\n\n\nExample matching on a condition:\n```cel\n\n\n\tself.status.conditions.filter(item,
All checks must pass for the resource to be considered ready.\n- eno.azure.io/readiness-group:
(int, default: 0) Eno will not create or update this resource until all
resources in lower-valued groups have become ready.\n\n\nReadiness expressions
can return either bool or a Kubernetes condition struct.\nIf a condition
is returned it will be used as the resource's readiness time, otherwise
the controller will use wallclock time at the first moment it noticed the
truthy value.\nWhen possible, match on a timestamp to preserve accuracy.\n\n\nExample
matching on a condition:\n```cel\n\n\n\tself.status.conditions.filter(item,
item.type == 'Test' && item.status == 'False')\n\n\n```\n\n\nExample matching
on a boolean:\n```cel\n\n\n\tself.status.foo == 'bar'\n\n\n```\n\n\nA special
resource can be returned from synthesizers: `eno.azure.io/v1.Patch`.\nExample:\n\n\n```yaml\n\n\n\t
1 change: 1 addition & 0 deletions api/v1/synthesizer.go
@@ -21,6 +21,7 @@ type SynthesizerList struct {
// - eno.azure.io/disable-updates: Ensure that the resource exists but never update it. Useful for populating resources you expect another user/process to mutate.
// - eno.azure.io/readiness: CEL expression used to assert that the resource is ready. More details below.
// - eno.azure.io/readiness-*: Same as above, allows for multiple readiness checks. All checks must pass for the resource to be considered ready.
// - eno.azure.io/readiness-group: (int, default: 0) Eno will not create or update this resource until all resources in lower-valued groups have become ready.
//
// Readiness expressions can return either bool or a Kubernetes condition struct.
// If a condition is returned it will be used as the resource's readiness time, otherwise the controller will use wallclock time at the first moment it noticed the truthy value.
1 change: 1 addition & 0 deletions docs/api.md
@@ -266,6 +266,7 @@ Eno honors a handful of annotations on resources emitted from synthesizers. They
- eno.azure.io/disable-updates: Ensure that the resource exists but never update it. Useful for populating resources you expect another user/process to mutate.
- eno.azure.io/readiness: CEL expression used to assert that the resource is ready. More details below.
- eno.azure.io/readiness-*: Same as above, allows for multiple readiness checks. All checks must pass for the resource to be considered ready.
- eno.azure.io/readiness-group: (int, default: 0) Eno will not create or update this resource until all resources in lower-valued groups have become ready.


Readiness expressions can return either bool or a Kubernetes condition struct.
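For illustration only (this snippet is not part of the commit's diff), a minimal sketch of how a synthesizer might use the new annotation: the second ConfigMap below is assigned to readiness group 1, so Eno will not create or update it until the first one (implicitly group 0) is ready. Names and data are placeholders.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// buildObjects returns two ConfigMaps. "first" carries no readiness-group
// annotation, so it defaults to group 0. "second" is placed in group 1 and
// therefore won't be created or updated until "first" reports ready.
func buildObjects() []*corev1.ConfigMap {
	first := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{Name: "first", Namespace: "default"},
		Data:       map[string]string{"key": "value"},
	}
	second := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:        "second",
			Namespace:   "default",
			Annotations: map[string]string{"eno.azure.io/readiness-group": "1"},
		},
		Data: map[string]string{"key": "value"},
	}
	return []*corev1.ConfigMap{first, second}
}

func main() {
	for _, obj := range buildObjects() {
		fmt.Println(obj.Name, obj.Annotations["eno.azure.io/readiness-group"])
	}
}
```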
54 changes: 39 additions & 15 deletions internal/controllers/reconciliation/controller.go
@@ -105,9 +105,12 @@ func (c *Controller) Reconcile(ctx context.Context, req *reconstitution.Request)

var prev *reconstitution.Resource
if comp.Status.PreviousSynthesis != nil {
synRef.UUID = comp.Status.PreviousSynthesis.UUID
prev, _ = c.resourceClient.Get(ctx, synRef, &req.Resource)
prevSynRef := reconstitution.NewSynthesisRef(comp)
prevSynRef.UUID = comp.Status.PreviousSynthesis.UUID
prev, _ = c.resourceClient.Get(ctx, prevSynRef, &req.Resource)
}
logger = logger.WithValues("resourceKind", resource.Ref.Kind, "resourceName", resource.Ref.Name, "resourceNamespace", resource.Ref.Namespace)
ctx = logr.NewContext(ctx, logger)

// Keep track of the last reconciliation time and report on it relative to the resource's reconcile interval
// This is useful for identifying cases where the loop can't keep up
@@ -125,17 +128,6 @@ func (c *Controller) Reconcile(ctx context.Context, req *reconstitution.Request)
return ctrl.Result{}, fmt.Errorf("getting current state: %w", err)
}

// Nil current struct means the resource version hasn't changed since it was last observed
// Skip without logging since this is a very hot path
var modified bool
if hasChanged {
resource.ObserveVersion("") // in case reconciliation fails, invalidate the cache first to avoid skipping the next attempt
modified, err = c.reconcileResource(ctx, comp, prev, resource, current)
if err != nil {
return ctrl.Result{}, err
}
}

// Evaluate resource readiness
// - Readiness checks are skipped when this version of the resource's desired state has already become ready
// - Readiness checks are skipped when the resource hasn't changed since the last check
@@ -146,17 +138,49 @@ func (c *Controller) Reconcile(ctx context.Context, req *reconstitution.Request)
return ctrl.Result{}, fmt.Errorf("getting resource slice: %w", err)
}
var ready *metav1.Time
if status := resource.FindStatus(slice); status == nil || status.Ready == nil {
status := resource.FindStatus(slice)
if status == nil || status.Ready == nil {
readiness, ok := resource.ReadinessChecks.EvalOptionally(ctx, current)
if ok {
ready = &readiness.ReadyTime
}
} else {
ready = status.Ready
}

// Evaluate the readiness of resources in the previous readiness group
if (status == nil || !status.Reconciled) && !resource.Deleted() {
dependencies := c.resourceClient.RangeByReadinessGroup(ctx, synRef, resource.ReadinessGroup, reconstitution.RangeDesc)
for _, dep := range dependencies {
slice := &apiv1.ResourceSlice{}
err = c.client.Get(ctx, dep.ManifestRef.Slice, slice)
if err != nil {
return ctrl.Result{}, fmt.Errorf("getting resource slice: %w", err)
}
status := dep.FindStatus(slice)
if status == nil || status.Ready == nil {
logger.V(1).Info("skipping because at least one resource in an earlier readiness group isn't ready yet")
return ctrl.Result{}, nil
}
}
}

// Nil current struct means the resource version hasn't changed since it was last observed
// Skip without logging since this is a very hot path
var modified bool
if hasChanged {
resource.ObserveVersion("") // in case reconciliation fails, invalidate the cache first to avoid skipping the next attempt
modified, err = c.reconcileResource(ctx, comp, prev, resource, current)
if err != nil {
return ctrl.Result{}, err
}
}

// We requeue to make sure the resource is in sync before updating our cache's resource version
// Otherwise the next sync would just hit the cache without actually diffing the resource.
if modified {
return ctrl.Result{Requeue: true}, nil
}

if current != nil {
if rv := current.GetResourceVersion(); rv != "" {
resource.ObserveVersion(rv)
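Distilled from the hunk above, the gating rule can be sketched as a standalone predicate (illustrative names, not the controller's actual API): a resource in group n is applied only once every lower-valued group has reported ready.

```go
package main

import "fmt"

// readyToApply is an illustrative reduction of the gating rule added above:
// a resource in readiness group "group" may only be created or updated once
// every lower-valued group has become ready.
func readyToApply(group uint, groupReady map[uint]bool) bool {
	for g, ready := range groupReady {
		if g < group && !ready {
			return false // an earlier group is still pending
		}
	}
	return true
}

func main() {
	ready := map[uint]bool{0: true, 2: false}
	fmt.Println(readyToApply(4, ready)) // false: group 2 is not ready yet
	fmt.Println(readyToApply(2, ready)) // true: only group 0 precedes it
}
```

Note that the controller itself only consults the adjacent lower-valued group; that is sufficient because a group's resources cannot have been created at all until the groups before it were ready.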
144 changes: 144 additions & 0 deletions internal/controllers/reconciliation/ordering_test.go
@@ -0,0 +1,144 @@
package reconciliation

import (
"fmt"
"slices"
"strconv"
"testing"
"time"

apiv1 "github.com/Azure/eno/api/v1"
"github.com/Azure/eno/internal/controllers/aggregation"
testv1 "github.com/Azure/eno/internal/controllers/reconciliation/fixtures/v1"
"github.com/Azure/eno/internal/controllers/rollout"
"github.com/Azure/eno/internal/controllers/synthesis"
"github.com/Azure/eno/internal/testutil"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func TestReadinessGroups(t *testing.T) {
scheme := runtime.NewScheme()
corev1.SchemeBuilder.AddToScheme(scheme)
testv1.SchemeBuilder.AddToScheme(scheme)

ctx := testutil.NewContext(t)
mgr := testutil.NewManager(t)
upstream := mgr.GetClient()

// Register supporting controllers
require.NoError(t, rollout.NewController(mgr.Manager, time.Millisecond))
require.NoError(t, synthesis.NewStatusController(mgr.Manager))
require.NoError(t, aggregation.NewSliceController(mgr.Manager))
require.NoError(t, synthesis.NewPodLifecycleController(mgr.Manager, defaultConf))
require.NoError(t, synthesis.NewSliceCleanupController(mgr.Manager))
require.NoError(t, synthesis.NewExecController(mgr.Manager, defaultConf, &testutil.ExecConn{Hook: func(s *apiv1.Synthesizer) []client.Object {
obj := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "test-obj-0",
Namespace: "default",
},
Data: map[string]string{"image": s.Spec.Image},
}

gvks, _, err := scheme.ObjectKinds(obj)
require.NoError(t, err)
obj.GetObjectKind().SetGroupVersionKind(gvks[0])

obj1 := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "test-obj-1",
Namespace: "default",
Annotations: map[string]string{"eno.azure.io/readiness-group": "2"},
},
Data: map[string]string{"image": s.Spec.Image},
}

gvks, _, err = scheme.ObjectKinds(obj1)
require.NoError(t, err)
obj1.GetObjectKind().SetGroupVersionKind(gvks[0])

obj2 := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "test-obj-2",
Namespace: "default",
Annotations: map[string]string{"eno.azure.io/readiness-group": "4"},
},
Data: map[string]string{"image": s.Spec.Image},
}

gvks, _, err = scheme.ObjectKinds(obj2)
require.NoError(t, err)
obj2.GetObjectKind().SetGroupVersionKind(gvks[0])

return []client.Object{obj, obj2, obj1}
}}))

// Test subject
setupTestSubject(t, mgr)
mgr.Start(t)

syn := &apiv1.Synthesizer{}
syn.Name = "test-syn"
syn.Spec.Image = "create"
require.NoError(t, upstream.Create(ctx, syn))

comp := &apiv1.Composition{}
comp.Name = "test-comp"
comp.Namespace = "default"
comp.Spec.Synthesizer.Name = syn.Name
require.NoError(t, upstream.Create(ctx, comp))

// Wait for reconciliation
testutil.Eventually(t, func() bool {
err := upstream.Get(ctx, client.ObjectKeyFromObject(comp), comp)
return err == nil && comp.Status.CurrentSynthesis != nil && comp.Status.CurrentSynthesis.Reconciled != nil && comp.Status.CurrentSynthesis.ObservedSynthesizerGeneration == syn.Generation
})

// Prove resources were created in the expected order
// Technically resource version is an opaque string, realistically it won't be changing
// any time soon so it's safe to use here and less flaky than the creation timestamp
assertOrder := func() {
resourceVersions := []int{}
for i := 0; i < 2; i++ {
cm := &corev1.ConfigMap{}
cm.Name = fmt.Sprintf("test-obj-%d", i)
cm.Namespace = "default"
err := mgr.DownstreamClient.Get(ctx, client.ObjectKeyFromObject(cm), cm)
require.NoError(t, err)

rv, _ := strconv.Atoi(cm.ResourceVersion)
resourceVersions = append(resourceVersions, rv)
}
if !slices.IsSorted(resourceVersions) { // ascending
t.Errorf("expected resource versions to be sorted: %+d", resourceVersions)
}
}
assertOrder()

// Updates should also be ordered
err := retry.RetryOnConflict(testutil.Backoff, func() error {
upstream.Get(ctx, client.ObjectKeyFromObject(syn), syn)
syn.Spec.Image = "updated"
return upstream.Update(ctx, syn)
})
require.NoError(t, err)

testutil.Eventually(t, func() bool {
err := upstream.Get(ctx, client.ObjectKeyFromObject(comp), comp)
return err == nil && comp.Status.CurrentSynthesis != nil && comp.Status.CurrentSynthesis.Reconciled != nil && comp.Status.CurrentSynthesis.ObservedSynthesizerGeneration == syn.Generation
})
assertOrder()

// Deletes should not be ordered
require.NoError(t, upstream.Delete(ctx, comp))
testutil.Eventually(t, func() bool {
err := upstream.Get(ctx, client.ObjectKeyFromObject(comp), comp)
return errors.IsNotFound(err)
})
}
29 changes: 25 additions & 4 deletions internal/reconstitution/cache.go
@@ -26,6 +26,7 @@ type Cache struct {
mut sync.Mutex
resources map[SynthesisRef]*resources
synthesisUUIDsByComposition map[types.NamespacedName][]string
byIndex map[sliceIndex]*Resource
}

// resources contains a set of indexed resources scoped to a single Composition
@@ -34,6 +35,12 @@ type resources struct {
ByReadinessGroup *redblacktree.Tree[uint, []*Resource]
}

type sliceIndex struct {
Index int
SliceName string
Namespace string
}

func NewCache(client client.Client) *Cache {
renv, err := readiness.NewEnv()
if err != nil {
@@ -44,6 +51,7 @@ func NewCache(client client.Client) *Cache {
renv: renv,
resources: make(map[SynthesisRef]*resources),
synthesisUUIDsByComposition: make(map[types.NamespacedName][]string),
byIndex: make(map[sliceIndex]*resource.Resource),
}
}

@@ -64,11 +72,11 @@ func (c *Cache) Get(ctx context.Context, comp *SynthesisRef, ref *resource.Ref)
return res, ok
}

func (c *Cache) RangeByReadinessGroup(ctx context.Context, comp *SynthesisRef, group uint, dir int) []*Resource {
func (c *Cache) RangeByReadinessGroup(ctx context.Context, comp *SynthesisRef, group uint, dir RangeDirection) []*Resource {
c.mut.Lock()
defer c.mut.Unlock()

if group == 0 && dir == -1 {
if group == 0 && !dir {
return nil
}

@@ -83,7 +91,7 @@ func (c *Cache) RangeByReadinessGroup(ctx context.Context, comp *SynthesisRef, g
}

// If we're adjacent...
if dir > 0 {
if dir {
if node.Right != nil {
return node.Right.Value
}
Expand All @@ -94,7 +102,7 @@ func (c *Cache) RangeByReadinessGroup(ctx context.Context, comp *SynthesisRef, g
}

// ...otherwise we need to find it
if dir > 0 {
if dir {
node, ok = resources.ByReadinessGroup.Ceiling(group + 1)
} else {
node, ok = resources.ByReadinessGroup.Floor(group - 1)
@@ -106,6 +114,18 @@ func (c *Cache) RangeByReadinessGroup(ctx context.Context, comp *SynthesisRef, g
return node.Value
}

func (c *Cache) getByIndex(idx *sliceIndex) (*Resource, bool) {
c.mut.Lock()
defer c.mut.Unlock()

res, ok := c.byIndex[*idx]
if !ok {
return nil, false
}

return res, ok
}

// hasSynthesis returns true when the cache contains the resulting resources of the given synthesis.
// This should be called before Fill to determine if filling is necessary.
func (c *Cache) hasSynthesis(comp *apiv1.Composition, synthesis *apiv1.Synthesis) bool {
@@ -163,6 +183,7 @@ func (c *Cache) buildResources(ctx context.Context, comp *apiv1.Composition, ite
return nil, nil, fmt.Errorf("building resource at index %d of slice %s: %w", i, slice.Name, err)
}
resources.ByRef[res.Ref] = res
c.byIndex[sliceIndex{Index: i, SliceName: slice.Name, Namespace: slice.Namespace}] = res

current, _ := resources.ByReadinessGroup.Get(res.ReadinessGroup)
resources.ByReadinessGroup.Put(res.ReadinessGroup, append(current, res))
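The cache keys each synthesis's resources by readiness group in a red-black tree, so locating the adjacent lower (or higher) group is a floor/ceiling query. A rough sketch of the same lookup over a plain map, for illustration only (not the cache's implementation):

```go
package main

import "fmt"

// previousGroup mirrors what RangeByReadinessGroup's Floor query does: it
// returns the resources in the closest group strictly below the given one,
// or nil when there is no lower group (group 0 has no predecessor).
func previousGroup(byGroup map[uint][]string, group uint) []string {
	var best uint
	found := false
	for g := range byGroup {
		if g < group && (!found || g > best) {
			best, found = g, true
		}
	}
	if !found {
		return nil
	}
	return byGroup[best]
}

func main() {
	byGroup := map[uint][]string{0: {"test-obj-0"}, 2: {"test-obj-1"}, 4: {"test-obj-2"}}
	fmt.Println(previousGroup(byGroup, 4)) // [test-obj-1]
	fmt.Println(previousGroup(byGroup, 0)) // []
}
```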
