• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / kubevirt / 66091390-c356-4ea4-b0ca-2727d96e4ed3

24 Jun 2026 07:30PM UTC coverage: 71.957% (-0.01%) from 71.969%
66091390-c356-4ea4-b0ca-2727d96e4ed3

push

prow

web-flow
Merge pull request #18001 from iholder101/worktree-vep190-pr5-node-hooks

VEP 190: Plugin node hooks

93 of 153 new or added lines in 6 files covered. (60.78%)

4 existing lines in 1 file now uncovered.

81275 of 112950 relevant lines covered (71.96%)

486.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.58
/pkg/virt-handler/migration-source.go
1
/*
2
 * This file is part of the KubeVirt project
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 *
16
 * Copyright The KubeVirt Authors.
17
 *
18
 */
19

20
package virthandler
21

22
import (
23
        "context"
24
        "encoding/json"
25
        "errors"
26
        "fmt"
27
        "path/filepath"
28
        "time"
29

30
        k8sv1 "k8s.io/api/core/v1"
31
        "k8s.io/apimachinery/pkg/api/equality"
32
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33
        "k8s.io/apimachinery/pkg/util/wait"
34
        "k8s.io/client-go/tools/cache"
35
        "k8s.io/client-go/tools/record"
36
        "k8s.io/client-go/util/workqueue"
37
        v1 "kubevirt.io/api/core/v1"
38
        "kubevirt.io/client-go/kubecli"
39
        "kubevirt.io/client-go/log"
40

41
        pluginv1alpha1 "kubevirt.io/api/plugin/v1alpha1"
42

43
        "kubevirt.io/kubevirt/pkg/controller"
44
        hostdisk "kubevirt.io/kubevirt/pkg/host-disk"
45
        "kubevirt.io/kubevirt/pkg/hypervisor"
46
        metrics "kubevirt.io/kubevirt/pkg/monitoring/metrics/common/vmisync"
47
        "kubevirt.io/kubevirt/pkg/pointer"
48
        virtconfig "kubevirt.io/kubevirt/pkg/virt-config"
49
        cmdclient "kubevirt.io/kubevirt/pkg/virt-handler/cmd-client"
50
        "kubevirt.io/kubevirt/pkg/virt-handler/isolation"
51
        launcherclients "kubevirt.io/kubevirt/pkg/virt-handler/launcher-clients"
52
        migrationproxy "kubevirt.io/kubevirt/pkg/virt-handler/migration-proxy"
53
        "kubevirt.io/kubevirt/pkg/virt-handler/plugins"
54
        "kubevirt.io/kubevirt/pkg/virt-launcher/virtwrap/api"
55
)
56

57
var errWaitingForTargetPorts = errors.New("waiting for target to publish migration ports")
58

59
type passtRepairSourceHandler interface {
60
        HandleMigrationSource(*v1.VirtualMachineInstance, func(*v1.VirtualMachineInstance) (string, error)) error
61
}
62

63
type MigrationSourceController struct {
64
        *BaseController
65
        pluginExecutor     plugins.NodeHookExecutor
66
        vmiExpectations    *controller.UIDTrackingControllerExpectations
67
        passtRepairHandler passtRepairSourceHandler
68
}
69

70
func NewMigrationSourceController(
71
        recorder record.EventRecorder,
72
        clientset kubecli.KubevirtClient,
73
        host string,
74
        launcherClients launcherclients.LauncherClientsManager,
75
        vmiInformer cache.SharedIndexInformer,
76
        domainInformer cache.SharedInformer,
77
        clusterConfig *virtconfig.ClusterConfig,
78
        podIsolationDetector isolation.PodIsolationDetector,
79
        migrationProxy migrationproxy.ProxyManager,
80
        virtLauncherFSRunDirPattern string,
81
        netStat netstat,
82
        passtRepairHandler passtRepairSourceHandler,
83
        pluginStore cache.Store,
84
        pluginExecutor plugins.NodeHookExecutor,
85
) (*MigrationSourceController, error) {
24✔
86

24✔
87
        queue := workqueue.NewTypedRateLimitingQueueWithConfig[string](
24✔
88
                workqueue.DefaultTypedControllerRateLimiter[string](),
24✔
89
                workqueue.TypedRateLimitingQueueConfig[string]{Name: "virt-handler-source"},
24✔
90
        )
24✔
91
        logger := log.Log.With("controller", "migration-source")
24✔
92

24✔
93
        hypervisorName := clusterConfig.GetHypervisor().Name
24✔
94

24✔
95
        baseCtrl, err := NewBaseController(
24✔
96
                logger,
24✔
97
                host,
24✔
98
                recorder,
24✔
99
                clientset,
24✔
100
                queue,
24✔
101
                vmiInformer,
24✔
102
                domainInformer,
24✔
103
                clusterConfig,
24✔
104
                podIsolationDetector,
24✔
105
                launcherClients,
24✔
106
                migrationProxy,
24✔
107
                virtLauncherFSRunDirPattern,
24✔
108
                netStat,
24✔
109
                hypervisor.NewHypervisorNodeInformation(hypervisorName),
24✔
110
                hypervisor.GetVirtRuntime(podIsolationDetector, hypervisorName),
24✔
111
                pluginStore,
24✔
112
        )
24✔
113
        if err != nil {
24✔
114
                return nil, err
×
115
        }
×
116

117
        c := &MigrationSourceController{
24✔
118
                BaseController:     baseCtrl,
24✔
119
                pluginExecutor:     pluginExecutor,
24✔
120
                vmiExpectations:    controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
24✔
121
                passtRepairHandler: passtRepairHandler,
24✔
122
        }
24✔
123

24✔
124
        _, err = vmiInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
24✔
125
                AddFunc:    c.addFunc,
24✔
126
                DeleteFunc: c.deleteFunc,
24✔
127
                UpdateFunc: c.updateFunc,
24✔
128
        })
24✔
129
        if err != nil {
24✔
130
                return nil, err
×
131
        }
×
132

133
        _, err = domainInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
24✔
134
                AddFunc:    c.addDeleteDomainFunc,
24✔
135
                DeleteFunc: c.addDeleteDomainFunc,
24✔
136
                UpdateFunc: c.updateDomainFunc,
24✔
137
        })
24✔
138
        if err != nil {
24✔
139
                return nil, err
×
140
        }
×
141

142
        return c, nil
24✔
143
}
144

145
func (c *MigrationSourceController) hasTargetDetectedReadyDomain(vmi *v1.VirtualMachineInstance) (bool, int64) {
8✔
146
        // give the target node 60 seconds to discover the libvirt domain via the domain informer
8✔
147
        // before allowing the VMI to be processed. This closes the gap between the
8✔
148
        // VMI's status getting updated to reflect the new source node, and the domain
8✔
149
        // informer firing the event to alert the source node of the new domain.
8✔
150
        migrationTargetDelayTimeout := 60
8✔
151

8✔
152
        if vmi.Status.MigrationState == nil ||
8✔
153
                vmi.Status.MigrationState.EndTimestamp == nil {
16✔
154
                return false, int64(migrationTargetDelayTimeout)
8✔
155
        }
8✔
156
        if vmi.Status.MigrationState != nil &&
×
157
                vmi.Status.MigrationState.TargetState != nil &&
×
158
                vmi.Status.MigrationState.TargetState.DomainDetected &&
×
159
                vmi.Status.MigrationState.TargetState.DomainReadyTimestamp != nil {
×
160

×
161
                return true, 0
×
162
        }
×
163

164
        nowUnix := time.Now().UTC().Unix()
×
165
        migrationEndUnix := vmi.Status.MigrationState.EndTimestamp.Time.UTC().Unix()
×
166

×
167
        diff := nowUnix - migrationEndUnix
×
168

×
169
        if diff > int64(migrationTargetDelayTimeout) {
×
170
                return false, 0
×
171
        }
×
172

173
        timeLeft := int64(migrationTargetDelayTimeout) - diff
×
174

×
175
        enqueueTime := timeLeft
×
176
        if enqueueTime < 5 {
×
177
                enqueueTime = 5
×
178
        }
×
179

180
        // re-enqueue the key to ensure it gets processed again within the right time.
181
        c.queue.AddAfter(controller.VirtualMachineInstanceKey(vmi), time.Duration(enqueueTime)*time.Second)
×
182

×
183
        return false, timeLeft
×
184
}
185

186
func domainMigrated(domain *api.Domain) bool {
34✔
187
        return domain != nil && domain.Status.Status == api.Shutoff && domain.Status.Reason == api.ReasonMigrated
34✔
188
}
34✔
189

190
func (c *MigrationSourceController) setMigrationProgressStatus(vmi *v1.VirtualMachineInstance, domain *api.Domain) {
16✔
191
        if domain == nil ||
16✔
192
                domain.Spec.Metadata.KubeVirt.Migration == nil ||
16✔
193
                vmi.Status.MigrationState == nil ||
16✔
194
                !c.isMigrationSource(vmi) {
30✔
195
                return
14✔
196
        }
14✔
197

198
        migrationMetadata := domain.Spec.Metadata.KubeVirt.Migration
2✔
199
        if migrationMetadata.UID != vmi.Status.MigrationState.MigrationUID {
2✔
200
                return
×
201
        }
×
202
        if migrationMetadata.StartTimestamp != nil {
3✔
203
                vmi.Status.MigrationState.StartTimestamp = migrationMetadata.StartTimestamp
1✔
204
        }
1✔
205

206
        vmi.Status.MigrationState.Failed = migrationMetadata.Failed
2✔
207

2✔
208
        if migrationMetadata.Failed {
3✔
209
                vmi.Status.MigrationState.Completed = true
1✔
210
                vmi.Status.MigrationState.EndTimestamp = migrationMetadata.EndTimestamp
1✔
211
                vmi.Status.MigrationState.AbortStatus = v1.MigrationAbortStatus(migrationMetadata.AbortStatus)
1✔
212
                vmi.Status.MigrationState.FailureReason = migrationMetadata.FailureReason
1✔
213
                c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.Migrated.String(), fmt.Sprintf("VirtualMachineInstance migration uid %s failed. reason:%s", string(migrationMetadata.UID), migrationMetadata.FailureReason))
1✔
214
        }
1✔
215

216
        vmi.Status.MigrationState.Mode = migrationMetadata.Mode
2✔
217
}
218

219
func (c *MigrationSourceController) updateStatus(vmi *v1.VirtualMachineInstance, domain *api.Domain) error {
8✔
220
        c.setMigrationProgressStatus(vmi, domain)
8✔
221

8✔
222
        // handle migrations differently than normal status updates.
8✔
223
        //
8✔
224
        // When a successful migration is detected, we must transfer ownership of the VMI
8✔
225
        // from the source node (this node) to the target node (node the domain was migrated to).
8✔
226
        //
8✔
227
        // Transfer ownership by...
8✔
228
        // 1. Marking vmi.Status.MigrationState as completed
8✔
229
        // 2. Update the vmi.Status.NodeName to reflect the target node's name
8✔
230
        // 3. Update the VMI's NodeNameLabel annotation to reflect the target node's name
8✔
231
        // 4. Clear the LauncherContainerImageVersion which virt-controller will detect
8✔
232
        //    and accurately based on the version used on the target pod
8✔
233
        //
8✔
234
        // After a migration, the VMI's phase is no longer owned by this node. Only the
8✔
235
        // MigrationState status field is eligible to be mutated.
8✔
236
        migrationHost := ""
8✔
237
        if vmi.Status.MigrationState != nil {
16✔
238
                migrationHost = vmi.Status.MigrationState.TargetNode
8✔
239
        }
8✔
240

241
        targetNodeDetectedDomain, timeLeft := c.hasTargetDetectedReadyDomain(vmi)
8✔
242
        // If we can't detect where the migration went to, then we have no
8✔
243
        // way of transferring ownership. The only option here is to move the
8✔
244
        // vmi to failed.  The cluster vmi controller will then tear down the
8✔
245
        // resulting pods.
8✔
246
        if migrationHost == "" {
8✔
247
                // migrated to unknown host.
×
248
                vmi.Status.Phase = v1.Failed
×
249
                vmi.Status.MigrationState.Completed = true
×
250
                vmi.Status.MigrationState.Failed = true
×
251
                if vmi.Status.MigrationState.EndTimestamp == nil {
×
252
                        vmi.Status.MigrationState.EndTimestamp = pointer.P(metav1.NewTime(time.Now()))
×
253
                }
×
254

255
                c.logger.Object(vmi).Warning("the vmi migrated to an unknown host")
×
256
                c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.Migrated.String(), fmt.Sprintf("The VirtualMachineInstance migrated to unknown host."))
×
257
        } else if !targetNodeDetectedDomain {
16✔
258
                if timeLeft <= 0 {
8✔
259
                        vmi.Status.Phase = v1.Failed
×
260
                        vmi.Status.MigrationState.Completed = true
×
261
                        vmi.Status.MigrationState.Failed = true
×
262
                        if vmi.Status.MigrationState.EndTimestamp == nil {
×
263
                                vmi.Status.MigrationState.EndTimestamp = pointer.P(metav1.NewTime(time.Now()))
×
264
                        }
×
265

266
                        c.logger.Object(vmi).Warning("the domain was never observed on the taget after the migration completed within the timeout period")
×
267
                        c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.Migrated.String(), fmt.Sprintf("The VirtualMachineInstance's domain was never observed on the target after the migration completed within the timeout period."))
×
268
                }
269
        }
270

271
        if vmi.Status.Phase == v1.Failed && vmi.IsDecentralizedMigration() {
8✔
272
                vmi.Status.MigrationState.Completed = true
×
273
                vmi.Status.MigrationState.Failed = true
×
274
                c.logger.Object(vmi).Warning("the decentralized migration failed due to the source VMI being failed")
×
275
                c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.Migrated.String(), fmt.Sprintf("The VirtualMachineInstance's decentralized migration failed due to the source VMI being failed."))
×
276
        }
×
277

278
        if targetNodeDetectedDomain && vmi.IsDecentralizedMigration() && vmi.Status.MigrationState != nil && vmi.Status.MigrationState.Completed {
8✔
279
                c.logger.Object(vmi).V(2).Infof("decentralized migration completed successfully, marking VMI as succeeded")
×
280
                // this is a decentralized migration, and the migration completed successfully, we need to mark the VMI as succeeded
×
281
                vmi.Status.Phase = v1.Succeeded
×
282
        }
×
283

284
        return nil
8✔
285
}
286

287
func (c *MigrationSourceController) Run(threadiness int, stopCh chan struct{}) {
×
288
        defer c.queue.ShutDown()
×
289
        c.logger.Info("Starting virt-handler source controller.")
×
290

×
291
        cache.WaitForCacheSync(stopCh, c.hasSynced)
×
292

×
293
        // queue keys for previous Domains on the host that no longer exist
×
294
        // in the cache. This ensures we perform local cleanup of deleted VMs.
×
295
        for _, domain := range c.domainStore.List() {
×
296
                d := domain.(*api.Domain)
×
297
                vmiRef := v1.NewVMIReferenceWithUUID(
×
298
                        d.ObjectMeta.Namespace,
×
299
                        d.ObjectMeta.Name,
×
300
                        d.Spec.Metadata.KubeVirt.UID)
×
301

×
302
                key := controller.VirtualMachineInstanceKey(vmiRef)
×
303

×
304
                _, exists, _ := c.vmiStore.GetByKey(key)
×
305
                if !exists {
×
306
                        c.queue.Add(key)
×
307
                }
×
308
        }
309

310
        // Start the actual work
311
        for i := 0; i < threadiness; i++ {
×
312
                go wait.Until(c.runWorker, time.Second, stopCh)
×
313
        }
×
314

315
        <-stopCh
×
316
        c.logger.Info("Stopping virt-handler source controller.")
×
317
}
318

319
func (c *MigrationSourceController) runWorker() {
×
320
        for c.Execute() {
×
321
        }
×
322
}
323

324
func (c *MigrationSourceController) Execute() bool {
8✔
325
        key, quit := c.queue.Get()
8✔
326
        if quit {
8✔
327
                return false
×
328
        }
×
329
        defer c.queue.Done(key)
8✔
330
        if err := c.execute(key); err != nil {
8✔
331
                c.logger.Reason(err).Infof("re-enqueuing VirtualMachineInstance %v", key)
×
332
                c.queue.AddRateLimited(key)
×
333
        } else {
8✔
334
                c.logger.V(4).Infof("processed VirtualMachineInstance %v", key)
8✔
335
                c.queue.Forget(key)
8✔
336
        }
8✔
337
        return true
8✔
338
}
339

340
func (c *MigrationSourceController) sync(vmi *v1.VirtualMachineInstance, domain *api.Domain) error {
8✔
341
        if domain != nil {
16✔
342
                c.logger.Object(vmi).Infof("VMI is in phase: %v | Domain status: %v, reason: %v", vmi.Status.Phase, domain.Status.Status, domain.Status.Reason)
8✔
343
        } else {
8✔
344
                c.logger.Object(vmi).Infof("VMI is in phase: %v", vmi.Status.Phase)
×
345
        }
×
346

347
        oldStatus := vmi.Status.DeepCopy()
8✔
348

8✔
349
        syncErr := c.processVMI(vmi, domain)
8✔
350

8✔
351
        if syncErr != nil {
8✔
352
                c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.SyncFailed.String(), syncErr.Error())
×
353
                // `syncErr` will be propagated anyway, and it will be logged in `re-enqueueing`
×
354
                // so there is no need to log it twice in hot path without increased verbosity.
×
355
                c.logger.Object(vmi).Reason(syncErr).Error("Synchronizing the VirtualMachineInstance failed.")
×
356
        }
×
357

358
        updateErr := c.updateStatus(vmi, domain)
8✔
359

8✔
360
        if updateErr != nil {
8✔
361
                c.logger.Object(vmi).Reason(updateErr).Error("Updating the migration status failed.")
×
362
        }
×
363

364
        netUpdateErr := c.netStat.UpdateStatus(vmi, domain)
8✔
365

8✔
366
        if netUpdateErr != nil {
8✔
367
                log.Log.Object(vmi).Reason(updateErr).Error("Updating network interfaces status failed.")
×
368
        }
×
369

370
        // update the VMI if necessary
371
        if !equality.Semantic.DeepEqual(*oldStatus, vmi.Status) {
11✔
372
                key := controller.VirtualMachineInstanceKey(vmi)
3✔
373
                c.vmiExpectations.SetExpectations(key, 1, 0)
3✔
374
                _, err := c.clientset.VirtualMachineInstance(vmi.ObjectMeta.Namespace).Update(context.Background(), vmi, metav1.UpdateOptions{})
3✔
375
                if err != nil {
3✔
376
                        c.vmiExpectations.SetExpectations(key, 0, 0)
×
377
                        return err
×
378
                }
×
379
                metrics.VMISynced(vmi.Namespace, vmi.Name)
3✔
380
        }
381

382
        if syncErr != nil {
8✔
383
                return syncErr
×
384
        }
×
385

386
        if updateErr != nil {
8✔
387
                return updateErr
×
388
        }
×
389

390
        if netUpdateErr != nil {
8✔
391
                return netUpdateErr
×
392
        }
×
393

394
        c.logger.Object(vmi).V(4).Info("Source synchronization loop succeeded.")
8✔
395

8✔
396
        return nil
8✔
397
}
398

399
func (c *MigrationSourceController) execute(key string) error {
8✔
400
        vmi, vmiExists, err := c.getVMIFromCache(key)
8✔
401
        if err != nil {
8✔
402
                return err
×
403
        }
×
404

405
        if !vmiExists || ((vmi.IsDecentralizedMigration() && vmi.Status.Phase == v1.Succeeded) ||
8✔
406
                !vmi.IsDecentralizedMigration() && vmi.IsFinal()) ||
8✔
407
                vmi.DeletionTimestamp != nil {
8✔
408
                c.logger.V(4).Infof("vmi for key %v is terminating, succeeded or does not exists", key)
×
409
                if !vmiExists {
×
410
                        metrics.ResetVMISync(key)
×
411
                }
×
412
                return nil
×
413
        }
414

415
        if !c.vmiExpectations.SatisfiedExpectations(key) {
8✔
416
                c.logger.V(4).Object(vmi).Info("waiting for expectations to be satisfied")
×
417
                return nil
×
418
        }
×
419

420
        domain, domainExists, _, err := c.getDomainFromCache(key)
8✔
421
        if err != nil {
8✔
422
                return err
×
423
        }
×
424

425
        if domainExists && domain.Spec.Metadata.KubeVirt.UID != vmi.UID {
8✔
426
                c.logger.V(4).Object(vmi).Infof("Detected stale vmi %s that still needs cleanup before new vmi with identical name/namespace can be processed", vmi.UID)
×
427
                return nil
×
428
        }
×
429

430
        if vmi.Status.MigrationState == nil {
8✔
431
                c.logger.V(4).Object(vmi).Info("no migration is in progress")
×
432
                return nil
×
433
        }
×
434

435
        // post migration clean up
436
        if isMigrationDone(vmi.Status.MigrationState) {
8✔
437
                c.migrationProxy.StopSourceListener(string(vmi.UID))
×
438
                return nil
×
439
        }
×
440

441
        if !c.isMigrationSource(vmi) {
8✔
442
                c.logger.Object(vmi).V(4).Info("not a migration source")
×
443
                return nil
×
444
        }
×
445

446
        return c.sync(vmi.DeepCopy(), domain)
8✔
447
}
448

449
func (c *MigrationSourceController) isMigrationSource(vmi *v1.VirtualMachineInstance) bool {
14✔
450
        if vmi.IsDecentralizedMigration() {
14✔
451
                return vmi.Status.MigrationState != nil &&
×
452
                        vmi.Status.MigrationState.SourceNode == c.host &&
×
453
                        vmi.IsMigrationSource()
×
454
        }
×
455
        return vmi.Status.MigrationState != nil &&
14✔
456
                vmi.Status.NodeName == c.host &&
14✔
457
                vmi.Status.MigrationState.SourceNode == c.host
14✔
458
}
459

460
func (c *MigrationSourceController) handleSourceMigrationProxy(vmi *v1.VirtualMachineInstance) error {
7✔
461

7✔
462
        res, err := c.podIsolationDetector.Detect(vmi)
7✔
463
        if err != nil {
7✔
464
                return err
×
465
        }
×
466
        // the migration-proxy is no longer shared via host mount, so we
467
        // pass in the virt-launcher's baseDir to reach the unix sockets.
468
        baseDir := fmt.Sprintf(filepath.Join(c.virtLauncherFSRunDirPattern, "kubevirt"), res.Pid())
7✔
469
        if vmi.Status.MigrationState.TargetDirectMigrationNodePorts == nil {
7✔
470
                return errWaitingForTargetPorts
×
471
        }
×
472

473
        err = c.migrationProxy.StartSourceListener(
7✔
474
                string(vmi.UID),
7✔
475
                vmi.Status.MigrationState.TargetNodeAddress,
7✔
476
                vmi.Status.MigrationState.TargetDirectMigrationNodePorts,
7✔
477
                baseDir,
7✔
478
        )
7✔
479
        if err != nil {
7✔
480
                return err
×
481
        }
×
482

483
        return nil
7✔
484
}
485

486
func (c *MigrationSourceController) migrateVMI(vmi *v1.VirtualMachineInstance, domain *api.Domain) error {
8✔
487
        shouldReturn, err := c.checkLauncherClient(vmi)
8✔
488
        if shouldReturn {
8✔
489
                return err
×
490
        }
×
491

492
        client, err := c.launcherClients.GetLauncherClient(vmi)
8✔
493
        if err != nil {
8✔
494
                return fmt.Errorf(unableCreateVirtLauncherConnectionFmt, err)
×
495
        }
×
496

497
        if vmi.Status.MigrationState.AbortRequested {
9✔
498
                err = c.handleMigrationAbort(vmi, domain, client)
1✔
499
                return err
1✔
500
        }
1✔
501

502
        if isMigrationInProgress(vmi, domain) {
7✔
503
                // we already started this migration, no need to rerun this
×
504
                c.logger.Object(vmi).V(4).Infof("migration %s has already been started", vmi.Status.MigrationState.MigrationUID)
×
505
                return nil
×
506
        }
×
507

508
        err = c.handleSourceMigrationProxy(vmi)
7✔
509
        if errors.Is(err, errWaitingForTargetPorts) {
7✔
510
                c.logger.Object(vmi).V(4).Info("waiting for target node to publish migration ports")
×
511
                c.queue.AddAfter(controller.VirtualMachineInstanceKey(vmi), 1*time.Second)
×
512
                return nil
×
513
        } else if err != nil {
7✔
514
                return fmt.Errorf("failed to handle migration proxy: %v", err)
×
515
        }
×
516

517
        var migrationConfiguration *v1.MigrationConfiguration
7✔
518
        if vmi.Status.MigrationState.MigrationConfiguration == nil {
13✔
519
                migrationConfiguration = c.clusterConfig.GetMigrationConfiguration()
6✔
520
        } else {
7✔
521
                migrationConfiguration = vmi.Status.MigrationState.MigrationConfiguration.DeepCopy()
1✔
522
        }
1✔
523

524
        // This check is only for backward compatibility.
525
        // During upgrade, AllowWorkloadDisruption could be nil since the migration controller is
526
        // updated later the virt-handler.
527
        // This check can be removed in future
528
        if migrationConfiguration.AllowWorkloadDisruption == nil {
8✔
529
                migrationConfiguration.AllowWorkloadDisruption = pointer.P(*migrationConfiguration.AllowPostCopy)
1✔
530
        }
1✔
531
        if migrationConfiguration.MaxDowntimeMs == nil {
8✔
532
                migrationConfiguration.MaxDowntimeMs = pointer.P(virtconfig.DefaultMigrationMaxDowntimeMs)
1✔
533
        }
1✔
534

535
        options := &cmdclient.MigrationOptions{
7✔
536
                Bandwidth:               *migrationConfiguration.BandwidthPerMigration,
7✔
537
                ProgressTimeout:         *migrationConfiguration.ProgressTimeout,
7✔
538
                CompletionTimeoutPerGiB: *migrationConfiguration.CompletionTimeoutPerGiB,
7✔
539
                MaxDowntimeMs:           *migrationConfiguration.MaxDowntimeMs,
7✔
540
                UnsafeMigration:         *migrationConfiguration.UnsafeMigrationOverride,
7✔
541
                AllowAutoConverge:       *migrationConfiguration.AllowAutoConverge,
7✔
542
                AllowPostCopy:           *migrationConfiguration.AllowPostCopy,
7✔
543
                AllowWorkloadDisruption: *migrationConfiguration.AllowWorkloadDisruption,
7✔
544
                StallDetectionEnabled:   c.clusterConfig.MigrationStallDetectionEnabled(),
7✔
545
        }
7✔
546

7✔
547
        configureParallelMigrationThreads(options, vmi)
7✔
548

7✔
549
        marshalledOptions, err := json.Marshal(options)
7✔
550
        if err != nil {
7✔
551
                c.logger.Object(vmi).Warning("failed to marshall matched migration options")
×
552
        } else {
7✔
553
                c.logger.Object(vmi).Infof("migration options matched for vmi %s: %s", vmi.Name, string(marshalledOptions))
7✔
554
        }
7✔
555

556
        vmiCopy := vmi.DeepCopy()
7✔
557
        err = hostdisk.ReplacePVCByHostDisk(vmiCopy)
7✔
558
        if err != nil {
7✔
559
                return err
×
560
        }
×
561

562
        if c.clusterConfig.PasstBindingEnabled() {
14✔
563
                if err = c.passtRepairHandler.HandleMigrationSource(vmi, c.passtSocketDirOnHostForVMI); err != nil {
7✔
564
                        c.logger.Object(vmi).Warningf("failed to call passt-repair for migration source, %v", err)
×
565
                }
×
566
        }
567

568
        if c.pluginExecutor != nil {
7✔
NEW
569
                if err := c.pluginExecutor.CallNodeHooks(pluginv1alpha1.NodeHookPreMigrationSource, vmiCopy, c.host); err != nil {
×
NEW
570
                        return err
×
NEW
571
                }
×
572
        }
573

574
        err = client.MigrateVirtualMachine(vmiCopy, options)
7✔
575
        if err != nil {
7✔
576
                return err
×
577
        }
×
578
        c.recorder.Event(vmi, k8sv1.EventTypeNormal, v1.Migrating.String(), VMIMigrating)
7✔
579
        return nil
7✔
580
}
581

582
func isMigrationDone(state *v1.VirtualMachineInstanceMigrationState) bool {
8✔
583
        return state == nil || (state.EndTimestamp != nil && (state.Completed || state.Failed))
8✔
584
}
8✔
585

586
func (c *MigrationSourceController) processVMI(vmi *v1.VirtualMachineInstance, domain *api.Domain) error {
8✔
587
        domainAlive := domain != nil &&
8✔
588
                domain.Status.Status != api.Shutoff &&
8✔
589
                domain.Status.Status != api.Crashed &&
8✔
590
                domain.Status.Status != ""
8✔
591

8✔
592
        if !domainAlive {
8✔
593
                c.logger.V(4).Object(vmi).Info("domain is not alive")
×
594
                return nil
×
595
        }
×
596

597
        return c.migrateVMI(vmi, domain)
8✔
598
}
599

600
func (c *MigrationSourceController) addFunc(obj interface{}) {
×
601
        key, err := controller.KeyFunc(obj)
×
602
        if err == nil {
×
603
                c.vmiExpectations.SetExpectations(key, 0, 0)
×
604
                c.queue.Add(key)
×
605
        }
×
606
}
607

608
func (c *MigrationSourceController) deleteFunc(obj interface{}) {
×
609
        key, err := controller.KeyFunc(obj)
×
610
        if err == nil {
×
611
                c.queue.Add(key)
×
612
        }
×
613
}
614

615
func (c *MigrationSourceController) updateFunc(_, new interface{}) {
×
616
        key, err := controller.KeyFunc(new)
×
617
        if err == nil {
×
618
                c.vmiExpectations.SetExpectations(key, 0, 0)
×
619
                c.queue.Add(key)
×
620
        }
×
621
}
622

623
func (c *MigrationSourceController) addDeleteDomainFunc(obj interface{}) {
×
624
        key, err := controller.KeyFunc(obj)
×
625
        if err == nil {
×
626
                c.queue.Add(key)
×
627
        }
×
628
}
629

630
func (c *MigrationSourceController) updateDomainFunc(_, new interface{}) {
×
631
        key, err := controller.KeyFunc(new)
×
632
        if err == nil {
×
633
                c.queue.Add(key)
×
634
        }
×
635
}
636

637
func (c *MigrationSourceController) handleMigrationAbort(vmi *v1.VirtualMachineInstance, domain *api.Domain, client cmdclient.LauncherClient) error {
9✔
638
        // Check both the VMI status and the domain metadata to avoid redundant cancel RPCs.
9✔
639
        // The domain metadata reflects the launcher's abort status before the API server round-trip.
9✔
640
        abortHandled := func(status v1.MigrationAbortStatus) bool {
22✔
641
                return status == v1.MigrationAbortInProgress || status == v1.MigrationAbortSucceeded
13✔
642
        }
13✔
643
        if abortHandled(vmi.Status.MigrationState.AbortStatus) {
11✔
644
                return nil
2✔
645
        }
2✔
646
        if domain != nil && domain.Spec.Metadata.KubeVirt.Migration != nil &&
7✔
647
                abortHandled(v1.MigrationAbortStatus(domain.Spec.Metadata.KubeVirt.Migration.AbortStatus)) {
9✔
648
                return nil
2✔
649
        }
2✔
650

651
        if err := client.CancelVirtualMachineMigration(vmi); err != nil {
6✔
652
                return err
1✔
653
        }
1✔
654

655
        c.recorder.Event(vmi, k8sv1.EventTypeNormal, v1.Migrating.String(), VMIAbortingMigration)
4✔
656
        return nil
4✔
657
}
658

659
func configureParallelMigrationThreads(options *cmdclient.MigrationOptions, vm *v1.VirtualMachineInstance) {
7✔
660
        // When the CPU is limited, there's a risk of the migration threads choking the CPU resources on the compute container.
7✔
661
        // For this reason, we will avoid configuring migration threads in such scenarios.
7✔
662
        if cpuLimit, cpuLimitExists := vm.Spec.Domain.Resources.Limits[k8sv1.ResourceCPU]; cpuLimitExists && !cpuLimit.IsZero() {
8✔
663
                return
1✔
664
        }
1✔
665
        // Dedicated CPUs are also fully allocating each vCPU for compute.
666
        if vm.IsCPUDedicated() {
7✔
667
                return
1✔
668
        }
1✔
669

670
        options.ParallelMigrationThreads = pointer.P(parallelMultifdMigrationThreads)
5✔
671
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc