Skip to content

Commit

Permalink
feat(helm): track Helm hooks of any kind
Browse files Browse the repository at this point in the history
Previously only Jobs were tracked, others ignored.

Signed-off-by: Ilya Lesikov <ilya@lesikov.com>
  • Loading branch information
ilya-lesikov committed Jul 6, 2022
1 parent fc07bd3 commit 86ba23f
Showing 1 changed file with 159 additions and 163 deletions.
322 changes: 159 additions & 163 deletions pkg/deploy/helm/resources_waiter.go
Expand Up @@ -74,140 +74,16 @@ func (waiter *ResourcesWaiter) Wait(ctx context.Context, resources helm_kube.Res
}
}

specs := multitrack.MultitrackSpecs{}

for _, v := range resources {
switch value := asVersioned(v).(type) {
case *appsv1.Deployment:
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "deploy")
if err != nil {
return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.Deployments = append(specs.Deployments, *spec)
}
case *appsv1beta1.Deployment:
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "deploy")
if err != nil {
return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.Deployments = append(specs.Deployments, *spec)
}
case *appsv1beta2.Deployment:
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "deploy")
if err != nil {
return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.Deployments = append(specs.Deployments, *spec)
}
case *extensions.Deployment:
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "deploy")
if err != nil {
return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.Deployments = append(specs.Deployments, *spec)
}
case *extensions.DaemonSet:
// TODO: multiplier equals 3 because typically there are only 3 nodes in the cluster.
// TODO: It is better to fetch number of nodes dynamically, but in the most cases multiplier=3 will work ok.
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 3, defaultPerReplica: 1}, "ds")
if err != nil {
return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.DaemonSets = append(specs.DaemonSets, *spec)
}
case *appsv1.DaemonSet:
// TODO: multiplier equals 3 because typically there are only 3 nodes in the cluster.
// TODO: It is better to fetch number of nodes dynamically, but in the most cases multiplier=3 will work ok.
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 3, defaultPerReplica: 1}, "ds")
if err != nil {
return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.DaemonSets = append(specs.DaemonSets, *spec)
}
case *appsv1beta2.DaemonSet:
// TODO: multiplier equals 3 because typically there are only 3 nodes in the cluster.
// TODO: It is better to fetch number of nodes dynamically, but in the most cases multiplier=3 will work ok.
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 3, defaultPerReplica: 1}, "ds")
if err != nil {
return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.DaemonSets = append(specs.DaemonSets, *spec)
}
case *appsv1.StatefulSet:
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "sts")
if err != nil {
return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.StatefulSets = append(specs.StatefulSets, *spec)
}
case *appsv1beta1.StatefulSet:
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "sts")
if err != nil {
return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.StatefulSets = append(specs.StatefulSets, *spec)
}
case *appsv1beta2.StatefulSet:
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "sts")
if err != nil {
return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.StatefulSets = append(specs.StatefulSets, *spec)
}
case *batchv1.Job:
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 1, defaultPerReplica: 0}, "job")
if err != nil {
return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.Jobs = append(specs.Jobs, *spec)
}
case *flaggerv1beta1.Canary:
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 1, defaultPerReplica: 0}, "canary")
if err != nil {
return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.Canaries = append(specs.Canaries, *spec)
}
default:
obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(v.Object)
if err != nil {
return fmt.Errorf("error converting object to unstructured: %w", err)
}

object := unstructured.Unstructured{
Object: obj,
}

resourceID := resid.NewResourceID(object.GetName(), object.GroupVersionKind(), resid.NewResourceIDOptions{
Namespace: object.GetNamespace(),
})

if spec, err := makeGenericSpec(ctx, resourceID, waiter.StatusProgressPeriod, timeout, object.GetAnnotations()); err != nil {
logboek.Context(ctx).Warn().LogLn()
logboek.Context(ctx).Warn().LogF("WARNING %s\n", err)
} else if spec != nil {
specs.Generics = append(specs.Generics, spec)
}
}
specs, err := makeMultitrackSpecsFromResList(ctx, resources, timeout, waiter.StatusProgressPeriod)
if err != nil {
return fmt.Errorf("error making multitrack specs: %w", err)
}

// NOTE: use context from resources-waiter object here, will be changed in helm 3
logboek.Context(ctx).LogOptionalLn()
return logboek.Context(ctx).LogProcess("Waiting for resources to become ready").
DoError(func() error {
return multitrack.Multitrack(kube.Client, specs, multitrack.MultitrackOptions{
return multitrack.Multitrack(kube.Client, *specs, multitrack.MultitrackOptions{
StatusProgressPeriod: waiter.StatusProgressPeriod,
Options: tracker.Options{
Timeout: timeout,
Expand Down Expand Up @@ -428,49 +304,36 @@ mainLoop:
}

func (waiter *ResourcesWaiter) WatchUntilReady(ctx context.Context, resources helm_kube.ResourceList, timeout time.Duration) error {
if os.Getenv("WERF_DISABLE_RESOURCES_WAITER") == "1" {
return nil
}

if waiter.KubeInitializer != nil {
if err := waiter.KubeInitializer.Init(ctx); err != nil {
return fmt.Errorf("kube initializer failed: %w", err)
}
}

for _, info := range resources {
name := info.Name
kind := info.Mapping.GroupVersionKind.Kind

// TODO: should this work with all resources, not just Jobs?
switch value := asVersioned(info).(type) {
case *batchv1.Job:
specs := multitrack.MultitrackSpecs{}

spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 1, defaultPerReplica: 0}, "job")
if err != nil {
return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.Jobs = append(specs.Jobs, *spec)
}

return logboek.Context(ctx).LogProcess("Waiting for helm hook job/%s termination", name).
DoError(func() error {
return multitrack.Multitrack(kube.Client, specs, multitrack.MultitrackOptions{
StatusProgressPeriod: waiter.HooksStatusProgressPeriod,
Options: tracker.Options{
Timeout: timeout,
LogsFromTime: waiter.LogsFromTime,
},
DynamicClient: kube.DynamicClient,
DiscoveryClient: kube.CachedDiscoveryClient,
Mapper: kube.Mapper,
})
})

default:
logboek.Context(ctx).Default().LogFDetails("Will not track helm hook %s/%s: %s kind not supported for tracking\n", strings.ToLower(kind), name, kind)
}
specs, err := makeMultitrackSpecsFromResList(ctx, resources, timeout, waiter.HooksStatusProgressPeriod)
if err != nil {
return fmt.Errorf("error making multitrack specs: %w", err)
}

return nil
// NOTE: use context from resources-waiter object here, will be changed in helm 3
logboek.Context(ctx).LogOptionalLn()
return logboek.Context(ctx).LogProcess("Waiting for helm hooks termination").
DoError(func() error {
return multitrack.Multitrack(kube.Client, *specs, multitrack.MultitrackOptions{
StatusProgressPeriod: waiter.HooksStatusProgressPeriod,
Options: tracker.Options{
Timeout: timeout,
LogsFromTime: waiter.LogsFromTime,
},
DynamicClient: kube.DynamicClient,
DiscoveryClient: kube.CachedDiscoveryClient,
Mapper: kube.Mapper,
})
})
}

func asVersioned(info *resource.Info) runtime.Object {
Expand Down Expand Up @@ -510,3 +373,136 @@ func (waiter *ResourcesWaiter) WaitUntilDeleted(ctx context.Context, specs []*he
return elimination.TrackUntilEliminated(ctx, kube.DynamicClient, eliminationSpecs, elimination.EliminationTrackerOptions{Timeout: timeout, StatusProgressPeriod: waiter.StatusProgressPeriod})
})
}

func makeMultitrackSpecsFromResList(ctx context.Context, resources helm_kube.ResourceList, timeout, statusProgressPeriod time.Duration) (*multitrack.MultitrackSpecs, error) {
specs := &multitrack.MultitrackSpecs{}

for _, v := range resources {
switch value := asVersioned(v).(type) {
case *appsv1.Deployment:
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "deploy")
if err != nil {
return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.Deployments = append(specs.Deployments, *spec)
}
case *appsv1beta1.Deployment:
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "deploy")
if err != nil {
return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.Deployments = append(specs.Deployments, *spec)
}
case *appsv1beta2.Deployment:
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "deploy")
if err != nil {
return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.Deployments = append(specs.Deployments, *spec)
}
case *extensions.Deployment:
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "deploy")
if err != nil {
return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.Deployments = append(specs.Deployments, *spec)
}
case *extensions.DaemonSet:
// TODO: multiplier equals 3 because typically there are only 3 nodes in the cluster.
// TODO: It is better to fetch number of nodes dynamically, but in the most cases multiplier=3 will work ok.
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 3, defaultPerReplica: 1}, "ds")
if err != nil {
return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.DaemonSets = append(specs.DaemonSets, *spec)
}
case *appsv1.DaemonSet:
// TODO: multiplier equals 3 because typically there are only 3 nodes in the cluster.
// TODO: It is better to fetch number of nodes dynamically, but in the most cases multiplier=3 will work ok.
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 3, defaultPerReplica: 1}, "ds")
if err != nil {
return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.DaemonSets = append(specs.DaemonSets, *spec)
}
case *appsv1beta2.DaemonSet:
// TODO: multiplier equals 3 because typically there are only 3 nodes in the cluster.
// TODO: It is better to fetch number of nodes dynamically, but in the most cases multiplier=3 will work ok.
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 3, defaultPerReplica: 1}, "ds")
if err != nil {
return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.DaemonSets = append(specs.DaemonSets, *spec)
}
case *appsv1.StatefulSet:
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "sts")
if err != nil {
return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.StatefulSets = append(specs.StatefulSets, *spec)
}
case *appsv1beta1.StatefulSet:
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "sts")
if err != nil {
return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.StatefulSets = append(specs.StatefulSets, *spec)
}
case *appsv1beta2.StatefulSet:
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "sts")
if err != nil {
return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.StatefulSets = append(specs.StatefulSets, *spec)
}
case *batchv1.Job:
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 1, defaultPerReplica: 0}, "job")
if err != nil {
return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.Jobs = append(specs.Jobs, *spec)
}
case *flaggerv1beta1.Canary:
spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 1, defaultPerReplica: 0}, "canary")
if err != nil {
return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err)
}
if spec != nil {
specs.Canaries = append(specs.Canaries, *spec)
}
default:
obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(v.Object)
if err != nil {
return nil, fmt.Errorf("error converting object to unstructured: %w", err)
}

object := unstructured.Unstructured{
Object: obj,
}

resourceID := resid.NewResourceID(object.GetName(), object.GroupVersionKind(), resid.NewResourceIDOptions{
Namespace: object.GetNamespace(),
})

if spec, err := makeGenericSpec(ctx, resourceID, statusProgressPeriod, timeout, object.GetAnnotations()); err != nil {
logboek.Context(ctx).Warn().LogLn()
logboek.Context(ctx).Warn().LogF("WARNING %s\n", err)
} else if spec != nil {
specs.Generics = append(specs.Generics, spec)
}
}
}

return specs, nil
}

0 comments on commit 86ba23f

Please sign in to comment.