• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5903

23 Mar 2026 05:52PM UTC coverage: 49.333% (+0.05%) from 49.286%
#5903

push

travis-ci

web-flow
feat: set EnableServiceLinks=false on importer/cloner/uploader pods (#4067)

CDI pods fail with 'argument list too long' in namespaces with 2000+
Services because Kubernetes injects ~7 env vars per Service by default.
Set EnableServiceLinks=false on all CDI worker pod specs (importer,
clone source, upload server, forklift populator) to prevent this.

Same fix KubeVirt applied to virt-launcher pods (kubevirt/kubevirt#4393).

Fixes: https://github.com/kubevirt/containerized-data-importer/issues/4059

Signed-off-by: Jeff Holm <jeff.holm@gmail.com>

16 of 19 new or added lines in 5 files covered. (84.21%)

540 existing lines in 6 files now uncovered.

14757 of 29913 relevant lines covered (49.33%)

0.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.36
/pkg/controller/upload-controller.go
1
/*
2
Copyright 2018 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "time"
26

27
        "github.com/go-logr/logr"
28
        "github.com/pkg/errors"
29

30
        corev1 "k8s.io/api/core/v1"
31
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
32
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33
        runtime "k8s.io/apimachinery/pkg/runtime"
34
        "k8s.io/apimachinery/pkg/types"
35
        "k8s.io/apimachinery/pkg/util/intstr"
36
        "k8s.io/apimachinery/pkg/util/sets"
37
        "k8s.io/client-go/tools/record"
38
        "k8s.io/utils/ptr"
39

40
        "sigs.k8s.io/controller-runtime/pkg/client"
41
        "sigs.k8s.io/controller-runtime/pkg/controller"
42
        "sigs.k8s.io/controller-runtime/pkg/handler"
43
        "sigs.k8s.io/controller-runtime/pkg/manager"
44
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
45
        "sigs.k8s.io/controller-runtime/pkg/source"
46

47
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
48
        "kubevirt.io/containerized-data-importer/pkg/common"
49
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
50
        featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
51
        "kubevirt.io/containerized-data-importer/pkg/operator"
52
        "kubevirt.io/containerized-data-importer/pkg/util"
53
        "kubevirt.io/containerized-data-importer/pkg/util/cert/fetcher"
54
        "kubevirt.io/containerized-data-importer/pkg/util/cert/generator"
55
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
56
        cryptowatch "kubevirt.io/containerized-data-importer/pkg/util/tls-crypto-watch"
57
        sdkapi "kubevirt.io/controller-lifecycle-operator-sdk/api"
58
)
59

60
const (
61
        // AnnUploadClientName is the TLS name uploadserver will accept requests from
62
        AnnUploadClientName = "cdi.kubevirt.io/uploadClientName"
63

64
        // AnnUploadPod name of the upload pod
65
        AnnUploadPod = "cdi.kubevirt.io/storage.uploadPodName"
66

67
        annCreatedByUpload = "cdi.kubevirt.io/storage.createdByUploadController"
68

69
        uploadServerClientName = "client.upload-server.cdi.kubevirt.io"
70

71
        // UploadSucceededPVC provides a const to indicate an import to the PVC failed
72
        UploadSucceededPVC = "UploadSucceeded"
73

74
        // UploadTargetInUse is reason for event created when an upload pvc is in use
75
        UploadTargetInUse = "UploadTargetInUse"
76

77
        certVolName = "tls-config"
78

79
        certMountPath = "/etc/tls/"
80

81
        serverCertFile = certMountPath + "tls.crt"
82

83
        serverKeyFile = certMountPath + "tls.key"
84

85
        clientCertFile = certMountPath + "ca.crt"
86
)
87

88
// UploadReconciler members
89
type UploadReconciler struct {
90
        client              client.Client
91
        recorder            record.EventRecorder
92
        scheme              *runtime.Scheme
93
        log                 logr.Logger
94
        image               string
95
        verbose             string
96
        pullPolicy          string
97
        serverCertGenerator generator.CertGenerator
98
        clientCAFetcher     fetcher.CertBundleFetcher
99
        featureGates        featuregates.FeatureGates
100
        installerLabels     map[string]string
101
}
102

103
// UploadPodArgs are the parameters required to create an upload pod
104
type UploadPodArgs struct {
105
        Name                            string
106
        PVC                             *corev1.PersistentVolumeClaim
107
        ScratchPVCName                  string
108
        ClientName                      string
109
        FilesystemOverhead              string
110
        ServerCert, ServerKey, ClientCA []byte
111
        Preallocation                   string
112
        CryptoEnvVars                   CryptoEnvVars
113
        Deadline                        *time.Time
114
}
115

116
// CryptoEnvVars holds the TLS crypto-related configurables for the upload server
117
type CryptoEnvVars struct {
118
        Ciphers       string
119
        MinTLSVersion string
120
}
121

122
// Reconcile the reconcile loop for the CDIConfig object.
123
func (r *UploadReconciler) Reconcile(_ context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
124
        log := r.log.WithValues("PVC", req.NamespacedName)
1✔
125
        log.V(1).Info("reconciling Upload PVCs")
1✔
126

1✔
127
        // Get the PVC.
1✔
128
        pvc := &corev1.PersistentVolumeClaim{}
1✔
129
        if err := r.client.Get(context.TODO(), req.NamespacedName, pvc); err != nil {
2✔
130
                if k8serrors.IsNotFound(err) {
2✔
131
                        return reconcile.Result{}, nil
1✔
132
                }
1✔
133
                return reconcile.Result{}, err
×
134
        }
135

136
        _, isUpload := pvc.Annotations[cc.AnnUploadRequest]
1✔
137
        _, isCloneTarget := pvc.Annotations[cc.AnnCloneRequest]
1✔
138

1✔
139
        if isUpload && isCloneTarget {
2✔
140
                log.V(1).Info("PVC has both clone and upload annotations")
1✔
141
                return reconcile.Result{}, errors.New("PVC has both clone and upload annotations")
1✔
142
        }
1✔
143

144
        if isUpload || isCloneTarget {
2✔
145
                if err := cc.UpdatePVCBoundContionFromEvents(pvc, r.client, log); err != nil {
1✔
146
                        return reconcile.Result{}, err
×
147
                }
×
148
        }
149

150
        shouldReconcile, err := r.shouldReconcile(isUpload, isCloneTarget, pvc, log)
1✔
151
        if err != nil {
1✔
152
                return reconcile.Result{}, err
×
153
        }
×
154
        // force cleanup if PVC pending delete and pod running or the upload/clone annotation was removed
155
        if !shouldReconcile || podSucceededFromPVC(pvc) || pvc.DeletionTimestamp != nil {
2✔
156
                log.V(1).Info("not doing anything with PVC",
1✔
157
                        "isUpload", isUpload,
1✔
158
                        "isCloneTarget", isCloneTarget,
1✔
159
                        "isBound", isBound(pvc, log),
1✔
160
                        "podSucceededFromPVC", podSucceededFromPVC(pvc),
1✔
161
                        "deletionTimeStamp set?", pvc.DeletionTimestamp != nil)
1✔
162
                if err := r.cleanup(pvc); err != nil {
1✔
163
                        return reconcile.Result{}, err
×
164
                }
×
165
                return reconcile.Result{}, nil
1✔
166
        }
167

168
        log.Info("Calling Upload reconcile PVC")
1✔
169
        return r.reconcilePVC(log, pvc, isCloneTarget)
1✔
170
}
171

172
func (r *UploadReconciler) shouldReconcile(isUpload bool, isCloneTarget bool, pvc *corev1.PersistentVolumeClaim, log logr.Logger) (bool, error) {
1✔
173
        waitForFirstConsumerEnabled, err := cc.IsWaitForFirstConsumerEnabled(pvc, r.featureGates)
1✔
174
        if err != nil {
1✔
175
                return false, err
×
176
        }
×
177

178
        return (isUpload || isCloneTarget) &&
1✔
179
                        shouldHandlePvc(pvc, waitForFirstConsumerEnabled, log),
1✔
180
                nil
1✔
181
}
182

183
func (r *UploadReconciler) reconcilePVC(log logr.Logger, pvc *corev1.PersistentVolumeClaim, isCloneTarget bool) (reconcile.Result, error) {
1✔
184
        var uploadClientName string
1✔
185
        pvcCopy := pvc.DeepCopy()
1✔
186
        anno := pvcCopy.Annotations
1✔
187

1✔
188
        if isCloneTarget {
2✔
189
                source, err := r.getCloneRequestSourcePVC(pvc)
1✔
190
                if err != nil {
2✔
191
                        return reconcile.Result{}, err
1✔
192
                }
1✔
193
                contentType, err := ValidateCanCloneSourceAndTargetContentType(source, pvc)
1✔
194
                if err != nil {
1✔
195
                        return reconcile.Result{}, err
×
196
                }
×
197
                if err = ValidateCanCloneSourceAndTargetSpec(context.TODO(), r.client, source, pvc, contentType); err != nil {
1✔
198
                        log.Error(err, "Error validating clone spec, ignoring")
×
199
                        r.recorder.Eventf(pvc, corev1.EventTypeWarning, cc.ErrIncompatiblePVC, err.Error())
×
200
                        return reconcile.Result{}, nil
×
201
                }
×
202

203
                uploadClientName = fmt.Sprintf("%s/%s-%s/%s", source.Namespace, source.Name, pvc.Namespace, pvc.Name)
1✔
204
                anno[AnnUploadClientName] = uploadClientName
1✔
205
        } else {
1✔
206
                uploadClientName = uploadServerClientName
1✔
207
        }
1✔
208

209
        pod, err := r.findUploadPodForPvc(pvc)
1✔
210
        if err != nil {
2✔
211
                return reconcile.Result{}, err
1✔
212
        }
1✔
213

214
        if pod == nil {
2✔
215
                podsUsingPVC, err := cc.GetPodsUsingPVCs(context.TODO(), r.client, pvc.Namespace, sets.New(pvc.Name), false)
1✔
216
                if err != nil {
1✔
217
                        return reconcile.Result{}, err
×
218
                }
×
219

220
                if len(podsUsingPVC) > 0 {
2✔
221
                        es, err := cc.GetAnnotatedEventSource(context.TODO(), r.client, pvc)
1✔
222
                        if err != nil {
1✔
223
                                return reconcile.Result{}, err
×
224
                        }
×
225

226
                        for _, pod := range podsUsingPVC {
2✔
227
                                log.V(1).Info("can't create upload pod, pvc in use by other pod",
1✔
228
                                        "namespace", pvc.Namespace, "name", pvc.Name, "pod", pod.Name)
1✔
229
                                r.recorder.Eventf(es, corev1.EventTypeWarning, UploadTargetInUse,
1✔
230
                                        "pod %s/%s using PersistentVolumeClaim %s", pod.Namespace, pod.Name, pvc.Name)
1✔
231
                        }
1✔
232
                        return reconcile.Result{Requeue: true}, nil
1✔
233
                }
234

235
                podName, ok := pvc.Annotations[AnnUploadPod]
1✔
236

1✔
237
                if !ok {
2✔
238
                        podName = createUploadResourceName(pvc.Name)
1✔
239
                        if err := r.updatePvcPodName(pvc, podName, log); err != nil {
1✔
240
                                return reconcile.Result{}, err
×
241
                        }
×
242
                        return reconcile.Result{Requeue: true}, nil
1✔
243
                }
244
                pod, err = r.createUploadPodForPvc(pvc, podName, uploadClientName, isCloneTarget)
1✔
245
                if err != nil {
1✔
246
                        return reconcile.Result{}, err
×
247
                }
×
248
        }
249

250
        // Always try to get or create the scratch PVC for a pod that is not successful yet, if it exists nothing happens otherwise attempt to create.
251
        scratchPVCName, exists := getScratchNameFromPod(pod)
1✔
252
        if exists {
2✔
253
                _, err := r.getOrCreateScratchPvc(pvcCopy, pod, scratchPVCName)
1✔
254
                if err != nil {
1✔
255
                        return reconcile.Result{}, err
×
256
                }
×
257
        }
258

259
        svcName := naming.GetServiceNameFromResourceName(pod.Name)
1✔
260
        if _, err = r.getOrCreateUploadService(pvc, svcName); err != nil {
2✔
261
                return reconcile.Result{}, err
1✔
262
        }
1✔
263

264
        termMsg, err := parseTerminationMessage(pod)
1✔
265
        if err != nil {
1✔
266
                return reconcile.Result{}, err
×
267
        }
×
268

269
        deadlinePassed := termMsg != nil && termMsg.DeadlinePassed != nil && *termMsg.DeadlinePassed
1✔
270
        if deadlinePassed {
2✔
271
                if pod.DeletionTimestamp == nil {
2✔
272
                        log.V(1).Info("Deleting pod because deadline exceeded")
1✔
273
                        if err := r.client.Delete(context.TODO(), pod); err != nil {
1✔
274
                                return reconcile.Result{}, err
×
275
                        }
×
276
                }
277

278
                anno[cc.AnnPodPhase] = ""
1✔
279
                anno[cc.AnnPodReady] = "false"
1✔
280
        } else {
1✔
281
                anno[cc.AnnPodPhase] = string(pod.Status.Phase)
1✔
282
                anno[cc.AnnPodReady] = strconv.FormatBool(isPodReady(pod))
1✔
283
        }
1✔
284

285
        setAnnotationsFromPodWithPrefix(anno, pod, termMsg, cc.AnnRunningCondition)
1✔
286

1✔
287
        if !reflect.DeepEqual(pvc, pvcCopy) {
2✔
288
                if err := r.updatePVC(pvcCopy); err != nil {
1✔
289
                        return reconcile.Result{}, err
×
290
                }
×
291
                if podSucceededFromPVC(pvcCopy) && !isCloneTarget {
1✔
292
                        // Upload completed, emit event. clone controller will emit clone complete.
×
293
                        r.recorder.Event(pvc, corev1.EventTypeNormal, UploadSucceededPVC, "Upload Successful")
×
294
                }
×
295
        }
296

297
        return reconcile.Result{}, nil
1✔
298
}
299

300
func (r *UploadReconciler) updatePvcPodName(pvc *corev1.PersistentVolumeClaim, podName string, log logr.Logger) error {
1✔
301
        currentPvcCopy := pvc.DeepCopyObject()
1✔
302

1✔
303
        log.V(1).Info("Updating PVC from pod")
1✔
304
        anno := pvc.GetAnnotations()
1✔
305
        anno[AnnUploadPod] = podName
1✔
306

1✔
307
        if !reflect.DeepEqual(currentPvcCopy, pvc) {
2✔
308
                if err := r.updatePVC(pvc); err != nil {
1✔
309
                        return err
×
310
                }
×
311
                log.V(1).Info("Updated PVC", "pvc.anno.AnnImportPod", anno[AnnUploadPod])
1✔
312
        }
313
        return nil
1✔
314
}
315

316
func (r *UploadReconciler) updatePVC(pvc *corev1.PersistentVolumeClaim) error {
1✔
317
        r.log.V(1).Info("Phase is now", "pvc.anno.Phase", pvc.GetAnnotations()[cc.AnnPodPhase])
1✔
318
        if err := r.client.Update(context.TODO(), pvc); err != nil {
1✔
319
                return err
×
320
        }
×
321
        return nil
1✔
322
}
323

324
func (r *UploadReconciler) getCloneRequestSourcePVC(targetPvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) {
1✔
325
        sourceVolumeMode := corev1.PersistentVolumeFilesystem
1✔
326
        targetVolumeMode := corev1.PersistentVolumeFilesystem
1✔
327

1✔
328
        exists, namespace, name := ParseCloneRequestAnnotation(targetPvc)
1✔
329
        if !exists {
1✔
330
                return nil, errors.New("error parsing clone request annotation")
×
331
        }
×
332
        sourcePvc := &corev1.PersistentVolumeClaim{}
1✔
333
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, sourcePvc); err != nil {
1✔
334
                return nil, errors.Wrap(err, "error getting clone source PVC")
×
335
        }
×
336
        if sourcePvc.Spec.VolumeMode != nil {
2✔
337
                sourceVolumeMode = *sourcePvc.Spec.VolumeMode
1✔
338
        }
1✔
339
        if targetPvc.Spec.VolumeMode != nil {
1✔
340
                targetVolumeMode = *targetPvc.Spec.VolumeMode
×
341
        }
×
342
        // Allow different source and target volume modes only on KubeVirt content type
343
        contentType, err := ValidateCanCloneSourceAndTargetContentType(sourcePvc, targetPvc)
1✔
344
        if err != nil {
2✔
345
                return nil, err
1✔
346
        }
1✔
347
        if sourceVolumeMode != targetVolumeMode && contentType != cdiv1.DataVolumeKubeVirt {
2✔
348
                return nil, errors.New("Source and target volume modes do not match, and content type is not kubevirt")
1✔
349
        }
1✔
350
        return sourcePvc, nil
1✔
351
}
352

353
func (r *UploadReconciler) cleanup(pvc *corev1.PersistentVolumeClaim) error {
1✔
354
        resourceName := getUploadResourceNameFromPvc(pvc)
1✔
355
        svcName := naming.GetServiceNameFromResourceName(resourceName)
1✔
356

1✔
357
        // delete service
1✔
358
        if err := r.deleteService(pvc.Namespace, svcName); err != nil {
1✔
359
                return err
×
360
        }
×
361

362
        // delete pod
363
        pod := &corev1.Pod{}
1✔
364
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: resourceName, Namespace: pvc.Namespace}, pod); err != nil {
2✔
365
                if k8serrors.IsNotFound(err) {
2✔
366
                        return nil
1✔
367
                }
1✔
368
                return err
×
369
        }
370
        if pod.DeletionTimestamp == nil && cc.ShouldDeletePod(pvc) {
2✔
371
                if err := r.client.Delete(context.TODO(), pod); cc.IgnoreNotFound(err) != nil {
1✔
372
                        return err
×
373
                }
×
374
        }
375
        return nil
1✔
376
}
377
func (r *UploadReconciler) findUploadPodForPvc(pvc *corev1.PersistentVolumeClaim) (*corev1.Pod, error) {
1✔
378
        podName := getUploadResourceNameFromPvc(pvc)
1✔
379
        pod := &corev1.Pod{}
1✔
380
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: podName, Namespace: pvc.Namespace}, pod); err != nil {
2✔
381
                if !k8serrors.IsNotFound(err) {
1✔
382
                        return nil, errors.Wrapf(err, "error getting upload pod %s/%s", pvc.Namespace, podName)
×
383
                }
×
384
                return nil, nil
1✔
385
        }
386

387
        if !metav1.IsControlledBy(pod, pvc) {
2✔
388
                return nil, errors.Errorf("%s pod not controlled by pvc %s", podName, pvc.Name)
1✔
389
        }
1✔
390

391
        return pod, nil
1✔
392
}
393

394
func (r *UploadReconciler) createUploadPodForPvc(pvc *corev1.PersistentVolumeClaim, podName, clientName string, isCloneTarget bool) (*corev1.Pod, error) {
1✔
395
        certConfig, err := operator.GetCertConfigWithDefaults(context.TODO(), r.client)
1✔
396
        if err != nil {
1✔
397
                return nil, err
×
398
        }
×
399

400
        serverCert, serverKey, err := r.serverCertGenerator.MakeServerCert(
1✔
401
                pvc.Namespace,
1✔
402
                naming.GetServiceNameFromResourceName(podName),
1✔
403
                certConfig.Server.Duration.Duration,
1✔
404
        )
1✔
405
        if err != nil {
1✔
406
                return nil, err
×
407
        }
×
408

409
        clientCA, err := r.clientCAFetcher.BundleBytes()
1✔
410
        if err != nil {
1✔
411
                return nil, err
×
412
        }
×
413

414
        fsOverhead, err := GetFilesystemOverhead(context.TODO(), r.client, pvc)
1✔
415
        if err != nil {
1✔
416
                return nil, err
×
417
        }
×
418

419
        preallocationRequested := false
1✔
420
        if preallocation, err := strconv.ParseBool(getValueFromAnnotation(pvc, cc.AnnPreallocationRequested)); err == nil {
1✔
421
                preallocationRequested = preallocation
×
422
        }
×
423

424
        config := &cdiv1.CDIConfig{}
1✔
425
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, config); err != nil {
1✔
426
                return nil, err
×
427
        }
×
428
        ciphers, minTLSVersion := cryptowatch.SelectCipherSuitesAndMinTLSVersion(config.Spec.TLSSecurityProfile)
1✔
429
        cryptoVars := CryptoEnvVars{
1✔
430
                Ciphers:       strings.Join(ciphers, ","),
1✔
431
                MinTLSVersion: string(minTLSVersion),
1✔
432
        }
1✔
433

1✔
434
        serverRefresh := certConfig.Server.Duration.Duration - certConfig.Server.RenewBefore.Duration
1✔
435
        clientRefresh := certConfig.Client.Duration.Duration - certConfig.Client.RenewBefore.Duration
1✔
436

1✔
437
        args := UploadPodArgs{
1✔
438
                Name:               podName,
1✔
439
                PVC:                pvc,
1✔
440
                ScratchPVCName:     createScratchPvcNameFromPvc(pvc, isCloneTarget),
1✔
441
                ClientName:         clientName,
1✔
442
                FilesystemOverhead: string(fsOverhead),
1✔
443
                ServerCert:         serverCert,
1✔
444
                ServerKey:          serverKey,
1✔
445
                ClientCA:           clientCA,
1✔
446
                Preallocation:      strconv.FormatBool(preallocationRequested),
1✔
447
                CryptoEnvVars:      cryptoVars,
1✔
448
                Deadline:           ptr.To(time.Now().Add(min(serverRefresh, clientRefresh))),
1✔
449
        }
1✔
450

1✔
451
        r.log.V(3).Info("Creating upload pod")
1✔
452
        pod, err := r.createUploadPod(args)
1✔
453
        // Check if pod has failed and, in that case, record an event with the error
1✔
454
        if podErr := cc.HandleFailedPod(err, podName, pvc, r.recorder, r.client); podErr != nil {
1✔
455
                return nil, podErr
×
456
        }
×
457

458
        if err := r.ensureCertSecret(args, pod); err != nil {
1✔
459
                return nil, err
×
460
        }
×
461

462
        return pod, nil
1✔
463
}
464

465
func (r *UploadReconciler) getOrCreateScratchPvc(pvc *corev1.PersistentVolumeClaim, pod *corev1.Pod, name string) (*corev1.PersistentVolumeClaim, error) {
1✔
466
        // Set condition, then check if need to override with scratch pvc message
1✔
467
        anno := pvc.Annotations
1✔
468
        scratchPvc := &corev1.PersistentVolumeClaim{}
1✔
469
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: pvc.Namespace}, scratchPvc); err != nil {
2✔
470
                if !k8serrors.IsNotFound(err) {
1✔
471
                        return nil, errors.Wrap(err, "error getting scratch PVC")
×
472
                }
×
473

474
                storageClassName := GetScratchPvcStorageClass(r.client, pvc)
1✔
475

1✔
476
                anno[cc.AnnBoundCondition] = "false"
1✔
477
                anno[cc.AnnBoundConditionMessage] = "Creating scratch space"
1✔
478
                anno[cc.AnnBoundConditionReason] = creatingScratch
1✔
479
                // Scratch PVC doesn't exist yet, create it.
1✔
480
                scratchPvc, err = createScratchPersistentVolumeClaim(r.client, pvc, pod, name, storageClassName, map[string]string{}, r.recorder)
1✔
481
                if err != nil {
1✔
482
                        return nil, err
×
483
                }
×
484
        } else {
×
485
                if !metav1.IsControlledBy(scratchPvc, pod) {
×
486
                        return nil, errors.Errorf("%s scratch PVC not controlled by pod %s", scratchPvc.Name, pod.Name)
×
487
                }
×
488
        }
489

490
        return scratchPvc, nil
1✔
491
}
492

493
func (r *UploadReconciler) getOrCreateUploadService(pvc *corev1.PersistentVolumeClaim, name string) (*corev1.Service, error) {
1✔
494
        service := &corev1.Service{}
1✔
495
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: pvc.Namespace}, service); err != nil {
2✔
496
                if !k8serrors.IsNotFound(err) {
1✔
497
                        return nil, errors.Wrap(err, "error getting upload service")
×
498
                }
×
499
                service, err = r.createUploadService(name, pvc)
1✔
500
                if err != nil {
1✔
501
                        return nil, err
×
502
                }
×
503
        }
504

505
        if !metav1.IsControlledBy(service, pvc) {
2✔
506
                return nil, errors.Errorf("%s service not controlled by pvc %s", name, pvc.Name)
1✔
507
        }
1✔
508

509
        return service, nil
1✔
510
}
511

512
func (r *UploadReconciler) deleteService(namespace, serviceName string) error {
1✔
513
        service := &corev1.Service{}
1✔
514
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: serviceName, Namespace: namespace}, service); err != nil {
2✔
515
                if k8serrors.IsNotFound(err) {
2✔
516
                        return nil
1✔
517
                }
1✔
518
                return err
×
519
        }
520

521
        if service.DeletionTimestamp == nil {
2✔
522
                if err := r.client.Delete(context.TODO(), service); cc.IgnoreNotFound(err) != nil {
1✔
523
                        return errors.Wrap(err, "error deleting upload service")
×
524
                }
×
525
        }
526

527
        return nil
1✔
528
}
529

530
func isPodReady(pod *corev1.Pod) bool {
1✔
531
        if len(pod.Status.ContainerStatuses) == 0 {
2✔
532
                return false
1✔
533
        }
1✔
534

535
        numReady := 0
1✔
536
        for _, s := range pod.Status.ContainerStatuses {
2✔
537
                if s.Ready {
1✔
538
                        numReady++
×
539
                }
×
540
        }
541

542
        return numReady == len(pod.Status.ContainerStatuses)
1✔
543
}
544

545
// createUploadService creates an upload service manifest and sends it to server
546
func (r *UploadReconciler) createUploadService(name string, pvc *corev1.PersistentVolumeClaim) (*corev1.Service, error) {
1✔
547
        ns := pvc.Namespace
1✔
548
        service := r.makeUploadServiceSpec(name, pvc)
1✔
549
        util.SetRecommendedLabels(service, r.installerLabels, "cdi-controller")
1✔
550

1✔
551
        if err := r.client.Create(context.TODO(), service); err != nil {
1✔
552
                if k8serrors.IsAlreadyExists(err) {
×
553
                        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: ns}, service); err != nil {
×
554
                                return nil, errors.Wrap(err, "upload service should exist but couldn't retrieve it")
×
555
                        }
×
556
                } else {
×
557
                        return nil, errors.Wrap(err, "upload service API create errored")
×
558
                }
×
559
        }
560
        r.log.V(1).Info("upload service created\n", "Namespace", service.Namespace, "Name", service.Name)
1✔
561
        return service, nil
1✔
562
}
563

564
// makeUploadServiceSpec creates upload service manifest
565
func (r *UploadReconciler) makeUploadServiceSpec(name string, pvc *corev1.PersistentVolumeClaim) *corev1.Service {
1✔
566
        blockOwnerDeletion := true
1✔
567
        isController := true
1✔
568
        service := &corev1.Service{
1✔
569
                TypeMeta: metav1.TypeMeta{
1✔
570
                        Kind:       "Service",
1✔
571
                        APIVersion: "v1",
1✔
572
                },
1✔
573
                ObjectMeta: metav1.ObjectMeta{
1✔
574
                        Name:      name,
1✔
575
                        Namespace: pvc.Namespace,
1✔
576
                        Annotations: map[string]string{
1✔
577
                                annCreatedByUpload: "yes",
1✔
578
                        },
1✔
579
                        Labels: map[string]string{
1✔
580
                                common.CDILabelKey:       common.CDILabelValue,
1✔
581
                                common.CDIComponentLabel: common.UploadServerCDILabel,
1✔
582
                        },
1✔
583
                        OwnerReferences: []metav1.OwnerReference{
1✔
584
                                {
1✔
585
                                        APIVersion:         "v1",
1✔
586
                                        Kind:               "PersistentVolumeClaim",
1✔
587
                                        Name:               pvc.Name,
1✔
588
                                        UID:                pvc.GetUID(),
1✔
589
                                        BlockOwnerDeletion: &blockOwnerDeletion,
1✔
590
                                        Controller:         &isController,
1✔
591
                                },
1✔
592
                        },
1✔
593
                },
1✔
594
                Spec: corev1.ServiceSpec{
1✔
595
                        ClusterIP: corev1.ClusterIPNone,
1✔
596
                        Ports: []corev1.ServicePort{
1✔
597
                                {
1✔
598
                                        Protocol:   corev1.ProtocolTCP,
1✔
599
                                        Port:       8443,
1✔
600
                                        TargetPort: intstr.FromInt32(8443),
1✔
601
                                },
1✔
602
                        },
1✔
603
                        Selector: map[string]string{
1✔
604
                                common.UploadServerServiceLabel: name,
1✔
605
                        },
1✔
606
                },
1✔
607
        }
1✔
608
        return service
1✔
609
}
1✔
610

611
// createUploadPod creates upload service pod manifest and sends to server
612
func (r *UploadReconciler) createUploadPod(args UploadPodArgs) (*corev1.Pod, error) {
1✔
613
        ns := args.PVC.Namespace
1✔
614

1✔
615
        podResourceRequirements, err := cc.GetDefaultPodResourceRequirements(r.client)
1✔
616
        if err != nil {
1✔
617
                return nil, err
×
618
        }
×
619

620
        imagePullSecrets, err := cc.GetImagePullSecrets(r.client)
1✔
621
        if err != nil {
1✔
622
                return nil, err
×
623
        }
×
624

625
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), r.client)
1✔
626
        if err != nil {
1✔
627
                return nil, err
×
628
        }
×
629

630
        pod := r.makeUploadPodSpec(args, podResourceRequirements, imagePullSecrets, workloadNodePlacement)
1✔
631
        util.SetRecommendedLabels(pod, r.installerLabels, "cdi-controller")
1✔
632

1✔
633
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: args.Name, Namespace: ns}, pod); err != nil {
2✔
634
                if !k8serrors.IsNotFound(err) {
1✔
635
                        return nil, errors.Wrap(err, "upload pod should exist but couldn't retrieve it")
×
636
                }
×
637
                if err := r.client.Create(context.TODO(), pod); err != nil {
1✔
638
                        return nil, err
×
639
                }
×
640
        }
641

642
        r.log.V(1).Info("upload pod created\n", "Namespace", pod.Namespace, "Name", pod.Name, "Image name", r.image)
1✔
643
        return pod, nil
1✔
644
}
645

646
func (r *UploadReconciler) ensureCertSecret(args UploadPodArgs, pod *corev1.Pod) error {
1✔
647
        if pod.Status.Phase == corev1.PodRunning {
1✔
648
                return nil
×
649
        }
×
650

651
        secret := &corev1.Secret{
1✔
652
                ObjectMeta: metav1.ObjectMeta{
1✔
653
                        Name:      args.Name,
1✔
654
                        Namespace: pod.Namespace,
1✔
655
                        Annotations: map[string]string{
1✔
656
                                annCreatedByUpload: "yes",
1✔
657
                        },
1✔
658
                        Labels: map[string]string{
1✔
659
                                common.CDILabelKey:       common.CDILabelValue,
1✔
660
                                common.CDIComponentLabel: common.UploadServerCDILabel,
1✔
661
                        },
1✔
662
                        OwnerReferences: []metav1.OwnerReference{
1✔
663
                                MakePodOwnerReference(pod),
1✔
664
                        },
1✔
665
                },
1✔
666
                Data: map[string][]byte{
1✔
667
                        "tls.key": args.ServerKey,
1✔
668
                        "tls.crt": args.ServerCert,
1✔
669
                        "ca.crt":  args.ClientCA,
1✔
670
                },
1✔
671
        }
1✔
672

1✔
673
        util.SetRecommendedLabels(secret, r.installerLabels, "cdi-controller")
1✔
674

1✔
675
        err := r.client.Create(context.TODO(), secret)
1✔
676
        if err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
677
                return errors.Wrap(err, "error creating cert secret")
×
678
        }
×
679

680
        return nil
1✔
681
}
682

683
// NewUploadController creates a new instance of the upload controller.
684
func NewUploadController(mgr manager.Manager, log logr.Logger, uploadImage, pullPolicy, verbose string, serverCertGenerator generator.CertGenerator, clientCAFetcher fetcher.CertBundleFetcher, installerLabels map[string]string) (controller.Controller, error) {
×
685
        client := mgr.GetClient()
×
686
        reconciler := &UploadReconciler{
×
687
                client:              client,
×
688
                scheme:              mgr.GetScheme(),
×
689
                log:                 log.WithName("upload-controller"),
×
690
                image:               uploadImage,
×
691
                verbose:             verbose,
×
692
                pullPolicy:          pullPolicy,
×
693
                recorder:            mgr.GetEventRecorderFor("upload-controller"),
×
694
                serverCertGenerator: serverCertGenerator,
×
695
                clientCAFetcher:     clientCAFetcher,
×
696
                featureGates:        featuregates.NewFeatureGates(client),
×
697
                installerLabels:     installerLabels,
×
698
        }
×
699
        uploadController, err := controller.New("upload-controller", mgr, controller.Options{
×
700
                MaxConcurrentReconciles: 3,
×
701
                Reconciler:              reconciler,
×
702
        })
×
703
        if err != nil {
×
704
                return nil, err
×
705
        }
×
706
        if err := addUploadControllerWatches(mgr, uploadController); err != nil {
×
707
                return nil, err
×
708
        }
×
709

710
        return uploadController, nil
×
711
}
712

713
func addUploadControllerWatches(mgr manager.Manager, uploadController controller.Controller) error {
×
714
        // Setup watches
×
715
        if err := uploadController.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{}, &handler.TypedEnqueueRequestForObject[*corev1.PersistentVolumeClaim]{})); err != nil {
×
716
                return err
×
717
        }
×
718
        if err := uploadController.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, handler.TypedEnqueueRequestForOwner[*corev1.Pod](
×
719
                mgr.GetScheme(), mgr.GetClient().RESTMapper(), &corev1.PersistentVolumeClaim{}, handler.OnlyControllerOwner()))); err != nil {
×
720
                return err
×
721
        }
×
722
        if err := uploadController.Watch(source.Kind(mgr.GetCache(), &corev1.Service{}, handler.TypedEnqueueRequestForOwner[*corev1.Service](
×
723
                mgr.GetScheme(), mgr.GetClient().RESTMapper(), &corev1.PersistentVolumeClaim{}, handler.OnlyControllerOwner()))); err != nil {
×
724
                return err
×
725
        }
×
726

727
        return nil
×
728
}
729

730
func createScratchPvcNameFromPvc(pvc *corev1.PersistentVolumeClaim, isCloneTarget bool) string {
1✔
731
        if isCloneTarget {
2✔
732
                return ""
1✔
733
        }
1✔
734

735
        return naming.GetResourceName(pvc.Name, common.ScratchNameSuffix)
1✔
736
}
737

738
// getUploadResourceName returns the name given to upload resources
739
func getUploadResourceNameFromPvc(pvc *corev1.PersistentVolumeClaim) string {
1✔
740
        podName, ok := pvc.Annotations[AnnUploadPod]
1✔
741
        if ok {
2✔
742
                return podName
1✔
743
        }
1✔
744

745
        // fallback to legacy naming, in fact the following function is fully compatible with legacy
746
        // name concatenation "cdi-upload-{pvc.Name}" if the name length is under the size limits,
747
        return naming.GetResourceName(common.UploadPodName, pvc.Name)
1✔
748
}
749

750
// createUploadResourceName returns the name given to upload resources
751
func createUploadResourceName(name string) string {
1✔
752
        return naming.GetResourceName(common.UploadPodName, name)
1✔
753
}
1✔
754

755
// UploadPossibleForPVC is called by the api server to see whether to return an upload token
756
func UploadPossibleForPVC(pvc *corev1.PersistentVolumeClaim) error {
×
757
        if _, ok := pvc.Annotations[cc.AnnUploadRequest]; !ok {
×
758
                return errors.Errorf("PVC %s is not an upload target", pvc.Name)
×
759
        }
×
760
        return nil
×
761
}
762

763
// GetUploadServerURL returns the url the proxy should post to for a particular pvc
764
func GetUploadServerURL(namespace, pvc, uploadPath string) string {
1✔
765
        serviceName := createUploadServiceNameFromPvcName(pvc)
1✔
766
        return fmt.Sprintf("https://%s.%s.svc:8443%s", serviceName, namespace, uploadPath)
1✔
767
}
1✔
768

769
// createUploadServiceName returns the name given to upload service shortened if needed
770
func createUploadServiceNameFromPvcName(pvc string) string {
1✔
771
        return naming.GetServiceNameFromResourceName(createUploadResourceName(pvc))
1✔
772
}
1✔
773

774
func (r *UploadReconciler) makeUploadPodSpec(args UploadPodArgs, resourceRequirements *corev1.ResourceRequirements, imagePullSecrets []corev1.LocalObjectReference, workloadNodePlacement *sdkapi.NodePlacement) *corev1.Pod {
1✔
775
        pod := &corev1.Pod{
1✔
776
                ObjectMeta: metav1.ObjectMeta{
1✔
777
                        Name:      args.Name,
1✔
778
                        Namespace: args.PVC.Namespace,
1✔
779
                        Annotations: map[string]string{
1✔
780
                                annCreatedByUpload: "yes",
1✔
781
                        },
1✔
782
                        Labels: map[string]string{
1✔
783
                                common.CDILabelKey:              common.CDILabelValue,
1✔
784
                                common.CDIComponentLabel:        common.UploadServerCDILabel,
1✔
785
                                common.UploadServerServiceLabel: naming.GetServiceNameFromResourceName(args.Name),
1✔
786
                                common.UploadTargetLabel:        string(args.PVC.UID),
1✔
787
                        },
1✔
788
                        OwnerReferences: []metav1.OwnerReference{
1✔
789
                                MakePVCOwnerReference(args.PVC),
1✔
790
                        },
1✔
791
                },
1✔
792
                Spec: corev1.PodSpec{
1✔
793
                        Containers:         r.makeUploadPodContainers(args, resourceRequirements),
1✔
794
                        Volumes:            r.makeUploadPodVolumes(args),
1✔
795
                        RestartPolicy:      corev1.RestartPolicyOnFailure,
1✔
796
                        NodeSelector:       workloadNodePlacement.NodeSelector,
1✔
797
                        Tolerations:        workloadNodePlacement.Tolerations,
1✔
798
                        Affinity:           workloadNodePlacement.Affinity,
1✔
799
                        PriorityClassName:  cc.GetPriorityClass(args.PVC),
1✔
800
                        ServiceAccountName: cc.GetPodServiceAccount(args.PVC),
1✔
801
                        ImagePullSecrets:   imagePullSecrets,
1✔
802
                        // https://kubernetes.io/docs/concepts/services-networking/service/#environment-variables
1✔
803
                        // Disable service environment variable injection to avoid 'argument list too long'
1✔
804
                        // errors in namespaces with many Services (each injects ~7 env vars).
1✔
805
                        EnableServiceLinks: ptr.To(false),
1✔
806
                },
1✔
807
        }
1✔
808

1✔
809
        cc.CopyAllowedAnnotations(args.PVC, pod)
1✔
810
        cc.SetNodeNameIfPopulator(args.PVC, &pod.Spec)
1✔
811
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
812

1✔
813
        return pod
1✔
814
}
1✔
815

816
func (r *UploadReconciler) makeUploadPodContainers(args UploadPodArgs, resourceRequirements *corev1.ResourceRequirements) []corev1.Container {
1✔
817
        requestImageSize, _ := cc.GetRequestedImageSize(args.PVC)
1✔
818
        containers := []corev1.Container{
1✔
819
                {
1✔
820
                        Name:            common.UploadServerPodname,
1✔
821
                        Image:           r.image,
1✔
822
                        ImagePullPolicy: corev1.PullPolicy(r.pullPolicy),
1✔
823
                        Env: []corev1.EnvVar{
1✔
824
                                {
1✔
825
                                        Name:  "TLS_KEY_FILE",
1✔
826
                                        Value: serverKeyFile,
1✔
827
                                },
1✔
828
                                {
1✔
829
                                        Name:  "TLS_CERT_FILE",
1✔
830
                                        Value: serverCertFile,
1✔
831
                                },
1✔
832
                                {
1✔
833
                                        Name:  "CLIENT_CERT_FILE",
1✔
834
                                        Value: clientCertFile,
1✔
835
                                },
1✔
836
                                {
1✔
837
                                        Name:  common.FilesystemOverheadVar,
1✔
838
                                        Value: args.FilesystemOverhead,
1✔
839
                                },
1✔
840
                                {
1✔
841
                                        Name:  common.UploadImageSize,
1✔
842
                                        Value: requestImageSize,
1✔
843
                                },
1✔
844
                                {
1✔
845
                                        Name:  "CLIENT_NAME",
1✔
846
                                        Value: args.ClientName,
1✔
847
                                },
1✔
848
                                {
1✔
849
                                        Name:  common.Preallocation,
1✔
850
                                        Value: args.Preallocation,
1✔
851
                                },
1✔
852
                                {
1✔
853
                                        Name:  common.CiphersTLSVar,
1✔
854
                                        Value: args.CryptoEnvVars.Ciphers,
1✔
855
                                },
1✔
856
                                {
1✔
857
                                        Name:  common.MinVersionTLSVar,
1✔
858
                                        Value: args.CryptoEnvVars.MinTLSVersion,
1✔
859
                                },
1✔
860
                        },
1✔
861
                        Args: []string{"-v=" + r.verbose},
1✔
862
                        ReadinessProbe: &corev1.Probe{
1✔
863
                                ProbeHandler: corev1.ProbeHandler{
1✔
864
                                        HTTPGet: &corev1.HTTPGetAction{
1✔
865
                                                Path: "/healthz",
1✔
866
                                                Port: intstr.IntOrString{
1✔
867
                                                        Type:   intstr.Int,
1✔
868
                                                        IntVal: 8443,
1✔
869
                                                },
1✔
870
                                                Scheme: corev1.URISchemeHTTPS,
1✔
871
                                        },
1✔
872
                                },
1✔
873
                                InitialDelaySeconds: 2,
1✔
874
                                PeriodSeconds:       5,
1✔
875
                        },
1✔
876
                        VolumeMounts: []corev1.VolumeMount{
1✔
877
                                {
1✔
878
                                        Name:      certVolName,
1✔
879
                                        MountPath: certMountPath,
1✔
880
                                },
1✔
881
                        },
1✔
882
                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
883
                },
1✔
884
        }
1✔
885
        if args.Deadline != nil {
2✔
886
                containers[0].Env = append(containers[0].Env, corev1.EnvVar{
1✔
887
                        Name:  "DEADLINE",
1✔
888
                        Value: args.Deadline.Format(time.RFC3339),
1✔
889
                })
1✔
890
        }
1✔
891
        if cc.GetVolumeMode(args.PVC) == corev1.PersistentVolumeBlock {
1✔
892
                containers[0].VolumeDevices = append(containers[0].VolumeDevices, corev1.VolumeDevice{
×
893
                        Name:       cc.DataVolName,
×
894
                        DevicePath: common.WriteBlockPath,
×
895
                })
×
896
                containers[0].Env = append(containers[0].Env, corev1.EnvVar{
×
897
                        Name:  "DESTINATION",
×
898
                        Value: common.WriteBlockPath,
×
UNCOV
899
                })
×
900
        } else {
1✔
901
                containers[0].VolumeMounts = append(containers[0].VolumeMounts, corev1.VolumeMount{
1✔
902
                        Name:      cc.DataVolName,
1✔
903
                        MountPath: common.UploadServerDataDir,
1✔
904
                })
1✔
905
        }
1✔
906
        if args.ScratchPVCName != "" {
2✔
907
                containers[0].VolumeMounts = append(containers[0].VolumeMounts, corev1.VolumeMount{
1✔
908
                        Name:      cc.ScratchVolName,
1✔
909
                        MountPath: common.ScratchDataDir,
1✔
910
                })
1✔
911
        }
1✔
912
        if resourceRequirements != nil {
2✔
913
                containers[0].Resources = *resourceRequirements
1✔
914
        }
1✔
915
        return containers
1✔
916
}
917

918
func (r *UploadReconciler) makeUploadPodVolumes(args UploadPodArgs) []corev1.Volume {
1✔
919
        volumes := []corev1.Volume{
1✔
920
                {
1✔
921
                        Name: certVolName,
1✔
922
                        VolumeSource: corev1.VolumeSource{
1✔
923
                                Secret: &corev1.SecretVolumeSource{
1✔
924
                                        SecretName: args.Name,
1✔
925
                                },
1✔
926
                        },
1✔
927
                },
1✔
928
                {
1✔
929
                        Name: cc.DataVolName,
1✔
930
                        VolumeSource: corev1.VolumeSource{
1✔
931
                                PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
1✔
932
                                        ClaimName: args.PVC.Name,
1✔
933
                                        ReadOnly:  false,
1✔
934
                                },
1✔
935
                        },
1✔
936
                },
1✔
937
        }
1✔
938
        if args.ScratchPVCName != "" {
2✔
939
                volumes = append(volumes, corev1.Volume{
1✔
940
                        Name: cc.ScratchVolName,
1✔
941
                        VolumeSource: corev1.VolumeSource{
1✔
942
                                PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
1✔
943
                                        ClaimName: args.ScratchPVCName,
1✔
944
                                        ReadOnly:  false,
1✔
945
                                },
1✔
946
                        },
1✔
947
                })
1✔
948
        }
1✔
949
        return volumes
1✔
950
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc