diff --git a/pkg/deploy/helm/resources_waiter.go b/pkg/deploy/helm/resources_waiter.go index 0595f862a5..e24bce36bb 100644 --- a/pkg/deploy/helm/resources_waiter.go +++ b/pkg/deploy/helm/resources_waiter.go @@ -74,140 +74,16 @@ func (waiter *ResourcesWaiter) Wait(ctx context.Context, resources helm_kube.Res } } - specs := multitrack.MultitrackSpecs{} - - for _, v := range resources { - switch value := asVersioned(v).(type) { - case *appsv1.Deployment: - spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "deploy") - if err != nil { - return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) - } - if spec != nil { - specs.Deployments = append(specs.Deployments, *spec) - } - case *appsv1beta1.Deployment: - spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "deploy") - if err != nil { - return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) - } - if spec != nil { - specs.Deployments = append(specs.Deployments, *spec) - } - case *appsv1beta2.Deployment: - spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "deploy") - if err != nil { - return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) - } - if spec != nil { - specs.Deployments = append(specs.Deployments, *spec) - } - case *extensions.Deployment: - spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "deploy") - if err != nil { - return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) - } - if spec != nil { - specs.Deployments = append(specs.Deployments, *spec) - } - case *extensions.DaemonSet: - // TODO: multiplier equals 3 because typically there are only 3 nodes in the cluster. - // TODO: It is better to fetch number of nodes dynamically, but in the most cases multiplier=3 will work ok. - spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 3, defaultPerReplica: 1}, "ds") - if err != nil { - return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) - } - if spec != nil { - specs.DaemonSets = append(specs.DaemonSets, *spec) - } - case *appsv1.DaemonSet: - // TODO: multiplier equals 3 because typically there are only 3 nodes in the cluster. - // TODO: It is better to fetch number of nodes dynamically, but in the most cases multiplier=3 will work ok. - spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 3, defaultPerReplica: 1}, "ds") - if err != nil { - return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) - } - if spec != nil { - specs.DaemonSets = append(specs.DaemonSets, *spec) - } - case *appsv1beta2.DaemonSet: - // TODO: multiplier equals 3 because typically there are only 3 nodes in the cluster. - // TODO: It is better to fetch number of nodes dynamically, but in the most cases multiplier=3 will work ok. - spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 3, defaultPerReplica: 1}, "ds") - if err != nil { - return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) - } - if spec != nil { - specs.DaemonSets = append(specs.DaemonSets, *spec) - } - case *appsv1.StatefulSet: - spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "sts") - if err != nil { - return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) - } - if spec != nil { - specs.StatefulSets = append(specs.StatefulSets, *spec) - } - case *appsv1beta1.StatefulSet: - spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "sts") - if err != nil { - return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) - } - if spec != nil { - specs.StatefulSets = append(specs.StatefulSets, *spec) - } - case *appsv1beta2.StatefulSet: - spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "sts") - if err != nil { - return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) - } - if spec != nil { - specs.StatefulSets = append(specs.StatefulSets, *spec) - } - case *batchv1.Job: - spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 1, defaultPerReplica: 0}, "job") - if err != nil { - return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) - } - if spec != nil { - specs.Jobs = append(specs.Jobs, *spec) - } - case *flaggerv1beta1.Canary: - spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 1, defaultPerReplica: 0}, "canary") - if err != nil { - return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) - } - if spec != nil { - specs.Canaries = append(specs.Canaries, *spec) - } - default: - obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(v.Object) - if err != nil { - return fmt.Errorf("error converting object to unstructured: %w", err) - } - - object := unstructured.Unstructured{ - Object: obj, - } - - resourceID := resid.NewResourceID(object.GetName(), object.GroupVersionKind(), resid.NewResourceIDOptions{ - Namespace: object.GetNamespace(), - }) - - if spec, err := makeGenericSpec(ctx, resourceID, waiter.StatusProgressPeriod, timeout, object.GetAnnotations()); err != nil { - logboek.Context(ctx).Warn().LogLn() - logboek.Context(ctx).Warn().LogF("WARNING %s\n", err) - } else if spec != nil { - specs.Generics = append(specs.Generics, spec) - } - } + specs, err := makeMultitrackSpecsFromResList(ctx, resources, timeout, waiter.StatusProgressPeriod) + if err != nil { + return fmt.Errorf("error making multitrack specs: %w", err) } // NOTE: use context from resources-waiter object here, will be changed in helm 3 logboek.Context(ctx).LogOptionalLn() return logboek.Context(ctx).LogProcess("Waiting for resources to become ready"). DoError(func() error { - return multitrack.Multitrack(kube.Client, specs, multitrack.MultitrackOptions{ + return multitrack.Multitrack(kube.Client, *specs, multitrack.MultitrackOptions{ StatusProgressPeriod: waiter.StatusProgressPeriod, Options: tracker.Options{ Timeout: timeout, @@ -428,49 +304,36 @@ mainLoop: } func (waiter *ResourcesWaiter) WatchUntilReady(ctx context.Context, resources helm_kube.ResourceList, timeout time.Duration) error { + if os.Getenv("WERF_DISABLE_RESOURCES_WAITER") == "1" { + return nil + } + if waiter.KubeInitializer != nil { if err := waiter.KubeInitializer.Init(ctx); err != nil { return fmt.Errorf("kube initializer failed: %w", err) } } - for _, info := range resources { - name := info.Name - kind := info.Mapping.GroupVersionKind.Kind - - // TODO: should this work with all resources, not just Jobs? - switch value := asVersioned(info).(type) { - case *batchv1.Job: - specs := multitrack.MultitrackSpecs{} - - spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 1, defaultPerReplica: 0}, "job") - if err != nil { - return fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) - } - if spec != nil { - specs.Jobs = append(specs.Jobs, *spec) - } - - return logboek.Context(ctx).LogProcess("Waiting for helm hook job/%s termination", name). - DoError(func() error { - return multitrack.Multitrack(kube.Client, specs, multitrack.MultitrackOptions{ - StatusProgressPeriod: waiter.HooksStatusProgressPeriod, - Options: tracker.Options{ - Timeout: timeout, - LogsFromTime: waiter.LogsFromTime, - }, - DynamicClient: kube.DynamicClient, - DiscoveryClient: kube.CachedDiscoveryClient, - Mapper: kube.Mapper, - }) - }) - - default: - logboek.Context(ctx).Default().LogFDetails("Will not track helm hook %s/%s: %s kind not supported for tracking\n", strings.ToLower(kind), name, kind) - } + specs, err := makeMultitrackSpecsFromResList(ctx, resources, timeout, waiter.HooksStatusProgressPeriod) + if err != nil { + return fmt.Errorf("error making multitrack specs: %w", err) } - return nil + // NOTE: use context from resources-waiter object here, will be changed in helm 3 + logboek.Context(ctx).LogOptionalLn() + return logboek.Context(ctx).LogProcess("Waiting for helm hooks termination"). + DoError(func() error { + return multitrack.Multitrack(kube.Client, *specs, multitrack.MultitrackOptions{ + StatusProgressPeriod: waiter.HooksStatusProgressPeriod, + Options: tracker.Options{ + Timeout: timeout, + LogsFromTime: waiter.LogsFromTime, + }, + DynamicClient: kube.DynamicClient, + DiscoveryClient: kube.CachedDiscoveryClient, + Mapper: kube.Mapper, + }) + }) } func asVersioned(info *resource.Info) runtime.Object { @@ -510,3 +373,136 @@ func (waiter *ResourcesWaiter) WaitUntilDeleted(ctx context.Context, specs []*he return elimination.TrackUntilEliminated(ctx, kube.DynamicClient, eliminationSpecs, elimination.EliminationTrackerOptions{Timeout: timeout, StatusProgressPeriod: waiter.StatusProgressPeriod}) }) } + +func makeMultitrackSpecsFromResList(ctx context.Context, resources helm_kube.ResourceList, timeout, statusProgressPeriod time.Duration) (*multitrack.MultitrackSpecs, error) { + specs := &multitrack.MultitrackSpecs{} + + for _, v := range resources { + switch value := asVersioned(v).(type) { + case *appsv1.Deployment: + spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "deploy") + if err != nil { + return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) + } + if spec != nil { + specs.Deployments = append(specs.Deployments, *spec) + } + case *appsv1beta1.Deployment: + spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "deploy") + if err != nil { + return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) + } + if spec != nil { + specs.Deployments = append(specs.Deployments, *spec) + } + case *appsv1beta2.Deployment: + spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "deploy") + if err != nil { + return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) + } + if spec != nil { + specs.Deployments = append(specs.Deployments, *spec) + } + case *extensions.Deployment: + spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "deploy") + if err != nil { + return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) + } + if spec != nil { + specs.Deployments = append(specs.Deployments, *spec) + } + case *extensions.DaemonSet: + // TODO: multiplier equals 3 because typically there are only 3 nodes in the cluster. + // TODO: It is better to fetch number of nodes dynamically, but in the most cases multiplier=3 will work ok. + spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 3, defaultPerReplica: 1}, "ds") + if err != nil { + return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) + } + if spec != nil { + specs.DaemonSets = append(specs.DaemonSets, *spec) + } + case *appsv1.DaemonSet: + // TODO: multiplier equals 3 because typically there are only 3 nodes in the cluster. + // TODO: It is better to fetch number of nodes dynamically, but in the most cases multiplier=3 will work ok. + spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 3, defaultPerReplica: 1}, "ds") + if err != nil { + return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) + } + if spec != nil { + specs.DaemonSets = append(specs.DaemonSets, *spec) + } + case *appsv1beta2.DaemonSet: + // TODO: multiplier equals 3 because typically there are only 3 nodes in the cluster. + // TODO: It is better to fetch number of nodes dynamically, but in the most cases multiplier=3 will work ok. + spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 3, defaultPerReplica: 1}, "ds") + if err != nil { + return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) + } + if spec != nil { + specs.DaemonSets = append(specs.DaemonSets, *spec) + } + case *appsv1.StatefulSet: + spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "sts") + if err != nil { + return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) + } + if spec != nil { + specs.StatefulSets = append(specs.StatefulSets, *spec) + } + case *appsv1beta1.StatefulSet: + spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "sts") + if err != nil { + return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) + } + if spec != nil { + specs.StatefulSets = append(specs.StatefulSets, *spec) + } + case *appsv1beta2.StatefulSet: + spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: extractSpecReplicas(value.Spec.Replicas), defaultPerReplica: 1}, "sts") + if err != nil { + return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) + } + if spec != nil { + specs.StatefulSets = append(specs.StatefulSets, *spec) + } + case *batchv1.Job: + spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 1, defaultPerReplica: 0}, "job") + if err != nil { + return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) + } + if spec != nil { + specs.Jobs = append(specs.Jobs, *spec) + } + case *flaggerv1beta1.Canary: + spec, err := makeMultitrackSpec(ctx, &value.ObjectMeta, allowedFailuresCountOptions{multiplier: 1, defaultPerReplica: 0}, "canary") + if err != nil { + return nil, fmt.Errorf("cannot track %s %s: %w", value.Kind, value.Name, err) + } + if spec != nil { + specs.Canaries = append(specs.Canaries, *spec) + } + default: + obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(v.Object) + if err != nil { + return nil, fmt.Errorf("error converting object to unstructured: %w", err) + } + + object := unstructured.Unstructured{ + Object: obj, + } + + resourceID := resid.NewResourceID(object.GetName(), object.GroupVersionKind(), resid.NewResourceIDOptions{ + Namespace: object.GetNamespace(), + }) + + if spec, err := makeGenericSpec(ctx, resourceID, statusProgressPeriod, timeout, object.GetAnnotations()); err != nil { + logboek.Context(ctx).Warn().LogLn() + logboek.Context(ctx).Warn().LogF("WARNING %s\n", err) + } else if spec != nil { + specs.Generics = append(specs.Generics, spec) + } + } + } + + return specs, nil +}