• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / kubevirt / 00f46fce-88bf-4b60-b102-aff6823f14a1

25 Jun 2026 11:32PM UTC coverage: 72.015% (+0.02%) from 71.991%
00f46fce-88bf-4b60-b102-aff6823f14a1

push

prow

web-flow
Merge pull request #18109 from nestoracunablanco/feat/skipGPUDevices

[VEP-10] feat: skip GPU device validation when they are claimed by DRA

6 of 6 new or added lines in 1 file covered. (100.0%)

311 existing lines in 6 files now uncovered.

81871 of 113686 relevant lines covered (72.02%)

557.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.58
/pkg/virt-handler/migration-source.go
1
/*
2
 * This file is part of the KubeVirt project
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 *
16
 * Copyright The KubeVirt Authors.
17
 *
18
 */
19

20
package virthandler
21

22
import (
23
        "context"
24
        "encoding/json"
25
        "errors"
26
        "fmt"
27
        "path/filepath"
28
        "time"
29

30
        k8sv1 "k8s.io/api/core/v1"
31
        "k8s.io/apimachinery/pkg/api/equality"
32
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33
        "k8s.io/apimachinery/pkg/util/wait"
34
        "k8s.io/client-go/tools/cache"
35
        "k8s.io/client-go/tools/record"
36
        "k8s.io/client-go/util/workqueue"
37
        v1 "kubevirt.io/api/core/v1"
38
        "kubevirt.io/client-go/kubecli"
39
        "kubevirt.io/client-go/log"
40

41
        pluginv1alpha1 "kubevirt.io/api/plugin/v1alpha1"
42

43
        "kubevirt.io/kubevirt/pkg/controller"
44
        hostdisk "kubevirt.io/kubevirt/pkg/host-disk"
45
        "kubevirt.io/kubevirt/pkg/hypervisor"
46
        metrics "kubevirt.io/kubevirt/pkg/monitoring/metrics/common/vmisync"
47
        "kubevirt.io/kubevirt/pkg/pointer"
48
        migrationsutil "kubevirt.io/kubevirt/pkg/util/migrations"
49
        virtconfig "kubevirt.io/kubevirt/pkg/virt-config"
50
        cmdclient "kubevirt.io/kubevirt/pkg/virt-handler/cmd-client"
51
        "kubevirt.io/kubevirt/pkg/virt-handler/isolation"
52
        launcherclients "kubevirt.io/kubevirt/pkg/virt-handler/launcher-clients"
53
        migrationproxy "kubevirt.io/kubevirt/pkg/virt-handler/migration-proxy"
54
        "kubevirt.io/kubevirt/pkg/virt-handler/plugins"
55
        "kubevirt.io/kubevirt/pkg/virt-launcher/virtwrap/api"
56
)
57

58
// errWaitingForTargetPorts is the sentinel returned by handleSourceMigrationProxy
// while the VMI's migration state carries no target direct-migration node ports.
// migrateVMI matches it with errors.Is and re-enqueues the key instead of failing.
var errWaitingForTargetPorts = errors.New("waiting for target to publish migration ports")
59

60
type passtRepairSourceHandler interface {
61
        HandleMigrationSource(*v1.VirtualMachineInstance, func(*v1.VirtualMachineInstance) (string, error)) error
62
}
63

64
type MigrationSourceController struct {
65
        *BaseController
66
        pluginExecutor     plugins.NodeHookExecutor
67
        vmiExpectations    *controller.UIDTrackingControllerExpectations
68
        passtRepairHandler passtRepairSourceHandler
69
}
70

71
func NewMigrationSourceController(
72
        recorder record.EventRecorder,
73
        clientset kubecli.KubevirtClient,
74
        host string,
75
        launcherClients launcherclients.LauncherClientsManager,
76
        vmiInformer cache.SharedIndexInformer,
77
        domainInformer cache.SharedInformer,
78
        clusterConfig *virtconfig.ClusterConfig,
79
        podIsolationDetector isolation.PodIsolationDetector,
80
        migrationProxy migrationproxy.ProxyManager,
81
        virtLauncherFSRunDirPattern string,
82
        netStat netstat,
83
        passtRepairHandler passtRepairSourceHandler,
84
        pluginStore cache.Store,
85
        pluginExecutor plugins.NodeHookExecutor,
86
) (*MigrationSourceController, error) {
24✔
87

24✔
88
        queue := workqueue.NewTypedRateLimitingQueueWithConfig[string](
24✔
89
                workqueue.DefaultTypedControllerRateLimiter[string](),
24✔
90
                workqueue.TypedRateLimitingQueueConfig[string]{Name: "virt-handler-source"},
24✔
91
        )
24✔
92
        logger := log.Log.With("controller", "migration-source")
24✔
93

24✔
94
        hypervisorName := clusterConfig.GetHypervisor().Name
24✔
95

24✔
96
        baseCtrl, err := NewBaseController(
24✔
97
                logger,
24✔
98
                host,
24✔
99
                recorder,
24✔
100
                clientset,
24✔
101
                queue,
24✔
102
                vmiInformer,
24✔
103
                domainInformer,
24✔
104
                clusterConfig,
24✔
105
                podIsolationDetector,
24✔
106
                launcherClients,
24✔
107
                migrationProxy,
24✔
108
                virtLauncherFSRunDirPattern,
24✔
109
                netStat,
24✔
110
                hypervisor.NewHypervisorNodeInformation(hypervisorName),
24✔
111
                hypervisor.GetVirtRuntime(podIsolationDetector, hypervisorName),
24✔
112
                pluginStore,
24✔
113
        )
24✔
114
        if err != nil {
24✔
115
                return nil, err
×
UNCOV
116
        }
×
117

118
        c := &MigrationSourceController{
24✔
119
                BaseController:     baseCtrl,
24✔
120
                pluginExecutor:     pluginExecutor,
24✔
121
                vmiExpectations:    controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
24✔
122
                passtRepairHandler: passtRepairHandler,
24✔
123
        }
24✔
124

24✔
125
        _, err = vmiInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
24✔
126
                AddFunc:    c.addFunc,
24✔
127
                DeleteFunc: c.deleteFunc,
24✔
128
                UpdateFunc: c.updateFunc,
24✔
129
        })
24✔
130
        if err != nil {
24✔
131
                return nil, err
×
UNCOV
132
        }
×
133

134
        _, err = domainInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
24✔
135
                AddFunc:    c.addDeleteDomainFunc,
24✔
136
                DeleteFunc: c.addDeleteDomainFunc,
24✔
137
                UpdateFunc: c.updateDomainFunc,
24✔
138
        })
24✔
139
        if err != nil {
24✔
140
                return nil, err
×
UNCOV
141
        }
×
142

143
        return c, nil
24✔
144
}
145

146
func (c *MigrationSourceController) hasTargetDetectedReadyDomain(vmi *v1.VirtualMachineInstance) (bool, int64) {
8✔
147
        // give the target node 60 seconds to discover the libvirt domain via the domain informer
8✔
148
        // before allowing the VMI to be processed. This closes the gap between the
8✔
149
        // VMI's status getting updated to reflect the new source node, and the domain
8✔
150
        // informer firing the event to alert the source node of the new domain.
8✔
151
        migrationTargetDelayTimeout := 60
8✔
152

8✔
153
        if vmi.Status.MigrationState == nil ||
8✔
154
                vmi.Status.MigrationState.EndTimestamp == nil {
16✔
155
                return false, int64(migrationTargetDelayTimeout)
8✔
156
        }
8✔
157
        if vmi.Status.MigrationState != nil &&
×
158
                vmi.Status.MigrationState.TargetState != nil &&
×
159
                vmi.Status.MigrationState.TargetState.DomainDetected &&
×
160
                vmi.Status.MigrationState.TargetState.DomainReadyTimestamp != nil {
×
161

×
162
                return true, 0
×
UNCOV
163
        }
×
164

165
        nowUnix := time.Now().UTC().Unix()
×
166
        migrationEndUnix := vmi.Status.MigrationState.EndTimestamp.Time.UTC().Unix()
×
167

×
168
        diff := nowUnix - migrationEndUnix
×
169

×
170
        if diff > int64(migrationTargetDelayTimeout) {
×
171
                return false, 0
×
UNCOV
172
        }
×
173

174
        timeLeft := int64(migrationTargetDelayTimeout) - diff
×
175

×
176
        enqueueTime := timeLeft
×
177
        if enqueueTime < 5 {
×
178
                enqueueTime = 5
×
UNCOV
179
        }
×
180

181
        // re-enqueue the key to ensure it gets processed again within the right time.
182
        c.queue.AddAfter(controller.VirtualMachineInstanceKey(vmi), time.Duration(enqueueTime)*time.Second)
×
183

×
UNCOV
184
        return false, timeLeft
×
185
}
186

187
func domainMigrated(domain *api.Domain) bool {
34✔
188
        return domain != nil && domain.Status.Status == api.Shutoff && domain.Status.Reason == api.ReasonMigrated
34✔
189
}
34✔
190

191
func (c *MigrationSourceController) setMigrationProgressStatus(vmi *v1.VirtualMachineInstance, domain *api.Domain) {
16✔
192
        if domain == nil ||
16✔
193
                domain.Spec.Metadata.KubeVirt.Migration == nil ||
16✔
194
                vmi.Status.MigrationState == nil ||
16✔
195
                !c.isMigrationSource(vmi) {
30✔
196
                return
14✔
197
        }
14✔
198

199
        migrationMetadata := domain.Spec.Metadata.KubeVirt.Migration
2✔
200
        if migrationMetadata.UID != vmi.Status.MigrationState.MigrationUID {
2✔
201
                return
×
UNCOV
202
        }
×
203
        if migrationMetadata.StartTimestamp != nil {
3✔
204
                vmi.Status.MigrationState.StartTimestamp = migrationMetadata.StartTimestamp
1✔
205
        }
1✔
206

207
        vmi.Status.MigrationState.Failed = migrationMetadata.Failed
2✔
208

2✔
209
        if migrationMetadata.Failed {
3✔
210
                vmi.Status.MigrationState.Completed = true
1✔
211
                vmi.Status.MigrationState.EndTimestamp = migrationMetadata.EndTimestamp
1✔
212
                vmi.Status.MigrationState.AbortStatus = v1.MigrationAbortStatus(migrationMetadata.AbortStatus)
1✔
213
                vmi.Status.MigrationState.FailureReason = migrationMetadata.FailureReason
1✔
214
                c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.Migrated.String(), fmt.Sprintf("VirtualMachineInstance migration uid %s failed. reason:%s", string(migrationMetadata.UID), migrationMetadata.FailureReason))
1✔
215
        }
1✔
216

217
        vmi.Status.MigrationState.Mode = migrationMetadata.Mode
2✔
218
}
219

220
func (c *MigrationSourceController) updateStatus(vmi *v1.VirtualMachineInstance, domain *api.Domain) error {
8✔
221
        c.setMigrationProgressStatus(vmi, domain)
8✔
222

8✔
223
        // handle migrations differently than normal status updates.
8✔
224
        //
8✔
225
        // When a successful migration is detected, we must transfer ownership of the VMI
8✔
226
        // from the source node (this node) to the target node (node the domain was migrated to).
8✔
227
        //
8✔
228
        // Transfer ownership by...
8✔
229
        // 1. Marking vmi.Status.MigrationState as completed
8✔
230
        // 2. Update the vmi.Status.NodeName to reflect the target node's name
8✔
231
        // 3. Update the VMI's NodeNameLabel annotation to reflect the target node's name
8✔
232
        // 4. Clear the LauncherContainerImageVersion which virt-controller will detect
8✔
233
        //    and accurately based on the version used on the target pod
8✔
234
        //
8✔
235
        // After a migration, the VMI's phase is no longer owned by this node. Only the
8✔
236
        // MigrationState status field is eligible to be mutated.
8✔
237
        migrationHost := ""
8✔
238
        if vmi.Status.MigrationState != nil {
16✔
239
                migrationHost = vmi.Status.MigrationState.TargetNode
8✔
240
        }
8✔
241

242
        targetNodeDetectedDomain, timeLeft := c.hasTargetDetectedReadyDomain(vmi)
8✔
243
        // If we can't detect where the migration went to, then we have no
8✔
244
        // way of transferring ownership. The only option here is to move the
8✔
245
        // vmi to failed.  The cluster vmi controller will then tear down the
8✔
246
        // resulting pods.
8✔
247
        if migrationHost == "" {
8✔
248
                // migrated to unknown host.
×
249
                vmi.Status.Phase = v1.Failed
×
250
                vmi.Status.MigrationState.Completed = true
×
251
                vmi.Status.MigrationState.Failed = true
×
252
                if vmi.Status.MigrationState.EndTimestamp == nil {
×
253
                        vmi.Status.MigrationState.EndTimestamp = pointer.P(metav1.NewTime(time.Now()))
×
UNCOV
254
                }
×
255

256
                c.logger.Object(vmi).Warning("the vmi migrated to an unknown host")
×
UNCOV
257
                c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.Migrated.String(), fmt.Sprintf("The VirtualMachineInstance migrated to unknown host."))
×
258
        } else if !targetNodeDetectedDomain {
16✔
259
                if timeLeft <= 0 {
8✔
260
                        vmi.Status.Phase = v1.Failed
×
261
                        vmi.Status.MigrationState.Completed = true
×
262
                        vmi.Status.MigrationState.Failed = true
×
263
                        if vmi.Status.MigrationState.EndTimestamp == nil {
×
264
                                vmi.Status.MigrationState.EndTimestamp = pointer.P(metav1.NewTime(time.Now()))
×
UNCOV
265
                        }
×
266

267
                        c.logger.Object(vmi).Warning("the domain was never observed on the taget after the migration completed within the timeout period")
×
UNCOV
268
                        c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.Migrated.String(), fmt.Sprintf("The VirtualMachineInstance's domain was never observed on the target after the migration completed within the timeout period."))
×
269
                }
270
        }
271

272
        if vmi.Status.Phase == v1.Failed && vmi.IsDecentralizedMigration() {
8✔
273
                vmi.Status.MigrationState.Completed = true
×
274
                vmi.Status.MigrationState.Failed = true
×
275
                c.logger.Object(vmi).Warning("the decentralized migration failed due to the source VMI being failed")
×
276
                c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.Migrated.String(), fmt.Sprintf("The VirtualMachineInstance's decentralized migration failed due to the source VMI being failed."))
×
UNCOV
277
        }
×
278

279
        if targetNodeDetectedDomain && vmi.IsDecentralizedMigration() && vmi.Status.MigrationState != nil && vmi.Status.MigrationState.Completed {
8✔
280
                c.logger.Object(vmi).V(2).Infof("decentralized migration completed successfully, marking VMI as succeeded")
×
281
                // this is a decentralized migration, and the migration completed successfully, we need to mark the VMI as succeeded
×
282
                vmi.Status.Phase = v1.Succeeded
×
UNCOV
283
        }
×
284

285
        return nil
8✔
286
}
287

288
func (c *MigrationSourceController) Run(threadiness int, stopCh chan struct{}) {
×
289
        defer c.queue.ShutDown()
×
290
        c.logger.Info("Starting virt-handler source controller.")
×
291

×
292
        cache.WaitForCacheSync(stopCh, c.hasSynced)
×
293

×
294
        // queue keys for previous Domains on the host that no longer exist
×
295
        // in the cache. This ensures we perform local cleanup of deleted VMs.
×
296
        for _, domain := range c.domainStore.List() {
×
297
                d := domain.(*api.Domain)
×
298
                vmiRef := v1.NewVMIReferenceWithUUID(
×
299
                        d.ObjectMeta.Namespace,
×
300
                        d.ObjectMeta.Name,
×
301
                        d.Spec.Metadata.KubeVirt.UID)
×
302

×
303
                key := controller.VirtualMachineInstanceKey(vmiRef)
×
304

×
305
                _, exists, _ := c.vmiStore.GetByKey(key)
×
306
                if !exists {
×
307
                        c.queue.Add(key)
×
UNCOV
308
                }
×
309
        }
310

311
        // Start the actual work
312
        for i := 0; i < threadiness; i++ {
×
313
                go wait.Until(c.runWorker, time.Second, stopCh)
×
UNCOV
314
        }
×
315

316
        <-stopCh
×
UNCOV
317
        c.logger.Info("Stopping virt-handler source controller.")
×
318
}
319

320
func (c *MigrationSourceController) runWorker() {
×
321
        for c.Execute() {
×
UNCOV
322
        }
×
323
}
324

325
func (c *MigrationSourceController) Execute() bool {
8✔
326
        key, quit := c.queue.Get()
8✔
327
        if quit {
8✔
328
                return false
×
UNCOV
329
        }
×
330
        defer c.queue.Done(key)
8✔
331
        if err := c.execute(key); err != nil {
8✔
332
                c.logger.Reason(err).Infof("re-enqueuing VirtualMachineInstance %v", key)
×
UNCOV
333
                c.queue.AddRateLimited(key)
×
334
        } else {
8✔
335
                c.logger.V(4).Infof("processed VirtualMachineInstance %v", key)
8✔
336
                c.queue.Forget(key)
8✔
337
        }
8✔
338
        return true
8✔
339
}
340

341
func (c *MigrationSourceController) sync(vmi *v1.VirtualMachineInstance, domain *api.Domain) error {
8✔
342
        if domain != nil {
16✔
343
                c.logger.Object(vmi).Infof("VMI is in phase: %v | Domain status: %v, reason: %v", vmi.Status.Phase, domain.Status.Status, domain.Status.Reason)
8✔
344
        } else {
8✔
345
                c.logger.Object(vmi).Infof("VMI is in phase: %v", vmi.Status.Phase)
×
UNCOV
346
        }
×
347

348
        oldStatus := vmi.Status.DeepCopy()
8✔
349

8✔
350
        syncErr := c.processVMI(vmi, domain)
8✔
351

8✔
352
        if syncErr != nil {
8✔
353
                c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.SyncFailed.String(), syncErr.Error())
×
354
                // `syncErr` will be propagated anyway, and it will be logged in `re-enqueueing`
×
355
                // so there is no need to log it twice in hot path without increased verbosity.
×
356
                c.logger.Object(vmi).Reason(syncErr).Error("Synchronizing the VirtualMachineInstance failed.")
×
UNCOV
357
        }
×
358

359
        updateErr := c.updateStatus(vmi, domain)
8✔
360

8✔
361
        if updateErr != nil {
8✔
362
                c.logger.Object(vmi).Reason(updateErr).Error("Updating the migration status failed.")
×
UNCOV
363
        }
×
364

365
        netUpdateErr := c.netStat.UpdateStatus(vmi, domain)
8✔
366

8✔
367
        if netUpdateErr != nil {
8✔
368
                log.Log.Object(vmi).Reason(updateErr).Error("Updating network interfaces status failed.")
×
UNCOV
369
        }
×
370

371
        // update the VMI if necessary
372
        if !equality.Semantic.DeepEqual(*oldStatus, vmi.Status) {
11✔
373
                key := controller.VirtualMachineInstanceKey(vmi)
3✔
374
                c.vmiExpectations.SetExpectations(key, 1, 0)
3✔
375
                _, err := c.clientset.VirtualMachineInstance(vmi.ObjectMeta.Namespace).Update(context.Background(), vmi, metav1.UpdateOptions{})
3✔
376
                if err != nil {
3✔
377
                        c.vmiExpectations.SetExpectations(key, 0, 0)
×
378
                        return err
×
UNCOV
379
                }
×
380
                metrics.VMISynced(vmi.Namespace, vmi.Name)
3✔
381
        }
382

383
        if syncErr != nil {
8✔
384
                return syncErr
×
UNCOV
385
        }
×
386

387
        if updateErr != nil {
8✔
388
                return updateErr
×
UNCOV
389
        }
×
390

391
        if netUpdateErr != nil {
8✔
392
                return netUpdateErr
×
UNCOV
393
        }
×
394

395
        c.logger.Object(vmi).V(4).Info("Source synchronization loop succeeded.")
8✔
396

8✔
397
        return nil
8✔
398
}
399

400
func (c *MigrationSourceController) execute(key string) error {
8✔
401
        vmi, vmiExists, err := c.getVMIFromCache(key)
8✔
402
        if err != nil {
8✔
403
                return err
×
UNCOV
404
        }
×
405

406
        if !vmiExists || ((vmi.IsDecentralizedMigration() && vmi.Status.Phase == v1.Succeeded) ||
8✔
407
                !vmi.IsDecentralizedMigration() && vmi.IsFinal()) ||
8✔
408
                vmi.DeletionTimestamp != nil {
8✔
409
                c.logger.V(4).Infof("vmi for key %v is terminating, succeeded or does not exists", key)
×
410
                if !vmiExists {
×
411
                        metrics.ResetVMISync(key)
×
412
                }
×
UNCOV
413
                return nil
×
414
        }
415

416
        if !c.vmiExpectations.SatisfiedExpectations(key) {
8✔
417
                c.logger.V(4).Object(vmi).Info("waiting for expectations to be satisfied")
×
418
                return nil
×
UNCOV
419
        }
×
420

421
        domain, domainExists, _, err := c.getDomainFromCache(key)
8✔
422
        if err != nil {
8✔
423
                return err
×
UNCOV
424
        }
×
425

426
        if domainExists && domain.Spec.Metadata.KubeVirt.UID != vmi.UID {
8✔
427
                c.logger.V(4).Object(vmi).Infof("Detected stale vmi %s that still needs cleanup before new vmi with identical name/namespace can be processed", vmi.UID)
×
428
                return nil
×
UNCOV
429
        }
×
430

431
        if vmi.Status.MigrationState == nil {
8✔
432
                c.logger.V(4).Object(vmi).Info("no migration is in progress")
×
433
                return nil
×
UNCOV
434
        }
×
435

436
        // post migration clean up
437
        if isMigrationDone(vmi.Status.MigrationState) {
8✔
438
                c.migrationProxy.StopSourceListener(string(vmi.UID))
×
439
                return nil
×
UNCOV
440
        }
×
441

442
        if !c.isMigrationSource(vmi) {
8✔
443
                c.logger.Object(vmi).V(4).Info("not a migration source")
×
444
                return nil
×
UNCOV
445
        }
×
446

447
        return c.sync(vmi.DeepCopy(), domain)
8✔
448
}
449

450
func (c *MigrationSourceController) isMigrationSource(vmi *v1.VirtualMachineInstance) bool {
14✔
451
        if vmi.IsDecentralizedMigration() {
14✔
452
                return vmi.Status.MigrationState != nil &&
×
453
                        vmi.Status.MigrationState.SourceNode == c.host &&
×
454
                        vmi.IsMigrationSource()
×
UNCOV
455
        }
×
456
        return vmi.Status.MigrationState != nil &&
14✔
457
                vmi.Status.NodeName == c.host &&
14✔
458
                vmi.Status.MigrationState.SourceNode == c.host
14✔
459
}
460

461
func (c *MigrationSourceController) handleSourceMigrationProxy(vmi *v1.VirtualMachineInstance) error {
7✔
462

7✔
463
        res, err := c.podIsolationDetector.Detect(vmi)
7✔
464
        if err != nil {
7✔
465
                return err
×
UNCOV
466
        }
×
467
        // the migration-proxy is no longer shared via host mount, so we
468
        // pass in the virt-launcher's baseDir to reach the unix sockets.
469
        baseDir := fmt.Sprintf(filepath.Join(c.virtLauncherFSRunDirPattern, "kubevirt"), res.Pid())
7✔
470
        if vmi.Status.MigrationState.TargetDirectMigrationNodePorts == nil {
7✔
471
                return errWaitingForTargetPorts
×
UNCOV
472
        }
×
473

474
        err = c.migrationProxy.StartSourceListener(
7✔
475
                string(vmi.UID),
7✔
476
                vmi.Status.MigrationState.TargetNodeAddress,
7✔
477
                vmi.Status.MigrationState.TargetDirectMigrationNodePorts,
7✔
478
                baseDir,
7✔
479
        )
7✔
480
        if err != nil {
7✔
481
                return err
×
UNCOV
482
        }
×
483

484
        return nil
7✔
485
}
486

487
func (c *MigrationSourceController) migrateVMI(vmi *v1.VirtualMachineInstance, domain *api.Domain) error {
8✔
488
        shouldReturn, err := c.checkLauncherClient(vmi)
8✔
489
        if shouldReturn {
8✔
490
                return err
×
UNCOV
491
        }
×
492

493
        client, err := c.launcherClients.GetLauncherClient(vmi)
8✔
494
        if err != nil {
8✔
495
                return fmt.Errorf(unableCreateVirtLauncherConnectionFmt, err)
×
UNCOV
496
        }
×
497

498
        if vmi.Status.MigrationState.AbortRequested {
9✔
499
                err = c.handleMigrationAbort(vmi, domain, client)
1✔
500
                return err
1✔
501
        }
1✔
502

503
        if isMigrationInProgress(vmi, domain) {
7✔
504
                // we already started this migration, no need to rerun this
×
505
                c.logger.Object(vmi).V(4).Infof("migration %s has already been started", vmi.Status.MigrationState.MigrationUID)
×
506
                return nil
×
UNCOV
507
        }
×
508

509
        err = c.handleSourceMigrationProxy(vmi)
7✔
510
        if errors.Is(err, errWaitingForTargetPorts) {
7✔
511
                c.logger.Object(vmi).V(4).Info("waiting for target node to publish migration ports")
×
512
                c.queue.AddAfter(controller.VirtualMachineInstanceKey(vmi), 1*time.Second)
×
UNCOV
513
                return nil
×
514
        } else if err != nil {
7✔
515
                return fmt.Errorf("failed to handle migration proxy: %v", err)
×
UNCOV
516
        }
×
517

518
        var migrationConfiguration *v1.VMIMConfigurationOptions
7✔
519
        if vmi.Status.MigrationState.VMIMConfigurationOptions == nil {
13✔
520
                migrationConfiguration = migrationsutil.ToVMIMConfigurationOptions(c.clusterConfig.GetMigrationConfiguration())
6✔
521
        } else {
7✔
522
                migrationConfiguration = vmi.Status.MigrationState.VMIMConfigurationOptions.DeepCopy()
1✔
523
        }
1✔
524

525
        // This check is only for backward compatibility.
526
        // During upgrade, AllowWorkloadDisruption could be nil since the migration controller is
527
        // updated later the virt-handler.
528
        // This check can be removed in future
529
        if migrationConfiguration.AllowWorkloadDisruption == nil {
8✔
530
                migrationConfiguration.AllowWorkloadDisruption = pointer.P(*migrationConfiguration.AllowPostCopy)
1✔
531
        }
1✔
532
        if migrationConfiguration.MaxDowntimeMs == nil {
8✔
533
                migrationConfiguration.MaxDowntimeMs = pointer.P(virtconfig.DefaultMigrationMaxDowntimeMs)
1✔
534
        }
1✔
535

536
        options := &cmdclient.MigrationOptions{
7✔
537
                Bandwidth:               *migrationConfiguration.BandwidthPerMigration,
7✔
538
                ProgressTimeout:         *migrationConfiguration.ProgressTimeout,
7✔
539
                CompletionTimeoutPerGiB: *migrationConfiguration.CompletionTimeoutPerGiB,
7✔
540
                MaxDowntimeMs:           *migrationConfiguration.MaxDowntimeMs,
7✔
541
                UnsafeMigration:         *migrationConfiguration.UnsafeMigrationOverride,
7✔
542
                AllowAutoConverge:       *migrationConfiguration.AllowAutoConverge,
7✔
543
                AllowPostCopy:           *migrationConfiguration.AllowPostCopy,
7✔
544
                AllowWorkloadDisruption: *migrationConfiguration.AllowWorkloadDisruption,
7✔
545
                StallDetectionEnabled:   c.clusterConfig.MigrationStallDetectionEnabled(),
7✔
546
        }
7✔
547

7✔
548
        configureParallelMigrationThreads(options, vmi)
7✔
549

7✔
550
        marshalledOptions, err := json.Marshal(options)
7✔
551
        if err != nil {
7✔
UNCOV
552
                c.logger.Object(vmi).Warning("failed to marshall matched migration options")
×
553
        } else {
7✔
554
                c.logger.Object(vmi).Infof("migration options matched for vmi %s: %s", vmi.Name, string(marshalledOptions))
7✔
555
        }
7✔
556

557
        vmiCopy := vmi.DeepCopy()
7✔
558
        err = hostdisk.ReplacePVCByHostDisk(vmiCopy)
7✔
559
        if err != nil {
7✔
560
                return err
×
UNCOV
561
        }
×
562

563
        if c.clusterConfig.PasstBindingEnabled() {
14✔
564
                if err = c.passtRepairHandler.HandleMigrationSource(vmi, c.passtSocketDirOnHostForVMI); err != nil {
7✔
565
                        c.logger.Object(vmi).Warningf("failed to call passt-repair for migration source, %v", err)
×
UNCOV
566
                }
×
567
        }
568

569
        if c.pluginExecutor != nil {
7✔
570
                if err := c.pluginExecutor.CallNodeHooks(pluginv1alpha1.NodeHookPreMigrationSource, vmiCopy, c.host); err != nil {
×
571
                        return err
×
UNCOV
572
                }
×
573
        }
574

575
        err = client.MigrateVirtualMachine(vmiCopy, options)
7✔
576
        if err != nil {
7✔
577
                return err
×
UNCOV
578
        }
×
579
        c.recorder.Event(vmi, k8sv1.EventTypeNormal, v1.Migrating.String(), VMIMigrating)
7✔
580
        return nil
7✔
581
}
582

583
func isMigrationDone(state *v1.VirtualMachineInstanceMigrationState) bool {
8✔
584
        return state == nil || (state.EndTimestamp != nil && (state.Completed || state.Failed))
8✔
585
}
8✔
586

587
func (c *MigrationSourceController) processVMI(vmi *v1.VirtualMachineInstance, domain *api.Domain) error {
8✔
588
        domainAlive := domain != nil &&
8✔
589
                domain.Status.Status != api.Shutoff &&
8✔
590
                domain.Status.Status != api.Crashed &&
8✔
591
                domain.Status.Status != ""
8✔
592

8✔
593
        if !domainAlive {
8✔
594
                c.logger.V(4).Object(vmi).Info("domain is not alive")
×
595
                return nil
×
UNCOV
596
        }
×
597

598
        return c.migrateVMI(vmi, domain)
8✔
599
}
600

601
func (c *MigrationSourceController) addFunc(obj interface{}) {
×
602
        key, err := controller.KeyFunc(obj)
×
603
        if err == nil {
×
604
                c.vmiExpectations.SetExpectations(key, 0, 0)
×
605
                c.queue.Add(key)
×
UNCOV
606
        }
×
607
}
608

609
func (c *MigrationSourceController) deleteFunc(obj interface{}) {
×
610
        key, err := controller.KeyFunc(obj)
×
611
        if err == nil {
×
612
                c.queue.Add(key)
×
UNCOV
613
        }
×
614
}
615

616
func (c *MigrationSourceController) updateFunc(_, new interface{}) {
×
617
        key, err := controller.KeyFunc(new)
×
618
        if err == nil {
×
619
                c.vmiExpectations.SetExpectations(key, 0, 0)
×
620
                c.queue.Add(key)
×
UNCOV
621
        }
×
622
}
623

624
func (c *MigrationSourceController) addDeleteDomainFunc(obj interface{}) {
×
625
        key, err := controller.KeyFunc(obj)
×
626
        if err == nil {
×
627
                c.queue.Add(key)
×
UNCOV
628
        }
×
629
}
630

631
func (c *MigrationSourceController) updateDomainFunc(_, new interface{}) {
×
632
        key, err := controller.KeyFunc(new)
×
633
        if err == nil {
×
634
                c.queue.Add(key)
×
UNCOV
635
        }
×
636
}
637

638
func (c *MigrationSourceController) handleMigrationAbort(vmi *v1.VirtualMachineInstance, domain *api.Domain, client cmdclient.LauncherClient) error {
9✔
639
        // Check both the VMI status and the domain metadata to avoid redundant cancel RPCs.
9✔
640
        // The domain metadata reflects the launcher's abort status before the API server round-trip.
9✔
641
        abortHandled := func(status v1.MigrationAbortStatus) bool {
22✔
642
                return status == v1.MigrationAbortInProgress || status == v1.MigrationAbortSucceeded
13✔
643
        }
13✔
644
        if abortHandled(vmi.Status.MigrationState.AbortStatus) {
11✔
645
                return nil
2✔
646
        }
2✔
647
        if domain != nil && domain.Spec.Metadata.KubeVirt.Migration != nil &&
7✔
648
                abortHandled(v1.MigrationAbortStatus(domain.Spec.Metadata.KubeVirt.Migration.AbortStatus)) {
9✔
649
                return nil
2✔
650
        }
2✔
651

652
        if err := client.CancelVirtualMachineMigration(vmi); err != nil {
6✔
653
                return err
1✔
654
        }
1✔
655

656
        c.recorder.Event(vmi, k8sv1.EventTypeNormal, v1.Migrating.String(), VMIAbortingMigration)
4✔
657
        return nil
4✔
658
}
659

660
func configureParallelMigrationThreads(options *cmdclient.MigrationOptions, vm *v1.VirtualMachineInstance) {
7✔
661
        // When the CPU is limited, there's a risk of the migration threads choking the CPU resources on the compute container.
7✔
662
        // For this reason, we will avoid configuring migration threads in such scenarios.
7✔
663
        if cpuLimit, cpuLimitExists := vm.Spec.Domain.Resources.Limits[k8sv1.ResourceCPU]; cpuLimitExists && !cpuLimit.IsZero() {
8✔
664
                return
1✔
665
        }
1✔
666
        // Dedicated CPUs are also fully allocating each vCPU for compute.
667
        if vm.IsCPUDedicated() {
7✔
668
                return
1✔
669
        }
1✔
670

671
        options.ParallelMigrationThreads = pointer.P(parallelMultifdMigrationThreads)
5✔
672
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc