• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / kubevirt / e67efb82-2e17-43d4-bd4d-428ade2926a1

26 Mar 2026 08:13PM UTC coverage: 71.685% (+0.006%) from 71.679%
e67efb82-2e17-43d4-bd4d-428ade2926a1

push

prow

web-flow
Merge pull request #17131 from sradco/update_OrphanedVirtualMachineInstances_health_label

update OrphanedVirtualMachineInstances health impact

77246 of 107757 relevant lines covered (71.69%)

473.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.92
/pkg/synchronization-controller/synchronization-controller.go
1
/*
2
 * This file is part of the KubeVirt project
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 *
15
 * Copyright The KubeVirt Authors.
16
 *
17
 */
18

19
package synchronization
20

21
import (
22
        "crypto/tls"
23
        "crypto/x509"
24
        "encoding/json"
25
        "errors"
26
        "fmt"
27
        "net"
28
        "strconv"
29
        "sync"
30
        "time"
31

32
        k8sv1 "k8s.io/api/core/v1"
33
        apiequality "k8s.io/apimachinery/pkg/api/equality"
34
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35
        "k8s.io/apimachinery/pkg/types"
36
        "k8s.io/apimachinery/pkg/util/wait"
37
        "k8s.io/client-go/tools/cache"
38
        "k8s.io/client-go/util/workqueue"
39

40
        virtv1 "kubevirt.io/api/core/v1"
41

42
        "kubevirt.io/kubevirt/pkg/apimachinery/patch"
43

44
        "kubevirt.io/kubevirt/pkg/controller"
45

46
        context "golang.org/x/net/context"
47
        "google.golang.org/grpc"
48
        "google.golang.org/grpc/credentials"
49
        "kubevirt.io/client-go/kubecli"
50
        "kubevirt.io/client-go/log"
51

52
        syncv1 "kubevirt.io/kubevirt/pkg/synchronizer-com/synchronization/v1"
53
)
54

55
// Timing and retry tunables.
const (
	// defaultTimeout is the initial value of SynchronizationController.timeout.
	// The controller multiplies it by time.Second when building the deadline
	// of an outbound gRPC call.
	defaultTimeout = 30

	// maxCloseRetries is not referenced in this part of the file.
	// NOTE(review): presumably bounds retries for connections recorded in
	// failedCloseConnections — confirm against the cleanup loop.
	maxCloseRetries = 10
)

// MyPodIP holds the literal "MY_POD_IP".
// NOTE(review): presumably the name of the environment variable that carries
// the pod IP — confirm against its users.
const MyPodIP = "MY_POD_IP"

// Messages used for errors and status reporting. The entries containing %s
// are format strings.
const (
	noSourceStatusErrorMsg                        = "must pass source status"
	noTargetStatusErrorMsg                        = "must pass target status"
	sourceUnableToLocateVMIMigrationIDErrorMsg    = "source: unable to locate VMI for migrationID %s"
	targetUnableToLocateVMIMigrationIDErrorMsg    = "target: unable to locate VMI for migrationID %s"
	sourceUnableToLocateVMIMigrationIDErrorMsgVMI = "source: unable to locate VMI for migrationID %s, vmi: %s"
	targetUnableToLocateVMIMigrationIDErrorMsgVMI = "target: unable to locate VMI for migrationID %s, vmi: %s"

	// waitingForSyncErrorMessage is the condition message set on a target VMI
	// whose migration is in the MigrationWaitingForSync phase.
	waitingForSyncErrorMessage = "waiting for incoming synchronization, unable to proceed"

	successMessage = "success"
)

// SynchronizationFinalizer is the finalizer this controller adds to a
// decentralized migration while it is neither final nor being deleted, and
// removes otherwise.
const SynchronizationFinalizer = "synchronization.kubevirt.io/migrationFinalizer"
75

76
type SynchronizationController struct {
77
        client kubecli.KubevirtClient
78

79
        vmiInformer       cache.SharedIndexInformer
80
        migrationInformer cache.SharedIndexInformer
81

82
        listener        net.Listener
83
        bindAddress     string
84
        bindPort        int
85
        ip              string
86
        clientTLSConfig *tls.Config
87
        serverTLSConfig *tls.Config
88
        timeout         int
89

90
        queue     workqueue.TypedRateLimitingInterface[string]
91
        hasSynced func() bool
92

93
        syncOutboundConnectionMap  *sync.Map
94
        syncReceivingConnectionMap *sync.Map
95
        failedCloseConnections     *sync.Map
96
        grpcServer                 *grpc.Server
97
}
98

99
func NewSynchronizationController(
100
        client kubecli.KubevirtClient,
101
        vmiInformer cache.SharedIndexInformer,
102
        migrationInformer cache.SharedIndexInformer,
103
        clientTLSConfig,
104
        serverTLSConfig *tls.Config,
105
        bindAddress string,
106
        bindPort int,
107
        ip string,
108
) (*SynchronizationController, error) {
82✔
109
        syncController := &SynchronizationController{
82✔
110
                vmiInformer:       vmiInformer,
82✔
111
                migrationInformer: migrationInformer,
82✔
112
                clientTLSConfig:   clientTLSConfig,
82✔
113
                serverTLSConfig:   serverTLSConfig,
82✔
114
                timeout:           defaultTimeout,
82✔
115
                bindAddress:       bindAddress,
82✔
116
                bindPort:          bindPort,
82✔
117
                client:            client,
82✔
118
                ip:                ip,
82✔
119
        }
82✔
120

82✔
121
        queue := workqueue.NewTypedRateLimitingQueueWithConfig[string](
82✔
122
                workqueue.DefaultTypedControllerRateLimiter[string](),
82✔
123
                workqueue.TypedRateLimitingQueueConfig[string]{Name: "sync-vmi-status"},
82✔
124
        )
82✔
125
        syncController.queue = queue
82✔
126

82✔
127
        syncController.hasSynced = func() bool {
82✔
128
                return vmiInformer.HasSynced() && migrationInformer.HasSynced()
×
129
        }
×
130

131
        syncController.syncOutboundConnectionMap = &sync.Map{}
82✔
132
        syncController.syncReceivingConnectionMap = &sync.Map{}
82✔
133
        syncController.failedCloseConnections = &sync.Map{}
82✔
134

82✔
135
        _, err := vmiInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
82✔
136
                AddFunc:    syncController.addVmiFunc,
82✔
137
                DeleteFunc: syncController.deleteVmiFunc,
82✔
138
                UpdateFunc: syncController.updateVmiFunc,
82✔
139
        })
82✔
140
        if err != nil {
82✔
141
                return nil, err
×
142
        }
×
143

144
        if err := syncController.migrationInformer.AddIndexers(map[string]cache.IndexFunc{
82✔
145
                "byUID":               indexByMigrationUID,
82✔
146
                "byActiveVMIName":     indexByActiveVmiName,
82✔
147
                "byTargetMigrationID": indexByTargetMigrationID,
82✔
148
                "bySourceMigrationID": indexBySourceMigrationID,
82✔
149
        }); err != nil {
82✔
150
                return nil, err
×
151
        }
×
152

153
        if _, err := syncController.migrationInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
82✔
154
                AddFunc:    syncController.addMigrationFunc,
82✔
155
                DeleteFunc: syncController.deleteMigrationFunc,
82✔
156
                UpdateFunc: syncController.updateMigrationFunc,
82✔
157
        }); err != nil {
82✔
158
                return nil, err
×
159
        }
×
160

161
        syncController.grpcServer = grpc.NewServer(grpc.Creds(credentials.NewTLS(serverTLSConfig)))
82✔
162
        syncv1.RegisterSynchronizeServer(syncController.grpcServer, syncController)
82✔
163

82✔
164
        return syncController, nil
82✔
165
}
166

167
func (s *SynchronizationController) addVmiFunc(addObj interface{}) {
×
168
        s.enqueueVirtualMachineInstance(addObj)
×
169
}
×
170

171
func (s *SynchronizationController) deleteVmiFunc(addObj interface{}) {
×
172
        s.enqueueVirtualMachineInstance(addObj)
×
173
}
×
174

175
func (s *SynchronizationController) updateVmiFunc(_, curr interface{}) {
×
176
        s.enqueueVirtualMachineInstance(curr)
×
177
}
×
178

179
func (s *SynchronizationController) enqueueVirtualMachineInstance(obj interface{}) {
×
180
        vmi, ok := obj.(*virtv1.VirtualMachineInstance)
×
181
        if ok {
×
182
                key, err := controller.KeyFunc(vmi)
×
183
                if err != nil {
×
184
                        log.Log.Object(vmi).Reason(err).Error("failed to extract key from virtualmachine.")
×
185
                        return
×
186
                }
×
187
                s.queue.Add(key)
×
188
        }
189
}
190

191
func (s *SynchronizationController) addMigrationFunc(addObj interface{}) {
1✔
192
        s.enqueueVirtualMachineInstanceFromMigration(addObj)
1✔
193
}
1✔
194

195
func (s *SynchronizationController) deleteMigrationFunc(delObj interface{}) {
2✔
196
        // Clean up any synchronization connections in the map.
2✔
197
        s.enqueueVirtualMachineInstanceFromMigration(delObj)
2✔
198
        // Close any connections associated with this migration.
2✔
199
        migration, ok := delObj.(*virtv1.VirtualMachineInstanceMigration)
2✔
200
        if ok {
4✔
201
                if !migration.IsDecentralized() {
2✔
202
                        return
×
203
                }
×
204
                if migration.Spec.Receive != nil {
3✔
205
                        log.Log.V(4).Object(migration).Infof("closing receiving connection for migrationID %s", migration.Spec.Receive.MigrationID)
1✔
206
                        if err := s.closeConnectionForMigrationID(s.syncReceivingConnectionMap, migration.Spec.Receive.MigrationID); err != nil {
1✔
207
                                log.Log.Reason(err).Infof("unable to close connection for migrationID %s, possibly leaked connection", migration.Spec.Receive.MigrationID)
×
208
                        }
×
209
                } else if migration.Spec.SendTo != nil {
2✔
210
                        log.Log.V(4).Object(migration).Infof("closing outbound connection for migrationID %s", migration.Spec.SendTo.MigrationID)
1✔
211
                        if err := s.closeConnectionForMigrationID(s.syncOutboundConnectionMap, migration.Spec.SendTo.MigrationID); err != nil {
1✔
212
                                log.Log.Reason(err).Infof("unable to close connection for migrationID %s, possibly leaked connection", migration.Spec.SendTo.MigrationID)
×
213
                        }
×
214
                }
215
        }
216
}
217

218
func (s *SynchronizationController) closeConnectionForMigrationID(syncMap *sync.Map, migrationID string) error {
5✔
219
        obj, loaded := syncMap.LoadAndDelete(migrationID)
5✔
220
        if loaded {
9✔
221
                log.Log.V(4).Infof("closing connection associated with migrationID %s", migrationID)
4✔
222
                outboundConnection, ok := obj.(*SynchronizationConnection)
4✔
223
                if ok {
7✔
224
                        if err := outboundConnection.Close(); err != nil {
3✔
225
                                log.Log.Warningf("unable to close connection for migrationID %s, %v", migrationID, err)
×
226
                                s.failedCloseConnections.Store(outboundConnection, 0)
×
227
                                return err
×
228
                        }
×
229
                } else {
1✔
230
                        log.Log.Warningf("unable to close connection for migrationID %s, type is %v", migrationID, obj)
1✔
231
                        return fmt.Errorf("unknown type %v", obj)
1✔
232
                }
1✔
233
        }
234
        return nil
4✔
235
}
236

237
func (s *SynchronizationController) updateMigrationFunc(_, curr interface{}) {
1✔
238
        s.enqueueVirtualMachineInstanceFromMigration(curr)
1✔
239
}
1✔
240

241
func (s *SynchronizationController) enqueueVirtualMachineInstanceFromMigration(obj interface{}) {
4✔
242
        migration, ok := obj.(*virtv1.VirtualMachineInstanceMigration)
4✔
243
        if ok {
8✔
244
                key := controller.NamespacedKey(migration.Namespace, migration.Spec.VMIName)
4✔
245
                s.queue.Add(key)
4✔
246
        }
4✔
247
}
248

249
func (s *SynchronizationController) Run(threadiness int, stopCh <-chan struct{}) error {
×
250
        defer controller.HandlePanic()
×
251
        defer s.queue.ShutDown()
×
252
        defer s.closeConnections()
×
253

×
254
        log.Log.Info("starting vmi status synchronization controller.")
×
255

×
256
        // Wait for cache sync before we start the pod controller
×
257
        cache.WaitForCacheSync(stopCh, s.hasSynced)
×
258

×
259
        // Start the actual work
×
260
        for i := 0; i < threadiness; i++ {
×
261
                go wait.Until(s.runWorker, time.Second, stopCh)
×
262
        }
×
263
        go wait.Until(s.runConnectionCleanup, 5*time.Second, stopCh)
×
264

×
265
        conn, err := s.createTcpListener()
×
266
        if err != nil {
×
267
                log.Log.Criticalf("received error %v, exiting", err)
×
268
                return err
×
269
        } else {
×
270
                go func() {
×
271
                        s.grpcServer.Serve(conn)
×
272
                }()
×
273
        }
274
        if err := s.rebuildConnectionsAndUpdateSyncAddress(); err != nil {
×
275
                return err
×
276
        }
×
277

278
        log.Log.Info("waiting on stop signal")
×
279
        <-stopCh
×
280
        log.Log.Info("normally stopping vmi status synchronization controller.")
×
281
        return nil
×
282
}
283

284
func (s *SynchronizationController) closeConnections() {
25✔
285
        log.Log.V(1).Info("closing listener and grpcserver")
25✔
286
        if s.listener != nil {
50✔
287
                s.listener.Close()
25✔
288
        }
25✔
289
        if s.grpcServer != nil {
50✔
290
                s.grpcServer.GracefulStop()
25✔
291
        }
25✔
292
        log.Log.V(1).Infof("closing outbound connections")
25✔
293
        s.syncOutboundConnectionMap.Range(closeMapConnections)
25✔
294
        log.Log.V(1).Infof("closing inbound connections")
25✔
295
        s.syncReceivingConnectionMap.Range(closeMapConnections)
25✔
296
}
297

298
func closeMapConnections(k, obj interface{}) bool {
10✔
299
        outboundConnection, ok := obj.(*SynchronizationConnection)
10✔
300
        if ok && outboundConnection != nil {
20✔
301
                log.Log.V(1).Infof("closing connection for migration ID: %s", outboundConnection.migrationID)
10✔
302
                if err := outboundConnection.Close(); err != nil {
10✔
303
                        log.Log.Warningf("unable to close connection for VMI %s during shutdown, %v", k, err)
×
304
                }
×
305
        } else {
×
306
                log.Log.Warningf("unable to close connection for VMI %s during shutdown", k)
×
307
        }
×
308
        return true
10✔
309
}
310

311
func (s *SynchronizationController) runWorker() {
×
312
        for s.Execute() {
×
313
        }
×
314
}
315

316
func (s *SynchronizationController) Execute() bool {
1✔
317
        key, quit := s.queue.Get()
1✔
318
        if quit {
1✔
319
                return false
×
320
        }
×
321

322
        defer s.queue.Done(key)
1✔
323
        err := s.execute(key)
1✔
324

1✔
325
        if err != nil {
1✔
326
                log.Log.Reason(err).Infof("reenqueuing VirtualMachineInstance %v", key)
×
327
                s.queue.AddRateLimited(key)
×
328
        } else {
1✔
329
                log.Log.V(4).Infof("processed VirtualMachineInstance %v", key)
1✔
330
                s.queue.Forget(key)
1✔
331
        }
1✔
332
        return true
1✔
333
}
334

335
func (s *SynchronizationController) execute(key string) error {
7✔
336
        // Fetch the latest VMI state from cache
7✔
337
        obj, exists, _ := s.vmiInformer.GetStore().GetByKey(key)
7✔
338
        if !exists {
7✔
339
                return nil
×
340
        }
×
341
        vmi := obj.(*virtv1.VirtualMachineInstance)
7✔
342

7✔
343
        migration, err := s.getMigrationForVMI(vmi)
7✔
344
        if err != nil {
7✔
345
                return err
×
346
        }
×
347
        if migration != nil && migration.IsDecentralized() {
13✔
348
                if err := s.handleMigrationFinalizer(migration); err != nil {
6✔
349
                        return err
×
350
                }
×
351
                if migration.IsDecentralizedSource() {
9✔
352
                        if migration.DeletionTimestamp != nil {
3✔
353
                                log.Log.V(2).Object(migration).Infof("migration is being deleted, informing the target that the migration is canceled")
×
354
                                // migration is being deleted, inform the target that the migration is canceled
×
355
                                if err := s.cancelTargetRemoteMigration(vmi, migration); err != nil {
×
356
                                        return err
×
357
                                }
×
358
                                return nil
×
359
                        }
360
                        err := s.handleSourceState(vmi.DeepCopy(), migration)
3✔
361
                        return s.updateDecentralizedFailureOnSource(vmi, migration, err)
3✔
362
                }
363
                if migration.IsDecentralizedTarget() {
6✔
364
                        if migration.DeletionTimestamp != nil {
3✔
365
                                log.Log.V(2).Object(migration).Infof("migration is being deleted, informing the source that the migration is canceled")
×
366
                                // migration is being deleted, inform the target that the migration is canceled
×
367
                                if err := s.cancelSourceRemoteMigration(vmi, migration); err != nil {
×
368
                                        return err
×
369
                                }
×
370
                                return nil
×
371
                        }
372
                        err := s.handleTargetState(vmi.DeepCopy(), migration)
3✔
373
                        return s.updateDecentralizedFailureOnTarget(vmi, migration, err)
3✔
374
                }
375
                return nil
×
376
        } else {
1✔
377
                // No migration found don't do anything
1✔
378
                // We should only clear the condition if we are not waiting for synchronization.
1✔
379
                if err := s.clearDecentralizedLiveMigrationFailure(vmi); err != nil {
1✔
380
                        return err
×
381
                }
×
382
                log.Log.Object(vmi).V(4).Info("no active decentralized migration found for VMI")
1✔
383
                return nil
1✔
384
        }
385
}
386

387
func (s *SynchronizationController) updateDecentralizedFailureOnSource(vmi *virtv1.VirtualMachineInstance, migration *virtv1.VirtualMachineInstanceMigration, opErr error) error {
6✔
388
        if opErr != nil {
8✔
389
                if err := s.setDecentralizedLiveMigrationFailure(vmi, getErrorMessageForDecentralizedLiveMigrationFailure(opErr)); err != nil {
2✔
390
                        return err
×
391
                }
×
392
                return opErr
2✔
393
        }
394
        return s.clearDecentralizedLiveMigrationFailure(vmi)
4✔
395
}
396

397
func (s *SynchronizationController) updateDecentralizedFailureOnTarget(vmi *virtv1.VirtualMachineInstance, migration *virtv1.VirtualMachineInstanceMigration, opErr error) error {
8✔
398
        // failure from handleTargetState
8✔
399
        if opErr != nil {
11✔
400
                return s.setDecentralizedLiveMigrationFailure(vmi, getErrorMessageForDecentralizedLiveMigrationFailure(opErr))
3✔
401
        }
3✔
402

403
        // success: special case waiting for sync
404
        if migration.Status.Phase == virtv1.MigrationWaitingForSync {
6✔
405
                return s.setDecentralizedLiveMigrationFailure(vmi, waitingForSyncErrorMessage)
1✔
406
        }
1✔
407

408
        // success and not waiting for sync: clear condition
409
        return s.clearDecentralizedLiveMigrationFailure(vmi)
4✔
410
}
411

412
func getErrorMessageForDecentralizedLiveMigrationFailure(err error) string {
7✔
413
        log.Log.V(1).Infof("error message: %v", err)
7✔
414
        // Once we upgrade to golang 1.26, we no longer need to check for x509.HostnameError, since https://github.com/golang/go/issues/76445 will be fixed.
7✔
415
        if errors.As(err, &x509.HostnameError{}) {
10✔
416
                return "x509 hostname error"
3✔
417
        }
3✔
418
        message := ""
4✔
419
        if err != nil {
8✔
420
                message = err.Error()
4✔
421
        }
4✔
422
        return message
4✔
423
}
424

425
func (s *SynchronizationController) setDecentralizedLiveMigrationFailure(vmi *virtv1.VirtualMachineInstance, errorMessage string) error {
9✔
426
        orgVmi := vmi.DeepCopy()
9✔
427
        condition := &virtv1.VirtualMachineInstanceCondition{
9✔
428
                Type:               virtv1.VirtualMachineInstanceDecentralizedLiveMigrationFailure,
9✔
429
                Status:             k8sv1.ConditionTrue,
9✔
430
                Reason:             virtv1.VirtualMachineInstanceReasonDecentralizedNotMigratable,
9✔
431
                Message:            errorMessage,
9✔
432
                LastTransitionTime: metav1.Now(),
9✔
433
        }
9✔
434
        controller.NewVirtualMachineInstanceConditionManager().UpdateCondition(vmi, condition)
9✔
435
        if err2 := s.patchVMIConditions(context.Background(), orgVmi, vmi); err2 != nil {
9✔
436
                log.Log.Reason(err2).Infof("unable to patch VMI conditions after decentralized live migration failure")
×
437
                return err2
×
438
        }
×
439
        return nil
9✔
440
}
441

442
func (s *SynchronizationController) clearDecentralizedLiveMigrationFailure(vmi *virtv1.VirtualMachineInstance) error {
10✔
443
        orgVmi := vmi.DeepCopy()
10✔
444
        controller.NewVirtualMachineInstanceConditionManager().RemoveCondition(vmi, virtv1.VirtualMachineInstanceDecentralizedLiveMigrationFailure)
10✔
445
        if err := s.patchVMIConditions(context.Background(), orgVmi, vmi); err != nil {
10✔
446
                log.Log.Reason(err).Infof("unable to patch VMI conditions after clearing decentralized live migration failure")
×
447
                return err
×
448
        }
×
449
        return nil
10✔
450
}
451

452
func (s *SynchronizationController) handleMigrationFinalizer(migration *virtv1.VirtualMachineInstanceMigration) error {
6✔
453
        originalMigration := migration.DeepCopy()
6✔
454
        if !migration.IsFinal() && migration.DeletionTimestamp == nil {
12✔
455
                controller.AddFinalizer(migration, SynchronizationFinalizer)
6✔
456
        } else {
6✔
457
                controller.RemoveFinalizer(migration, SynchronizationFinalizer)
×
458
        }
×
459
        if !apiequality.Semantic.DeepEqual(originalMigration.ObjectMeta, migration.ObjectMeta) {
10✔
460
                log.Log.V(4).Object(migration).Infof("adding or removing finalizer to migration, %v", migration.Finalizers)
4✔
461
                patchSet := patch.New()
4✔
462
                patchSet.AddOption(
4✔
463
                        patch.WithReplace("/metadata/finalizers", migration.Finalizers),
4✔
464
                )
4✔
465
                patchBytes, err := patchSet.GeneratePayload()
4✔
466
                if err != nil {
4✔
467
                        return err
×
468
                }
×
469
                if _, err := s.client.VirtualMachineInstanceMigration(migration.Namespace).Patch(context.Background(), migration.Name, types.JSONPatchType, patchBytes, metav1.PatchOptions{}); err != nil {
4✔
470
                        return err
×
471
                }
×
472
        }
473
        return nil
6✔
474
}
475

476
func (s *SynchronizationController) cancelSourceRemoteMigration(vmi *virtv1.VirtualMachineInstance, migration *virtv1.VirtualMachineInstanceMigration) error {
×
477
        if vmi == nil || vmi.Status.MigrationState == nil || vmi.Status.MigrationState.SourceState == nil || vmi.Status.MigrationState.SourceState.MigrationUID == "" {
×
478
                return nil
×
479
        }
×
480
        if migration.UID == vmi.Status.MigrationState.SourceState.MigrationUID {
×
481
                return fmt.Errorf("source migration UID %s is the same as the VMI's migration UID %s", migration.UID, vmi.Status.MigrationState.SourceState.MigrationUID)
×
482
        }
×
483
        log.Log.V(4).Object(migration).Infof("cancelling source remote migration for VMI %s/%s", vmi.Namespace, vmi.Name)
×
484
        return s.cancelRemoteMigration(vmi.Status.MigrationState.SourceState.MigrationUID, migration.Spec.Receive.MigrationID, s.syncOutboundConnectionMap)
×
485
}
486

487
func (s *SynchronizationController) cancelTargetRemoteMigration(vmi *virtv1.VirtualMachineInstance, migration *virtv1.VirtualMachineInstanceMigration) error {
×
488
        if vmi == nil || vmi.Status.MigrationState == nil || vmi.Status.MigrationState.TargetState == nil || vmi.Status.MigrationState.TargetState.MigrationUID == "" {
×
489
                return nil
×
490
        }
×
491
        if migration.UID == vmi.Status.MigrationState.TargetState.MigrationUID {
×
492
                return fmt.Errorf("target migration UID %s is the same as the VMI's migration UID %s", migration.UID, vmi.Status.MigrationState.TargetState.MigrationUID)
×
493
        }
×
494
        log.Log.V(4).Object(migration).Infof("cancelling target remote migration for VMI %s/%s", vmi.Namespace, vmi.Name)
×
495
        return s.cancelRemoteMigration(vmi.Status.MigrationState.TargetState.MigrationUID, migration.Spec.SendTo.MigrationID, s.syncReceivingConnectionMap)
×
496
}
497

498
func (s *SynchronizationController) cancelRemoteMigration(migrationUID types.UID, migrationID string, connectionMap *sync.Map) error {
×
499
        obj, ok := connectionMap.Load(migrationID)
×
500
        if !ok {
×
501
                // No connection found, don't do anything
×
502
                return nil
×
503
        }
×
504
        conn, ok := obj.(*SynchronizationConnection)
×
505
        if !ok {
×
506
                return fmt.Errorf("found unknown object in outbound connection cache %#v", conn)
×
507
        }
×
508
        client := syncv1.NewSynchronizeClient(conn.grpcClientConnection)
×
509
        ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.timeout)*time.Second)
×
510
        defer cancel()
×
511
        _, err := client.CancelMigration(ctx, &syncv1.MigrationCancelRequest{
×
512
                MigrationUID: string(migrationUID),
×
513
        })
×
514
        if err != nil {
×
515
                return err
×
516
        }
×
517
        return nil
×
518
}
519

520
func (s *SynchronizationController) getMigrationIDFromUID(migrationUID types.UID) (string, error) {
12✔
521
        objs, err := s.migrationInformer.GetIndexer().ByIndex("byUID", string(migrationUID))
12✔
522
        if err != nil {
12✔
523
                return "", err
×
524
        }
×
525
        if len(objs) > 1 {
12✔
526
                return "", fmt.Errorf("found more than one migration with same UID")
×
527
        }
×
528
        if len(objs) == 0 {
12✔
529
                return "", nil
×
530
        }
×
531
        migration, ok := objs[0].(*virtv1.VirtualMachineInstanceMigration)
12✔
532
        if !ok {
12✔
533
                return "", fmt.Errorf("found unknown object in migration cache")
×
534
        }
×
535
        var migrationID string
12✔
536
        if migration.Spec.Receive != nil {
18✔
537
                migrationID = migration.Spec.Receive.MigrationID
6✔
538
        }
6✔
539
        if migration.Spec.SendTo != nil {
18✔
540
                migrationID = migration.Spec.SendTo.MigrationID
6✔
541
        }
6✔
542
        return migrationID, nil
12✔
543
}
544

545
func (s *SynchronizationController) getOutboundSourceConnection(vmi *virtv1.VirtualMachineInstance, migrationState *virtv1.VirtualMachineInstanceMigrationState) (*SynchronizationConnection, error) {
7✔
546
        if migrationState.TargetState == nil || migrationState.TargetState.SyncAddress == nil || *migrationState.TargetState.SyncAddress == "" {
8✔
547
                return nil, nil
1✔
548
        }
1✔
549
        return s.getOutboundConnection(vmi, migrationState.SourceState.MigrationUID, *migrationState.TargetState.SyncAddress, s.syncOutboundConnectionMap)
6✔
550
}
551

552
func (s *SynchronizationController) getOutboundTargetConnection(vmi *virtv1.VirtualMachineInstance, migrationState *virtv1.VirtualMachineInstanceMigrationState) (*SynchronizationConnection, error) {
7✔
553
        if migrationState.SourceState == nil || migrationState.SourceState.SyncAddress == nil || *migrationState.SourceState.SyncAddress == "" {
8✔
554
                return nil, nil
1✔
555
        }
1✔
556
        return s.getOutboundConnection(vmi, migrationState.TargetState.MigrationUID, *migrationState.SourceState.SyncAddress, s.syncReceivingConnectionMap)
6✔
557
}
558

559
func (s *SynchronizationController) getOutboundConnection(vmi *virtv1.VirtualMachineInstance, migrationUID types.UID, syncAddress string, connectionMap *sync.Map) (*SynchronizationConnection, error) {
12✔
560
        if migrationUID == "" {
12✔
561
                return nil, nil
×
562
        }
×
563
        migrationID, err := s.getMigrationIDFromUID(migrationUID)
12✔
564
        if err != nil {
12✔
565
                return nil, err
×
566
        }
×
567
        log.Log.Object(vmi).V(4).Infof("found migration ID %s", migrationID)
12✔
568
        obj, ok := connectionMap.Load(migrationID)
12✔
569
        if !ok {
22✔
570
                grpcClientConnection, err := s.createOutboundConnection(syncAddress)
10✔
571
                if err != nil {
10✔
572
                        return nil, err
×
573
                }
×
574
                conn := &SynchronizationConnection{
10✔
575
                        migrationID:          migrationID,
10✔
576
                        grpcClientConnection: grpcClientConnection,
10✔
577
                }
10✔
578
                connectionMap.Store(migrationID, conn)
10✔
579
                return conn, nil
10✔
580
        }
581
        outboundSyncConnection, ok := obj.(*SynchronizationConnection)
2✔
582
        if !ok {
2✔
583
                return nil, fmt.Errorf("found unknown object in outbound connection cache %#v", outboundSyncConnection)
×
584
        }
×
585
        return outboundSyncConnection, nil
2✔
586
}
587

588
func (s *SynchronizationController) handleSourceState(vmi *virtv1.VirtualMachineInstance, migration *virtv1.VirtualMachineInstanceMigration) error {
8✔
589
        var outboundConnection *SynchronizationConnection
8✔
590
        var err error
8✔
591
        if vmi.Status.MigrationState == nil {
9✔
592
                // No migration state, don't do anything
1✔
593
                return nil
1✔
594
        }
1✔
595
        if vmi.Status.MigrationState.SourceState == nil || vmi.Status.MigrationState.TargetState == nil {
9✔
596
                // No migration state, don't do anything
2✔
597
                return nil
2✔
598
        }
2✔
599

600
        sourceState := vmi.Status.MigrationState.SourceState
5✔
601
        if sourceState.SyncAddress == nil || *sourceState.SyncAddress == "" {
8✔
602
                syncAddress, err := s.getLocalSynchronizationAddress()
3✔
603
                if err != nil {
3✔
604
                        return err
×
605
                }
×
606
                sourceState.SyncAddress = &syncAddress
3✔
607
        }
608
        targetState := vmi.Status.MigrationState.TargetState
5✔
609
        if targetState.SyncAddress != nil && sourceState.MigrationUID != "" {
10✔
610
                if outboundConnection, err = s.getOutboundSourceConnection(vmi, vmi.Status.MigrationState); err != nil {
5✔
611
                        return err
×
612
                }
×
613
        }
614
        if outboundConnection == nil {
5✔
615
                log.Log.Object(vmi).V(4).Info("no synchronization connection found for source, doing nothing")
×
616
                return nil
×
617
        }
×
618
        vmiStatusJson, err := json.Marshal(vmi.Status)
5✔
619
        if err != nil {
5✔
620
                return err
×
621
        }
×
622
        client := syncv1.NewSynchronizeClient(outboundConnection.grpcClientConnection)
5✔
623
        ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.timeout)*time.Second)
5✔
624
        defer cancel()
5✔
625

5✔
626
        if _, err := client.SyncSourceMigrationStatus(ctx, &syncv1.VMIStatusRequest{
5✔
627
                MigrationID: outboundConnection.migrationID,
5✔
628
                VmiStatus: &syncv1.VMIStatus{
5✔
629
                        VmiStatusJson: vmiStatusJson,
5✔
630
                },
5✔
631
        }); err != nil {
6✔
632
                return err
1✔
633
        }
1✔
634
        if migration.IsFinal() {
4✔
635
                if migration.Spec.SendTo != nil {
×
636
                        log.Log.Object(migration).Infof("completed migration for VMI %s/%s, closing outbound connections", migration.Namespace, migration.Spec.VMIName)
×
637
                        s.closeConnectionForMigrationID(s.syncOutboundConnectionMap, migration.Spec.SendTo.MigrationID)
×
638
                }
×
639
        }
640

641
        return nil
4✔
642
}
643

644
func (s *SynchronizationController) handleTargetState(vmi *virtv1.VirtualMachineInstance, migration *virtv1.VirtualMachineInstanceMigration) error {
8✔
645
        if vmi.Status.MigrationState == nil {
9✔
646
                // No migration state, don't do anything
1✔
647
                return nil
1✔
648
        }
1✔
649
        if vmi.Status.MigrationState.TargetState == nil || vmi.Status.MigrationState.SourceState == nil {
9✔
650
                // No migration state, don't do anything
2✔
651
                return nil
2✔
652
        }
2✔
653

654
        var outboundConnection *SynchronizationConnection
5✔
655
        var err error
5✔
656
        sourceState := vmi.Status.MigrationState.SourceState
5✔
657
        targetState := vmi.Status.MigrationState.TargetState
5✔
658
        if targetState.SyncAddress == nil || *targetState.SyncAddress == "" {
8✔
659
                syncAddress, err := s.getLocalSynchronizationAddress()
3✔
660
                if err != nil {
3✔
661
                        return err
×
662
                }
×
663
                targetState.SyncAddress = &syncAddress
3✔
664
        }
665

666
        if sourceState.SyncAddress != nil && targetState.MigrationUID != "" {
10✔
667
                if outboundConnection, err = s.getOutboundTargetConnection(vmi, vmi.Status.MigrationState); err != nil {
5✔
668
                        return err
×
669
                }
×
670
        }
671
        if outboundConnection == nil {
5✔
672
                log.Log.Object(vmi).V(4).Info("no synchronization connection found for target, doing nothing")
×
673
                return nil
×
674
        }
×
675

676
        vmiStatusJson, err := json.Marshal(vmi.Status)
5✔
677
        if err != nil {
5✔
678
                return err
×
679
        }
×
680
        client := syncv1.NewSynchronizeClient(outboundConnection.grpcClientConnection)
5✔
681
        ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.timeout)*time.Second)
5✔
682
        defer cancel()
5✔
683

5✔
684
        _, err = client.SyncTargetMigrationStatus(ctx, &syncv1.VMIStatusRequest{
5✔
685
                MigrationID: outboundConnection.migrationID,
5✔
686
                VmiStatus: &syncv1.VMIStatus{
5✔
687
                        VmiStatusJson: vmiStatusJson,
5✔
688
                },
5✔
689
        })
5✔
690
        if err != nil {
6✔
691
                return err
1✔
692
        }
1✔
693
        if migration.IsFinal() {
4✔
694
                if migration.Spec.Receive != nil {
×
695
                        log.Log.Object(migration).Infof("completed migration for VMI %s/%s, closing receiving connections", migration.Namespace, migration.Spec.VMIName)
×
696
                        s.closeConnectionForMigrationID(s.syncReceivingConnectionMap, migration.Spec.Receive.MigrationID)
×
697
                }
×
698
        }
699

700
        return nil
4✔
701
}
702

703
func (s *SynchronizationController) getMigrationForVMI(vmi *virtv1.VirtualMachineInstance) (*virtv1.VirtualMachineInstanceMigration, error) {
7✔
704
        objects, err := s.migrationInformer.GetIndexer().ByIndex("byActiveVMIName", vmi.Name)
7✔
705
        if err != nil {
7✔
706
                return nil, err
×
707
        }
×
708
        if len(objects) > 0 {
13✔
709
                count := 0
6✔
710
                var res *virtv1.VirtualMachineInstanceMigration
6✔
711
                for _, migrationObj := range objects {
12✔
712
                        migration, ok := migrationObj.(*virtv1.VirtualMachineInstanceMigration)
6✔
713
                        if !ok {
6✔
714
                                return nil, fmt.Errorf("not a virtual machine instance migration")
×
715
                        }
×
716
                        if migration.Namespace == vmi.Namespace {
12✔
717
                                if migration.IsDecentralizedSource() {
9✔
718
                                        if vmi.Status.MigrationState != nil && vmi.Status.MigrationState.SourceState != nil && migration.UID == vmi.Status.MigrationState.SourceState.MigrationUID {
6✔
719
                                                count++
3✔
720
                                                res = migration
3✔
721
                                        }
3✔
722
                                } else if migration.IsDecentralizedTarget() {
6✔
723
                                        if vmi.Status.MigrationState != nil && vmi.Status.MigrationState.TargetState != nil && migration.UID == vmi.Status.MigrationState.TargetState.MigrationUID {
6✔
724
                                                count++
3✔
725
                                                res = migration
3✔
726
                                        }
3✔
727
                                }
728
                        }
729
                }
730
                if count > 1 {
6✔
731
                        return nil, fmt.Errorf("found more than one migration pointing to same VMI")
×
732
                } else if count == 0 {
6✔
733
                        return nil, nil
×
734
                }
×
735
                return res, nil
6✔
736
        }
737
        return nil, nil
1✔
738
}
739

740
func (s *SynchronizationController) rebuildConnectionsAndUpdateSyncAddress() error {
7✔
741
        // Go and find all active migration resources, if they are decentralized rebuild either
7✔
742
        // the incoming or outbound connections, and call sync to update the remote with the new
7✔
743
        // address.
7✔
744
        objs := s.migrationInformer.GetStore().List()
7✔
745
        log.Log.V(4).Infof("rebuilding any connections, and updating remote VMIs, found %d migrations", len(objs))
7✔
746
        for _, obj := range objs {
13✔
747
                migration, ok := obj.(*virtv1.VirtualMachineInstanceMigration)
6✔
748
                if !ok {
6✔
749
                        return fmt.Errorf("unknown object in migration store %v", obj)
×
750
                }
×
751
                if isOnGoingMigration(migration) {
11✔
752
                        vmi, err := s.getVMIFromMigration(migration)
5✔
753
                        if err != nil {
5✔
754
                                return err
×
755
                        }
×
756
                        if vmi == nil {
6✔
757
                                // No VMI found, can't update it, so skip it.
1✔
758
                                continue
1✔
759
                        }
760
                        // ongoing migration.
761
                        if migration.Spec.Receive != nil {
6✔
762
                                // We are the target
2✔
763
                                log.Log.Object(migration).Object(vmi).Info("found ongoing target migration for vmi, rebuilding connection")
2✔
764
                                if err := s.rebuildTargetConnection(migration, vmi); err != nil {
2✔
765
                                        return err
×
766
                                }
×
767
                        } else if migration.Spec.SendTo != nil {
4✔
768
                                // We are the source
2✔
769
                                log.Log.Object(migration).Object(vmi).Info("found ongoing source migration for vmi, rebuilding connection")
2✔
770
                                if err := s.rebuildSourceConnection(migration, vmi); err != nil {
2✔
771
                                        return err
×
772
                                }
×
773
                        }
774
                }
775
        }
776
        return nil
7✔
777
}
778

779
func isOnGoingMigration(migration *virtv1.VirtualMachineInstanceMigration) bool {
6✔
780
        return migration.IsDecentralized() && migration.Status.Phase != virtv1.MigrationFailed && migration.Status.Phase != virtv1.MigrationSucceeded
6✔
781
}
6✔
782

783
func (s *SynchronizationController) rebuildTargetConnection(migration *virtv1.VirtualMachineInstanceMigration, vmi *virtv1.VirtualMachineInstance) error {
2✔
784
        conn, err := s.getOutboundTargetConnection(vmi, vmi.Status.MigrationState)
2✔
785
        if err != nil {
2✔
786
                return err
×
787
        }
×
788
        if conn == nil {
3✔
789
                return nil
1✔
790
        }
1✔
791
        s.syncReceivingConnectionMap.Store(migration.Spec.Receive.MigrationID, conn)
1✔
792
        if vmi.Status.MigrationState != nil && vmi.Status.MigrationState.TargetState != nil {
2✔
793
                url, err := s.getLocalSynchronizationAddress()
1✔
794
                if err != nil {
1✔
795
                        return err
×
796
                }
×
797
                origVMI := vmi.DeepCopy()
1✔
798
                vmi.Status.MigrationState.TargetState.SyncAddress = &url
1✔
799
                // patching will cause reconcile loop to connect to remote to update
1✔
800
                if err := s.patchVMI(context.Background(), origVMI, vmi); err != nil {
1✔
801
                        return err
×
802
                }
×
803
        }
804
        return nil
1✔
805
}
806

807
func (s *SynchronizationController) rebuildSourceConnection(migration *virtv1.VirtualMachineInstanceMigration, vmi *virtv1.VirtualMachineInstance) error {
2✔
808
        conn, err := s.getOutboundSourceConnection(vmi, vmi.Status.MigrationState)
2✔
809
        if err != nil {
2✔
810
                return err
×
811
        }
×
812
        if conn == nil {
3✔
813
                return nil
1✔
814
        }
1✔
815
        s.syncOutboundConnectionMap.Store(migration.Spec.SendTo.MigrationID, conn)
1✔
816
        if vmi.Status.MigrationState != nil && vmi.Status.MigrationState.SourceState != nil {
2✔
817
                url, err := s.getLocalSynchronizationAddress()
1✔
818
                if err != nil {
1✔
819
                        return err
×
820
                }
×
821
                origVMI := vmi.DeepCopy()
1✔
822
                vmi.Status.MigrationState.SourceState.SyncAddress = &url
1✔
823
                // patching will cause reconcile loop to connect to remote to update
1✔
824
                if err := s.patchVMI(context.Background(), origVMI, vmi); err != nil {
1✔
825
                        return err
×
826
                }
×
827
        }
828
        return nil
1✔
829
}
830

831
func (s *SynchronizationController) getVMIFromMigration(migration *virtv1.VirtualMachineInstanceMigration) (*virtv1.VirtualMachineInstance, error) {
7✔
832
        key := controller.NamespacedKey(migration.Namespace, migration.Spec.VMIName)
7✔
833
        obj, exists, err := s.vmiInformer.GetStore().GetByKey(key)
7✔
834
        if err != nil {
7✔
835
                return nil, err
×
836
        }
×
837
        if !exists {
9✔
838
                return nil, nil
2✔
839
        }
2✔
840
        return obj.(*virtv1.VirtualMachineInstance).DeepCopy(), nil
5✔
841
}
842

843
func (s *SynchronizationController) getLocalSynchronizationAddress() (string, error) {
35✔
844
        if s.ip != "" {
35✔
845
                return fmt.Sprintf("%s:%d", s.ip, s.bindPort), nil
×
846
        }
×
847
        // TODO figure out how to get my URL with or without submariner (url changes based on export)
848
        return s.listener.Addr().String(), nil
35✔
849
}
850

851
func (s *SynchronizationController) createOutboundConnection(connectionURL string) (*grpc.ClientConn, error) {
10✔
852
        logger := log.Log.With("outbound", connectionURL)
10✔
853
        logger.Info("creating new synchronization grpc connection")
10✔
854

10✔
855
        client, err := grpc.NewClient(connectionURL, grpc.WithTransportCredentials(credentials.NewTLS(s.clientTLSConfig)))
10✔
856
        return client, err
10✔
857
}
10✔
858

859
func (s *SynchronizationController) createTcpListener() (net.Listener, error) {
25✔
860
        if s.listener != nil {
25✔
861
                return s.listener, nil
×
862
        }
×
863
        var ln net.Listener
25✔
864
        var err error
25✔
865
        addr := net.JoinHostPort(s.bindAddress, strconv.Itoa(s.bindPort))
25✔
866
        ln, err = net.Listen("tcp", addr)
25✔
867
        if err != nil {
25✔
868
                log.Log.Reason(err).Error("failed to create tcp listener")
×
869
                return nil, err
×
870
        }
×
871
        s.listener = ln
25✔
872
        return ln, nil
25✔
873
}
874

875
func (s *SynchronizationController) findTargetMigrationFromMigrationID(migrationID string) (*virtv1.VirtualMachineInstanceMigration, error) {
24✔
876
        return s.findMigrationFromMigrationIDByIndex("byTargetMigrationID", migrationID)
24✔
877
}
24✔
878

879
func (s *SynchronizationController) findSourceMigrationFromMigrationID(migrationID string) (*virtv1.VirtualMachineInstanceMigration, error) {
24✔
880
        return s.findMigrationFromMigrationIDByIndex("bySourceMigrationID", migrationID)
24✔
881
}
24✔
882

883
func (s *SynchronizationController) findMigrationFromMigrationIDByIndex(indexName, migrationID string) (*virtv1.VirtualMachineInstanceMigration, error) {
49✔
884
        objs, err := s.migrationInformer.GetIndexer().ByIndex(indexName, migrationID)
49✔
885
        if err != nil {
49✔
886
                return nil, err
×
887
        }
×
888
        if len(objs) > 1 {
49✔
889
                log.Log.Warningf("found multiple migrations for migrationID %s, picking first one", migrationID)
×
890
        }
×
891
        for _, obj := range objs {
95✔
892
                migration, _ := obj.(*virtv1.VirtualMachineInstanceMigration)
46✔
893
                return migration, nil
46✔
894
        }
46✔
895
        return nil, nil
3✔
896
}
897

898
func (s *SynchronizationController) SyncSourceMigrationStatus(ctx context.Context, request *syncv1.VMIStatusRequest) (*syncv1.VMIStatusResponse, error) {
15✔
899
        if request.VmiStatus == nil || len(request.VmiStatus.VmiStatusJson) == 0 {
16✔
900
                return &syncv1.VMIStatusResponse{
1✔
901
                        Message: noSourceStatusErrorMsg,
1✔
902
                }, fmt.Errorf(noSourceStatusErrorMsg)
1✔
903
        }
1✔
904
        migration, err := s.findTargetMigrationFromMigrationID(request.MigrationID)
14✔
905
        if migration == nil {
15✔
906
                return &syncv1.VMIStatusResponse{
1✔
907
                        Message: fmt.Sprintf(sourceUnableToLocateVMIMigrationIDErrorMsg, request.MigrationID),
1✔
908
                }, fmt.Errorf(sourceUnableToLocateVMIMigrationIDErrorMsg, request.MigrationID)
1✔
909
        }
1✔
910
        key := controller.NamespacedKey(migration.Namespace, migration.Spec.VMIName)
13✔
911
        log.Log.Object(migration).V(5).Infof("looking up VMI %s", key)
13✔
912
        obj, exists, err := s.vmiInformer.GetStore().GetByKey(key)
13✔
913
        if err != nil || !exists {
15✔
914
                if err == nil {
4✔
915
                        err = fmt.Errorf(sourceUnableToLocateVMIMigrationIDErrorMsgVMI, request.MigrationID, key)
2✔
916
                }
2✔
917
                return &syncv1.VMIStatusResponse{
2✔
918
                        Message: fmt.Sprintf(sourceUnableToLocateVMIMigrationIDErrorMsgVMI, request.MigrationID, key),
2✔
919
                }, err
2✔
920
        }
921
        vmi := obj.(*virtv1.VirtualMachineInstance)
11✔
922
        remoteStatus := &virtv1.VirtualMachineInstanceStatus{}
11✔
923
        if err := json.Unmarshal(request.VmiStatus.VmiStatusJson, remoteStatus); err != nil {
12✔
924
                return &syncv1.VMIStatusResponse{
1✔
925
                        Message: fmt.Sprintf("unable to unmarshal vmistatus for migrationID %s", request.MigrationID),
1✔
926
                }, err
1✔
927
        }
1✔
928
        if remoteStatus.MigrationState == nil {
11✔
929
                return &syncv1.VMIStatusResponse{
1✔
930
                        Message: noSourceStatusErrorMsg,
1✔
931
                }, fmt.Errorf(noSourceStatusErrorMsg)
1✔
932
        }
1✔
933
        newVMI := vmi.DeepCopy()
9✔
934
        if newVMI.Status.MigrationState == nil {
13✔
935
                newVMI.Status.MigrationState = &virtv1.VirtualMachineInstanceMigrationState{}
4✔
936
        }
4✔
937

938
        // Only update SourceState if this migration is still active and matches the VMI's current migration.
939
        // This prevents stale updates from a completed decentralized migration from interfering with a new compute migration.
940
        if migration.IsFinal() {
10✔
941
                log.Log.Object(migration).Infof("Migration is final, ignoring source state update for VMI %s/%s", vmi.Namespace, vmi.Name)
1✔
942
                return &syncv1.VMIStatusResponse{
1✔
943
                        Message: successMessage,
1✔
944
                }, nil
1✔
945
        }
1✔
946

947
        // Check if the VMI's current migration matches this migration
948
        if newVMI.Status.MigrationState.MigrationUID != "" && newVMI.Status.MigrationState.MigrationUID != migration.UID {
9✔
949
                log.Log.Object(migration).Warningf("VMI %s/%s has different migration UID %s, ignoring source state update for migration %s",
1✔
950
                        vmi.Namespace, vmi.Name, newVMI.Status.MigrationState.MigrationUID, migration.UID)
1✔
951
                return &syncv1.VMIStatusResponse{
1✔
952
                        Message: successMessage,
1✔
953
                }, nil
1✔
954
        }
1✔
955

956
        log.Log.Object(newVMI).V(5).Infof("vmi migration source state: %#v", newVMI.Status.MigrationState.SourceState)
7✔
957
        log.Log.Object(newVMI).V(5).Infof("remote migration source state: %#v", remoteStatus.MigrationState.SourceState)
7✔
958
        newVMI.Status.MigrationState.SourceState = remoteStatus.MigrationState.SourceState.DeepCopy()
7✔
959
        copyLegacySourceFields(newVMI, remoteStatus.MigrationState)
7✔
960
        if len(remoteStatus.MigratedVolumes) > 0 {
7✔
961
                log.Log.Object(newVMI).V(5).Infof("SyncSourceMigrationStatus: Copying migrated volumes to target state, %#v", newVMI.Status.MigratedVolumes)
×
962
                newVMI.Status.MigratedVolumes = getMergedSourceMigratedVolumes(newVMI.Status.MigratedVolumes, remoteStatus.MigratedVolumes)
×
963
        } else {
7✔
964
                log.Log.Object(newVMI).V(5).Info("SyncSourceMigrationStatus: No source migrated volumes found")
7✔
965
        }
7✔
966
        newVMI.Status.MigrationMethod = remoteStatus.MigrationMethod
7✔
967
        if !apiequality.Semantic.DeepEqual(vmi.Status, newVMI.Status) {
14✔
968
                if err := s.patchVMI(ctx, vmi, newVMI); err != nil {
8✔
969
                        return &syncv1.VMIStatusResponse{
1✔
970
                                Message: fmt.Sprintf("unable to synchronize VMI for migrationID %s", request.MigrationID),
1✔
971
                        }, err
1✔
972
                }
1✔
973
                log.Log.Object(newVMI).With("MigrationID", request.MigrationID).V(5).Info("successfully patched VMI with source state")
6✔
974
        }
975
        log.Log.Object(newVMI).V(5).Info("returning success to grpc caller, source")
6✔
976
        return &syncv1.VMIStatusResponse{
6✔
977
                Message: successMessage,
6✔
978
        }, nil
6✔
979
}
980

981
func getMergedTargetMigratedVolumes(vmiMigratedVolumes []virtv1.StorageMigratedVolumeInfo, remoteMigratedVolumes []virtv1.StorageMigratedVolumeInfo) []virtv1.StorageMigratedVolumeInfo {
16✔
982
        remoteVolumeMap := make(map[string]virtv1.StorageMigratedVolumeInfo)
16✔
983
        for _, volume := range remoteMigratedVolumes {
25✔
984
                remoteVolumeMap[volume.VolumeName] = volume
9✔
985
        }
9✔
986
        mergedVolumes := make([]virtv1.StorageMigratedVolumeInfo, 0)
16✔
987
        for _, volume := range vmiMigratedVolumes {
26✔
988
                if remoteVolume, ok := remoteVolumeMap[volume.VolumeName]; ok {
18✔
989
                        mergedVolume := virtv1.StorageMigratedVolumeInfo{
8✔
990
                                VolumeName: volume.VolumeName,
8✔
991
                        }
8✔
992
                        if remoteVolume.DestinationPVCInfo != nil {
15✔
993
                                mergedVolume.DestinationPVCInfo = remoteVolume.DestinationPVCInfo.DeepCopy()
7✔
994
                        }
7✔
995
                        if volume.SourcePVCInfo != nil {
15✔
996
                                mergedVolume.SourcePVCInfo = volume.SourcePVCInfo.DeepCopy()
7✔
997
                        }
7✔
998
                        mergedVolumes = append(mergedVolumes, mergedVolume)
8✔
999
                } else {
2✔
1000
                        mergedVolumes = append(mergedVolumes, volume)
2✔
1001
                }
2✔
1002
        }
1003
        return mergedVolumes
16✔
1004
}
1005

1006
func getMergedSourceMigratedVolumes(vmiMigratedVolumes []virtv1.StorageMigratedVolumeInfo, remoteMigratedVolumes []virtv1.StorageMigratedVolumeInfo) []virtv1.StorageMigratedVolumeInfo {
11✔
1007
        remoteVolumeMap := make(map[string]virtv1.StorageMigratedVolumeInfo)
11✔
1008
        for _, volume := range remoteMigratedVolumes {
24✔
1009
                remoteVolumeMap[volume.VolumeName] = volume
13✔
1010
        }
13✔
1011
        mergedVolumes := make([]virtv1.StorageMigratedVolumeInfo, 0)
11✔
1012
        for _, vmiVolume := range vmiMigratedVolumes {
22✔
1013
                if remoteVolume, ok := remoteVolumeMap[vmiVolume.VolumeName]; ok {
21✔
1014
                        log.Log.V(5).Infof("Merging volume %s", vmiVolume.VolumeName)
10✔
1015
                        // Found a match merge the current target volume with the incoming source volume
10✔
1016
                        mergedVolume := virtv1.StorageMigratedVolumeInfo{
10✔
1017
                                VolumeName: vmiVolume.VolumeName,
10✔
1018
                        }
10✔
1019
                        if vmiVolume.SourcePVCInfo != nil {
17✔
1020
                                mergedVolume.SourcePVCInfo = vmiVolume.SourcePVCInfo.DeepCopy()
7✔
1021
                        } else {
10✔
1022
                                mergedVolume.SourcePVCInfo = remoteVolume.SourcePVCInfo.DeepCopy()
3✔
1023
                        }
3✔
1024
                        if vmiVolume.DestinationPVCInfo != nil {
19✔
1025
                                mergedVolume.DestinationPVCInfo = vmiVolume.DestinationPVCInfo.DeepCopy()
9✔
1026
                        }
9✔
1027
                        mergedVolumes = append(mergedVolumes, mergedVolume)
10✔
1028
                        delete(remoteVolumeMap, vmiVolume.VolumeName)
10✔
1029
                }
1030
        }
1031
        for _, volume := range remoteVolumeMap {
14✔
1032
                mergedVolumes = append(mergedVolumes, volume)
3✔
1033
        }
3✔
1034
        return mergedVolumes
11✔
1035
}
1036

1037
func (s *SynchronizationController) SyncTargetMigrationStatus(ctx context.Context, request *syncv1.VMIStatusRequest) (*syncv1.VMIStatusResponse, error) {
15✔
1038
        if request.VmiStatus == nil || len(request.VmiStatus.VmiStatusJson) == 0 {
16✔
1039
                return &syncv1.VMIStatusResponse{
1✔
1040
                        Message: noTargetStatusErrorMsg,
1✔
1041
                }, fmt.Errorf(noTargetStatusErrorMsg)
1✔
1042
        }
1✔
1043

1044
        migration, err := s.findSourceMigrationFromMigrationID(request.MigrationID)
14✔
1045
        if migration == nil {
15✔
1046
                return &syncv1.VMIStatusResponse{
1✔
1047
                        Message: fmt.Sprintf(targetUnableToLocateVMIMigrationIDErrorMsg, request.MigrationID),
1✔
1048
                }, fmt.Errorf(targetUnableToLocateVMIMigrationIDErrorMsg, request.MigrationID)
1✔
1049
        }
1✔
1050

1051
        key := controller.NamespacedKey(migration.Namespace, migration.Spec.VMIName)
13✔
1052
        obj, exists, err := s.vmiInformer.GetStore().GetByKey(key)
13✔
1053
        if err != nil || !exists {
15✔
1054
                if err == nil {
4✔
1055
                        err = fmt.Errorf(targetUnableToLocateVMIMigrationIDErrorMsgVMI, request.MigrationID, key)
2✔
1056
                }
2✔
1057
                return &syncv1.VMIStatusResponse{
2✔
1058
                        Message: fmt.Sprintf(targetUnableToLocateVMIMigrationIDErrorMsgVMI, request.MigrationID, key),
2✔
1059
                }, err
2✔
1060
        }
1061
        vmi := obj.(*virtv1.VirtualMachineInstance)
11✔
1062
        remoteStatus := &virtv1.VirtualMachineInstanceStatus{}
11✔
1063
        if err := json.Unmarshal(request.VmiStatus.VmiStatusJson, remoteStatus); err != nil {
12✔
1064
                return &syncv1.VMIStatusResponse{
1✔
1065
                        Message: fmt.Sprintf("unable to unmarshal vmistatus for migrationID %s", request.MigrationID),
1✔
1066
                }, err
1✔
1067
        }
1✔
1068
        if remoteStatus.MigrationState == nil {
11✔
1069
                return &syncv1.VMIStatusResponse{
1✔
1070
                        Message: noTargetStatusErrorMsg,
1✔
1071
                }, fmt.Errorf(noTargetStatusErrorMsg)
1✔
1072
        }
1✔
1073
        newVMI := vmi.DeepCopy()
9✔
1074
        if newVMI.Status.MigrationState == nil {
13✔
1075
                newVMI.Status.MigrationState = &virtv1.VirtualMachineInstanceMigrationState{}
4✔
1076
        }
4✔
1077

1078
        // Only update TargetState if this migration is still active and matches the VMI's current migration.
1079
        // This prevents stale updates from a completed decentralized migration from interfering with a new compute migration.
1080
        if migration.IsFinal() {
10✔
1081
                log.Log.Object(migration).Infof("Migration is final, ignoring target state update for VMI %s/%s", vmi.Namespace, vmi.Name)
1✔
1082
                return &syncv1.VMIStatusResponse{
1✔
1083
                        Message: successMessage,
1✔
1084
                }, nil
1✔
1085
        }
1✔
1086

1087
        // Check if the VMI's current migration matches this migration
1088
        if newVMI.Status.MigrationState.MigrationUID != "" && newVMI.Status.MigrationState.MigrationUID != migration.UID {
9✔
1089
                log.Log.Object(migration).Warningf("VMI %s/%s has different migration UID %s, ignoring target state update for migration %s",
1✔
1090
                        vmi.Namespace, vmi.Name, newVMI.Status.MigrationState.MigrationUID, migration.UID)
1✔
1091
                return &syncv1.VMIStatusResponse{
1✔
1092
                        Message: successMessage,
1✔
1093
                }, nil
1✔
1094
        }
1✔
1095

1096
        log.Log.Object(newVMI).V(5).Infof("vmi migration target state: %#v", newVMI.Status.MigrationState.TargetState)
7✔
1097
        log.Log.Object(newVMI).V(5).Infof("remote migration target state: %#v", remoteStatus.MigrationState.TargetState)
7✔
1098
        newVMI.Status.MigrationState.TargetState = remoteStatus.MigrationState.TargetState.DeepCopy()
7✔
1099
        newVMI.Status.MigratedVolumes = getMergedTargetMigratedVolumes(newVMI.Status.MigratedVolumes, remoteStatus.MigratedVolumes)
7✔
1100
        copyLegacyTargetFields(newVMI, remoteStatus.MigrationState)
7✔
1101
        if !apiequality.Semantic.DeepEqual(vmi.Status.MigrationState, newVMI.Status.MigrationState) {
14✔
1102
                if err := s.patchVMI(ctx, vmi, newVMI); err != nil {
8✔
1103
                        return &syncv1.VMIStatusResponse{
1✔
1104
                                Message: fmt.Sprintf("unable to synchronize VMI for migrationID %s", request.MigrationID),
1✔
1105
                        }, err
1✔
1106
                }
1✔
1107
                log.Log.Object(newVMI).With("MigrationID", request.MigrationID).V(5).Info("successfully patched VMI with target state")
6✔
1108
        }
1109
        log.Log.Object(newVMI).V(5).Info("returning success to grpc caller, target")
6✔
1110
        return &syncv1.VMIStatusResponse{
6✔
1111
                Message: successMessage,
6✔
1112
        }, nil
6✔
1113
}
1114

1115
func (s *SynchronizationController) patchVMIConditions(ctx context.Context, origVMI, newVMI *virtv1.VirtualMachineInstance) error {
19✔
1116
        patchSet := patch.New()
19✔
1117
        if !apiequality.Semantic.DeepEqual(origVMI.Status.Conditions, newVMI.Status.Conditions) {
31✔
1118
                patchSet.AddOption(
12✔
1119
                        patch.WithTest("/status/conditions", origVMI.Status.Conditions),
12✔
1120
                        patch.WithReplace("/status/conditions", newVMI.Status.Conditions),
12✔
1121
                )
12✔
1122
        }
12✔
1123
        if !patchSet.IsEmpty() {
31✔
1124
                patchBytes, err := patchSet.GeneratePayload()
12✔
1125
                if err != nil {
12✔
1126
                        return err
×
1127
                }
×
1128
                log.Log.Object(origVMI).V(5).Infof("patch VMI conditions with %s", string(patchBytes))
12✔
1129
                if _, err := s.client.VirtualMachineInstance(origVMI.Namespace).Patch(ctx, origVMI.Name, types.JSONPatchType, patchBytes, metav1.PatchOptions{}); err != nil {
12✔
1130
                        return err
×
1131
                }
×
1132
        }
1133
        return nil
19✔
1134
}
1135

1136
func (s *SynchronizationController) patchVMI(ctx context.Context, origVMI, newVMI *virtv1.VirtualMachineInstance) error {
16✔
1137
        if origVMI.Status.MigrationState != nil &&
16✔
1138
                origVMI.Status.MigrationState.Completed {
16✔
1139
                log.Log.Object(origVMI).V(3).Infof("VMI is completed, skipping patch")
×
1140
                return nil
×
1141
        }
×
1142
        patchSet := patch.New()
16✔
1143

16✔
1144
        if !apiequality.Semantic.DeepEqual(origVMI.Labels, newVMI.Labels) {
16✔
1145
                if len(origVMI.Labels) == 0 {
×
1146
                        patchSet.AddOption(
×
1147
                                patch.WithAdd("/metadata/labels", newVMI.Labels))
×
1148
                } else {
×
1149
                        patchSet.AddOption(
×
1150
                                patch.WithTest("/metadata/labels", origVMI.Labels),
×
1151
                                patch.WithReplace("/metadata/labels", newVMI.Labels),
×
1152
                        )
×
1153
                }
×
1154
        }
1155

1156
        if !apiequality.Semantic.DeepEqual(origVMI.Status.MigrationMethod, newVMI.Status.MigrationMethod) {
16✔
1157
                if origVMI.Status.MigrationMethod == "" {
×
1158
                        patchSet.AddOption(
×
1159
                                patch.WithAdd("/status/migrationMethod", newVMI.Status.MigrationMethod))
×
1160
                } else {
×
1161
                        patchSet.AddOption(
×
1162
                                patch.WithTest("/status/migrationMethod", origVMI.Status.MigrationMethod),
×
1163
                                patch.WithReplace("/status/migrationMethod", newVMI.Status.MigrationMethod),
×
1164
                        )
×
1165
                }
×
1166
        }
1167

1168
        if !apiequality.Semantic.DeepEqual(origVMI.Status.MigratedVolumes, newVMI.Status.MigratedVolumes) {
16✔
1169
                if origVMI.Status.MigratedVolumes == nil {
×
1170
                        patchSet.AddOption(
×
1171
                                patch.WithAdd("/status/migratedVolumes", newVMI.Status.MigratedVolumes))
×
1172
                } else {
×
1173
                        patchSet.AddOption(
×
1174
                                patch.WithTest("/status/migratedVolumes", origVMI.Status.MigratedVolumes),
×
1175
                                patch.WithReplace("/status/migratedVolumes", newVMI.Status.MigratedVolumes),
×
1176
                        )
×
1177
                }
×
1178
        }
1179

1180
        if !apiequality.Semantic.DeepEqual(origVMI.Status.MigrationState, newVMI.Status.MigrationState) {
32✔
1181
                if origVMI.Status.MigrationState == nil {
22✔
1182
                        patchSet.AddOption(
6✔
1183
                                patch.WithAdd("/status/migrationState", newVMI.Status.MigrationState))
6✔
1184
                } else {
16✔
1185
                        patchSet.AddOption(
10✔
1186
                                patch.WithTest("/status/migrationState", origVMI.Status.MigrationState),
10✔
1187
                                patch.WithReplace("/status/migrationState", newVMI.Status.MigrationState),
10✔
1188
                        )
10✔
1189
                }
10✔
1190
        }
1191

1192
        if !patchSet.IsEmpty() {
32✔
1193
                patchBytes, err := patchSet.GeneratePayload()
16✔
1194
                if err != nil {
16✔
1195
                        return err
×
1196
                }
×
1197
                log.Log.Object(origVMI).V(5).Infof("patch VMI with %s", string(patchBytes))
16✔
1198
                if _, err := s.client.VirtualMachineInstance(origVMI.Namespace).Patch(ctx, origVMI.Name, types.JSONPatchType, patchBytes, metav1.PatchOptions{}); err != nil {
18✔
1199
                        return err
2✔
1200
                }
2✔
1201
        }
1202
        return nil
14✔
1203
}
1204

1205
func indexByMigrationUID(obj interface{}) ([]string, error) {
66✔
1206
        migration, ok := obj.(*virtv1.VirtualMachineInstanceMigration)
66✔
1207
        if !ok {
67✔
1208
                return nil, nil
1✔
1209
        }
1✔
1210
        return []string{string(migration.UID)}, nil
65✔
1211
}
1212

1213
func indexByActiveVmiName(obj interface{}) ([]string, error) {
66✔
1214
        migration, ok := obj.(*virtv1.VirtualMachineInstanceMigration)
66✔
1215
        if !ok {
67✔
1216
                return nil, nil
1✔
1217
        }
1✔
1218
        return []string{migration.Spec.VMIName}, nil
65✔
1219
}
1220

1221
func indexByTargetMigrationID(obj interface{}) ([]string, error) {
66✔
1222
        migration, ok := obj.(*virtv1.VirtualMachineInstanceMigration)
66✔
1223
        if !ok {
67✔
1224
                return nil, nil
1✔
1225
        }
1✔
1226
        if migration.Spec.Receive != nil {
100✔
1227
                return []string{migration.Spec.Receive.MigrationID}, nil
35✔
1228
        }
35✔
1229
        return []string{}, nil
30✔
1230
}
1231

1232
func indexBySourceMigrationID(obj interface{}) ([]string, error) {
66✔
1233
        migration, ok := obj.(*virtv1.VirtualMachineInstanceMigration)
66✔
1234
        if !ok {
67✔
1235
                return nil, nil
1✔
1236
        }
1✔
1237
        if migration.Spec.SendTo != nil {
95✔
1238
                return []string{migration.Spec.SendTo.MigrationID}, nil
30✔
1239
        }
30✔
1240
        return []string{}, nil
35✔
1241
}
1242

1243
func copyLegacyTargetFields(vmi *virtv1.VirtualMachineInstance, migrationState *virtv1.VirtualMachineInstanceMigrationState) {
7✔
1244
        targetState := migrationState.TargetState
7✔
1245
        vmi.Status.MigrationState.TargetNode = targetState.Node
7✔
1246
        if targetState.AttachmentPodUID != nil {
7✔
1247
                vmi.Status.MigrationState.TargetAttachmentPodUID = *targetState.AttachmentPodUID
×
1248
        }
×
1249
        vmi.Status.MigrationState.TargetCPUSet = targetState.CPUSet
7✔
1250
        vmi.Status.MigrationState.TargetDirectMigrationNodePorts = targetState.DirectMigrationNodePorts
7✔
1251
        if targetState.NodeAddress != nil {
7✔
1252
                vmi.Status.MigrationState.TargetNodeAddress = *targetState.NodeAddress
×
1253
        }
×
1254
        vmi.Status.MigrationState.TargetNodeDomainDetected = targetState.DomainDetected
7✔
1255
        vmi.Status.MigrationState.TargetNodeDomainReadyTimestamp = targetState.DomainReadyTimestamp
7✔
1256
        if targetState.NodeTopology != nil {
7✔
1257
                vmi.Status.MigrationState.TargetNodeTopology = *targetState.NodeTopology
×
1258
        }
×
1259
        if targetState.PersistentStatePVCName != nil {
7✔
1260
                vmi.Status.MigrationState.TargetPersistentStatePVCName = *targetState.PersistentStatePVCName
×
1261
        }
×
1262
        vmi.Status.MigrationState.TargetPod = targetState.Pod
7✔
1263
        copyCommonLegacyFields(vmi.Status.MigrationState, migrationState)
7✔
1264
        vmi.Status.MigrationState.Completed = migrationState.Completed
7✔
1265
        vmi.Status.MigrationState.Failed = migrationState.Failed
7✔
1266
}
1267

1268
func copyLegacySourceFields(vmi *virtv1.VirtualMachineInstance, migrationState *virtv1.VirtualMachineInstanceMigrationState) {
7✔
1269
        vmi.Status.MigrationState.SourceNode = migrationState.SourceState.Node
7✔
1270
        if migrationState.SourceState.PersistentStatePVCName != nil {
7✔
1271
                vmi.Status.MigrationState.SourcePersistentStatePVCName = *migrationState.SourceState.PersistentStatePVCName
×
1272
        }
×
1273
        vmi.Status.MigrationState.SourcePod = migrationState.SourceState.Pod
7✔
1274
        copyCommonLegacyFields(vmi.Status.MigrationState, migrationState)
7✔
1275
}
1276

1277
func copyCommonLegacyFields(targetMigrationState, sourceMigrationState *virtv1.VirtualMachineInstanceMigrationState) {
14✔
1278
        // Copy regular fields.
14✔
1279
        if sourceMigrationState.MigrationPolicyName != nil {
14✔
1280
                targetMigrationState.MigrationPolicyName = sourceMigrationState.MigrationPolicyName
×
1281
        }
×
1282
        if sourceMigrationState.MigrationConfiguration != nil {
14✔
1283
                targetMigrationState.MigrationConfiguration = sourceMigrationState.MigrationConfiguration
×
1284
        }
×
1285
        if sourceMigrationState.StartTimestamp != nil {
14✔
1286
                targetMigrationState.StartTimestamp = sourceMigrationState.StartTimestamp
×
1287
        }
×
1288
        if sourceMigrationState.EndTimestamp != nil {
14✔
1289
                targetMigrationState.EndTimestamp = sourceMigrationState.EndTimestamp
×
1290
        }
×
1291
}
1292

1293
func (s *SynchronizationController) runConnectionCleanup() {
×
1294
        s.failedCloseConnections.Range(func(k, v interface{}) bool {
×
1295
                retryCount, ok := v.(int)
×
1296
                if !ok {
×
1297
                        log.Log.Warningf("invalid retry count type during connection cleanup: %v", v)
×
1298
                        s.failedCloseConnections.Delete(k)
×
1299
                        return true
×
1300
                }
×
1301
                if retryCount >= maxCloseRetries {
×
1302
                        log.Log.Warningf("connection for migrationID %s failed to close after %d retries, not attempting to close again", k, retryCount)
×
1303
                        s.failedCloseConnections.Delete(k)
×
1304
                }
×
1305
                outboundConnection, ok := k.(*SynchronizationConnection)
×
1306
                if !ok {
×
1307
                        log.Log.Warningf("invalid outbound connection type during connection cleanup: %v", k)
×
1308
                        s.failedCloseConnections.Delete(k)
×
1309
                        return true
×
1310
                }
×
1311
                if err := outboundConnection.Close(); err != nil {
×
1312
                        log.Log.Warningf("unable to close connection for migrationID, trying again: %s, %v", outboundConnection.migrationID, err)
×
1313
                        s.failedCloseConnections.Store(outboundConnection, retryCount+1)
×
1314
                } else {
×
1315
                        s.failedCloseConnections.Delete(k)
×
1316
                }
×
1317
                return true
×
1318
        })
1319
}
1320

1321
func (s *SynchronizationController) CancelMigration(ctx context.Context, request *syncv1.MigrationCancelRequest) (*syncv1.MigrationCancelResponse, error) {
1✔
1322
        migrationUID := request.MigrationUID
1✔
1323

1✔
1324
        migration, err := s.findMigrationFromMigrationIDByIndex("byUID", migrationUID)
1✔
1325
        if err != nil {
1✔
1326
                return &syncv1.MigrationCancelResponse{
×
1327
                        Message: fmt.Sprintf("unable to find migration to cancel for migrationUID %s", migrationUID),
×
1328
                }, err
×
1329
        }
×
1330
        if migration != nil {
1✔
1331
                log.Log.V(2).Object(migration).Infof("found migration to cancel for migrationUID %s", migrationUID)
×
1332
                if err := s.client.VirtualMachineInstanceMigration(migration.Namespace).Delete(ctx, migration.Name, metav1.DeleteOptions{}); err != nil {
×
1333
                        return &syncv1.MigrationCancelResponse{
×
1334
                                Message: fmt.Sprintf("unable to cancel migration for migrationUID %s", migrationUID),
×
1335
                        }, err
×
1336
                }
×
1337
                log.Log.V(2).Object(migration).Infof("successfully deleted migration %s/%s for migrationUID %s", migration.Namespace, migration.Name, migrationUID)
×
1338
        }
1339
        return &syncv1.MigrationCancelResponse{
1✔
1340
                Message: "migration canceled",
1✔
1341
        }, nil
1✔
1342
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc