• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5491

17 Jul 2025 01:25AM UTC coverage: 59.326% (-0.2%) from 59.502%
#5491

push

travis-ci

web-flow
Populate DV with events from PVC Prime (#3764)

* update role for controller so it can get list of events

Signed-off-by: dsanatar <dsanatar@redhat.com>

* add new field index for events so we can filter by the object's name. add new function that gets all events associated with a primePvc and re-emits them for the regular pvc

Signed-off-by: dsanatar <dsanatar@redhat.com>

* add watcher for Events filtered by PVC type. modify copyEvent func to only emit unique events from primePVC

Signed-off-by: dsanatar <dsanatar@redhat.com>

* add new field index for event uids so we can filter accordingly

Signed-off-by: dsanatar <dsanatar@redhat.com>

* sort events by most recent timestamp so we can loop more efficiently when emitting new events

Signed-off-by: dsanatar <dsanatar@redhat.com>

* fix linting

Signed-off-by: dsanatar <dsanatar@redhat.com>

* modify watcher to filter on only prime pvc events. move copyEvents to ReconcileTargetPvc func. modify copyEvents logic to handle edge case where events have same timestamps

Signed-off-by: dsanatar <dsanatar@redhat.com>

* add missing bracket

Signed-off-by: dsanatar <dsanatar@redhat.com>

* modify CopyEvents func to take in two client.Objects instead so we can reuse the same func when we need to copy events from primePvc to pvc and from pvc to dv

Signed-off-by: dsanatar <dsanatar@redhat.com>

* move func call to CopyEvents to emitEvents func so it only occurs when DVs status has changed

Signed-off-by: dsanatar <dsanatar@redhat.com>

* add conditional to CopyEvents so when we are handling DVs we only copy over events from the primePVC

Signed-off-by: dsanatar <dsanatar@redhat.com>

* remove debug logs

Signed-off-by: dsanatar <dsanatar@redhat.com>

* reuse existing function to add pvcPrime name annotation to import populator so that we can access the prime name downstream

Signed-off-by: dsanatar <dsanatar@redhat.com>

* update DV bound condition to include PVC Prime name if one exists while the claim is stil... (continued)

75 of 206 new or added lines in 11 files covered. (36.41%)

5 existing lines in 2 files now uncovered.

17163 of 28930 relevant lines covered (59.33%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.43
/pkg/controller/clone-controller.go
1
package controller
2

3
import (
4
        "context"
5
        "crypto/rsa"
6
        "fmt"
7
        "reflect"
8
        "strconv"
9
        "strings"
10
        "time"
11

12
        "github.com/go-logr/logr"
13
        "github.com/pkg/errors"
14

15
        corev1 "k8s.io/api/core/v1"
16
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
17
        "k8s.io/apimachinery/pkg/api/resource"
18
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19
        "k8s.io/apimachinery/pkg/runtime"
20
        "k8s.io/apimachinery/pkg/types"
21
        "k8s.io/apimachinery/pkg/util/sets"
22
        "k8s.io/client-go/tools/cache"
23
        "k8s.io/client-go/tools/record"
24

25
        "sigs.k8s.io/controller-runtime/pkg/client"
26
        "sigs.k8s.io/controller-runtime/pkg/controller"
27
        "sigs.k8s.io/controller-runtime/pkg/handler"
28
        "sigs.k8s.io/controller-runtime/pkg/manager"
29
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
30
        "sigs.k8s.io/controller-runtime/pkg/source"
31

32
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
33
        "kubevirt.io/containerized-data-importer/pkg/common"
34
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
35
        "kubevirt.io/containerized-data-importer/pkg/operator"
36
        "kubevirt.io/containerized-data-importer/pkg/util"
37
        "kubevirt.io/containerized-data-importer/pkg/util/cert/fetcher"
38
        "kubevirt.io/containerized-data-importer/pkg/util/cert/generator"
39
        sdkapi "kubevirt.io/controller-lifecycle-operator-sdk/api"
40
)
41

42
// Locations of the apiserver token key material.
const (
	// TokenKeyDir is the path to the apiserver public key dir
	//nolint:gosec // This is a path, not the key itself
	TokenKeyDir = "/var/run/cdi/token/keys"

	// TokenPublicKeyPath is the path to the apiserver public key
	TokenPublicKeyPath = TokenKeyDir + "/id_rsa.pub"

	// TokenPrivateKeyPath is the path to the apiserver private key
	TokenPrivateKeyPath = TokenKeyDir + "/id_rsa"
)

// Event reasons and identifiers used by the clone controller.
const (
	// CloneSucceededPVC is the event reason recorded on the target PVC once
	// the clone has been marked as complete.
	CloneSucceededPVC = "CloneSucceeded"

	// cloneSourcePodFinalizer is added to the target PVC before the clone
	// source pod is created and removed again during cleanup.
	cloneSourcePodFinalizer = "cdi.kubevirt.io/cloneSource"

	// hostAssistedCloneSource is not referenced in the visible part of this
	// file; NOTE(review): confirm its consumers before changing the value.
	hostAssistedCloneSource = "cdi.kubevirt.io/hostAssistedSourcePodCloneSource"
)
60

61
// CloneReconciler members
62
type CloneReconciler struct {
63
        client              client.Client
64
        scheme              *runtime.Scheme
65
        recorder            record.EventRecorder
66
        clientCertGenerator generator.CertGenerator
67
        serverCAFetcher     fetcher.CertBundleFetcher
68
        log                 logr.Logger
69
        multiTokenValidator *cc.MultiTokenValidator
70
        image               string
71
        verbose             string
72
        pullPolicy          string
73
        installerLabels     map[string]string
74
}
75

76
// NewCloneController creates a new instance of the config controller.
77
func NewCloneController(mgr manager.Manager,
78
        log logr.Logger,
79
        image, pullPolicy,
80
        verbose string,
81
        clientCertGenerator generator.CertGenerator,
82
        serverCAFetcher fetcher.CertBundleFetcher,
83
        apiServerKey *rsa.PublicKey,
84
        installerLabels map[string]string) (controller.Controller, error) {
×
85
        reconciler := &CloneReconciler{
×
86
                client:              mgr.GetClient(),
×
87
                scheme:              mgr.GetScheme(),
×
88
                log:                 log.WithName("clone-controller"),
×
89
                multiTokenValidator: cc.NewMultiTokenValidator(apiServerKey),
×
90
                image:               image,
×
91
                verbose:             verbose,
×
92
                pullPolicy:          pullPolicy,
×
93
                recorder:            mgr.GetEventRecorderFor("clone-controller"),
×
94
                clientCertGenerator: clientCertGenerator,
×
95
                serverCAFetcher:     serverCAFetcher,
×
96
                installerLabels:     installerLabels,
×
97
        }
×
98
        cloneController, err := controller.New("clone-controller", mgr, controller.Options{
×
99
                MaxConcurrentReconciles: 3,
×
100
                Reconciler:              reconciler,
×
101
        })
×
102
        if err != nil {
×
103
                return nil, err
×
104
        }
×
105
        if err := addCloneControllerWatches(mgr, cloneController); err != nil {
×
106
                return nil, err
×
107
        }
×
108
        return cloneController, nil
×
109
}
110

111
// addCloneControllerWatches sets up the watches used by the clone controller.
112
func addCloneControllerWatches(mgr manager.Manager, cloneController controller.Controller) error {
×
113
        // Setup watches
×
114
        if err := cloneController.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{}, &handler.TypedEnqueueRequestForObject[*corev1.PersistentVolumeClaim]{})); err != nil {
×
115
                return err
×
116
        }
×
117
        if err := cloneController.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, handler.TypedEnqueueRequestForOwner[*corev1.Pod](
×
118
                mgr.GetScheme(), mgr.GetClient().RESTMapper(), &corev1.PersistentVolumeClaim{}, handler.OnlyControllerOwner()))); err != nil {
×
119
                return err
×
120
        }
×
121
        if err := cloneController.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, handler.TypedEnqueueRequestsFromMapFunc[*corev1.Pod](
×
122
                func(ctx context.Context, obj *corev1.Pod) []reconcile.Request {
×
123
                        target, ok := obj.GetAnnotations()[AnnOwnerRef]
×
124
                        if !ok {
×
125
                                return nil
×
126
                        }
×
127
                        namespace, name, err := cache.SplitMetaNamespaceKey(target)
×
128
                        if err != nil {
×
129
                                return nil
×
130
                        }
×
131
                        return []reconcile.Request{
×
132
                                {
×
133
                                        NamespacedName: types.NamespacedName{
×
134
                                                Namespace: namespace,
×
135
                                                Name:      name,
×
136
                                        },
×
137
                                },
×
138
                        }
×
139
                },
140
        ))); err != nil {
×
141
                return err
×
142
        }
×
143
        return nil
×
144
}
145

146
func (r *CloneReconciler) shouldReconcile(pvc *corev1.PersistentVolumeClaim, log logr.Logger) bool {
1✔
147
        return checkPVC(pvc, cc.AnnCloneRequest, log) &&
1✔
148
                !metav1.HasAnnotation(pvc.ObjectMeta, cc.AnnCloneOf) &&
1✔
149
                isBound(pvc, log)
1✔
150
}
1✔
151

152
// Reconcile the reconcile loop for host assisted clone pvc.
153
func (r *CloneReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
154
        // Get the PVC.
1✔
155
        pvc := &corev1.PersistentVolumeClaim{}
1✔
156
        if err := r.client.Get(ctx, req.NamespacedName, pvc); err != nil {
2✔
157
                if k8serrors.IsNotFound(err) {
2✔
158
                        return reconcile.Result{}, nil
1✔
159
                }
1✔
160
                return reconcile.Result{}, err
×
161
        }
162

163
        log := r.log.WithValues("PVC", req.NamespacedName)
1✔
164
        log.V(1).Info("reconciling Clone PVCs")
1✔
165

1✔
166
        if checkPVC(pvc, cc.AnnCloneRequest, log) {
2✔
167
                if err := cc.UpdatePVCBoundContionFromEvents(pvc, r.client, log); err != nil {
1✔
NEW
168
                        return reconcile.Result{}, err
×
NEW
169
                }
×
170
        }
171

172
        if pvc.DeletionTimestamp != nil || !r.shouldReconcile(pvc, log) {
2✔
173
                log.V(1).Info("Should not reconcile this PVC",
1✔
174
                        "checkPVC(AnnCloneRequest)", checkPVC(pvc, cc.AnnCloneRequest, log),
1✔
175
                        "NOT has annotation(AnnCloneOf)", !metav1.HasAnnotation(pvc.ObjectMeta, cc.AnnCloneOf),
1✔
176
                        "isBound", isBound(pvc, log),
1✔
177
                        "has finalizer?", cc.HasFinalizer(pvc, cloneSourcePodFinalizer))
1✔
178
                if cc.HasFinalizer(pvc, cloneSourcePodFinalizer) || pvc.DeletionTimestamp != nil {
2✔
179
                        // Clone completed, remove source pod and/or finalizer
1✔
180
                        if err := r.cleanup(pvc, log); err != nil {
1✔
181
                                return reconcile.Result{}, err
×
182
                        }
×
183
                }
184
                return reconcile.Result{}, nil
1✔
185
        }
186

187
        ready, err := r.waitTargetPodRunningOrSucceeded(pvc, log)
1✔
188
        if err != nil {
2✔
189
                return reconcile.Result{}, errors.Wrap(err, "error ensuring target upload pod running")
1✔
190
        }
1✔
191

192
        if !ready {
2✔
193
                log.V(3).Info("Upload pod not ready yet for PVC")
1✔
194
                return reconcile.Result{}, nil
1✔
195
        }
1✔
196

197
        sourcePod, err := r.findCloneSourcePod(pvc)
1✔
198
        if err != nil {
1✔
199
                return reconcile.Result{}, err
×
200
        }
×
201

202
        _, nameExists := pvc.Annotations[cc.AnnCloneSourcePod]
1✔
203
        if !nameExists && sourcePod == nil {
2✔
204
                pvc.Annotations[cc.AnnCloneSourcePod] = cc.CreateCloneSourcePodName(pvc)
1✔
205

1✔
206
                // add finalizer before creating clone source pod
1✔
207
                cc.AddFinalizer(pvc, cloneSourcePodFinalizer)
1✔
208

1✔
209
                if err := r.updatePVC(pvc); err != nil {
1✔
210
                        return reconcile.Result{}, err
×
211
                }
×
212

213
                // will reconcile again after PVC update notification
214
                return reconcile.Result{}, nil
1✔
215
        }
216

217
        if requeueAfter, err := r.reconcileSourcePod(ctx, sourcePod, pvc, log); requeueAfter != 0 || err != nil {
2✔
218
                return reconcile.Result{RequeueAfter: requeueAfter}, err
1✔
219
        }
1✔
220

221
        if err := r.ensureCertSecret(sourcePod, pvc); err != nil {
2✔
222
                return reconcile.Result{}, err
1✔
223
        }
1✔
224

225
        if err := r.updatePvcFromPod(sourcePod, pvc, log); err != nil {
1✔
226
                return reconcile.Result{}, err
×
227
        }
×
228
        return reconcile.Result{}, nil
1✔
229
}
230

231
func (r *CloneReconciler) reconcileSourcePod(ctx context.Context, sourcePod *corev1.Pod, targetPvc *corev1.PersistentVolumeClaim, log logr.Logger) (time.Duration, error) {
1✔
232
        if sourcePod == nil {
2✔
233
                sourcePvc, err := r.getCloneRequestSourcePVC(targetPvc)
1✔
234
                if err != nil {
1✔
235
                        return 0, err
×
236
                }
×
237

238
                sourcePopulated, err := cc.IsPopulated(sourcePvc, r.client)
1✔
239
                if err != nil {
1✔
240
                        return 0, err
×
241
                }
×
242
                if !sourcePopulated {
1✔
243
                        return 2 * time.Second, nil
×
244
                }
×
245

246
                if err := r.validateSourceAndTarget(ctx, sourcePvc, targetPvc); err != nil {
2✔
247
                        return 0, err
1✔
248
                }
1✔
249

250
                pods, err := cc.GetPodsUsingPVCs(ctx, r.client, sourcePvc.Namespace, sets.New(sourcePvc.Name), true)
1✔
251
                if err != nil {
1✔
252
                        return 0, err
×
253
                }
×
254

255
                if len(pods) > 0 {
2✔
256
                        es, err := cc.GetAnnotatedEventSource(ctx, r.client, targetPvc)
1✔
257
                        if err != nil {
1✔
258
                                return 0, err
×
259
                        }
×
260
                        for _, pod := range pods {
2✔
261
                                r.log.V(1).Info("can't create clone source pod, pvc in use by other pod",
1✔
262
                                        "namespace", sourcePvc.Namespace, "name", sourcePvc.Name, "pod", pod.Name)
1✔
263
                                r.recorder.Eventf(es, corev1.EventTypeWarning, cc.CloneSourceInUse,
1✔
264
                                        "pod %s/%s using PersistentVolumeClaim %s", pod.Namespace, pod.Name, sourcePvc.Name)
1✔
265
                        }
1✔
266
                        return 2 * time.Second, nil
1✔
267
                }
268

269
                sourcePod, err := r.CreateCloneSourcePod(r.image, r.pullPolicy, targetPvc, log)
1✔
270
                // Check if pod has failed and, in that case, record an event with the error
1✔
271
                if podErr := cc.HandleFailedPod(err, cc.CreateCloneSourcePodName(targetPvc), targetPvc, r.recorder, r.client); podErr != nil {
1✔
272
                        return 0, podErr
×
273
                }
×
274

275
                log.V(3).Info("Created source pod ", "sourcePod.Namespace", sourcePod.Namespace, "sourcePod.Name", sourcePod.Name)
1✔
276
        }
277
        return 0, nil
1✔
278
}
279

280
func (r *CloneReconciler) ensureCertSecret(sourcePod *corev1.Pod, targetPvc *corev1.PersistentVolumeClaim) error {
1✔
281
        if sourcePod == nil {
2✔
282
                return nil
1✔
283
        }
1✔
284

285
        if sourcePod.Status.Phase == corev1.PodRunning {
1✔
286
                return nil
×
287
        }
×
288

289
        clientName, ok := targetPvc.Annotations[AnnUploadClientName]
1✔
290
        if !ok {
2✔
291
                return errors.Errorf("PVC %s/%s missing required %s annotation", targetPvc.Namespace, targetPvc.Name, AnnUploadClientName)
1✔
292
        }
1✔
293

294
        certConfig, err := operator.GetCertConfigWithDefaults(context.TODO(), r.client)
1✔
295
        if err != nil {
1✔
296
                return err
×
297
        }
×
298

299
        cert, key, err := r.clientCertGenerator.MakeClientCert(clientName, nil, certConfig.Client.Duration.Duration)
1✔
300
        if err != nil {
1✔
301
                return err
×
302
        }
×
303

304
        secret := &corev1.Secret{
1✔
305
                ObjectMeta: metav1.ObjectMeta{
1✔
306
                        Name:      sourcePod.Name,
1✔
307
                        Namespace: sourcePod.Namespace,
1✔
308
                        Annotations: map[string]string{
1✔
309
                                cc.AnnCreatedBy: "yes",
1✔
310
                        },
1✔
311
                        Labels: map[string]string{
1✔
312
                                common.CDILabelKey:       common.CDILabelValue, //filtered by the podInformer
1✔
313
                                common.CDIComponentLabel: common.ClonerSourcePodName,
1✔
314
                        },
1✔
315
                        OwnerReferences: []metav1.OwnerReference{
1✔
316
                                MakePodOwnerReference(sourcePod),
1✔
317
                        },
1✔
318
                },
1✔
319
                Data: map[string][]byte{
1✔
320
                        "tls.key": key,
1✔
321
                        "tls.crt": cert,
1✔
322
                },
1✔
323
        }
1✔
324

1✔
325
        util.SetRecommendedLabels(secret, r.installerLabels, "cdi-controller")
1✔
326

1✔
327
        err = r.client.Create(context.TODO(), secret)
1✔
328
        if err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
329
                return errors.Wrap(err, "error creating cert secret")
×
330
        }
×
331

332
        return nil
1✔
333
}
334

335
func (r *CloneReconciler) updatePvcFromPod(sourcePod *corev1.Pod, pvc *corev1.PersistentVolumeClaim, log logr.Logger) error {
1✔
336
        currentPvcCopy := pvc.DeepCopyObject()
1✔
337
        log.V(1).Info("Updating PVC from pod")
1✔
338

1✔
339
        log.V(3).Info("Pod phase for PVC", "PVC phase", pvc.Annotations[cc.AnnPodPhase])
1✔
340

1✔
341
        if podSucceededFromPVC(pvc) && pvc.Annotations[cc.AnnCloneOf] != "true" && sourcePodFinished(sourcePod) {
2✔
342
                log.V(1).Info("Adding CloneOf annotation to PVC")
1✔
343
                pvc.Annotations[cc.AnnCloneOf] = "true"
1✔
344
                r.recorder.Event(pvc, corev1.EventTypeNormal, CloneSucceededPVC, cc.CloneComplete)
1✔
345
        }
1✔
346

347
        setAnnotationsFromPodWithPrefix(pvc.Annotations, sourcePod, nil, cc.AnnSourceRunningCondition)
1✔
348

1✔
349
        if !reflect.DeepEqual(currentPvcCopy, pvc) {
2✔
350
                return r.updatePVC(pvc)
1✔
351
        }
1✔
352
        return nil
1✔
353
}
354

355
func sourcePodFinished(sourcePod *corev1.Pod) bool {
1✔
356
        if sourcePod == nil {
1✔
357
                return true
×
358
        }
×
359

360
        return sourcePod.Status.Phase == corev1.PodSucceeded || sourcePod.Status.Phase == corev1.PodFailed
1✔
361
}
362

363
func (r *CloneReconciler) updatePVC(pvc *corev1.PersistentVolumeClaim) error {
1✔
364
        if err := r.client.Update(context.TODO(), pvc); err != nil {
1✔
365
                return err
×
366
        }
×
367
        return nil
1✔
368
}
369

370
func (r *CloneReconciler) waitTargetPodRunningOrSucceeded(pvc *corev1.PersistentVolumeClaim, log logr.Logger) (bool, error) {
1✔
371
        rs, ok := pvc.Annotations[cc.AnnPodReady]
1✔
372
        if !ok {
2✔
373
                log.V(3).Info("clone target pod not ready")
1✔
374
                return false, nil
1✔
375
        }
1✔
376

377
        ready, err := strconv.ParseBool(rs)
1✔
378
        if err != nil {
2✔
379
                return false, errors.Wrapf(err, "error parsing %s annotation", cc.AnnPodReady)
1✔
380
        }
1✔
381

382
        return ready || podSucceededFromPVC(pvc), nil
1✔
383
}
384

385
func (r *CloneReconciler) findCloneSourcePod(pvc *corev1.PersistentVolumeClaim) (*corev1.Pod, error) {
1✔
386
        isCloneRequest, sourceNamespace, _ := ParseCloneRequestAnnotation(pvc)
1✔
387
        if !isCloneRequest {
1✔
388
                return nil, nil
×
389
        }
×
390
        cloneSourcePodName, exists := pvc.Annotations[cc.AnnCloneSourcePod]
1✔
391
        if !exists {
2✔
392
                // fallback to legacy name, to find any pod that still might be running after upgrade
1✔
393
                cloneSourcePodName = cc.CreateCloneSourcePodName(pvc)
1✔
394
        }
1✔
395

396
        selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
1✔
397
                MatchLabels: map[string]string{
1✔
398
                        cc.CloneUniqueID: cloneSourcePodName,
1✔
399
                },
1✔
400
        })
1✔
401
        if err != nil {
1✔
402
                return nil, errors.Wrap(err, "error creating label selector")
×
403
        }
×
404

405
        podList := &corev1.PodList{}
1✔
406
        if err := r.client.List(context.TODO(), podList, &client.ListOptions{Namespace: sourceNamespace, LabelSelector: selector}); err != nil {
1✔
407
                return nil, errors.Wrap(err, "error listing pods")
×
408
        }
×
409

410
        if len(podList.Items) > 1 {
1✔
411
                return nil, errors.Errorf("multiple source pods found for clone PVC %s/%s", pvc.Namespace, pvc.Name)
×
412
        }
×
413

414
        if len(podList.Items) == 0 {
2✔
415
                return nil, nil
1✔
416
        }
1✔
417

418
        return &podList.Items[0], nil
1✔
419
}
420

421
func (r *CloneReconciler) validateSourceAndTarget(ctx context.Context, sourcePvc, targetPvc *corev1.PersistentVolumeClaim) error {
1✔
422
        if err := r.multiTokenValidator.ValidatePVC(sourcePvc, targetPvc); err != nil {
1✔
423
                return err
×
424
        }
×
425
        contentType, err := ValidateCanCloneSourceAndTargetContentType(sourcePvc, targetPvc)
1✔
426
        if err != nil {
2✔
427
                return err
1✔
428
        }
1✔
429
        err = ValidateCanCloneSourceAndTargetSpec(ctx, r.client, sourcePvc, targetPvc, contentType)
1✔
430
        if err == nil {
2✔
431
                // Validation complete, put source PVC bound status in annotation
1✔
432
                setBoundConditionFromPVC(targetPvc.GetAnnotations(), cc.AnnBoundCondition, sourcePvc)
1✔
433
                return nil
1✔
434
        }
1✔
435
        return err
1✔
436
}
437

438
// returns the CloneRequest string which contains the pvc name (and namespace) from which we want to clone the image.
439
func (r *CloneReconciler) getCloneRequestSourcePVC(pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) {
1✔
440
        exists, namespace, name := ParseCloneRequestAnnotation(pvc)
1✔
441
        if !exists {
1✔
442
                return nil, errors.New("error parsing clone request annotation")
×
443
        }
×
444
        pvc = &corev1.PersistentVolumeClaim{}
1✔
445
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, pvc); err != nil {
1✔
446
                return nil, errors.Wrap(err, "error getting clone source PVC")
×
447
        }
×
448
        return pvc, nil
1✔
449
}
450

451
func (r *CloneReconciler) cleanup(pvc *corev1.PersistentVolumeClaim, log logr.Logger) error {
1✔
452
        log.V(3).Info("Cleaning up for PVC", "pvc.Namespace", pvc.Namespace, "pvc.Name", pvc.Name)
1✔
453

1✔
454
        pod, err := r.findCloneSourcePod(pvc)
1✔
455
        if err != nil {
1✔
456
                return err
×
457
        }
×
458

459
        if pod != nil && pod.DeletionTimestamp == nil {
2✔
460
                if podSucceededFromPVC(pvc) && pod.Status.Phase == corev1.PodRunning {
1✔
461
                        log.V(3).Info("Clone succeeded, waiting for source pod to stop running", "pod.Namespace", pod.Namespace, "pod.Name", pod.Name)
×
462
                        return nil
×
463
                }
×
464
                if cc.ShouldDeletePod(pvc) {
2✔
465
                        log.V(3).Info("Deleting pod", "pod.Name", pod.Name)
1✔
466
                        if err = r.client.Delete(context.TODO(), pod); err != nil {
1✔
467
                                if !k8serrors.IsNotFound(err) {
×
468
                                        return errors.Wrap(err, "error deleting clone source pod")
×
469
                                }
×
470
                        }
471
                }
472
        }
473

474
        cc.RemoveFinalizer(pvc, cloneSourcePodFinalizer)
1✔
475

1✔
476
        return r.updatePVC(pvc)
1✔
477
}
478

479
// CreateCloneSourcePod creates our cloning src pod which will be used for out of band cloning to read the contents of the src PVC
480
func (r *CloneReconciler) CreateCloneSourcePod(image, pullPolicy string, pvc *corev1.PersistentVolumeClaim, log logr.Logger) (*corev1.Pod, error) {
1✔
481
        exists, _, _ := ParseCloneRequestAnnotation(pvc)
1✔
482
        if !exists {
1✔
483
                return nil, errors.Errorf("bad CloneRequest Annotation")
×
484
        }
×
485

486
        ownerKey, err := cache.MetaNamespaceKeyFunc(pvc)
1✔
487
        if err != nil {
1✔
488
                return nil, errors.Wrap(err, "error getting cache key")
×
489
        }
×
490

491
        serverCABundle, err := r.serverCAFetcher.BundleBytes()
1✔
492
        if err != nil {
1✔
493
                return nil, err
×
494
        }
×
495

496
        podResourceRequirements, err := cc.GetDefaultPodResourceRequirements(r.client)
1✔
497
        if err != nil {
1✔
498
                return nil, err
×
499
        }
×
500

501
        imagePullSecrets, err := cc.GetImagePullSecrets(r.client)
1✔
502
        if err != nil {
1✔
503
                return nil, err
×
504
        }
×
505

506
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), r.client)
1✔
507
        if err != nil {
1✔
508
                return nil, err
×
509
        }
×
510

511
        sourcePvc, err := r.getCloneRequestSourcePVC(pvc)
1✔
512
        if err != nil {
1✔
513
                return nil, err
×
514
        }
×
515

516
        var sourceVolumeMode corev1.PersistentVolumeMode
1✔
517
        if sourcePvc.Spec.VolumeMode != nil {
2✔
518
                sourceVolumeMode = *sourcePvc.Spec.VolumeMode
1✔
519
        } else {
2✔
520
                sourceVolumeMode = corev1.PersistentVolumeFilesystem
1✔
521
        }
1✔
522

523
        pod := MakeCloneSourcePodSpec(sourceVolumeMode, image, pullPolicy, ownerKey, imagePullSecrets, serverCABundle, pvc, sourcePvc, podResourceRequirements, workloadNodePlacement)
1✔
524
        util.SetRecommendedLabels(pod, r.installerLabels, "cdi-controller")
1✔
525

1✔
526
        if err := r.client.Create(context.TODO(), pod); err != nil {
1✔
527
                return nil, errors.Wrap(err, "source pod API create errored")
×
528
        }
×
529

530
        log.V(1).Info("cloning source pod (image) created\n", "pod.Namespace", pod.Namespace, "pod.Name", pod.Name, "image", image)
1✔
531

1✔
532
        return pod, nil
1✔
533
}
534

535
// MakeCloneSourcePodSpec creates and returns the clone source pod spec based on the target pvc.
536
func MakeCloneSourcePodSpec(sourceVolumeMode corev1.PersistentVolumeMode, image, pullPolicy, ownerRefAnno string, imagePullSecrets []corev1.LocalObjectReference,
537
        serverCACert []byte, targetPvc, sourcePvc *corev1.PersistentVolumeClaim, resourceRequirements *corev1.ResourceRequirements,
538
        workloadNodePlacement *sdkapi.NodePlacement) *corev1.Pod {
1✔
539
        sourcePvcName := sourcePvc.GetName()
1✔
540
        sourcePvcNamespace := sourcePvc.GetNamespace()
1✔
541
        sourcePvcUID := string(sourcePvc.GetUID())
1✔
542

1✔
543
        var ownerID string
1✔
544
        cloneSourcePodName := targetPvc.Annotations[cc.AnnCloneSourcePod]
1✔
545
        url := GetUploadServerURL(targetPvc.Namespace, targetPvc.Name, common.UploadPathSync)
1✔
546
        pvcOwner := metav1.GetControllerOf(targetPvc)
1✔
547
        if pvcOwner != nil && pvcOwner.Kind == "DataVolume" {
1✔
548
                ownerID = string(pvcOwner.UID)
×
549
        } else {
1✔
550
                ouid, ok := targetPvc.Annotations[cc.AnnOwnerUID]
1✔
551
                if ok {
1✔
552
                        ownerID = ouid
×
553
                }
×
554
        }
555

556
        preallocationRequested := targetPvc.Annotations[cc.AnnPreallocationRequested]
1✔
557

1✔
558
        pod := &corev1.Pod{
1✔
559
                ObjectMeta: metav1.ObjectMeta{
1✔
560
                        Name:      cloneSourcePodName,
1✔
561
                        Namespace: sourcePvcNamespace,
1✔
562
                        Annotations: map[string]string{
1✔
563
                                cc.AnnCreatedBy: "yes",
1✔
564
                                AnnOwnerRef:     ownerRefAnno,
1✔
565
                        },
1✔
566
                        Labels: map[string]string{
1✔
567
                                common.CDILabelKey:       common.CDILabelValue, //filtered by the podInformer
1✔
568
                                common.CDIComponentLabel: common.ClonerSourcePodName,
1✔
569
                                // this label is used when searching for a pvc's cloner source pod.
1✔
570
                                cc.CloneUniqueID:          cloneSourcePodName,
1✔
571
                                common.PrometheusLabelKey: common.PrometheusLabelValue,
1✔
572
                                hostAssistedCloneSource:   sourcePvcUID,
1✔
573
                        },
1✔
574
                },
1✔
575
                Spec: corev1.PodSpec{
1✔
576
                        Containers: []corev1.Container{
1✔
577
                                {
1✔
578
                                        Name:            common.ClonerSourcePodName,
1✔
579
                                        Image:           image,
1✔
580
                                        ImagePullPolicy: corev1.PullPolicy(pullPolicy),
1✔
581
                                        Env: []corev1.EnvVar{
1✔
582
                                                {
1✔
583
                                                        Name: "CLIENT_KEY",
1✔
584
                                                        ValueFrom: &corev1.EnvVarSource{
1✔
585
                                                                SecretKeyRef: &corev1.SecretKeySelector{
1✔
586
                                                                        LocalObjectReference: corev1.LocalObjectReference{
1✔
587
                                                                                Name: cloneSourcePodName,
1✔
588
                                                                        },
1✔
589
                                                                        Key: "tls.key",
1✔
590
                                                                },
1✔
591
                                                        },
1✔
592
                                                },
1✔
593
                                                {
1✔
594
                                                        Name: "CLIENT_CERT",
1✔
595
                                                        ValueFrom: &corev1.EnvVarSource{
1✔
596
                                                                SecretKeyRef: &corev1.SecretKeySelector{
1✔
597
                                                                        LocalObjectReference: corev1.LocalObjectReference{
1✔
598
                                                                                Name: cloneSourcePodName,
1✔
599
                                                                        },
1✔
600
                                                                        Key: "tls.crt",
1✔
601
                                                                },
1✔
602
                                                        },
1✔
603
                                                },
1✔
604
                                                {
1✔
605
                                                        Name:  "SERVER_CA_CERT",
1✔
606
                                                        Value: string(serverCACert),
1✔
607
                                                },
1✔
608
                                                {
1✔
609
                                                        Name:  "UPLOAD_URL",
1✔
610
                                                        Value: url,
1✔
611
                                                },
1✔
612
                                                {
1✔
613
                                                        Name:  common.OwnerUID,
1✔
614
                                                        Value: ownerID,
1✔
615
                                                },
1✔
616
                                                {
1✔
617
                                                        Name:  common.Preallocation,
1✔
618
                                                        Value: preallocationRequested,
1✔
619
                                                },
1✔
620
                                        },
1✔
621
                                        Ports: []corev1.ContainerPort{
1✔
622
                                                {
1✔
623
                                                        Name:          "metrics",
1✔
624
                                                        ContainerPort: 8443,
1✔
625
                                                        Protocol:      corev1.ProtocolTCP,
1✔
626
                                                },
1✔
627
                                        },
1✔
628
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
629
                                },
1✔
630
                        },
1✔
631
                        ImagePullSecrets: imagePullSecrets,
1✔
632
                        RestartPolicy:    corev1.RestartPolicyOnFailure,
1✔
633
                        Volumes: []corev1.Volume{
1✔
634
                                {
1✔
635
                                        Name: cc.DataVolName,
1✔
636
                                        VolumeSource: corev1.VolumeSource{
1✔
637
                                                PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
1✔
638
                                                        ClaimName: sourcePvcName,
1✔
639
                                                },
1✔
640
                                        },
1✔
641
                                },
1✔
642
                        },
1✔
643
                        NodeSelector:      workloadNodePlacement.NodeSelector,
1✔
644
                        Tolerations:       workloadNodePlacement.Tolerations,
1✔
645
                        Affinity:          workloadNodePlacement.Affinity,
1✔
646
                        PriorityClassName: cc.GetPriorityClass(targetPvc),
1✔
647
                },
1✔
648
        }
1✔
649

1✔
650
        if pod.Spec.Affinity == nil {
2✔
651
                pod.Spec.Affinity = &corev1.Affinity{}
1✔
652
        }
1✔
653

654
        if pod.Spec.Affinity.PodAffinity == nil {
2✔
655
                pod.Spec.Affinity.PodAffinity = &corev1.PodAffinity{}
1✔
656
        }
1✔
657

658
        if len(sourcePvc.Spec.AccessModes) == 1 && sourcePvc.Spec.AccessModes[0] == corev1.ReadWriteOnce {
1✔
659
                pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(
×
660
                        pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
×
661
                        corev1.PodAffinityTerm{
×
662
                                LabelSelector: &metav1.LabelSelector{
×
663
                                        MatchExpressions: []metav1.LabelSelectorRequirement{
×
664
                                                {
×
665
                                                        Key:      hostAssistedCloneSource,
×
666
                                                        Operator: metav1.LabelSelectorOpIn,
×
667
                                                        Values:   []string{sourcePvcUID},
×
668
                                                },
×
669
                                        },
×
670
                                },
×
671
                                Namespaces:  []string{sourcePvcNamespace},
×
672
                                TopologyKey: corev1.LabelHostname,
×
673
                        },
×
674
                )
×
675
        }
×
676

677
        pod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution = append(
1✔
678
                pod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution,
1✔
679
                corev1.WeightedPodAffinityTerm{
1✔
680
                        Weight: 100,
1✔
681
                        PodAffinityTerm: corev1.PodAffinityTerm{
1✔
682
                                LabelSelector: &metav1.LabelSelector{
1✔
683
                                        MatchExpressions: []metav1.LabelSelectorRequirement{
1✔
684
                                                {
1✔
685
                                                        Key:      common.UploadTargetLabel,
1✔
686
                                                        Operator: metav1.LabelSelectorOpIn,
1✔
687
                                                        Values:   []string{string(targetPvc.UID)},
1✔
688
                                                },
1✔
689
                                        },
1✔
690
                                },
1✔
691
                                Namespaces:  []string{targetPvc.Namespace},
1✔
692
                                TopologyKey: corev1.LabelHostname,
1✔
693
                        },
1✔
694
                },
1✔
695
        )
1✔
696

1✔
697
        if resourceRequirements != nil {
2✔
698
                pod.Spec.Containers[0].Resources = *resourceRequirements
1✔
699
        }
1✔
700

701
        var addVars []corev1.EnvVar
1✔
702

1✔
703
        if sourceVolumeMode == corev1.PersistentVolumeBlock {
2✔
704
                pod.Spec.Containers[0].VolumeDevices = cc.AddVolumeDevices()
1✔
705
                addVars = []corev1.EnvVar{
1✔
706
                        {
1✔
707
                                Name:  "VOLUME_MODE",
1✔
708
                                Value: "block",
1✔
709
                        },
1✔
710
                        {
1✔
711
                                Name:  "MOUNT_POINT",
1✔
712
                                Value: common.WriteBlockPath,
1✔
713
                        },
1✔
714
                }
1✔
715
        } else {
2✔
716
                pod.Spec.Containers[0].VolumeMounts = []corev1.VolumeMount{
1✔
717
                        {
1✔
718
                                Name:      cc.DataVolName,
1✔
719
                                MountPath: common.ClonerMountPath,
1✔
720
                                ReadOnly:  true,
1✔
721
                        },
1✔
722
                }
1✔
723
                addVars = []corev1.EnvVar{
1✔
724
                        {
1✔
725
                                Name:  "VOLUME_MODE",
1✔
726
                                Value: "filesystem",
1✔
727
                        },
1✔
728
                        {
1✔
729
                                Name:  "MOUNT_POINT",
1✔
730
                                Value: common.ClonerMountPath,
1✔
731
                        },
1✔
732
                }
1✔
733
        }
1✔
734

735
        pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, addVars...)
1✔
736
        cc.CopyAllowedAnnotations(targetPvc, pod)
1✔
737
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
738
        return pod
1✔
739
}
740

741
// ParseCloneRequestAnnotation parses the clone request annotation
742
func ParseCloneRequestAnnotation(pvc *corev1.PersistentVolumeClaim) (exists bool, namespace, name string) {
1✔
743
        var ann string
1✔
744
        ann, exists = pvc.Annotations[cc.AnnCloneRequest]
1✔
745
        if !exists {
2✔
746
                return false, "", ""
1✔
747
        }
1✔
748

749
        sp := strings.Split(ann, "/")
1✔
750
        if len(sp) != 2 {
2✔
751
                return false, "", ""
1✔
752
        }
1✔
753

754
        return true, sp[0], sp[1]
1✔
755
}
756

757
// ValidateCanCloneSourceAndTargetContentType validates the pvcs passed has the same content type.
758
func ValidateCanCloneSourceAndTargetContentType(sourcePvc, targetPvc *corev1.PersistentVolumeClaim) (cdiv1.DataVolumeContentType, error) {
1✔
759
        sourceContentType := cc.GetPVCContentType(sourcePvc)
1✔
760
        targetContentType := cc.GetPVCContentType(targetPvc)
1✔
761
        if sourceContentType != targetContentType {
2✔
762
                return "", fmt.Errorf("source contentType (%s) and target contentType (%s) do not match", sourceContentType, targetContentType)
1✔
763
        }
1✔
764
        return sourceContentType, nil
1✔
765
}
766

767
// ValidateCanCloneSourceAndTargetSpec validates the specs passed in are compatible for cloning.
768
func ValidateCanCloneSourceAndTargetSpec(ctx context.Context, c client.Client, sourcePvc, targetPvc *corev1.PersistentVolumeClaim, contentType cdiv1.DataVolumeContentType) error {
1✔
769
        // This annotation is only needed for some specific cases, when the target size is actually calculated by
1✔
770
        // the size detection pod, and there are some differences in fs overhead between src and target volumes
1✔
771
        _, permissive := targetPvc.Annotations[cc.AnnPermissiveClone]
1✔
772

1✔
773
        // Allow different source and target volume modes only on KubeVirt content type
1✔
774
        sourceVolumeMode := util.ResolveVolumeMode(sourcePvc.Spec.VolumeMode)
1✔
775
        targetVolumeMode := util.ResolveVolumeMode(targetPvc.Spec.VolumeMode)
1✔
776
        if sourceVolumeMode != targetVolumeMode && contentType != cdiv1.DataVolumeKubeVirt {
2✔
777
                return fmt.Errorf("source volumeMode (%s) and target volumeMode (%s) do not match, contentType (%s)",
1✔
778
                        sourceVolumeMode, targetVolumeMode, contentType)
1✔
779
        }
1✔
780

781
        // TODO: use detection pod here, then permissive should not be needed
782
        sourceUsableSpace, err := getUsableSpace(ctx, c, sourcePvc)
1✔
783
        if err != nil {
1✔
784
                return err
×
785
        }
×
786
        targetUsableSpace, err := getUsableSpace(ctx, c, targetPvc)
1✔
787
        if err != nil {
1✔
788
                return err
×
789
        }
×
790

791
        if !permissive && sourceUsableSpace.Cmp(targetUsableSpace) > 0 {
1✔
792
                return errors.New("target resources requests storage size is smaller than the source")
×
793
        }
×
794

795
        // Can clone.
796
        return nil
1✔
797
}
798

799
func getUsableSpace(ctx context.Context, c client.Client, pvc *corev1.PersistentVolumeClaim) (resource.Quantity, error) {
1✔
800
        sizeRequest := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
1✔
801
        volumeMode := util.ResolveVolumeMode(pvc.Spec.VolumeMode)
1✔
802

1✔
803
        if volumeMode == corev1.PersistentVolumeFilesystem {
2✔
804
                fsOverhead, err := cc.GetFilesystemOverheadForStorageClass(ctx, c, pvc.Spec.StorageClassName)
1✔
805
                if err != nil {
1✔
806
                        return resource.Quantity{}, err
×
807
                }
×
808
                fsOverheadFloat, _ := strconv.ParseFloat(string(fsOverhead), 64)
1✔
809
                usableSpaceRaw := util.GetUsableSpace(fsOverheadFloat, sizeRequest.Value())
1✔
810

1✔
811
                return *resource.NewScaledQuantity(usableSpaceRaw, 0), nil
1✔
812
        }
813

814
        return sizeRequest, nil
1✔
815
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc