• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / kubevirt / 6918a57f-71c4-4a23-aded-ebac47a2d64a

16 Jun 2026 09:48AM UTC coverage: 71.914% (+0.1%) from 71.787%
6918a57f-71c4-4a23-aded-ebac47a2d64a

push

prow

web-flow
Merge pull request #18127 from kubevirt-bot/bump-kubevirtci

Bump kubevirtci

80022 of 111274 relevant lines covered (71.91%)

360.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.33
/pkg/virt-handler/migration-source.go
1
/*
2
 * This file is part of the KubeVirt project
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 *
16
 * Copyright The KubeVirt Authors.
17
 *
18
 */
19

20
package virthandler
21

22
import (
23
        "context"
24
        "encoding/json"
25
        "errors"
26
        "fmt"
27
        "path/filepath"
28
        "time"
29

30
        k8sv1 "k8s.io/api/core/v1"
31
        "k8s.io/apimachinery/pkg/api/equality"
32
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33
        "k8s.io/apimachinery/pkg/util/wait"
34
        "k8s.io/client-go/tools/cache"
35
        "k8s.io/client-go/tools/record"
36
        "k8s.io/client-go/util/workqueue"
37
        v1 "kubevirt.io/api/core/v1"
38
        "kubevirt.io/client-go/kubecli"
39
        "kubevirt.io/client-go/log"
40

41
        "kubevirt.io/kubevirt/pkg/controller"
42
        hostdisk "kubevirt.io/kubevirt/pkg/host-disk"
43
        "kubevirt.io/kubevirt/pkg/hypervisor"
44
        metrics "kubevirt.io/kubevirt/pkg/monitoring/metrics/common/vmisync"
45
        "kubevirt.io/kubevirt/pkg/pointer"
46
        virtconfig "kubevirt.io/kubevirt/pkg/virt-config"
47
        cmdclient "kubevirt.io/kubevirt/pkg/virt-handler/cmd-client"
48
        "kubevirt.io/kubevirt/pkg/virt-handler/isolation"
49
        launcherclients "kubevirt.io/kubevirt/pkg/virt-handler/launcher-clients"
50
        migrationproxy "kubevirt.io/kubevirt/pkg/virt-handler/migration-proxy"
51
        "kubevirt.io/kubevirt/pkg/virt-launcher/virtwrap/api"
52
)
53

54
// errWaitingForTargetPorts is the sentinel returned by handleSourceMigrationProxy
// when the VMI's migration state does not yet carry
// TargetDirectMigrationNodePorts. migrateVMI matches it with errors.Is and
// re-enqueues the key after a short delay instead of reporting a failure.
var errWaitingForTargetPorts = errors.New("waiting for target to publish migration ports")
55

56
// passtRepairSourceHandler is the consumer-side interface for the passt-repair
// step that migrateVMI invokes on the migration source when the cluster
// configuration reports passt binding as enabled.
type passtRepairSourceHandler interface {
	// HandleMigrationSource is called with the VMI being migrated and a
	// callback that maps a VMI to a string (or an error). migrateVMI passes
	// c.passtSocketDirOnHostForVMI as that callback.
	// NOTE(review): presumably the string is the passt socket directory on
	// the host — confirm against the implementation.
	HandleMigrationSource(*v1.VirtualMachineInstance, func(*v1.VirtualMachineInstance) (string, error)) error
}
59

60
// MigrationSourceController is the virt-handler controller that processes
// VMIs for which this node is the migration source. It embeds BaseController
// for the shared queue, stores, clients and logger used by its methods.
type MigrationSourceController struct {
	*BaseController
	// vmiExpectations gates reprocessing of a key: sync sets an expectation
	// before issuing a VMI update, the VMI informer add/update handlers reset
	// it, and execute skips keys whose expectations are not yet satisfied.
	vmiExpectations *controller.UIDTrackingControllerExpectations
	// passtRepairHandler is invoked by migrateVMI before the migration is
	// started when passt binding is enabled in the cluster configuration.
	passtRepairHandler passtRepairSourceHandler
}
65

66
func NewMigrationSourceController(
67
        recorder record.EventRecorder,
68
        clientset kubecli.KubevirtClient,
69
        host string,
70
        launcherClients launcherclients.LauncherClientsManager,
71
        vmiInformer cache.SharedIndexInformer,
72
        domainInformer cache.SharedInformer,
73
        clusterConfig *virtconfig.ClusterConfig,
74
        podIsolationDetector isolation.PodIsolationDetector,
75
        migrationProxy migrationproxy.ProxyManager,
76
        virtLauncherFSRunDirPattern string,
77
        netStat netstat,
78
        passtRepairHandler passtRepairSourceHandler,
79
) (*MigrationSourceController, error) {
23✔
80

23✔
81
        queue := workqueue.NewTypedRateLimitingQueueWithConfig[string](
23✔
82
                workqueue.DefaultTypedControllerRateLimiter[string](),
23✔
83
                workqueue.TypedRateLimitingQueueConfig[string]{Name: "virt-handler-source"},
23✔
84
        )
23✔
85
        logger := log.Log.With("controller", "migration-source")
23✔
86

23✔
87
        hypervisorName := clusterConfig.GetHypervisor().Name
23✔
88

23✔
89
        baseCtrl, err := NewBaseController(
23✔
90
                logger,
23✔
91
                host,
23✔
92
                recorder,
23✔
93
                clientset,
23✔
94
                queue,
23✔
95
                vmiInformer,
23✔
96
                domainInformer,
23✔
97
                clusterConfig,
23✔
98
                podIsolationDetector,
23✔
99
                launcherClients,
23✔
100
                migrationProxy,
23✔
101
                virtLauncherFSRunDirPattern,
23✔
102
                netStat,
23✔
103
                hypervisor.NewHypervisorNodeInformation(hypervisorName),
23✔
104
                hypervisor.GetVirtRuntime(podIsolationDetector, hypervisorName),
23✔
105
                nil,
23✔
106
        )
23✔
107
        if err != nil {
23✔
108
                return nil, err
×
109
        }
×
110

111
        c := &MigrationSourceController{
23✔
112
                BaseController:     baseCtrl,
23✔
113
                vmiExpectations:    controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
23✔
114
                passtRepairHandler: passtRepairHandler,
23✔
115
        }
23✔
116

23✔
117
        _, err = vmiInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
23✔
118
                AddFunc:    c.addFunc,
23✔
119
                DeleteFunc: c.deleteFunc,
23✔
120
                UpdateFunc: c.updateFunc,
23✔
121
        })
23✔
122
        if err != nil {
23✔
123
                return nil, err
×
124
        }
×
125

126
        _, err = domainInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
23✔
127
                AddFunc:    c.addDeleteDomainFunc,
23✔
128
                DeleteFunc: c.addDeleteDomainFunc,
23✔
129
                UpdateFunc: c.updateDomainFunc,
23✔
130
        })
23✔
131
        if err != nil {
23✔
132
                return nil, err
×
133
        }
×
134

135
        return c, nil
23✔
136
}
137

138
func (c *MigrationSourceController) hasTargetDetectedReadyDomain(vmi *v1.VirtualMachineInstance) (bool, int64) {
7✔
139
        // give the target node 60 seconds to discover the libvirt domain via the domain informer
7✔
140
        // before allowing the VMI to be processed. This closes the gap between the
7✔
141
        // VMI's status getting updated to reflect the new source node, and the domain
7✔
142
        // informer firing the event to alert the source node of the new domain.
7✔
143
        migrationTargetDelayTimeout := 60
7✔
144

7✔
145
        if vmi.Status.MigrationState == nil ||
7✔
146
                vmi.Status.MigrationState.EndTimestamp == nil {
14✔
147
                return false, int64(migrationTargetDelayTimeout)
7✔
148
        }
7✔
149
        if vmi.Status.MigrationState != nil &&
×
150
                vmi.Status.MigrationState.TargetState != nil &&
×
151
                vmi.Status.MigrationState.TargetState.DomainDetected &&
×
152
                vmi.Status.MigrationState.TargetState.DomainReadyTimestamp != nil {
×
153

×
154
                return true, 0
×
155
        }
×
156

157
        nowUnix := time.Now().UTC().Unix()
×
158
        migrationEndUnix := vmi.Status.MigrationState.EndTimestamp.Time.UTC().Unix()
×
159

×
160
        diff := nowUnix - migrationEndUnix
×
161

×
162
        if diff > int64(migrationTargetDelayTimeout) {
×
163
                return false, 0
×
164
        }
×
165

166
        timeLeft := int64(migrationTargetDelayTimeout) - diff
×
167

×
168
        enqueueTime := timeLeft
×
169
        if enqueueTime < 5 {
×
170
                enqueueTime = 5
×
171
        }
×
172

173
        // re-enqueue the key to ensure it gets processed again within the right time.
174
        c.queue.AddAfter(controller.VirtualMachineInstanceKey(vmi), time.Duration(enqueueTime)*time.Second)
×
175

×
176
        return false, timeLeft
×
177
}
178

179
func domainMigrated(domain *api.Domain) bool {
34✔
180
        return domain != nil && domain.Status.Status == api.Shutoff && domain.Status.Reason == api.ReasonMigrated
34✔
181
}
34✔
182

183
func (c *MigrationSourceController) setMigrationProgressStatus(vmi *v1.VirtualMachineInstance, domain *api.Domain) {
15✔
184
        if domain == nil ||
15✔
185
                domain.Spec.Metadata.KubeVirt.Migration == nil ||
15✔
186
                vmi.Status.MigrationState == nil ||
15✔
187
                !c.isMigrationSource(vmi) {
28✔
188
                return
13✔
189
        }
13✔
190

191
        migrationMetadata := domain.Spec.Metadata.KubeVirt.Migration
2✔
192
        if migrationMetadata.UID != vmi.Status.MigrationState.MigrationUID {
2✔
193
                return
×
194
        }
×
195
        if migrationMetadata.StartTimestamp != nil {
3✔
196
                vmi.Status.MigrationState.StartTimestamp = migrationMetadata.StartTimestamp
1✔
197
        }
1✔
198

199
        vmi.Status.MigrationState.Failed = migrationMetadata.Failed
2✔
200

2✔
201
        if migrationMetadata.Failed {
3✔
202
                vmi.Status.MigrationState.Completed = true
1✔
203
                vmi.Status.MigrationState.EndTimestamp = migrationMetadata.EndTimestamp
1✔
204
                vmi.Status.MigrationState.AbortStatus = v1.MigrationAbortStatus(migrationMetadata.AbortStatus)
1✔
205
                vmi.Status.MigrationState.FailureReason = migrationMetadata.FailureReason
1✔
206
                c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.Migrated.String(), fmt.Sprintf("VirtualMachineInstance migration uid %s failed. reason:%s", string(migrationMetadata.UID), migrationMetadata.FailureReason))
1✔
207
        }
1✔
208

209
        vmi.Status.MigrationState.Mode = migrationMetadata.Mode
2✔
210
}
211

212
func (c *MigrationSourceController) updateStatus(vmi *v1.VirtualMachineInstance, domain *api.Domain) error {
7✔
213
        c.setMigrationProgressStatus(vmi, domain)
7✔
214

7✔
215
        // handle migrations differently than normal status updates.
7✔
216
        //
7✔
217
        // When a successful migration is detected, we must transfer ownership of the VMI
7✔
218
        // from the source node (this node) to the target node (node the domain was migrated to).
7✔
219
        //
7✔
220
        // Transfer ownership by...
7✔
221
        // 1. Marking vmi.Status.MigrationState as completed
7✔
222
        // 2. Update the vmi.Status.NodeName to reflect the target node's name
7✔
223
        // 3. Update the VMI's NodeNameLabel annotation to reflect the target node's name
7✔
224
        // 4. Clear the LauncherContainerImageVersion which virt-controller will detect
7✔
225
        //    and accurately based on the version used on the target pod
7✔
226
        //
7✔
227
        // After a migration, the VMI's phase is no longer owned by this node. Only the
7✔
228
        // MigrationState status field is eligible to be mutated.
7✔
229
        migrationHost := ""
7✔
230
        if vmi.Status.MigrationState != nil {
14✔
231
                migrationHost = vmi.Status.MigrationState.TargetNode
7✔
232
        }
7✔
233

234
        targetNodeDetectedDomain, timeLeft := c.hasTargetDetectedReadyDomain(vmi)
7✔
235
        // If we can't detect where the migration went to, then we have no
7✔
236
        // way of transferring ownership. The only option here is to move the
7✔
237
        // vmi to failed.  The cluster vmi controller will then tear down the
7✔
238
        // resulting pods.
7✔
239
        if migrationHost == "" {
7✔
240
                // migrated to unknown host.
×
241
                vmi.Status.Phase = v1.Failed
×
242
                vmi.Status.MigrationState.Completed = true
×
243
                vmi.Status.MigrationState.Failed = true
×
244
                if vmi.Status.MigrationState.EndTimestamp == nil {
×
245
                        vmi.Status.MigrationState.EndTimestamp = pointer.P(metav1.NewTime(time.Now()))
×
246
                }
×
247

248
                c.logger.Object(vmi).Warning("the vmi migrated to an unknown host")
×
249
                c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.Migrated.String(), fmt.Sprintf("The VirtualMachineInstance migrated to unknown host."))
×
250
        } else if !targetNodeDetectedDomain {
14✔
251
                if timeLeft <= 0 {
7✔
252
                        vmi.Status.Phase = v1.Failed
×
253
                        vmi.Status.MigrationState.Completed = true
×
254
                        vmi.Status.MigrationState.Failed = true
×
255
                        if vmi.Status.MigrationState.EndTimestamp == nil {
×
256
                                vmi.Status.MigrationState.EndTimestamp = pointer.P(metav1.NewTime(time.Now()))
×
257
                        }
×
258

259
                        c.logger.Object(vmi).Warning("the domain was never observed on the taget after the migration completed within the timeout period")
×
260
                        c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.Migrated.String(), fmt.Sprintf("The VirtualMachineInstance's domain was never observed on the target after the migration completed within the timeout period."))
×
261
                }
262
        }
263

264
        if vmi.Status.Phase == v1.Failed && vmi.IsDecentralizedMigration() {
7✔
265
                vmi.Status.MigrationState.Completed = true
×
266
                vmi.Status.MigrationState.Failed = true
×
267
                c.logger.Object(vmi).Warning("the decentralized migration failed due to the source VMI being failed")
×
268
                c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.Migrated.String(), fmt.Sprintf("The VirtualMachineInstance's decentralized migration failed due to the source VMI being failed."))
×
269
        }
×
270

271
        if targetNodeDetectedDomain && vmi.IsDecentralizedMigration() && vmi.Status.MigrationState != nil && vmi.Status.MigrationState.Completed {
7✔
272
                c.logger.Object(vmi).V(2).Infof("decentralized migration completed successfully, marking VMI as succeeded")
×
273
                // this is a decentralized migration, and the migration completed successfully, we need to mark the VMI as succeeded
×
274
                vmi.Status.Phase = v1.Succeeded
×
275
        }
×
276

277
        return nil
7✔
278
}
279

280
func (c *MigrationSourceController) Run(threadiness int, stopCh chan struct{}) {
×
281
        defer c.queue.ShutDown()
×
282
        c.logger.Info("Starting virt-handler source controller.")
×
283

×
284
        cache.WaitForCacheSync(stopCh, c.hasSynced)
×
285

×
286
        // queue keys for previous Domains on the host that no longer exist
×
287
        // in the cache. This ensures we perform local cleanup of deleted VMs.
×
288
        for _, domain := range c.domainStore.List() {
×
289
                d := domain.(*api.Domain)
×
290
                vmiRef := v1.NewVMIReferenceWithUUID(
×
291
                        d.ObjectMeta.Namespace,
×
292
                        d.ObjectMeta.Name,
×
293
                        d.Spec.Metadata.KubeVirt.UID)
×
294

×
295
                key := controller.VirtualMachineInstanceKey(vmiRef)
×
296

×
297
                _, exists, _ := c.vmiStore.GetByKey(key)
×
298
                if !exists {
×
299
                        c.queue.Add(key)
×
300
                }
×
301
        }
302

303
        // Start the actual work
304
        for i := 0; i < threadiness; i++ {
×
305
                go wait.Until(c.runWorker, time.Second, stopCh)
×
306
        }
×
307

308
        <-stopCh
×
309
        c.logger.Info("Stopping virt-handler source controller.")
×
310
}
311

312
func (c *MigrationSourceController) runWorker() {
×
313
        for c.Execute() {
×
314
        }
×
315
}
316

317
func (c *MigrationSourceController) Execute() bool {
7✔
318
        key, quit := c.queue.Get()
7✔
319
        if quit {
7✔
320
                return false
×
321
        }
×
322
        defer c.queue.Done(key)
7✔
323
        if err := c.execute(key); err != nil {
7✔
324
                c.logger.Reason(err).Infof("re-enqueuing VirtualMachineInstance %v", key)
×
325
                c.queue.AddRateLimited(key)
×
326
        } else {
7✔
327
                c.logger.V(4).Infof("processed VirtualMachineInstance %v", key)
7✔
328
                c.queue.Forget(key)
7✔
329
        }
7✔
330
        return true
7✔
331
}
332

333
func (c *MigrationSourceController) sync(vmi *v1.VirtualMachineInstance, domain *api.Domain) error {
7✔
334
        if domain != nil {
14✔
335
                c.logger.Object(vmi).Infof("VMI is in phase: %v | Domain status: %v, reason: %v", vmi.Status.Phase, domain.Status.Status, domain.Status.Reason)
7✔
336
        } else {
7✔
337
                c.logger.Object(vmi).Infof("VMI is in phase: %v", vmi.Status.Phase)
×
338
        }
×
339

340
        oldStatus := vmi.Status.DeepCopy()
7✔
341

7✔
342
        syncErr := c.processVMI(vmi, domain)
7✔
343

7✔
344
        if syncErr != nil {
7✔
345
                c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.SyncFailed.String(), syncErr.Error())
×
346
                // `syncErr` will be propagated anyway, and it will be logged in `re-enqueueing`
×
347
                // so there is no need to log it twice in hot path without increased verbosity.
×
348
                c.logger.Object(vmi).Reason(syncErr).Error("Synchronizing the VirtualMachineInstance failed.")
×
349
        }
×
350

351
        updateErr := c.updateStatus(vmi, domain)
7✔
352

7✔
353
        if updateErr != nil {
7✔
354
                c.logger.Object(vmi).Reason(updateErr).Error("Updating the migration status failed.")
×
355
        }
×
356

357
        netUpdateErr := c.netStat.UpdateStatus(vmi, domain)
7✔
358

7✔
359
        if netUpdateErr != nil {
7✔
360
                log.Log.Object(vmi).Reason(updateErr).Error("Updating network interfaces status failed.")
×
361
        }
×
362

363
        // update the VMI if necessary
364
        if !equality.Semantic.DeepEqual(*oldStatus, vmi.Status) {
9✔
365
                key := controller.VirtualMachineInstanceKey(vmi)
2✔
366
                c.vmiExpectations.SetExpectations(key, 1, 0)
2✔
367
                _, err := c.clientset.VirtualMachineInstance(vmi.ObjectMeta.Namespace).Update(context.Background(), vmi, metav1.UpdateOptions{})
2✔
368
                if err != nil {
2✔
369
                        c.vmiExpectations.SetExpectations(key, 0, 0)
×
370
                        return err
×
371
                }
×
372
                metrics.VMISynced(vmi.Namespace, vmi.Name)
2✔
373
        }
374

375
        if syncErr != nil {
7✔
376
                return syncErr
×
377
        }
×
378

379
        if updateErr != nil {
7✔
380
                return updateErr
×
381
        }
×
382

383
        if netUpdateErr != nil {
7✔
384
                return netUpdateErr
×
385
        }
×
386

387
        c.logger.Object(vmi).V(4).Info("Source synchronization loop succeeded.")
7✔
388

7✔
389
        return nil
7✔
390
}
391

392
func (c *MigrationSourceController) execute(key string) error {
7✔
393
        vmi, vmiExists, err := c.getVMIFromCache(key)
7✔
394
        if err != nil {
7✔
395
                return err
×
396
        }
×
397

398
        if !vmiExists || ((vmi.IsDecentralizedMigration() && vmi.Status.Phase == v1.Succeeded) ||
7✔
399
                !vmi.IsDecentralizedMigration() && vmi.IsFinal()) ||
7✔
400
                vmi.DeletionTimestamp != nil {
7✔
401
                c.logger.V(4).Infof("vmi for key %v is terminating, succeeded or does not exists", key)
×
402
                if !vmiExists {
×
403
                        metrics.ResetVMISync(key)
×
404
                }
×
405
                return nil
×
406
        }
407

408
        if !c.vmiExpectations.SatisfiedExpectations(key) {
7✔
409
                c.logger.V(4).Object(vmi).Info("waiting for expectations to be satisfied")
×
410
                return nil
×
411
        }
×
412

413
        domain, domainExists, _, err := c.getDomainFromCache(key)
7✔
414
        if err != nil {
7✔
415
                return err
×
416
        }
×
417

418
        if domainExists && domain.Spec.Metadata.KubeVirt.UID != vmi.UID {
7✔
419
                c.logger.V(4).Object(vmi).Infof("Detected stale vmi %s that still needs cleanup before new vmi with identical name/namespace can be processed", vmi.UID)
×
420
                return nil
×
421
        }
×
422

423
        if vmi.Status.MigrationState == nil {
7✔
424
                c.logger.V(4).Object(vmi).Info("no migration is in progress")
×
425
                return nil
×
426
        }
×
427

428
        // post migration clean up
429
        if isMigrationDone(vmi.Status.MigrationState) {
7✔
430
                c.migrationProxy.StopSourceListener(string(vmi.UID))
×
431
                return nil
×
432
        }
×
433

434
        if !c.isMigrationSource(vmi) {
7✔
435
                c.logger.Object(vmi).V(4).Info("not a migration source")
×
436
                return nil
×
437
        }
×
438

439
        return c.sync(vmi.DeepCopy(), domain)
7✔
440
}
441

442
func (c *MigrationSourceController) isMigrationSource(vmi *v1.VirtualMachineInstance) bool {
13✔
443
        if vmi.IsDecentralizedMigration() {
13✔
444
                return vmi.Status.MigrationState != nil &&
×
445
                        vmi.Status.MigrationState.SourceNode == c.host &&
×
446
                        vmi.IsMigrationSource()
×
447
        }
×
448
        return vmi.Status.MigrationState != nil &&
13✔
449
                vmi.Status.NodeName == c.host &&
13✔
450
                vmi.Status.MigrationState.SourceNode == c.host
13✔
451
}
452

453
func (c *MigrationSourceController) handleSourceMigrationProxy(vmi *v1.VirtualMachineInstance) error {
6✔
454

6✔
455
        res, err := c.podIsolationDetector.Detect(vmi)
6✔
456
        if err != nil {
6✔
457
                return err
×
458
        }
×
459
        // the migration-proxy is no longer shared via host mount, so we
460
        // pass in the virt-launcher's baseDir to reach the unix sockets.
461
        baseDir := fmt.Sprintf(filepath.Join(c.virtLauncherFSRunDirPattern, "kubevirt"), res.Pid())
6✔
462
        if vmi.Status.MigrationState.TargetDirectMigrationNodePorts == nil {
6✔
463
                return errWaitingForTargetPorts
×
464
        }
×
465

466
        err = c.migrationProxy.StartSourceListener(
6✔
467
                string(vmi.UID),
6✔
468
                vmi.Status.MigrationState.TargetNodeAddress,
6✔
469
                vmi.Status.MigrationState.TargetDirectMigrationNodePorts,
6✔
470
                baseDir,
6✔
471
        )
6✔
472
        if err != nil {
6✔
473
                return err
×
474
        }
×
475

476
        return nil
6✔
477
}
478

479
func (c *MigrationSourceController) migrateVMI(vmi *v1.VirtualMachineInstance, domain *api.Domain) error {
7✔
480
        shouldReturn, err := c.checkLauncherClient(vmi)
7✔
481
        if shouldReturn {
7✔
482
                return err
×
483
        }
×
484

485
        client, err := c.launcherClients.GetLauncherClient(vmi)
7✔
486
        if err != nil {
7✔
487
                return fmt.Errorf(unableCreateVirtLauncherConnectionFmt, err)
×
488
        }
×
489

490
        if vmi.Status.MigrationState.AbortRequested {
8✔
491
                err = c.handleMigrationAbort(vmi, domain, client)
1✔
492
                return err
1✔
493
        }
1✔
494

495
        if isMigrationInProgress(vmi, domain) {
6✔
496
                // we already started this migration, no need to rerun this
×
497
                c.logger.Object(vmi).V(4).Infof("migration %s has already been started", vmi.Status.MigrationState.MigrationUID)
×
498
                return nil
×
499
        }
×
500

501
        err = c.handleSourceMigrationProxy(vmi)
6✔
502
        if errors.Is(err, errWaitingForTargetPorts) {
6✔
503
                c.logger.Object(vmi).V(4).Info("waiting for target node to publish migration ports")
×
504
                c.queue.AddAfter(controller.VirtualMachineInstanceKey(vmi), 1*time.Second)
×
505
                return nil
×
506
        } else if err != nil {
6✔
507
                return fmt.Errorf("failed to handle migration proxy: %v", err)
×
508
        }
×
509

510
        var migrationConfiguration *v1.MigrationConfiguration
6✔
511
        if vmi.Status.MigrationState.MigrationConfiguration == nil {
11✔
512
                migrationConfiguration = c.clusterConfig.GetMigrationConfiguration()
5✔
513
        } else {
6✔
514
                migrationConfiguration = vmi.Status.MigrationState.MigrationConfiguration.DeepCopy()
1✔
515
        }
1✔
516

517
        // This check is only for backward compatibility.
518
        // During upgrade, AllowWorkloadDisruption could be nil since the migration controller is
519
        // updated later the virt-handler.
520
        // This check can be removed in future
521
        if migrationConfiguration.AllowWorkloadDisruption == nil {
7✔
522
                migrationConfiguration.AllowWorkloadDisruption = pointer.P(*migrationConfiguration.AllowPostCopy)
1✔
523
        }
1✔
524

525
        options := &cmdclient.MigrationOptions{
6✔
526
                Bandwidth:               *migrationConfiguration.BandwidthPerMigration,
6✔
527
                ProgressTimeout:         *migrationConfiguration.ProgressTimeout,
6✔
528
                CompletionTimeoutPerGiB: *migrationConfiguration.CompletionTimeoutPerGiB,
6✔
529
                UnsafeMigration:         *migrationConfiguration.UnsafeMigrationOverride,
6✔
530
                AllowAutoConverge:       *migrationConfiguration.AllowAutoConverge,
6✔
531
                AllowPostCopy:           *migrationConfiguration.AllowPostCopy,
6✔
532
                AllowWorkloadDisruption: *migrationConfiguration.AllowWorkloadDisruption,
6✔
533
        }
6✔
534

6✔
535
        configureParallelMigrationThreads(options, vmi)
6✔
536

6✔
537
        marshalledOptions, err := json.Marshal(options)
6✔
538
        if err != nil {
6✔
539
                c.logger.Object(vmi).Warning("failed to marshall matched migration options")
×
540
        } else {
6✔
541
                c.logger.Object(vmi).Infof("migration options matched for vmi %s: %s", vmi.Name, string(marshalledOptions))
6✔
542
        }
6✔
543

544
        vmiCopy := vmi.DeepCopy()
6✔
545
        err = hostdisk.ReplacePVCByHostDisk(vmiCopy)
6✔
546
        if err != nil {
6✔
547
                return err
×
548
        }
×
549

550
        if c.clusterConfig.PasstBindingEnabled() {
12✔
551
                if err = c.passtRepairHandler.HandleMigrationSource(vmi, c.passtSocketDirOnHostForVMI); err != nil {
6✔
552
                        c.logger.Object(vmi).Warningf("failed to call passt-repair for migration source, %v", err)
×
553
                }
×
554
        }
555

556
        err = client.MigrateVirtualMachine(vmiCopy, options)
6✔
557
        if err != nil {
6✔
558
                return err
×
559
        }
×
560
        c.recorder.Event(vmi, k8sv1.EventTypeNormal, v1.Migrating.String(), VMIMigrating)
6✔
561
        return nil
6✔
562
}
563

564
func isMigrationDone(state *v1.VirtualMachineInstanceMigrationState) bool {
7✔
565
        return state == nil || (state.EndTimestamp != nil && (state.Completed || state.Failed))
7✔
566
}
7✔
567

568
func (c *MigrationSourceController) processVMI(vmi *v1.VirtualMachineInstance, domain *api.Domain) error {
7✔
569
        domainAlive := domain != nil &&
7✔
570
                domain.Status.Status != api.Shutoff &&
7✔
571
                domain.Status.Status != api.Crashed &&
7✔
572
                domain.Status.Status != ""
7✔
573

7✔
574
        if !domainAlive {
7✔
575
                c.logger.V(4).Object(vmi).Info("domain is not alive")
×
576
                return nil
×
577
        }
×
578

579
        return c.migrateVMI(vmi, domain)
7✔
580
}
581

582
func (c *MigrationSourceController) addFunc(obj interface{}) {
×
583
        key, err := controller.KeyFunc(obj)
×
584
        if err == nil {
×
585
                c.vmiExpectations.SetExpectations(key, 0, 0)
×
586
                c.queue.Add(key)
×
587
        }
×
588
}
589

590
func (c *MigrationSourceController) deleteFunc(obj interface{}) {
×
591
        key, err := controller.KeyFunc(obj)
×
592
        if err == nil {
×
593
                c.queue.Add(key)
×
594
        }
×
595
}
596

597
func (c *MigrationSourceController) updateFunc(_, new interface{}) {
×
598
        key, err := controller.KeyFunc(new)
×
599
        if err == nil {
×
600
                c.vmiExpectations.SetExpectations(key, 0, 0)
×
601
                c.queue.Add(key)
×
602
        }
×
603
}
604

605
func (c *MigrationSourceController) addDeleteDomainFunc(obj interface{}) {
×
606
        key, err := controller.KeyFunc(obj)
×
607
        if err == nil {
×
608
                c.queue.Add(key)
×
609
        }
×
610
}
611

612
func (c *MigrationSourceController) updateDomainFunc(_, new interface{}) {
×
613
        key, err := controller.KeyFunc(new)
×
614
        if err == nil {
×
615
                c.queue.Add(key)
×
616
        }
×
617
}
618

619
func (c *MigrationSourceController) handleMigrationAbort(vmi *v1.VirtualMachineInstance, domain *api.Domain, client cmdclient.LauncherClient) error {
9✔
620
        // Check both the VMI status and the domain metadata to avoid redundant cancel RPCs.
9✔
621
        // The domain metadata reflects the launcher's abort status before the API server round-trip.
9✔
622
        abortHandled := func(status v1.MigrationAbortStatus) bool {
22✔
623
                return status == v1.MigrationAbortInProgress || status == v1.MigrationAbortSucceeded
13✔
624
        }
13✔
625
        if abortHandled(vmi.Status.MigrationState.AbortStatus) {
11✔
626
                return nil
2✔
627
        }
2✔
628
        if domain != nil && domain.Spec.Metadata.KubeVirt.Migration != nil &&
7✔
629
                abortHandled(v1.MigrationAbortStatus(domain.Spec.Metadata.KubeVirt.Migration.AbortStatus)) {
9✔
630
                return nil
2✔
631
        }
2✔
632

633
        if err := client.CancelVirtualMachineMigration(vmi); err != nil {
6✔
634
                return err
1✔
635
        }
1✔
636

637
        c.recorder.Event(vmi, k8sv1.EventTypeNormal, v1.Migrating.String(), VMIAbortingMigration)
4✔
638
        return nil
4✔
639
}
640

641
func configureParallelMigrationThreads(options *cmdclient.MigrationOptions, vm *v1.VirtualMachineInstance) {
6✔
642
        // When the CPU is limited, there's a risk of the migration threads choking the CPU resources on the compute container.
6✔
643
        // For this reason, we will avoid configuring migration threads in such scenarios.
6✔
644
        if cpuLimit, cpuLimitExists := vm.Spec.Domain.Resources.Limits[k8sv1.ResourceCPU]; cpuLimitExists && !cpuLimit.IsZero() {
7✔
645
                return
1✔
646
        }
1✔
647
        // Dedicated CPUs are also fully allocating each vCPU for compute.
648
        if vm.IsCPUDedicated() {
6✔
649
                return
1✔
650
        }
1✔
651

652
        options.ParallelMigrationThreads = pointer.P(parallelMultifdMigrationThreads)
4✔
653
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc