Skip to content

Commit

Permalink
Merge pull request #86 from chenchun/reserve
Browse files Browse the repository at this point in the history
Resync ip if pod uid changed
  • Loading branch information
ChenLingPeng committed Sep 4, 2020
2 parents c1494ae + c6316ba commit 8f182e4
Show file tree
Hide file tree
Showing 16 changed files with 319 additions and 397 deletions.
4 changes: 1 addition & 3 deletions pkg/ipam/api/api.go
Expand Up @@ -60,7 +60,6 @@ type FloatingIP struct {
UpdateTime time.Time `json:"updateTime,omitempty"`
Status string `json:"status,omitempty"`
Releasable bool `json:"releasable,omitempty"`
attr string `json:"-"`
labels map[string]string `json:"-"`
}

Expand Down Expand Up @@ -349,8 +348,7 @@ func transform(fips []floatingip.FloatingIP) []FloatingIP {
AppType: toAppType(keyObj.AppTypePrefix),
Policy: fips[i].Policy,
UpdateTime: fips[i].UpdatedAt,
labels: fips[i].Labels,
attr: fips[i].Attr})
labels: fips[i].Labels})
}
return res
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ipam/api/pool.go
Expand Up @@ -164,7 +164,7 @@ func (c *PoolController) preAllocateIP(req *restful.Request, resp *restful.Respo
}
needAllocateIPs := pool.Size - len(fips)
for i := 0; i < needAllocateIPs; i++ {
ip, err := c.IPAM.AllocateInSubnet(poolPrefix, subnetIPNet, constant.ReleasePolicyNever, "")
ip, err := c.IPAM.AllocateInSubnet(poolPrefix, subnetIPNet, floatingip.Attr{Policy: constant.ReleasePolicyNever})
if err == nil {
glog.Infof("allocated ip %s to %s during creating or updating pool", ip.String(), poolPrefix)
continue
Expand Down
65 changes: 54 additions & 11 deletions pkg/ipam/floatingip/floatingip.go
Expand Up @@ -24,39 +24,51 @@ import (
"time"

"k8s.io/apimachinery/pkg/util/sets"
"tkestack.io/galaxy/pkg/api/galaxy/constant"
"tkestack.io/galaxy/pkg/utils/nets"
)

// FloatingIP defines a floating ip
type FloatingIP struct {
Key string
Subnets sets.String // node subnet, not container ip's subnet
// TODO Replace attr with labels
Attr string
Key string
Subnets sets.String // node subnet, not container ip's subnet
IP net.IP
Policy uint16
UpdatedAt time.Time
Labels map[string]string
Policy uint16
NodeName string
PodUid string
}

func (f FloatingIP) String() string {
return fmt.Sprintf("FloatingIP{ip:%s key:%s attr:%s policy:%d subnets:%v}", f.IP.String(), f.Key, f.Attr, f.Policy, f.Subnets)
return fmt.Sprintf("FloatingIP{ip:%s key:%s policy:%d nodeName:%s podUid:%s subnets:%v}",
f.IP.String(), f.Key, f.Policy, f.NodeName, f.PodUid, f.Subnets)
}

// New creates a new FloatingIP
func New(ip net.IP, subnets sets.String, key string, attr *Attr, updateAt time.Time) *FloatingIP {
fip := &FloatingIP{IP: ip, Subnets: subnets}
fip.Assign(key, attr, updateAt)
return fip
}

func (f *FloatingIP) Assign(key, attr string, policy uint16, updateAt time.Time) *FloatingIP {
// Assign updates key, attr, updatedAt of FloatingIP
func (f *FloatingIP) Assign(key string, attr *Attr, updateAt time.Time) *FloatingIP {
f.Key = key
f.Attr = attr
f.Policy = policy
f.Policy = uint16(attr.Policy)
f.UpdatedAt = updateAt
f.NodeName = attr.NodeName
f.PodUid = attr.Uid
return f
}

func (f *FloatingIP) CloneWith(key, attr string, policy uint16, updateAt time.Time) *FloatingIP {
// CloneWith creates a new FloatingIP and updates key, attr, updatedAt
func (f *FloatingIP) CloneWith(key string, attr *Attr, updateAt time.Time) *FloatingIP {
fip := &FloatingIP{
IP: f.IP,
Subnets: f.Subnets,
}
return fip.Assign(key, attr, policy, updateAt)
return fip.Assign(key, attr, updateAt)
}

// FloatingIPPool is FloatingIPPool structure.
Expand Down Expand Up @@ -284,3 +296,34 @@ func (s FloatingIPSlice) Swap(i, j int) {
func (s FloatingIPSlice) Less(i, j int) bool {
return nets.IPToInt(s[i].Gateway) < nets.IPToInt(s[j].Gateway)
}

// Attr stores attrs about this pod
type Attr struct {
// NodeName is needed to send unassign request to cloud provider on resync
NodeName string
// uid is used to differentiate a deleting pod and a newly created pod with the same name such as statefulsets
// or tapp pod
Uid string
// Release policy
Policy constant.ReleasePolicy `json:"-"`
}

func (a Attr) String() string {
return fmt.Sprintf("Attr{policy:%d nodeName:%s uid:%s}", a.Policy, a.NodeName, a.Uid)
}

// unmarshalAttr unmarshal attributes and assign PodUid and NodeName
// Make sure invoke this func in a copied FloatingIP
func (f *FloatingIP) unmarshalAttr(attrStr string) error {
if attrStr == "" {
return nil
}
var attr Attr
if err := json.Unmarshal([]byte(attrStr), &attr); err != nil {
return fmt.Errorf("unmarshal attr %s for %s %s: %v", attrStr, f.Key, f.IP.String(), err)
} else {
f.NodeName = attr.NodeName
f.PodUid = attr.Uid
}
return nil
}
15 changes: 8 additions & 7 deletions pkg/ipam/floatingip/ipam.go
Expand Up @@ -39,15 +39,16 @@ type IPAM interface {
// unreleased map stores ip with its latest key if key changed
ReleaseIPs(map[string]string) (map[string]string, map[string]string, error)
// AllocateSpecificIP allocate pod a specific IP.
AllocateSpecificIP(string, net.IP, constant.ReleasePolicy, string) error
AllocateSpecificIP(string, net.IP, Attr) error
// AllocateInSubnet allocate subnet of IPs.
AllocateInSubnet(string, *net.IPNet, constant.ReleasePolicy, string) (net.IP, error)
AllocateInSubnet(string, *net.IPNet, Attr) (net.IP, error)
// AllocateInSubnetWithKey allocate a floatingIP in given subnet and key.
AllocateInSubnetWithKey(oldK, newK, subnet string, policy constant.ReleasePolicy, attr string) error
// ReserveIP can reserve a IP entitled by a terminated pod.
ReserveIP(oldK, newK, attr string) error
// UpdatePolicy update floatingIP's release policy and attr according to ip and key
UpdatePolicy(string, net.IP, constant.ReleasePolicy, string) error
AllocateInSubnetWithKey(oldK, newK, subnet string, attr Attr) error
// ReserveIP can reserve a IP entitled by a terminated pod. Attributes **expect policy attr** will be updated.
// Returns true if key or attr updated.
ReserveIP(oldK, newK string, attr Attr) (bool, error)
// UpdateAttr update floatingIP's release policy and attrs according to ip and key
UpdateAttr(string, net.IP, Attr) error
// Release release a given IP.
Release(string, net.IP) error
// First returns the first matched IP by key.
Expand Down
81 changes: 31 additions & 50 deletions pkg/ipam/floatingip/ipam_crd.go
Expand Up @@ -97,23 +97,15 @@ func NewCrdIPAM(fipClient crd_clientset.Interface, ipType Type, informer crdInfo
}

// AllocateSpecificIP allocate pod a specific IP.
func (ci *crdIpam) AllocateSpecificIP(key string, ip net.IP, policy constant.ReleasePolicy, attr string) error {
func (ci *crdIpam) AllocateSpecificIP(key string, ip net.IP, attr Attr) error {
ipStr := ip.String()
ci.cacheLock.RLock()
spec, find := ci.unallocatedFIPs[ipStr]
ci.cacheLock.RUnlock()
if !find {
return fmt.Errorf("failed to find floating ip by %s in cache", ipStr)
}
date := time.Now()
allocated := &FloatingIP{
IP: ip,
Key: key,
Subnets: spec.Subnets,
Attr: attr,
Policy: uint16(policy),
UpdatedAt: date,
}
allocated := New(ip, spec.Subnets, key, &attr, time.Now())
if err := ci.createFloatingIP(allocated); err != nil {
glog.Errorf("failed to create floatingIP %s: %v", ipStr, err)
return err
Expand All @@ -125,55 +117,38 @@ func (ci *crdIpam) AllocateSpecificIP(key string, ip net.IP, policy constant.Rel
}

// AllocateInSubnet allocate subnet of IPs.
func (ci *crdIpam) AllocateInSubnet(key string, nodeSubnet *net.IPNet, policy constant.ReleasePolicy,
attr string) (allocated net.IP, err error) {
func (ci *crdIpam) AllocateInSubnet(key string, nodeSubnet *net.IPNet, attr Attr) (net.IP, error) {
if nodeSubnet == nil {
// this should never happen
return nil, fmt.Errorf("nil nodeSubnet")
}
var ipStr string
ci.cacheLock.Lock()
defer ci.cacheLock.Unlock()
nodeSubnetStr := nodeSubnet.String()
for k, v := range ci.unallocatedFIPs {
//find an unallocated fip, then use it
if v.Subnets.Has(nodeSubnetStr) {
ipStr = k
date := time.Now()
// we never updates ip or subnet object, it's ok to share these objs.
allocatedFIP := &FloatingIP{
IP: v.IP,
Key: key,
Subnets: v.Subnets,
Attr: attr,
Policy: uint16(policy),
UpdatedAt: date,
}
if err = ci.createFloatingIP(allocatedFIP); err != nil {
allocated := New(v.IP, v.Subnets, key, &attr, time.Now())
if err := ci.createFloatingIP(allocated); err != nil {
glog.Errorf("failed to create floatingIP %s: %v", ipStr, err)
ci.cacheLock.Unlock()
return
return nil, err
}
//sync cache when crd create success
ci.syncCacheAfterCreate(allocatedFIP)
ci.syncCacheAfterCreate(allocated)
break
}
}
ci.cacheLock.Unlock()
if ipStr == "" {
return nil, ErrNoEnoughIP
}
ci.cacheLock.RLock()
defer ci.cacheLock.RUnlock()
if err = ci.getFloatingIP(ipStr); err != nil {
return
}
allocated = net.ParseIP(ipStr)
return
return net.ParseIP(ipStr), nil
}

// AllocateInSubnetWithKey allocate a floatingIP in given subnet and key.
func (ci *crdIpam) AllocateInSubnetWithKey(oldK, newK, subnet string, policy constant.ReleasePolicy,
attr string) error {
func (ci *crdIpam) AllocateInSubnetWithKey(oldK, newK, subnet string, attr Attr) error {
ci.cacheLock.Lock()
defer ci.cacheLock.Unlock()
var (
Expand All @@ -193,35 +168,40 @@ func (ci *crdIpam) AllocateInSubnetWithKey(oldK, newK, subnet string, policy con
return fmt.Errorf("failed to find floatIP by key %s", oldK)
}
date := time.Now()
cloned := latest.CloneWith(newK, attr, uint16(policy), date)
cloned := latest.CloneWith(newK, &attr, date)
if err := ci.updateFloatingIP(cloned); err != nil {
glog.Errorf("failed to update floatingIP %s: %v", cloned.IP.String(), err)
return err
}
latest.Assign(newK, attr, uint16(policy), date)
latest.Assign(newK, &attr, date)
return nil
}

// ReserveIP can reserve a IP entitled by a terminated pod.
func (ci *crdIpam) ReserveIP(oldK, newK, attr string) error {
func (ci *crdIpam) ReserveIP(oldK, newK string, attr Attr) (bool, error) {
ci.cacheLock.Lock()
defer ci.cacheLock.Unlock()
for k, v := range ci.allocatedFIPs {
if v.Key == oldK {
if oldK == newK && v.PodUid == attr.Uid && v.NodeName == attr.NodeName {
// nothing changed
return false, nil
}
attr.Policy = constant.ReleasePolicy(v.Policy)
date := time.Now()
if err := ci.updateFloatingIP(v.CloneWith(newK, attr, v.Policy, date)); err != nil {
if err := ci.updateFloatingIP(v.CloneWith(newK, &attr, date)); err != nil {
glog.Errorf("failed to update floatingIP %s: %v", k, err)
return err
return false, err
}
v.Assign(newK, attr, v.Policy, date)
return nil
v.Assign(newK, &attr, date)
return true, nil
}
}
return fmt.Errorf("failed to find floatIP by key %s", oldK)
return false, fmt.Errorf("failed to find floatIP by key %s", oldK)
}

// UpdatePolicy update floatingIP's release policy and attr according to ip and key
func (ci *crdIpam) UpdatePolicy(key string, ip net.IP, policy constant.ReleasePolicy, attr string) error {
// UpdateAttr update floatingIP's release policy and attr according to ip and key
func (ci *crdIpam) UpdateAttr(key string, ip net.IP, attr Attr) error {
ipStr := ip.String()
ci.cacheLock.Lock()
defer ci.cacheLock.Unlock()
Expand All @@ -233,11 +213,11 @@ func (ci *crdIpam) UpdatePolicy(key string, ip net.IP, policy constant.ReleasePo
return fmt.Errorf("key for %s is %s, not %s", ipStr, v.Key, key)
}
date := time.Now()
if err := ci.updateFloatingIP(v.CloneWith(v.Key, attr, uint16(policy), date)); err != nil {
if err := ci.updateFloatingIP(v.CloneWith(v.Key, &attr, date)); err != nil {
glog.Errorf("failed to update floatingIP %s: %v", ipStr, err)
return err
}
v.Assign(v.Key, attr, uint16(policy), date)
v.Assign(v.Key, &attr, date)
return nil
}

Expand Down Expand Up @@ -395,14 +375,16 @@ func (ci *crdIpam) ConfigurePool(floatIPs []*FloatingIPPool) error {
tmpFip := &FloatingIP{
IP: netIP,
Key: ip.Spec.Key,
Attr: ip.Spec.Attribute,
Policy: uint16(ip.Spec.Policy),
// Since subnets may change and for reserved fips crds created by user manually, subnets may not be
// correct, assign it to the latest config instead of crd value
// TODO we can delete subnets field from crd?
Subnets: nodeSubnets[i],
UpdatedAt: ip.Spec.UpdateTime.Time,
}
if err := tmpFip.unmarshalAttr(ip.Spec.Attribute); err != nil {
glog.Error(err)
}
tmpCacheAllocated[ip.Name] = tmpFip
break
}
Expand Down Expand Up @@ -440,7 +422,6 @@ func (ci *crdIpam) ConfigurePool(floatIPs []*FloatingIPPool) error {
tmpFip := &FloatingIP{
IP: ip,
Key: "",
Attr: "",
Policy: uint16(constant.ReleasePolicyPodDelete),
Subnets: subnetSet,
UpdatedAt: now,
Expand All @@ -467,7 +448,7 @@ func (ci *crdIpam) syncCacheAfterCreate(fip *FloatingIP) {
// don't use lock inner function, otherwise deadlock will be caused
func (ci *crdIpam) syncCacheAfterDel(released *FloatingIP) {
ipStr := released.IP.String()
released.Assign("", "", uint16(constant.ReleasePolicyPodDelete), time.Now())
released.Assign("", &Attr{Policy: constant.ReleasePolicyPodDelete}, time.Now())
released.Labels = nil
delete(ci.allocatedFIPs, ipStr)
ci.unallocatedFIPs[ipStr] = released
Expand Down

0 comments on commit 8f182e4

Please sign in to comment.