Skip to content

Commit

Permalink
Merge pull request #70 from telekom/feature/monitoring-and-bpf-reattach
Browse files Browse the repository at this point in the history
Re-Attach/Update BPF program on existing VRF interfaces and Fix Monitoring
  • Loading branch information
chdxD1 committed Oct 18, 2023
2 parents 9528bbd + 0745bcd commit 40566cd
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 123 deletions.
1 change: 1 addition & 0 deletions cmd/frr-exporter/main.go
Expand Up @@ -34,6 +34,7 @@ func main() {
map[string]bool{
"frr": true,
"netlink": false,
"bpf": false,
})
if err != nil {
log.Fatal(fmt.Errorf("failed to create collector %w", err))
Expand Down
9 changes: 8 additions & 1 deletion pkg/bpf/loader.go
Expand Up @@ -33,7 +33,6 @@ func InitBPFRouter() error {
if err := loadRouterObjects(&router, nil); err != nil {
return err
}
initMonitoring()
return nil
}

Expand All @@ -59,6 +58,14 @@ func AttachInterfaces(intfs []string) error {
return nil
}

func EbpfRetStatsMap() *ebpf.Map {
return router.routerMaps.EbpfRetStatsMap
}

func EbpfFibLkupStatsMap() *ebpf.Map {
return router.routerMaps.EbpfFibLkupStatsMap
}

// First we ensure the qdisc is there. It is a very basic check, ensuring we have an clsact qdisc with the correct handle
// as no other app should modify the tc options on existing interfaces (other than deleting/adding them altogether) there shouldn't be a risk.
func ensureQdisc(intf netlink.Link) error {
Expand Down
76 changes: 0 additions & 76 deletions pkg/bpf/monitoring.go

This file was deleted.

179 changes: 133 additions & 46 deletions pkg/monitoring/bpf.go
@@ -1,48 +1,135 @@
package monitoring

// TODO: move bpf monitoring in here from pkg/bpf/monioring.go
// import (
// "github.com/go-logr/logr"
// "github.com/prometheus/client_golang/prometheus"
// ctrl "sigs.k8s.io/controller-runtime"
// )
// type bpfCollector struct {
// // routesDesc typedFactoryDesc
// bpf string
// logger logr.Logger
// }
// func init() {
// registerCollector("bpf", defaultEnabled, NewBPFCollector)
// }
// func NewBPFCollector() (Collector, error) {
// // frrCli, err := bpf.
// var err error
// if err != nil {
// return nil, err
// }
// collector := bpfCollector{
// // routesDesc: typedFactoryDesc{
// // desc: prometheus.NewDesc(
// // prometheus.BuildFQName(namespace, "frr", "routes"),
// // "The number of routes currently in the Linux Dataplane.",
// // []string{"table", "protocol", "address_family"},
// // nil,
// // ),
// // valueType: prometheus.GaugeValue,
// // },
// // bpf: nil,
// logger: ctrl.Log.WithName("bpf.collector"),
// }
// return &collector, nil
// }
// func (c *bpfCollector) Update(ch chan<- prometheus.Metric) error {
// // routes, err := (nil, nil)
// // c.logger.Info("I am in the netlink collector")
// // if err != nil {
// // return err
// // }
// // for _, route := range routes {
// // ch <- c.routesDesc.mustNewConstMetric(float64(route.Quantity), fmt.Sprint(route.TableID), route.RouteProtocol.String(), route.AddressFamily)
// // }
// return nil
// }.
import (
"fmt"

"github.com/cilium/ebpf"
"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
"github.com/telekom/das-schiff-network-operator/pkg/bpf"
ctrl "sigs.k8s.io/controller-runtime"
)

type bpfCollector struct {
returnReasonsPacketsDesc typedFactoryDesc
returnReasonsBytesDesc typedFactoryDesc
fibLookupPacketsDesc typedFactoryDesc
fibLookupBytesDesc typedFactoryDesc
logger logr.Logger
}

type statsRecord struct {
RXPackets uint64
RXBytes uint64
}

var (
epbfReturnReasons = []string{
"route", "route_noneigh", "err_parse_headers", "not_fwd", "err_store_mac", "err_buffer_size", "err_fallthrough",
}
ebpfFibLookupResult = []string{
"success", "blackhole", "unreacheable", "prohibit", "not_fwded", "fwd_disabled", "unsupp_lwt", "no_neigh", "frag_needed",
}
)

func init() {
registerCollector("bpf", defaultEnabled, NewBPFCollector)
}

func NewBPFCollector() (Collector, error) {
collector := bpfCollector{
returnReasonsPacketsDesc: typedFactoryDesc{
desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "bpf", "return_reasons_packets"),
"The BPF tc_router program return reasons",
[]string{"key"},
nil,
),
valueType: prometheus.CounterValue,
},
returnReasonsBytesDesc: typedFactoryDesc{
desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "bpf", "return_reasons_bytes"),
"The BPF tc_router program return reasons",
[]string{"key"},
nil,
),
valueType: prometheus.CounterValue,
},
fibLookupPacketsDesc: typedFactoryDesc{
desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "bpf", "fib_lookup_packets"),
"The BPF tc_router program lookup results",
[]string{"key"},
nil,
),
valueType: prometheus.CounterValue,
},
fibLookupBytesDesc: typedFactoryDesc{
desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "bpf", "fib_lookup_bytes"),
"The BPF tc_router program lookup results",
[]string{"key"},
nil,
),
valueType: prometheus.CounterValue,
},
logger: ctrl.Log.WithName("bpf.collector"),
}

return &collector, nil
}

func (*bpfCollector) fetchEbpfStatistics(m *ebpf.Map, key uint32) (*statsRecord, error) {
var perCPUStats []*statsRecord
err := m.Lookup(key, &perCPUStats)
if err != nil {
return nil, fmt.Errorf("error looking up bpf key: %w", err)
}
var aggregatedStats statsRecord
for _, stat := range perCPUStats {
aggregatedStats.RXBytes += stat.RXBytes
aggregatedStats.RXPackets += stat.RXPackets
}
return &aggregatedStats, nil
}

func (c *bpfCollector) updateReturnReasons(ch chan<- prometheus.Metric) error {
if bpf.EbpfRetStatsMap() == nil {
return ErrNoData
}
for idx, name := range epbfReturnReasons {
stats, err := c.fetchEbpfStatistics(bpf.EbpfRetStatsMap(), uint32(idx))
if err != nil {
return err
}
ch <- c.returnReasonsPacketsDesc.mustNewConstMetric(float64(stats.RXPackets), name)
ch <- c.returnReasonsBytesDesc.mustNewConstMetric(float64(stats.RXBytes), name)
}
return nil
}

func (c *bpfCollector) updateFibLookupResults(ch chan<- prometheus.Metric) error {
if bpf.EbpfFibLkupStatsMap() == nil {
return ErrNoData
}
for idx, name := range ebpfFibLookupResult {
stats, err := c.fetchEbpfStatistics(bpf.EbpfFibLkupStatsMap(), uint32(idx))
if err != nil {
return err
}
ch <- c.fibLookupPacketsDesc.mustNewConstMetric(float64(stats.RXPackets), name)
ch <- c.fibLookupBytesDesc.mustNewConstMetric(float64(stats.RXBytes), name)
}
return nil
}

func (c *bpfCollector) Update(ch chan<- prometheus.Metric) error {
if err := c.updateReturnReasons(ch); err != nil {
return err
}
if err := c.updateFibLookupResults(ch); err != nil {
return err
}
return nil
}
23 changes: 23 additions & 0 deletions pkg/nl/layer3.go
Expand Up @@ -5,6 +5,7 @@ import (
"sort"

"github.com/telekom/das-schiff-network-operator/pkg/bpf"
"github.com/vishvananda/netlink"
)

const (
Expand Down Expand Up @@ -141,3 +142,25 @@ func (n *NetlinkManager) GetL3ByName(name string) (*VRFInformation, error) {
}
return nil, fmt.Errorf("no VRF with name %s", name)
}

func (*NetlinkManager) EnsureBPFProgram(info VRFInformation) error {
if link, err := netlink.LinkByName(bridgePrefix + info.Name); err != nil {
return fmt.Errorf("error getting bridge interface of vrf %s: %w", info.Name, err)
} else if err := bpf.AttachToInterface(link); err != nil {
return fmt.Errorf("error attaching bpf program to bridge interface of vrf %s: %w", info.Name, err)
}

if link, err := netlink.LinkByName(vrfToDefaultPrefix + info.Name); err != nil {
return fmt.Errorf("error getting vrf2default interface of vrf %s: %w", info.Name, err)
} else if err := bpf.AttachToInterface(link); err != nil {
return fmt.Errorf("error attaching bpf program to vrf2default interface of vrf %s: %w", info.Name, err)
}

if link, err := netlink.LinkByName(vxlanPrefix + info.Name); err != nil {
return fmt.Errorf("error getting vxlan interface of vrf %s: %w", info.Name, err)
} else if err := bpf.AttachToInterface(link); err != nil {
return fmt.Errorf("error attaching bpf program to vxlan interface of vrf %s: %w", info.Name, err)
}

return nil
}
2 changes: 2 additions & 0 deletions pkg/reconciler/layer3.go
Expand Up @@ -181,6 +181,8 @@ func (r *reconcile) reconcileL3Netlink(vrfConfigs []frr.VRFConfiguration) ([]nl.
}
if !stillExists || cfg.MarkForDelete {
toDelete = append(toDelete, cfg)
} else if err := r.netlinkManager.EnsureBPFProgram(cfg); err != nil {
r.Logger.Error(err, "Error ensuring BPF program on VRF", "vrf", cfg.Name, "vni", strconv.Itoa(cfg.VNI))
}
}

Expand Down

0 comments on commit 40566cd

Please sign in to comment.