kubevirt / kubevirt / 09a86c40-164b-4ba5-8bec-e9f7c0abf56f

12 Nov 2025 12:31PM UTC coverage: 70.328% (-0.008%) from 70.336%
Build 09a86c40-164b-4ba5-8bec-e9f7c0abf56f (push, prow, web-flow)

Merge pull request #16078 from noamasu/bugfix/memorydump-functest-fixes
tests/storage/memorydump: restore pod deletion and remove redundant test

69604 of 98970 relevant lines covered (70.33%)
412.69 hits per line


Source file: /pkg/virt-handler/migration-source.go (56.48% covered)

/*
 * This file is part of the KubeVirt project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Copyright The KubeVirt Authors.
 *
 */

package virthandler

import (
        "context"
        "encoding/json"
        "errors"
        "fmt"
        "path/filepath"
        "time"

        k8sv1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/equality"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/tools/record"
        "k8s.io/client-go/util/workqueue"
        v1 "kubevirt.io/api/core/v1"
        "kubevirt.io/client-go/kubecli"
        "kubevirt.io/client-go/log"

        "kubevirt.io/kubevirt/pkg/controller"
        hostdisk "kubevirt.io/kubevirt/pkg/host-disk"
        "kubevirt.io/kubevirt/pkg/pointer"
        "kubevirt.io/kubevirt/pkg/util/migrations"
        virtconfig "kubevirt.io/kubevirt/pkg/virt-config"
        cmdclient "kubevirt.io/kubevirt/pkg/virt-handler/cmd-client"
        "kubevirt.io/kubevirt/pkg/virt-handler/isolation"
        launcherclients "kubevirt.io/kubevirt/pkg/virt-handler/launcher-clients"
        migrationproxy "kubevirt.io/kubevirt/pkg/virt-handler/migration-proxy"
        "kubevirt.io/kubevirt/pkg/virt-launcher/virtwrap/api"
)

var errWaitingForTargetPorts = errors.New("waiting for target to publish migration ports")

type passtRepairSourceHandler interface {
        HandleMigrationSource(*v1.VirtualMachineInstance, func(*v1.VirtualMachineInstance) (string, error)) error
}

type MigrationSourceController struct {
        *BaseController
        vmiExpectations    *controller.UIDTrackingControllerExpectations
        passtRepairHandler passtRepairSourceHandler
}

func NewMigrationSourceController(
        recorder record.EventRecorder,
        clientset kubecli.KubevirtClient,
        host string,
        launcherClients launcherclients.LauncherClientsManager,
        vmiInformer cache.SharedIndexInformer,
        domainInformer cache.SharedInformer,
        clusterConfig *virtconfig.ClusterConfig,
        podIsolationDetector isolation.PodIsolationDetector,
        migrationProxy migrationproxy.ProxyManager,
        virtLauncherFSRunDirPattern string,
        netStat netstat,
        passtRepairHandler passtRepairSourceHandler,
) (*MigrationSourceController, error) {

        queue := workqueue.NewTypedRateLimitingQueueWithConfig[string](
                workqueue.DefaultTypedControllerRateLimiter[string](),
                workqueue.TypedRateLimitingQueueConfig[string]{Name: "virt-handler-source"},
        )
        logger := log.Log.With("controller", "migration-source")

        baseCtrl, err := NewBaseController(
                logger,
                host,
                recorder,
                clientset,
                queue,
                vmiInformer,
                domainInformer,
                clusterConfig,
                podIsolationDetector,
                launcherClients,
                migrationProxy,
                virtLauncherFSRunDirPattern,
                netStat,
        )
        if err != nil {
                return nil, err
        }

        c := &MigrationSourceController{
                BaseController:     baseCtrl,
                vmiExpectations:    controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
                passtRepairHandler: passtRepairHandler,
        }

        _, err = vmiInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addFunc,
                DeleteFunc: c.deleteFunc,
                UpdateFunc: c.updateFunc,
        })
        if err != nil {
                return nil, err
        }

        _, err = domainInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addDeleteDomainFunc,
                DeleteFunc: c.addDeleteDomainFunc,
                UpdateFunc: c.updateDomainFunc,
        })
        if err != nil {
                return nil, err
        }

        return c, nil
}

func (c *MigrationSourceController) hasTargetDetectedReadyDomain(vmi *v1.VirtualMachineInstance) (bool, int64) {
        // give the target node 60 seconds to discover the libvirt domain via the domain informer
        // before allowing the VMI to be processed. This closes the gap between the
        // VMI's status getting updated to reflect the new source node, and the domain
        // informer firing the event to alert the source node of the new domain.
        migrationTargetDelayTimeout := 60

        if vmi.Status.MigrationState == nil ||
                vmi.Status.MigrationState.EndTimestamp == nil {
                return false, int64(migrationTargetDelayTimeout)
        }
        if vmi.Status.MigrationState != nil &&
                vmi.Status.MigrationState.TargetState != nil &&
                vmi.Status.MigrationState.TargetState.DomainDetected &&
                vmi.Status.MigrationState.TargetState.DomainReadyTimestamp != nil {

                return true, 0
        }

        nowUnix := time.Now().UTC().Unix()
        migrationEndUnix := vmi.Status.MigrationState.EndTimestamp.Time.UTC().Unix()

        diff := nowUnix - migrationEndUnix

        if diff > int64(migrationTargetDelayTimeout) {
                return false, 0
        }

        timeLeft := int64(migrationTargetDelayTimeout) - diff

        enqueueTime := timeLeft
        if enqueueTime < 5 {
                enqueueTime = 5
        }

        // re-enqueue the key to ensure it gets processed again within the right time.
        c.queue.AddAfter(controller.VirtualMachineInstanceKey(vmi), time.Duration(enqueueTime)*time.Second)

        return false, timeLeft
}

func domainMigrated(domain *api.Domain) bool {
        return domain != nil && domain.Status.Status == api.Shutoff && domain.Status.Reason == api.ReasonMigrated
}

func (c *MigrationSourceController) setMigrationProgressStatus(vmi *v1.VirtualMachineInstance, domain *api.Domain) {
        if domain == nil ||
                domain.Spec.Metadata.KubeVirt.Migration == nil ||
                vmi.Status.MigrationState == nil ||
                !c.isMigrationSource(vmi) {
                return
        }

        migrationMetadata := domain.Spec.Metadata.KubeVirt.Migration
        if migrationMetadata.UID != vmi.Status.MigrationState.MigrationUID {
                return
        }
        vmi.Status.MigrationState.StartTimestamp = migrationMetadata.StartTimestamp

        vmi.Status.MigrationState.Failed = migrationMetadata.Failed

        if migrationMetadata.Failed {
                vmi.Status.MigrationState.EndTimestamp = migrationMetadata.EndTimestamp
                vmi.Status.MigrationState.FailureReason = migrationMetadata.FailureReason
                c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.Migrated.String(), fmt.Sprintf("VirtualMachineInstance migration uid %s failed. reason:%s", string(migrationMetadata.UID), migrationMetadata.FailureReason))
        }

        vmi.Status.MigrationState.AbortStatus = v1.MigrationAbortStatus(migrationMetadata.AbortStatus)
        if migrationMetadata.AbortStatus == string(v1.MigrationAbortSucceeded) {
                vmi.Status.MigrationState.EndTimestamp = migrationMetadata.EndTimestamp
        }

        vmi.Status.MigrationState.Mode = migrationMetadata.Mode
}

func (c *MigrationSourceController) updateStatus(vmi *v1.VirtualMachineInstance, domain *api.Domain) error {
        c.setMigrationProgressStatus(vmi, domain)

        // handle migrations differently than normal status updates.
        //
        // When a successful migration is detected, we must transfer ownership of the VMI
        // from the source node (this node) to the target node (node the domain was migrated to).
        //
        // Transfer ownership by...
        // 1. Marking vmi.Status.MigrationState as completed
        // 2. Update the vmi.Status.NodeName to reflect the target node's name
        // 3. Update the VMI's NodeNameLabel annotation to reflect the target node's name
        // 4. Clear the LauncherContainerImageVersion which virt-controller will detect
        //    and accurately set based on the version used on the target pod
        //
        // After a migration, the VMI's phase is no longer owned by this node. Only the
        // MigrationState status field is eligible to be mutated.
        migrationHost := ""
        if vmi.Status.MigrationState != nil {
                migrationHost = vmi.Status.MigrationState.TargetNode
        }

        targetNodeDetectedDomain, timeLeft := c.hasTargetDetectedReadyDomain(vmi)
        // If we can't detect where the migration went to, then we have no
        // way of transferring ownership. The only option here is to move the
        // vmi to failed. The cluster vmi controller will then tear down the
        // resulting pods.
        if migrationHost == "" {
                // migrated to unknown host.
                vmi.Status.Phase = v1.Failed
                vmi.Status.MigrationState.Completed = true
                vmi.Status.MigrationState.Failed = true
                if vmi.Status.MigrationState.EndTimestamp == nil {
                        vmi.Status.MigrationState.EndTimestamp = pointer.P(metav1.NewTime(time.Now()))
                }

                c.logger.Object(vmi).Warning("the vmi migrated to an unknown host")
                c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.Migrated.String(), "The VirtualMachineInstance migrated to an unknown host.")
        } else if !targetNodeDetectedDomain {
                if timeLeft <= 0 {
                        vmi.Status.Phase = v1.Failed
                        vmi.Status.MigrationState.Completed = true
                        vmi.Status.MigrationState.Failed = true
                        if vmi.Status.MigrationState.EndTimestamp == nil {
                                vmi.Status.MigrationState.EndTimestamp = pointer.P(metav1.NewTime(time.Now()))
                        }

                        c.logger.Object(vmi).Warning("the domain was never observed on the target after the migration completed within the timeout period")
                        c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.Migrated.String(), "The VirtualMachineInstance's domain was never observed on the target after the migration completed within the timeout period.")
                }
        }

        if vmi.Status.Phase == v1.Failed && vmi.IsDecentralizedMigration() {
                vmi.Status.MigrationState.Completed = true
                vmi.Status.MigrationState.Failed = true
                c.logger.Object(vmi).Warning("the decentralized migration failed due to the source VMI being failed")
                c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.Migrated.String(), "The VirtualMachineInstance's decentralized migration failed due to the source VMI being failed.")
        }

        if targetNodeDetectedDomain && vmi.IsDecentralizedMigration() && vmi.Status.MigrationState != nil && vmi.Status.MigrationState.Completed {
                c.logger.Object(vmi).V(2).Infof("decentralized migration completed successfully, marking VMI as succeeded")
                // this is a decentralized migration, and the migration completed successfully, we need to mark the VMI as succeeded
                vmi.Status.Phase = v1.Succeeded
        }

        return nil
}

func (c *MigrationSourceController) Run(threadiness int, stopCh chan struct{}) {
        defer c.queue.ShutDown()
        c.logger.Info("Starting virt-handler source controller.")

        cache.WaitForCacheSync(stopCh, c.hasSynced)

        // queue keys for previous Domains on the host that no longer exist
        // in the cache. This ensures we perform local cleanup of deleted VMs.
        for _, domain := range c.domainStore.List() {
                d := domain.(*api.Domain)
                vmiRef := v1.NewVMIReferenceWithUUID(
                        d.ObjectMeta.Namespace,
                        d.ObjectMeta.Name,
                        d.Spec.Metadata.KubeVirt.UID)

                key := controller.VirtualMachineInstanceKey(vmiRef)

                _, exists, _ := c.vmiStore.GetByKey(key)
                if !exists {
                        c.queue.Add(key)
                }
        }

        // Start the actual work
        for i := 0; i < threadiness; i++ {
                go wait.Until(c.runWorker, time.Second, stopCh)
        }

        <-stopCh
        c.logger.Info("Stopping virt-handler source controller.")
}

func (c *MigrationSourceController) runWorker() {
        for c.Execute() {
        }
}

func (c *MigrationSourceController) Execute() bool {
        key, quit := c.queue.Get()
        if quit {
                return false
        }
        defer c.queue.Done(key)
        if err := c.execute(key); err != nil {
                c.logger.Reason(err).Infof("re-enqueuing VirtualMachineInstance %v", key)
                c.queue.AddRateLimited(key)
        } else {
                c.logger.V(4).Infof("processed VirtualMachineInstance %v", key)
                c.queue.Forget(key)
        }
        return true
}

func (c *MigrationSourceController) sync(vmi *v1.VirtualMachineInstance, domain *api.Domain) error {
        if domain != nil {
                c.logger.Object(vmi).Infof("VMI is in phase: %v | Domain status: %v, reason: %v", vmi.Status.Phase, domain.Status.Status, domain.Status.Reason)
        } else {
                c.logger.Object(vmi).Infof("VMI is in phase: %v", vmi.Status.Phase)
        }

        oldStatus := vmi.Status.DeepCopy()

        syncErr := c.processVMI(vmi, domain)

        if syncErr != nil {
                c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.SyncFailed.String(), syncErr.Error())
                // `syncErr` will be propagated anyway, and it will be logged in `re-enqueueing`
                // so there is no need to log it twice in hot path without increased verbosity.
                c.logger.Object(vmi).Reason(syncErr).Error("Synchronizing the VirtualMachineInstance failed.")
        }

        updateErr := c.updateStatus(vmi, domain)

        if updateErr != nil {
                c.logger.Object(vmi).Reason(updateErr).Error("Updating the migration status failed.")
        }

        netUpdateErr := c.netStat.UpdateStatus(vmi, domain)

        if netUpdateErr != nil {
                log.Log.Object(vmi).Reason(netUpdateErr).Error("Updating network interfaces status failed.")
        }

        // update the VMI if necessary
        if !equality.Semantic.DeepEqual(*oldStatus, vmi.Status) {
                key := controller.VirtualMachineInstanceKey(vmi)
                c.vmiExpectations.SetExpectations(key, 1, 0)
                _, err := c.clientset.VirtualMachineInstance(vmi.ObjectMeta.Namespace).Update(context.Background(), vmi, metav1.UpdateOptions{})
                if err != nil {
                        c.vmiExpectations.SetExpectations(key, 0, 0)
                        return err
                }
        }

        if syncErr != nil {
                return syncErr
        }

        if updateErr != nil {
                return updateErr
        }

        if netUpdateErr != nil {
                return netUpdateErr
        }

        c.logger.Object(vmi).V(4).Info("Source synchronization loop succeeded.")

        return nil
}

func (c *MigrationSourceController) execute(key string) error {
        vmi, vmiExists, err := c.getVMIFromCache(key)
        if err != nil {
                return err
        }

        if !vmiExists || ((vmi.IsDecentralizedMigration() && vmi.Status.Phase == v1.Succeeded) ||
                !vmi.IsDecentralizedMigration() && vmi.IsFinal()) ||
                vmi.DeletionTimestamp != nil {
                c.logger.V(4).Infof("vmi for key %v is terminating, succeeded or does not exist", key)
                return nil
        }

        if !c.vmiExpectations.SatisfiedExpectations(key) {
                c.logger.V(4).Object(vmi).Info("waiting for expectations to be satisfied")
                return nil
        }

        domain, domainExists, _, err := c.getDomainFromCache(key)
        if err != nil {
                return err
        }

        if domainExists && domain.Spec.Metadata.KubeVirt.UID != vmi.UID {
                c.logger.V(4).Object(vmi).Infof("Detected stale vmi %s that still needs cleanup before new vmi with identical name/namespace can be processed", vmi.UID)
                return nil
        }

        if vmi.Status.MigrationState == nil {
                c.logger.V(4).Object(vmi).Info("no migration is in progress")
                return nil
        }

        // post migration clean up
        if isMigrationDone(vmi.Status.MigrationState) {
                c.migrationProxy.StopSourceListener(string(vmi.UID))
                return nil
        }

        if !c.isMigrationSource(vmi) {
                c.logger.Object(vmi).V(4).Info("not a migration source")
                return nil
        }

        return c.sync(vmi.DeepCopy(), domain)
}

func (c *MigrationSourceController) isMigrationSource(vmi *v1.VirtualMachineInstance) bool {
        if vmi.IsDecentralizedMigration() {
                return vmi.Status.MigrationState != nil &&
                        vmi.Status.MigrationState.SourceNode == c.host &&
                        vmi.IsMigrationSource()
        }
        return vmi.Status.MigrationState != nil &&
                vmi.Status.NodeName == c.host &&
                vmi.Status.MigrationState.SourceNode == c.host
}

func (c *MigrationSourceController) handleSourceMigrationProxy(vmi *v1.VirtualMachineInstance) error {

        res, err := c.podIsolationDetector.Detect(vmi)
        if err != nil {
                return err
        }
        // the migration-proxy is no longer shared via host mount, so we
        // pass in the virt-launcher's baseDir to reach the unix sockets.
        baseDir := fmt.Sprintf(filepath.Join(c.virtLauncherFSRunDirPattern, "kubevirt"), res.Pid())
        if vmi.Status.MigrationState.TargetDirectMigrationNodePorts == nil {
                return errWaitingForTargetPorts
        }

        err = c.migrationProxy.StartSourceListener(
                string(vmi.UID),
                vmi.Status.MigrationState.TargetNodeAddress,
                vmi.Status.MigrationState.TargetDirectMigrationNodePorts,
                baseDir,
        )
        if err != nil {
                return err
        }

        return nil
}

func (c *MigrationSourceController) migrateVMI(vmi *v1.VirtualMachineInstance, domain *api.Domain) error {
        shouldReturn, err := c.checkLauncherClient(vmi)
        if shouldReturn {
                return err
        }

        client, err := c.launcherClients.GetLauncherClient(vmi)
        if err != nil {
                return fmt.Errorf(unableCreateVirtLauncherConnectionFmt, err)
        }

        if vmi.Status.MigrationState.AbortRequested {
                err = c.handleMigrationAbort(vmi, client)
                return err
        }

        if isMigrationInProgress(vmi, domain) {
                // we already started this migration, no need to rerun this
                c.logger.Object(vmi).V(4).Infof("migration %s has already been started", vmi.Status.MigrationState.MigrationUID)
                return nil
        }

        err = c.handleSourceMigrationProxy(vmi)
        if errors.Is(err, errWaitingForTargetPorts) {
                c.logger.Object(vmi).V(4).Info("waiting for target node to publish migration ports")
                c.queue.AddAfter(controller.VirtualMachineInstanceKey(vmi), 1*time.Second)
                return nil
        } else if err != nil {
                return fmt.Errorf("failed to handle migration proxy: %v", err)
        }

        var migrationConfiguration *v1.MigrationConfiguration
        if vmi.Status.MigrationState.MigrationConfiguration == nil {
                migrationConfiguration = c.clusterConfig.GetMigrationConfiguration()
        } else {
                migrationConfiguration = vmi.Status.MigrationState.MigrationConfiguration.DeepCopy()
        }

        // This check is only for backward compatibility.
        // During upgrade, AllowWorkloadDisruption could be nil since the migration controller is
        // updated later than the virt-handler.
        // This check can be removed in the future.
        if migrationConfiguration.AllowWorkloadDisruption == nil {
                migrationConfiguration.AllowWorkloadDisruption = pointer.P(*migrationConfiguration.AllowPostCopy)
        }

        options := &cmdclient.MigrationOptions{
                Bandwidth:               *migrationConfiguration.BandwidthPerMigration,
                ProgressTimeout:         *migrationConfiguration.ProgressTimeout,
                CompletionTimeoutPerGiB: *migrationConfiguration.CompletionTimeoutPerGiB,
                UnsafeMigration:         *migrationConfiguration.UnsafeMigrationOverride,
                AllowAutoConverge:       *migrationConfiguration.AllowAutoConverge,
                AllowPostCopy:           *migrationConfiguration.AllowPostCopy,
                AllowWorkloadDisruption: *migrationConfiguration.AllowWorkloadDisruption,
        }

        configureParallelMigrationThreads(options, vmi)

        marshalledOptions, err := json.Marshal(options)
        if err != nil {
                c.logger.Object(vmi).Warning("failed to marshal matched migration options")
        } else {
                c.logger.Object(vmi).Infof("migration options matched for vmi %s: %s", vmi.Name, string(marshalledOptions))
        }

        vmiCopy := vmi.DeepCopy()
        err = hostdisk.ReplacePVCByHostDisk(vmiCopy)
        if err != nil {
                return err
        }

        if c.clusterConfig.PasstIPStackMigrationEnabled() {
                if err := c.passtRepairHandler.HandleMigrationSource(vmi, c.passtSocketDirOnHostForVMI); err != nil {
                        c.logger.Object(vmi).Warningf("failed to call passt-repair for migration source, %v", err)
                }
        }

        err = client.MigrateVirtualMachine(vmiCopy, options)
        if err != nil {
                return err
        }
        c.recorder.Event(vmi, k8sv1.EventTypeNormal, v1.Migrating.String(), VMIMigrating)
        return nil
}

func isMigrationDone(state *v1.VirtualMachineInstanceMigrationState) bool {
        return state == nil || (state.EndTimestamp != nil && (state.Completed || state.Failed))
}

func (c *MigrationSourceController) processVMI(vmi *v1.VirtualMachineInstance, domain *api.Domain) error {
        domainAlive := domain != nil &&
                domain.Status.Status != api.Shutoff &&
                domain.Status.Status != api.Crashed &&
                domain.Status.Status != ""

        if !domainAlive {
                c.logger.V(4).Object(vmi).Info("domain is not alive")
                return nil
        }

        return c.migrateVMI(vmi, domain)
}

func (c *MigrationSourceController) addFunc(obj interface{}) {
        key, err := controller.KeyFunc(obj)
        if err == nil {
                c.vmiExpectations.SetExpectations(key, 0, 0)
                c.queue.Add(key)
        }
}

func (c *MigrationSourceController) deleteFunc(obj interface{}) {
        key, err := controller.KeyFunc(obj)
        if err == nil {
                c.queue.Add(key)
        }
}

func (c *MigrationSourceController) updateFunc(_, new interface{}) {
        key, err := controller.KeyFunc(new)
        if err == nil {
                c.vmiExpectations.SetExpectations(key, 0, 0)
                c.queue.Add(key)
        }
}

func (c *MigrationSourceController) addDeleteDomainFunc(obj interface{}) {
        key, err := controller.KeyFunc(obj)
        if err == nil {
                c.queue.Add(key)
        }
}

func (c *MigrationSourceController) updateDomainFunc(_, new interface{}) {
        key, err := controller.KeyFunc(new)
        if err == nil {
                c.queue.Add(key)
        }
}

610
func (c *MigrationSourceController) handleMigrationAbort(vmi *v1.VirtualMachineInstance, client cmdclient.LauncherClient) error {
6✔
611
        if vmi.Status.MigrationState.AbortStatus == v1.MigrationAbortInProgress || vmi.Status.MigrationState.AbortStatus == v1.MigrationAbortSucceeded {
8✔
612
                return nil
2✔
613
        }
2✔
614

615
        if err := client.CancelVirtualMachineMigration(vmi); err != nil {
5✔
616
                if err.Error() == migrations.CancelMigrationFailedVmiNotMigratingErr {
1✔
617
                        // If migration did not even start there is no need to cancel it
×
618
                        c.logger.Object(vmi).Infof("skipping migration cancellation since vmi is not migrating")
×
619
                }
×
620
                return err
1✔
621
        }
622

623
        c.recorder.Event(vmi, k8sv1.EventTypeNormal, v1.Migrating.String(), VMIAbortingMigration)
3✔
624
        return nil
3✔
625
}
626

func configureParallelMigrationThreads(options *cmdclient.MigrationOptions, vm *v1.VirtualMachineInstance) {
        // When the CPU is limited, there's a risk of the migration threads choking the CPU resources on the compute container.
        // For this reason, we will avoid configuring migration threads in such scenarios.
        if cpuLimit, cpuLimitExists := vm.Spec.Domain.Resources.Limits[k8sv1.ResourceCPU]; cpuLimitExists && !cpuLimit.IsZero() {
                return
        }

        if options.AllowPostCopy {
                return
        }

        options.ParallelMigrationThreads = pointer.P(parallelMultifdMigrationThreads)
}
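
Worked example: the controller above drives its reconcile loop through a typed rate-limiting workqueue (see NewMigrationSourceController, Run, runWorker, and Execute). The following standalone sketch shows that same worker pattern in isolation, assuming only the k8s.io/client-go and k8s.io/apimachinery modules; main and processKey are hypothetical stand-ins for the controller's Run and execute(key) methods and are not part of the file above.

// Sketch: typed rate-limiting workqueue worker loop, modeled on the pattern
// used by MigrationSourceController. Not part of migration-source.go.
package main

import (
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/client-go/util/workqueue"
)

// processKey stands in for the controller's execute(key) method (hypothetical).
func processKey(key string) error {
        fmt.Println("processing", key)
        return nil
}

func main() {
        // Same typed queue construction as NewMigrationSourceController above.
        queue := workqueue.NewTypedRateLimitingQueueWithConfig[string](
                workqueue.DefaultTypedControllerRateLimiter[string](),
                workqueue.TypedRateLimitingQueueConfig[string]{Name: "example"},
        )
        defer queue.ShutDown()

        stopCh := make(chan struct{})

        // Mirrors Execute: pop a key, process it, then either schedule a
        // rate-limited retry on failure or clear its backoff history on success.
        worker := func() {
                for {
                        key, quit := queue.Get()
                        if quit {
                                return
                        }
                        if err := processKey(key); err != nil {
                                queue.AddRateLimited(key)
                        } else {
                                queue.Forget(key)
                        }
                        queue.Done(key)
                }
        }

        // Mirrors Run: keep the worker alive until stopCh is closed.
        go wait.Until(worker, time.Second, stopCh)

        queue.Add("default/my-vmi")
        time.Sleep(time.Second)
        close(stopCh)
}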