• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5326

14 May 2025 04:40PM UTC coverage: 59.303% (+0.02%) from 59.288%
#5326

push

travis-ci

web-flow
Enable WebhookPvcRendering feature gate by default (#3736)

The feature is available since v1.59, and we enable it by default to
allow increasing PVC size to the minimum supported by its provisioner
(#3711), and mainly in order to support:
https://github.com/kubevirt/kubevirt/pull/14637

As a bonus, the related Serial tests are now parallel. Thanks akalenyu:)

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

16870 of 28447 relevant lines covered (59.3%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.55
/pkg/controller/clone-controller.go
1
package controller
2

3
import (
4
        "context"
5
        "crypto/rsa"
6
        "fmt"
7
        "reflect"
8
        "strconv"
9
        "strings"
10
        "time"
11

12
        "github.com/go-logr/logr"
13
        "github.com/pkg/errors"
14

15
        corev1 "k8s.io/api/core/v1"
16
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
17
        "k8s.io/apimachinery/pkg/api/resource"
18
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19
        "k8s.io/apimachinery/pkg/runtime"
20
        "k8s.io/apimachinery/pkg/types"
21
        "k8s.io/apimachinery/pkg/util/sets"
22
        "k8s.io/client-go/tools/cache"
23
        "k8s.io/client-go/tools/record"
24

25
        "sigs.k8s.io/controller-runtime/pkg/client"
26
        "sigs.k8s.io/controller-runtime/pkg/controller"
27
        "sigs.k8s.io/controller-runtime/pkg/handler"
28
        "sigs.k8s.io/controller-runtime/pkg/manager"
29
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
30
        "sigs.k8s.io/controller-runtime/pkg/source"
31

32
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
33
        "kubevirt.io/containerized-data-importer/pkg/common"
34
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
35
        "kubevirt.io/containerized-data-importer/pkg/operator"
36
        "kubevirt.io/containerized-data-importer/pkg/util"
37
        "kubevirt.io/containerized-data-importer/pkg/util/cert/fetcher"
38
        "kubevirt.io/containerized-data-importer/pkg/util/cert/generator"
39
        sdkapi "kubevirt.io/controller-lifecycle-operator-sdk/api"
40
)
41

42
const (
43
        // TokenKeyDir is the path to the apiserver public key dir
44
        //nolint:gosec // This is a path, not the key itself
45
        TokenKeyDir = "/var/run/cdi/token/keys"
46

47
        // TokenPublicKeyPath is the path to the apiserver public key
48
        TokenPublicKeyPath = TokenKeyDir + "/id_rsa.pub"
49

50
        // TokenPrivateKeyPath is the path to the apiserver private key
51
        TokenPrivateKeyPath = TokenKeyDir + "/id_rsa"
52

53
        // CloneSucceededPVC provides a const to indicate a clone to the PVC succeeded
54
        CloneSucceededPVC = "CloneSucceeded"
55

56
        cloneSourcePodFinalizer = "cdi.kubevirt.io/cloneSource"
57

58
        hostAssistedCloneSource = "cdi.kubevirt.io/hostAssistedSourcePodCloneSource"
59
)
60

61
// CloneReconciler members
62
type CloneReconciler struct {
63
        client              client.Client
64
        scheme              *runtime.Scheme
65
        recorder            record.EventRecorder
66
        clientCertGenerator generator.CertGenerator
67
        serverCAFetcher     fetcher.CertBundleFetcher
68
        log                 logr.Logger
69
        multiTokenValidator *cc.MultiTokenValidator
70
        image               string
71
        verbose             string
72
        pullPolicy          string
73
        installerLabels     map[string]string
74
}
75

76
// NewCloneController creates a new instance of the config controller.
77
func NewCloneController(mgr manager.Manager,
78
        log logr.Logger,
79
        image, pullPolicy,
80
        verbose string,
81
        clientCertGenerator generator.CertGenerator,
82
        serverCAFetcher fetcher.CertBundleFetcher,
83
        apiServerKey *rsa.PublicKey,
84
        installerLabels map[string]string) (controller.Controller, error) {
×
85
        reconciler := &CloneReconciler{
×
86
                client:              mgr.GetClient(),
×
87
                scheme:              mgr.GetScheme(),
×
88
                log:                 log.WithName("clone-controller"),
×
89
                multiTokenValidator: cc.NewMultiTokenValidator(apiServerKey),
×
90
                image:               image,
×
91
                verbose:             verbose,
×
92
                pullPolicy:          pullPolicy,
×
93
                recorder:            mgr.GetEventRecorderFor("clone-controller"),
×
94
                clientCertGenerator: clientCertGenerator,
×
95
                serverCAFetcher:     serverCAFetcher,
×
96
                installerLabels:     installerLabels,
×
97
        }
×
98
        cloneController, err := controller.New("clone-controller", mgr, controller.Options{
×
99
                MaxConcurrentReconciles: 3,
×
100
                Reconciler:              reconciler,
×
101
        })
×
102
        if err != nil {
×
103
                return nil, err
×
104
        }
×
105
        if err := addCloneControllerWatches(mgr, cloneController); err != nil {
×
106
                return nil, err
×
107
        }
×
108
        return cloneController, nil
×
109
}
110

111
// addCloneControllerWatches sets up the watches used by the clone controller.
112
func addCloneControllerWatches(mgr manager.Manager, cloneController controller.Controller) error {
×
113
        // Setup watches
×
114
        if err := cloneController.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{}, &handler.TypedEnqueueRequestForObject[*corev1.PersistentVolumeClaim]{})); err != nil {
×
115
                return err
×
116
        }
×
117
        if err := cloneController.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, handler.TypedEnqueueRequestForOwner[*corev1.Pod](
×
118
                mgr.GetScheme(), mgr.GetClient().RESTMapper(), &corev1.PersistentVolumeClaim{}, handler.OnlyControllerOwner()))); err != nil {
×
119
                return err
×
120
        }
×
121
        if err := cloneController.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, handler.TypedEnqueueRequestsFromMapFunc[*corev1.Pod](
×
122
                func(ctx context.Context, obj *corev1.Pod) []reconcile.Request {
×
123
                        target, ok := obj.GetAnnotations()[AnnOwnerRef]
×
124
                        if !ok {
×
125
                                return nil
×
126
                        }
×
127
                        namespace, name, err := cache.SplitMetaNamespaceKey(target)
×
128
                        if err != nil {
×
129
                                return nil
×
130
                        }
×
131
                        return []reconcile.Request{
×
132
                                {
×
133
                                        NamespacedName: types.NamespacedName{
×
134
                                                Namespace: namespace,
×
135
                                                Name:      name,
×
136
                                        },
×
137
                                },
×
138
                        }
×
139
                },
140
        ))); err != nil {
×
141
                return err
×
142
        }
×
143
        return nil
×
144
}
145

146
func (r *CloneReconciler) shouldReconcile(pvc *corev1.PersistentVolumeClaim, log logr.Logger) bool {
1✔
147
        return checkPVC(pvc, cc.AnnCloneRequest, log) &&
1✔
148
                !metav1.HasAnnotation(pvc.ObjectMeta, cc.AnnCloneOf) &&
1✔
149
                isBound(pvc, log)
1✔
150
}
1✔
151

152
// Reconcile the reconcile loop for host assisted clone pvc.
153
func (r *CloneReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
154
        // Get the PVC.
1✔
155
        pvc := &corev1.PersistentVolumeClaim{}
1✔
156
        if err := r.client.Get(ctx, req.NamespacedName, pvc); err != nil {
2✔
157
                if k8serrors.IsNotFound(err) {
2✔
158
                        return reconcile.Result{}, nil
1✔
159
                }
1✔
160
                return reconcile.Result{}, err
×
161
        }
162
        log := r.log.WithValues("PVC", req.NamespacedName)
1✔
163
        log.V(1).Info("reconciling Clone PVCs")
1✔
164
        if pvc.DeletionTimestamp != nil || !r.shouldReconcile(pvc, log) {
2✔
165
                log.V(1).Info("Should not reconcile this PVC",
1✔
166
                        "checkPVC(AnnCloneRequest)", checkPVC(pvc, cc.AnnCloneRequest, log),
1✔
167
                        "NOT has annotation(AnnCloneOf)", !metav1.HasAnnotation(pvc.ObjectMeta, cc.AnnCloneOf),
1✔
168
                        "isBound", isBound(pvc, log),
1✔
169
                        "has finalizer?", cc.HasFinalizer(pvc, cloneSourcePodFinalizer))
1✔
170
                if cc.HasFinalizer(pvc, cloneSourcePodFinalizer) || pvc.DeletionTimestamp != nil {
2✔
171
                        // Clone completed, remove source pod and/or finalizer
1✔
172
                        if err := r.cleanup(pvc, log); err != nil {
1✔
173
                                return reconcile.Result{}, err
×
174
                        }
×
175
                }
176
                return reconcile.Result{}, nil
1✔
177
        }
178

179
        ready, err := r.waitTargetPodRunningOrSucceeded(pvc, log)
1✔
180
        if err != nil {
2✔
181
                return reconcile.Result{}, errors.Wrap(err, "error ensuring target upload pod running")
1✔
182
        }
1✔
183

184
        if !ready {
2✔
185
                log.V(3).Info("Upload pod not ready yet for PVC")
1✔
186
                return reconcile.Result{}, nil
1✔
187
        }
1✔
188

189
        sourcePod, err := r.findCloneSourcePod(pvc)
1✔
190
        if err != nil {
1✔
191
                return reconcile.Result{}, err
×
192
        }
×
193

194
        _, nameExists := pvc.Annotations[cc.AnnCloneSourcePod]
1✔
195
        if !nameExists && sourcePod == nil {
2✔
196
                pvc.Annotations[cc.AnnCloneSourcePod] = cc.CreateCloneSourcePodName(pvc)
1✔
197

1✔
198
                // add finalizer before creating clone source pod
1✔
199
                cc.AddFinalizer(pvc, cloneSourcePodFinalizer)
1✔
200

1✔
201
                if err := r.updatePVC(pvc); err != nil {
1✔
202
                        return reconcile.Result{}, err
×
203
                }
×
204

205
                // will reconcile again after PVC update notification
206
                return reconcile.Result{}, nil
1✔
207
        }
208

209
        if requeueAfter, err := r.reconcileSourcePod(ctx, sourcePod, pvc, log); requeueAfter != 0 || err != nil {
2✔
210
                return reconcile.Result{RequeueAfter: requeueAfter}, err
1✔
211
        }
1✔
212

213
        if err := r.ensureCertSecret(sourcePod, pvc); err != nil {
2✔
214
                return reconcile.Result{}, err
1✔
215
        }
1✔
216

217
        if err := r.updatePvcFromPod(sourcePod, pvc, log); err != nil {
1✔
218
                return reconcile.Result{}, err
×
219
        }
×
220
        return reconcile.Result{}, nil
1✔
221
}
222

223
func (r *CloneReconciler) reconcileSourcePod(ctx context.Context, sourcePod *corev1.Pod, targetPvc *corev1.PersistentVolumeClaim, log logr.Logger) (time.Duration, error) {
1✔
224
        if sourcePod == nil {
2✔
225
                sourcePvc, err := r.getCloneRequestSourcePVC(targetPvc)
1✔
226
                if err != nil {
1✔
227
                        return 0, err
×
228
                }
×
229

230
                sourcePopulated, err := cc.IsPopulated(sourcePvc, r.client)
1✔
231
                if err != nil {
1✔
232
                        return 0, err
×
233
                }
×
234
                if !sourcePopulated {
1✔
235
                        return 2 * time.Second, nil
×
236
                }
×
237

238
                if err := r.validateSourceAndTarget(ctx, sourcePvc, targetPvc); err != nil {
2✔
239
                        return 0, err
1✔
240
                }
1✔
241

242
                pods, err := cc.GetPodsUsingPVCs(ctx, r.client, sourcePvc.Namespace, sets.New(sourcePvc.Name), true)
1✔
243
                if err != nil {
1✔
244
                        return 0, err
×
245
                }
×
246

247
                if len(pods) > 0 {
2✔
248
                        es, err := cc.GetAnnotatedEventSource(ctx, r.client, targetPvc)
1✔
249
                        if err != nil {
1✔
250
                                return 0, err
×
251
                        }
×
252
                        for _, pod := range pods {
2✔
253
                                r.log.V(1).Info("can't create clone source pod, pvc in use by other pod",
1✔
254
                                        "namespace", sourcePvc.Namespace, "name", sourcePvc.Name, "pod", pod.Name)
1✔
255
                                r.recorder.Eventf(es, corev1.EventTypeWarning, cc.CloneSourceInUse,
1✔
256
                                        "pod %s/%s using PersistentVolumeClaim %s", pod.Namespace, pod.Name, sourcePvc.Name)
1✔
257
                        }
1✔
258
                        return 2 * time.Second, nil
1✔
259
                }
260

261
                sourcePod, err := r.CreateCloneSourcePod(r.image, r.pullPolicy, targetPvc, log)
1✔
262
                // Check if pod has failed and, in that case, record an event with the error
1✔
263
                if podErr := cc.HandleFailedPod(err, cc.CreateCloneSourcePodName(targetPvc), targetPvc, r.recorder, r.client); podErr != nil {
1✔
264
                        return 0, podErr
×
265
                }
×
266

267
                log.V(3).Info("Created source pod ", "sourcePod.Namespace", sourcePod.Namespace, "sourcePod.Name", sourcePod.Name)
1✔
268
        }
269
        return 0, nil
1✔
270
}
271

272
func (r *CloneReconciler) ensureCertSecret(sourcePod *corev1.Pod, targetPvc *corev1.PersistentVolumeClaim) error {
1✔
273
        if sourcePod == nil {
2✔
274
                return nil
1✔
275
        }
1✔
276

277
        if sourcePod.Status.Phase == corev1.PodRunning {
1✔
278
                return nil
×
279
        }
×
280

281
        clientName, ok := targetPvc.Annotations[AnnUploadClientName]
1✔
282
        if !ok {
2✔
283
                return errors.Errorf("PVC %s/%s missing required %s annotation", targetPvc.Namespace, targetPvc.Name, AnnUploadClientName)
1✔
284
        }
1✔
285

286
        certConfig, err := operator.GetCertConfigWithDefaults(context.TODO(), r.client)
1✔
287
        if err != nil {
1✔
288
                return err
×
289
        }
×
290

291
        cert, key, err := r.clientCertGenerator.MakeClientCert(clientName, nil, certConfig.Client.Duration.Duration)
1✔
292
        if err != nil {
1✔
293
                return err
×
294
        }
×
295

296
        secret := &corev1.Secret{
1✔
297
                ObjectMeta: metav1.ObjectMeta{
1✔
298
                        Name:      sourcePod.Name,
1✔
299
                        Namespace: sourcePod.Namespace,
1✔
300
                        Annotations: map[string]string{
1✔
301
                                cc.AnnCreatedBy: "yes",
1✔
302
                        },
1✔
303
                        Labels: map[string]string{
1✔
304
                                common.CDILabelKey:       common.CDILabelValue, //filtered by the podInformer
1✔
305
                                common.CDIComponentLabel: common.ClonerSourcePodName,
1✔
306
                        },
1✔
307
                        OwnerReferences: []metav1.OwnerReference{
1✔
308
                                MakePodOwnerReference(sourcePod),
1✔
309
                        },
1✔
310
                },
1✔
311
                Data: map[string][]byte{
1✔
312
                        "tls.key": key,
1✔
313
                        "tls.crt": cert,
1✔
314
                },
1✔
315
        }
1✔
316

1✔
317
        util.SetRecommendedLabels(secret, r.installerLabels, "cdi-controller")
1✔
318

1✔
319
        err = r.client.Create(context.TODO(), secret)
1✔
320
        if err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
321
                return errors.Wrap(err, "error creating cert secret")
×
322
        }
×
323

324
        return nil
1✔
325
}
326

327
func (r *CloneReconciler) updatePvcFromPod(sourcePod *corev1.Pod, pvc *corev1.PersistentVolumeClaim, log logr.Logger) error {
1✔
328
        currentPvcCopy := pvc.DeepCopyObject()
1✔
329
        log.V(1).Info("Updating PVC from pod")
1✔
330

1✔
331
        log.V(3).Info("Pod phase for PVC", "PVC phase", pvc.Annotations[cc.AnnPodPhase])
1✔
332

1✔
333
        if podSucceededFromPVC(pvc) && pvc.Annotations[cc.AnnCloneOf] != "true" && sourcePodFinished(sourcePod) {
2✔
334
                log.V(1).Info("Adding CloneOf annotation to PVC")
1✔
335
                pvc.Annotations[cc.AnnCloneOf] = "true"
1✔
336
                r.recorder.Event(pvc, corev1.EventTypeNormal, CloneSucceededPVC, cc.CloneComplete)
1✔
337
        }
1✔
338

339
        setAnnotationsFromPodWithPrefix(pvc.Annotations, sourcePod, nil, cc.AnnSourceRunningCondition)
1✔
340

1✔
341
        if !reflect.DeepEqual(currentPvcCopy, pvc) {
2✔
342
                return r.updatePVC(pvc)
1✔
343
        }
1✔
344
        return nil
1✔
345
}
346

347
func sourcePodFinished(sourcePod *corev1.Pod) bool {
1✔
348
        if sourcePod == nil {
1✔
349
                return true
×
350
        }
×
351

352
        return sourcePod.Status.Phase == corev1.PodSucceeded || sourcePod.Status.Phase == corev1.PodFailed
1✔
353
}
354

355
func (r *CloneReconciler) updatePVC(pvc *corev1.PersistentVolumeClaim) error {
1✔
356
        if err := r.client.Update(context.TODO(), pvc); err != nil {
1✔
357
                return err
×
358
        }
×
359
        return nil
1✔
360
}
361

362
func (r *CloneReconciler) waitTargetPodRunningOrSucceeded(pvc *corev1.PersistentVolumeClaim, log logr.Logger) (bool, error) {
1✔
363
        rs, ok := pvc.Annotations[cc.AnnPodReady]
1✔
364
        if !ok {
2✔
365
                log.V(3).Info("clone target pod not ready")
1✔
366
                return false, nil
1✔
367
        }
1✔
368

369
        ready, err := strconv.ParseBool(rs)
1✔
370
        if err != nil {
2✔
371
                return false, errors.Wrapf(err, "error parsing %s annotation", cc.AnnPodReady)
1✔
372
        }
1✔
373

374
        return ready || podSucceededFromPVC(pvc), nil
1✔
375
}
376

377
func (r *CloneReconciler) findCloneSourcePod(pvc *corev1.PersistentVolumeClaim) (*corev1.Pod, error) {
1✔
378
        isCloneRequest, sourceNamespace, _ := ParseCloneRequestAnnotation(pvc)
1✔
379
        if !isCloneRequest {
1✔
380
                return nil, nil
×
381
        }
×
382
        cloneSourcePodName, exists := pvc.Annotations[cc.AnnCloneSourcePod]
1✔
383
        if !exists {
2✔
384
                // fallback to legacy name, to find any pod that still might be running after upgrade
1✔
385
                cloneSourcePodName = cc.CreateCloneSourcePodName(pvc)
1✔
386
        }
1✔
387

388
        selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
1✔
389
                MatchLabels: map[string]string{
1✔
390
                        cc.CloneUniqueID: cloneSourcePodName,
1✔
391
                },
1✔
392
        })
1✔
393
        if err != nil {
1✔
394
                return nil, errors.Wrap(err, "error creating label selector")
×
395
        }
×
396

397
        podList := &corev1.PodList{}
1✔
398
        if err := r.client.List(context.TODO(), podList, &client.ListOptions{Namespace: sourceNamespace, LabelSelector: selector}); err != nil {
1✔
399
                return nil, errors.Wrap(err, "error listing pods")
×
400
        }
×
401

402
        if len(podList.Items) > 1 {
1✔
403
                return nil, errors.Errorf("multiple source pods found for clone PVC %s/%s", pvc.Namespace, pvc.Name)
×
404
        }
×
405

406
        if len(podList.Items) == 0 {
2✔
407
                return nil, nil
1✔
408
        }
1✔
409

410
        return &podList.Items[0], nil
1✔
411
}
412

413
func (r *CloneReconciler) validateSourceAndTarget(ctx context.Context, sourcePvc, targetPvc *corev1.PersistentVolumeClaim) error {
1✔
414
        if err := r.multiTokenValidator.ValidatePVC(sourcePvc, targetPvc); err != nil {
1✔
415
                return err
×
416
        }
×
417
        contentType, err := ValidateCanCloneSourceAndTargetContentType(sourcePvc, targetPvc)
1✔
418
        if err != nil {
2✔
419
                return err
1✔
420
        }
1✔
421
        err = ValidateCanCloneSourceAndTargetSpec(ctx, r.client, sourcePvc, targetPvc, contentType)
1✔
422
        if err == nil {
2✔
423
                // Validation complete, put source PVC bound status in annotation
1✔
424
                setBoundConditionFromPVC(targetPvc.GetAnnotations(), cc.AnnBoundCondition, sourcePvc)
1✔
425
                return nil
1✔
426
        }
1✔
427
        return err
1✔
428
}
429

430
// returns the CloneRequest string which contains the pvc name (and namespace) from which we want to clone the image.
431
func (r *CloneReconciler) getCloneRequestSourcePVC(pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) {
1✔
432
        exists, namespace, name := ParseCloneRequestAnnotation(pvc)
1✔
433
        if !exists {
1✔
434
                return nil, errors.New("error parsing clone request annotation")
×
435
        }
×
436
        pvc = &corev1.PersistentVolumeClaim{}
1✔
437
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, pvc); err != nil {
1✔
438
                return nil, errors.Wrap(err, "error getting clone source PVC")
×
439
        }
×
440
        return pvc, nil
1✔
441
}
442

443
func (r *CloneReconciler) cleanup(pvc *corev1.PersistentVolumeClaim, log logr.Logger) error {
1✔
444
        log.V(3).Info("Cleaning up for PVC", "pvc.Namespace", pvc.Namespace, "pvc.Name", pvc.Name)
1✔
445

1✔
446
        pod, err := r.findCloneSourcePod(pvc)
1✔
447
        if err != nil {
1✔
448
                return err
×
449
        }
×
450

451
        if pod != nil && pod.DeletionTimestamp == nil {
2✔
452
                if podSucceededFromPVC(pvc) && pod.Status.Phase == corev1.PodRunning {
1✔
453
                        log.V(3).Info("Clone succeeded, waiting for source pod to stop running", "pod.Namespace", pod.Namespace, "pod.Name", pod.Name)
×
454
                        return nil
×
455
                }
×
456
                if cc.ShouldDeletePod(pvc) {
2✔
457
                        log.V(3).Info("Deleting pod", "pod.Name", pod.Name)
1✔
458
                        if err = r.client.Delete(context.TODO(), pod); err != nil {
1✔
459
                                if !k8serrors.IsNotFound(err) {
×
460
                                        return errors.Wrap(err, "error deleting clone source pod")
×
461
                                }
×
462
                        }
463
                }
464
        }
465

466
        cc.RemoveFinalizer(pvc, cloneSourcePodFinalizer)
1✔
467

1✔
468
        return r.updatePVC(pvc)
1✔
469
}
470

471
// CreateCloneSourcePod creates our cloning src pod which will be used for out of band cloning to read the contents of the src PVC
472
func (r *CloneReconciler) CreateCloneSourcePod(image, pullPolicy string, pvc *corev1.PersistentVolumeClaim, log logr.Logger) (*corev1.Pod, error) {
1✔
473
        exists, _, _ := ParseCloneRequestAnnotation(pvc)
1✔
474
        if !exists {
1✔
475
                return nil, errors.Errorf("bad CloneRequest Annotation")
×
476
        }
×
477

478
        ownerKey, err := cache.MetaNamespaceKeyFunc(pvc)
1✔
479
        if err != nil {
1✔
480
                return nil, errors.Wrap(err, "error getting cache key")
×
481
        }
×
482

483
        serverCABundle, err := r.serverCAFetcher.BundleBytes()
1✔
484
        if err != nil {
1✔
485
                return nil, err
×
486
        }
×
487

488
        podResourceRequirements, err := cc.GetDefaultPodResourceRequirements(r.client)
1✔
489
        if err != nil {
1✔
490
                return nil, err
×
491
        }
×
492

493
        imagePullSecrets, err := cc.GetImagePullSecrets(r.client)
1✔
494
        if err != nil {
1✔
495
                return nil, err
×
496
        }
×
497

498
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), r.client)
1✔
499
        if err != nil {
1✔
500
                return nil, err
×
501
        }
×
502

503
        sourcePvc, err := r.getCloneRequestSourcePVC(pvc)
1✔
504
        if err != nil {
1✔
505
                return nil, err
×
506
        }
×
507

508
        var sourceVolumeMode corev1.PersistentVolumeMode
1✔
509
        if sourcePvc.Spec.VolumeMode != nil {
2✔
510
                sourceVolumeMode = *sourcePvc.Spec.VolumeMode
1✔
511
        } else {
2✔
512
                sourceVolumeMode = corev1.PersistentVolumeFilesystem
1✔
513
        }
1✔
514

515
        pod := MakeCloneSourcePodSpec(sourceVolumeMode, image, pullPolicy, ownerKey, imagePullSecrets, serverCABundle, pvc, sourcePvc, podResourceRequirements, workloadNodePlacement)
1✔
516
        util.SetRecommendedLabels(pod, r.installerLabels, "cdi-controller")
1✔
517

1✔
518
        if err := r.client.Create(context.TODO(), pod); err != nil {
1✔
519
                return nil, errors.Wrap(err, "source pod API create errored")
×
520
        }
×
521

522
        log.V(1).Info("cloning source pod (image) created\n", "pod.Namespace", pod.Namespace, "pod.Name", pod.Name, "image", image)
1✔
523

1✔
524
        return pod, nil
1✔
525
}
526

527
// MakeCloneSourcePodSpec creates and returns the clone source pod spec based on the target pvc.
528
func MakeCloneSourcePodSpec(sourceVolumeMode corev1.PersistentVolumeMode, image, pullPolicy, ownerRefAnno string, imagePullSecrets []corev1.LocalObjectReference,
529
        serverCACert []byte, targetPvc, sourcePvc *corev1.PersistentVolumeClaim, resourceRequirements *corev1.ResourceRequirements,
530
        workloadNodePlacement *sdkapi.NodePlacement) *corev1.Pod {
1✔
531
        sourcePvcName := sourcePvc.GetName()
1✔
532
        sourcePvcNamespace := sourcePvc.GetNamespace()
1✔
533
        sourcePvcUID := string(sourcePvc.GetUID())
1✔
534

1✔
535
        var ownerID string
1✔
536
        cloneSourcePodName := targetPvc.Annotations[cc.AnnCloneSourcePod]
1✔
537
        url := GetUploadServerURL(targetPvc.Namespace, targetPvc.Name, common.UploadPathSync)
1✔
538
        pvcOwner := metav1.GetControllerOf(targetPvc)
1✔
539
        if pvcOwner != nil && pvcOwner.Kind == "DataVolume" {
1✔
540
                ownerID = string(pvcOwner.UID)
×
541
        } else {
1✔
542
                ouid, ok := targetPvc.Annotations[cc.AnnOwnerUID]
1✔
543
                if ok {
1✔
544
                        ownerID = ouid
×
545
                }
×
546
        }
547

548
        preallocationRequested := targetPvc.Annotations[cc.AnnPreallocationRequested]
1✔
549

1✔
550
        pod := &corev1.Pod{
1✔
551
                ObjectMeta: metav1.ObjectMeta{
1✔
552
                        Name:      cloneSourcePodName,
1✔
553
                        Namespace: sourcePvcNamespace,
1✔
554
                        Annotations: map[string]string{
1✔
555
                                cc.AnnCreatedBy: "yes",
1✔
556
                                AnnOwnerRef:     ownerRefAnno,
1✔
557
                        },
1✔
558
                        Labels: map[string]string{
1✔
559
                                common.CDILabelKey:       common.CDILabelValue, //filtered by the podInformer
1✔
560
                                common.CDIComponentLabel: common.ClonerSourcePodName,
1✔
561
                                // this label is used when searching for a pvc's cloner source pod.
1✔
562
                                cc.CloneUniqueID:          cloneSourcePodName,
1✔
563
                                common.PrometheusLabelKey: common.PrometheusLabelValue,
1✔
564
                                hostAssistedCloneSource:   sourcePvcUID,
1✔
565
                        },
1✔
566
                },
1✔
567
                Spec: corev1.PodSpec{
1✔
568
                        Containers: []corev1.Container{
1✔
569
                                {
1✔
570
                                        Name:            common.ClonerSourcePodName,
1✔
571
                                        Image:           image,
1✔
572
                                        ImagePullPolicy: corev1.PullPolicy(pullPolicy),
1✔
573
                                        Env: []corev1.EnvVar{
1✔
574
                                                {
1✔
575
                                                        Name: "CLIENT_KEY",
1✔
576
                                                        ValueFrom: &corev1.EnvVarSource{
1✔
577
                                                                SecretKeyRef: &corev1.SecretKeySelector{
1✔
578
                                                                        LocalObjectReference: corev1.LocalObjectReference{
1✔
579
                                                                                Name: cloneSourcePodName,
1✔
580
                                                                        },
1✔
581
                                                                        Key: "tls.key",
1✔
582
                                                                },
1✔
583
                                                        },
1✔
584
                                                },
1✔
585
                                                {
1✔
586
                                                        Name: "CLIENT_CERT",
1✔
587
                                                        ValueFrom: &corev1.EnvVarSource{
1✔
588
                                                                SecretKeyRef: &corev1.SecretKeySelector{
1✔
589
                                                                        LocalObjectReference: corev1.LocalObjectReference{
1✔
590
                                                                                Name: cloneSourcePodName,
1✔
591
                                                                        },
1✔
592
                                                                        Key: "tls.crt",
1✔
593
                                                                },
1✔
594
                                                        },
1✔
595
                                                },
1✔
596
                                                {
1✔
597
                                                        Name:  "SERVER_CA_CERT",
1✔
598
                                                        Value: string(serverCACert),
1✔
599
                                                },
1✔
600
                                                {
1✔
601
                                                        Name:  "UPLOAD_URL",
1✔
602
                                                        Value: url,
1✔
603
                                                },
1✔
604
                                                {
1✔
605
                                                        Name:  common.OwnerUID,
1✔
606
                                                        Value: ownerID,
1✔
607
                                                },
1✔
608
                                                {
1✔
609
                                                        Name:  common.Preallocation,
1✔
610
                                                        Value: preallocationRequested,
1✔
611
                                                },
1✔
612
                                        },
1✔
613
                                        Ports: []corev1.ContainerPort{
1✔
614
                                                {
1✔
615
                                                        Name:          "metrics",
1✔
616
                                                        ContainerPort: 8443,
1✔
617
                                                        Protocol:      corev1.ProtocolTCP,
1✔
618
                                                },
1✔
619
                                        },
1✔
620
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
621
                                },
1✔
622
                        },
1✔
623
                        ImagePullSecrets: imagePullSecrets,
1✔
624
                        RestartPolicy:    corev1.RestartPolicyOnFailure,
1✔
625
                        Volumes: []corev1.Volume{
1✔
626
                                {
1✔
627
                                        Name: cc.DataVolName,
1✔
628
                                        VolumeSource: corev1.VolumeSource{
1✔
629
                                                PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
1✔
630
                                                        ClaimName: sourcePvcName,
1✔
631
                                                },
1✔
632
                                        },
1✔
633
                                },
1✔
634
                        },
1✔
635
                        NodeSelector:      workloadNodePlacement.NodeSelector,
1✔
636
                        Tolerations:       workloadNodePlacement.Tolerations,
1✔
637
                        Affinity:          workloadNodePlacement.Affinity,
1✔
638
                        PriorityClassName: cc.GetPriorityClass(targetPvc),
1✔
639
                },
1✔
640
        }
1✔
641

1✔
642
        if pod.Spec.Affinity == nil {
2✔
643
                pod.Spec.Affinity = &corev1.Affinity{}
1✔
644
        }
1✔
645

646
        if pod.Spec.Affinity.PodAffinity == nil {
2✔
647
                pod.Spec.Affinity.PodAffinity = &corev1.PodAffinity{}
1✔
648
        }
1✔
649

650
        if len(sourcePvc.Spec.AccessModes) == 1 && sourcePvc.Spec.AccessModes[0] == corev1.ReadWriteOnce {
1✔
651
                pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(
×
652
                        pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
×
653
                        corev1.PodAffinityTerm{
×
654
                                LabelSelector: &metav1.LabelSelector{
×
655
                                        MatchExpressions: []metav1.LabelSelectorRequirement{
×
656
                                                {
×
657
                                                        Key:      hostAssistedCloneSource,
×
658
                                                        Operator: metav1.LabelSelectorOpIn,
×
659
                                                        Values:   []string{sourcePvcUID},
×
660
                                                },
×
661
                                        },
×
662
                                },
×
663
                                Namespaces:  []string{sourcePvcNamespace},
×
664
                                TopologyKey: corev1.LabelHostname,
×
665
                        },
×
666
                )
×
667
        }
×
668

669
        pod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution = append(
1✔
670
                pod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution,
1✔
671
                corev1.WeightedPodAffinityTerm{
1✔
672
                        Weight: 100,
1✔
673
                        PodAffinityTerm: corev1.PodAffinityTerm{
1✔
674
                                LabelSelector: &metav1.LabelSelector{
1✔
675
                                        MatchExpressions: []metav1.LabelSelectorRequirement{
1✔
676
                                                {
1✔
677
                                                        Key:      common.UploadTargetLabel,
1✔
678
                                                        Operator: metav1.LabelSelectorOpIn,
1✔
679
                                                        Values:   []string{string(targetPvc.UID)},
1✔
680
                                                },
1✔
681
                                        },
1✔
682
                                },
1✔
683
                                Namespaces:  []string{targetPvc.Namespace},
1✔
684
                                TopologyKey: corev1.LabelHostname,
1✔
685
                        },
1✔
686
                },
1✔
687
        )
1✔
688

1✔
689
        if resourceRequirements != nil {
2✔
690
                pod.Spec.Containers[0].Resources = *resourceRequirements
1✔
691
        }
1✔
692

693
        var addVars []corev1.EnvVar
1✔
694

1✔
695
        if sourceVolumeMode == corev1.PersistentVolumeBlock {
2✔
696
                pod.Spec.Containers[0].VolumeDevices = cc.AddVolumeDevices()
1✔
697
                addVars = []corev1.EnvVar{
1✔
698
                        {
1✔
699
                                Name:  "VOLUME_MODE",
1✔
700
                                Value: "block",
1✔
701
                        },
1✔
702
                        {
1✔
703
                                Name:  "MOUNT_POINT",
1✔
704
                                Value: common.WriteBlockPath,
1✔
705
                        },
1✔
706
                }
1✔
707
        } else {
2✔
708
                pod.Spec.Containers[0].VolumeMounts = []corev1.VolumeMount{
1✔
709
                        {
1✔
710
                                Name:      cc.DataVolName,
1✔
711
                                MountPath: common.ClonerMountPath,
1✔
712
                                ReadOnly:  true,
1✔
713
                        },
1✔
714
                }
1✔
715
                addVars = []corev1.EnvVar{
1✔
716
                        {
1✔
717
                                Name:  "VOLUME_MODE",
1✔
718
                                Value: "filesystem",
1✔
719
                        },
1✔
720
                        {
1✔
721
                                Name:  "MOUNT_POINT",
1✔
722
                                Value: common.ClonerMountPath,
1✔
723
                        },
1✔
724
                }
1✔
725
        }
1✔
726

727
        pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, addVars...)
1✔
728
        cc.CopyAllowedAnnotations(targetPvc, pod)
1✔
729
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
730
        return pod
1✔
731
}
732

733
// ParseCloneRequestAnnotation parses the clone request annotation
734
func ParseCloneRequestAnnotation(pvc *corev1.PersistentVolumeClaim) (exists bool, namespace, name string) {
1✔
735
        var ann string
1✔
736
        ann, exists = pvc.Annotations[cc.AnnCloneRequest]
1✔
737
        if !exists {
2✔
738
                return false, "", ""
1✔
739
        }
1✔
740

741
        sp := strings.Split(ann, "/")
1✔
742
        if len(sp) != 2 {
2✔
743
                return false, "", ""
1✔
744
        }
1✔
745

746
        return true, sp[0], sp[1]
1✔
747
}
748

749
// ValidateCanCloneSourceAndTargetContentType validates the pvcs passed has the same content type.
750
func ValidateCanCloneSourceAndTargetContentType(sourcePvc, targetPvc *corev1.PersistentVolumeClaim) (cdiv1.DataVolumeContentType, error) {
1✔
751
        sourceContentType := cc.GetPVCContentType(sourcePvc)
1✔
752
        targetContentType := cc.GetPVCContentType(targetPvc)
1✔
753
        if sourceContentType != targetContentType {
2✔
754
                return "", fmt.Errorf("source contentType (%s) and target contentType (%s) do not match", sourceContentType, targetContentType)
1✔
755
        }
1✔
756
        return sourceContentType, nil
1✔
757
}
758

759
// ValidateCanCloneSourceAndTargetSpec validates the specs passed in are compatible for cloning.
760
func ValidateCanCloneSourceAndTargetSpec(ctx context.Context, c client.Client, sourcePvc, targetPvc *corev1.PersistentVolumeClaim, contentType cdiv1.DataVolumeContentType) error {
1✔
761
        // This annotation is only needed for some specific cases, when the target size is actually calculated by
1✔
762
        // the size detection pod, and there are some differences in fs overhead between src and target volumes
1✔
763
        _, permissive := targetPvc.Annotations[cc.AnnPermissiveClone]
1✔
764

1✔
765
        // Allow different source and target volume modes only on KubeVirt content type
1✔
766
        sourceVolumeMode := util.ResolveVolumeMode(sourcePvc.Spec.VolumeMode)
1✔
767
        targetVolumeMode := util.ResolveVolumeMode(targetPvc.Spec.VolumeMode)
1✔
768
        if sourceVolumeMode != targetVolumeMode && contentType != cdiv1.DataVolumeKubeVirt {
2✔
769
                return fmt.Errorf("source volumeMode (%s) and target volumeMode (%s) do not match, contentType (%s)",
1✔
770
                        sourceVolumeMode, targetVolumeMode, contentType)
1✔
771
        }
1✔
772

773
        // TODO: use detection pod here, then permissive should not be needed
774
        sourceUsableSpace, err := getUsableSpace(ctx, c, sourcePvc)
1✔
775
        if err != nil {
1✔
776
                return err
×
777
        }
×
778
        targetUsableSpace, err := getUsableSpace(ctx, c, targetPvc)
1✔
779
        if err != nil {
1✔
780
                return err
×
781
        }
×
782

783
        if !permissive && sourceUsableSpace.Cmp(targetUsableSpace) > 0 {
1✔
784
                return errors.New("target resources requests storage size is smaller than the source")
×
785
        }
×
786

787
        // Can clone.
788
        return nil
1✔
789
}
790

791
func getUsableSpace(ctx context.Context, c client.Client, pvc *corev1.PersistentVolumeClaim) (resource.Quantity, error) {
1✔
792
        sizeRequest := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
1✔
793
        volumeMode := util.ResolveVolumeMode(pvc.Spec.VolumeMode)
1✔
794

1✔
795
        if volumeMode == corev1.PersistentVolumeFilesystem {
2✔
796
                fsOverhead, err := cc.GetFilesystemOverheadForStorageClass(ctx, c, pvc.Spec.StorageClassName)
1✔
797
                if err != nil {
1✔
798
                        return resource.Quantity{}, err
×
799
                }
×
800
                fsOverheadFloat, _ := strconv.ParseFloat(string(fsOverhead), 64)
1✔
801
                usableSpaceRaw := util.GetUsableSpace(fsOverheadFloat, sizeRequest.Value())
1✔
802

1✔
803
                return *resource.NewScaledQuantity(usableSpaceRaw, 0), nil
1✔
804
        }
805

806
        return sizeRequest, nil
1✔
807
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc