Skip to content

Commit

Permalink
fix: dynamic tracker hotfixes
Browse files Browse the repository at this point in the history
Signed-off-by: Ilya Lesikov <ilya@lesikov.com>
  • Loading branch information
ilya-lesikov committed Jan 25, 2024
1 parent f0297a3 commit 3817e3d
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 38 deletions.
17 changes: 16 additions & 1 deletion pkg/tracker/deployment/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ func (d *Tracker) Track(ctx context.Context) (err error) {
if err != nil {
return err
}
if len(d.knownReplicaSets) == 0 {
rsNew = true
}

d.StatusGeneration++
newPodsNames, err := d.getNewPodsNames()
Expand Down Expand Up @@ -225,6 +228,9 @@ func (d *Tracker) Track(ctx context.Context) (err error) {
if err != nil {
return err
}
if len(d.knownReplicaSets) == 0 {
rsNew = true
}
status := NewDeploymentStatus(d.lastObject, d.StatusGeneration, d.State == tracker.ResourceFailed, d.failedReason, d.podStatuses, newPodsNames)

d.AddedPod <- PodAddedReport{
Expand Down Expand Up @@ -292,6 +298,9 @@ func (d *Tracker) Track(ctx context.Context) (err error) {
if err != nil {
return err
}
if len(d.knownReplicaSets) == 0 {
rsNew = true
}

rsChunk := &replicaset.ReplicaSetPodLogChunk{
PodLogChunk: &pod.PodLogChunk{
Expand Down Expand Up @@ -329,6 +338,9 @@ func (d *Tracker) Track(ctx context.Context) (err error) {
if err != nil {
return err
}
if len(d.knownReplicaSets) == 0 {
rsNew = true
}

d.PodError <- PodErrorReport{
ReplicaSetPodError: replicaset.ReplicaSetPodError{
Expand Down Expand Up @@ -375,6 +387,9 @@ func (d *Tracker) getNewPodsNames() ([]string, error) {
if err != nil {
return nil, err
}
if len(d.knownReplicaSets) == 0 {
rsNew = true
}
if rsNew {
res = append(res, podName)
}
Expand Down Expand Up @@ -542,8 +557,8 @@ func (d *Tracker) handleDeploymentState(ctx context.Context, object *appsv1.Depl

switch d.State {
case tracker.Initial:
d.runPodsInformer(ctx, object)
d.runReplicaSetsInformer(ctx, object)
d.runPodsInformer(ctx, object)

if os.Getenv("KUBEDOG_DISABLE_EVENTS") != "1" {
d.runEventsInformer(ctx, object)
Expand Down
11 changes: 7 additions & 4 deletions pkg/trackers/dyntracker/dynamic_absence_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,7 @@ func (t *DynamicAbsenceTracker) Track(ctx context.Context) error {
resourceClient = t.dynamicClient.Resource(gvr)
}

resourceHumanID, err := util.ResourceHumanID(name, namespace, groupVersionKind, t.mapper)
if err != nil {
return fmt.Errorf("get resource human ID: %w", err)
}
resourceHumanID := util.ResourceHumanID(name, namespace, groupVersionKind, t.mapper)

if err := wait.PollImmediate(t.pollPeriod, t.timeout, func() (bool, error) {
if _, err := resourceClient.Get(ctx, name, metav1.GetOptions{}); err != nil {
Expand All @@ -106,5 +103,11 @@ func (t *DynamicAbsenceTracker) Track(ctx context.Context) error {
return fmt.Errorf("poll resource %q: %w", resourceHumanID, err)
}

t.taskState.RWTransaction(func(ats *statestore.AbsenceTaskState) {
ats.ResourceState().RWTransaction(func(rs *statestore.ResourceState) {
rs.SetStatus(statestore.ResourceStatusDeleted)
})
})

return nil
}
11 changes: 7 additions & 4 deletions pkg/trackers/dyntracker/dynamic_presence_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,7 @@ func (t *DynamicPresenceTracker) Track(ctx context.Context) error {
resourceClient = t.dynamicClient.Resource(gvr)
}

resourceHumanID, err := util.ResourceHumanID(name, namespace, groupVersionKind, t.mapper)
if err != nil {
return fmt.Errorf("get resource human ID: %w", err)
}
resourceHumanID := util.ResourceHumanID(name, namespace, groupVersionKind, t.mapper)

if err := wait.PollImmediate(t.pollPeriod, t.timeout, func() (bool, error) {
if _, err := resourceClient.Get(ctx, name, metav1.GetOptions{}); err != nil {
Expand All @@ -108,5 +105,11 @@ func (t *DynamicPresenceTracker) Track(ctx context.Context) error {
return fmt.Errorf("poll resource %q: %w", resourceHumanID, err)
}

t.taskState.RWTransaction(func(pts *statestore.PresenceTaskState) {
pts.ResourceState().RWTransaction(func(rs *statestore.ResourceState) {
rs.SetStatus(statestore.ResourceStatusCreated)
})
})

return nil
}
62 changes: 39 additions & 23 deletions pkg/trackers/dyntracker/dynamic_readiness_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,14 @@ import (
)

type DynamicReadinessTracker struct {
taskState *util.Concurrent[*statestore.ReadinessTaskState]
logStore *util.Concurrent[*logstore.LogStore]
tracker any
resourceName string
resourceNamespace string
resourceGVK schema.GroupVersionKind
resourceHumanID string
taskState *util.Concurrent[*statestore.ReadinessTaskState]
logStore *util.Concurrent[*logstore.LogStore]
mapper meta.ResettableRESTMapper
tracker any

timeout time.Duration
noActivityTimeout time.Duration
Expand All @@ -57,7 +62,7 @@ func NewDynamicReadinessTracker(
discoveryClient discovery.CachedDiscoveryInterface,
mapper meta.ResettableRESTMapper,
opts DynamicReadinessTrackerOptions,
) *DynamicReadinessTracker {
) (*DynamicReadinessTracker, error) {
timeout := opts.Timeout
captureLogsFromTime := opts.CaptureLogsFromTime

Expand All @@ -84,6 +89,12 @@ func NewDynamicReadinessTracker(
resourceGVK = ts.GroupVersionKind()
})

if namespaced, err := util.IsNamespaced(resourceGVK, mapper); err != nil {
return nil, fmt.Errorf("check if namespaced: %w", err)
} else if !namespaced {
resourceNamespace = ""
}

var tracker any
switch resourceGVK.GroupKind() {
case schema.GroupKind{Group: "apps", Kind: "Deployment"}, schema.GroupKind{Group: "extensions", Kind: "Deployment"}:
Expand Down Expand Up @@ -130,8 +141,13 @@ func NewDynamicReadinessTracker(
}

return &DynamicReadinessTracker{
resourceName: resourceName,
resourceNamespace: resourceNamespace,
resourceGVK: resourceGVK,
resourceHumanID: util.ResourceHumanID(resourceName, resourceNamespace, resourceGVK, mapper),
taskState: taskState,
logStore: logStore,
mapper: mapper,
tracker: tracker,
timeout: timeout,
noActivityTimeout: noActivityTimeout,
Expand All @@ -141,7 +157,7 @@ func NewDynamicReadinessTracker(
ignoreLogs: opts.IgnoreLogs,
ignoreLogsForContainers: opts.IgnoreLogsForContainers,
saveEvents: opts.SaveEvents,
}
}, nil
}

type DynamicReadinessTrackerOptions struct {
Expand Down Expand Up @@ -203,7 +219,7 @@ func (t *DynamicReadinessTracker) trackDeployment(ctx context.Context, tracker *
select {
case err := <-trackErrCh:
if err != nil && !errors.Is(err, commontracker.ErrStopTrack) {
return fmt.Errorf("track: %w", err)
return fmt.Errorf("track resource %q: %w", t.resourceHumanID, err)
}

return nil
Expand Down Expand Up @@ -334,7 +350,7 @@ func (t *DynamicReadinessTracker) trackStatefulSet(ctx context.Context, tracker
select {
case err := <-trackErrCh:
if err != nil && !errors.Is(err, commontracker.ErrStopTrack) {
return fmt.Errorf("track: %w", err)
return fmt.Errorf("track resource %q: %w", t.resourceHumanID, err)
}

return nil
Expand Down Expand Up @@ -451,7 +467,7 @@ func (t *DynamicReadinessTracker) trackDaemonSet(ctx context.Context, tracker *d
select {
case err := <-trackErrCh:
if err != nil && !errors.Is(err, commontracker.ErrStopTrack) {
return fmt.Errorf("track: %w", err)
return fmt.Errorf("track resource %q: %w", t.resourceHumanID, err)
}

return nil
Expand Down Expand Up @@ -568,7 +584,7 @@ func (t *DynamicReadinessTracker) trackJob(ctx context.Context, tracker *job.Tra
select {
case err := <-trackErrCh:
if err != nil && !errors.Is(err, commontracker.ErrStopTrack) {
return fmt.Errorf("track: %w", err)
return fmt.Errorf("track resource %q: %w", t.resourceHumanID, err)
}

return nil
Expand Down Expand Up @@ -685,7 +701,7 @@ func (t *DynamicReadinessTracker) trackCanary(ctx context.Context, tracker *cana
select {
case err := <-trackErrCh:
if err != nil && !errors.Is(err, commontracker.ErrStopTrack) {
return fmt.Errorf("track: %w", err)
return fmt.Errorf("track resource %q: %w", t.resourceHumanID, err)
}

return nil
Expand Down Expand Up @@ -768,7 +784,7 @@ func (t *DynamicReadinessTracker) trackGeneric(ctx context.Context, tracker *gen
select {
case err := <-trackErrCh:
if err != nil && !errors.Is(err, commontracker.ErrStopTrack) {
return fmt.Errorf("track: %w", err)
return fmt.Errorf("track resource %q: %w", t.resourceHumanID, err)
}

return nil
Expand Down Expand Up @@ -842,7 +858,7 @@ func (t *DynamicReadinessTracker) handlePodsFromDeploymentStatus(status *deploym
taskState.AddDependency(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind(), pod.Name, taskState.Namespace(), podGvk)

taskState.ResourceState(pod.Name, taskState.Namespace(), podGvk).RWTransaction(func(rs *statestore.ResourceState) {
setPodStatusAttribute(rs, pod.PodStatus.Phase)
setPodStatusAttribute(rs, pod.StatusIndicator.Value)
})
}
}
Expand All @@ -857,7 +873,7 @@ func (t *DynamicReadinessTracker) handlePodsFromStatefulSetStatus(status *statef
taskState.AddDependency(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind(), pod.Name, taskState.Namespace(), podGvk)

taskState.ResourceState(pod.Name, taskState.Namespace(), podGvk).RWTransaction(func(rs *statestore.ResourceState) {
setPodStatusAttribute(rs, pod.PodStatus.Phase)
setPodStatusAttribute(rs, pod.StatusIndicator.Value)
})
}
}
Expand All @@ -872,7 +888,7 @@ func (t *DynamicReadinessTracker) handlePodsFromDaemonSetStatus(status *daemonse
taskState.AddDependency(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind(), pod.Name, taskState.Namespace(), podGvk)

taskState.ResourceState(pod.Name, taskState.Namespace(), podGvk).RWTransaction(func(rs *statestore.ResourceState) {
setPodStatusAttribute(rs, pod.PodStatus.Phase)
setPodStatusAttribute(rs, pod.StatusIndicator.Value)
})
}
}
Expand All @@ -883,7 +899,7 @@ func (t *DynamicReadinessTracker) handlePodsFromJobStatus(status *job.JobStatus,
taskState.AddDependency(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind(), pod.Name, taskState.Namespace(), podGvk)

taskState.ResourceState(pod.Name, taskState.Namespace(), podGvk).RWTransaction(func(rs *statestore.ResourceState) {
setPodStatusAttribute(rs, pod.PodStatus.Phase)
setPodStatusAttribute(rs, pod.StatusIndicator.Value)
})
}
}
Expand All @@ -898,7 +914,7 @@ func (t *DynamicReadinessTracker) handlePodsFromDeploymentPodAddedReport(report

for _, pod := range report.DeploymentStatus.Pods {
taskState.ResourceState(report.ReplicaSetPod.Name, taskState.Namespace(), podGvk).RWTransaction(func(rs *statestore.ResourceState) {
setPodStatusAttribute(rs, pod.PodStatus.Phase)
setPodStatusAttribute(rs, pod.StatusIndicator.Value)
})
}
}
Expand All @@ -913,7 +929,7 @@ func (t *DynamicReadinessTracker) handlePodsFromStatefulSetPodAddedReport(report

for _, pod := range report.StatefulSetStatus.Pods {
taskState.ResourceState(report.ReplicaSetPod.Name, taskState.Namespace(), podGvk).RWTransaction(func(rs *statestore.ResourceState) {
setPodStatusAttribute(rs, pod.PodStatus.Phase)
setPodStatusAttribute(rs, pod.StatusIndicator.Value)
})
}
}
Expand All @@ -928,7 +944,7 @@ func (t *DynamicReadinessTracker) handlePodsFromDaemonSetPodAddedReport(report *

for _, pod := range report.DaemonSetStatus.Pods {
taskState.ResourceState(report.Pod.Name, taskState.Namespace(), podGvk).RWTransaction(func(rs *statestore.ResourceState) {
setPodStatusAttribute(rs, pod.PodStatus.Phase)
setPodStatusAttribute(rs, pod.StatusIndicator.Value)
})
}
}
Expand All @@ -939,7 +955,7 @@ func (t *DynamicReadinessTracker) handlePodsFromJobPodAddedReport(report *job.Po

for _, pod := range report.JobStatus.Pods {
taskState.ResourceState(report.PodName, taskState.Namespace(), podGvk).RWTransaction(func(rs *statestore.ResourceState) {
setPodStatusAttribute(rs, pod.PodStatus.Phase)
setPodStatusAttribute(rs, pod.StatusIndicator.Value)
})
}
}
Expand Down Expand Up @@ -1212,7 +1228,7 @@ func (t *DynamicReadinessTracker) handleTaskStateStatus(taskState *statestore.Re
return
case statestore.ReadinessTaskStatusFailed:
abort = true
abortErr = fmt.Errorf("waiting for readiness failed")
abortErr = fmt.Errorf("waiting for resource %q readiness failed", t.resourceHumanID)
return
default:
panic("unexpected status")
Expand All @@ -1234,15 +1250,15 @@ func setReplicasAttribute(resourceState *statestore.ResourceState, replicas int)
}
}

func setPodStatusAttribute(resourceState *statestore.ResourceState, phase corev1.PodPhase) {
func setPodStatusAttribute(resourceState *statestore.ResourceState, status string) {
attributes := resourceState.Attributes()

if statusAttr, found := lo.Find(attributes, func(attr statestore.Attributer) bool {
return attr.Name() == statestore.AttributeNameStatus
}); found {
statusAttr.(*statestore.Attribute[string]).Value = string(phase)
statusAttr.(*statestore.Attribute[string]).Value = status
} else {
statusAttr = statestore.NewAttribute(statestore.AttributeNameStatus, string(phase))
statusAttr = statestore.NewAttribute(statestore.AttributeNameStatus, status)
resourceState.AddAttribute(statusAttr)
}
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/trackers/dyntracker/statestore/absence_task_state.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package statestore

import (
"github.com/google/uuid"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/werf/kubedog/pkg/trackers/dyntracker/util"
Expand All @@ -14,6 +15,7 @@ type AbsenceTaskState struct {
absentConditions []AbsenceTaskConditionFn
failureConditions []AbsenceTaskConditionFn

uuid string
resourceState *util.Concurrent[*ResourceState]
}

Expand All @@ -23,12 +25,15 @@ func NewAbsenceTaskState(name, namespace string, groupVersionKind schema.GroupVe
absentConditions := initAbsenceTaskStateAbsentConditions()
failureConditions := []AbsenceTaskConditionFn{}

uuid := uuid.NewString()

return &AbsenceTaskState{
name: name,
namespace: namespace,
groupVersionKind: groupVersionKind,
absentConditions: absentConditions,
failureConditions: failureConditions,
uuid: uuid,
resourceState: resourceState,
}
}
Expand Down Expand Up @@ -67,6 +72,10 @@ func (s *AbsenceTaskState) Status() AbsenceTaskStatus {
return AbsenceTaskStatusAbsent
}

func (s *AbsenceTaskState) UUID() string {
return s.uuid
}

func initAbsenceTaskStateAbsentConditions() []AbsenceTaskConditionFn {
var absentConditions []AbsenceTaskConditionFn

Expand Down

0 comments on commit 3817e3d

Please sign in to comment.