Skip to content

Commit

Permalink
Merge pull request #114 from telekom/feature/singularity
Browse files Browse the repository at this point in the history
Added singularity to getNetlink and getFRR status information
  • Loading branch information
Cellebyte committed Apr 23, 2024
2 parents 58beb58 + 015a283 commit 68c43e2
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 92 deletions.
17 changes: 15 additions & 2 deletions pkg/monitoring/collector.go
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
ctrl "sigs.k8s.io/controller-runtime"
)
Expand Down Expand Up @@ -44,6 +45,18 @@ type typedFactoryDesc struct {
valueType prometheus.ValueType
}

// basicCollector bundles the state shared by the collectors that embed it:
// an identifying name, a logger, and the bookkeeping that lets several
// concurrent Update calls be served from a single data-gathering pass.
type basicCollector struct {
// name identifies the collector.
name string
// mu guards channels; clearChannels must only be called with mu held.
mu sync.Mutex
// wg is waited on by Update callers that arrive while a pass is in progress.
wg sync.WaitGroup
// channels holds the metric channels of all Update calls pending in the current pass.
channels []chan<- prometheus.Metric
// logger is used by the embedding collector to report errors.
logger logr.Logger
}

// clearChannels forgets every pending metric channel by installing a fresh,
// empty slice. The caller must already hold c.mu.
func (c *basicCollector) clearChannels() {
	c.channels = make([]chan<- prometheus.Metric, 0)
}
// mustNewConstMetric creates a constant metric from this descriptor's desc and
// valueType with the given value and label values. It delegates to
// prometheus.MustNewConstMetric, so invalid input results in a panic rather
// than an error return.
func (d *typedFactoryDesc) mustNewConstMetric(value float64, labels ...string) prometheus.Metric {
	metric := prometheus.MustNewConstMetric(d.desc, d.valueType, value, labels...)
	return metric
}
Expand Down Expand Up @@ -94,11 +107,11 @@ func (DasSchiffNetworkOperatorCollector) Describe(ch chan<- *prometheus.Desc) {
// Collect implements the prometheus.Collector interface.
func (n DasSchiffNetworkOperatorCollector) Collect(ch chan<- prometheus.Metric) {
wg := sync.WaitGroup{}
wg.Add(len(n.Collectors))
for name, c := range n.Collectors {
wg.Add(1)
go func(name string, c Collector) {
defer wg.Done()
execute(name, c, ch)
wg.Done()
}(name, c)
}
wg.Wait()
Expand Down
83 changes: 60 additions & 23 deletions pkg/monitoring/frr.go
Expand Up @@ -3,18 +3,21 @@ package monitoring
import (
"strconv"
"strings"
"sync"

"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
"github.com/telekom/das-schiff-network-operator/pkg/frr"
"github.com/telekom/das-schiff-network-operator/pkg/nl"
"github.com/telekom/das-schiff-network-operator/pkg/route"
"golang.org/x/sys/unix"
ctrl "sigs.k8s.io/controller-runtime"
)

const secondToMillisecond = 1000
const frrCollectorName = "frr"

type frrCollector struct {
basicCollector
routesFibDesc typedFactoryDesc
routesRibDesc typedFactoryDesc
vrfVniDesc typedFactoryDesc
Expand All @@ -26,13 +29,12 @@ type frrCollector struct {
bgpMessagesReceivedDesc typedFactoryDesc
bgpMessagesTransmittedDesc typedFactoryDesc
frr *frr.Manager
logger logr.Logger
}

// init registers the frr collector factory. The frr collector should not be
// active within the network operator by default, so it is registered with
// defaultDisabled.
// NOTE(review): two registerCollector calls are visible here (string literal
// and frrCollectorName); one of them looks like a pre-change line left over
// from the diff this text was taken from — confirm that only one remains.
func init() {
registerCollector("frr", defaultDisabled, NewFRRCollector)
registerCollector(frrCollectorName, defaultDisabled, NewFRRCollector)
}

func convertToStateFloat(state string) float64 {
Expand All @@ -58,7 +60,7 @@ func NewFRRCollector() (Collector, error) {
collector := frrCollector{
routesFibDesc: typedFactoryDesc{
desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "frr", "routes_fib"),
prometheus.BuildFQName(namespace, frrCollectorName, "routes_fib"),
"The number of routes currently in the frr Controlplane.",
[]string{"table", "vrf", "protocol", "address_family"},
nil,
Expand All @@ -67,7 +69,7 @@ func NewFRRCollector() (Collector, error) {
},
routesRibDesc: typedFactoryDesc{
desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "frr", "routes_rib"),
prometheus.BuildFQName(namespace, frrCollectorName, "routes_rib"),
"The number of routes currently in the frr Controlplane.",
[]string{"table", "vrf", "protocol", "address_family"},
nil,
Expand All @@ -76,7 +78,7 @@ func NewFRRCollector() (Collector, error) {
},
vrfVniDesc: typedFactoryDesc{
desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "frr", "vni_state"),
prometheus.BuildFQName(namespace, frrCollectorName, "vni_state"),
"The state of the vrf interface in frr",
[]string{
"table", "vrf", "svi", "vtep",
Expand All @@ -87,7 +89,7 @@ func NewFRRCollector() (Collector, error) {
},
evpnVniDesc: typedFactoryDesc{
desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "frr", "vni_state"),
prometheus.BuildFQName(namespace, frrCollectorName, "vni_state"),
"The state of the Evpn vni interface",
[]string{
"table", "vrf", "svi", "vtep",
Expand All @@ -98,7 +100,7 @@ func NewFRRCollector() (Collector, error) {
},
bgpUptimeDesc: typedFactoryDesc{
desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "frr", "bgp_uptime_seconds_total"),
prometheus.BuildFQName(namespace, frrCollectorName, "bgp_uptime_seconds_total"),
"Uptime of the session with the other BGP Peer",
bgpLabels,
nil,
Expand All @@ -107,7 +109,7 @@ func NewFRRCollector() (Collector, error) {
},
bgpStatusDesc: typedFactoryDesc{
desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "frr", "bgp_status"),
prometheus.BuildFQName(namespace, frrCollectorName, "bgp_status"),
"The Session Status to the other BGP Peer",
bgpLabels,
nil,
Expand All @@ -116,7 +118,7 @@ func NewFRRCollector() (Collector, error) {
},
bgpPrefixesReceivedDesc: typedFactoryDesc{
desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "frr", "bgp_prefixes_received_total"),
prometheus.BuildFQName(namespace, frrCollectorName, "bgp_prefixes_received_total"),
"The Prefixes Received from the other peer.",
bgpLabels,
nil,
Expand All @@ -125,7 +127,7 @@ func NewFRRCollector() (Collector, error) {
},
bgpPrefixesTransmittedDesc: typedFactoryDesc{
desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "frr", "bgp_prefixes_transmitted_total"),
prometheus.BuildFQName(namespace, frrCollectorName, "bgp_prefixes_transmitted_total"),
"The Prefixes Transmitted to the other peer.",
bgpLabels,
nil,
Expand All @@ -134,7 +136,7 @@ func NewFRRCollector() (Collector, error) {
},
bgpMessagesReceivedDesc: typedFactoryDesc{
desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "frr", "bgp_messages_received_total"),
prometheus.BuildFQName(namespace, frrCollectorName, "bgp_messages_received_total"),
"The messages Received to the other peer.",
bgpLabels,
nil,
Expand All @@ -143,25 +145,30 @@ func NewFRRCollector() (Collector, error) {
},
bgpMessagesTransmittedDesc: typedFactoryDesc{
desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "frr", "bgp_messages_transmitted_total"),
prometheus.BuildFQName(namespace, frrCollectorName, "bgp_messages_transmitted_total"),
"The messages Transmitted to the other peer.",
bgpLabels,
nil,
),
valueType: prometheus.CounterValue,
},
frr: frr.NewFRRManager(),
logger: ctrl.Log.WithName("frr.collector"),
frr: frr.NewFRRManager(),
}

collector.name = frrCollectorName
collector.logger = ctrl.Log.WithName("frr.collector")

return &collector, nil
}

func (c *frrCollector) UpdateVrfs(ch chan<- prometheus.Metric) {
// getVrfs asks the frr manager for its VRF list. A failure is only logged;
// whatever ListVrfs returned is passed back to the caller unchanged.
func (c *frrCollector) getVrfs() []frr.VrfVniSpec {
	specs, listErr := c.frr.ListVrfs()
	if listErr == nil {
		return specs
	}
	c.logger.Error(listErr, "can't get vrfs from frr")
	return specs
}

func (c *frrCollector) updateVrfs(ch chan<- prometheus.Metric, vrfs []frr.VrfVniSpec) {
for _, vrf := range vrfs {
// hotfix for default as it is called
// main in netlink/kernel
Expand All @@ -173,12 +180,15 @@ func (c *frrCollector) UpdateVrfs(ch chan<- prometheus.Metric) {
ch <- c.vrfVniDesc.mustNewConstMetric(state, vrf.Table, vrf.Vrf, vrf.SviIntf, vrf.VxlanIntf)
}
}

func (c *frrCollector) UpdateRoutes(ch chan<- prometheus.Metric) {
// getRoutes asks the frr manager for the route summary (an empty string is
// passed as the ListRouteSummary argument). A failure is only logged; whatever
// ListRouteSummary returned is passed back to the caller unchanged.
func (c *frrCollector) getRoutes() []route.Information {
	summaries, listErr := c.frr.ListRouteSummary("")
	if listErr == nil {
		return summaries
	}
	c.logger.Error(listErr, "can't get routes from frr")
	return summaries
}

func (c *frrCollector) updateRoutes(ch chan<- prometheus.Metric, routeSummaries []route.Information) {
for _, routeSummary := range routeSummaries {
if routeSummary.VrfName == "default" {
routeSummary.VrfName = "main"
Expand All @@ -189,12 +199,15 @@ func (c *frrCollector) UpdateRoutes(ch chan<- prometheus.Metric) {
}
}

func (c *frrCollector) UpdateBGPNeighbors(ch chan<- prometheus.Metric) {
// getBGPNeighbors fetches the BGP neighbor summary from the frr manager (an
// empty string is passed as the ListBGPNeighbors argument). A failure is
// logged, and whatever ListBGPNeighbors returned is still handed back to the
// caller.
func (c *frrCollector) getBGPNeighbors() frr.BGPVrfSummary {
	bgpNeighbors, err := c.frr.ListBGPNeighbors("")
	if err != nil {
		// logr carries err as a structured field and does not interpret format
		// verbs in the message, so the message must not contain a "%w" placeholder.
		c.logger.Error(err, "can't get bgpNeighbors from frr")
	}
	return bgpNeighbors
}

func (c *frrCollector) updateBGPNeighbors(ch chan<- prometheus.Metric, bgpNeighbors frr.BGPVrfSummary) {
for _, families := range bgpNeighbors {
for _, family := range frr.BGPAddressFamilyValues() {
neighbor, ok := families[family.String()]
Expand Down Expand Up @@ -226,10 +239,34 @@ func (c *frrCollector) UpdateBGPNeighbors(ch chan<- prometheus.Metric) {
}
}
}
// updateChannels writes the already-fetched vrf, route and BGP neighbor data
// as metrics to every channel currently stored in c.channels, in that order
// per channel.
func (c *frrCollector) updateChannels(vrfs []frr.VrfVniSpec, routes []route.Information, neighbors frr.BGPVrfSummary) {
	pending := c.channels
	for i := range pending {
		target := pending[i]
		c.updateVrfs(target, vrfs)
		c.updateRoutes(target, routes)
		c.updateBGPNeighbors(target, neighbors)
	}
}

// Update emits the frr metrics on ch. Concurrent calls are coalesced: the call
// that finds itself alone in c.channels fetches routes, vrfs and BGP neighbors
// once and writes the resulting metrics to the channel of every call that
// registered in the meantime; all other calls only register their channel and
// wait on c.wg. It always returns nil.
// NOTE(review): the three direct c.Update* calls at the top look like
// pre-change lines left over from the diff this text was taken from — confirm
// and remove.
// NOTE(review): c.wg is replaced while holding c.mu, but waiting callers call
// c.wg.Wait() after releasing c.mu, so a waiter can read c.wg while a later
// first caller reassigns it — looks like a data race; confirm with -race.
func (c *frrCollector) Update(ch chan<- prometheus.Metric) error {
c.UpdateVrfs(ch)
c.UpdateRoutes(ch)
c.UpdateBGPNeighbors(ch)
c.mu.Lock()
// Register this caller's channel for the current pass.
c.channels = append(c.channels, ch)
if len(c.channels) == 1 {
// First caller of the pass: it does the fetching on behalf of everyone.
c.wg = sync.WaitGroup{}
c.wg.Add(1)
// Release the lock while fetching so that other callers can register.
c.mu.Unlock()

routes := c.getRoutes()
vrfs := c.getVrfs()
neighbors := c.getBGPNeighbors()

// Fan the results out to all registered channels, reset the pass and
// release the waiting callers.
c.mu.Lock()
c.updateChannels(vrfs, routes, neighbors)
c.clearChannels()
c.wg.Done()
c.mu.Unlock()
} else {
// A pass is already in progress: wait until its results have been written.
c.mu.Unlock()
c.wg.Wait()
}
return nil
}
62 changes: 49 additions & 13 deletions pkg/monitoring/nl.go
Expand Up @@ -2,30 +2,33 @@ package monitoring

import (
"fmt"
"sync"

"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
"github.com/telekom/das-schiff-network-operator/pkg/nl"
"github.com/telekom/das-schiff-network-operator/pkg/route"
ctrl "sigs.k8s.io/controller-runtime"
)

const nlCollectorName = "netlink"

// netlinkCollector exports route and neighbor metrics obtained through the
// netlink manager.
type netlinkCollector struct {
// basicCollector supplies name, logger and the state shared between concurrent Update calls.
basicCollector
// routesFibDesc describes the routes_fib metric.
routesFibDesc typedFactoryDesc
// neighborsDesc describes the neighbors metric.
neighborsDesc typedFactoryDesc
// netlink is the manager queried for route and neighbor information.
netlink *nl.NetlinkManager
// NOTE(review): basicCollector already carries a logger field; this one looks
// like a pre-change line left over from the diff — confirm and remove.
logger logr.Logger
}

// init registers the netlink collector factory with defaultEnabled.
// NOTE(review): two registerCollector calls are visible here (string literal
// and nlCollectorName); one of them looks like a pre-change line left over
// from the diff this text was taken from — confirm that only one remains.
func init() {
registerCollector("netlink", defaultEnabled, NewNetlinkCollector)
registerCollector(nlCollectorName, defaultEnabled, NewNetlinkCollector)
}

// NewNetlinkCollector returns a new Collector exposing netlink route and neighbor stats.
func NewNetlinkCollector() (Collector, error) {
collector := netlinkCollector{
routesFibDesc: typedFactoryDesc{
desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "netlink", "routes_fib"),
prometheus.BuildFQName(namespace, nlCollectorName, "routes_fib"),
"The number of routes currently in the Linux Dataplane.",
[]string{"table", "vrf", "protocol", "address_family"},
nil,
Expand All @@ -34,42 +37,75 @@ func NewNetlinkCollector() (Collector, error) {
},
neighborsDesc: typedFactoryDesc{
desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "netlink", "neighbors"),
prometheus.BuildFQName(namespace, nlCollectorName, "neighbors"),
"The number of neighbors currently in the Linux Dataplane.",
[]string{"interface", "address_family", "flags", "status"},
nil,
),
valueType: prometheus.GaugeValue,
},
netlink: &nl.NetlinkManager{},
logger: ctrl.Log.WithName("netlink.collector"),
}
collector.name = nlCollectorName
collector.logger = ctrl.Log.WithName("netlink.collector")

return &collector, nil
}

func (c *netlinkCollector) updateRoutes(ch chan<- prometheus.Metric) {
func (c *netlinkCollector) getRoutes() []route.Information {
routes, err := c.netlink.ListRouteInformation()
if err != nil {
c.logger.Error(err, "cannot get routes from netlink")
}
for _, route := range routes {
ch <- c.routesFibDesc.mustNewConstMetric(float64(route.Fib), fmt.Sprint(route.TableID), route.VrfName, nl.GetProtocolName(route.RouteProtocol), route.AddressFamily)
return routes
}

// updateRoutes sends one routes_fib metric per entry of routeSummaries to ch.
// The metric value is the entry's Fib count; the label values are the table
// id, vrf name, protocol name and address family.
func (c *netlinkCollector) updateRoutes(ch chan<- prometheus.Metric, routeSummaries []route.Information) {
	for i := range routeSummaries {
		entry := routeSummaries[i]
		tableID := fmt.Sprint(entry.TableID)
		protocol := nl.GetProtocolName(entry.RouteProtocol)
		ch <- c.routesFibDesc.mustNewConstMetric(float64(entry.Fib), tableID, entry.VrfName, protocol, entry.AddressFamily)
	}
}

func (c *netlinkCollector) updateNeighbors(ch chan<- prometheus.Metric) {
// getNeighbors asks the netlink manager for neighbor information. A failure is
// only logged; whatever ListNeighborInformation returned is passed back to the
// caller unchanged.
// NOTE(review): two c.logger.Error calls differing only in capitalisation are
// visible; one of them looks like a pre-change line left over from the diff
// this text was taken from — confirm that only one remains.
func (c *netlinkCollector) getNeighbors() []nl.NeighborInformation {
neighbors, err := c.netlink.ListNeighborInformation()
if err != nil {
c.logger.Error(err, "Cannot get neighbors from netlink")
c.logger.Error(err, "cannot get neighbors from netlink")
}
return neighbors
}

// updateNeighbors sends one neighbors metric per entry of neighbors to ch. The
// metric value is the entry's Quantity; the label values are its interface,
// family, flag and state.
func (c *netlinkCollector) updateNeighbors(ch chan<- prometheus.Metric, neighbors []nl.NeighborInformation) {
	for i := range neighbors {
		entry := neighbors[i]
		metric := c.neighborsDesc.mustNewConstMetric(entry.Quantity, entry.Interface, entry.Family, entry.Flag, entry.State)
		ch <- metric
	}
}

// updateChannels writes the already-fetched neighbor and route data as metrics
// to every channel currently stored in c.channels, neighbors first.
func (c *netlinkCollector) updateChannels(neighbors []nl.NeighborInformation, routes []route.Information) {
	pending := c.channels
	for i := range pending {
		target := pending[i]
		c.updateNeighbors(target, neighbors)
		c.updateRoutes(target, routes)
	}
}

// Update emits the netlink metrics on ch. Concurrent calls are coalesced: the
// call that finds itself alone in c.channels fetches routes and neighbors once
// and writes the resulting metrics to the channel of every call that
// registered in the meantime; all other calls only register their channel and
// wait on c.wg. It always returns nil.
// NOTE(review): the two single-argument c.updateRoutes(ch) / c.updateNeighbors(ch)
// calls at the top do not match the two-argument helpers and look like
// pre-change lines left over from the diff this text was taken from — confirm
// and remove.
// NOTE(review): c.wg is replaced while holding c.mu, but waiting callers call
// c.wg.Wait() after releasing c.mu, so a waiter can read c.wg while a later
// first caller reassigns it — looks like a data race; confirm with -race.
func (c *netlinkCollector) Update(ch chan<- prometheus.Metric) error {
c.updateRoutes(ch)
c.updateNeighbors(ch)
c.mu.Lock()
// Register this caller's channel for the current pass.
c.channels = append(c.channels, ch)
if len(c.channels) == 1 {
// First caller of the pass: it does the fetching on behalf of everyone.
c.wg = sync.WaitGroup{}
c.wg.Add(1)
// Release the lock while fetching so that other callers can register.
c.mu.Unlock()

routes := c.getRoutes()
neighbors := c.getNeighbors()

// Fan the results out to all registered channels, reset the pass and
// release the waiting callers.
c.mu.Lock()
c.updateChannels(neighbors, routes)
c.clearChannels()
c.wg.Done()
c.mu.Unlock()
} else {
// A pass is already in progress: wait until its results have been written.
c.mu.Unlock()
c.wg.Wait()
}
return nil
}

0 comments on commit 68c43e2

Please sign in to comment.