Skip to content

Commit

Permalink
Merge pull request #145 from 00pf00/ip_gc
Browse files Browse the repository at this point in the history
flannel_gc
  • Loading branch information
qyzhaoxun committed Jan 30, 2024
2 parents e8b542c + 977b42a commit 1b17268
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 27 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/go.yml
Expand Up @@ -7,10 +7,10 @@ jobs:
runs-on: ubuntu-latest
steps:

- name: Set up Go 1.14
- name: Set up Go 1.18
uses: actions/setup-go@v1
with:
go-version: 1.14.2
go-version: 1.18.4
id: go

- name: Check out code into the Go module directory
Expand Down
10 changes: 5 additions & 5 deletions pkg/api/docker/docker.go
Expand Up @@ -90,15 +90,15 @@ func (d *DockerInterface) DockerInspectContainer(id string) (*dockertypes.Contai
return &containerJSON, nil
}

func (d *DockerInterface) ContainedInspectContainer(id string) (*criapi.ContainerStatus, error) {
func (d *DockerInterface) ContainedInspectContainer(id string) (*criapi.PodSandboxStatus, error) {
ctx, cancel := getTimeoutContext()
defer cancel()
if os.Getenv("CONTAINERD_HOST") != "" {
request := &criapi.ContainerStatusRequest{
ContainerId: id,
Verbose: true,
request := &criapi.PodSandboxStatusRequest{
PodSandboxId: id,
Verbose: true,
}
resp, err := d.containerdClient.ContainerStatus(ctx, request)
resp, err := d.containerdClient.PodSandboxStatus(ctx, request)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/galaxy/galaxy.go
Expand Up @@ -124,7 +124,7 @@ func (g *Galaxy) Start() error {
return err
}
g.initk8sClient()
gc.NewFlannelGC(g.dockerCli, g.quitChan, g.cleanIPtables).Run()
gc.NewFlannelGC(g.client, g.dockerCli, g.quitChan, g.cleanIPtables).Run()
kernel.BridgeNFCallIptables(g.quitChan, g.BridgeNFCallIptables)
kernel.IPForward(g.quitChan, g.IPForward)
if err := g.setupIPtables(); err != nil {
Expand Down
33 changes: 27 additions & 6 deletions pkg/gc/flannel_gc.go
Expand Up @@ -17,11 +17,15 @@
package gc

import (
"context"
"flag"
"fmt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"io/ioutil"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"net"
"os"
"path/filepath"
Expand All @@ -37,8 +41,10 @@ import (
)

const (
ContainerExited = "exited"
ContainerDead = "dead"
ContainerExited = "exited"
ContainerDead = "dead"
SandboxName = "io.kubernetes.cri.sandbox-name"
SandboxNamespace = "io.kubernetes.cri.sandbox-namespace"
)

var (
Expand All @@ -61,16 +67,18 @@ type flannelGC struct {
allocatedIPDir []string
gcDirs []string
dockerCli *docker.DockerInterface
kubeCli kubernetes.Interface
quit <-chan struct{}
cleanPortFunc func(containerID string) error
}

func NewFlannelGC(dockerCli *docker.DockerInterface, quit <-chan struct{},
func NewFlannelGC(kubeCli kubernetes.Interface, dockerCli *docker.DockerInterface, quit <-chan struct{},
cleanPortFunc func(containerID string) error) GC {
dirs := strings.Split(*flagGCDirs, ",")
return &flannelGC{
allocatedIPDir: strings.Split(*flagAllocatedIPDir, ","),
gcDirs: dirs,
kubeCli: kubeCli,
dockerCli: dockerCli,
quit: quit,
cleanPortFunc: cleanPortFunc,
Expand Down Expand Up @@ -191,15 +199,28 @@ func (gc *flannelGC) shouldCleanup(cid string) bool {
if c, err := gc.dockerCli.ContainedInspectContainer(cid); err != nil {
if stausErr, ok := status.FromError(err); ok {
if stausErr.Code() == codes.NotFound {
//glog.Infof("container %s not found", cid)
return false
glog.Infof("container %s not found", cid)
return true
}
glog.Warningf("Error inspect container %s: %v", cid, err)
} else {
glog.Warningf("Error inspect container %s: %v", cid, err)
}
} else {
if c != nil && (c.State == criapi.ContainerState_CONTAINER_EXITED || c.State == criapi.ContainerState_CONTAINER_UNKNOWN) {
if c != nil && (c.State == criapi.PodSandboxState_SANDBOX_NOTREADY) {
pod, err := gc.kubeCli.CoreV1().Pods(c.Annotations[SandboxNamespace]).Get(context.Background(), c.Annotations[SandboxName], metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return true
}
glog.Errorf("failed to get pod %s", fmt.Sprintf("%s/%s", c.Annotations[SandboxNamespace], c.Annotations[SandboxName]))
return false
}
for _, status := range pod.Status.ContainerStatuses {
if status.State.Waiting != nil || status.State.Running != nil {
return false
}
}
glog.Infof("container %s exited %s", c.Id, c.State.String())
return true
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ipam/floatingip/ipam_crd_test.go
Expand Up @@ -17,6 +17,7 @@
package floatingip

import (
"context"
"encoding/json"
"fmt"
"net"
Expand Down Expand Up @@ -142,7 +143,7 @@ func TestAllocateSpecificIP(t *testing.T) {
}

func checkFIP(ipam *crdIpam, expect ...string) error {
fips, err := ipam.client.GalaxyV1alpha1().FloatingIPs().List(v1.ListOptions{})
fips, err := ipam.client.GalaxyV1alpha1().FloatingIPs().List(context.Background(), v1.ListOptions{})
if err != nil {
return err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/ipam/floatingip/store_crd_test.go
Expand Up @@ -17,6 +17,7 @@
package floatingip

import (
"context"
"fmt"
"net"
"strings"
Expand Down Expand Up @@ -44,7 +45,7 @@ func TestAddFloatingIPEventByUser(t *testing.T) {
if err := assign(fipCrd, fip); err != nil {
t.Fatal(err)
}
if _, err := ipam.client.GalaxyV1alpha1().FloatingIPs().Create(fipCrd); err != nil {
if _, err := ipam.client.GalaxyV1alpha1().FloatingIPs().Create(context.Background(), fipCrd, v1.CreateOptions{}); err != nil {
t.Fatal(err)
}
if err := waitFor(ipam, fip.IP, fip.Key, true, node1IPNet.String()); err != nil {
Expand Down Expand Up @@ -107,7 +108,7 @@ func TestDeleteFloatingIPEvent(t *testing.T) {
ip := net.ParseIP(fipCrd.Name)
fipCrd.Labels[constant.ReserveFIPLabel] = ""
fipCrd.Spec.Key = "pool__reserved-for-node_"
if _, err := ipam.client.GalaxyV1alpha1().FloatingIPs().Create(fipCrd); err != nil {
if _, err := ipam.client.GalaxyV1alpha1().FloatingIPs().Create(context.Background(), fipCrd, v1.CreateOptions{}); err != nil {
t.Fatal(err)
}
if err := waitFor(ipam, ip, fipCrd.Spec.Key, true, node1IPNet.String()); err != nil {
Expand All @@ -124,7 +125,7 @@ func TestDeleteFloatingIPEvent(t *testing.T) {

// test if an event is created by user, deleteFloatingIPEvent should handle it
fipCrd.Labels[constant.ReserveFIPLabel] = ""
if err := ipam.client.GalaxyV1alpha1().FloatingIPs().Delete(fipCrd.Name, &v1.DeleteOptions{}); err != nil {
if err := ipam.client.GalaxyV1alpha1().FloatingIPs().Delete(context.Background(), fipCrd.Name, v1.DeleteOptions{}); err != nil {
t.Fatal(err)
}
if err := waitFor(ipam, ip, "", false, node1IPNet.String()); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/ipam/schedulerplugin/bind_test.go
Expand Up @@ -17,6 +17,7 @@
package schedulerplugin

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -223,7 +224,7 @@ func TestReleaseIPOfFinishedPod(t *testing.T) {
t.Fatalf("case %d: %v", i, err)
}
testCase.updatePodStatus(pod)
if _, err := fipPlugin.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(pod); err != nil {
if _, err := fipPlugin.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(context.Background(), pod, v1.UpdateOptions{}); err != nil {
t.Fatalf("case %d: %v", i, err)
}
if err := wait.Poll(time.Microsecond*10, time.Second*30, func() (done bool, err error) {
Expand Down Expand Up @@ -285,7 +286,7 @@ func TestFilterBindRequestIPRange(t *testing.T) {
t.Fatal(err)
}
pod.Annotations = cniArgsAnnotation(request)
if _, err := fipPlugin.Client.CoreV1().Pods(pod.Namespace).Update(pod); err != nil {
if _, err := fipPlugin.Client.CoreV1().Pods(pod.Namespace).Update(context.Background(), pod, v1.UpdateOptions{}); err != nil {
t.Fatal(err)
}
// wait for lister updates
Expand Down
2 changes: 1 addition & 1 deletion pkg/ipam/schedulerplugin/crdkey_test.go
Expand Up @@ -37,7 +37,7 @@ func TestGetGroupVersionResource(t *testing.T) {
if gvr == nil {
t.Fatal()
}
if gvr.Group != FooCrd.Spec.Group || gvr.Version != FooCrd.Spec.Version ||
if gvr.Group != FooCrd.Spec.Group || gvr.Version != FooCrd.Spec.Versions[0].Name ||
gvr.Resource != FooCrd.Spec.Names.Plural {
t.Fatal(gvr)
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/utils/test/test_customresourcedefinition.go
Expand Up @@ -75,21 +75,23 @@ func (c *customResourceDefinitionBuilder) Get() *extensionv1.CustomResourceDefin
APIVersion: "apiextensions.k8s.io/v1",
},
Spec: extensionv1.CustomResourceDefinitionSpec{
Group: c.group,
Version: c.version,
Scope: "Namespaced",
Group: c.group,
Versions: []extensionv1.CustomResourceDefinitionVersion{{
Name: c.version,
Subresources: c.subresources,
}},
Scope: "Namespaced",
Names: extensionv1.CustomResourceDefinitionNames{
Plural: plural,
Singular: name,
Kind: kind,
ListKind: kind + "List",
},
Subresources: c.subresources,
},
}
}

// CrdApiVersionAndKind returns the apiVersion and kind of the given CustomResourceDefinition
func CrdApiVersionAndKind(crd *extensionv1.CustomResourceDefinition) (string, string) {
return crd.Spec.Group + "/" + crd.Spec.Version, crd.Spec.Names.Kind
return crd.Spec.Group + "/" + crd.Spec.Versions[0].Name, crd.Spec.Names.Kind
}
2 changes: 1 addition & 1 deletion pkg/utils/test/test_helper.go
Expand Up @@ -51,7 +51,7 @@ func CreateCRDPod(name, namespace string, annotations map[string]string,
return Pod().WithName(name).WithNamespace(namespace).
WithAnnotations(annotations).WithLabels(map[string]string{"app": appName}).
AddOwnerReferences(v1.OwnerReference{
APIVersion: crd.Spec.Version,
APIVersion: crd.Spec.Versions[0].Name,
Kind: crd.Spec.Names.Kind,
Name: appName,
}).
Expand Down

0 comments on commit 1b17268

Please sign in to comment.