Skip to content

Commit

Permalink
datapath,endpoint: explicitly remove TC filters during endpoint teardown
Browse files Browse the repository at this point in the history
Prior to this commit, we left it up to the kernel to clean up tc attachments
when the CNI finally removes the veth when a Pod goes away. This leaves a window
of time where an endpoint's tc programs can potentially be invoked after
the endpoint's internal tail call maps have already been cleared and the
endpoint has been removed from the endpoint map and ipcache, resulting in
undefined behaviour.

This patch clearly defines the endpoint teardown sequence as follows:
- remove (endpoint) routes
- set the interface down
- detach tc(x) hooks
- remove endpoint from endpoint map
- remove endpoint policy program(s)
- delete conntrack map pins
- remove policy prog array map pin
- remove internal tail call map pin
- remove custom calls map pin

This puts the agent more in control of the teardown sequence and will allow us
to reason better about failures related to missed tail calls and other flakes.

Signed-off-by: Timo Beckers <timo@isovalent.com>
  • Loading branch information
ti-mo committed Apr 25, 2024
1 parent 6e53ad7 commit 7da33fd
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 49 deletions.
26 changes: 21 additions & 5 deletions pkg/datapath/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,13 +732,29 @@ func (l *loader) Unload(ep datapath.Endpoint) {
}
}

log := log.WithField(logfields.EndpointID, ep.StringID())

// Remove legacy tc attachments.
if err := removeTCFilters(ep.InterfaceName(), netlink.HANDLE_MIN_INGRESS); err != nil {
log.WithError(err).Errorf("Removing ingress filter from interface %s", ep.InterfaceName())
}
if err := removeTCFilters(ep.InterfaceName(), netlink.HANDLE_MIN_EGRESS); err != nil {
log.WithError(err).Errorf("Removing egress filter from interface %s", ep.InterfaceName())
}

// If Cilium and the kernel support tcx to attach TC programs to the
// endpoint's veth device, its bpf_link object is pinned to a per-endpoint
// bpffs directory. When the endpoint gets deleted, removing the whole
// directory cleans up any pinned maps and links.
bpffsPath := bpffsEndpointDir(bpf.CiliumPath(), ep)
if err := bpf.Remove(bpffsPath); err != nil {
log.WithError(err).WithField(logfields.EndpointID, ep.StringID())
// bpffs directory. When the endpoint gets deleted, we can remove the whole
// directory to clean up any leftover pinned links and maps.

// Remove the links directory first to avoid removing program arrays before
// the entrypoints are detached. Failures are logged rather than returned so
// the rest of the teardown still runs.
if err := bpf.Remove(bpffsEndpointLinksDir(bpf.CiliumPath(), ep)); err != nil {
	// WithError alone only builds a log entry; a level method must be called
	// for the error to actually be emitted.
	log.WithError(err).Error("Removing endpoint bpffs links directory")
}
// Finally, remove the endpoint's top-level directory.
if err := bpf.Remove(bpffsEndpointDir(bpf.CiliumPath(), ep)); err != nil {
	log.WithError(err).Error("Removing endpoint bpffs directory")
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/datapath/loader/netlink.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ func attachTCProgram(link netlink.Link, prog *ebpf.Program, progName, bpffsDir s
func removeTCFilters(ifName string, tcDir uint32) error {
link, err := netlink.LinkByName(ifName)
if err != nil {
return err
// No interface, no filters.
return nil
}

filters, err := netlink.FilterList(link, tcDir)
Expand Down
66 changes: 39 additions & 27 deletions pkg/endpoint/bpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,51 +974,63 @@ func (e *Endpoint) InitMap() error {
return policymap.Create(e.policyMapPath())
}

// deleteMaps releases references to all BPF maps associated with this
// endpoint.
//
// For each error that occurs while releasing these references, an error is
// added to the resulting error slice which is returned.
// deleteMaps deletes the endpoint's entry from the global
// cilium_(egress)call_policy maps and removes endpoint-specific cilium_calls_,
// cilium_policy_ and cilium_ct{4,6}_ map pins.
//
// Returns nil on success.
// Call this after the endpoint's tc hook has been detached.
func (e *Endpoint) deleteMaps() []error {
var errors []error

maps := map[string]string{
"policy": e.policyMapPath(),
"calls": e.callsMapPath(),
// Remove the endpoint from cilium_lxc. After this point, ip->epID lookups
// will fail, causing packets to/from the Pod to be dropped in many cases,
// stopping packet evaluation.
if err := lxcmap.DeleteElement(e); err != nil {
errors = append(errors, err...)
}
if !e.isHost {
maps["custom"] = e.customCallsMapPath()

// Remove the policy tail call entry for the endpoint. This will disable
// policy evaluation for the endpoint and will result in missing tail calls if
// e.g. bpf_host or bpf_overlay call into the endpoint's policy program.
if err := policymap.RemoveGlobalMapping(uint32(e.ID), option.Config.EnableEnvoyConfig); err != nil {
errors = append(errors, fmt.Errorf("removing endpoint program from global policy map: %w", err))
}
for name, path := range maps {
if err := os.RemoveAll(path); err != nil {
errors = append(errors, fmt.Errorf("unable to remove %s map file %s: %w", name, path, err))

// Remove rate limit from bandwidth manager map.
if e.bps != 0 {
if err := e.owner.Datapath().BandwidthManager().DeleteEndpointBandwidthLimit(e.ID); err != nil {
errors = append(errors, fmt.Errorf("removing endpoint from bandwidth manager map: %w", err))
}
}

if e.ConntrackLocalLocked() {
// Remove local connection tracking maps
// Remove endpoint-specific CT map pins.
for _, m := range ctmap.LocalMaps(e, option.Config.EnableIPv4, option.Config.EnableIPv6) {
ctPath, err := m.Path()
if err == nil {
err = os.RemoveAll(ctPath)
}
if err != nil {
errors = append(errors, fmt.Errorf("unable to remove CT map %s: %w", ctPath, err))
errors = append(errors, fmt.Errorf("getting path for CT map pin %s: %w", m.Name(), err))
continue
}
// Record an error only when removing the pin actually failed. A nil result
// from os.RemoveAll means the pin is gone (or never existed).
if err := os.RemoveAll(ctPath); err != nil {
	errors = append(errors, fmt.Errorf("removing CT map pin %s: %w", ctPath, err))
}
}
}

// Remove handle_policy() tail call entry for EP
if err := policymap.RemoveGlobalMapping(uint32(e.ID), option.Config.EnableEnvoyConfig); err != nil {
errors = append(errors, fmt.Errorf("unable to remove endpoint from global policy map: %w", err))
// Remove program array pins as the last step. This permanently invalidates
// the endpoint programs' state, because removing a program array map pin
// removes the map's entries even if the map is still referenced by any live
// bpf programs, potentially resulting in missed tail calls if any packets are
// still in flight.
if err := os.RemoveAll(e.policyMapPath()); err != nil {
errors = append(errors, fmt.Errorf("removing policy map pin for endpoint %s: %w", e.StringID(), err))
}

// Remove rate-limit from bandwidth manager map.
if e.bps != 0 {
if err := e.owner.Datapath().BandwidthManager().DeleteEndpointBandwidthLimit(e.ID); err != nil {
errors = append(errors, fmt.Errorf("unable to remote endpoint from bandwidth manager map: %w", err))
if err := os.RemoveAll(e.callsMapPath()); err != nil {
errors = append(errors, fmt.Errorf("removing calls map pin for endpoint %s: %w", e.StringID(), err))
}
if !e.isHost {
if err := os.RemoveAll(e.customCallsMapPath()); err != nil {
errors = append(errors, fmt.Errorf("removing custom calls map pin for endpoint %s: %w", e.StringID(), err))
}
}

Expand Down
46 changes: 30 additions & 16 deletions pkg/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"

"github.com/sirupsen/logrus"
"github.com/vishvananda/netlink"
k8sTypes "k8s.io/apimachinery/pkg/types"

"github.com/cilium/cilium/api/v1/models"
Expand Down Expand Up @@ -47,7 +48,6 @@ import (
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/mac"
"github.com/cilium/cilium/pkg/maps/ctmap"
"github.com/cilium/cilium/pkg/maps/lxcmap"
"github.com/cilium/cilium/pkg/maps/policymap"
"github.com/cilium/cilium/pkg/metrics"
"github.com/cilium/cilium/pkg/monitor/notifications"
Expand Down Expand Up @@ -1186,10 +1186,6 @@ type DeleteConfig struct {
func (e *Endpoint) leaveLocked(proxyWaitGroup *completion.WaitGroup, conf DeleteConfig) []error {
errs := []error{}

if !e.isProperty(PropertyFakeEndpoint) {
e.owner.Datapath().Loader().Unload(e.createEpInfoCache(""))
}

// Remove policy references from shared policy structures
e.desiredPolicy.Detach()
e.realizedPolicy.Detach()
Expand Down Expand Up @@ -2443,17 +2439,6 @@ func (e *Endpoint) Delete(conf DeleteConfig) []error {
}
e.setState(StateDisconnecting, "Deleting endpoint")

// If dry mode is enabled, no changes to BPF maps are performed
if !e.isProperty(PropertyFakeEndpoint) {
if errs2 := lxcmap.DeleteElement(e); errs2 != nil {
errs = append(errs, errs2...)
}

if errs2 := e.deleteMaps(); errs2 != nil {
errs = append(errs, errs2...)
}
}

if option.Config.IPAM == ipamOption.IPAMENI || option.Config.IPAM == ipamOption.IPAMAzure || option.Config.IPAM == ipamOption.IPAMAlibabaCloud {
e.getLogger().WithFields(logrus.Fields{
"ep": e.GetID(),
Expand Down Expand Up @@ -2483,6 +2468,24 @@ func (e *Endpoint) Delete(conf DeleteConfig) []error {
}
}

// If dry mode is enabled, no changes to system state are made.
if !e.isProperty(PropertyFakeEndpoint) {
// Set the Endpoint's interface down to prevent it from passing any traffic
// after its tc filters are removed.
if err := e.setDown(); err != nil {
errs = append(errs, err)
}

// Detach the endpoint program from any tc(x) hooks.
e.owner.Datapath().Loader().Unload(e.createEpInfoCache(""))

// Delete the endpoint's entries from the global cilium_(egress)call_policy
// maps and remove per-endpoint cilium_calls_ and cilium_policy_ map pins.
if err := e.deleteMaps(); err != nil {
errs = append(errs, err...)
}
}

completionCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
proxyWaitGroup := completion.NewWaitGroup(completionCtx)

Expand All @@ -2498,6 +2501,17 @@ func (e *Endpoint) Delete(conf DeleteConfig) []error {
return errs
}

// setDown brings down the link named by the Endpoint's host interface. A
// failed link lookup is not reported: nil is returned and nothing is changed.
// Otherwise the result of setting the link down is returned as-is.
func (e *Endpoint) setDown() error {
	if link, err := netlink.LinkByName(e.HostInterface()); err == nil {
		return netlink.LinkSetDown(link)
	}
	return nil
}

// WaitForFirstRegeneration waits for the endpoint to complete its first full regeneration.
func (e *Endpoint) WaitForFirstRegeneration(ctx context.Context) error {
e.getLogger().Info("Waiting for endpoint to be generated")
Expand Down

0 comments on commit 7da33fd

Please sign in to comment.