Skip to content

Commit

Permalink
Merge pull request #123896 from aojea/revert-122541-headless_selector
Browse files Browse the repository at this point in the history
Revert "Implement a field selector for ClusterIP on Services"
  • Loading branch information
k8s-ci-robot committed Mar 12, 2024
2 parents 3409f05 + 7ab1ef6 commit 634fc1b
Show file tree
Hide file tree
Showing 6 changed files with 2 additions and 295 deletions.
17 changes: 0 additions & 17 deletions pkg/apis/core/v1/conversion.go
Expand Up @@ -97,9 +97,6 @@ func addConversionFuncs(scheme *runtime.Scheme) error {
if err := AddFieldLabelConversionsForSecret(scheme); err != nil {
return err
}
if err := AddFieldLabelConversionsForService(scheme); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -491,20 +488,6 @@ func AddFieldLabelConversionsForSecret(scheme *runtime.Scheme) error {
})
}

func AddFieldLabelConversionsForService(scheme *runtime.Scheme) error {
return scheme.AddFieldLabelConversionFunc(SchemeGroupVersion.WithKind("Service"),
func(label, value string) (string, string, error) {
switch label {
case "metadata.namespace",
"metadata.name",
"spec.clusterIP":
return label, value, nil
default:
return "", "", fmt.Errorf("field label not supported: %s", label)
}
})
}

var initContainerAnnotations = map[string]bool{
"pod.beta.kubernetes.io/init-containers": true,
"pod.alpha.kubernetes.io/init-containers": true,
Expand Down
6 changes: 1 addition & 5 deletions pkg/kubelet/kubelet.go
Expand Up @@ -456,11 +456,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
var serviceLister corelisters.ServiceLister
var serviceHasSynced cache.InformerSynced
if kubeDeps.KubeClient != nil {
// don't watch headless services, they are not needed since this informer is only used to create the environment variables for pods.
// See https://issues.k8s.io/122394
kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermNotEqualSelector("spec.clusterIP", v1.ClusterIPNone).String()
}))
kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0)
serviceLister = kubeInformers.Core().V1().Services().Lister()
serviceHasSynced = kubeInformers.Core().V1().Services().Informer().HasSynced
kubeInformers.Start(wait.NeverStop)
Expand Down
6 changes: 1 addition & 5 deletions pkg/registry/core/service/storage/storage.go
Expand Up @@ -88,7 +88,6 @@ func NewREST(
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &api.Service{} },
NewListFunc: func() runtime.Object { return &api.ServiceList{} },
PredicateFunc: svcreg.Matcher,
DefaultQualifiedResource: api.Resource("services"),
SingularQualifiedResource: api.Resource("service"),
ReturnDeletedObject: true,
Expand All @@ -100,10 +99,7 @@ func NewREST(

TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
}
options := &generic.StoreOptions{
RESTOptions: optsGetter,
AttrFunc: svcreg.GetAttrs,
}
options := &generic.StoreOptions{RESTOptions: optsGetter}
if err := store.CompleteWithOptions(options); err != nil {
return nil, nil, nil, err
}
Expand Down
32 changes: 0 additions & 32 deletions pkg/registry/core/service/strategy.go
Expand Up @@ -18,16 +18,11 @@ package service

import (
"context"
"fmt"
"reflect"

"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apiserver/pkg/registry/generic"
pkgstorage "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/names"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/api/legacyscheme"
Expand Down Expand Up @@ -171,33 +166,6 @@ func (serviceStatusStrategy) WarningsOnUpdate(ctx context.Context, obj, old runt
return nil
}

// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
service, ok := obj.(*api.Service)
if !ok {
return nil, nil, fmt.Errorf("not a service")
}
return service.Labels, SelectableFields(service), nil
}

// Matcher returns a selection predicate for a given label and field selector.
func Matcher(label labels.Selector, field fields.Selector) pkgstorage.SelectionPredicate {
return pkgstorage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

// SelectableFields returns a field set that can be used for filter selection
func SelectableFields(service *api.Service) fields.Set {
objectMetaFieldsSet := generic.ObjectMetaFieldsSet(&service.ObjectMeta, false)
serviceSpecificFieldsSet := fields.Set{
"spec.clusterIP": service.Spec.ClusterIP,
}
return generic.MergeFieldsSets(objectMetaFieldsSet, serviceSpecificFieldsSet)
}

// dropServiceStatusDisabledFields drops fields that are not used if their associated feature gates
// are not enabled. The typical pattern is:
//
Expand Down
94 changes: 0 additions & 94 deletions pkg/registry/core/service/strategy_test.go
Expand Up @@ -23,8 +23,6 @@ import (
"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
Expand Down Expand Up @@ -788,95 +786,3 @@ func TestDropTypeDependentFields(t *testing.T) {
})
}
}

func TestMatchService(t *testing.T) {
testCases := []struct {
name string
in *api.Service
fieldSelector fields.Selector
expectMatch bool
}{
{
name: "match on headless service",
in: &api.Service{
Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone},
},
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"),
expectMatch: true,
},
{
name: "no match on clusterIP service",
in: &api.Service{
Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
},
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"),
expectMatch: false,
},
{
name: "match on clusterIP service",
in: &api.Service{
Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
},
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=192.168.1.1"),
expectMatch: true,
},
{
name: "match on non-headless service",
in: &api.Service{
Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
},
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP!=None"),
expectMatch: true,
},
{
name: "match on any ClusterIP set service",
in: &api.Service{
Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
},
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP!=\"\""),
expectMatch: true,
},
{
name: "match on clusterIP IPv6 service",
in: &api.Service{
Spec: api.ServiceSpec{ClusterIP: "2001:db2::1"},
},
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=2001:db2::1"),
expectMatch: true,
},
{
name: "no match on headless service",
in: &api.Service{
Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone},
},
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=192.168.1.1"),
expectMatch: false,
},
{
name: "no match on headless service",
in: &api.Service{
Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone},
},
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=2001:db2::1"),
expectMatch: false,
},
{
name: "no match on empty service",
in: &api.Service{},
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"),
expectMatch: false,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
m := Matcher(labels.Everything(), testCase.fieldSelector)
result, err := m.Matches(testCase.in)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
if result != testCase.expectMatch {
t.Errorf("Result %v, Expected %v, Selector: %v, Service: %v", result, testCase.expectMatch, testCase.fieldSelector.String(), testCase.in)
}
})
}
}
142 changes: 0 additions & 142 deletions test/integration/service/service_test.go
Expand Up @@ -19,16 +19,10 @@ package service
import (
"context"
"testing"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration/framework"
)
Expand Down Expand Up @@ -270,139 +264,3 @@ func Test_RemovingExternalIPsFromClusterIPServiceDropsExternalTrafficPolicy(t *t
t.Error("service externalTrafficPolicy was not set for clusterIP Service with externalIPs")
}
}

func Test_ServiceClusterIPSelector(t *testing.T) {
server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer server.TearDownFn()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client, err := clientset.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatalf("Error creating clientset: %v", err)
}

ns := framework.CreateNamespaceOrDie(client, "test-external-name-drops-internal-traffic-policy", t)
defer framework.DeleteNamespaceOrDie(client, ns, t)

// create headless service
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-headless",
Namespace: ns.Name,
},
Spec: corev1.ServiceSpec{
ClusterIP: corev1.ClusterIPNone,
Type: corev1.ServiceTypeClusterIP,
Ports: []corev1.ServicePort{{
Port: int32(80),
}},
Selector: map[string]string{
"foo": "bar",
},
},
}

_, err = client.CoreV1().Services(ns.Name).Create(ctx, service, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating test service: %v", err)
}

// informer to watch only non-headless services
kubeInformers := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermNotEqualSelector("spec.clusterIP", corev1.ClusterIPNone).String()
}))

serviceInformer := kubeInformers.Core().V1().Services().Informer()
serviceLister := kubeInformers.Core().V1().Services().Lister()
serviceHasSynced := serviceInformer.HasSynced
if _, err = serviceInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
svc := obj.(*corev1.Service)
t.Logf("Added Service %#v", svc)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldSvc := oldObj.(*corev1.Service)
newSvc := newObj.(*corev1.Service)
t.Logf("Updated Service %#v to %#v", oldSvc, newSvc)
},
DeleteFunc: func(obj interface{}) {
svc := obj.(*corev1.Service)
t.Logf("Deleted Service %#v", svc)
},
},
); err != nil {
t.Fatalf("Error adding service informer handler: %v", err)
}
kubeInformers.Start(ctx.Done())
cache.WaitForCacheSync(ctx.Done(), serviceHasSynced)
svcs, err := serviceLister.List(labels.Everything())
if err != nil {
t.Fatalf("Error listing services: %v", err)
}
// only the kubernetes.default service expected
if len(svcs) != 1 || svcs[0].Name != "kubernetes" {
t.Fatalf("expected 1 services, got %d", len(svcs))
}

// create a new service with ClusterIP
service2 := service.DeepCopy()
service2.Spec.ClusterIP = ""
service2.Name = "test-clusterip"
_, err = client.CoreV1().Services(ns.Name).Create(ctx, service2, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating test service: %v", err)
}

err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
svc, err := serviceLister.Services(service2.Namespace).Get(service2.Name)
if svc == nil || err != nil {
return false, nil
}
return true, nil
})
if err != nil {
t.Fatalf("Error waiting for test service test-clusterip: %v", err)
}

// mutate the Service to drop the ClusterIP, theoretically ClusterIP is inmutable but ...
service.Spec.ExternalName = "test"
service.Spec.Type = corev1.ServiceTypeExternalName
_, err = client.CoreV1().Services(ns.Name).Update(ctx, service, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Error creating test service: %v", err)
}

err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
svc, err := serviceLister.Services(service.Namespace).Get(service.Name)
if svc == nil || err != nil {
return false, nil
}
return true, nil
})
if err != nil {
t.Fatalf("Error waiting for test service without ClusterIP: %v", err)
}

// mutate the Service to get the ClusterIP again
service.Spec.ExternalName = ""
service.Spec.ClusterIP = ""
service.Spec.Type = corev1.ServiceTypeClusterIP
_, err = client.CoreV1().Services(ns.Name).Update(ctx, service, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Error creating test service: %v", err)
}

err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
svc, err := serviceLister.Services(service.Namespace).Get(service.Name)
if svc == nil || err != nil {
return false, nil
}
return true, nil
})
if err != nil {
t.Fatalf("Error waiting for test service with ClusterIP: %v", err)
}
}

0 comments on commit 634fc1b

Please sign in to comment.