• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / kubevirt / d81f3799-2d0d-4f91-b833-b0d10a101bab

16 Nov 2025 03:02PM UTC coverage: 70.444% (+0.05%) from 70.392%
d81f3799-2d0d-4f91-b833-b0d10a101bab

push

prow

web-flow
Merge pull request #15922 from ShellyKa13/utility-volumes

VEP 90: Add new Utility volumes type in VMI spec

384 of 440 new or added lines in 14 files covered. (87.27%)

23 existing lines in 5 files now uncovered.

70115 of 99533 relevant lines covered (70.44%)

434.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.61
/pkg/virt-handler/migration-proxy/migration-proxy.go
1
/*
2
 * This file is part of the KubeVirt project
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 *
16
 * Copyright The KubeVirt Authors.
17
 *
18
 */
19

20
package migrationproxy
21

22
import (
23
        "crypto/tls"
24
        "fmt"
25
        "io"
26
        "net"
27
        "os"
28
        "path/filepath"
29
        "strconv"
30
        "strings"
31
        "sync"
32

33
        "kubevirt.io/client-go/log"
34

35
        diskutils "kubevirt.io/kubevirt/pkg/ephemeral-disk-utils"
36
        "kubevirt.io/kubevirt/pkg/util"
37
        "kubevirt.io/kubevirt/pkg/util/net/ip"
38
        virtconfig "kubevirt.io/kubevirt/pkg/virt-config"
39
)
40

41
const (
42
        LibvirtDirectMigrationPort = 49152
43
        LibvirtBlockMigrationPort  = 49153
44
)
45

46
var migrationPortsRange = []int{LibvirtDirectMigrationPort, LibvirtBlockMigrationPort}
47

48
type ProxyManager interface {
49
        StartTargetListener(key string, targetUnixFiles []string) error
50
        GetTargetListenerPorts(key string) map[string]int
51
        StopTargetListener(key string)
52

53
        StartSourceListener(key string, targetAddress string, destSrcPortMap map[string]int, baseDir string) error
54
        GetSourceListenerFiles(key string) []string
55
        StopSourceListener(key string)
56

57
        OpenListenerCount() int
58

59
        InitiateGracefulShutdown()
60
}
61

62
type migrationProxyManager struct {
63
        sourceProxies   map[string][]*migrationProxy
64
        targetProxies   map[string][]*migrationProxy
65
        managerLock     sync.Mutex
66
        serverTLSConfig *tls.Config
67
        clientTLSConfig *tls.Config
68

69
        isShuttingDown bool
70
        config         *virtconfig.ClusterConfig
71
}
72

73
type MigrationProxyListener interface {
74
        Start() error
75
        Stop()
76
}
77

78
type migrationProxy struct {
79
        unixSocketPath string
80
        tcpBindAddress string
81
        tcpBindPort    int
82
        targetAddress  string
83
        targetProtocol string
84
        stopChan       chan struct{}
85
        listenErrChan  chan error
86
        fdChan         chan net.Conn
87

88
        listener        net.Listener
89
        serverTLSConfig *tls.Config
90
        clientTLSConfig *tls.Config
91

92
        logger *log.FilteredLogger
93
}
94

95
func (m *migrationProxyManager) InitiateGracefulShutdown() {
2✔
96
        m.managerLock.Lock()
2✔
97
        defer m.managerLock.Unlock()
2✔
98

2✔
99
        m.isShuttingDown = true
2✔
100
}
2✔
101

102
func (m *migrationProxyManager) OpenListenerCount() int {
2✔
103
        m.managerLock.Lock()
2✔
104
        defer m.managerLock.Unlock()
2✔
105

2✔
106
        return len(m.sourceProxies) + len(m.targetProxies)
2✔
107
}
2✔
108

109
func GetMigrationPortsList(isBlockMigration bool) (ports []int) {
4✔
110
        ports = append(ports, migrationPortsRange[0])
4✔
111
        if isBlockMigration {
4✔
112
                ports = append(ports, migrationPortsRange[1])
×
113
        }
×
114
        return
4✔
115
}
116

117
func NewMigrationProxyManager(serverTLSConfig *tls.Config, clientTLSConfig *tls.Config, config *virtconfig.ClusterConfig) ProxyManager {
148✔
118
        return &migrationProxyManager{
148✔
119
                sourceProxies:   make(map[string][]*migrationProxy),
148✔
120
                targetProxies:   make(map[string][]*migrationProxy),
148✔
121
                serverTLSConfig: serverTLSConfig,
148✔
122
                clientTLSConfig: clientTLSConfig,
148✔
123
                config:          config,
148✔
124
        }
148✔
125
}
148✔
126

127
func SourceUnixFile(baseDir string, key string) string {
18✔
128
        return filepath.Join(baseDir, "migrationproxy", key+"-source.sock")
18✔
129
}
18✔
130

131
func (m *migrationProxyManager) StartTargetListener(key string, targetUnixFiles []string) error {
9✔
132
        m.managerLock.Lock()
9✔
133
        defer m.managerLock.Unlock()
9✔
134

9✔
135
        if m.isShuttingDown {
11✔
136
                return fmt.Errorf("unable to process new migration connections during virt-handler shutdown")
2✔
137
        }
2✔
138

139
        isExistingProxy := func(curProxies []*migrationProxy, targetUnixFiles []string) bool {
8✔
140
                // make sure that all elements in the existing proxy match to the provided targetUnixFiles
1✔
141
                if len(curProxies) != len(targetUnixFiles) {
1✔
142
                        return false
×
143
                }
×
144
                existingSocketFiles := make(map[string]bool)
1✔
145
                for _, file := range targetUnixFiles {
3✔
146
                        existingSocketFiles[file] = true
2✔
147
                }
2✔
148
                for _, curProxy := range curProxies {
3✔
149
                        if _, ok := existingSocketFiles[curProxy.targetAddress]; !ok {
2✔
150
                                return false
×
151
                        }
×
152
                }
153
                return true
1✔
154
        }
155
        curProxies, exists := m.targetProxies[key]
7✔
156

7✔
157
        if exists {
8✔
158
                if isExistingProxy(curProxies, targetUnixFiles) {
2✔
159
                        // No Op, already exists
1✔
160
                        return nil
1✔
161
                } else {
1✔
162
                        // stop the current proxy and point it somewhere new.
×
163
                        for _, curProxy := range curProxies {
×
164
                                curProxy.logger.Infof("Manager stopping proxy on target node due to new unix filepath location")
×
165
                                curProxy.Stop()
×
166
                        }
×
167
                }
168
        }
169

170
        zeroAddress := ip.GetIPZeroAddress()
6✔
171
        proxiesList := []*migrationProxy{}
6✔
172
        serverTLSConfig := m.serverTLSConfig
6✔
173
        clientTLSConfig := m.clientTLSConfig
6✔
174
        if m.config.GetMigrationConfiguration().DisableTLS != nil && *m.config.GetMigrationConfiguration().DisableTLS {
8✔
175
                serverTLSConfig = nil
2✔
176
                clientTLSConfig = nil
2✔
177
        }
2✔
178
        for _, targetUnixFile := range targetUnixFiles {
18✔
179
                // 0 means random port is used
12✔
180
                proxy := NewTargetProxy(zeroAddress, 0, serverTLSConfig, clientTLSConfig, targetUnixFile, key)
12✔
181

12✔
182
                err := proxy.Start()
12✔
183
                if err != nil {
12✔
184
                        proxy.Stop()
×
185
                        // close all already created proxies for this key
×
186
                        for _, curProxy := range proxiesList {
×
187
                                curProxy.Stop()
×
188
                        }
×
189
                        return err
×
190
                }
191
                proxiesList = append(proxiesList, proxy)
12✔
192
                proxy.logger.Infof("Manager created proxy on target")
12✔
193
        }
194
        m.targetProxies[key] = proxiesList
6✔
195
        return nil
6✔
196
}
197

198
func (m *migrationProxyManager) GetSourceListenerFiles(key string) []string {
2✔
199
        m.managerLock.Lock()
2✔
200
        defer m.managerLock.Unlock()
2✔
201

2✔
202
        curProxies, exists := m.sourceProxies[key]
2✔
203
        socketsList := []string{}
2✔
204
        if exists {
4✔
205
                for _, curProxy := range curProxies {
6✔
206
                        socketsList = append(socketsList, curProxy.unixSocketPath)
4✔
207
                }
4✔
208
        }
209
        return socketsList
2✔
210
}
211

212
func ConstructProxyKey(id string, port int) string {
39✔
213
        key := id
39✔
214
        if port != 0 {
74✔
215
                key += fmt.Sprintf("-%d", port)
35✔
216
        }
35✔
217
        return key
39✔
218
}
219

220
func (m *migrationProxyManager) GetTargetListenerPorts(key string) map[string]int {
11✔
221
        m.managerLock.Lock()
11✔
222
        defer m.managerLock.Unlock()
11✔
223

11✔
224
        getPortFromSocket := func(id string, path string) int {
25✔
225
                for _, port := range migrationPortsRange {
35✔
226
                        key := ConstructProxyKey(id, port)
21✔
227
                        if strings.Contains(path, key) {
28✔
228
                                return port
7✔
229
                        }
7✔
230
                }
231
                return 0
7✔
232
        }
233

234
        curProxies, exists := m.targetProxies[key]
11✔
235
        targetSrcPortMap := make(map[string]int)
11✔
236

11✔
237
        if exists {
18✔
238
                for _, curProxy := range curProxies {
21✔
239
                        port := strconv.Itoa(curProxy.tcpBindPort)
14✔
240
                        targetSrcPortMap[port] = getPortFromSocket(key, curProxy.targetAddress)
14✔
241
                }
14✔
242
        }
243
        return targetSrcPortMap
11✔
244
}
245

246
func (m *migrationProxyManager) StopTargetListener(key string) {
10✔
247
        m.managerLock.Lock()
10✔
248
        defer m.managerLock.Unlock()
10✔
249

10✔
250
        curProxies, exists := m.targetProxies[key]
10✔
251
        if exists {
12✔
252
                for _, curProxy := range curProxies {
6✔
253
                        curProxy.logger.Info("Manager stopping proxy on target node")
4✔
254
                        curProxy.Stop()
4✔
255
                        delete(m.targetProxies, key)
4✔
256
                }
4✔
257
        }
258
}
259

260
func (m *migrationProxyManager) StartSourceListener(key string, targetAddress string, destSrcPortMap map[string]int, baseDir string) error {
12✔
261
        m.managerLock.Lock()
12✔
262
        defer m.managerLock.Unlock()
12✔
263

12✔
264
        if m.isShuttingDown {
14✔
265
                return fmt.Errorf("unable to process new migration connections during virt-handler shutdown")
2✔
266
        }
2✔
267

268
        isExistingProxy := func(curProxies []*migrationProxy, targetAddress string, destSrcPortMap map[string]int) bool {
10✔
269
                if len(curProxies) != len(destSrcPortMap) {
×
270
                        return false
×
271
                }
×
272
                destSrcLookup := make(map[string]int)
×
273
                for dest, src := range destSrcPortMap {
×
274
                        addr := net.JoinHostPort(targetAddress, dest)
×
275
                        destSrcLookup[addr] = src
×
276
                }
×
277
                for _, curProxy := range curProxies {
×
278
                        if _, ok := destSrcLookup[curProxy.targetAddress]; !ok {
×
279
                                return false
×
280
                        }
×
281
                }
282
                return true
×
283
        }
284

285
        curProxies, exists := m.sourceProxies[key]
10✔
286

10✔
287
        if exists {
10✔
288
                if isExistingProxy(curProxies, targetAddress, destSrcPortMap) {
×
289
                        // No Op, already exists
×
290
                        return nil
×
291
                } else {
×
292
                        // stop the current proxy and point it somewhere new.
×
293
                        for _, curProxy := range curProxies {
×
294
                                curProxy.logger.Infof("Manager is stopping proxy on source node due to new target location")
×
295
                                curProxy.Stop()
×
296
                        }
×
297
                }
298
        }
299
        serverTLSConfig := m.serverTLSConfig
10✔
300
        clientTLSConfig := m.clientTLSConfig
10✔
301
        if m.config.GetMigrationConfiguration().DisableTLS != nil && *m.config.GetMigrationConfiguration().DisableTLS {
12✔
302
                serverTLSConfig = nil
2✔
303
                clientTLSConfig = nil
2✔
304
        }
2✔
305
        proxiesList := []*migrationProxy{}
10✔
306
        for destPort, srcPort := range destSrcPortMap {
24✔
307
                proxyKey := ConstructProxyKey(key, srcPort)
14✔
308
                targetFullAddr := net.JoinHostPort(targetAddress, destPort)
14✔
309
                filePath := SourceUnixFile(baseDir, proxyKey)
14✔
310

14✔
311
                os.RemoveAll(filePath)
14✔
312

14✔
313
                proxy := NewSourceProxy(filePath, targetFullAddr, serverTLSConfig, clientTLSConfig, key)
14✔
314

14✔
315
                err := proxy.Start()
14✔
316
                if err != nil {
14✔
317
                        proxy.Stop()
×
318
                        // close all already created proxies for this key
×
319
                        for _, curProxy := range proxiesList {
×
320
                                curProxy.Stop()
×
321
                        }
×
322
                        return err
×
323
                }
324
                proxiesList = append(proxiesList, proxy)
14✔
325
                proxy.logger.Infof("Manager created proxy on source node")
14✔
326
        }
327
        m.sourceProxies[key] = proxiesList
10✔
328
        return nil
10✔
329
}
330

331
func (m *migrationProxyManager) StopSourceListener(key string) {
9✔
332
        m.managerLock.Lock()
9✔
333
        defer m.managerLock.Unlock()
9✔
334

9✔
335
        curProxies, exists := m.sourceProxies[key]
9✔
336
        if exists {
11✔
337
                for _, curProxy := range curProxies {
6✔
338
                        curProxy.logger.Infof("Manager stopping proxy on source node")
4✔
339
                        curProxy.Stop()
4✔
340
                        os.RemoveAll(curProxy.unixSocketPath)
4✔
341
                }
4✔
342
                delete(m.sourceProxies, key)
2✔
343
        }
344
}
345

346
// SRC POD ENV(migration unix socket) <-> HOST ENV (tcp client) <-----> HOST ENV (tcp server) <-> TARGET POD ENV (virtqemud unix socket)
347

348
// Source proxy exposes a unix socket server and pipes to an outbound TCP connection.
349
func NewSourceProxy(unixSocketPath string, tcpTargetAddress string, serverTLSConfig *tls.Config, clientTLSConfig *tls.Config, vmiUID string) *migrationProxy {
17✔
350
        return &migrationProxy{
17✔
351
                unixSocketPath:  unixSocketPath,
17✔
352
                targetAddress:   tcpTargetAddress,
17✔
353
                targetProtocol:  "tcp",
17✔
354
                stopChan:        make(chan struct{}),
17✔
355
                fdChan:          make(chan net.Conn, 1),
17✔
356
                listenErrChan:   make(chan error, 1),
17✔
357
                serverTLSConfig: serverTLSConfig,
17✔
358
                clientTLSConfig: clientTLSConfig,
17✔
359
                logger:          log.Log.With("uid", vmiUID).With("listening", filepath.Base(unixSocketPath)).With("outbound", tcpTargetAddress),
17✔
360
        }
17✔
361
}
17✔
362

363
// Target proxy listens on a tcp socket and pipes to a virtqemud unix socket
364
func NewTargetProxy(tcpBindAddress string, tcpBindPort int, serverTLSConfig *tls.Config, clientTLSConfig *tls.Config, virtqemudSocketPath string, vmiUID string) *migrationProxy {
13✔
365
        return &migrationProxy{
13✔
366
                tcpBindAddress:  tcpBindAddress,
13✔
367
                tcpBindPort:     tcpBindPort,
13✔
368
                targetAddress:   virtqemudSocketPath,
13✔
369
                targetProtocol:  "unix",
13✔
370
                stopChan:        make(chan struct{}),
13✔
371
                fdChan:          make(chan net.Conn, 1),
13✔
372
                listenErrChan:   make(chan error, 1),
13✔
373
                serverTLSConfig: serverTLSConfig,
13✔
374
                clientTLSConfig: clientTLSConfig,
13✔
375
                logger:          log.Log.With("uid", vmiUID).With("outbound", filepath.Base(virtqemudSocketPath)),
13✔
376
        }
13✔
377

13✔
378
}
13✔
379

380
func (m *migrationProxy) createTcpListener() error {
13✔
381
        var listener net.Listener
13✔
382
        var err error
13✔
383

13✔
384
        laddr := net.JoinHostPort(m.tcpBindAddress, strconv.Itoa(m.tcpBindPort))
13✔
385
        if m.serverTLSConfig != nil {
22✔
386
                listener, err = tls.Listen("tcp", laddr, m.serverTLSConfig)
9✔
387
        } else {
13✔
388
                listener, err = net.Listen("tcp", laddr)
4✔
389
        }
4✔
390
        if err != nil {
13✔
391
                m.logger.Reason(err).Error("failed to create unix socket for proxy service")
×
392
                return err
×
393
        }
×
394

395
        if m.tcpBindPort == 0 {
25✔
396
                // update the random port that was selected
12✔
397
                m.tcpBindPort = listener.Addr().(*net.TCPAddr).Port
12✔
398
                // Add the listener to the log output once we know the port
12✔
399
                m.logger = m.logger.With("listening", fmt.Sprintf("%s:%d", m.tcpBindAddress, m.tcpBindPort))
12✔
400
        }
12✔
401

402
        m.listener = listener
13✔
403
        return nil
13✔
404
}
405

406
func (m *migrationProxy) createUnixListener() error {
17✔
407

17✔
408
        os.RemoveAll(m.unixSocketPath)
17✔
409
        err := util.MkdirAllWithNosec(filepath.Dir(m.unixSocketPath))
17✔
410
        if err != nil {
17✔
411
                m.logger.Reason(err).Error("unable to create directory for unix socket")
×
412
                return err
×
413
        }
×
414

415
        listener, err := net.Listen("unix", m.unixSocketPath)
17✔
416
        if err != nil {
17✔
417
                m.logger.Reason(err).Error("failed to create unix socket for proxy service")
×
418
                return err
×
419
        }
×
420
        if err := diskutils.DefaultOwnershipManager.UnsafeSetFileOwnership(m.unixSocketPath); err != nil {
17✔
421
                log.Log.Reason(err).Error("failed to change ownership on migration unix socket")
×
422
                return err
×
423
        }
×
424

425
        m.listener = listener
17✔
426
        return nil
17✔
427

428
}
429

430
func (m *migrationProxy) Stop() {
11✔
431

11✔
432
        close(m.stopChan)
11✔
433
        if m.listener != nil {
22✔
434
                m.logger.Infof("proxy stopped listening")
11✔
435
                m.listener.Close()
11✔
436
        }
11✔
437
}
438

439
func (m *migrationProxy) handleConnection(fd net.Conn) {
11✔
440
        defer fd.Close()
11✔
441

11✔
442
        outBoundErr := make(chan error, 1)
11✔
443
        inBoundErr := make(chan error, 1)
11✔
444

11✔
445
        var conn net.Conn
11✔
446
        var err error
11✔
447
        if m.targetProtocol == "tcp" && m.clientTLSConfig != nil {
15✔
448
                conn, err = tls.Dial(m.targetProtocol, m.targetAddress, m.clientTLSConfig)
4✔
449
        } else {
11✔
450
                conn, err = net.Dial(m.targetProtocol, m.targetAddress)
7✔
451
        }
7✔
452
        if err != nil {
11✔
453
                m.logger.Reason(err).Error("unable to create outbound leg of proxy to host")
×
454
                return
×
455
        }
×
456

457
        go func() {
22✔
458
                //from outbound connection to proxy
11✔
459
                n, err := io.Copy(fd, conn)
11✔
460
                m.logger.Infof("%d bytes copied outbound to inbound", n)
11✔
461
                inBoundErr <- err
11✔
462
        }()
11✔
463
        go func() {
22✔
464
                //from proxy to outbound connection
11✔
465
                n, err := io.Copy(conn, fd)
11✔
466
                m.logger.Infof("%d bytes copied from inbound to outbound", n)
11✔
467
                outBoundErr <- err
11✔
468
        }()
11✔
469

470
        select {
11✔
471
        case err = <-outBoundErr:
1✔
472
                if err != nil {
1✔
473
                        m.logger.Reason(err).Errorf("error encountered copying data to outbound connection")
×
474
                }
×
UNCOV
475
        case err = <-inBoundErr:
×
UNCOV
476
                if err != nil {
×
477
                        m.logger.Reason(err).Errorf("error encountered copying data into inbound connection")
×
478
                }
×
479
        case <-m.stopChan:
3✔
480
                m.logger.Info("stop channel terminated proxy")
3✔
481
        }
482
}
483

484
func (m *migrationProxy) Start() error {
30✔
485

30✔
486
        if m.unixSocketPath != "" {
47✔
487
                err := m.createUnixListener()
17✔
488
                if err != nil {
17✔
489
                        return err
×
490
                }
×
491
        } else {
13✔
492
                err := m.createTcpListener()
13✔
493
                if err != nil {
13✔
494
                        return err
×
495
                }
×
496
        }
497

498
        go func(ln net.Listener, fdChan chan net.Conn, listenErr chan error, stopChan chan struct{}) {
60✔
499
                for {
71✔
500
                        fd, err := ln.Accept()
41✔
501
                        if err != nil {
52✔
502
                                listenErr <- err
11✔
503

11✔
504
                                select {
11✔
505
                                case <-stopChan:
11✔
506
                                        // If the stopChan is closed, then this is expected. Log at a lesser debug level
11✔
507
                                        m.logger.Reason(err).V(3).Infof("stopChan is closed. Listener exited with expected error.")
11✔
508
                                default:
×
509
                                        m.logger.Reason(err).Error("proxy unix socket listener returned error.")
×
510
                                }
511
                                break
11✔
512
                        } else {
11✔
513
                                fdChan <- fd
11✔
514
                        }
11✔
515
                }
516
        }(m.listener, m.fdChan, m.listenErrChan, m.stopChan)
517

518
        go func(m *migrationProxy) {
60✔
519
                for {
71✔
520
                        select {
41✔
521
                        case fd := <-m.fdChan:
11✔
522
                                go m.handleConnection(fd)
11✔
523
                        case <-m.stopChan:
11✔
524
                                return
11✔
UNCOV
525
                        case <-m.listenErrChan:
×
UNCOV
526
                                return
×
527
                        }
528
                }
529

530
        }(m)
531

532
        m.logger.Infof("proxy started listening")
30✔
533
        return nil
30✔
534
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc