Skip to content

Commit

Permalink
Refactor domainstats collector
Browse files Browse the repository at this point in the history
Signed-off-by: machadovilaca <machadovilaca@gmail.com>
  • Loading branch information
machadovilaca committed Apr 15, 2024
1 parent 3e20f8c commit 15c23f9
Show file tree
Hide file tree
Showing 18 changed files with 162 additions and 77 deletions.
13 changes: 6 additions & 7 deletions cmd/virt-handler/virt-handler.go
Expand Up @@ -71,8 +71,8 @@ import (
"kubevirt.io/kubevirt/pkg/certificates/bootstrap"
containerdisk "kubevirt.io/kubevirt/pkg/container-disk"
"kubevirt.io/kubevirt/pkg/controller"
promdomain "kubevirt.io/kubevirt/pkg/monitoring/domainstats/prometheus" // import for prometheus metrics
metrics "kubevirt.io/kubevirt/pkg/monitoring/metrics/virt-handler"
metricshandler "kubevirt.io/kubevirt/pkg/monitoring/metrics/virt-handler/handler"
"kubevirt.io/kubevirt/pkg/monitoring/profiler"
"kubevirt.io/kubevirt/pkg/service"
"kubevirt.io/kubevirt/pkg/util"
Expand Down Expand Up @@ -201,10 +201,6 @@ func (app *virtHandlerApp) Run() {
logger.V(1).Infof("hostname %s", app.HostOverride)
var err error

if err := metrics.SetupMetrics(); err != nil {
panic(fmt.Errorf("failed to set up metrics: %v", err))
}

// Copy container-disk binary
targetFile := filepath.Join(app.VirtLibDir, "/init/usr/bin/container-disk")
err = os.MkdirAll(filepath.Dir(targetFile), os.ModePerm)
Expand Down Expand Up @@ -375,7 +371,10 @@ func (app *virtHandlerApp) Run() {
app.VirtShareDir,
)

promdomain.SetupDomainStatsCollector(app.virtCli, app.VirtShareDir, app.HostOverride, app.MaxRequestsInFlight, vmiSourceInformer)
if err := metrics.SetupMetrics(app.VirtShareDir, app.HostOverride, app.MaxRequestsInFlight, vmiSourceInformer); err != nil {
panic(err)
}

if err := downwardmetrics.RunDownwardMetricsCollector(context.Background(), app.HostOverride, vmiSourceInformer, podIsolationDetector); err != nil {
panic(fmt.Errorf("failed to set up the downwardMetrics collector: %v", err))
}
Expand Down Expand Up @@ -543,7 +542,7 @@ func (app *virtHandlerApp) runPrometheusServer(errCh chan error) {

mux.Add(webService)
log.Log.V(1).Infof("metrics: max concurrent requests=%d", app.MaxRequestsInFlight)
mux.Handle("/metrics", promdomain.Handler(app.MaxRequestsInFlight))
mux.Handle("/metrics", metricshandler.Handler(app.MaxRequestsInFlight))
server := http.Server{
Addr: app.ServiceListen.Address(),
Handler: mux,
Expand Down
11 changes: 0 additions & 11 deletions pkg/monitoring/domainstats/collector_suite_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/monitoring/domainstats/downwardmetrics/BUILD.bazel
Expand Up @@ -13,7 +13,7 @@ go_library(
"//pkg/downwardmetrics/vhostmd:go_default_library",
"//pkg/downwardmetrics/vhostmd/api:go_default_library",
"//pkg/downwardmetrics/vhostmd/metrics:go_default_library",
"//pkg/monitoring/domainstats:go_default_library",
"//pkg/monitoring/metrics/virt-handler/domainstats/collector:go_default_library",
"//pkg/virt-handler/cmd-client:go_default_library",
"//pkg/virt-handler/isolation:go_default_library",
"//pkg/virt-launcher/virtwrap/stats:go_default_library",
Expand Down
12 changes: 7 additions & 5 deletions pkg/monitoring/domainstats/downwardmetrics/scraper.go
Expand Up @@ -14,14 +14,14 @@ import (
"kubevirt.io/kubevirt/pkg/downwardmetrics/vhostmd"
"kubevirt.io/kubevirt/pkg/downwardmetrics/vhostmd/api"
metricspkg "kubevirt.io/kubevirt/pkg/downwardmetrics/vhostmd/metrics"
vms "kubevirt.io/kubevirt/pkg/monitoring/domainstats"
"kubevirt.io/kubevirt/pkg/monitoring/metrics/virt-handler/domainstats/collector"
cmdclient "kubevirt.io/kubevirt/pkg/virt-handler/cmd-client"
"kubevirt.io/kubevirt/pkg/virt-handler/isolation"
"kubevirt.io/kubevirt/pkg/virt-launcher/virtwrap/stats"
)

const DownwardmetricsRefreshDuration = 5 * time.Second
const DownwardmetricsCollectionTimeout = vms.CollectionTimeout
const DownwardmetricsCollectionTimeout = collector.CollectionTimeout
const qemuVersionUnknown = "qemu-unknown"

type StaticHostMetrics struct {
Expand All @@ -35,6 +35,8 @@ type Scraper struct {
reporter *DownwardMetricsReporter
}

// Complete is intentionally a no-op for the downward metrics scraper.
// ConcurrentCollector.Collect invokes scraper.Complete() once it has logged
// that collection finished; this scraper has nothing to do at that point.
// NOTE(review): presumably this exists only to satisfy the collector's
// MetricsScraper interface — confirm against that interface's definition.
func (s *Scraper) Complete() {}

func (s *Scraper) Scrape(socketFile string, vmi *k6sv1.VirtualMachineInstance) {
if !vmi.IsRunning() || !downwardmetrics.HasDownwardMetricDisk(vmi) {
return
Expand Down Expand Up @@ -100,7 +102,7 @@ func (r *DownwardMetricsReporter) Report(socketFile string) (*api.Metrics, error
// In the best case the information is stale, in the worst case the information is stale *and*
// the reporting channel is already closed, leading to a possible panic - see below
elapsed := time.Now().Sub(ts)
if elapsed > vms.StatsMaxAge {
if elapsed > collector.StatsMaxAge {
log.Log.Infof("took too long (%v) to collect stats from %s: ignored", elapsed, socketFile)
return nil, fmt.Errorf("took too long (%v) to collect stats from %s: ignored", elapsed, socketFile)
}
Expand Down Expand Up @@ -142,7 +144,7 @@ func guestMemoryMetrics(vmStats *stats.DomainStats) []api.Metric {
}

type Collector struct {
concCollector *vms.ConcurrentCollector
concCollector *collector.ConcurrentCollector
}

func NewReporter(nodeName string) *DownwardMetricsReporter {
Expand All @@ -161,7 +163,7 @@ func RunDownwardMetricsCollector(context context.Context, nodeName string, vmiIn
isolation: isolation,
reporter: NewReporter(nodeName),
}
collector := vms.NewConcurrentCollector(1)
collector := collector.NewConcurrentCollector(1)

go func() {
ticker := time.NewTicker(DownwardmetricsRefreshDuration)
Expand Down
7 changes: 0 additions & 7 deletions pkg/monitoring/domainstats/scraper.go

This file was deleted.

1 change: 1 addition & 0 deletions pkg/monitoring/metrics/virt-handler/BUILD.bazel
Expand Up @@ -14,5 +14,6 @@ go_library(
"//pkg/monitoring/metrics/virt-handler/domainstats:go_default_library",
"//staging/src/kubevirt.io/client-go/version:go_default_library",
"//vendor/github.com/machadovilaca/operator-observability/pkg/operatormetrics:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)
19 changes: 18 additions & 1 deletion pkg/monitoring/metrics/virt-handler/domainstats/BUILD.bazel
Expand Up @@ -12,31 +12,48 @@ go_library(
"migration_metrics.go",
"network_metrics.go",
"node_cpu_affinity_metrics.go",
"vcpuMetrics.go",
"scrapper.go",
"vcpu_metrics.go",
],
importpath = "kubevirt.io/kubevirt/pkg/monitoring/metrics/virt-handler/domainstats",
visibility = ["//visibility:public"],
deps = [
"//pkg/monitoring/metrics/virt-handler/domainstats/collector:go_default_library",
"//pkg/virt-handler/cmd-client:go_default_library",
"//pkg/virt-launcher/virtwrap/stats:go_default_library",
"//staging/src/kubevirt.io/api/core/v1:go_default_library",
"//staging/src/kubevirt.io/client-go/log:go_default_library",
"//vendor/github.com/machadovilaca/operator-observability/pkg/operatormetrics:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = [
"block_metrics_test.go",
"collector_test.go",
"cpu_metrics_test.go",
"domainstats_suite_test.go",
"domainstats_test.go",
"filesystem_metrics_test.go",
"memory_metrics_test.go",
"migration_metrics_test.go",
"network_metrics_test.go",
"node_cpu_affinity_metrics_test.go",
"vcpu_metrics_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/monitoring/metrics/virt-handler/domainstats/collector:go_default_library",
"//pkg/virt-launcher/virtwrap/stats:go_default_library",
"//staging/src/kubevirt.io/api/core/v1:go_default_library",
"//staging/src/kubevirt.io/client-go/testutils:go_default_library",
"//vendor/github.com/machadovilaca/operator-observability/pkg/operatormetrics:go_default_library",
"//vendor/github.com/onsi/ginkgo/v2:go_default_library",
"//vendor/github.com/onsi/gomega:go_default_library",
"//vendor/github.com/onsi/gomega/format:go_default_library",
"//vendor/github.com/onsi/gomega/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
],
)
71 changes: 47 additions & 24 deletions pkg/monitoring/metrics/virt-handler/domainstats/collector.go
Expand Up @@ -20,13 +20,16 @@ package domainstats

import (
"github.com/machadovilaca/operator-observability/pkg/operatormetrics"
"k8s.io/client-go/tools/cache"
k6tv1 "kubevirt.io/api/core/v1"
"kubevirt.io/client-go/log"

"kubevirt.io/kubevirt/pkg/monitoring/metrics/virt-handler/domainstats/collector"
)

type resourceMetrics interface {
Describe() []operatormetrics.Metric
Collect(report *VirtualMachineInstanceReport) []operatormetrics.CollectorResult
}
const (
PrometheusCollectionTimeout = collector.CollectionTimeout
)

var (
rms = []resourceMetrics{
Expand All @@ -44,8 +47,31 @@ var (
Metrics: domainStatsMetrics(rms...),
CollectCallback: domainStatsCollectorCallback,
}

settings *collectorSettings
)

// resourceMetrics is the contract implemented by every entry of the
// package-level rms slice.
type resourceMetrics interface {
	// Describe returns the metric definitions of this group.
	// NOTE(review): presumably these are what domainStatsMetrics gathers for
	// the collector's Metrics field — confirm.
	Describe() []operatormetrics.Metric
	// Collect turns a single VirtualMachineInstanceReport into collector results.
	Collect(report *VirtualMachineInstanceReport) []operatormetrics.CollectorResult
}

// collectorSettings holds the values handed to SetupDomainStatsCollector so
// that the collector callback can read them later through the package-level
// settings variable.
type collectorSettings struct {
	// NOTE(review): presumably virt-handler's share directory — confirm with the caller.
	virtShareDir string
	// NOTE(review): presumably the name of the node virt-handler runs on — confirm with the caller.
	nodeName string
	// maxRequestsInFlight is passed to collector.NewConcurrentCollector by
	// domainStatsCollectorCallback.
	maxRequestsInFlight int
	// vmiInformer is the informer whose indexer is listed by
	// domainStatsCollectorCallback to obtain the VMIs to collect from.
	vmiInformer cache.SharedIndexInformer
}

// SetupDomainStatsCollector records the configuration used by the domain stats
// collector and stores it in the package-level settings variable, replacing
// any value stored by an earlier call.
//
// domainStatsCollectorCallback dereferences settings without a nil check, so
// this function has to be called before that callback runs.
func SetupDomainStatsCollector(virtShareDir, nodeName string, maxRequestsInFlight int, vmiInformer cache.SharedIndexInformer) {
	cfg := new(collectorSettings)
	cfg.virtShareDir = virtShareDir
	cfg.nodeName = nodeName
	cfg.maxRequestsInFlight = maxRequestsInFlight
	cfg.vmiInformer = vmiInformer

	// Publish the fully populated value with a single assignment.
	settings = cfg
}

func domainStatsMetrics(rms ...resourceMetrics) []operatormetrics.Metric {
var metrics []operatormetrics.Metric

Expand All @@ -57,33 +83,30 @@ func domainStatsMetrics(rms ...resourceMetrics) []operatormetrics.Metric {
}

func domainStatsCollectorCallback() []operatormetrics.CollectorResult {
var crs []operatormetrics.CollectorResult
var vmis []k6tv1.VirtualMachineInstance

for _, vmi := range vmis {
crs = append(crs, collect(&vmi)...)
cachedObjs := settings.vmiInformer.GetIndexer().List()
if len(cachedObjs) == 0 {
log.Log.V(4).Infof("No VMIs detected")
return []operatormetrics.CollectorResult{}
}

return crs
}
vmis := make([]*k6tv1.VirtualMachineInstance, len(cachedObjs))

func collect(vmi *k6tv1.VirtualMachineInstance) []operatormetrics.CollectorResult {
var crs []operatormetrics.CollectorResult
for i, obj := range cachedObjs {
vmis[i] = obj.(*k6tv1.VirtualMachineInstance)
}

vmiStats := scrape(vmi)
concCollector := collector.NewConcurrentCollector(settings.maxRequestsInFlight)
return execCollector(concCollector, vmis)
}

if vmiStats == nil {
return crs
}
func execCollector(concCollector collector.Collector, vmis []*k6tv1.VirtualMachineInstance) []operatormetrics.CollectorResult {
scraper := &domainstatsScraper{ch: make(chan operatormetrics.CollectorResult, len(vmis)*len(rms))}
go concCollector.Collect(vmis, scraper, PrometheusCollectionTimeout)

vmiReport := newVirtualMachineInstanceReport(vmi, vmiStats)
for _, rm := range rms {
crs = append(crs, rm.Collect(vmiReport)...)
var crs []operatormetrics.CollectorResult
for cr := range scraper.ch {
crs = append(crs, cr)
}

return crs
}

func scrape(vmi *k6tv1.VirtualMachineInstance) *VirtualMachineInstanceStats {
return nil
}
Expand Up @@ -6,7 +6,7 @@ go_library(
"collector.go",
"scraper.go",
],
importpath = "kubevirt.io/kubevirt/pkg/monitoring/domainstats",
importpath = "kubevirt.io/kubevirt/pkg/monitoring/metrics/virt-handler/domainstats/collector",
visibility = ["//visibility:public"],
deps = [
"//pkg/virt-handler/cmd-client:go_default_library",
Expand Down
Expand Up @@ -17,7 +17,7 @@
*
*/

package vms
package collector

import (
"sync"
Expand All @@ -35,7 +35,6 @@ const StatsMaxAge = CollectionTimeout + 2*time.Second // "a bit more" than timeo
type vmiSocketMap map[string]*k6tv1.VirtualMachineInstance

type Collector interface {
Scrape(key string, vmi *k6tv1.VirtualMachineInstance)
Collect(vmis []*k6tv1.VirtualMachineInstance, scraper MetricsScraper, timeout time.Duration) (skipped []string, completed bool)
}

Expand All @@ -46,11 +45,15 @@ type ConcurrentCollector struct {
socketMapper func(vmis []*k6tv1.VirtualMachineInstance) vmiSocketMap
}

func NewConcurrentCollector(MaxRequestsPerKey int) *ConcurrentCollector {
func NewConcurrentCollector(MaxRequestsPerKey int) Collector {
return NewConcurrentCollectorWithMapper(MaxRequestsPerKey, newvmiSocketMapFromVMIs)
}

func NewConcurrentCollectorWithMapper(MaxRequestsPerKey int, mapper func(vmis []*k6tv1.VirtualMachineInstance) vmiSocketMap) Collector {
return &ConcurrentCollector{
clientsPerKey: make(map[string]int),
maxClientsPerKey: MaxRequestsPerKey,
socketMapper: newvmiSocketMapFromVMIs,
socketMapper: mapper,
}
}

Expand Down Expand Up @@ -88,6 +91,7 @@ func (cc *ConcurrentCollector) Collect(vmis []*k6tv1.VirtualMachineInstance, scr
}

log.Log.V(4).Infof("Collection completed")
scraper.Complete()

return skipped, completed
}
Expand Down
@@ -0,0 +1,30 @@
/*
* This file is part of the KubeVirt project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Copyright the KubeVirt Authors.
*
*/

package collector

import (
"testing"

"kubevirt.io/client-go/testutils"
)

// TestPrometheus is the `go test` entry point of the collector package; it
// delegates to testutils.KubeVirtTestSuiteSetup.
// NOTE(review): the name looks like a leftover from the former prometheus
// package and may be worth aligning with the collector package — confirm.
func TestPrometheus(t *testing.T) {
	testutils.KubeVirtTestSuiteSetup(t)
}

0 comments on commit 15c23f9

Please sign in to comment.