/
scraper.go
192 lines (163 loc) · 6.02 KB
/
scraper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
package downwardmetrics
import (
"context"
"fmt"
"time"
"k8s.io/client-go/tools/cache"
k6sv1 "kubevirt.io/api/core/v1"
"kubevirt.io/client-go/log"
"kubevirt.io/kubevirt/pkg/downwardmetrics"
"kubevirt.io/kubevirt/pkg/downwardmetrics/vhostmd"
"kubevirt.io/kubevirt/pkg/downwardmetrics/vhostmd/api"
metricspkg "kubevirt.io/kubevirt/pkg/downwardmetrics/vhostmd/metrics"
"kubevirt.io/kubevirt/pkg/monitoring/metrics/virt-handler/domainstats/collector"
cmdclient "kubevirt.io/kubevirt/pkg/virt-handler/cmd-client"
"kubevirt.io/kubevirt/pkg/virt-handler/isolation"
"kubevirt.io/kubevirt/pkg/virt-launcher/virtwrap/stats"
)
const (
	// DownwardmetricsRefreshDuration is the interval between two downward
	// metrics collection rounds.
	DownwardmetricsRefreshDuration = 5 * time.Second

	// DownwardmetricsCollectionTimeout is the timeout handed to each
	// collection round; it reuses the domain-stats collector's timeout.
	DownwardmetricsCollectionTimeout = collector.CollectionTimeout

	// qemuVersionUnknown is reported as the VirtProductInfo metric when the
	// launcher does not implement the QEMU version query.
	qemuVersionUnknown = "qemu-unknown"
)
// StaticHostMetrics holds host-level values that do not change between
// collection rounds and are reported verbatim as unitless host metrics:
// the host name, the host system information and the virtualization vendor.
type StaticHostMetrics struct {
	HostName, HostSystemInfo, VirtualizationVendor string
}
// Scraper collects downward metrics for one VMI at a time and writes them to
// that VMI's downward-metrics disk (see Scrape).
type Scraper struct {
	// isolation locates the VMI's pod so its metrics disk path can be built.
	isolation isolation.PodIsolationDetector
	// reporter gathers the metrics from the VMI's command socket.
	reporter *DownwardMetricsReporter
}

// Complete does nothing.
// NOTE(review): presumably present to satisfy the collector's scraper
// interface — confirm before removing.
func (*Scraper) Complete() {}
// Scrape collects the downward metrics of a single VMI through the command
// socket at socketFile and writes them to the VMI's downward-metrics disk,
// located via the pid of the VMI's pod.
//
// VMIs that are not running, or that have no downward-metrics disk, are
// skipped. Any failure is logged at info level and ends this scrape early;
// nothing is reported back to the caller.
func (s *Scraper) Scrape(socketFile string, vmi *k6sv1.VirtualMachineInstance) {
	wanted := vmi.IsRunning() && downwardmetrics.HasDownwardMetricDisk(vmi)
	if !wanted {
		return
	}

	collected, reportErr := s.reporter.Report(socketFile)
	if reportErr != nil {
		log.Log.Reason(reportErr).Infof("failed to collect the metrics")
		return
	}

	podInfo, detectErr := s.isolation.Detect(vmi)
	if detectErr != nil {
		log.Log.Reason(detectErr).Infof("failed to detect root directory of the vmi pod")
		return
	}

	diskPath := downwardmetrics.FormatDownwardMetricPath(podInfo.Pid())
	if writeErr := vhostmd.NewMetricsIODisk(diskPath).Write(collected); writeErr != nil {
		log.Log.Reason(writeErr).Infof("failed to write metrics to disk")
	}
}
// DownwardMetricsReporter assembles the downward metrics of a VMI: static
// host information, guest CPU and memory figures taken from the domain
// stats, and host metrics supplied by hostMetricsCollector.
type DownwardMetricsReporter struct {
	// staticHostInfo is reported unchanged on every call to Report.
	staticHostInfo *StaticHostMetrics

	// hostMetricsCollector supplies the host-level metrics appended last.
	hostMetricsCollector *hostMetricsCollector
}
// Report collects one snapshot of downward metrics for the VMI whose
// virt-launcher command socket is socketFile.
//
// The returned metrics contain the static host information, the QEMU version
// (or qemuVersionUnknown when the launcher does not implement the query),
// guest CPU and memory metrics derived from the domain stats, and whatever the
// host metrics collector returns.
//
// No metrics are returned, only an error, when:
//   - the socket cannot be reached;
//   - the QEMU version or the domain stats cannot be fetched;
//   - the domain has disappeared;
//   - fetching took longer than collector.StatsMaxAge.
//
// Underlying errors are wrapped with %w, so callers can inspect them with
// errors.Is / errors.As.
func (r *DownwardMetricsReporter) Report(socketFile string) (*api.Metrics, error) {
	start := time.Now()

	cli, err := cmdclient.NewClient(socketFile)
	if err != nil {
		// These are all local connections via unix socket, so a failure to
		// connect means nothing is listening on the other end. Hand it back
		// to the caller instead of treating it as fatal here.
		return nil, fmt.Errorf("failed to connect to cmd client socket: %w", err)
	}
	defer cli.Close()

	version, err := cli.GetQemuVersion()
	if err != nil {
		if !cmdclient.IsUnimplemented(err) {
			return nil, fmt.Errorf("failed to update qemu stats from socket %s: %w", socketFile, err)
		}
		// Older launchers do not implement the call; degrade gracefully.
		log.Log.Reason(err).Warning("getQemuVersion not implemented, consider to upgrade kubevirt")
		version = qemuVersionUnknown
	}

	vmStats, exists, err := cli.GetDomainStats()
	if err != nil {
		return nil, fmt.Errorf("failed to update stats from socket %s: %w", socketFile, err)
	}
	// The nil check guards the dereference below should the client ever
	// report exists == true without stats. The VM may be shutting down.
	if !exists || vmStats == nil || vmStats.Name == "" {
		return nil, fmt.Errorf("disappearing VM on %s, ignored", socketFile)
	}

	// GetDomainStats() may hang for a long time. If it returns past
	// collector.StatsMaxAge the information is stale and not worth reporting.
	// The error is returned rather than also logged here, so that it is
	// reported once by whoever handles it.
	if elapsed := time.Since(start); elapsed > collector.StatsMaxAge {
		return nil, fmt.Errorf("took too long (%v) to collect stats from %s: ignored", elapsed, socketFile)
	}

	metrics := &api.Metrics{
		Metrics: []api.Metric{
			metricspkg.MustToUnitlessHostMetric(r.staticHostInfo.HostName, "HostName"),
			metricspkg.MustToUnitlessHostMetric(r.staticHostInfo.HostSystemInfo, "HostSystemInfo"),
			metricspkg.MustToUnitlessHostMetric(r.staticHostInfo.VirtualizationVendor, "VirtualizationVendor"),
			metricspkg.MustToUnitlessHostMetric(version, "VirtProductInfo"),
		},
	}
	metrics.Metrics = append(metrics.Metrics, guestCPUMetrics(vmStats)...)
	metrics.Metrics = append(metrics.Metrics, guestMemoryMetrics(vmStats)...)
	metrics.Metrics = append(metrics.Metrics, r.hostMetricsCollector.Collect()...)
	return metrics, nil
}
// guestCPUMetrics derives the CPU-related downward metrics from vmStats:
//   - TotalCPUTime: the Time values of all vCPUs summed, divided by 1e9 and
//     reported in seconds (i.e. Time is treated as nanoseconds);
//   - ResourceProcessorLimit: NrVirtCpu, reported without a unit.
func guestCPUMetrics(vmStats *stats.DomainStats) []api.Metric {
	const nanosPerSecond = 1e9

	var totalNanos uint64
	for i := range vmStats.Vcpu {
		totalNanos += vmStats.Vcpu[i].Time
	}
	totalSeconds := float64(totalNanos) / nanosPerSecond

	return []api.Metric{
		metricspkg.MustToVMMetric(totalSeconds, "TotalCPUTime", "s"),
		metricspkg.MustToVMMetric(vmStats.NrVirtCpu, "ResourceProcessorLimit", ""),
	}
}
// guestMemoryMetrics reports vmStats.Memory.ActualBalloon, in KiB, both as the
// memory allocated to the guest and as the guest's memory limit.
func guestMemoryMetrics(vmStats *stats.DomainStats) []api.Metric {
	// No active ballooning is done, so the current balloon size doubles as
	// the memory limit.
	balloonKiB := vmStats.Memory.ActualBalloon
	return []api.Metric{
		metricspkg.MustToVMMetric(balloonKiB, "PhysicalMemoryAllocatedToVirtualSystem", "KiB"),
		metricspkg.MustToVMMetric(balloonKiB, "ResourceMemoryLimit", "KiB"),
	}
}
// Collector wraps a collector.ConcurrentCollector.
//
// NOTE(review): this type is not referenced in the code visible here —
// confirm it is used elsewhere in the package before keeping it.
type Collector struct {
	// concCollector is the wrapped concurrent collector.
	concCollector *collector.ConcurrentCollector
}
// NewReporter returns a DownwardMetricsReporter whose static host metrics are
// nodeName as the host name, "linux" as the host system information and
// "kubevirt.io" as the virtualization vendor. Host-level metrics come from
// defaultHostMetricsCollector().
func NewReporter(nodeName string) *DownwardMetricsReporter {
	static := StaticHostMetrics{
		HostName:             nodeName,
		HostSystemInfo:       "linux",
		VirtualizationVendor: "kubevirt.io",
	}
	return &DownwardMetricsReporter{
		staticHostInfo:       &static,
		hostMetricsCollector: defaultHostMetricsCollector(),
	}
}
// RunDownwardMetricsCollector starts a background goroutine that, every
// DownwardmetricsRefreshDuration, lists the VMIs in vmiInformer's cache and
// hands them to a concurrent collector that runs Scraper.Scrape on them with
// a DownwardmetricsCollectionTimeout timeout.
//
// The goroutine exits when ctx is cancelled. The function itself does not
// block and currently always returns nil.
func RunDownwardMetricsCollector(ctx context.Context, nodeName string, vmiInformer cache.SharedIndexInformer, isolationDetector isolation.PodIsolationDetector) error {
	scraper := &Scraper{
		isolation: isolationDetector,
		reporter:  NewReporter(nodeName),
	}
	// NOTE(review): the argument is presumably the worker count, i.e. VMIs
	// are scraped one at a time — confirm against ConcurrentCollector.
	concCollector := collector.NewConcurrentCollector(1)

	go func() {
		ticker := time.NewTicker(DownwardmetricsRefreshDuration)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				cachedObjs := vmiInformer.GetIndexer().List()
				vmis := make([]*k6sv1.VirtualMachineInstance, 0, len(cachedObjs))
				for _, obj := range cachedObjs {
					// Checked assertion: an unexpected cache entry must not
					// panic here, since an unrecovered panic in a goroutine
					// terminates the whole program.
					vmi, ok := obj.(*k6sv1.VirtualMachineInstance)
					if !ok {
						log.Log.Warningf("ignoring unexpected object of type %T in VMI cache", obj)
						continue
					}
					vmis = append(vmis, vmi)
				}
				if len(vmis) == 0 {
					log.Log.V(4).Infof("No VMIs detected")
					continue
				}
				concCollector.Collect(vmis, scraper, DownwardmetricsCollectionTimeout)
			case <-ctx.Done():
				return
			}
		}
	}()
	return nil
}