Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
Merge pull request #35 from flanksource/moshloop
Browse files Browse the repository at this point in the history
refactor + add image registry mutating webhook
  • Loading branch information
moshloop committed Oct 15, 2020
2 parents d806e2c + d94c2b0 commit f363b1b
Show file tree
Hide file tree
Showing 14 changed files with 313 additions and 146 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/test.yml
@@ -1,6 +1,7 @@
name: Publish Docker
name: Test
on:
push:
pull_request:
jobs:
build:
runs-on: ubuntu-latest
Expand Down
7 changes: 5 additions & 2 deletions Makefile
Expand Up @@ -31,7 +31,10 @@ all: manager

# Run tests
test: fmt vet
go test ./... -coverprofile cover.out
go test ./... -coverprofile cover.out -v

e2e: fmt vet
TEST_E2E=true go test ./... -coverprofile cover.out -v

# Build manager binary
manager: fmt vet
Expand All @@ -52,7 +55,7 @@ generate: controller-gen
# Generate manifests e.g. CRD, RBAC etc.
$(CONTROLLER_GEN) $(CRD_OPTIONS) rbac:roleName=manager paths="./pkg/..." output:crd:artifacts:config=config/crds/bases output:rbac:artifacts:config=config/operator/rbac
# set image name and tag
# Generate an all-in-one version including the operator manifests
# Generate an all-in-one version including the operator manifests
kubectl kustomize config/operator/default > config/deploy/manifests.yaml

# Run go fmt against code
Expand Down
14 changes: 11 additions & 3 deletions cmd/manager/main.go
Expand Up @@ -56,11 +56,13 @@ func main() {
var metricsAddr string
var enableLeaderElection bool
var cleanupInterval, annotationInterval time.Duration
var annotations string
var enableClusterResourceQuota bool
var oauth2ProxySvcName string
var oauth2ProxySvcNamespace string
var domain string
var registryWhitelist string
var annotations string
cfg := platformv1.PodMutaterConfig{}

flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")

Expand All @@ -75,10 +77,16 @@ func main() {

flag.StringVar(&oauth2ProxySvcName, "oauth2-proxy-service-name", "", "Name of oauth2-proxy service")
flag.StringVar(&oauth2ProxySvcNamespace, "oauth2-proxy-service-namespace", "", "Name of oauth2-proxy service namespace")
flag.StringVar(&cfg.DefaultRegistryPrefix, "default-registry-prefix", "", "A default registry prefix path to apply to all pods")
flag.StringVar(&cfg.DefaultImagePullSecret, "defailt-image-pull-secret", "", "Default dmage pull secret to apply to all pods")
flag.StringVar(&registryWhitelist, "registry-whitelist", "", "A list of image prefixes to ignore")
flag.StringVar(&domain, "domain", "", "Domain used by platform")

flag.Parse()

cfg.Annotations = strings.Split(annotations, ",")
cfg.RegistryWhitelist = strings.Split(registryWhitelist, ",")

ctrl.SetLogger(zap.New(func(o *zap.Options) {
o.Development = true
}))
Expand Down Expand Up @@ -108,7 +116,7 @@ func main() {
}
}

if err := podannotator.Add(mgr, annotationInterval, strings.Split(annotations, ",")); err != nil {
if err := podannotator.Add(mgr, annotationInterval, cfg); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PodAnnotator")
os.Exit(1)
}
Expand All @@ -127,7 +135,7 @@ func main() {
mtx := &sync.Mutex{}

setupLog.Info("registering webhooks to the webhook server")
hookServer.Register("/mutate-v1-pod", &webhook.Admission{Handler: platformv1.PodAnnotatorMutateWebhook(mgr.GetClient(), strings.Split(annotations, ","))})
hookServer.Register("/mutate-v1-pod", &webhook.Admission{Handler: platformv1.PodAnnotatorMutateWebhook(mgr.GetClient(), cfg)})
hookServer.Register("/mutate-v1-ingress", &webhook.Admission{Handler: platformv1.IngressAnnotatorMutateWebhook(mgr.GetClient(), oauth2ProxySvcName, oauth2ProxySvcNamespace, domain)})
hookServer.Register("/validate-clusterresourcequota-platform-flanksource-com-v1", platformv1.ClusterResourceQuotaValidatingWebhook(mtx, enableClusterResourceQuota))
hookServer.Register("/validate-resourcequota-v1", platformv1.ResourceQuotaValidatingWebhook(mtx, enableClusterResourceQuota))
Expand Down
70 changes: 56 additions & 14 deletions pkg/apis/platform/v1/podannotator_mutatewebhook.go
Expand Up @@ -3,42 +3,53 @@ package v1
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"

"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

func PodAnnotatorMutateWebhook(client client.Client, annotations []string) *admission.Webhook {
type PodMutaterConfig struct {
AnnotationsMap map[string]bool
Annotations []string
RegistryWhitelist []string
DefaultRegistryPrefix string
DefaultImagePullSecret string
}

func PodAnnotatorMutateWebhook(client client.Client, cfg PodMutaterConfig) *admission.Webhook {
return &admission.Webhook{
Handler: NewPodAnnotatorHandler(client, annotations),
Handler: NewPodAnnotatorHandler(client, cfg),
}
}

type podAnnotatorHandler struct {
Client client.Client
decoder *admission.Decoder
annotations map[string]bool
Client client.Client
decoder *admission.Decoder
Log logr.Logger
PodMutaterConfig
}

// +kubebuilder:webhook:path=/mutate-v1-pod,mutating=true,failurePolicy=ignore,groups="",resources=pods,verbs=create;update,versions=v1,name=annotate-pods-v1.platform.flanksource.com

func NewPodAnnotatorHandler(client client.Client, annotations []string) *podAnnotatorHandler {
annotationsMap := map[string]bool{}

for _, a := range annotations {
annotationsMap[a] = true
func NewPodAnnotatorHandler(client client.Client, cfg PodMutaterConfig) *podAnnotatorHandler {
cfg.AnnotationsMap = make(map[string]bool)
for _, a := range cfg.Annotations {
cfg.AnnotationsMap[a] = true
}

return &podAnnotatorHandler{Client: client, annotations: annotationsMap}
return &podAnnotatorHandler{Client: client, PodMutaterConfig: cfg, Log: logf.Log.WithName("pod-mutator")}
}

func (a *podAnnotatorHandler) Handle(ctx context.Context, req admission.Request) admission.Response {
pod := &corev1.Pod{}
err := a.decoder.Decode(req, pod)
a.Log.Info("Mutating", "pod", pod)
if err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
Expand All @@ -57,13 +68,44 @@ func (a *podAnnotatorHandler) Handle(ctx context.Context, req admission.Request)
}

for k, v := range namespace.Annotations {
if _, f := a.annotations[k]; f { // if annotation is whitelisted
if _, f := a.AnnotationsMap[k]; f { // if annotation is whitelisted
if _, podHasAnnotation := pod.Annotations[k]; !podHasAnnotation { // if pod already has annotation, don't inherit
pod.Annotations[k] = v
}
}
}

containers:
for _, container := range pod.Spec.Containers {
for _, reg := range a.RegistryWhitelist {
if strings.HasPrefix(container.Image, reg) {
continue containers
}
}
to := fmt.Sprintf("%s/%s", a.DefaultRegistryPrefix, container.Image)
a.Log.V(2).Info("Updating image", "from", container.Image, "to", to)
container.Image = to
}

initContainers:
for _, container := range pod.Spec.InitContainers {
for _, reg := range a.RegistryWhitelist {
if strings.HasPrefix(container.Image, reg) {
continue initContainers
}
}
to := fmt.Sprintf("%s/%s", a.DefaultRegistryPrefix, container.Image)
a.Log.V(2).Info("Updating image", "from", container.Image, "to", to)
container.Image = to
container.Image = fmt.Sprintf("%s/%s", a.DefaultRegistryPrefix, container.Image)
}

if len(pod.Spec.ImagePullSecrets) == 0 && a.DefaultImagePullSecret != "" {
a.Log.V(2).Info("Injecting image pull secret", "name", a.DefaultImagePullSecret)
pod.Spec.ImagePullSecrets = []corev1.LocalObjectReference{{
Name: a.DefaultImagePullSecret,
}}
}
marshaledPod, err := json.Marshal(pod)
if err != nil {
return admission.Errored(http.StatusInternalServerError, err)
Expand Down
23 changes: 9 additions & 14 deletions pkg/controllers/cleanup/cleanup_controller.go
Expand Up @@ -52,9 +52,8 @@ func Add(mgr manager.Manager, interval time.Duration) error {

func newReconciler(mgr manager.Manager, interval time.Duration) reconcile.Reconciler {
return &ReconcileCleanup{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),

Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
interval: interval,
}
}
Expand All @@ -65,14 +64,10 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}

if err := c.Watch(
return c.Watch(
&source.Kind{Type: &corev1.Namespace{}}, &handler.EnqueueRequestForObject{},
predicate.Funcs{CreateFunc: onCreate, UpdateFunc: onUpdate},
); err != nil {
return err
}

return nil
)
}

var _ reconcile.Reconciler = &ReconcileCleanup{}
Expand Down Expand Up @@ -103,7 +98,6 @@ func parseDuration(expiry string) (*time.Duration, error) {

func (r *ReconcileCleanup) Reconcile(request reconcile.Request) (reconcile.Result, error) {
ctx := context.Background()

namespace := corev1.Namespace{}
if err := r.Get(ctx, request.NamespacedName, &namespace); err != nil {
if apierrors.IsNotFound(err) {
Expand All @@ -119,6 +113,9 @@ func (r *ReconcileCleanup) Reconcile(request reconcile.Request) (reconcile.Resul
}

expiry := namespace.Labels[cleanupLabel]
if expiry == "" {
return reconcile.Result{}, nil
}
duration, err := parseDuration(expiry)
if err != nil {
log.Error(err, "Invalid duration for namespace", "namespace", namespace.Name, "expiry", expiry)
Expand All @@ -127,11 +124,11 @@ func (r *ReconcileCleanup) Reconcile(request reconcile.Request) (reconcile.Resul

expiresOn := namespace.GetCreationTimestamp().Add(*duration)
if expiresOn.Before(time.Now()) {
log.V(1).Info("Deleting namespace", "namespace", namespace.Name)
log.V(1).Info("Deleting namespace", "namespace", namespace.Name, "expiry", expiry)

if err := r.Delete(ctx, &namespace); err != nil {
log.Error(err, "Failed to delete namespace", "namespace", namespace.Name)
return reconcile.Result{}, err
return reconcile.Result{Requeue: true}, err
}
}

Expand All @@ -143,15 +140,13 @@ func (r *ReconcileCleanup) Reconcile(request reconcile.Request) (reconcile.Resul
// objects are queued only in case the label exists
func onCreate(e event.CreateEvent) bool {
namespace := e.Object.(*corev1.Namespace)

labels := namespace.GetLabels()
_, isSet := labels[cleanupLabel]
return isSet
}

func onUpdate(e event.UpdateEvent) bool {
namespace := e.ObjectNew.(*corev1.Namespace)

labels := namespace.GetLabels()
_, isSet := labels[cleanupLabel]
return isSet
Expand Down
Expand Up @@ -111,7 +111,6 @@ type ReconcileClusterResourceQuota struct {
// +kubebuilder:rbac:groups=platform.flanksource.com,resources=clusterresourcequotas,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=platform.flanksource.com,resources=clusterresourcequotas/status,verbs=get;update;patch
// +kubebuilder:rbac:groups="",resources=resourcequotas,verbs=get;list;watch

func (r *ReconcileClusterResourceQuota) Reconcile(request reconcile.Request) (reconcile.Result, error) {
ctx := context.Background()

Expand Down
14 changes: 6 additions & 8 deletions pkg/controllers/podannotator/common.go
@@ -1,13 +1,11 @@
package podannotator

import v1 "k8s.io/api/core/v1"

func updatePodAnnotations(ns v1.Namespace, annotations []string, pods ...v1.Pod) []v1.Pod {
annotationsMap := map[string]bool{}
for _, a := range annotations {
annotationsMap[a] = true
}
import (
platformv1 "github.com/flanksource/platform-operator/pkg/apis/platform/v1"
v1 "k8s.io/api/core/v1"
)

func updatePods(ns v1.Namespace, cfg platformv1.PodMutaterConfig, pods ...v1.Pod) []v1.Pod {
changedPods := []v1.Pod{}

if ns.Annotations == nil {
Expand All @@ -22,7 +20,7 @@ func updatePodAnnotations(ns v1.Namespace, annotations []string, pods ...v1.Pod)
podChanged := false

for k, v := range ns.Annotations {
if _, f := annotationsMap[k]; f { // if annotation is whitelisted
if _, f := cfg.AnnotationsMap[k]; f { // if annotation is whitelisted
if _, podHasAnnotation := pod.Annotations[k]; !podHasAnnotation { // if pod already has annotation, don't inherit
pod.Annotations[k] = v
podChanged = true
Expand Down
30 changes: 10 additions & 20 deletions pkg/controllers/podannotator/namespace_reconciler.go
Expand Up @@ -4,14 +4,14 @@ import (
"context"
"time"

platformv1 "github.com/flanksource/platform-operator/pkg/apis/platform/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
Expand All @@ -22,17 +22,15 @@ type NamespaceReconciler struct {

// interval is the time after which the controller requeue the reconcile key
interval time.Duration
// list of whitelisted annotations
annotations []string
cfg platformv1.PodMutaterConfig
}

func newNamespaceReconciler(mgr manager.Manager, interval time.Duration, annotations []string) reconcile.Reconciler {
func newNamespaceReconciler(mgr manager.Manager, interval time.Duration, cfg platformv1.PodMutaterConfig) reconcile.Reconciler {
return &NamespaceReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),

interval: interval,
annotations: annotations,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
interval: interval,
cfg: cfg,
}
}

Expand All @@ -42,19 +40,11 @@ func addNamespaceReconciler(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}

if err := c.Watch(
&source.Kind{Type: &corev1.Namespace{}}, &handler.EnqueueRequestForObject{},
predicate.Funcs{CreateFunc: onCreate, UpdateFunc: onUpdate},
); err != nil {
return err
}

return nil
return c.Watch(&source.Kind{Type: &corev1.Namespace{}}, &handler.EnqueueRequestForObject{})
}

// +kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list
// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;update

func (r *NamespaceReconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) {
ctx := context.Background()

Expand All @@ -71,7 +61,7 @@ func (r *NamespaceReconciler) Reconcile(request reconcile.Request) (reconcile.Re
return reconcile.Result{}, err
}

changedPods := updatePodAnnotations(ns, r.annotations, podList.Items...)
changedPods := updatePods(ns, r.cfg, podList.Items...)

for _, pod := range changedPods {
if err := r.Client.Update(ctx, &pod); err != nil {
Expand All @@ -80,6 +70,6 @@ func (r *NamespaceReconciler) Reconcile(request reconcile.Request) (reconcile.Re
}
}

log.V(1).Info("Requeue reconciliation", "interval", r.interval)
log.V(1).Info("Requeue reconciliation", "interval", r.interval, "namespace", ns.Name)
return reconcile.Result{RequeueAfter: r.interval}, nil
}

0 comments on commit f363b1b

Please sign in to comment.