• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / kubevirt / 4cd5b955-8243-4cb5-97d4-349b7c70c84c

23 Jan 2026 12:41PM UTC coverage: 70.768% (+0.009%) from 70.759%
4cd5b955-8243-4cb5-97d4-349b7c70c84c

push

prow

web-flow
Merge pull request #16549 from orelmisan/template-rm-net-tests

ctrl, template: Rm net dependant NET_BIND_SERVICE tests

72166 of 101976 relevant lines covered (70.77%)

440.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.13
/pkg/virt-handler/migration-proxy/migration-proxy.go
1
/*
2
 * This file is part of the KubeVirt project
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 *
16
 * Copyright The KubeVirt Authors.
17
 *
18
 */
19

20
package migrationproxy
21

22
import (
23
        "crypto/tls"
24
        "fmt"
25
        "io"
26
        "net"
27
        "os"
28
        "path/filepath"
29
        "strconv"
30
        "strings"
31
        "sync"
32

33
        "kubevirt.io/client-go/log"
34

35
        diskutils "kubevirt.io/kubevirt/pkg/ephemeral-disk-utils"
36
        "kubevirt.io/kubevirt/pkg/util"
37
        "kubevirt.io/kubevirt/pkg/util/net/ip"
38
        virtconfig "kubevirt.io/kubevirt/pkg/virt-config"
39
)
40

41
const (
42
        LibvirtDirectMigrationPort = 49152
43
        LibvirtBlockMigrationPort  = 49153
44
)
45

46
var migrationPortsRange = []int{LibvirtDirectMigrationPort, LibvirtBlockMigrationPort}
47

48
type ProxyManager interface {
49
        StartTargetListener(key string, targetUnixFiles []string) error
50
        GetTargetListenerPorts(key string) map[string]int
51
        StopTargetListener(key string)
52

53
        StartSourceListener(key string, targetAddress string, destSrcPortMap map[string]int, baseDir string) error
54
        GetSourceListenerFiles(key string) []string
55
        StopSourceListener(key string)
56

57
        OpenListenerCount() int
58

59
        InitiateGracefulShutdown()
60
}
61

62
type migrationProxyManager struct {
63
        sourceProxies      map[string][]*migrationProxy
64
        targetProxies      map[string][]*migrationProxy
65
        managerLock        sync.Mutex
66
        serverTLSConfig    *tls.Config
67
        clientTLSConfig    *tls.Config
68
        migrationTLSConfig *tls.Config
69

70
        isShuttingDown bool
71
        config         *virtconfig.ClusterConfig
72
}
73

74
type MigrationProxyListener interface {
75
        Start() error
76
        Stop()
77
}
78

79
type migrationProxy struct {
80
        unixSocketPath string
81
        tcpBindAddress string
82
        tcpBindPort    int
83
        targetAddress  string
84
        targetProtocol string
85
        stopChan       chan struct{}
86
        listenErrChan  chan error
87
        fdChan         chan net.Conn
88

89
        listener           net.Listener
90
        serverTLSConfig    *tls.Config
91
        clientTLSConfig    *tls.Config
92
        migrationTLSConfig *tls.Config
93

94
        logger *log.FilteredLogger
95
}
96

97
func (m *migrationProxyManager) InitiateGracefulShutdown() {
2✔
98
        m.managerLock.Lock()
2✔
99
        defer m.managerLock.Unlock()
2✔
100

2✔
101
        m.isShuttingDown = true
2✔
102
}
2✔
103

104
func (m *migrationProxyManager) OpenListenerCount() int {
2✔
105
        m.managerLock.Lock()
2✔
106
        defer m.managerLock.Unlock()
2✔
107

2✔
108
        return len(m.sourceProxies) + len(m.targetProxies)
2✔
109
}
2✔
110

111
func GetMigrationPortsList(isBlockMigration bool) (ports []int) {
4✔
112
        ports = append(ports, migrationPortsRange[0])
4✔
113
        if isBlockMigration {
4✔
114
                ports = append(ports, migrationPortsRange[1])
×
115
        }
×
116
        return
4✔
117
}
118

119
func NewMigrationProxyManager(serverTLSConfig *tls.Config, clientTLSConfig, migrationTLSConfig *tls.Config, config *virtconfig.ClusterConfig) ProxyManager {
152✔
120
        return &migrationProxyManager{
152✔
121
                sourceProxies:      make(map[string][]*migrationProxy),
152✔
122
                targetProxies:      make(map[string][]*migrationProxy),
152✔
123
                serverTLSConfig:    serverTLSConfig,
152✔
124
                clientTLSConfig:    clientTLSConfig,
152✔
125
                migrationTLSConfig: migrationTLSConfig,
152✔
126
                config:             config,
152✔
127
        }
152✔
128
}
152✔
129

130
func SourceUnixFile(baseDir string, key string) string {
18✔
131
        return filepath.Join(baseDir, "migrationproxy", key+"-source.sock")
18✔
132
}
18✔
133

134
func (m *migrationProxyManager) StartTargetListener(key string, targetUnixFiles []string) error {
9✔
135
        m.managerLock.Lock()
9✔
136
        defer m.managerLock.Unlock()
9✔
137

9✔
138
        if m.isShuttingDown {
11✔
139
                return fmt.Errorf("unable to process new migration connections during virt-handler shutdown")
2✔
140
        }
2✔
141

142
        isExistingProxy := func(curProxies []*migrationProxy, targetUnixFiles []string) bool {
8✔
143
                // make sure that all elements in the existing proxy match to the provided targetUnixFiles
1✔
144
                if len(curProxies) != len(targetUnixFiles) {
1✔
145
                        return false
×
146
                }
×
147
                existingSocketFiles := make(map[string]bool)
1✔
148
                for _, file := range targetUnixFiles {
3✔
149
                        existingSocketFiles[file] = true
2✔
150
                }
2✔
151
                for _, curProxy := range curProxies {
3✔
152
                        if _, ok := existingSocketFiles[curProxy.targetAddress]; !ok {
2✔
153
                                return false
×
154
                        }
×
155
                }
156
                return true
1✔
157
        }
158
        curProxies, exists := m.targetProxies[key]
7✔
159

7✔
160
        if exists {
8✔
161
                if isExistingProxy(curProxies, targetUnixFiles) {
2✔
162
                        // No Op, already exists
1✔
163
                        return nil
1✔
164
                } else {
1✔
165
                        // stop the current proxy and point it somewhere new.
×
166
                        for _, curProxy := range curProxies {
×
167
                                curProxy.logger.Infof("Manager stopping proxy on target node due to new unix filepath location")
×
168
                                curProxy.Stop()
×
169
                        }
×
170
                }
171
        }
172

173
        zeroAddress := ip.GetIPZeroAddress()
6✔
174
        proxiesList := []*migrationProxy{}
6✔
175
        serverTLSConfig := m.serverTLSConfig
6✔
176
        if m.config.GetMigrationConfiguration().DisableTLS != nil && *m.config.GetMigrationConfiguration().DisableTLS {
8✔
177
                serverTLSConfig = nil
2✔
178
        }
2✔
179
        for _, targetUnixFile := range targetUnixFiles {
18✔
180
                // 0 means random port is used
12✔
181
                proxy := NewTargetProxy(zeroAddress, 0, serverTLSConfig, targetUnixFile, key)
12✔
182

12✔
183
                err := proxy.Start()
12✔
184
                if err != nil {
12✔
185
                        proxy.Stop()
×
186
                        // close all already created proxies for this key
×
187
                        for _, curProxy := range proxiesList {
×
188
                                curProxy.Stop()
×
189
                        }
×
190
                        return err
×
191
                }
192
                proxiesList = append(proxiesList, proxy)
12✔
193
                proxy.logger.Infof("Manager created proxy on target")
12✔
194
        }
195
        m.targetProxies[key] = proxiesList
6✔
196
        return nil
6✔
197
}
198

199
func (m *migrationProxyManager) GetSourceListenerFiles(key string) []string {
2✔
200
        m.managerLock.Lock()
2✔
201
        defer m.managerLock.Unlock()
2✔
202

2✔
203
        curProxies, exists := m.sourceProxies[key]
2✔
204
        socketsList := []string{}
2✔
205
        if exists {
4✔
206
                for _, curProxy := range curProxies {
6✔
207
                        socketsList = append(socketsList, curProxy.unixSocketPath)
4✔
208
                }
4✔
209
        }
210
        return socketsList
2✔
211
}
212

213
func ConstructProxyKey(id string, port int) string {
39✔
214
        key := id
39✔
215
        if port != 0 {
74✔
216
                key += fmt.Sprintf("-%d", port)
35✔
217
        }
35✔
218
        return key
39✔
219
}
220

221
func (m *migrationProxyManager) GetTargetListenerPorts(key string) map[string]int {
11✔
222
        m.managerLock.Lock()
11✔
223
        defer m.managerLock.Unlock()
11✔
224

11✔
225
        getPortFromSocket := func(id string, path string) int {
25✔
226
                for _, port := range migrationPortsRange {
35✔
227
                        key := ConstructProxyKey(id, port)
21✔
228
                        if strings.Contains(path, key) {
28✔
229
                                return port
7✔
230
                        }
7✔
231
                }
232
                return 0
7✔
233
        }
234

235
        curProxies, exists := m.targetProxies[key]
11✔
236
        targetSrcPortMap := make(map[string]int)
11✔
237

11✔
238
        if exists {
18✔
239
                for _, curProxy := range curProxies {
21✔
240
                        port := strconv.Itoa(curProxy.tcpBindPort)
14✔
241
                        targetSrcPortMap[port] = getPortFromSocket(key, curProxy.targetAddress)
14✔
242
                }
14✔
243
        }
244
        return targetSrcPortMap
11✔
245
}
246

247
func (m *migrationProxyManager) StopTargetListener(key string) {
10✔
248
        m.managerLock.Lock()
10✔
249
        defer m.managerLock.Unlock()
10✔
250

10✔
251
        curProxies, exists := m.targetProxies[key]
10✔
252
        if exists {
12✔
253
                for _, curProxy := range curProxies {
6✔
254
                        curProxy.logger.Info("Manager stopping proxy on target node")
4✔
255
                        curProxy.Stop()
4✔
256
                        delete(m.targetProxies, key)
4✔
257
                }
4✔
258
        }
259
}
260

261
func (m *migrationProxyManager) StartSourceListener(key string, targetAddress string, destSrcPortMap map[string]int, baseDir string) error {
12✔
262
        m.managerLock.Lock()
12✔
263
        defer m.managerLock.Unlock()
12✔
264

12✔
265
        if m.isShuttingDown {
14✔
266
                return fmt.Errorf("unable to process new migration connections during virt-handler shutdown")
2✔
267
        }
2✔
268

269
        isExistingProxy := func(curProxies []*migrationProxy, targetAddress string, destSrcPortMap map[string]int) bool {
10✔
270
                if len(curProxies) != len(destSrcPortMap) {
×
271
                        return false
×
272
                }
×
273
                destSrcLookup := make(map[string]int)
×
274
                for dest, src := range destSrcPortMap {
×
275
                        addr := net.JoinHostPort(targetAddress, dest)
×
276
                        destSrcLookup[addr] = src
×
277
                }
×
278
                for _, curProxy := range curProxies {
×
279
                        if _, ok := destSrcLookup[curProxy.targetAddress]; !ok {
×
280
                                return false
×
281
                        }
×
282
                }
283
                return true
×
284
        }
285

286
        curProxies, exists := m.sourceProxies[key]
10✔
287

10✔
288
        if exists {
10✔
289
                if isExistingProxy(curProxies, targetAddress, destSrcPortMap) {
×
290
                        // No Op, already exists
×
291
                        return nil
×
292
                } else {
×
293
                        // stop the current proxy and point it somewhere new.
×
294
                        for _, curProxy := range curProxies {
×
295
                                curProxy.logger.Infof("Manager is stopping proxy on source node due to new target location")
×
296
                                curProxy.Stop()
×
297
                        }
×
298
                }
299
        }
300
        clientTLSConfig := m.clientTLSConfig
10✔
301
        migrationTLSConfig := m.migrationTLSConfig
10✔
302
        if m.config.GetMigrationConfiguration().DisableTLS != nil && *m.config.GetMigrationConfiguration().DisableTLS {
12✔
303
                clientTLSConfig = nil
2✔
304
                migrationTLSConfig = nil
2✔
305
        }
2✔
306
        proxiesList := []*migrationProxy{}
10✔
307
        for destPort, srcPort := range destSrcPortMap {
24✔
308
                proxyKey := ConstructProxyKey(key, srcPort)
14✔
309
                targetFullAddr := net.JoinHostPort(targetAddress, destPort)
14✔
310
                filePath := SourceUnixFile(baseDir, proxyKey)
14✔
311

14✔
312
                os.RemoveAll(filePath)
14✔
313

14✔
314
                proxy := NewSourceProxy(filePath, targetFullAddr, clientTLSConfig, migrationTLSConfig, key)
14✔
315

14✔
316
                err := proxy.Start()
14✔
317
                if err != nil {
14✔
318
                        proxy.Stop()
×
319
                        // close all already created proxies for this key
×
320
                        for _, curProxy := range proxiesList {
×
321
                                curProxy.Stop()
×
322
                        }
×
323
                        return err
×
324
                }
325
                proxiesList = append(proxiesList, proxy)
14✔
326
                proxy.logger.Infof("Manager created proxy on source node")
14✔
327
        }
328
        m.sourceProxies[key] = proxiesList
10✔
329
        return nil
10✔
330
}
331

332
func (m *migrationProxyManager) StopSourceListener(key string) {
9✔
333
        m.managerLock.Lock()
9✔
334
        defer m.managerLock.Unlock()
9✔
335

9✔
336
        curProxies, exists := m.sourceProxies[key]
9✔
337
        if exists {
11✔
338
                for _, curProxy := range curProxies {
6✔
339
                        curProxy.logger.Infof("Manager stopping proxy on source node")
4✔
340
                        curProxy.Stop()
4✔
341
                        os.RemoveAll(curProxy.unixSocketPath)
4✔
342
                }
4✔
343
                delete(m.sourceProxies, key)
2✔
344
        }
345
}
346

347
// SRC POD ENV(migration unix socket) <-> HOST ENV (tcp client) <-----> HOST ENV (tcp server) <-> TARGET POD ENV (virtqemud unix socket)
348

349
// Source proxy exposes a unix socket server and pipes to an outbound TCP connection.
350
func NewSourceProxy(unixSocketPath string, tcpTargetAddress string, clientTLSConfig, migrationTLSConfig *tls.Config, vmiUID string) *migrationProxy {
17✔
351
        return &migrationProxy{
17✔
352
                unixSocketPath:     unixSocketPath,
17✔
353
                targetAddress:      tcpTargetAddress,
17✔
354
                targetProtocol:     "tcp",
17✔
355
                stopChan:           make(chan struct{}),
17✔
356
                fdChan:             make(chan net.Conn, 1),
17✔
357
                listenErrChan:      make(chan error, 1),
17✔
358
                clientTLSConfig:    clientTLSConfig,
17✔
359
                migrationTLSConfig: migrationTLSConfig,
17✔
360
                logger:             log.Log.With("uid", vmiUID).With("listening", filepath.Base(unixSocketPath)).With("outbound", tcpTargetAddress),
17✔
361
        }
17✔
362
}
17✔
363

364
// Target proxy listens on a tcp socket and pipes to a virtqemud unix socket
365
func NewTargetProxy(tcpBindAddress string, tcpBindPort int, serverTLSConfig *tls.Config, virtqemudSocketPath string, vmiUID string) *migrationProxy {
13✔
366
        return &migrationProxy{
13✔
367
                tcpBindAddress:  tcpBindAddress,
13✔
368
                tcpBindPort:     tcpBindPort,
13✔
369
                targetAddress:   virtqemudSocketPath,
13✔
370
                targetProtocol:  "unix",
13✔
371
                stopChan:        make(chan struct{}),
13✔
372
                fdChan:          make(chan net.Conn, 1),
13✔
373
                listenErrChan:   make(chan error, 1),
13✔
374
                serverTLSConfig: serverTLSConfig,
13✔
375
                logger:          log.Log.With("uid", vmiUID).With("outbound", filepath.Base(virtqemudSocketPath)),
13✔
376
        }
13✔
377

13✔
378
}
13✔
379

380
func (m *migrationProxy) createTcpListener() error {
13✔
381
        var listener net.Listener
13✔
382
        var err error
13✔
383

13✔
384
        laddr := net.JoinHostPort(m.tcpBindAddress, strconv.Itoa(m.tcpBindPort))
13✔
385
        if m.serverTLSConfig != nil {
22✔
386
                listener, err = tls.Listen("tcp", laddr, m.serverTLSConfig)
9✔
387
        } else {
13✔
388
                listener, err = net.Listen("tcp", laddr)
4✔
389
        }
4✔
390
        if err != nil {
13✔
391
                m.logger.Reason(err).Error("failed to create unix socket for proxy service")
×
392
                return err
×
393
        }
×
394

395
        if m.tcpBindPort == 0 {
25✔
396
                // update the random port that was selected
12✔
397
                m.tcpBindPort = listener.Addr().(*net.TCPAddr).Port
12✔
398
                // Add the listener to the log output once we know the port
12✔
399
                m.logger = m.logger.With("listening", fmt.Sprintf("%s:%d", m.tcpBindAddress, m.tcpBindPort))
12✔
400
        }
12✔
401

402
        m.listener = listener
13✔
403
        return nil
13✔
404
}
405

406
func (m *migrationProxy) createUnixListener() error {
17✔
407

17✔
408
        os.RemoveAll(m.unixSocketPath)
17✔
409
        err := util.MkdirAllWithNosec(filepath.Dir(m.unixSocketPath))
17✔
410
        if err != nil {
17✔
411
                m.logger.Reason(err).Error("unable to create directory for unix socket")
×
412
                return err
×
413
        }
×
414

415
        listener, err := net.Listen("unix", m.unixSocketPath)
17✔
416
        if err != nil {
17✔
417
                m.logger.Reason(err).Error("failed to create unix socket for proxy service")
×
418
                return err
×
419
        }
×
420
        if err := diskutils.DefaultOwnershipManager.UnsafeSetFileOwnership(m.unixSocketPath); err != nil {
17✔
421
                log.Log.Reason(err).Error("failed to change ownership on migration unix socket")
×
422
                return err
×
423
        }
×
424

425
        m.listener = listener
17✔
426
        return nil
17✔
427

428
}
429

430
func (m *migrationProxy) Stop() {
11✔
431

11✔
432
        close(m.stopChan)
11✔
433
        if m.listener != nil {
22✔
434
                m.logger.Infof("proxy stopped listening")
11✔
435
                m.listener.Close()
11✔
436
        }
11✔
437
}
438

439
func (m *migrationProxy) handleConnection(fd net.Conn) {
11✔
440
        defer fd.Close()
11✔
441

11✔
442
        outBoundErr := make(chan error, 1)
11✔
443
        inBoundErr := make(chan error, 1)
11✔
444

11✔
445
        var conn net.Conn
11✔
446
        var err error
11✔
447
        if m.targetProtocol == "tcp" && m.clientTLSConfig != nil {
15✔
448
                conn, err = tls.Dial(m.targetProtocol, m.targetAddress, m.migrationTLSConfig)
4✔
449
                // Check for specific error (CN missmatch), fallback to old client TLS
4✔
450
                if err != nil {
4✔
451
                        m.logger.Reason(err).Info("fallback to old tls config")
×
452
                        conn, err = tls.Dial(m.targetProtocol, m.targetAddress, m.clientTLSConfig)
×
453
                } else if tlsErr := conn.(*tls.Conn).Handshake(); tlsErr != nil {
4✔
454
                        m.logger.Reason(err).Info("handshake failed, fallback to old tls config")
×
455
                        conn, err = tls.Dial(m.targetProtocol, m.targetAddress, m.clientTLSConfig)
×
456
                }
×
457
        } else {
7✔
458
                conn, err = net.Dial(m.targetProtocol, m.targetAddress)
7✔
459
        }
7✔
460
        if err != nil {
11✔
461
                m.logger.Reason(err).Error("unable to create outbound leg of proxy to host")
×
462
                return
×
463
        }
×
464

465
        go func() {
22✔
466
                //from outbound connection to proxy
11✔
467
                n, err := io.Copy(fd, conn)
11✔
468
                m.logger.Infof("%d bytes copied outbound to inbound", n)
11✔
469
                inBoundErr <- err
11✔
470
        }()
11✔
471
        go func() {
22✔
472
                //from proxy to outbound connection
11✔
473
                n, err := io.Copy(conn, fd)
11✔
474
                m.logger.Infof("%d bytes copied from inbound to outbound", n)
11✔
475
                outBoundErr <- err
11✔
476
        }()
11✔
477

478
        select {
11✔
479
        case err = <-outBoundErr:
4✔
480
                if err != nil {
4✔
481
                        m.logger.Reason(err).Errorf("error encountered copying data to outbound connection")
×
482
                }
×
483
        case err = <-inBoundErr:
4✔
484
                if err != nil {
4✔
485
                        m.logger.Reason(err).Errorf("error encountered copying data into inbound connection")
×
486
                }
×
487
        case <-m.stopChan:
3✔
488
                m.logger.Info("stop channel terminated proxy")
3✔
489
        }
490
}
491

492
func (m *migrationProxy) Start() error {
30✔
493

30✔
494
        if m.unixSocketPath != "" {
47✔
495
                err := m.createUnixListener()
17✔
496
                if err != nil {
17✔
497
                        return err
×
498
                }
×
499
        } else {
13✔
500
                err := m.createTcpListener()
13✔
501
                if err != nil {
13✔
502
                        return err
×
503
                }
×
504
        }
505

506
        go func(ln net.Listener, fdChan chan net.Conn, listenErr chan error, stopChan chan struct{}) {
60✔
507
                for {
71✔
508
                        fd, err := ln.Accept()
41✔
509
                        if err != nil {
52✔
510
                                listenErr <- err
11✔
511

11✔
512
                                select {
11✔
513
                                case <-stopChan:
11✔
514
                                        // If the stopChan is closed, then this is expected. Log at a lesser debug level
11✔
515
                                        m.logger.Reason(err).V(3).Infof("stopChan is closed. Listener exited with expected error.")
11✔
516
                                default:
×
517
                                        m.logger.Reason(err).Error("proxy unix socket listener returned error.")
×
518
                                }
519
                                break
11✔
520
                        } else {
11✔
521
                                fdChan <- fd
11✔
522
                        }
11✔
523
                }
524
        }(m.listener, m.fdChan, m.listenErrChan, m.stopChan)
525

526
        go func(m *migrationProxy) {
60✔
527
                for {
71✔
528
                        select {
41✔
529
                        case fd := <-m.fdChan:
11✔
530
                                go m.handleConnection(fd)
11✔
531
                        case <-m.stopChan:
11✔
532
                                return
11✔
533
                        case <-m.listenErrChan:
×
534
                                return
×
535
                        }
536
                }
537

538
        }(m)
539

540
        m.logger.Infof("proxy started listening")
30✔
541
        return nil
30✔
542
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc