• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5326

14 May 2025 04:40PM UTC coverage: 59.303% (+0.02%) from 59.288%
#5326

push

travis-ci

web-flow
Enable WebhookPvcRendering feature gate by default (#3736)

The feature is available since v1.59, and we enable it by default to
allow increasing PVC size to the minimum supported by its provisioner
(#3711), and mainly in order to support:
https://github.com/kubevirt/kubevirt/pull/14637

As a bonus, the related Serial tests are now parallel. Thanks akalenyu:)

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

16870 of 28447 relevant lines covered (59.3%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.31
/pkg/controller/upload-controller.go
1
/*
2
Copyright 2018 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "time"
26

27
        "github.com/go-logr/logr"
28
        "github.com/pkg/errors"
29

30
        corev1 "k8s.io/api/core/v1"
31
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
32
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33
        runtime "k8s.io/apimachinery/pkg/runtime"
34
        "k8s.io/apimachinery/pkg/types"
35
        "k8s.io/apimachinery/pkg/util/intstr"
36
        "k8s.io/apimachinery/pkg/util/sets"
37
        "k8s.io/client-go/tools/record"
38
        "k8s.io/utils/ptr"
39

40
        "sigs.k8s.io/controller-runtime/pkg/client"
41
        "sigs.k8s.io/controller-runtime/pkg/controller"
42
        "sigs.k8s.io/controller-runtime/pkg/handler"
43
        "sigs.k8s.io/controller-runtime/pkg/manager"
44
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
45
        "sigs.k8s.io/controller-runtime/pkg/source"
46

47
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
48
        "kubevirt.io/containerized-data-importer/pkg/common"
49
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
50
        featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
51
        "kubevirt.io/containerized-data-importer/pkg/operator"
52
        "kubevirt.io/containerized-data-importer/pkg/util"
53
        "kubevirt.io/containerized-data-importer/pkg/util/cert/fetcher"
54
        "kubevirt.io/containerized-data-importer/pkg/util/cert/generator"
55
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
56
        cryptowatch "kubevirt.io/containerized-data-importer/pkg/util/tls-crypto-watch"
57
        sdkapi "kubevirt.io/controller-lifecycle-operator-sdk/api"
58
)
59

60
const (
61
        // AnnUploadClientName is the TLS name uploadserver will accept requests from
62
        AnnUploadClientName = "cdi.kubevirt.io/uploadClientName"
63

64
        // AnnUploadPod name of the upload pod
65
        AnnUploadPod = "cdi.kubevirt.io/storage.uploadPodName"
66

67
        annCreatedByUpload = "cdi.kubevirt.io/storage.createdByUploadController"
68

69
        uploadServerClientName = "client.upload-server.cdi.kubevirt.io"
70

71
        // UploadSucceededPVC is the event reason recorded when an upload to the PVC succeeded
72
        UploadSucceededPVC = "UploadSucceeded"
73

74
        // UploadTargetInUse is reason for event created when an upload pvc is in use
75
        UploadTargetInUse = "UploadTargetInUse"
76

77
        certVolName = "tls-config"
78

79
        certMountPath = "/etc/tls/"
80

81
        serverCertFile = certMountPath + "tls.crt"
82

83
        serverKeyFile = certMountPath + "tls.key"
84

85
        clientCertFile = certMountPath + "ca.crt"
86
)
87

88
// UploadReconciler members
89
type UploadReconciler struct {
90
        client              client.Client
91
        recorder            record.EventRecorder
92
        scheme              *runtime.Scheme
93
        log                 logr.Logger
94
        image               string
95
        verbose             string
96
        pullPolicy          string
97
        serverCertGenerator generator.CertGenerator
98
        clientCAFetcher     fetcher.CertBundleFetcher
99
        featureGates        featuregates.FeatureGates
100
        installerLabels     map[string]string
101
}
102

103
// UploadPodArgs are the parameters required to create an upload pod
104
type UploadPodArgs struct {
105
        Name                            string
106
        PVC                             *corev1.PersistentVolumeClaim
107
        ScratchPVCName                  string
108
        ClientName                      string
109
        FilesystemOverhead              string
110
        ServerCert, ServerKey, ClientCA []byte
111
        Preallocation                   string
112
        CryptoEnvVars                   CryptoEnvVars
113
        Deadline                        *time.Time
114
}
115

116
// CryptoEnvVars groups the TLS crypto settings handed to the upload server.
type CryptoEnvVars struct {
	// Ciphers is the comma-joined list of cipher suites.
	Ciphers string
	// MinTLSVersion is the minimum TLS version, in string form.
	MinTLSVersion string
}
121

122
// Reconcile the reconcile loop for the CDIConfig object.
123
func (r *UploadReconciler) Reconcile(_ context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
124
        log := r.log.WithValues("PVC", req.NamespacedName)
1✔
125
        log.V(1).Info("reconciling Upload PVCs")
1✔
126

1✔
127
        // Get the PVC.
1✔
128
        pvc := &corev1.PersistentVolumeClaim{}
1✔
129
        if err := r.client.Get(context.TODO(), req.NamespacedName, pvc); err != nil {
2✔
130
                if k8serrors.IsNotFound(err) {
2✔
131
                        return reconcile.Result{}, nil
1✔
132
                }
1✔
133
                return reconcile.Result{}, err
×
134
        }
135

136
        _, isUpload := pvc.Annotations[cc.AnnUploadRequest]
1✔
137
        _, isCloneTarget := pvc.Annotations[cc.AnnCloneRequest]
1✔
138

1✔
139
        if isUpload && isCloneTarget {
2✔
140
                log.V(1).Info("PVC has both clone and upload annotations")
1✔
141
                return reconcile.Result{}, errors.New("PVC has both clone and upload annotations")
1✔
142
        }
1✔
143
        shouldReconcile, err := r.shouldReconcile(isUpload, isCloneTarget, pvc, log)
1✔
144
        if err != nil {
1✔
145
                return reconcile.Result{}, err
×
146
        }
×
147
        // force cleanup if PVC pending delete and pod running or the upload/clone annotation was removed
148
        if !shouldReconcile || podSucceededFromPVC(pvc) || pvc.DeletionTimestamp != nil {
2✔
149
                log.V(1).Info("not doing anything with PVC",
1✔
150
                        "isUpload", isUpload,
1✔
151
                        "isCloneTarget", isCloneTarget,
1✔
152
                        "isBound", isBound(pvc, log),
1✔
153
                        "podSucceededFromPVC", podSucceededFromPVC(pvc),
1✔
154
                        "deletionTimeStamp set?", pvc.DeletionTimestamp != nil)
1✔
155
                if err := r.cleanup(pvc); err != nil {
1✔
156
                        return reconcile.Result{}, err
×
157
                }
×
158
                return reconcile.Result{}, nil
1✔
159
        }
160

161
        log.Info("Calling Upload reconcile PVC")
1✔
162
        return r.reconcilePVC(log, pvc, isCloneTarget)
1✔
163
}
164

165
func (r *UploadReconciler) shouldReconcile(isUpload bool, isCloneTarget bool, pvc *corev1.PersistentVolumeClaim, log logr.Logger) (bool, error) {
1✔
166
        waitForFirstConsumerEnabled, err := cc.IsWaitForFirstConsumerEnabled(pvc, r.featureGates)
1✔
167
        if err != nil {
1✔
168
                return false, err
×
169
        }
×
170

171
        return (isUpload || isCloneTarget) &&
1✔
172
                        shouldHandlePvc(pvc, waitForFirstConsumerEnabled, log),
1✔
173
                nil
1✔
174
}
175

176
func (r *UploadReconciler) reconcilePVC(log logr.Logger, pvc *corev1.PersistentVolumeClaim, isCloneTarget bool) (reconcile.Result, error) {
1✔
177
        var uploadClientName string
1✔
178
        pvcCopy := pvc.DeepCopy()
1✔
179
        anno := pvcCopy.Annotations
1✔
180

1✔
181
        if isCloneTarget {
2✔
182
                source, err := r.getCloneRequestSourcePVC(pvc)
1✔
183
                if err != nil {
2✔
184
                        return reconcile.Result{}, err
1✔
185
                }
1✔
186
                contentType, err := ValidateCanCloneSourceAndTargetContentType(source, pvc)
1✔
187
                if err != nil {
1✔
188
                        return reconcile.Result{}, err
×
189
                }
×
190
                if err = ValidateCanCloneSourceAndTargetSpec(context.TODO(), r.client, source, pvc, contentType); err != nil {
1✔
191
                        log.Error(err, "Error validating clone spec, ignoring")
×
192
                        r.recorder.Eventf(pvc, corev1.EventTypeWarning, cc.ErrIncompatiblePVC, err.Error())
×
193
                        return reconcile.Result{}, nil
×
194
                }
×
195

196
                uploadClientName = fmt.Sprintf("%s/%s-%s/%s", source.Namespace, source.Name, pvc.Namespace, pvc.Name)
1✔
197
                anno[AnnUploadClientName] = uploadClientName
1✔
198
        } else {
1✔
199
                uploadClientName = uploadServerClientName
1✔
200
        }
1✔
201

202
        pod, err := r.findUploadPodForPvc(pvc)
1✔
203
        if err != nil {
2✔
204
                return reconcile.Result{}, err
1✔
205
        }
1✔
206

207
        if pod == nil {
2✔
208
                podsUsingPVC, err := cc.GetPodsUsingPVCs(context.TODO(), r.client, pvc.Namespace, sets.New(pvc.Name), false)
1✔
209
                if err != nil {
1✔
210
                        return reconcile.Result{}, err
×
211
                }
×
212

213
                if len(podsUsingPVC) > 0 {
2✔
214
                        es, err := cc.GetAnnotatedEventSource(context.TODO(), r.client, pvc)
1✔
215
                        if err != nil {
1✔
216
                                return reconcile.Result{}, err
×
217
                        }
×
218

219
                        for _, pod := range podsUsingPVC {
2✔
220
                                log.V(1).Info("can't create upload pod, pvc in use by other pod",
1✔
221
                                        "namespace", pvc.Namespace, "name", pvc.Name, "pod", pod.Name)
1✔
222
                                r.recorder.Eventf(es, corev1.EventTypeWarning, UploadTargetInUse,
1✔
223
                                        "pod %s/%s using PersistentVolumeClaim %s", pod.Namespace, pod.Name, pvc.Name)
1✔
224
                        }
1✔
225
                        return reconcile.Result{Requeue: true}, nil
1✔
226
                }
227

228
                podName, ok := pvc.Annotations[AnnUploadPod]
1✔
229

1✔
230
                if !ok {
2✔
231
                        podName = createUploadResourceName(pvc.Name)
1✔
232
                        if err := r.updatePvcPodName(pvc, podName, log); err != nil {
1✔
233
                                return reconcile.Result{}, err
×
234
                        }
×
235
                        return reconcile.Result{Requeue: true}, nil
1✔
236
                }
237
                pod, err = r.createUploadPodForPvc(pvc, podName, uploadClientName, isCloneTarget)
1✔
238
                if err != nil {
1✔
239
                        return reconcile.Result{}, err
×
240
                }
×
241
        }
242

243
        // Always try to get or create the scratch PVC for a pod that is not successful yet, if it exists nothing happens otherwise attempt to create.
244
        scratchPVCName, exists := getScratchNameFromPod(pod)
1✔
245
        if exists {
2✔
246
                _, err := r.getOrCreateScratchPvc(pvcCopy, pod, scratchPVCName)
1✔
247
                if err != nil {
1✔
248
                        return reconcile.Result{}, err
×
249
                }
×
250
        }
251

252
        svcName := naming.GetServiceNameFromResourceName(pod.Name)
1✔
253
        if _, err = r.getOrCreateUploadService(pvc, svcName); err != nil {
2✔
254
                return reconcile.Result{}, err
1✔
255
        }
1✔
256

257
        termMsg, err := parseTerminationMessage(pod)
1✔
258
        if err != nil {
1✔
259
                return reconcile.Result{}, err
×
260
        }
×
261

262
        deadlinePassed := termMsg != nil && termMsg.DeadlinePassed != nil && *termMsg.DeadlinePassed
1✔
263
        if deadlinePassed {
2✔
264
                if pod.DeletionTimestamp == nil {
2✔
265
                        log.V(1).Info("Deleting pod because deadline exceeded")
1✔
266
                        if err := r.client.Delete(context.TODO(), pod); err != nil {
1✔
267
                                return reconcile.Result{}, err
×
268
                        }
×
269
                }
270

271
                anno[cc.AnnPodPhase] = ""
1✔
272
                anno[cc.AnnPodReady] = "false"
1✔
273
        } else {
1✔
274
                anno[cc.AnnPodPhase] = string(pod.Status.Phase)
1✔
275
                anno[cc.AnnPodReady] = strconv.FormatBool(isPodReady(pod))
1✔
276
        }
1✔
277

278
        setAnnotationsFromPodWithPrefix(anno, pod, termMsg, cc.AnnRunningCondition)
1✔
279

1✔
280
        if !reflect.DeepEqual(pvc, pvcCopy) {
2✔
281
                if err := r.updatePVC(pvcCopy); err != nil {
1✔
282
                        return reconcile.Result{}, err
×
283
                }
×
284
                if podSucceededFromPVC(pvcCopy) && !isCloneTarget {
1✔
285
                        // Upload completed, emit event. clone controller will emit clone complete.
×
286
                        r.recorder.Event(pvc, corev1.EventTypeNormal, UploadSucceededPVC, "Upload Successful")
×
287
                }
×
288
        }
289

290
        return reconcile.Result{}, nil
1✔
291
}
292

293
func (r *UploadReconciler) updatePvcPodName(pvc *corev1.PersistentVolumeClaim, podName string, log logr.Logger) error {
1✔
294
        currentPvcCopy := pvc.DeepCopyObject()
1✔
295

1✔
296
        log.V(1).Info("Updating PVC from pod")
1✔
297
        anno := pvc.GetAnnotations()
1✔
298
        anno[AnnUploadPod] = podName
1✔
299

1✔
300
        if !reflect.DeepEqual(currentPvcCopy, pvc) {
2✔
301
                if err := r.updatePVC(pvc); err != nil {
1✔
302
                        return err
×
303
                }
×
304
                log.V(1).Info("Updated PVC", "pvc.anno.AnnImportPod", anno[AnnUploadPod])
1✔
305
        }
306
        return nil
1✔
307
}
308

309
func (r *UploadReconciler) updatePVC(pvc *corev1.PersistentVolumeClaim) error {
1✔
310
        r.log.V(1).Info("Phase is now", "pvc.anno.Phase", pvc.GetAnnotations()[cc.AnnPodPhase])
1✔
311
        if err := r.client.Update(context.TODO(), pvc); err != nil {
1✔
312
                return err
×
313
        }
×
314
        return nil
1✔
315
}
316

317
func (r *UploadReconciler) getCloneRequestSourcePVC(targetPvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) {
1✔
318
        sourceVolumeMode := corev1.PersistentVolumeFilesystem
1✔
319
        targetVolumeMode := corev1.PersistentVolumeFilesystem
1✔
320

1✔
321
        exists, namespace, name := ParseCloneRequestAnnotation(targetPvc)
1✔
322
        if !exists {
1✔
323
                return nil, errors.New("error parsing clone request annotation")
×
324
        }
×
325
        sourcePvc := &corev1.PersistentVolumeClaim{}
1✔
326
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, sourcePvc); err != nil {
1✔
327
                return nil, errors.Wrap(err, "error getting clone source PVC")
×
328
        }
×
329
        if sourcePvc.Spec.VolumeMode != nil {
2✔
330
                sourceVolumeMode = *sourcePvc.Spec.VolumeMode
1✔
331
        }
1✔
332
        if targetPvc.Spec.VolumeMode != nil {
1✔
333
                targetVolumeMode = *targetPvc.Spec.VolumeMode
×
334
        }
×
335
        // Allow different source and target volume modes only on KubeVirt content type
336
        contentType, err := ValidateCanCloneSourceAndTargetContentType(sourcePvc, targetPvc)
1✔
337
        if err != nil {
2✔
338
                return nil, err
1✔
339
        }
1✔
340
        if sourceVolumeMode != targetVolumeMode && contentType != cdiv1.DataVolumeKubeVirt {
2✔
341
                return nil, errors.New("Source and target volume modes do not match, and content type is not kubevirt")
1✔
342
        }
1✔
343
        return sourcePvc, nil
1✔
344
}
345

346
func (r *UploadReconciler) cleanup(pvc *corev1.PersistentVolumeClaim) error {
1✔
347
        resourceName := getUploadResourceNameFromPvc(pvc)
1✔
348
        svcName := naming.GetServiceNameFromResourceName(resourceName)
1✔
349

1✔
350
        // delete service
1✔
351
        if err := r.deleteService(pvc.Namespace, svcName); err != nil {
1✔
352
                return err
×
353
        }
×
354

355
        // delete pod
356
        pod := &corev1.Pod{}
1✔
357
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: resourceName, Namespace: pvc.Namespace}, pod); err != nil {
2✔
358
                if k8serrors.IsNotFound(err) {
2✔
359
                        return nil
1✔
360
                }
1✔
361
                return err
×
362
        }
363
        if pod.DeletionTimestamp == nil && cc.ShouldDeletePod(pvc) {
2✔
364
                if err := r.client.Delete(context.TODO(), pod); cc.IgnoreNotFound(err) != nil {
1✔
365
                        return err
×
366
                }
×
367
        }
368
        return nil
1✔
369
}
370
func (r *UploadReconciler) findUploadPodForPvc(pvc *corev1.PersistentVolumeClaim) (*corev1.Pod, error) {
1✔
371
        podName := getUploadResourceNameFromPvc(pvc)
1✔
372
        pod := &corev1.Pod{}
1✔
373
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: podName, Namespace: pvc.Namespace}, pod); err != nil {
2✔
374
                if !k8serrors.IsNotFound(err) {
1✔
375
                        return nil, errors.Wrapf(err, "error getting upload pod %s/%s", pvc.Namespace, podName)
×
376
                }
×
377
                return nil, nil
1✔
378
        }
379

380
        if !metav1.IsControlledBy(pod, pvc) {
2✔
381
                return nil, errors.Errorf("%s pod not controlled by pvc %s", podName, pvc.Name)
1✔
382
        }
1✔
383

384
        return pod, nil
1✔
385
}
386

387
func (r *UploadReconciler) createUploadPodForPvc(pvc *corev1.PersistentVolumeClaim, podName, clientName string, isCloneTarget bool) (*corev1.Pod, error) {
1✔
388
        certConfig, err := operator.GetCertConfigWithDefaults(context.TODO(), r.client)
1✔
389
        if err != nil {
1✔
390
                return nil, err
×
391
        }
×
392

393
        serverCert, serverKey, err := r.serverCertGenerator.MakeServerCert(
1✔
394
                pvc.Namespace,
1✔
395
                naming.GetServiceNameFromResourceName(podName),
1✔
396
                certConfig.Server.Duration.Duration,
1✔
397
        )
1✔
398
        if err != nil {
1✔
399
                return nil, err
×
400
        }
×
401

402
        clientCA, err := r.clientCAFetcher.BundleBytes()
1✔
403
        if err != nil {
1✔
404
                return nil, err
×
405
        }
×
406

407
        fsOverhead, err := GetFilesystemOverhead(context.TODO(), r.client, pvc)
1✔
408
        if err != nil {
1✔
409
                return nil, err
×
410
        }
×
411

412
        preallocationRequested := false
1✔
413
        if preallocation, err := strconv.ParseBool(getValueFromAnnotation(pvc, cc.AnnPreallocationRequested)); err == nil {
1✔
414
                preallocationRequested = preallocation
×
415
        }
×
416

417
        config := &cdiv1.CDIConfig{}
1✔
418
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, config); err != nil {
1✔
419
                return nil, err
×
420
        }
×
421
        ciphers, minTLSVersion := cryptowatch.SelectCipherSuitesAndMinTLSVersion(config.Spec.TLSSecurityProfile)
1✔
422
        cryptoVars := CryptoEnvVars{
1✔
423
                Ciphers:       strings.Join(ciphers, ","),
1✔
424
                MinTLSVersion: string(minTLSVersion),
1✔
425
        }
1✔
426

1✔
427
        serverRefresh := certConfig.Server.Duration.Duration - certConfig.Server.RenewBefore.Duration
1✔
428
        clientRefresh := certConfig.Client.Duration.Duration - certConfig.Client.RenewBefore.Duration
1✔
429

1✔
430
        args := UploadPodArgs{
1✔
431
                Name:               podName,
1✔
432
                PVC:                pvc,
1✔
433
                ScratchPVCName:     createScratchPvcNameFromPvc(pvc, isCloneTarget),
1✔
434
                ClientName:         clientName,
1✔
435
                FilesystemOverhead: string(fsOverhead),
1✔
436
                ServerCert:         serverCert,
1✔
437
                ServerKey:          serverKey,
1✔
438
                ClientCA:           clientCA,
1✔
439
                Preallocation:      strconv.FormatBool(preallocationRequested),
1✔
440
                CryptoEnvVars:      cryptoVars,
1✔
441
                Deadline:           ptr.To(time.Now().Add(min(serverRefresh, clientRefresh))),
1✔
442
        }
1✔
443

1✔
444
        r.log.V(3).Info("Creating upload pod")
1✔
445
        pod, err := r.createUploadPod(args)
1✔
446
        // Check if pod has failed and, in that case, record an event with the error
1✔
447
        if podErr := cc.HandleFailedPod(err, podName, pvc, r.recorder, r.client); podErr != nil {
1✔
448
                return nil, podErr
×
449
        }
×
450

451
        if err := r.ensureCertSecret(args, pod); err != nil {
1✔
452
                return nil, err
×
453
        }
×
454

455
        return pod, nil
1✔
456
}
457

458
func (r *UploadReconciler) getOrCreateScratchPvc(pvc *corev1.PersistentVolumeClaim, pod *corev1.Pod, name string) (*corev1.PersistentVolumeClaim, error) {
1✔
459
        // Set condition, then check if need to override with scratch pvc message
1✔
460
        anno := pvc.Annotations
1✔
461
        scratchPvc := &corev1.PersistentVolumeClaim{}
1✔
462
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: pvc.Namespace}, scratchPvc); err != nil {
2✔
463
                if !k8serrors.IsNotFound(err) {
1✔
464
                        return nil, errors.Wrap(err, "error getting scratch PVC")
×
465
                }
×
466

467
                storageClassName := GetScratchPvcStorageClass(r.client, pvc)
1✔
468

1✔
469
                anno[cc.AnnBoundCondition] = "false"
1✔
470
                anno[cc.AnnBoundConditionMessage] = "Creating scratch space"
1✔
471
                anno[cc.AnnBoundConditionReason] = creatingScratch
1✔
472
                // Scratch PVC doesn't exist yet, create it.
1✔
473
                scratchPvc, err = createScratchPersistentVolumeClaim(r.client, pvc, pod, name, storageClassName, map[string]string{}, r.recorder)
1✔
474
                if err != nil {
1✔
475
                        return nil, err
×
476
                }
×
477
        } else {
×
478
                if !metav1.IsControlledBy(scratchPvc, pod) {
×
479
                        return nil, errors.Errorf("%s scratch PVC not controlled by pod %s", scratchPvc.Name, pod.Name)
×
480
                }
×
481
                setBoundConditionFromPVC(anno, cc.AnnBoundCondition, scratchPvc)
×
482
        }
483

484
        return scratchPvc, nil
1✔
485
}
486

487
func (r *UploadReconciler) getOrCreateUploadService(pvc *corev1.PersistentVolumeClaim, name string) (*corev1.Service, error) {
1✔
488
        service := &corev1.Service{}
1✔
489
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: pvc.Namespace}, service); err != nil {
2✔
490
                if !k8serrors.IsNotFound(err) {
1✔
491
                        return nil, errors.Wrap(err, "error getting upload service")
×
492
                }
×
493
                service, err = r.createUploadService(name, pvc)
1✔
494
                if err != nil {
1✔
495
                        return nil, err
×
496
                }
×
497
        }
498

499
        if !metav1.IsControlledBy(service, pvc) {
2✔
500
                return nil, errors.Errorf("%s service not controlled by pvc %s", name, pvc.Name)
1✔
501
        }
1✔
502

503
        return service, nil
1✔
504
}
505

506
func (r *UploadReconciler) deleteService(namespace, serviceName string) error {
1✔
507
        service := &corev1.Service{}
1✔
508
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: serviceName, Namespace: namespace}, service); err != nil {
2✔
509
                if k8serrors.IsNotFound(err) {
2✔
510
                        return nil
1✔
511
                }
1✔
512
                return err
×
513
        }
514

515
        if service.DeletionTimestamp == nil {
2✔
516
                if err := r.client.Delete(context.TODO(), service); cc.IgnoreNotFound(err) != nil {
1✔
517
                        return errors.Wrap(err, "error deleting upload service")
×
518
                }
×
519
        }
520

521
        return nil
1✔
522
}
523

524
func isPodReady(pod *corev1.Pod) bool {
1✔
525
        if len(pod.Status.ContainerStatuses) == 0 {
2✔
526
                return false
1✔
527
        }
1✔
528

529
        numReady := 0
1✔
530
        for _, s := range pod.Status.ContainerStatuses {
2✔
531
                if s.Ready {
1✔
532
                        numReady++
×
533
                }
×
534
        }
535

536
        return numReady == len(pod.Status.ContainerStatuses)
1✔
537
}
538

539
// createUploadService creates an upload service manifest and sends it to server
540
func (r *UploadReconciler) createUploadService(name string, pvc *corev1.PersistentVolumeClaim) (*corev1.Service, error) {
1✔
541
        ns := pvc.Namespace
1✔
542
        service := r.makeUploadServiceSpec(name, pvc)
1✔
543
        util.SetRecommendedLabels(service, r.installerLabels, "cdi-controller")
1✔
544

1✔
545
        if err := r.client.Create(context.TODO(), service); err != nil {
1✔
546
                if k8serrors.IsAlreadyExists(err) {
×
547
                        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: ns}, service); err != nil {
×
548
                                return nil, errors.Wrap(err, "upload service should exist but couldn't retrieve it")
×
549
                        }
×
550
                } else {
×
551
                        return nil, errors.Wrap(err, "upload service API create errored")
×
552
                }
×
553
        }
554
        r.log.V(1).Info("upload service created\n", "Namespace", service.Namespace, "Name", service.Name)
1✔
555
        return service, nil
1✔
556
}
557

558
// makeUploadServiceSpec creates upload service manifest
559
func (r *UploadReconciler) makeUploadServiceSpec(name string, pvc *corev1.PersistentVolumeClaim) *corev1.Service {
1✔
560
        blockOwnerDeletion := true
1✔
561
        isController := true
1✔
562
        service := &corev1.Service{
1✔
563
                TypeMeta: metav1.TypeMeta{
1✔
564
                        Kind:       "Service",
1✔
565
                        APIVersion: "v1",
1✔
566
                },
1✔
567
                ObjectMeta: metav1.ObjectMeta{
1✔
568
                        Name:      name,
1✔
569
                        Namespace: pvc.Namespace,
1✔
570
                        Annotations: map[string]string{
1✔
571
                                annCreatedByUpload: "yes",
1✔
572
                        },
1✔
573
                        Labels: map[string]string{
1✔
574
                                common.CDILabelKey:       common.CDILabelValue,
1✔
575
                                common.CDIComponentLabel: common.UploadServerCDILabel,
1✔
576
                        },
1✔
577
                        OwnerReferences: []metav1.OwnerReference{
1✔
578
                                {
1✔
579
                                        APIVersion:         "v1",
1✔
580
                                        Kind:               "PersistentVolumeClaim",
1✔
581
                                        Name:               pvc.Name,
1✔
582
                                        UID:                pvc.GetUID(),
1✔
583
                                        BlockOwnerDeletion: &blockOwnerDeletion,
1✔
584
                                        Controller:         &isController,
1✔
585
                                },
1✔
586
                        },
1✔
587
                },
1✔
588
                Spec: corev1.ServiceSpec{
1✔
589
                        Ports: []corev1.ServicePort{
1✔
590
                                {
1✔
591
                                        Protocol: "TCP",
1✔
592
                                        Port:     443,
1✔
593
                                        TargetPort: intstr.IntOrString{
1✔
594
                                                Type:   intstr.Int,
1✔
595
                                                IntVal: 8443,
1✔
596
                                        },
1✔
597
                                },
1✔
598
                        },
1✔
599
                        Selector: map[string]string{
1✔
600
                                common.UploadServerServiceLabel: name,
1✔
601
                        },
1✔
602
                },
1✔
603
        }
1✔
604
        return service
1✔
605
}
1✔
606

607
// createUploadPod creates upload service pod manifest and sends to server
608
func (r *UploadReconciler) createUploadPod(args UploadPodArgs) (*corev1.Pod, error) {
1✔
609
        ns := args.PVC.Namespace
1✔
610

1✔
611
        podResourceRequirements, err := cc.GetDefaultPodResourceRequirements(r.client)
1✔
612
        if err != nil {
1✔
613
                return nil, err
×
614
        }
×
615

616
        imagePullSecrets, err := cc.GetImagePullSecrets(r.client)
1✔
617
        if err != nil {
1✔
618
                return nil, err
×
619
        }
×
620

621
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), r.client)
1✔
622
        if err != nil {
1✔
623
                return nil, err
×
624
        }
×
625

626
        pod := r.makeUploadPodSpec(args, podResourceRequirements, imagePullSecrets, workloadNodePlacement)
1✔
627
        util.SetRecommendedLabels(pod, r.installerLabels, "cdi-controller")
1✔
628

1✔
629
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: args.Name, Namespace: ns}, pod); err != nil {
2✔
630
                if !k8serrors.IsNotFound(err) {
1✔
631
                        return nil, errors.Wrap(err, "upload pod should exist but couldn't retrieve it")
×
632
                }
×
633
                if err := r.client.Create(context.TODO(), pod); err != nil {
1✔
634
                        return nil, err
×
635
                }
×
636
        }
637

638
        r.log.V(1).Info("upload pod created\n", "Namespace", pod.Namespace, "Name", pod.Name, "Image name", r.image)
1✔
639
        return pod, nil
1✔
640
}
641

642
func (r *UploadReconciler) ensureCertSecret(args UploadPodArgs, pod *corev1.Pod) error {
1✔
643
        if pod.Status.Phase == corev1.PodRunning {
1✔
644
                return nil
×
645
        }
×
646

647
        secret := &corev1.Secret{
1✔
648
                ObjectMeta: metav1.ObjectMeta{
1✔
649
                        Name:      args.Name,
1✔
650
                        Namespace: pod.Namespace,
1✔
651
                        Annotations: map[string]string{
1✔
652
                                annCreatedByUpload: "yes",
1✔
653
                        },
1✔
654
                        Labels: map[string]string{
1✔
655
                                common.CDILabelKey:       common.CDILabelValue,
1✔
656
                                common.CDIComponentLabel: common.UploadServerCDILabel,
1✔
657
                        },
1✔
658
                        OwnerReferences: []metav1.OwnerReference{
1✔
659
                                MakePodOwnerReference(pod),
1✔
660
                        },
1✔
661
                },
1✔
662
                Data: map[string][]byte{
1✔
663
                        "tls.key": args.ServerKey,
1✔
664
                        "tls.crt": args.ServerCert,
1✔
665
                        "ca.crt":  args.ClientCA,
1✔
666
                },
1✔
667
        }
1✔
668

1✔
669
        util.SetRecommendedLabels(secret, r.installerLabels, "cdi-controller")
1✔
670

1✔
671
        err := r.client.Create(context.TODO(), secret)
1✔
672
        if err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
673
                return errors.Wrap(err, "error creating cert secret")
×
674
        }
×
675

676
        return nil
1✔
677
}
678

679
// NewUploadController creates a new instance of the upload controller.
680
func NewUploadController(mgr manager.Manager, log logr.Logger, uploadImage, pullPolicy, verbose string, serverCertGenerator generator.CertGenerator, clientCAFetcher fetcher.CertBundleFetcher, installerLabels map[string]string) (controller.Controller, error) {
×
681
        client := mgr.GetClient()
×
682
        reconciler := &UploadReconciler{
×
683
                client:              client,
×
684
                scheme:              mgr.GetScheme(),
×
685
                log:                 log.WithName("upload-controller"),
×
686
                image:               uploadImage,
×
687
                verbose:             verbose,
×
688
                pullPolicy:          pullPolicy,
×
689
                recorder:            mgr.GetEventRecorderFor("upload-controller"),
×
690
                serverCertGenerator: serverCertGenerator,
×
691
                clientCAFetcher:     clientCAFetcher,
×
692
                featureGates:        featuregates.NewFeatureGates(client),
×
693
                installerLabels:     installerLabels,
×
694
        }
×
695
        uploadController, err := controller.New("upload-controller", mgr, controller.Options{
×
696
                MaxConcurrentReconciles: 3,
×
697
                Reconciler:              reconciler,
×
698
        })
×
699
        if err != nil {
×
700
                return nil, err
×
701
        }
×
702
        if err := addUploadControllerWatches(mgr, uploadController); err != nil {
×
703
                return nil, err
×
704
        }
×
705

706
        return uploadController, nil
×
707
}
708

709
func addUploadControllerWatches(mgr manager.Manager, uploadController controller.Controller) error {
×
710
        // Setup watches
×
711
        if err := uploadController.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{}, &handler.TypedEnqueueRequestForObject[*corev1.PersistentVolumeClaim]{})); err != nil {
×
712
                return err
×
713
        }
×
714
        if err := uploadController.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, handler.TypedEnqueueRequestForOwner[*corev1.Pod](
×
715
                mgr.GetScheme(), mgr.GetClient().RESTMapper(), &corev1.PersistentVolumeClaim{}, handler.OnlyControllerOwner()))); err != nil {
×
716
                return err
×
717
        }
×
718
        if err := uploadController.Watch(source.Kind(mgr.GetCache(), &corev1.Service{}, handler.TypedEnqueueRequestForOwner[*corev1.Service](
×
719
                mgr.GetScheme(), mgr.GetClient().RESTMapper(), &corev1.PersistentVolumeClaim{}, handler.OnlyControllerOwner()))); err != nil {
×
720
                return err
×
721
        }
×
722

723
        return nil
×
724
}
725

726
func createScratchPvcNameFromPvc(pvc *corev1.PersistentVolumeClaim, isCloneTarget bool) string {
1✔
727
        if isCloneTarget {
2✔
728
                return ""
1✔
729
        }
1✔
730

731
        return naming.GetResourceName(pvc.Name, common.ScratchNameSuffix)
1✔
732
}
733

734
// getUploadResourceName returns the name given to upload resources
735
func getUploadResourceNameFromPvc(pvc *corev1.PersistentVolumeClaim) string {
1✔
736
        podName, ok := pvc.Annotations[AnnUploadPod]
1✔
737
        if ok {
2✔
738
                return podName
1✔
739
        }
1✔
740

741
        // fallback to legacy naming, in fact the following function is fully compatible with legacy
742
        // name concatenation "cdi-upload-{pvc.Name}" if the name length is under the size limits,
743
        return naming.GetResourceName(common.UploadPodName, pvc.Name)
1✔
744
}
745

746
// createUploadResourceName returns the name given to upload resources
747
func createUploadResourceName(name string) string {
1✔
748
        return naming.GetResourceName(common.UploadPodName, name)
1✔
749
}
1✔
750

751
// UploadPossibleForPVC is called by the api server to see whether to return an upload token
752
func UploadPossibleForPVC(pvc *corev1.PersistentVolumeClaim) error {
×
753
        if _, ok := pvc.Annotations[cc.AnnUploadRequest]; !ok {
×
754
                return errors.Errorf("PVC %s is not an upload target", pvc.Name)
×
755
        }
×
756
        return nil
×
757
}
758

759
// GetUploadServerURL returns the url the proxy should post to for a particular pvc
760
func GetUploadServerURL(namespace, pvc, uploadPath string) string {
1✔
761
        serviceName := createUploadServiceNameFromPvcName(pvc)
1✔
762
        return fmt.Sprintf("https://%s.%s.svc%s", serviceName, namespace, uploadPath)
1✔
763
}
1✔
764

765
// createUploadServiceName returns the name given to upload service shortened if needed
766
func createUploadServiceNameFromPvcName(pvc string) string {
1✔
767
        return naming.GetServiceNameFromResourceName(createUploadResourceName(pvc))
1✔
768
}
1✔
769

770
func (r *UploadReconciler) makeUploadPodSpec(args UploadPodArgs, resourceRequirements *corev1.ResourceRequirements, imagePullSecrets []corev1.LocalObjectReference, workloadNodePlacement *sdkapi.NodePlacement) *corev1.Pod {
1✔
771
        pod := &corev1.Pod{
1✔
772
                ObjectMeta: metav1.ObjectMeta{
1✔
773
                        Name:      args.Name,
1✔
774
                        Namespace: args.PVC.Namespace,
1✔
775
                        Annotations: map[string]string{
1✔
776
                                annCreatedByUpload: "yes",
1✔
777
                        },
1✔
778
                        Labels: map[string]string{
1✔
779
                                common.CDILabelKey:              common.CDILabelValue,
1✔
780
                                common.CDIComponentLabel:        common.UploadServerCDILabel,
1✔
781
                                common.UploadServerServiceLabel: naming.GetServiceNameFromResourceName(args.Name),
1✔
782
                                common.UploadTargetLabel:        string(args.PVC.UID),
1✔
783
                        },
1✔
784
                        OwnerReferences: []metav1.OwnerReference{
1✔
785
                                MakePVCOwnerReference(args.PVC),
1✔
786
                        },
1✔
787
                },
1✔
788
                Spec: corev1.PodSpec{
1✔
789
                        Containers:        r.makeUploadPodContainers(args, resourceRequirements),
1✔
790
                        Volumes:           r.makeUploadPodVolumes(args),
1✔
791
                        RestartPolicy:     corev1.RestartPolicyOnFailure,
1✔
792
                        NodeSelector:      workloadNodePlacement.NodeSelector,
1✔
793
                        Tolerations:       workloadNodePlacement.Tolerations,
1✔
794
                        Affinity:          workloadNodePlacement.Affinity,
1✔
795
                        PriorityClassName: cc.GetPriorityClass(args.PVC),
1✔
796
                        ImagePullSecrets:  imagePullSecrets,
1✔
797
                },
1✔
798
        }
1✔
799

1✔
800
        cc.CopyAllowedAnnotations(args.PVC, pod)
1✔
801
        cc.SetNodeNameIfPopulator(args.PVC, &pod.Spec)
1✔
802
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
803

1✔
804
        return pod
1✔
805
}
1✔
806

807
func (r *UploadReconciler) makeUploadPodContainers(args UploadPodArgs, resourceRequirements *corev1.ResourceRequirements) []corev1.Container {
1✔
808
        requestImageSize, _ := cc.GetRequestedImageSize(args.PVC)
1✔
809
        containers := []corev1.Container{
1✔
810
                {
1✔
811
                        Name:            common.UploadServerPodname,
1✔
812
                        Image:           r.image,
1✔
813
                        ImagePullPolicy: corev1.PullPolicy(r.pullPolicy),
1✔
814
                        Env: []corev1.EnvVar{
1✔
815
                                {
1✔
816
                                        Name:  "TLS_KEY_FILE",
1✔
817
                                        Value: serverKeyFile,
1✔
818
                                },
1✔
819
                                {
1✔
820
                                        Name:  "TLS_CERT_FILE",
1✔
821
                                        Value: serverCertFile,
1✔
822
                                },
1✔
823
                                {
1✔
824
                                        Name:  "CLIENT_CERT_FILE",
1✔
825
                                        Value: clientCertFile,
1✔
826
                                },
1✔
827
                                {
1✔
828
                                        Name:  common.FilesystemOverheadVar,
1✔
829
                                        Value: args.FilesystemOverhead,
1✔
830
                                },
1✔
831
                                {
1✔
832
                                        Name:  common.UploadImageSize,
1✔
833
                                        Value: requestImageSize,
1✔
834
                                },
1✔
835
                                {
1✔
836
                                        Name:  "CLIENT_NAME",
1✔
837
                                        Value: args.ClientName,
1✔
838
                                },
1✔
839
                                {
1✔
840
                                        Name:  common.Preallocation,
1✔
841
                                        Value: args.Preallocation,
1✔
842
                                },
1✔
843
                                {
1✔
844
                                        Name:  common.CiphersTLSVar,
1✔
845
                                        Value: args.CryptoEnvVars.Ciphers,
1✔
846
                                },
1✔
847
                                {
1✔
848
                                        Name:  common.MinVersionTLSVar,
1✔
849
                                        Value: args.CryptoEnvVars.MinTLSVersion,
1✔
850
                                },
1✔
851
                        },
1✔
852
                        Args: []string{"-v=" + r.verbose},
1✔
853
                        ReadinessProbe: &corev1.Probe{
1✔
854
                                ProbeHandler: corev1.ProbeHandler{
1✔
855
                                        HTTPGet: &corev1.HTTPGetAction{
1✔
856
                                                Path: "/healthz",
1✔
857
                                                Port: intstr.IntOrString{
1✔
858
                                                        Type:   intstr.Int,
1✔
859
                                                        IntVal: 8443,
1✔
860
                                                },
1✔
861
                                                Scheme: corev1.URISchemeHTTPS,
1✔
862
                                        },
1✔
863
                                },
1✔
864
                                InitialDelaySeconds: 2,
1✔
865
                                PeriodSeconds:       5,
1✔
866
                        },
1✔
867
                        VolumeMounts: []corev1.VolumeMount{
1✔
868
                                {
1✔
869
                                        Name:      certVolName,
1✔
870
                                        MountPath: certMountPath,
1✔
871
                                },
1✔
872
                        },
1✔
873
                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
874
                },
1✔
875
        }
1✔
876
        if args.Deadline != nil {
2✔
877
                containers[0].Env = append(containers[0].Env, corev1.EnvVar{
1✔
878
                        Name:  "DEADLINE",
1✔
879
                        Value: args.Deadline.Format(time.RFC3339),
1✔
880
                })
1✔
881
        }
1✔
882
        if cc.GetVolumeMode(args.PVC) == corev1.PersistentVolumeBlock {
1✔
883
                containers[0].VolumeDevices = append(containers[0].VolumeDevices, corev1.VolumeDevice{
×
884
                        Name:       cc.DataVolName,
×
885
                        DevicePath: common.WriteBlockPath,
×
886
                })
×
887
                containers[0].Env = append(containers[0].Env, corev1.EnvVar{
×
888
                        Name:  "DESTINATION",
×
889
                        Value: common.WriteBlockPath,
×
890
                })
×
891
        } else {
1✔
892
                containers[0].VolumeMounts = append(containers[0].VolumeMounts, corev1.VolumeMount{
1✔
893
                        Name:      cc.DataVolName,
1✔
894
                        MountPath: common.UploadServerDataDir,
1✔
895
                })
1✔
896
        }
1✔
897
        if args.ScratchPVCName != "" {
2✔
898
                containers[0].VolumeMounts = append(containers[0].VolumeMounts, corev1.VolumeMount{
1✔
899
                        Name:      cc.ScratchVolName,
1✔
900
                        MountPath: common.ScratchDataDir,
1✔
901
                })
1✔
902
        }
1✔
903
        if resourceRequirements != nil {
2✔
904
                containers[0].Resources = *resourceRequirements
1✔
905
        }
1✔
906
        return containers
1✔
907
}
908

909
func (r *UploadReconciler) makeUploadPodVolumes(args UploadPodArgs) []corev1.Volume {
1✔
910
        volumes := []corev1.Volume{
1✔
911
                {
1✔
912
                        Name: certVolName,
1✔
913
                        VolumeSource: corev1.VolumeSource{
1✔
914
                                Secret: &corev1.SecretVolumeSource{
1✔
915
                                        SecretName: args.Name,
1✔
916
                                },
1✔
917
                        },
1✔
918
                },
1✔
919
                {
1✔
920
                        Name: cc.DataVolName,
1✔
921
                        VolumeSource: corev1.VolumeSource{
1✔
922
                                PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
1✔
923
                                        ClaimName: args.PVC.Name,
1✔
924
                                        ReadOnly:  false,
1✔
925
                                },
1✔
926
                        },
1✔
927
                },
1✔
928
        }
1✔
929
        if args.ScratchPVCName != "" {
2✔
930
                volumes = append(volumes, corev1.Volume{
1✔
931
                        Name: cc.ScratchVolName,
1✔
932
                        VolumeSource: corev1.VolumeSource{
1✔
933
                                PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
1✔
934
                                        ClaimName: args.ScratchPVCName,
1✔
935
                                        ReadOnly:  false,
1✔
936
                                },
1✔
937
                        },
1✔
938
                })
1✔
939
        }
1✔
940
        return volumes
1✔
941
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc