• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / kubevirt / 75de3be9-dc24-4e53-8377-3f635b38dcd6

19 Feb 2025 03:21AM UTC coverage: 71.631% (+0.02%) from 71.609%
75de3be9-dc24-4e53-8377-3f635b38dcd6

push

prow

web-flow
Merge pull request #13950 from fossedihelm/fakeclient-for-operator-wh

virt-operator webhook test: Use fakeclient and cleanups

62189 of 86818 relevant lines covered (71.63%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.17
/pkg/virt-handler/migration-proxy/migration-proxy.go
1
/*
2
 * This file is part of the KubeVirt project
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 *
16
 * Copyright 2018 Red Hat, Inc.
17
 *
18
 */
19

20
package migrationproxy
21

22
import (
23
        "crypto/tls"
24
        "fmt"
25
        "io"
26
        "net"
27
        "os"
28
        "path/filepath"
29
        "strconv"
30
        "strings"
31
        "sync"
32

33
        "kubevirt.io/client-go/log"
34

35
        diskutils "kubevirt.io/kubevirt/pkg/ephemeral-disk-utils"
36
        "kubevirt.io/kubevirt/pkg/util"
37
        "kubevirt.io/kubevirt/pkg/util/net/ip"
38
        virtconfig "kubevirt.io/kubevirt/pkg/virt-config"
39
)
40

41
const (
	// LibvirtDirectMigrationPort is the migration port that is always
	// used, regardless of whether the migration is a block migration.
	LibvirtDirectMigrationPort = 49152
	// LibvirtBlockMigrationPort is the additional migration port that is
	// only used for block migrations.
	LibvirtBlockMigrationPort = 49153
)

// migrationPortsRange lists every migration port known to the proxy. Index 0
// is the direct migration port and index 1 the block migration port;
// GetMigrationPortsList relies on that ordering.
var migrationPortsRange = []int{LibvirtDirectMigrationPort, LibvirtBlockMigrationPort}
47

48
// ProxyManager manages the migration proxies of a node. Proxies are grouped
// under a caller-chosen key; the descriptions below reflect the behavior of
// the migrationProxyManager implementation in this file.
type ProxyManager interface {
	// StartTargetListener starts one TCP listener on a random port for
	// every unix socket path in targetUnixFiles and forwards incoming
	// connections to that socket. It is a no-op when the proxies already
	// registered under key point at the same files.
	StartTargetListener(key string, targetUnixFiles []string) error
	// GetTargetListenerPorts returns, for every target proxy registered
	// under key, the bound TCP port (as a decimal string) mapped to the
	// migration port encoded in the proxy's unix socket path, or 0 when
	// no known migration port matches.
	GetTargetListenerPorts(key string) map[string]int
	// StopTargetListener stops and forgets all target proxies of key.
	StopTargetListener(key string)

	// StartSourceListener creates, for every destination/source port pair
	// in destSrcPortMap, a unix socket below baseDir that forwards to
	// targetAddress on the destination port.
	StartSourceListener(key string, targetAddress string, destSrcPortMap map[string]int, baseDir string) error
	// GetSourceListenerFiles returns the unix socket paths of all source
	// proxies registered under key.
	GetSourceListenerFiles(key string) []string
	// StopSourceListener stops and forgets all source proxies of key and
	// removes their unix socket files.
	StopSourceListener(key string)

	// OpenListenerCount returns the number of keys that currently have
	// source proxies plus the number of keys that have target proxies.
	OpenListenerCount() int

	// InitiateGracefulShutdown makes every subsequent Start*Listener call
	// fail; already running proxies are left untouched.
	InitiateGracefulShutdown()
}
61

62
type migrationProxyManager struct {
63
        sourceProxies   map[string][]*migrationProxy
64
        targetProxies   map[string][]*migrationProxy
65
        managerLock     sync.Mutex
66
        serverTLSConfig *tls.Config
67
        clientTLSConfig *tls.Config
68

69
        isShuttingDown bool
70
        config         *virtconfig.ClusterConfig
71
}
72

73
// MigrationProxyListener is a proxy endpoint that can be started and stopped.
type MigrationProxyListener interface {
	// Start begins listening and returns an error if the listener could
	// not be created.
	Start() error
	// Stop shuts the listener down.
	Stop()
}
77

78
type migrationProxy struct {
79
        unixSocketPath string
80
        tcpBindAddress string
81
        tcpBindPort    int
82
        targetAddress  string
83
        targetProtocol string
84
        stopChan       chan struct{}
85
        listenErrChan  chan error
86
        fdChan         chan net.Conn
87

88
        listener        net.Listener
89
        serverTLSConfig *tls.Config
90
        clientTLSConfig *tls.Config
91

92
        logger *log.FilteredLogger
93
}
94

95
func (m *migrationProxyManager) InitiateGracefulShutdown() {
1✔
96
        m.managerLock.Lock()
1✔
97
        defer m.managerLock.Unlock()
1✔
98

1✔
99
        m.isShuttingDown = true
1✔
100
}
1✔
101

102
func (m *migrationProxyManager) OpenListenerCount() int {
1✔
103
        m.managerLock.Lock()
1✔
104
        defer m.managerLock.Unlock()
1✔
105

1✔
106
        return len(m.sourceProxies) + len(m.targetProxies)
1✔
107
}
1✔
108

109
func GetMigrationPortsList(isBlockMigration bool) (ports []int) {
1✔
110
        ports = append(ports, migrationPortsRange[0])
1✔
111
        if isBlockMigration {
1✔
112
                ports = append(ports, migrationPortsRange[1])
×
113
        }
×
114
        return
1✔
115
}
116

117
func NewMigrationProxyManager(serverTLSConfig *tls.Config, clientTLSConfig *tls.Config, config *virtconfig.ClusterConfig) ProxyManager {
1✔
118
        return &migrationProxyManager{
1✔
119
                sourceProxies:   make(map[string][]*migrationProxy),
1✔
120
                targetProxies:   make(map[string][]*migrationProxy),
1✔
121
                serverTLSConfig: serverTLSConfig,
1✔
122
                clientTLSConfig: clientTLSConfig,
1✔
123
                config:          config,
1✔
124
        }
1✔
125
}
1✔
126

127
// SourceUnixFile returns the path of the source-side unix socket for key:
// <baseDir>/migrationproxy/<key>-source.sock.
func SourceUnixFile(baseDir string, key string) string {
	socketName := key + "-source.sock"
	return filepath.Join(baseDir, "migrationproxy", socketName)
}
1✔
130

131
func (m *migrationProxyManager) StartTargetListener(key string, targetUnixFiles []string) error {
1✔
132
        m.managerLock.Lock()
1✔
133
        defer m.managerLock.Unlock()
1✔
134

1✔
135
        if m.isShuttingDown {
2✔
136
                return fmt.Errorf("unable to process new migration connections during virt-handler shutdown")
1✔
137
        }
1✔
138

139
        isExistingProxy := func(curProxies []*migrationProxy, targetUnixFiles []string) bool {
2✔
140
                // make sure that all elements in the existing proxy match to the provided targetUnixFiles
1✔
141
                if len(curProxies) != len(targetUnixFiles) {
1✔
142
                        return false
×
143
                }
×
144
                existingSocketFiles := make(map[string]bool)
1✔
145
                for _, file := range targetUnixFiles {
2✔
146
                        existingSocketFiles[file] = true
1✔
147
                }
1✔
148
                for _, curProxy := range curProxies {
2✔
149
                        if _, ok := existingSocketFiles[curProxy.targetAddress]; !ok {
1✔
150
                                return false
×
151
                        }
×
152
                }
153
                return true
1✔
154
        }
155
        curProxies, exists := m.targetProxies[key]
1✔
156

1✔
157
        if exists {
2✔
158
                if isExistingProxy(curProxies, targetUnixFiles) {
2✔
159
                        // No Op, already exists
1✔
160
                        return nil
1✔
161
                } else {
1✔
162
                        // stop the current proxy and point it somewhere new.
×
163
                        for _, curProxy := range curProxies {
×
164
                                curProxy.logger.Infof("Manager stopping proxy on target node due to new unix filepath location")
×
165
                                curProxy.Stop()
×
166
                        }
×
167
                }
168
        }
169

170
        zeroAddress := ip.GetIPZeroAddress()
1✔
171
        proxiesList := []*migrationProxy{}
1✔
172
        serverTLSConfig := m.serverTLSConfig
1✔
173
        clientTLSConfig := m.clientTLSConfig
1✔
174
        if m.config.GetMigrationConfiguration().DisableTLS != nil && *m.config.GetMigrationConfiguration().DisableTLS {
2✔
175
                serverTLSConfig = nil
1✔
176
                clientTLSConfig = nil
1✔
177
        }
1✔
178
        for _, targetUnixFile := range targetUnixFiles {
2✔
179
                // 0 means random port is used
1✔
180
                proxy := NewTargetProxy(zeroAddress, 0, serverTLSConfig, clientTLSConfig, targetUnixFile, key)
1✔
181

1✔
182
                err := proxy.Start()
1✔
183
                if err != nil {
1✔
184
                        proxy.Stop()
×
185
                        // close all already created proxies for this key
×
186
                        for _, curProxy := range proxiesList {
×
187
                                curProxy.Stop()
×
188
                        }
×
189
                        return err
×
190
                }
191
                proxiesList = append(proxiesList, proxy)
1✔
192
                proxy.logger.Infof("Manager created proxy on target")
1✔
193
        }
194
        m.targetProxies[key] = proxiesList
1✔
195
        return nil
1✔
196
}
197

198
func (m *migrationProxyManager) GetSourceListenerFiles(key string) []string {
1✔
199
        m.managerLock.Lock()
1✔
200
        defer m.managerLock.Unlock()
1✔
201

1✔
202
        curProxies, exists := m.sourceProxies[key]
1✔
203
        socketsList := []string{}
1✔
204
        if exists {
2✔
205
                for _, curProxy := range curProxies {
2✔
206
                        socketsList = append(socketsList, curProxy.unixSocketPath)
1✔
207
                }
1✔
208
        }
209
        return socketsList
1✔
210
}
211

212
// ConstructProxyKey builds the proxy key for id and port. A port of 0 yields
// id unchanged; any other port is appended as "-<port>".
func ConstructProxyKey(id string, port int) string {
	if port == 0 {
		return id
	}
	return fmt.Sprintf("%s-%d", id, port)
}
219

220
func (m *migrationProxyManager) GetTargetListenerPorts(key string) map[string]int {
1✔
221
        m.managerLock.Lock()
1✔
222
        defer m.managerLock.Unlock()
1✔
223

1✔
224
        getPortFromSocket := func(id string, path string) int {
2✔
225
                for _, port := range migrationPortsRange {
2✔
226
                        key := ConstructProxyKey(id, port)
1✔
227
                        if strings.Contains(path, key) {
2✔
228
                                return port
1✔
229
                        }
1✔
230
                }
231
                return 0
1✔
232
        }
233

234
        curProxies, exists := m.targetProxies[key]
1✔
235
        targetSrcPortMap := make(map[string]int)
1✔
236

1✔
237
        if exists {
2✔
238
                for _, curProxy := range curProxies {
2✔
239
                        port := strconv.Itoa(curProxy.tcpBindPort)
1✔
240
                        targetSrcPortMap[port] = getPortFromSocket(key, curProxy.targetAddress)
1✔
241
                }
1✔
242
        }
243
        return targetSrcPortMap
1✔
244
}
245

246
func (m *migrationProxyManager) StopTargetListener(key string) {
1✔
247
        m.managerLock.Lock()
1✔
248
        defer m.managerLock.Unlock()
1✔
249

1✔
250
        curProxies, exists := m.targetProxies[key]
1✔
251
        if exists {
2✔
252
                for _, curProxy := range curProxies {
2✔
253
                        curProxy.logger.Info("Manager stopping proxy on target node")
1✔
254
                        curProxy.Stop()
1✔
255
                        delete(m.targetProxies, key)
1✔
256
                }
1✔
257
        }
258
}
259

260
func (m *migrationProxyManager) StartSourceListener(key string, targetAddress string, destSrcPortMap map[string]int, baseDir string) error {
1✔
261
        m.managerLock.Lock()
1✔
262
        defer m.managerLock.Unlock()
1✔
263

1✔
264
        if m.isShuttingDown {
2✔
265
                return fmt.Errorf("unable to process new migration connections during virt-handler shutdown")
1✔
266
        }
1✔
267

268
        isExistingProxy := func(curProxies []*migrationProxy, targetAddress string, destSrcPortMap map[string]int) bool {
1✔
269
                if len(curProxies) != len(destSrcPortMap) {
×
270
                        return false
×
271
                }
×
272
                destSrcLookup := make(map[string]int)
×
273
                for dest, src := range destSrcPortMap {
×
274
                        addr := net.JoinHostPort(targetAddress, dest)
×
275
                        destSrcLookup[addr] = src
×
276
                }
×
277
                for _, curProxy := range curProxies {
×
278
                        if _, ok := destSrcLookup[curProxy.targetAddress]; !ok {
×
279
                                return false
×
280
                        }
×
281
                }
282
                return true
×
283
        }
284

285
        curProxies, exists := m.sourceProxies[key]
1✔
286

1✔
287
        if exists {
1✔
288
                if isExistingProxy(curProxies, targetAddress, destSrcPortMap) {
×
289
                        // No Op, already exists
×
290
                        return nil
×
291
                } else {
×
292
                        // stop the current proxy and point it somewhere new.
×
293
                        for _, curProxy := range curProxies {
×
294
                                curProxy.logger.Infof("Manager is stopping proxy on source node due to new target location")
×
295
                                curProxy.Stop()
×
296
                        }
×
297
                }
298
        }
299
        serverTLSConfig := m.serverTLSConfig
1✔
300
        clientTLSConfig := m.clientTLSConfig
1✔
301
        if m.config.GetMigrationConfiguration().DisableTLS != nil && *m.config.GetMigrationConfiguration().DisableTLS {
2✔
302
                serverTLSConfig = nil
1✔
303
                clientTLSConfig = nil
1✔
304
        }
1✔
305
        proxiesList := []*migrationProxy{}
1✔
306
        for destPort, srcPort := range destSrcPortMap {
2✔
307
                proxyKey := ConstructProxyKey(key, srcPort)
1✔
308
                targetFullAddr := net.JoinHostPort(targetAddress, destPort)
1✔
309
                filePath := SourceUnixFile(baseDir, proxyKey)
1✔
310

1✔
311
                os.RemoveAll(filePath)
1✔
312

1✔
313
                proxy := NewSourceProxy(filePath, targetFullAddr, serverTLSConfig, clientTLSConfig, key)
1✔
314

1✔
315
                err := proxy.Start()
1✔
316
                if err != nil {
1✔
317
                        proxy.Stop()
×
318
                        // close all already created proxies for this key
×
319
                        for _, curProxy := range proxiesList {
×
320
                                curProxy.Stop()
×
321
                        }
×
322
                        return err
×
323
                }
324
                proxiesList = append(proxiesList, proxy)
1✔
325
                proxy.logger.Infof("Manager created proxy on source node")
1✔
326
        }
327
        m.sourceProxies[key] = proxiesList
1✔
328
        return nil
1✔
329
}
330

331
func (m *migrationProxyManager) StopSourceListener(key string) {
1✔
332
        m.managerLock.Lock()
1✔
333
        defer m.managerLock.Unlock()
1✔
334

1✔
335
        curProxies, exists := m.sourceProxies[key]
1✔
336
        if exists {
2✔
337
                for _, curProxy := range curProxies {
2✔
338
                        curProxy.logger.Infof("Manager stopping proxy on source node")
1✔
339
                        curProxy.Stop()
1✔
340
                        os.RemoveAll(curProxy.unixSocketPath)
1✔
341
                }
1✔
342
                delete(m.sourceProxies, key)
1✔
343
        }
344
}
345

346
// SRC POD ENV(migration unix socket) <-> HOST ENV (tcp client) <-----> HOST ENV (tcp server) <-> TARGET POD ENV (virtqemud unix socket)
347

348
// Source proxy exposes a unix socket server and pipes to an outbound TCP connection.
349
func NewSourceProxy(unixSocketPath string, tcpTargetAddress string, serverTLSConfig *tls.Config, clientTLSConfig *tls.Config, vmiUID string) *migrationProxy {
1✔
350
        return &migrationProxy{
1✔
351
                unixSocketPath:  unixSocketPath,
1✔
352
                targetAddress:   tcpTargetAddress,
1✔
353
                targetProtocol:  "tcp",
1✔
354
                stopChan:        make(chan struct{}),
1✔
355
                fdChan:          make(chan net.Conn, 1),
1✔
356
                listenErrChan:   make(chan error, 1),
1✔
357
                serverTLSConfig: serverTLSConfig,
1✔
358
                clientTLSConfig: clientTLSConfig,
1✔
359
                logger:          log.Log.With("uid", vmiUID).With("listening", filepath.Base(unixSocketPath)).With("outbound", tcpTargetAddress),
1✔
360
        }
1✔
361
}
1✔
362

363
// Target proxy listens on a tcp socket and pipes to a virtqemud unix socket
364
func NewTargetProxy(tcpBindAddress string, tcpBindPort int, serverTLSConfig *tls.Config, clientTLSConfig *tls.Config, virtqemudSocketPath string, vmiUID string) *migrationProxy {
1✔
365
        return &migrationProxy{
1✔
366
                tcpBindAddress:  tcpBindAddress,
1✔
367
                tcpBindPort:     tcpBindPort,
1✔
368
                targetAddress:   virtqemudSocketPath,
1✔
369
                targetProtocol:  "unix",
1✔
370
                stopChan:        make(chan struct{}),
1✔
371
                fdChan:          make(chan net.Conn, 1),
1✔
372
                listenErrChan:   make(chan error, 1),
1✔
373
                serverTLSConfig: serverTLSConfig,
1✔
374
                clientTLSConfig: clientTLSConfig,
1✔
375
                logger:          log.Log.With("uid", vmiUID).With("outbound", filepath.Base(virtqemudSocketPath)),
1✔
376
        }
1✔
377

1✔
378
}
1✔
379

380
func (m *migrationProxy) createTcpListener() error {
1✔
381
        var listener net.Listener
1✔
382
        var err error
1✔
383

1✔
384
        laddr := net.JoinHostPort(m.tcpBindAddress, strconv.Itoa(m.tcpBindPort))
1✔
385
        if m.serverTLSConfig != nil {
2✔
386
                listener, err = tls.Listen("tcp", laddr, m.serverTLSConfig)
1✔
387
        } else {
2✔
388
                listener, err = net.Listen("tcp", laddr)
1✔
389
        }
1✔
390
        if err != nil {
1✔
391
                m.logger.Reason(err).Error("failed to create unix socket for proxy service")
×
392
                return err
×
393
        }
×
394

395
        if m.tcpBindPort == 0 {
2✔
396
                // update the random port that was selected
1✔
397
                m.tcpBindPort = listener.Addr().(*net.TCPAddr).Port
1✔
398
                // Add the listener to the log output once we know the port
1✔
399
                m.logger = m.logger.With("listening", fmt.Sprintf("%s:%d", m.tcpBindAddress, m.tcpBindPort))
1✔
400
        }
1✔
401

402
        m.listener = listener
1✔
403
        return nil
1✔
404
}
405

406
func (m *migrationProxy) createUnixListener() error {
1✔
407

1✔
408
        os.RemoveAll(m.unixSocketPath)
1✔
409
        err := util.MkdirAllWithNosec(filepath.Dir(m.unixSocketPath))
1✔
410
        if err != nil {
1✔
411
                m.logger.Reason(err).Error("unable to create directory for unix socket")
×
412
                return err
×
413
        }
×
414

415
        listener, err := net.Listen("unix", m.unixSocketPath)
1✔
416
        if err != nil {
1✔
417
                m.logger.Reason(err).Error("failed to create unix socket for proxy service")
×
418
                return err
×
419
        }
×
420
        if err := diskutils.DefaultOwnershipManager.UnsafeSetFileOwnership(m.unixSocketPath); err != nil {
1✔
421
                log.Log.Reason(err).Error("failed to change ownership on migration unix socket")
×
422
                return err
×
423
        }
×
424

425
        m.listener = listener
1✔
426
        return nil
1✔
427

428
}
429

430
func (m *migrationProxy) Stop() {
1✔
431

1✔
432
        close(m.stopChan)
1✔
433
        if m.listener != nil {
2✔
434
                m.logger.Infof("proxy stopped listening")
1✔
435
                m.listener.Close()
1✔
436
        }
1✔
437
}
438

439
func (m *migrationProxy) handleConnection(fd net.Conn) {
1✔
440
        defer fd.Close()
1✔
441

1✔
442
        outBoundErr := make(chan error, 1)
1✔
443
        inBoundErr := make(chan error, 1)
1✔
444

1✔
445
        var conn net.Conn
1✔
446
        var err error
1✔
447
        if m.targetProtocol == "tcp" && m.clientTLSConfig != nil {
2✔
448
                conn, err = tls.Dial(m.targetProtocol, m.targetAddress, m.clientTLSConfig)
1✔
449
        } else {
2✔
450
                conn, err = net.Dial(m.targetProtocol, m.targetAddress)
1✔
451
        }
1✔
452
        if err != nil {
1✔
453
                m.logger.Reason(err).Error("unable to create outbound leg of proxy to host")
×
454
                return
×
455
        }
×
456

457
        go func() {
2✔
458
                //from outbound connection to proxy
1✔
459
                n, err := io.Copy(fd, conn)
1✔
460
                m.logger.Infof("%d bytes copied outbound to inbound", n)
1✔
461
                inBoundErr <- err
1✔
462
        }()
1✔
463
        go func() {
2✔
464
                //from proxy to outbound connection
1✔
465
                n, err := io.Copy(conn, fd)
1✔
466
                m.logger.Infof("%d bytes copied from inbound to outbound", n)
1✔
467
                outBoundErr <- err
1✔
468
        }()
1✔
469

470
        select {
1✔
471
        case err = <-outBoundErr:
1✔
472
                if err != nil {
1✔
473
                        m.logger.Reason(err).Errorf("error encountered copying data to outbound connection")
×
474
                }
×
475
        case err = <-inBoundErr:
1✔
476
                if err != nil {
1✔
477
                        m.logger.Reason(err).Errorf("error encountered copying data into inbound connection")
×
478
                }
×
479
        case <-m.stopChan:
1✔
480
                m.logger.Info("stop channel terminated proxy")
1✔
481
        }
482
}
483

484
func (m *migrationProxy) Start() error {
1✔
485

1✔
486
        if m.unixSocketPath != "" {
2✔
487
                err := m.createUnixListener()
1✔
488
                if err != nil {
1✔
489
                        return err
×
490
                }
×
491
        } else {
1✔
492
                err := m.createTcpListener()
1✔
493
                if err != nil {
1✔
494
                        return err
×
495
                }
×
496
        }
497

498
        go func(ln net.Listener, fdChan chan net.Conn, listenErr chan error, stopChan chan struct{}) {
2✔
499
                for {
2✔
500
                        fd, err := ln.Accept()
1✔
501
                        if err != nil {
2✔
502
                                listenErr <- err
1✔
503

1✔
504
                                select {
1✔
505
                                case <-stopChan:
1✔
506
                                        // If the stopChan is closed, then this is expected. Log at a lesser debug level
1✔
507
                                        m.logger.Reason(err).V(3).Infof("stopChan is closed. Listener exited with expected error.")
1✔
508
                                default:
×
509
                                        m.logger.Reason(err).Error("proxy unix socket listener returned error.")
×
510
                                }
511
                                break
1✔
512
                        } else {
1✔
513
                                fdChan <- fd
1✔
514
                        }
1✔
515
                }
516
        }(m.listener, m.fdChan, m.listenErrChan, m.stopChan)
517

518
        go func(m *migrationProxy) {
2✔
519
                for {
2✔
520
                        select {
1✔
521
                        case fd := <-m.fdChan:
1✔
522
                                go m.handleConnection(fd)
1✔
523
                        case <-m.stopChan:
1✔
524
                                return
1✔
525
                        case <-m.listenErrChan:
×
526
                                return
×
527
                        }
528
                }
529

530
        }(m)
531

532
        m.logger.Infof("proxy started listening")
1✔
533
        return nil
1✔
534
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc