• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / kubevirt / 4f8174e0-53f0-4422-a99a-485ac038cff0

18 Feb 2025 01:55AM UTC coverage: 71.728% (+0.01%) from 71.718%
4f8174e0-53f0-4422-a99a-485ac038cff0

push

prow

web-flow
Merge pull request #13943 from orelmisan/test-rm-cnao

test.sh: Remove leftover reference to cnao lanes

62294 of 86848 relevant lines covered (71.73%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.3
/pkg/virt-handler/node-labeller/node_labeller.go
1
/*
2
 * This file is part of the KubeVirt project
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 *
16
 * Copyright 2021 Red Hat, Inc.
17
 *
18
 */
19

20
package nodelabeller
21

22
import (
23
        "context"
24
        "fmt"
25
        "os/exec"
26
        "runtime"
27
        "strings"
28
        "time"
29

30
        "k8s.io/client-go/tools/record"
31
        "libvirt.org/go/libvirtxml"
32

33
        v1 "k8s.io/api/core/v1"
34
        "k8s.io/apimachinery/pkg/api/equality"
35
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36
        "k8s.io/apimachinery/pkg/types"
37
        "k8s.io/apimachinery/pkg/util/wait"
38
        k8scli "k8s.io/client-go/kubernetes/typed/core/v1"
39
        "k8s.io/client-go/util/workqueue"
40

41
        kubevirtv1 "kubevirt.io/api/core/v1"
42
        "kubevirt.io/client-go/log"
43

44
        "kubevirt.io/kubevirt/pkg/apimachinery/patch"
45
        virtconfig "kubevirt.io/kubevirt/pkg/virt-config"
46
)
47

48
// nodeLabellerLabels lists the label key prefixes owned by the node-labeller.
// isNodeLabellerLabel matches node labels against these prefixes so that
// removeLabellerLabels can strip every previously applied labeller label
// before a fresh set is written.
var nodeLabellerLabels = []string{
        kubevirtv1.CPUFeatureLabel,
        kubevirtv1.CPUModelLabel,
        kubevirtv1.SupportedHostModelMigrationCPU,
        kubevirtv1.CPUTimerLabel,
        kubevirtv1.HypervLabel,
        kubevirtv1.RealtimeLabel,
        kubevirtv1.SEVLabel,
        kubevirtv1.SEVESLabel,
        kubevirtv1.HostModelCPULabel,
        kubevirtv1.HostModelRequiredFeaturesLabel,
        kubevirtv1.NodeHostModelIsObsoleteLabel,
        kubevirtv1.SupportedMachineTypeLabel,
}
62

63
// NodeLabeller struct holds information needed to run node-labeller
type NodeLabeller struct {
        recorder                record.EventRecorder                         // emits warning events (e.g. obsolete host model) on the node
        nodeClient              k8scli.NodeInterface                         // client used to get and patch this node
        host                    string                                       // name of the node this labeller runs on; also the work-queue key
        logger                  *log.FilteredLogger
        clusterConfig           *virtconfig.ClusterConfig                    // source of obsolete CPU models and config-change callbacks
        hypervFeatures          supportedFeatures                            // cached Hyper-V capability labels, filled by loadHypervFeatures
        hostCapabilities        supportedFeatures
        queue                   workqueue.TypedRateLimitingInterface[string] // rate-limited queue of node names to re-label
        supportedFeatures       []string
        cpuModelVendor          string
        volumePath              string                                       // directory holding libvirt capability XML files
        domCapabilitiesFileName string                                       // file name of the virsh domain-capabilities dump under volumePath
        cpuCounter              *libvirtxml.CapsHostCPUCounter               // host CPU counter; a "tsc" entry enables the tsc-frequency labels
        guestCaps               []libvirtxml.CapsGuest                       // guest capabilities used to derive supported machine types
        hostCPUModel            hostCPUModel
        SEV                     SEVConfiguration                             // SEV/SEV-ES support flags ("yes" enables the corresponding labels)
        arch                    string                                       // node CPU architecture (runtime.GOARCH)
}
83

84
func NewNodeLabeller(clusterConfig *virtconfig.ClusterConfig, nodeClient k8scli.NodeInterface, host string, recorder record.EventRecorder, cpuCounter *libvirtxml.CapsHostCPUCounter, guestCaps []libvirtxml.CapsGuest) (*NodeLabeller, error) {
×
85
        return newNodeLabeller(clusterConfig, nodeClient, host, NodeLabellerVolumePath, recorder, cpuCounter, guestCaps)
×
86

×
87
}
×
88
func newNodeLabeller(clusterConfig *virtconfig.ClusterConfig, nodeClient k8scli.NodeInterface, host, volumePath string, recorder record.EventRecorder, cpuCounter *libvirtxml.CapsHostCPUCounter, guestCaps []libvirtxml.CapsGuest) (*NodeLabeller, error) {
1✔
89
        n := &NodeLabeller{
1✔
90
                recorder:      recorder,
1✔
91
                nodeClient:    nodeClient,
1✔
92
                host:          host,
1✔
93
                logger:        log.DefaultLogger(),
1✔
94
                clusterConfig: clusterConfig,
1✔
95
                queue: workqueue.NewTypedRateLimitingQueueWithConfig[string](
1✔
96
                        workqueue.DefaultTypedControllerRateLimiter[string](),
1✔
97
                        workqueue.TypedRateLimitingQueueConfig[string]{Name: "virt-handler-node-labeller"},
1✔
98
                ),
1✔
99
                volumePath:              volumePath,
1✔
100
                domCapabilitiesFileName: "virsh_domcapabilities.xml",
1✔
101
                cpuCounter:              cpuCounter,
1✔
102
                guestCaps:               guestCaps,
1✔
103
                hostCPUModel:            hostCPUModel{requiredFeatures: make(map[string]bool, 0)},
1✔
104
                arch:                    runtime.GOARCH,
1✔
105
        }
1✔
106

1✔
107
        err := n.loadAll()
1✔
108
        if err != nil {
1✔
109
                return n, err
×
110
        }
×
111
        return n, nil
1✔
112
}
113

114
// Run runs node-labeller until the stop channel is closed. It periodically
// enqueues this node for re-labelling, re-enqueues it on cluster-config
// changes, and processes the queue with `threadiness` worker goroutines.
func (n *NodeLabeller) Run(threadiness int, stop chan struct{}) {
        defer n.queue.ShutDown()

        n.logger.Infof("node-labeller is running")

        // Missing TSC counter is not fatal: labelling continues, only the
        // tsc-frequency/tsc-scalable labels are skipped (see prepareLabels).
        if !n.hasTSCCounter() {
                n.logger.Error("failed to get tsc cpu frequency, will continue without the tsc frequency label")
        }

        // Re-label the node whenever the cluster configuration changes
        // (e.g. the set of obsolete CPU models was updated).
        n.clusterConfig.SetConfigModifiedCallback(func() {
                n.queue.Add(n.host)
        })

        // Periodic resync: enqueue this node roughly every 3 minutes, with
        // jitter (factor 1.2) to avoid thundering-herd across nodes.
        interval := 3 * time.Minute
        go wait.JitterUntil(func() { n.queue.Add(n.host) }, interval, 1.2, true, stop)

        for i := 0; i < threadiness; i++ {
                go wait.Until(n.runWorker, time.Second, stop)
        }
        <-stop
}
136

137
func (n *NodeLabeller) runWorker() {
×
138
        for n.execute() {
×
139
        }
×
140
}
141

142
func (n *NodeLabeller) execute() bool {
1✔
143
        key, quit := n.queue.Get()
1✔
144
        if quit {
1✔
145
                return false
×
146
        }
×
147
        defer n.queue.Done(key)
1✔
148

1✔
149
        err := n.run()
1✔
150

1✔
151
        if err != nil {
2✔
152
                n.logger.Errorf("node-labeller sync error encountered: %v", err)
1✔
153
                n.queue.AddRateLimited(key)
1✔
154
        } else {
2✔
155
                n.queue.Forget(key)
1✔
156
        }
1✔
157
        return true
1✔
158
}
159

160
func (n *NodeLabeller) loadAll() error {
1✔
161
        // host supported features is only available on AMD64 and S390X nodes.
1✔
162
        // This is because hypervisor-cpu-baseline virsh command doesnt work for ARM64 architecture.
1✔
163
        if virtconfig.IsAMD64(n.arch) || virtconfig.IsS390X(n.arch) {
2✔
164
                err := n.loadHostSupportedFeatures()
1✔
165
                if err != nil {
1✔
166
                        n.logger.Errorf("node-labeller could not load supported features: " + err.Error())
×
167
                        return err
×
168
                }
×
169
        }
170

171
        err := n.loadDomCapabilities()
1✔
172
        if err != nil {
1✔
173
                n.logger.Errorf("node-labeller could not load host dom capabilities: " + err.Error())
×
174
                return err
×
175
        }
×
176

177
        n.loadHypervFeatures()
1✔
178

1✔
179
        return nil
1✔
180
}
181

182
// run performs one labelling sync: it computes the desired label set from the
// current cluster config and host capabilities, rewrites the labeller-owned
// labels on a copy of the node, and patches the node if anything changed.
func (n *NodeLabeller) run() error {
        obsoleteCPUsx86 := n.clusterConfig.GetObsoleteCPUModels()
        cpuModels := n.getSupportedCpuModels(obsoleteCPUsx86)
        cpuFeatures := n.getSupportedCpuFeatures()
        hostCPUModel := n.GetHostCpuModel()

        originalNode, err := n.nodeClient.Get(context.Background(), n.host, metav1.GetOptions{})
        if err != nil {
                return err
        }

        // Mutate a deep copy; patchNode diffs it against originalNode.
        node := originalNode.DeepCopy()

        // Nodes carrying the skip annotation keep their labels untouched.
        if !skipNodeLabelling(node) {
                //prepare new labels
                newLabels := n.prepareLabels(node, cpuModels, cpuFeatures, hostCPUModel, obsoleteCPUsx86)
                //remove old labeller labels
                n.removeLabellerLabels(node)
                //add new labels
                n.addLabellerLabels(node, newLabels)
        }

        // No-op when the label maps are semantically equal.
        err = n.patchNode(originalNode, node)

        return err
}
208

209
func skipNodeLabelling(node *v1.Node) bool {
1✔
210
        _, exists := node.Annotations[kubevirtv1.LabellerSkipNodeAnnotation]
1✔
211
        return exists
1✔
212
}
1✔
213

214
func (n *NodeLabeller) patchNode(originalNode, node *v1.Node) error {
1✔
215
        if equality.Semantic.DeepEqual(originalNode.Labels, node.Labels) {
2✔
216
                return nil
1✔
217
        }
1✔
218

219
        patchBytes, err := patch.New(
1✔
220
                patch.WithTest("/metadata/labels", originalNode.Labels),
1✔
221
                patch.WithReplace("/metadata/labels", node.Labels),
1✔
222
        ).GeneratePayload()
1✔
223

1✔
224
        if err != nil {
1✔
225
                return err
×
226
        }
×
227

228
        _, err = n.nodeClient.Patch(context.Background(), node.Name, types.JSONPatchType, patchBytes, metav1.PatchOptions{})
1✔
229
        return err
1✔
230
}
231

232
// loadHypervFeatures caches the Hyper-V capability labels exposed by the
// host; prepareLabels later turns each entry into a HypervLabel node label.
func (n *NodeLabeller) loadHypervFeatures() {
        n.hypervFeatures.items = getCapLabels()
}
1✔
235

236
// prepareLabels converts cpu models, features, hyperv features to map[string]string format
// e.g. "cpu-feature.node.kubevirt.io/Penryn": "true"
// It also adds machine-type, TSC, host-model, realtime and SEV/SEV-ES labels,
// and emits a warning event when the host CPU model is obsolete.
func (n *NodeLabeller) prepareLabels(node *v1.Node, cpuModels []string, cpuFeatures cpuFeatures, hostCpuModel hostCPUModel, obsoleteCPUsx86 map[string]bool) map[string]string {
        newLabels := make(map[string]string)
        for key := range cpuFeatures {
                newLabels[kubevirtv1.CPUFeatureLabel+key] = "true"
        }

        // Every supported (non-obsolete) model is both usable as a guest CPU
        // model and a valid host-model migration target.
        for _, value := range cpuModels {
                newLabels[kubevirtv1.CPUModelLabel+value] = "true"
                newLabels[kubevirtv1.SupportedHostModelMigrationCPU+value] = "true"
        }

        // Add labels for supported machine types
        machines := n.getSupportedMachines()

        for _, machine := range machines {
                labelKey := kubevirtv1.SupportedMachineTypeLabel + machine.Name
                newLabels[labelKey] = "true"
        }

        // The host's own model is a migration target unless it is obsolete.
        if _, hostModelObsolete := obsoleteCPUsx86[hostCpuModel.Name]; !hostModelObsolete {
                newLabels[kubevirtv1.SupportedHostModelMigrationCPU+hostCpuModel.Name] = "true"
        }

        for _, key := range n.hypervFeatures.items {
                newLabels[kubevirtv1.HypervLabel+key] = "true"
        }

        // TSC labels are only published when the host exposes a "tsc" counter
        // (see hasTSCCounter); Run already warned about the missing counter.
        if n.hasTSCCounter() {
                newLabels[kubevirtv1.CPUTimerLabel+"tsc-frequency"] = fmt.Sprintf("%d", n.cpuCounter.Frequency)
                newLabels[kubevirtv1.CPUTimerLabel+"tsc-scalable"] = fmt.Sprintf("%t", n.cpuCounter.Scaling == "yes")
        }

        for feature := range hostCpuModel.requiredFeatures {
                newLabels[kubevirtv1.HostModelRequiredFeaturesLabel+feature] = "true"
        }
        if _, obsolete := obsoleteCPUsx86[hostCpuModel.Name]; obsolete {
                newLabels[kubevirtv1.NodeHostModelIsObsoleteLabel] = "true"
                // Best-effort event emission; an alerting failure must not
                // abort label preparation.
                err := n.alertIfHostModelIsObsolete(node, hostCpuModel.Name, obsoleteCPUsx86)
                if err != nil {
                        n.logger.Reason(err).Error(err.Error())
                }
        }

        newLabels[kubevirtv1.CPUModelVendorLabel+n.cpuModelVendor] = "true"
        newLabels[kubevirtv1.HostModelCPULabel+hostCpuModel.Name] = "true"

        // Realtime capability check is best-effort: on error the label is
        // simply omitted.
        capable, err := isNodeRealtimeCapable()
        if err != nil {
                n.logger.Reason(err).Error("failed to identify if a node is capable of running realtime workloads")
        }
        if capable {
                newLabels[kubevirtv1.RealtimeLabel] = ""
        }

        // SEV/SEV-ES labels are presence-only (empty value).
        if n.SEV.Supported == "yes" {
                newLabels[kubevirtv1.SEVLabel] = ""
        }

        if n.SEV.SupportedES == "yes" {
                newLabels[kubevirtv1.SEVESLabel] = ""
        }

        return newLabels
}
302

303
// addNodeLabels adds labels to node.
304
func (n *NodeLabeller) addLabellerLabels(node *v1.Node, labels map[string]string) {
1✔
305
        for key, value := range labels {
2✔
306
                node.Labels[key] = value
1✔
307
        }
1✔
308
}
309

310
// removeLabellerLabels removes labels from node
311
func (n *NodeLabeller) removeLabellerLabels(node *v1.Node) {
1✔
312
        for label := range node.Labels {
2✔
313
                if isNodeLabellerLabel(label) {
2✔
314
                        delete(node.Labels, label)
1✔
315
                }
1✔
316
        }
317
}
318

319
// kernelSchedRealtimeRuntimeInMicrosecods is the sysctl key limiting how much
// CPU time realtime tasks may consume; a value of -1 means unlimited.
const kernelSchedRealtimeRuntimeInMicrosecods = "kernel.sched_rt_runtime_us"

// isNodeRealtimeCapable Checks if a node is capable of running realtime workloads. Currently by validating if the kernel system setting value
// for `kernel.sched_rt_runtime_us` is set to allow running realtime scheduling with unlimited time (==-1)
// TODO: This part should be improved to validate against key attributes that determine best if a host is able to run realtime
// workloads at peak performance.
func isNodeRealtimeCapable() (bool, error) {
	ret, err := exec.Command("sysctl", kernelSchedRealtimeRuntimeInMicrosecods).CombinedOutput()
	if err != nil {
		return false, err
	}
	// sysctl prints "key = value\n"; only an exact "-1" marks the node capable.
	st := strings.Trim(string(ret), "\n")
	return fmt.Sprintf("%s = -1", kernelSchedRealtimeRuntimeInMicrosecods) == st, nil
}
334

335
func isNodeLabellerLabel(label string) bool {
1✔
336
        for _, prefix := range nodeLabellerLabels {
2✔
337
                if strings.HasPrefix(label, prefix) {
2✔
338
                        return true
1✔
339
                }
1✔
340
        }
341

342
        return false
1✔
343
}
344

345
func (n *NodeLabeller) alertIfHostModelIsObsolete(originalNode *v1.Node, hostModel string, ObsoleteCPUModels map[string]bool) error {
1✔
346
        warningMsg := fmt.Sprintf("This node has %v host-model cpu that is included in ObsoleteCPUModels: %v", hostModel, ObsoleteCPUModels)
1✔
347
        n.recorder.Eventf(originalNode, v1.EventTypeWarning, "HostModelIsObsolete", warningMsg)
1✔
348
        return nil
1✔
349
}
1✔
350

351
func (n *NodeLabeller) hasTSCCounter() bool {
1✔
352
        return n.cpuCounter != nil && n.cpuCounter.Name == "tsc"
1✔
353
}
1✔
354

355
func (n *NodeLabeller) getSupportedMachines() []libvirtxml.CapsGuestMachine {
1✔
356
        var supportedMachines []libvirtxml.CapsGuestMachine
1✔
357
        for _, guest := range n.guestCaps {
2✔
358
                supportedMachines = append(supportedMachines, guest.Arch.Machines...)
1✔
359
        }
1✔
360
        return supportedMachines
1✔
361
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc