Skip to content

Commit

Permalink
fix(generic): add logging and don't retry fatal errors on List/Watch
Browse files Browse the repository at this point in the history
Signed-off-by: Ilya Lesikov <ilya@lesikov.com>
  • Loading branch information
ilya-lesikov committed Jul 26, 2022
1 parent fafba74 commit 246d454
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 165 deletions.
60 changes: 60 additions & 0 deletions pkg/tracker/generic/common.go
@@ -1,5 +1,65 @@
package generic

import (
	"context"
	"errors"
	"fmt"
	"io"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/client-go/tools/cache"

	"github.com/werf/logboek"
)

// init registers the resource status JSON paths in priority order so that
// generic status probing is ready as soon as the package is imported.
func init() {
initResourceStatusJSONPathsByPriority()
}

type UnrecoverableWatchError struct {
ResName string
Err error
}

func (e UnrecoverableWatchError) Error() string {
return fmt.Sprintf("unrecoverable watch error for %q: %s", e.ResName, e.Err.Error())
}

func (e UnrecoverableWatchError) Unwrap() error {
return e.Err
}

// SetWatchErrorHandlerOptions tunes the behavior of SetWatchErrorHandler.
type SetWatchErrorHandlerOptions struct {
FatalWatchErr *UnrecoverableWatchError // If an unrecoverable watch error occurred it will be saved here.
}

// SetWatchErrorHandler installs, via the provided setWatchErrorHandler
// callback (typically informer.SetWatchErrorHandler), a handler that logs
// recoverable List/Watch errors and aborts tracking on fatal ones.
//
// Recoverable errors — resource version expired/gone (HTTP 410) and EOFs —
// are logged (or silently ignored for a normal EOF) and the reflector is left
// to retry. Any other error is treated as unrecoverable: it is logged as a
// warning, stored into opts.FatalWatchErr when that pointer is non-nil, and
// the tracking context is canceled via cancelFn so the caller stops retrying.
//
// resName is a human-readable resource identifier used only for log messages.
// The returned error is whatever setWatchErrorHandler itself returns (e.g.
// informers reject handler changes after they have started).
func SetWatchErrorHandler(cancelFn context.CancelFunc, resName string, setWatchErrorHandler func(handler cache.WatchErrorHandler) error, opts SetWatchErrorHandlerOptions) error {
	return setWatchErrorHandler(
		func(r *cache.Reflector, err error) {
			isExpiredError := func(err error) bool {
				// In Kubernetes 1.17 and earlier, the api server returns both apierrors.StatusReasonExpired and
				// apierrors.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent
				// and always returns apierrors.StatusReasonExpired. For backward compatibility we can only remove the apierrors.IsGone
				// check when we fully drop support for Kubernetes 1.17 servers from reflectors.
				return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
			}

			switch {
			case isExpiredError(err):
				// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
				// has a semantic that it returns data at least as fresh as provided RV.
				// So first try to LIST with setting RV to resource version of last observed object.
				logboek.Context(context.Background()).Info().LogF("watch of %q closed with: %s\n", resName, err)
			// Use errors.Is instead of == so that wrapped EOF errors are
			// recognized as well.
			case errors.Is(err, io.EOF):
				// Watch closed normally.
			case errors.Is(err, io.ErrUnexpectedEOF):
				logboek.Context(context.Background()).Info().LogF("watch of %q closed with unexpected EOF: %s\n", resName, err)
			default:
				// Unknown error: consider it fatal, report it to the caller
				// and cancel tracking instead of retrying forever.
				logboek.Context(context.Background()).Warn().LogF("failed to watch %q: %s\n", resName, err)
				if opts.FatalWatchErr != nil {
					*opts.FatalWatchErr = UnrecoverableWatchError{ResName: resName, Err: err}
				}
				cancelFn()
			}
		},
	)
}
115 changes: 44 additions & 71 deletions pkg/tracker/generic/resource_events_watcher.go
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"sync"

authorizationv1 "k8s.io/api/authorization/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -14,13 +13,10 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"

"github.com/werf/kubedog/pkg/tracker"
"github.com/werf/kubedog/pkg/tracker/debug"
"github.com/werf/kubedog/pkg/tracker/resid"
"github.com/werf/kubedog/pkg/utils"
"github.com/werf/logboek"
)

type ResourceEventsWatcher struct {
Expand All @@ -47,92 +43,69 @@ func NewResourceEventsWatcher(
}

func (i *ResourceEventsWatcher) Run(ctx context.Context, eventsCh chan<- *corev1.Event) error {
fieldsSet, eventsNs := utils.EventFieldSelectorFromUnstructured(i.object)

for _, verb := range []string{"list", "watch"} {
response, err := i.client.AuthorizationV1().SelfSubjectAccessReviews().Create(
ctx,
&authorizationv1.SelfSubjectAccessReview{
Spec: authorizationv1.SelfSubjectAccessReviewSpec{
ResourceAttributes: &authorizationv1.ResourceAttributes{
Verb: verb,
Namespace: eventsNs,
Resource: "events",
Version: "v1",
},
},
},
metav1.CreateOptions{},
)
runCtx, runCancelFn := context.WithCancel(ctx)

if debug.Debug() {
if err != nil {
fmt.Printf("SelfSubjectAccessReview error for %q: %+v\n", i.ResourceID, err)
} else {
fmt.Printf("SelfSubjectAccessReview for %q: %+v\n", i.ResourceID, response)
}
}
i.generateResourceInitialEventsUIDs(runCtx)

if err != nil {
logboek.Context(context.Background()).Default().LogF("Won't track %q events: error checking %q access: %s\n", i.ResourceID, verb, err)
return nil
} else if !response.Status.Allowed || response.Status.Denied {
logboek.Context(context.Background()).Default().LogF("Won't track %q events: no %q access.\n", i.ResourceID, verb)
return nil
}
}

i.generateResourceInitialEventsUIDs(ctx)
fieldsSet, eventsNs := utils.EventFieldSelectorFromUnstructured(i.object)

setOptionsFunc := func(options *metav1.ListOptions) {
tweakListOptsFn := func(options *metav1.ListOptions) {
options.FieldSelector = fieldsSet.AsSelector().String()
}

listWatch := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
setOptionsFunc(&options)
return i.client.CoreV1().Events(eventsNs).List(ctx, options)
informer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
tweakListOptsFn(&options)
return i.client.CoreV1().Events(eventsNs).List(runCtx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
tweakListOptsFn(&options)
return i.client.CoreV1().Events(eventsNs).Watch(runCtx, options)
},
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
setOptionsFunc(&options)
return i.client.CoreV1().Events(eventsNs).Watch(ctx, options)
&corev1.Event{},
0,
cache.Indexers{},
)

informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if debug.Debug() {
fmt.Printf(" add event: %#v\n", i.ResourceID)
}
i.handleEventStateChange(runCtx, obj.(*corev1.Event), eventsCh)
},
UpdateFunc: func(oldObj, newObj interface{}) {
if debug.Debug() {
fmt.Printf(" update event: %#v\n", i.ResourceID)
}
i.handleEventStateChange(runCtx, newObj.(*corev1.Event), eventsCh)
},
DeleteFunc: func(obj interface{}) {
if debug.Debug() {
fmt.Printf(" delete event: %#v\n", i.ResourceID)
}
},
},
)

if err := SetWatchErrorHandler(runCancelFn, i.ResourceID.String(), informer.SetWatchErrorHandler, SetWatchErrorHandlerOptions{}); err != nil {
return fmt.Errorf("error setting watch error handler: %w", err)
}

if debug.Debug() {
fmt.Printf("> %s run event informer\n", i.ResourceID)
}

_, err := watchtools.UntilWithSync(ctx, listWatch, &corev1.Event{}, nil, func(watchEvent watch.Event) (bool, error) {
if debug.Debug() {
fmt.Printf(" %s event: %#v\n", i.ResourceID, watchEvent.Type)
}

var event *corev1.Event
if watchEvent.Type != watch.Error {
var ok bool
event, ok = watchEvent.Object.(*corev1.Event)
if !ok {
return true, fmt.Errorf("TRACK EVENT expect *corev1.Event object, got %T", watchEvent.Object)
}
}

switch watchEvent.Type {
case watch.Added, watch.Modified:
i.handleEventStateChange(ctx, event, eventsCh)
case watch.Deleted:
case watch.Error:
return true, fmt.Errorf("event watch error: %v", watchEvent.Object)
}

return false, nil
})
informer.Run(runCtx.Done())

if debug.Debug() {
fmt.Printf(" %s event informer DONE\n", i.ResourceID)
}

return tracker.AdaptInformerError(err)
return nil
}

func (i *ResourceEventsWatcher) generateResourceInitialEventsUIDs(ctx context.Context) {
Expand Down
131 changes: 40 additions & 91 deletions pkg/tracker/generic/resource_state_watcher.go
Expand Up @@ -4,23 +4,17 @@ import (
"context"
"fmt"

authorizationv1 "k8s.io/api/authorization/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"

"github.com/werf/kubedog/pkg/tracker"
"github.com/werf/kubedog/pkg/tracker/debug"
"github.com/werf/kubedog/pkg/tracker/resid"
"github.com/werf/logboek"
)

type ResourceStateWatcher struct {
Expand All @@ -46,106 +40,61 @@ func NewResourceStateWatcher(
}

func (w *ResourceStateWatcher) Run(ctx context.Context, resourceAddedCh, resourceModifiedCh, resourceDeletedCh chan<- *unstructured.Unstructured) error {
runCtx, runCancelFn := context.WithCancel(ctx)

gvr, err := w.ResourceID.GroupVersionResource(w.mapper)
if err != nil {
return fmt.Errorf("error getting GroupVersionResource: %w", err)
}

for _, verb := range []string{"list", "watch"} {
response, err := w.client.AuthorizationV1().SelfSubjectAccessReviews().Create(
ctx,
&authorizationv1.SelfSubjectAccessReview{
Spec: authorizationv1.SelfSubjectAccessReviewSpec{
ResourceAttributes: &authorizationv1.ResourceAttributes{
Verb: verb,
Resource: gvr.Resource,
Namespace: w.ResourceID.Namespace,
Group: gvr.Group,
Version: gvr.Version,
Name: w.ResourceID.Name,
},
},
},
metav1.CreateOptions{},
)

if debug.Debug() {
if err != nil {
fmt.Printf("SelfSubjectAccessReview error for %q: %+v\n", w.ResourceID, err)
} else {
fmt.Printf("SelfSubjectAccessReview for %q: %+v\n", w.ResourceID, response)
}
}

if err != nil {
logboek.Context(context.Background()).Warn().LogF("Won't track %q: error checking %q access: %s\n", w.ResourceID, verb, err)
return nil
} else if !response.Status.Allowed || response.Status.Denied {
logboek.Context(context.Background()).Warn().LogF("Won't track %q: no %q access.\n", w.ResourceID, verb)
return nil
}
}

resClient, err := w.resourceClient(gvr)
if err != nil {
return fmt.Errorf("error getting resource client: %w", err)
}

setOptionsFunc := func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("metadata.name", w.ResourceID.Name).String()
}

listWatch := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
setOptionsFunc(&options)
return resClient.List(ctx, options)
informer := dynamicinformer.NewFilteredDynamicInformer(
w.dynamicClient,
*gvr,
w.ResourceID.Namespace,
0,
cache.Indexers{},
func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("metadata.name", w.ResourceID.Name).String()
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
setOptionsFunc(&options)
return resClient.Watch(ctx, options)
)

informer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if debug.Debug() {
fmt.Printf(" add state event: %#v\n", w.ResourceID)
}
resourceAddedCh <- obj.(*unstructured.Unstructured)
},
UpdateFunc: func(oldObj, newObj interface{}) {
if debug.Debug() {
fmt.Printf(" update state event: %#v\n", w.ResourceID)
}
resourceModifiedCh <- newObj.(*unstructured.Unstructured)
},
DeleteFunc: func(obj interface{}) {
if debug.Debug() {
fmt.Printf(" delete state event: %#v\n", w.ResourceID)
}
resourceDeletedCh <- obj.(*unstructured.Unstructured)
},
},
)

fatalWatchErr := &UnrecoverableWatchError{}
if err := SetWatchErrorHandler(runCancelFn, w.ResourceID.String(), informer.Informer().SetWatchErrorHandler, SetWatchErrorHandlerOptions{FatalWatchErr: fatalWatchErr}); err != nil {
return fmt.Errorf("error setting watch error handler: %w", err)
}

if debug.Debug() {
fmt.Printf(" %s resource watcher STARTED\n", w.ResourceID)
}

_, err = watchtools.UntilWithSync(ctx, listWatch, &unstructured.Unstructured{}, nil,
func(event watch.Event) (bool, error) {
if debug.Debug() {
fmt.Printf(" %s event: %#v\n", w.ResourceID, event.Type)
}

switch event.Type {
case watch.Added:
resourceAddedCh <- event.Object.(*unstructured.Unstructured)
case watch.Modified:
resourceModifiedCh <- event.Object.(*unstructured.Unstructured)
case watch.Deleted:
resourceDeletedCh <- event.Object.(*unstructured.Unstructured)
case watch.Error:
return true, fmt.Errorf("watch error: %v", event.Object)
}

return false, nil
},
)
informer.Informer().Run(runCtx.Done())

if debug.Debug() {
fmt.Printf(" %s resource watcher DONE\n", w.ResourceID)
}

return tracker.AdaptInformerError(err)
}

func (w *ResourceStateWatcher) resourceClient(gvr *schema.GroupVersionResource) (dynamic.ResourceInterface, error) {
resClient := w.dynamicClient.Resource(*gvr)

if namespaced, err := w.ResourceID.Namespaced(w.mapper); err != nil {
return nil, fmt.Errorf("error checking whether resource is namespaced: %w", err)
} else if namespaced {
resClient.Namespace(w.ResourceID.Namespace)
}

return resClient, nil
return fatalWatchErr
}

0 comments on commit 246d454

Please sign in to comment.