• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5995

12 May 2026 06:16AM UTC coverage: 49.616% (+0.002%) from 49.614%
#5995

Pull #4124

travis-ci

RamLavi
Add open-default-ports annotation to upload server pod

Since the upload service became headless (#4052), DNS resolves directly
to the upload server pod IP. In OVN-Kubernetes clusters where the target
namespace uses network segmentation, the pod's default network interface
is locked down. The k8s.ovn.org/open-default-ports annotation opens port
8443 on the default interface, allowing the upload proxy to connect.

The annotation is harmless on non-OVN clusters where it is simply
ignored.

Assisted-by: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Ram Lavi <ralavi@redhat.com>
Pull Request #4124: Add open-default-ports annotation to upload server pod

7 of 7 new or added lines in 1 file covered. (100.0%)

5 existing lines in 2 files now uncovered.

14988 of 30208 relevant lines covered (49.62%)

0.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.4
/pkg/controller/upload-controller.go
1
/*
2
Copyright 2018 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "time"
26

27
        "github.com/go-logr/logr"
28
        "github.com/pkg/errors"
29

30
        corev1 "k8s.io/api/core/v1"
31
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
32
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33
        runtime "k8s.io/apimachinery/pkg/runtime"
34
        "k8s.io/apimachinery/pkg/types"
35
        "k8s.io/apimachinery/pkg/util/intstr"
36
        "k8s.io/apimachinery/pkg/util/sets"
37
        "k8s.io/client-go/tools/record"
38
        "k8s.io/utils/ptr"
39

40
        "sigs.k8s.io/controller-runtime/pkg/client"
41
        "sigs.k8s.io/controller-runtime/pkg/controller"
42
        "sigs.k8s.io/controller-runtime/pkg/handler"
43
        "sigs.k8s.io/controller-runtime/pkg/manager"
44
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
45
        "sigs.k8s.io/controller-runtime/pkg/source"
46

47
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
48
        "kubevirt.io/containerized-data-importer/pkg/common"
49
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
50
        featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
51
        "kubevirt.io/containerized-data-importer/pkg/operator"
52
        "kubevirt.io/containerized-data-importer/pkg/util"
53
        "kubevirt.io/containerized-data-importer/pkg/util/cert/fetcher"
54
        "kubevirt.io/containerized-data-importer/pkg/util/cert/generator"
55
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
56
        cryptowatch "kubevirt.io/containerized-data-importer/pkg/util/tls-crypto-watch"
57
        sdkapi "kubevirt.io/controller-lifecycle-operator-sdk/api"
58
)
59

60
const (
	// AnnUploadClientName is the TLS name uploadserver will accept requests from
	AnnUploadClientName = "cdi.kubevirt.io/uploadClientName"

	// AnnUploadPod is the PVC annotation that stores the name of the upload pod
	AnnUploadPod = "cdi.kubevirt.io/storage.uploadPodName"

	// annCreatedByUpload is set to "yes" on the service and cert secret created by this controller
	annCreatedByUpload = "cdi.kubevirt.io/storage.createdByUploadController"

	// uploadServerClientName is the client name used when the PVC is not a clone target
	uploadServerClientName = "client.upload-server.cdi.kubevirt.io"

	// UploadSucceededPVC is the event reason emitted when an upload to the PVC succeeded
	UploadSucceededPVC = "UploadSucceeded"

	// UploadTargetInUse is reason for event created when an upload pvc is in use
	UploadTargetInUse = "UploadTargetInUse"

	// certVolName is the name of the TLS volume (consumer not visible in this part of the file)
	certVolName = "tls-config"

	// certMountPath is the directory prefix of the TLS file paths below
	certMountPath = "/etc/tls/"

	// serverCertFile is the path of the server certificate under certMountPath
	serverCertFile = certMountPath + "tls.crt"

	// serverKeyFile is the path of the server private key under certMountPath
	serverKeyFile = certMountPath + "tls.key"

	// clientCertFile is the path of the client CA bundle under certMountPath
	clientCertFile = certMountPath + "ca.crt"
)
87

88
// UploadReconciler reconciles PVCs that carry an upload or clone-target
// annotation, managing the upload pod, service, scratch PVC and cert secret.
type UploadReconciler struct {
	// client is used for all API reads and writes (Get/Create/Update/Delete).
	client client.Client
	// recorder emits Kubernetes events on PVCs and related objects.
	recorder record.EventRecorder
	scheme   *runtime.Scheme
	// log is the base logger; Reconcile derives a per-PVC logger from it.
	log logr.Logger
	// image is logged as "Image name" when the upload pod is created.
	image      string
	verbose    string
	pullPolicy string
	// serverCertGenerator issues the upload server certificate and key.
	serverCertGenerator generator.CertGenerator
	// clientCAFetcher supplies the client CA bundle stored in the cert secret.
	clientCAFetcher fetcher.CertBundleFetcher
	// featureGates is consulted for the WaitForFirstConsumer decision.
	featureGates featuregates.FeatureGates
	// installerLabels are passed to util.SetRecommendedLabels for created objects.
	installerLabels map[string]string
}
102

103
// UploadPodArgs are the parameters required to create an upload pod
104
type UploadPodArgs struct {
105
        Name                            string
106
        PVC                             *corev1.PersistentVolumeClaim
107
        ScratchPVCName                  string
108
        ClientName                      string
109
        FilesystemOverhead              string
110
        ServerCert, ServerKey, ClientCA []byte
111
        Preallocation                   string
112
        CryptoEnvVars                   CryptoEnvVars
113
        Deadline                        *time.Time
114
}
115

116
// CryptoEnvVars holds the TLS crypto-related configurables for the upload server
type CryptoEnvVars struct {
	// Ciphers is the comma-separated list of selected cipher suites.
	Ciphers string
	// MinTLSVersion is the selected minimum TLS version, as a string.
	MinTLSVersion string
}
121

122
// Reconcile the reconcile loop for the CDIConfig object.
123
func (r *UploadReconciler) Reconcile(_ context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
124
        log := r.log.WithValues("PVC", req.NamespacedName)
1✔
125
        log.V(1).Info("reconciling Upload PVCs")
1✔
126

1✔
127
        // Get the PVC.
1✔
128
        pvc := &corev1.PersistentVolumeClaim{}
1✔
129
        if err := r.client.Get(context.TODO(), req.NamespacedName, pvc); err != nil {
2✔
130
                if k8serrors.IsNotFound(err) {
2✔
131
                        return reconcile.Result{}, nil
1✔
132
                }
1✔
133
                return reconcile.Result{}, err
×
134
        }
135

136
        _, isUpload := pvc.Annotations[cc.AnnUploadRequest]
1✔
137
        _, isCloneTarget := pvc.Annotations[cc.AnnCloneRequest]
1✔
138

1✔
139
        if isUpload && isCloneTarget {
2✔
140
                log.V(1).Info("PVC has both clone and upload annotations")
1✔
141
                return reconcile.Result{}, errors.New("PVC has both clone and upload annotations")
1✔
142
        }
1✔
143

144
        if isUpload || isCloneTarget {
2✔
145
                if err := cc.UpdatePVCBoundContionFromEvents(pvc, r.client, log); err != nil {
1✔
146
                        return reconcile.Result{}, err
×
147
                }
×
148
        }
149

150
        shouldReconcile, err := r.shouldReconcile(isUpload, isCloneTarget, pvc, log)
1✔
151
        if err != nil {
1✔
152
                return reconcile.Result{}, err
×
153
        }
×
154
        // force cleanup if PVC pending delete and pod running or the upload/clone annotation was removed
155
        if !shouldReconcile || podSucceededFromPVC(pvc) || pvc.DeletionTimestamp != nil {
2✔
156
                log.V(1).Info("not doing anything with PVC",
1✔
157
                        "isUpload", isUpload,
1✔
158
                        "isCloneTarget", isCloneTarget,
1✔
159
                        "isBound", isBound(pvc, log),
1✔
160
                        "podSucceededFromPVC", podSucceededFromPVC(pvc),
1✔
161
                        "deletionTimeStamp set?", pvc.DeletionTimestamp != nil)
1✔
162
                if err := r.cleanup(pvc); err != nil {
1✔
163
                        return reconcile.Result{}, err
×
164
                }
×
165
                return reconcile.Result{}, nil
1✔
166
        }
167

168
        log.Info("Calling Upload reconcile PVC")
1✔
169
        return r.reconcilePVC(log, pvc, isCloneTarget)
1✔
170
}
171

172
func (r *UploadReconciler) shouldReconcile(isUpload bool, isCloneTarget bool, pvc *corev1.PersistentVolumeClaim, log logr.Logger) (bool, error) {
1✔
173
        waitForFirstConsumerEnabled, err := cc.IsWaitForFirstConsumerEnabled(pvc, r.featureGates)
1✔
174
        if err != nil {
1✔
175
                return false, err
×
176
        }
×
177

178
        return (isUpload || isCloneTarget) &&
1✔
179
                        shouldHandlePvc(pvc, waitForFirstConsumerEnabled, log),
1✔
180
                nil
1✔
181
}
182

183
func (r *UploadReconciler) reconcilePVC(log logr.Logger, pvc *corev1.PersistentVolumeClaim, isCloneTarget bool) (reconcile.Result, error) {
1✔
184
        var uploadClientName string
1✔
185
        pvcCopy := pvc.DeepCopy()
1✔
186
        anno := pvcCopy.Annotations
1✔
187

1✔
188
        if isCloneTarget {
2✔
189
                source, err := r.getCloneRequestSourcePVC(pvc)
1✔
190
                if err != nil {
2✔
191
                        return reconcile.Result{}, err
1✔
192
                }
1✔
193
                contentType, err := ValidateCanCloneSourceAndTargetContentType(source, pvc)
1✔
194
                if err != nil {
1✔
195
                        return reconcile.Result{}, err
×
196
                }
×
197
                if err = ValidateCanCloneSourceAndTargetSpec(context.TODO(), r.client, source, pvc, contentType); err != nil {
1✔
198
                        log.Error(err, "Error validating clone spec, ignoring")
×
199
                        r.recorder.Eventf(pvc, corev1.EventTypeWarning, cc.ErrIncompatiblePVC, err.Error())
×
200
                        return reconcile.Result{}, nil
×
201
                }
×
202

203
                uploadClientName = fmt.Sprintf("%s/%s-%s/%s", source.Namespace, source.Name, pvc.Namespace, pvc.Name)
1✔
204
                anno[AnnUploadClientName] = uploadClientName
1✔
205
        } else {
1✔
206
                uploadClientName = uploadServerClientName
1✔
207
        }
1✔
208

209
        pod, err := r.findUploadPodForPvc(pvc)
1✔
210
        if err != nil {
2✔
211
                return reconcile.Result{}, err
1✔
212
        }
1✔
213

214
        if pod == nil {
2✔
215
                podsUsingPVC, err := cc.GetPodsUsingPVCs(context.TODO(), r.client, pvc.Namespace, sets.New(pvc.Name), false)
1✔
216
                if err != nil {
1✔
217
                        return reconcile.Result{}, err
×
218
                }
×
219

220
                if len(podsUsingPVC) > 0 {
2✔
221
                        es, err := cc.GetAnnotatedEventSource(context.TODO(), r.client, pvc)
1✔
222
                        if err != nil {
1✔
223
                                return reconcile.Result{}, err
×
224
                        }
×
225

226
                        for _, pod := range podsUsingPVC {
2✔
227
                                log.V(1).Info("can't create upload pod, pvc in use by other pod",
1✔
228
                                        "namespace", pvc.Namespace, "name", pvc.Name, "pod", pod.Name)
1✔
229
                                r.recorder.Eventf(es, corev1.EventTypeWarning, UploadTargetInUse,
1✔
230
                                        "pod %s/%s using PersistentVolumeClaim %s", pod.Namespace, pod.Name, pvc.Name)
1✔
231
                        }
1✔
232
                        return reconcile.Result{Requeue: true}, nil
1✔
233
                }
234

235
                podName, ok := pvc.Annotations[AnnUploadPod]
1✔
236

1✔
237
                if !ok {
2✔
238
                        podName = createUploadResourceName(pvc.Name)
1✔
239
                        if err := r.updatePvcPodName(pvc, podName, log); err != nil {
1✔
240
                                return reconcile.Result{}, err
×
241
                        }
×
242
                        return reconcile.Result{Requeue: true}, nil
1✔
243
                }
244
                pod, err = r.createUploadPodForPvc(pvc, podName, uploadClientName, isCloneTarget)
1✔
245
                if err != nil {
1✔
246
                        return reconcile.Result{}, err
×
247
                }
×
248
        }
249

250
        // Always try to get or create the scratch PVC for a pod that is not successful yet, if it exists nothing happens otherwise attempt to create.
251
        scratchPVCName, exists := getScratchNameFromPod(pod)
1✔
252
        if exists {
2✔
253
                _, err := r.getOrCreateScratchPvc(pvcCopy, pod, scratchPVCName)
1✔
254
                if err != nil {
1✔
255
                        return reconcile.Result{}, err
×
256
                }
×
257
        }
258

259
        svcName := naming.GetServiceNameFromResourceName(pod.Name)
1✔
260
        if _, err = r.getOrCreateUploadService(pvc, svcName); err != nil {
2✔
261
                return reconcile.Result{}, err
1✔
262
        }
1✔
263

264
        termMsg, err := parseTerminationMessage(pod)
1✔
265
        if err != nil {
1✔
266
                return reconcile.Result{}, err
×
267
        }
×
268

269
        deadlinePassed := termMsg != nil && termMsg.DeadlinePassed != nil && *termMsg.DeadlinePassed
1✔
270
        if deadlinePassed {
2✔
271
                if pod.DeletionTimestamp == nil {
2✔
272
                        log.V(1).Info("Deleting pod because deadline exceeded")
1✔
273
                        if err := r.client.Delete(context.TODO(), pod); err != nil {
1✔
274
                                return reconcile.Result{}, err
×
275
                        }
×
276
                }
277

278
                anno[cc.AnnPodPhase] = ""
1✔
279
                anno[cc.AnnPodReady] = "false"
1✔
280
        } else {
1✔
281
                anno[cc.AnnPodPhase] = string(pod.Status.Phase)
1✔
282
                anno[cc.AnnPodReady] = strconv.FormatBool(isPodReady(pod))
1✔
283
        }
1✔
284

285
        setAnnotationsFromPodWithPrefix(anno, pod, termMsg, cc.AnnRunningCondition)
1✔
286

1✔
287
        if !reflect.DeepEqual(pvc, pvcCopy) {
2✔
288
                if err := r.updatePVC(pvcCopy); err != nil {
1✔
289
                        return reconcile.Result{}, err
×
290
                }
×
291
                if podSucceededFromPVC(pvcCopy) && !isCloneTarget {
1✔
292
                        // Upload completed, emit event. clone controller will emit clone complete.
×
293
                        r.recorder.Event(pvc, corev1.EventTypeNormal, UploadSucceededPVC, "Upload Successful")
×
294
                }
×
295
        }
296

297
        return reconcile.Result{}, nil
1✔
298
}
299

300
func (r *UploadReconciler) updatePvcPodName(pvc *corev1.PersistentVolumeClaim, podName string, log logr.Logger) error {
1✔
301
        currentPvcCopy := pvc.DeepCopyObject()
1✔
302

1✔
303
        log.V(1).Info("Updating PVC from pod")
1✔
304
        anno := pvc.GetAnnotations()
1✔
305
        anno[AnnUploadPod] = podName
1✔
306

1✔
307
        if !reflect.DeepEqual(currentPvcCopy, pvc) {
2✔
308
                if err := r.updatePVC(pvc); err != nil {
1✔
309
                        return err
×
310
                }
×
311
                log.V(1).Info("Updated PVC", "pvc.anno.AnnImportPod", anno[AnnUploadPod])
1✔
312
        }
313
        return nil
1✔
314
}
315

316
func (r *UploadReconciler) updatePVC(pvc *corev1.PersistentVolumeClaim) error {
1✔
317
        r.log.V(1).Info("Phase is now", "pvc.anno.Phase", pvc.GetAnnotations()[cc.AnnPodPhase])
1✔
318
        if err := r.client.Update(context.TODO(), pvc); err != nil {
1✔
319
                return err
×
320
        }
×
321
        return nil
1✔
322
}
323

324
func (r *UploadReconciler) getCloneRequestSourcePVC(targetPvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) {
1✔
325
        sourceVolumeMode := corev1.PersistentVolumeFilesystem
1✔
326
        targetVolumeMode := corev1.PersistentVolumeFilesystem
1✔
327

1✔
328
        exists, namespace, name := ParseCloneRequestAnnotation(targetPvc)
1✔
329
        if !exists {
1✔
330
                return nil, errors.New("error parsing clone request annotation")
×
331
        }
×
332
        sourcePvc := &corev1.PersistentVolumeClaim{}
1✔
333
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, sourcePvc); err != nil {
1✔
334
                return nil, errors.Wrap(err, "error getting clone source PVC")
×
335
        }
×
336
        if sourcePvc.Spec.VolumeMode != nil {
2✔
337
                sourceVolumeMode = *sourcePvc.Spec.VolumeMode
1✔
338
        }
1✔
339
        if targetPvc.Spec.VolumeMode != nil {
1✔
340
                targetVolumeMode = *targetPvc.Spec.VolumeMode
×
341
        }
×
342
        // Allow different source and target volume modes only on KubeVirt content type
343
        contentType, err := ValidateCanCloneSourceAndTargetContentType(sourcePvc, targetPvc)
1✔
344
        if err != nil {
2✔
345
                return nil, err
1✔
346
        }
1✔
347
        if sourceVolumeMode != targetVolumeMode && contentType != cdiv1.DataVolumeKubeVirt {
2✔
348
                return nil, errors.New("Source and target volume modes do not match, and content type is not kubevirt")
1✔
349
        }
1✔
350
        return sourcePvc, nil
1✔
351
}
352

353
func (r *UploadReconciler) cleanup(pvc *corev1.PersistentVolumeClaim) error {
1✔
354
        resourceName := getUploadResourceNameFromPvc(pvc)
1✔
355
        svcName := naming.GetServiceNameFromResourceName(resourceName)
1✔
356

1✔
357
        // delete service
1✔
358
        if err := r.deleteService(pvc.Namespace, svcName); err != nil {
1✔
359
                return err
×
360
        }
×
361

362
        // delete pod
363
        pod := &corev1.Pod{}
1✔
364
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: resourceName, Namespace: pvc.Namespace}, pod); err != nil {
2✔
365
                if k8serrors.IsNotFound(err) {
2✔
366
                        return nil
1✔
367
                }
1✔
368
                return err
×
369
        }
370
        if pod.DeletionTimestamp == nil && cc.ShouldDeletePod(pvc) {
2✔
371
                if err := r.client.Delete(context.TODO(), pod); cc.IgnoreNotFound(err) != nil {
1✔
372
                        return err
×
373
                }
×
374
        }
375
        return nil
1✔
376
}
377
func (r *UploadReconciler) findUploadPodForPvc(pvc *corev1.PersistentVolumeClaim) (*corev1.Pod, error) {
1✔
378
        podName := getUploadResourceNameFromPvc(pvc)
1✔
379
        pod := &corev1.Pod{}
1✔
380
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: podName, Namespace: pvc.Namespace}, pod); err != nil {
2✔
381
                if !k8serrors.IsNotFound(err) {
1✔
382
                        return nil, errors.Wrapf(err, "error getting upload pod %s/%s", pvc.Namespace, podName)
×
383
                }
×
384
                return nil, nil
1✔
385
        }
386

387
        if !metav1.IsControlledBy(pod, pvc) {
2✔
388
                return nil, errors.Errorf("%s pod not controlled by pvc %s", podName, pvc.Name)
1✔
389
        }
1✔
390

391
        return pod, nil
1✔
392
}
393

394
func (r *UploadReconciler) createUploadPodForPvc(pvc *corev1.PersistentVolumeClaim, podName, clientName string, isCloneTarget bool) (*corev1.Pod, error) {
1✔
395
        certConfig, err := operator.GetCertConfigWithDefaults(context.TODO(), r.client)
1✔
396
        if err != nil {
1✔
397
                return nil, err
×
398
        }
×
399

400
        serverCert, serverKey, err := r.serverCertGenerator.MakeServerCert(
1✔
401
                pvc.Namespace,
1✔
402
                naming.GetServiceNameFromResourceName(podName),
1✔
403
                certConfig.Server.Duration.Duration,
1✔
404
        )
1✔
405
        if err != nil {
1✔
406
                return nil, err
×
407
        }
×
408

409
        clientCA, err := r.clientCAFetcher.BundleBytes()
1✔
410
        if err != nil {
1✔
411
                return nil, err
×
412
        }
×
413

414
        fsOverhead, err := GetFilesystemOverhead(context.TODO(), r.client, pvc)
1✔
415
        if err != nil {
1✔
416
                return nil, err
×
417
        }
×
418

419
        preallocationRequested := false
1✔
420
        if preallocation, err := strconv.ParseBool(getValueFromAnnotation(pvc, cc.AnnPreallocationRequested)); err == nil {
1✔
421
                preallocationRequested = preallocation
×
422
        }
×
423

424
        config := &cdiv1.CDIConfig{}
1✔
425
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, config); err != nil {
1✔
426
                return nil, err
×
427
        }
×
428
        ciphers, minTLSVersion := cryptowatch.SelectCipherSuitesAndMinTLSVersion(config.Spec.TLSSecurityProfile)
1✔
429
        cryptoVars := CryptoEnvVars{
1✔
430
                Ciphers:       strings.Join(ciphers, ","),
1✔
431
                MinTLSVersion: string(minTLSVersion),
1✔
432
        }
1✔
433

1✔
434
        serverRefresh := certConfig.Server.Duration.Duration - certConfig.Server.RenewBefore.Duration
1✔
435
        clientRefresh := certConfig.Client.Duration.Duration - certConfig.Client.RenewBefore.Duration
1✔
436

1✔
437
        args := UploadPodArgs{
1✔
438
                Name:               podName,
1✔
439
                PVC:                pvc,
1✔
440
                ScratchPVCName:     createScratchPvcNameFromPvc(pvc, isCloneTarget),
1✔
441
                ClientName:         clientName,
1✔
442
                FilesystemOverhead: string(fsOverhead),
1✔
443
                ServerCert:         serverCert,
1✔
444
                ServerKey:          serverKey,
1✔
445
                ClientCA:           clientCA,
1✔
446
                Preallocation:      strconv.FormatBool(preallocationRequested),
1✔
447
                CryptoEnvVars:      cryptoVars,
1✔
448
                Deadline:           ptr.To(time.Now().Add(min(serverRefresh, clientRefresh))),
1✔
449
        }
1✔
450

1✔
451
        r.log.V(3).Info("Creating upload pod")
1✔
452
        pod, err := r.createUploadPod(args)
1✔
453
        // Check if pod has failed and, in that case, record an event with the error
1✔
454
        if podErr := cc.HandleFailedPod(err, podName, pvc, r.recorder, r.client); podErr != nil {
1✔
455
                return nil, podErr
×
456
        }
×
457

458
        if err := r.ensureCertSecret(args, pod); err != nil {
1✔
459
                return nil, err
×
460
        }
×
461

462
        return pod, nil
1✔
463
}
464

465
func (r *UploadReconciler) getOrCreateScratchPvc(pvc *corev1.PersistentVolumeClaim, pod *corev1.Pod, name string) (*corev1.PersistentVolumeClaim, error) {
1✔
466
        // Set condition, then check if need to override with scratch pvc message
1✔
467
        anno := pvc.Annotations
1✔
468
        scratchPvc := &corev1.PersistentVolumeClaim{}
1✔
469
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: pvc.Namespace}, scratchPvc); err != nil {
2✔
470
                if !k8serrors.IsNotFound(err) {
1✔
471
                        return nil, errors.Wrap(err, "error getting scratch PVC")
×
472
                }
×
473

474
                storageClassName := GetScratchPvcStorageClass(r.client, pvc)
1✔
475

1✔
476
                anno[cc.AnnBoundCondition] = "false"
1✔
477
                anno[cc.AnnBoundConditionMessage] = "Creating scratch space"
1✔
478
                anno[cc.AnnBoundConditionReason] = creatingScratch
1✔
479
                // Scratch PVC doesn't exist yet, create it.
1✔
480
                scratchPvc, err = createScratchPersistentVolumeClaim(r.client, pvc, pod, name, storageClassName, map[string]string{}, r.recorder)
1✔
481
                if err != nil {
1✔
482
                        return nil, err
×
483
                }
×
484
        } else {
×
485
                if !metav1.IsControlledBy(scratchPvc, pod) {
×
486
                        return nil, errors.Errorf("%s scratch PVC not controlled by pod %s", scratchPvc.Name, pod.Name)
×
487
                }
×
488
        }
489

490
        return scratchPvc, nil
1✔
491
}
492

493
func (r *UploadReconciler) getOrCreateUploadService(pvc *corev1.PersistentVolumeClaim, name string) (*corev1.Service, error) {
1✔
494
        service := &corev1.Service{}
1✔
495
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: pvc.Namespace}, service); err != nil {
2✔
496
                if !k8serrors.IsNotFound(err) {
1✔
497
                        return nil, errors.Wrap(err, "error getting upload service")
×
498
                }
×
499
                service, err = r.createUploadService(name, pvc)
1✔
500
                if err != nil {
1✔
501
                        return nil, err
×
502
                }
×
503
        }
504

505
        if !metav1.IsControlledBy(service, pvc) {
2✔
506
                return nil, errors.Errorf("%s service not controlled by pvc %s", name, pvc.Name)
1✔
507
        }
1✔
508

509
        return service, nil
1✔
510
}
511

512
func (r *UploadReconciler) deleteService(namespace, serviceName string) error {
1✔
513
        service := &corev1.Service{}
1✔
514
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: serviceName, Namespace: namespace}, service); err != nil {
2✔
515
                if k8serrors.IsNotFound(err) {
2✔
516
                        return nil
1✔
517
                }
1✔
518
                return err
×
519
        }
520

521
        if service.DeletionTimestamp == nil {
2✔
522
                if err := r.client.Delete(context.TODO(), service); cc.IgnoreNotFound(err) != nil {
1✔
523
                        return errors.Wrap(err, "error deleting upload service")
×
524
                }
×
525
        }
526

527
        return nil
1✔
528
}
529

530
func isPodReady(pod *corev1.Pod) bool {
1✔
531
        if len(pod.Status.ContainerStatuses) == 0 {
2✔
532
                return false
1✔
533
        }
1✔
534

535
        numReady := 0
1✔
536
        for _, s := range pod.Status.ContainerStatuses {
2✔
537
                if s.Ready {
1✔
538
                        numReady++
×
539
                }
×
540
        }
541

542
        return numReady == len(pod.Status.ContainerStatuses)
1✔
543
}
544

545
// createUploadService creates an upload service manifest and sends it to server
546
func (r *UploadReconciler) createUploadService(name string, pvc *corev1.PersistentVolumeClaim) (*corev1.Service, error) {
1✔
547
        ns := pvc.Namespace
1✔
548
        service := r.makeUploadServiceSpec(name, pvc)
1✔
549
        util.SetRecommendedLabels(service, r.installerLabels, "cdi-controller")
1✔
550

1✔
551
        if err := r.client.Create(context.TODO(), service); err != nil {
1✔
552
                if k8serrors.IsAlreadyExists(err) {
×
553
                        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: ns}, service); err != nil {
×
554
                                return nil, errors.Wrap(err, "upload service should exist but couldn't retrieve it")
×
555
                        }
×
556
                } else {
×
557
                        return nil, errors.Wrap(err, "upload service API create errored")
×
558
                }
×
559
        }
560
        r.log.V(1).Info("upload service created\n", "Namespace", service.Namespace, "Name", service.Name)
1✔
561
        return service, nil
1✔
562
}
563

564
// makeUploadServiceSpec creates upload service manifest
565
func (r *UploadReconciler) makeUploadServiceSpec(name string, pvc *corev1.PersistentVolumeClaim) *corev1.Service {
1✔
566
        blockOwnerDeletion := true
1✔
567
        isController := true
1✔
568
        service := &corev1.Service{
1✔
569
                TypeMeta: metav1.TypeMeta{
1✔
570
                        Kind:       "Service",
1✔
571
                        APIVersion: "v1",
1✔
572
                },
1✔
573
                ObjectMeta: metav1.ObjectMeta{
1✔
574
                        Name:      name,
1✔
575
                        Namespace: pvc.Namespace,
1✔
576
                        Annotations: map[string]string{
1✔
577
                                annCreatedByUpload: "yes",
1✔
578
                        },
1✔
579
                        Labels: map[string]string{
1✔
580
                                common.CDILabelKey:       common.CDILabelValue,
1✔
581
                                common.CDIComponentLabel: common.UploadServerCDILabel,
1✔
582
                        },
1✔
583
                        OwnerReferences: []metav1.OwnerReference{
1✔
584
                                {
1✔
585
                                        APIVersion:         "v1",
1✔
586
                                        Kind:               "PersistentVolumeClaim",
1✔
587
                                        Name:               pvc.Name,
1✔
588
                                        UID:                pvc.GetUID(),
1✔
589
                                        BlockOwnerDeletion: &blockOwnerDeletion,
1✔
590
                                        Controller:         &isController,
1✔
591
                                },
1✔
592
                        },
1✔
593
                },
1✔
594
                Spec: corev1.ServiceSpec{
1✔
595
                        ClusterIP: corev1.ClusterIPNone,
1✔
596
                        Ports: []corev1.ServicePort{
1✔
597
                                {
1✔
598
                                        Protocol:   corev1.ProtocolTCP,
1✔
599
                                        Port:       8443,
1✔
600
                                        TargetPort: intstr.FromInt32(8443),
1✔
601
                                },
1✔
602
                        },
1✔
603
                        Selector: map[string]string{
1✔
604
                                common.UploadServerServiceLabel: name,
1✔
605
                        },
1✔
606
                },
1✔
607
        }
1✔
608
        return service
1✔
609
}
1✔
610

611
// createUploadPod creates upload service pod manifest and sends to server
612
func (r *UploadReconciler) createUploadPod(args UploadPodArgs) (*corev1.Pod, error) {
1✔
613
        ns := args.PVC.Namespace
1✔
614

1✔
615
        podResourceRequirements, err := cc.GetDefaultPodResourceRequirements(r.client)
1✔
616
        if err != nil {
1✔
617
                return nil, err
×
618
        }
×
619

620
        imagePullSecrets, err := cc.GetImagePullSecrets(r.client)
1✔
621
        if err != nil {
1✔
622
                return nil, err
×
623
        }
×
624

625
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), r.client)
1✔
626
        if err != nil {
1✔
627
                return nil, err
×
628
        }
×
629

630
        pod := r.makeUploadPodSpec(args, podResourceRequirements, imagePullSecrets, workloadNodePlacement)
1✔
631
        util.SetRecommendedLabels(pod, r.installerLabels, "cdi-controller")
1✔
632

1✔
633
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: args.Name, Namespace: ns}, pod); err != nil {
2✔
634
                if !k8serrors.IsNotFound(err) {
1✔
635
                        return nil, errors.Wrap(err, "upload pod should exist but couldn't retrieve it")
×
636
                }
×
637
                if err := r.client.Create(context.TODO(), pod); err != nil {
1✔
638
                        return nil, err
×
639
                }
×
640
        }
641

642
        r.log.V(1).Info("upload pod created\n", "Namespace", pod.Namespace, "Name", pod.Name, "Image name", r.image)
1✔
643
        return pod, nil
1✔
644
}
645

646
func (r *UploadReconciler) ensureCertSecret(args UploadPodArgs, pod *corev1.Pod) error {
1✔
647
        if pod.Status.Phase == corev1.PodRunning {
1✔
648
                return nil
×
649
        }
×
650

651
        secret := &corev1.Secret{
1✔
652
                ObjectMeta: metav1.ObjectMeta{
1✔
653
                        Name:      args.Name,
1✔
654
                        Namespace: pod.Namespace,
1✔
655
                        Annotations: map[string]string{
1✔
656
                                annCreatedByUpload: "yes",
1✔
657
                        },
1✔
658
                        Labels: map[string]string{
1✔
659
                                common.CDILabelKey:       common.CDILabelValue,
1✔
660
                                common.CDIComponentLabel: common.UploadServerCDILabel,
1✔
661
                        },
1✔
662
                        OwnerReferences: []metav1.OwnerReference{
1✔
663
                                MakePodOwnerReference(pod),
1✔
664
                        },
1✔
665
                },
1✔
666
                Data: map[string][]byte{
1✔
667
                        "tls.key": args.ServerKey,
1✔
668
                        "tls.crt": args.ServerCert,
1✔
669
                        "ca.crt":  args.ClientCA,
1✔
670
                },
1✔
671
        }
1✔
672

1✔
673
        util.SetRecommendedLabels(secret, r.installerLabels, "cdi-controller")
1✔
674

1✔
675
        err := r.client.Create(context.TODO(), secret)
1✔
676
        if err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
677
                return errors.Wrap(err, "error creating cert secret")
×
678
        }
×
679

680
        return nil
1✔
681
}
682

683
// NewUploadController creates a new instance of the upload controller.
684
func NewUploadController(mgr manager.Manager, log logr.Logger, uploadImage, pullPolicy, verbose string, serverCertGenerator generator.CertGenerator, clientCAFetcher fetcher.CertBundleFetcher, installerLabels map[string]string) (controller.Controller, error) {
×
685
        client := mgr.GetClient()
×
686
        reconciler := &UploadReconciler{
×
687
                client:              client,
×
688
                scheme:              mgr.GetScheme(),
×
689
                log:                 log.WithName("upload-controller"),
×
690
                image:               uploadImage,
×
691
                verbose:             verbose,
×
692
                pullPolicy:          pullPolicy,
×
693
                recorder:            mgr.GetEventRecorderFor("upload-controller"),
×
694
                serverCertGenerator: serverCertGenerator,
×
695
                clientCAFetcher:     clientCAFetcher,
×
696
                featureGates:        featuregates.NewFeatureGates(client),
×
697
                installerLabels:     installerLabels,
×
698
        }
×
699
        uploadController, err := controller.New("upload-controller", mgr, controller.Options{
×
700
                MaxConcurrentReconciles: 3,
×
701
                Reconciler:              reconciler,
×
702
        })
×
703
        if err != nil {
×
704
                return nil, err
×
705
        }
×
706
        if err := addUploadControllerWatches(mgr, uploadController); err != nil {
×
707
                return nil, err
×
708
        }
×
709

710
        return uploadController, nil
×
711
}
712

713
func addUploadControllerWatches(mgr manager.Manager, uploadController controller.Controller) error {
×
714
        // Setup watches
×
715
        if err := uploadController.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{}, &handler.TypedEnqueueRequestForObject[*corev1.PersistentVolumeClaim]{})); err != nil {
×
716
                return err
×
717
        }
×
718
        if err := uploadController.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, handler.TypedEnqueueRequestForOwner[*corev1.Pod](
×
719
                mgr.GetScheme(), mgr.GetClient().RESTMapper(), &corev1.PersistentVolumeClaim{}, handler.OnlyControllerOwner()))); err != nil {
×
720
                return err
×
721
        }
×
722
        if err := uploadController.Watch(source.Kind(mgr.GetCache(), &corev1.Service{}, handler.TypedEnqueueRequestForOwner[*corev1.Service](
×
723
                mgr.GetScheme(), mgr.GetClient().RESTMapper(), &corev1.PersistentVolumeClaim{}, handler.OnlyControllerOwner()))); err != nil {
×
724
                return err
×
725
        }
×
726

727
        return nil
×
728
}
729

730
func createScratchPvcNameFromPvc(pvc *corev1.PersistentVolumeClaim, isCloneTarget bool) string {
1✔
731
        if isCloneTarget {
2✔
732
                return ""
1✔
733
        }
1✔
734

735
        return naming.GetResourceName(pvc.Name, common.ScratchNameSuffix)
1✔
736
}
737

738
// getUploadResourceName returns the name given to upload resources
739
func getUploadResourceNameFromPvc(pvc *corev1.PersistentVolumeClaim) string {
1✔
740
        podName, ok := pvc.Annotations[AnnUploadPod]
1✔
741
        if ok {
2✔
742
                return podName
1✔
743
        }
1✔
744

745
        // fallback to legacy naming, in fact the following function is fully compatible with legacy
746
        // name concatenation "cdi-upload-{pvc.Name}" if the name length is under the size limits,
747
        return naming.GetResourceName(common.UploadPodName, pvc.Name)
1✔
748
}
749

750
// createUploadResourceName returns the name given to upload resources
751
func createUploadResourceName(name string) string {
1✔
752
        return naming.GetResourceName(common.UploadPodName, name)
1✔
753
}
1✔
754

755
// UploadPossibleForPVC is called by the api server to see whether to return an upload token
756
func UploadPossibleForPVC(pvc *corev1.PersistentVolumeClaim) error {
×
757
        if _, ok := pvc.Annotations[cc.AnnUploadRequest]; !ok {
×
758
                return errors.Errorf("PVC %s is not an upload target", pvc.Name)
×
759
        }
×
760
        return nil
×
761
}
762

763
// GetUploadServerURL returns the url the proxy should post to for a particular pvc
764
func GetUploadServerURL(namespace, pvc, uploadPath string) string {
1✔
765
        serviceName := createUploadServiceNameFromPvcName(pvc)
1✔
766
        return fmt.Sprintf("https://%s.%s.svc:8443%s", serviceName, namespace, uploadPath)
1✔
767
}
1✔
768

769
// createUploadServiceName returns the name given to upload service shortened if needed
770
func createUploadServiceNameFromPvcName(pvc string) string {
1✔
771
        return naming.GetServiceNameFromResourceName(createUploadResourceName(pvc))
1✔
772
}
1✔
773

774
func (r *UploadReconciler) makeUploadPodSpec(args UploadPodArgs, resourceRequirements *corev1.ResourceRequirements, imagePullSecrets []corev1.LocalObjectReference, workloadNodePlacement *sdkapi.NodePlacement) *corev1.Pod {
1✔
775
        pod := &corev1.Pod{
1✔
776
                ObjectMeta: metav1.ObjectMeta{
1✔
777
                        Name:      args.Name,
1✔
778
                        Namespace: args.PVC.Namespace,
1✔
779
                        Annotations: map[string]string{
1✔
780
                                annCreatedByUpload:     "yes",
1✔
781
                                cc.AnnOpenDefaultPorts: `[{"protocol":"tcp","port":8443}]`,
1✔
782
                        },
1✔
783
                        Labels: map[string]string{
1✔
784
                                common.CDILabelKey:              common.CDILabelValue,
1✔
785
                                common.CDIComponentLabel:        common.UploadServerCDILabel,
1✔
786
                                common.UploadServerServiceLabel: naming.GetServiceNameFromResourceName(args.Name),
1✔
787
                                common.UploadTargetLabel:        string(args.PVC.UID),
1✔
788
                        },
1✔
789
                        OwnerReferences: []metav1.OwnerReference{
1✔
790
                                MakePVCOwnerReference(args.PVC),
1✔
791
                        },
1✔
792
                },
1✔
793
                Spec: corev1.PodSpec{
1✔
794
                        Containers:         r.makeUploadPodContainers(args, resourceRequirements),
1✔
795
                        Volumes:            r.makeUploadPodVolumes(args),
1✔
796
                        RestartPolicy:      corev1.RestartPolicyOnFailure,
1✔
797
                        NodeSelector:       workloadNodePlacement.NodeSelector,
1✔
798
                        Tolerations:        workloadNodePlacement.Tolerations,
1✔
799
                        Affinity:           workloadNodePlacement.Affinity,
1✔
800
                        PriorityClassName:  cc.GetPriorityClass(args.PVC),
1✔
801
                        ServiceAccountName: cc.GetPodServiceAccount(args.PVC),
1✔
802
                        ImagePullSecrets:   imagePullSecrets,
1✔
803
                        // https://kubernetes.io/docs/concepts/services-networking/service/#environment-variables
1✔
804
                        // Disable service environment variable injection to avoid 'argument list too long'
1✔
805
                        // errors in namespaces with many Services (each injects ~7 env vars).
1✔
806
                        EnableServiceLinks: ptr.To(false),
1✔
807
                },
1✔
808
        }
1✔
809

1✔
810
        cc.CopyAllowedAnnotations(args.PVC, pod)
1✔
811
        cc.SetNodeNameIfPopulator(args.PVC, &pod.Spec)
1✔
812
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
813

1✔
814
        return pod
1✔
815
}
1✔
816

817
func (r *UploadReconciler) makeUploadPodContainers(args UploadPodArgs, resourceRequirements *corev1.ResourceRequirements) []corev1.Container {
1✔
818
        requestImageSize, _ := cc.GetRequestedImageSize(args.PVC)
1✔
819
        containers := []corev1.Container{
1✔
820
                {
1✔
821
                        Name:            common.UploadServerPodname,
1✔
822
                        Image:           r.image,
1✔
823
                        ImagePullPolicy: corev1.PullPolicy(r.pullPolicy),
1✔
824
                        Env: []corev1.EnvVar{
1✔
825
                                {
1✔
826
                                        Name:  "TLS_KEY_FILE",
1✔
827
                                        Value: serverKeyFile,
1✔
828
                                },
1✔
829
                                {
1✔
830
                                        Name:  "TLS_CERT_FILE",
1✔
831
                                        Value: serverCertFile,
1✔
832
                                },
1✔
833
                                {
1✔
834
                                        Name:  "CLIENT_CERT_FILE",
1✔
835
                                        Value: clientCertFile,
1✔
836
                                },
1✔
837
                                {
1✔
838
                                        Name:  common.FilesystemOverheadVar,
1✔
839
                                        Value: args.FilesystemOverhead,
1✔
840
                                },
1✔
841
                                {
1✔
842
                                        Name:  common.UploadImageSize,
1✔
843
                                        Value: requestImageSize,
1✔
844
                                },
1✔
845
                                {
1✔
846
                                        Name:  "CLIENT_NAME",
1✔
847
                                        Value: args.ClientName,
1✔
848
                                },
1✔
849
                                {
1✔
850
                                        Name:  common.Preallocation,
1✔
851
                                        Value: args.Preallocation,
1✔
852
                                },
1✔
853
                                {
1✔
854
                                        Name:  common.CiphersTLSVar,
1✔
855
                                        Value: args.CryptoEnvVars.Ciphers,
1✔
856
                                },
1✔
857
                                {
1✔
858
                                        Name:  common.MinVersionTLSVar,
1✔
859
                                        Value: args.CryptoEnvVars.MinTLSVersion,
1✔
860
                                },
1✔
861
                        },
1✔
862
                        Args: []string{"-v=" + r.verbose},
1✔
863
                        ReadinessProbe: &corev1.Probe{
1✔
864
                                ProbeHandler: corev1.ProbeHandler{
1✔
865
                                        HTTPGet: &corev1.HTTPGetAction{
1✔
866
                                                Path: "/healthz",
1✔
867
                                                Port: intstr.IntOrString{
1✔
868
                                                        Type:   intstr.Int,
1✔
869
                                                        IntVal: 8443,
1✔
870
                                                },
1✔
871
                                                Scheme: corev1.URISchemeHTTPS,
1✔
872
                                        },
1✔
873
                                },
1✔
874
                                InitialDelaySeconds: 2,
1✔
875
                                PeriodSeconds:       5,
1✔
876
                        },
1✔
877
                        VolumeMounts: []corev1.VolumeMount{
1✔
878
                                {
1✔
879
                                        Name:      certVolName,
1✔
880
                                        MountPath: certMountPath,
1✔
881
                                },
1✔
882
                        },
1✔
883
                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
884
                },
1✔
885
        }
1✔
886
        if args.Deadline != nil {
2✔
887
                containers[0].Env = append(containers[0].Env, corev1.EnvVar{
1✔
888
                        Name:  "DEADLINE",
1✔
889
                        Value: args.Deadline.Format(time.RFC3339),
1✔
890
                })
1✔
891
        }
1✔
892
        if cc.GetVolumeMode(args.PVC) == corev1.PersistentVolumeBlock {
1✔
893
                containers[0].VolumeDevices = append(containers[0].VolumeDevices, corev1.VolumeDevice{
×
894
                        Name:       cc.DataVolName,
×
895
                        DevicePath: common.WriteBlockPath,
×
896
                })
×
897
                containers[0].Env = append(containers[0].Env, corev1.EnvVar{
×
UNCOV
898
                        Name:  "DESTINATION",
×
UNCOV
899
                        Value: common.WriteBlockPath,
×
UNCOV
900
                })
×
901
        } else {
1✔
902
                containers[0].VolumeMounts = append(containers[0].VolumeMounts, corev1.VolumeMount{
1✔
903
                        Name:      cc.DataVolName,
1✔
904
                        MountPath: common.UploadServerDataDir,
1✔
905
                })
1✔
906
        }
1✔
907
        if args.ScratchPVCName != "" {
2✔
908
                containers[0].VolumeMounts = append(containers[0].VolumeMounts, corev1.VolumeMount{
1✔
909
                        Name:      cc.ScratchVolName,
1✔
910
                        MountPath: common.ScratchDataDir,
1✔
911
                })
1✔
912
        }
1✔
913
        if resourceRequirements != nil {
2✔
914
                containers[0].Resources = *resourceRequirements
1✔
915
        }
1✔
916
        return containers
1✔
917
}
918

919
func (r *UploadReconciler) makeUploadPodVolumes(args UploadPodArgs) []corev1.Volume {
1✔
920
        volumes := []corev1.Volume{
1✔
921
                {
1✔
922
                        Name: certVolName,
1✔
923
                        VolumeSource: corev1.VolumeSource{
1✔
924
                                Secret: &corev1.SecretVolumeSource{
1✔
925
                                        SecretName: args.Name,
1✔
926
                                },
1✔
927
                        },
1✔
928
                },
1✔
929
                {
1✔
930
                        Name: cc.DataVolName,
1✔
931
                        VolumeSource: corev1.VolumeSource{
1✔
932
                                PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
1✔
933
                                        ClaimName: args.PVC.Name,
1✔
934
                                        ReadOnly:  false,
1✔
935
                                },
1✔
936
                        },
1✔
937
                },
1✔
938
        }
1✔
939
        if args.ScratchPVCName != "" {
2✔
940
                volumes = append(volumes, corev1.Volume{
1✔
941
                        Name: cc.ScratchVolName,
1✔
942
                        VolumeSource: corev1.VolumeSource{
1✔
943
                                PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
1✔
944
                                        ClaimName: args.ScratchPVCName,
1✔
945
                                        ReadOnly:  false,
1✔
946
                                },
1✔
947
                        },
1✔
948
                })
1✔
949
        }
1✔
950
        return volumes
1✔
951
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc