• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / kubevirt / 918c2858-31bd-4c0e-ad1a-cde8d851acc7

03 Mar 2026 12:56PM UTC coverage: 71.395% (+0.02%) from 71.38%
918c2858-31bd-4c0e-ad1a-cde8d851acc7

push

prow

web-flow
Merge pull request #16836 from kaizentm/multihypervisor/3-handler

Decouple virt-handler from KVM hypervisor

82 of 356 new or added lines in 14 files covered. (23.03%)

7 existing lines in 4 files now uncovered.

75647 of 105955 relevant lines covered (71.4%)

576.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.49
/pkg/virt-handler/device-manager/socket_device.go
1
/*
2
 * This file is part of the KubeVirt project
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 *
16
 * Copyright The KubeVirt Authors.
17
 *
18
 */
19

20
package device_manager
21

22
import (
23
        "context"
24
        "errors"
25
        "fmt"
26
        "net"
27
        "os"
28
        "path"
29
        "path/filepath"
30
        "strconv"
31
        "strings"
32
        "sync"
33

34
        "github.com/fsnotify/fsnotify"
35
        "google.golang.org/grpc"
36

37
        "kubevirt.io/client-go/log"
38

39
        "kubevirt.io/kubevirt/pkg/safepath"
40
        "kubevirt.io/kubevirt/pkg/util"
41
        pluginapi "kubevirt.io/kubevirt/pkg/virt-handler/device-manager/deviceplugin/v1beta1"
42
        "kubevirt.io/kubevirt/pkg/virt-handler/selinux"
43
)
44

45
//go:generate mockgen -source $GOFILE -package=$GOPACKAGE -destination=generated_mock_$GOFILE
46

47
type PermissionManager interface {
48
        ChownAtNoFollow(path *safepath.Path, uid, gid int) error
49
}
50

51
type permissionManager struct{}
52

53
func NewPermissionManager() PermissionManager {
×
54
        return &permissionManager{}
×
55
}
×
56

57
func (p *permissionManager) ChownAtNoFollow(path *safepath.Path, uid, gid int) error {
×
58
        return safepath.ChownAtNoFollow(path, uid, gid)
×
59
}
×
60

61
type SocketDevicePlugin struct {
62
        *DevicePluginBase
63
        socketRoot    string
64
        socketDir     string
65
        socket        string
66
        socketName    string
67
        executor      selinux.Executor
68
        p             PermissionManager
69
        healthChecks  bool
70
        hostRootMount string
71
}
72

73
func (dpi *SocketDevicePlugin) Start(stop <-chan struct{}) (err error) {
×
74
        logger := log.DefaultLogger()
×
75
        dpi.stop = stop
×
76

×
77
        err = dpi.cleanup()
×
78
        if err != nil {
×
79
                return err
×
80
        }
×
81

82
        sock, err := net.Listen("unix", dpi.socketPath)
×
83
        if err != nil {
×
84
                return fmt.Errorf("error creating GRPC server socket: %v", err)
×
85
        }
×
86

87
        dpi.server = grpc.NewServer([]grpc.ServerOption{}...)
×
88
        defer dpi.stopDevicePlugin()
×
89

×
90
        pluginapi.RegisterDevicePluginServer(dpi.server, dpi)
×
91

×
92
        errChan := make(chan error, 2)
×
93

×
94
        go func() {
×
95
                errChan <- dpi.server.Serve(sock)
×
96
        }()
×
97

98
        err = waitForGRPCServer(dpi.socketPath, connectionTimeout)
×
99
        if err != nil {
×
100
                return fmt.Errorf("error starting the GRPC server: %v", err)
×
101
        }
×
102

103
        err = dpi.register()
×
104
        if err != nil {
×
105
                return fmt.Errorf("error registering with device plugin manager: %v", err)
×
106
        }
×
107

108
        go func() {
×
109
                errChan <- dpi.healthCheck()
×
110
        }()
×
111

112
        dpi.setInitialized(true)
×
113
        logger.Infof("%s device plugin started", dpi.resourceName)
×
114
        err = <-errChan
×
115

×
116
        return err
×
117
}
118

119
func (dpi *SocketDevicePlugin) setSocketPermissions() error {
6✔
120
        if dpi.p == nil {
6✔
121
                return nil
×
122
        }
×
123
        prSock, err := safepath.JoinAndResolveWithRelativeRoot(dpi.socketRoot, dpi.socketDir, dpi.socket)
6✔
124
        if err != nil {
6✔
125
                return fmt.Errorf("error opening the socket %s: %v", path.Join(dpi.socketRoot, dpi.socketDir, dpi.socket), err)
×
126
        }
×
127
        err = dpi.p.ChownAtNoFollow(prSock, util.NonRootUID, util.NonRootUID)
6✔
128
        if err != nil {
6✔
129
                return fmt.Errorf("error setting the permission the socket %s: %v", path.Join(dpi.socketRoot, dpi.socketDir, dpi.socket), err)
×
130
        }
×
131
        if se, exists, err := dpi.executor.NewSELinux(); err == nil && exists {
12✔
132
                if err := selinux.RelabelFilesUnprivileged(se.IsPermissive(), prSock); err != nil {
6✔
133
                        return fmt.Errorf("error relabeling required files: %v", err)
×
134
                }
×
135
        } else if err != nil {
×
136
                return fmt.Errorf("failed to detect the presence of selinux: %v", err)
×
137
        }
×
138

139
        return nil
6✔
140
}
141

142
func (dpi *SocketDevicePlugin) setSocketDirectoryPermissions() error {
6✔
143
        if dpi.p == nil {
6✔
144
                return nil
×
145
        }
×
146
        dir, err := safepath.JoinAndResolveWithRelativeRoot(dpi.socketRoot, dpi.socketDir)
6✔
147
        log.DefaultLogger().Infof("setting socket directory permissions for %s", path.Join(dpi.socketRoot, dpi.socketDir))
6✔
148
        if err != nil {
6✔
149
                return fmt.Errorf("error opening the socket dir %s: %v", path.Join(dpi.socketRoot, dpi.socketDir), err)
×
150
        }
×
151
        err = dpi.p.ChownAtNoFollow(dir, util.NonRootUID, util.NonRootUID)
6✔
152
        if err != nil {
6✔
153
                return fmt.Errorf("error setting the permission the socket dir %s: %v", path.Join(dpi.socketRoot, dpi.socketDir), err)
×
154
        }
×
155
        if se, exists, err := dpi.executor.NewSELinux(); err == nil && exists {
12✔
156
                if err := selinux.RelabelFilesUnprivileged(se.IsPermissive(), dir); err != nil {
6✔
157
                        return fmt.Errorf("error relabeling required files: %v", err)
×
158
                }
×
159
        } else if err != nil {
×
160
                return fmt.Errorf("failed to detect the presence of selinux: %v", err)
×
161
        }
×
162

163
        return nil
6✔
164
}
165

166
func NewSocketDevicePlugin(socketName, socketDir, socket string, maxDevices int, executor selinux.Executor, p PermissionManager, useHostRootMount bool) (*SocketDevicePlugin, error) {
5✔
167
        socketRoot := "/"
5✔
168
        if useHostRootMount {
5✔
169
                socketRoot = util.HostRootMount
×
170
        }
×
171
        dpi := &SocketDevicePlugin{
5✔
172
                DevicePluginBase: &DevicePluginBase{
5✔
173
                        health:       make(chan deviceHealth),
5✔
174
                        resourceName: fmt.Sprintf("%s/%s", DeviceNamespace, socketName),
5✔
175
                        initialized:  false,
5✔
176
                        lock:         &sync.Mutex{},
5✔
177
                        done:         make(chan struct{}),
5✔
178
                        deregistered: make(chan struct{}),
5✔
179
                        socketPath:   SocketPath(strings.Replace(socketName, "/", "-", -1)),
5✔
180
                },
5✔
181
                socketRoot:   socketRoot,
5✔
182
                socket:       socket,
5✔
183
                socketDir:    socketDir,
5✔
184
                socketName:   socketName,
5✔
185
                executor:     executor,
5✔
186
                p:            p,
5✔
187
                healthChecks: true,
5✔
188
        }
5✔
189

5✔
190
        for i := 0; i < maxDevices; i++ {
10✔
191
                deviceId := dpi.socketName + strconv.Itoa(i)
5✔
192
                dpi.devs = append(dpi.devs, &pluginapi.Device{
5✔
193
                        ID:     deviceId,
5✔
194
                        Health: pluginapi.Healthy,
5✔
195
                })
5✔
196
        }
5✔
197
        if err := dpi.setSocketDirectoryPermissions(); err != nil {
5✔
198
                return dpi, err
×
199
        }
×
200
        if err := dpi.setSocketPermissions(); err != nil {
5✔
201
                return dpi, err
×
202
        }
×
203

204
        return dpi, nil
5✔
205
}
206

207
// NewOptionalSocketDevicePlugin creates a SocketDevicePlugin where health checks are disabled (so device is always healthy)
208
func NewOptionalSocketDevicePlugin(socketName, socketDir, socket string, maxDevices int, executor selinux.Executor, p PermissionManager, useHostRootMount bool) *SocketDevicePlugin {
3✔
209
        dpi, _ := NewSocketDevicePlugin(socketName, socketDir, socket, maxDevices, executor, p, useHostRootMount)
3✔
210
        dpi.healthChecks = false
3✔
211
        return dpi
3✔
212
}
3✔
213

214
// Register registers the device plugin for the given resourceName with Kubelet.
215
func (dpi *SocketDevicePlugin) register() error {
×
216
        conn, err := gRPCConnect(pluginapi.KubeletSocket, connectionTimeout)
×
217
        if err != nil {
×
218
                return err
×
219
        }
×
220
        defer conn.Close()
×
221

×
222
        client := pluginapi.NewRegistrationClient(conn)
×
223
        reqt := &pluginapi.RegisterRequest{
×
224
                Version:      pluginapi.Version,
×
225
                Endpoint:     path.Base(dpi.socketPath),
×
226
                ResourceName: dpi.resourceName,
×
227
        }
×
228

×
229
        _, err = client.Register(context.Background(), reqt)
×
230
        if err != nil {
×
231
                return err
×
232
        }
×
233
        return nil
×
234
}
235

236
func (dpi *SocketDevicePlugin) Allocate(ctx context.Context, r *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
×
237
        log.DefaultLogger().Infof("Socket Allocate: resourceName: %s", dpi.socketName)
×
238
        log.DefaultLogger().Infof("Socket Allocate: request: %v", r.ContainerRequests)
×
239
        response := pluginapi.AllocateResponse{}
×
240
        containerResponse := new(pluginapi.ContainerAllocateResponse)
×
241

×
242
        m := new(pluginapi.Mount)
×
243
        m.HostPath = dpi.socketDir
×
244
        m.ContainerPath = dpi.socketDir
×
245
        m.ReadOnly = false
×
246
        containerResponse.Mounts = []*pluginapi.Mount{m}
×
247

×
248
        response.ContainerResponses = []*pluginapi.ContainerAllocateResponse{containerResponse}
×
249

×
250
        return &response, nil
×
251
}
×
252

253
func (dpi *SocketDevicePlugin) sendHealthUpdate(healthy bool) {
3✔
254
        if !dpi.healthChecks {
4✔
255
                return
1✔
256
        }
1✔
257
        if healthy {
3✔
258
                dpi.health <- deviceHealth{Health: pluginapi.Healthy}
1✔
259
        } else {
2✔
260
                dpi.health <- deviceHealth{Health: pluginapi.Unhealthy}
1✔
261
        }
1✔
262
}
263

264
func (dpi *SocketDevicePlugin) healthCheck() error {
4✔
265
        logger := log.DefaultLogger()
4✔
266
        watcher, err := fsnotify.NewWatcher()
4✔
267
        if err != nil {
4✔
268
                return fmt.Errorf("failed to creating a fsnotify watcher: %v", err)
×
269
        }
×
270
        defer watcher.Close()
4✔
271

4✔
272
        deviceDir := filepath.Join(dpi.socketRoot, dpi.socketDir)
4✔
273
        devicePath := filepath.Join(deviceDir, dpi.socket)
4✔
274

4✔
275
        // Start watching the files before we check for their existence to avoid races
4✔
276
        err = watcher.Add(deviceDir)
4✔
277
        if err != nil {
4✔
278
                return fmt.Errorf("failed to add the device root path to the watcher: %v", err)
×
279
        }
×
280

281
        _, err = os.Stat(devicePath)
4✔
282
        if err != nil {
6✔
283
                if !errors.Is(err, os.ErrNotExist) {
2✔
284
                        return fmt.Errorf("could not stat the device: %v", err)
×
285
                }
×
286
                logger.Warningf("device '%s' is not present, the device plugin can't expose it.", dpi.socketName)
2✔
287
                dpi.sendHealthUpdate(false)
2✔
288
        }
289
        logger.Infof("device '%s' is present.", devicePath)
4✔
290

4✔
291
        err = watcher.Add(deviceDir)
4✔
292

4✔
293
        if err != nil {
4✔
294
                return fmt.Errorf("failed to add the device-plugin kubelet path to the watcher: %v", err)
×
295
        }
×
296
        _, err = os.Stat(dpi.socketPath)
4✔
297
        if err != nil {
4✔
298
                return fmt.Errorf("failed to stat the device-plugin socket: %v", err)
×
299
        }
×
300

301
        for {
9✔
302
                select {
5✔
303
                case <-dpi.stop:
2✔
304
                        return nil
2✔
305
                case err := <-watcher.Errors:
×
306
                        logger.Reason(err).Errorf("error watching devices and device plugin directory")
×
307
                case event := <-watcher.Events:
3✔
308
                        logger.V(4).Infof("health Event: %v", event)
3✔
309
                        if event.Name == devicePath && dpi.healthChecks {
4✔
310
                                // Health in this case is if the device path actually exists
1✔
311
                                if event.Op == fsnotify.Create {
2✔
312
                                        logger.Infof("monitored device %s appeared", dpi.socketName)
1✔
313
                                        dpi.sendHealthUpdate(true)
1✔
314
                                        if err := dpi.setSocketDirectoryPermissions(); err != nil {
1✔
315
                                                logger.Warningf("failed to set directory permissions for socket device %s", dpi.socketName)
×
316
                                        }
×
317
                                        if err := dpi.setSocketPermissions(); err != nil {
1✔
318
                                                logger.Warningf("failed to set socket permissions for socket device %s", dpi.socketName)
×
319
                                        }
×
UNCOV
320
                                } else if (event.Op == fsnotify.Remove) || (event.Op == fsnotify.Rename) {
×
UNCOV
321
                                        logger.Infof("monitored device %s disappeared", dpi.socketName)
×
UNCOV
322
                                        dpi.sendHealthUpdate(false)
×
UNCOV
323
                                }
×
324
                        } else if event.Name == dpi.socketPath && event.Op == fsnotify.Remove {
4✔
325
                                logger.Infof("device socket file for device %s was removed, kubelet probably restarted.", dpi.socketName)
2✔
326
                                return nil
2✔
327
                        }
2✔
328
                }
329
        }
330
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc