• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5784

21 Jan 2026 04:20AM UTC coverage: 49.631% (+0.2%) from 49.439%
#5784

Pull #4010

travis-ci

halfcrazy
fix review

Signed-off-by: Yan Zhu <hackzhuyan@gmail.com>
Pull Request #4010: feat: Add checksum validation for HTTP/HTTPS DataVolume sources

154 of 212 new or added lines in 14 files covered. (72.64%)

520 existing lines in 2 files now uncovered.

14781 of 29782 relevant lines covered (49.63%)

0.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.17
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
limitations under the License.
14
See the License for the specific language governing permissions and
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        secv1 "github.com/openshift/api/security/v1"
33
        "github.com/pkg/errors"
34
        cronexpr "github.com/robfig/cron/v3"
35

36
        authorizationv1 "k8s.io/api/authorization/v1"
37
        batchv1 "k8s.io/api/batch/v1"
38
        corev1 "k8s.io/api/core/v1"
39
        storagev1 "k8s.io/api/storage/v1"
40
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
41
        "k8s.io/apimachinery/pkg/api/meta"
42
        "k8s.io/apimachinery/pkg/api/resource"
43
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
44
        "k8s.io/apimachinery/pkg/labels"
45
        "k8s.io/apimachinery/pkg/runtime"
46
        "k8s.io/apimachinery/pkg/types"
47
        "k8s.io/client-go/tools/record"
48
        openapicommon "k8s.io/kube-openapi/pkg/common"
49
        "k8s.io/utils/ptr"
50

51
        "sigs.k8s.io/controller-runtime/pkg/client"
52
        "sigs.k8s.io/controller-runtime/pkg/controller"
53
        "sigs.k8s.io/controller-runtime/pkg/event"
54
        "sigs.k8s.io/controller-runtime/pkg/handler"
55
        "sigs.k8s.io/controller-runtime/pkg/manager"
56
        "sigs.k8s.io/controller-runtime/pkg/predicate"
57
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
58
        "sigs.k8s.io/controller-runtime/pkg/source"
59

60
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
61
        "kubevirt.io/containerized-data-importer/pkg/common"
62
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
63
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
64
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
65
        "kubevirt.io/containerized-data-importer/pkg/operator"
66
        "kubevirt.io/containerized-data-importer/pkg/util"
67
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
68
)
69

70
const (
71
        // ErrDataSourceAlreadyManaged provides a const to indicate DataSource already managed error
72
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
73
        // MessageDataSourceAlreadyManaged provides a const to form DataSource already managed error message
74
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
75
)
76

77
// DataImportCronReconciler members
78
type DataImportCronReconciler struct {
79
        client          client.Client
80
        uncachedClient  client.Client
81
        recorder        record.EventRecorder
82
        scheme          *runtime.Scheme
83
        log             logr.Logger
84
        image           string
85
        pullPolicy      string
86
        cdiNamespace    string
87
        installerLabels map[string]string
88
}
89

90
const (
91
        // AnnSourceDesiredDigest is the digest of the pending updated image
92
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
93
        // AnnImageStreamDockerRef is the ImageStream Docker reference
94
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
95
        // AnnNextCronTime is the next time stamp which satisfies the cron expression
96
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
97
        // AnnLastCronTime is the cron last execution time stamp
98
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
99
        // AnnLastUseTime is the PVC last use time stamp
100
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
101
        // AnnStorageClass is the cron DV's storage class
102
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
103

104
        dataImportControllerName    = "dataimportcron-controller"
105
        digestSha256Prefix          = "sha256:"
106
        digestUIDPrefix             = "uid:"
107
        digestDvNameSuffixLength    = 12
108
        cronJobUIDSuffixLength      = 8
109
        defaultImportsToKeepPerCron = 3
110
)
111

112
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
113

114
// Reconcile loop for DataImportCronReconciler
115
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
116
        dataImportCron := &cdiv1.DataImportCron{}
1✔
117
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
118
                return reconcile.Result{}, err
1✔
119
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
120
                err := r.cleanup(ctx, req.NamespacedName)
1✔
121
                return reconcile.Result{}, err
1✔
122
        }
1✔
123
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
124
        if !shouldReconcile || err != nil {
1✔
125
                return reconcile.Result{}, err
×
126
        }
×
127

128
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
129
                return reconcile.Result{}, err
×
130
        }
×
131

132
        return r.update(ctx, dataImportCron)
1✔
133
}
134

135
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
136
        dataSource := &cdiv1.DataSource{}
1✔
137
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
138
                if k8serrors.IsNotFound(err) {
2✔
139
                        return true, nil
1✔
140
                }
1✔
141
                return false, err
×
142
        }
143
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
144
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
145
                return true, nil
1✔
146
        }
1✔
147
        otherCron := &cdiv1.DataImportCron{}
1✔
148
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
149
                if k8serrors.IsNotFound(err) {
2✔
150
                        return true, nil
1✔
151
                }
1✔
152
                return false, err
×
153
        }
154
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
155
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
156
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
157
                r.log.V(3).Info(msg)
1✔
158
                return false, nil
1✔
159
        }
1✔
160
        return true, nil
×
161
}
162

163
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
164
        if dataImportCron.Spec.Schedule == "" {
2✔
165
                return nil
1✔
166
        }
1✔
167
        if isControllerPolledSource(dataImportCron) {
2✔
168
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
169
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
170
                }
1✔
171
                return nil
1✔
172
        }
173
        if !isURLSource(dataImportCron) {
1✔
174
                return nil
×
175
        }
×
176
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
177
        if exists || err != nil {
2✔
178
                return err
1✔
179
        }
1✔
180
        cronJob, err := r.newCronJob(dataImportCron)
1✔
181
        if err != nil {
1✔
182
                return err
×
183
        }
×
184
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
185
                return err
×
186
        }
×
187
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
188
        if err != nil {
1✔
189
                return err
×
190
        }
×
191
        if err := r.client.Create(ctx, job); err != nil {
1✔
192
                return err
×
193
        }
×
194
        return nil
1✔
195
}
196

197
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
198
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
199
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
200
        }
×
201
        imageStream := &imagev1.ImageStream{}
1✔
202
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
203
        if err != nil {
2✔
204
                return nil, "", err
1✔
205
        }
1✔
206
        imageStreamNamespacedName := types.NamespacedName{
1✔
207
                Namespace: imageStreamNamespace,
1✔
208
                Name:      name,
1✔
209
        }
1✔
210
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
211
                return nil, "", err
1✔
212
        }
1✔
213
        return imageStream, tag, nil
1✔
214
}
215

216
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
217
        if imageStream == nil {
1✔
218
                return "", "", errors.Errorf("No ImageStream")
×
219
        }
×
220
        tags := imageStream.Status.Tags
1✔
221
        if len(tags) == 0 {
1✔
222
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
223
        }
×
224

225
        tagIdx := 0
1✔
226
        if len(imageStreamTag) > 0 {
2✔
227
                tagIdx = -1
1✔
228
                for i, tag := range tags {
2✔
229
                        if tag.Tag == imageStreamTag {
2✔
230
                                tagIdx = i
1✔
231
                                break
1✔
232
                        }
233
                }
234
        }
235
        if tagIdx == -1 {
2✔
236
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
237
        }
1✔
238

239
        if len(tags[tagIdx].Items) == 0 {
2✔
240
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
241
        }
1✔
242
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
243
}
244

245
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
246
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
247
                return imageStreamName, "", nil
1✔
248
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
249
                return subs[0], subs[1], nil
1✔
250
        }
1✔
251
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
252
}
253

254
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
255
        nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]
1✔
256
        if nextTimeStr == "" {
1✔
257
                return r.setNextCronTime(dataImportCron)
×
258
        }
×
259
        nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
260
        if err != nil {
1✔
261
                return reconcile.Result{}, err
×
262
        }
×
263
        if nextTime.After(time.Now()) {
2✔
264
                return r.setNextCronTime(dataImportCron)
1✔
265
        }
1✔
266
        switch {
1✔
267
        case isImageStreamSource(dataImportCron):
1✔
268
                if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
269
                        return reconcile.Result{}, err
1✔
270
                }
1✔
271
        case isPvcSource(dataImportCron):
1✔
272
                if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
2✔
273
                        return reconcile.Result{}, err
1✔
274
                }
1✔
275
        case isNodePull(dataImportCron):
1✔
276
                if done, err := r.updateContainerImageDesiredDigest(ctx, dataImportCron); !done {
2✔
277
                        return reconcile.Result{RequeueAfter: 3 * time.Second}, err
1✔
278
                } else if err != nil {
2✔
279
                        return reconcile.Result{}, err
×
280
                }
×
281
        }
282

283
        return r.setNextCronTime(dataImportCron)
1✔
284
}
285

286
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
287
        now := time.Now()
1✔
288
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
289
        if err != nil {
1✔
290
                return reconcile.Result{}, err
×
291
        }
×
292
        nextTime := expr.Next(now)
1✔
293
        requeueAfter := nextTime.Sub(now)
1✔
294
        res := reconcile.Result{RequeueAfter: requeueAfter}
1✔
295
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
296
        return res, err
1✔
297
}
298

299
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
300
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
301
        return err == nil && regSource.ImageStream != nil
1✔
302
}
1✔
303

304
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
305
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
306
        return err == nil && regSource.URL != nil
1✔
307
}
1✔
308

309
func isNodePull(cron *cdiv1.DataImportCron) bool {
1✔
310
        regSource, err := getCronRegistrySource(cron)
1✔
311
        return err == nil && regSource != nil && regSource.PullMethod != nil &&
1✔
312
                *regSource.PullMethod == cdiv1.RegistryPullNode
1✔
313
}
1✔
314

315
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
316
        if !isCronRegistrySource(cron) {
2✔
317
                return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
1✔
318
        }
1✔
319
        return cron.Spec.Template.Spec.Source.Registry, nil
1✔
320
}
321

322
func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
1✔
323
        source := cron.Spec.Template.Spec.Source
1✔
324
        return source != nil && source.Registry != nil
1✔
325
}
1✔
326

327
func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
1✔
328
        if !isPvcSource(cron) {
1✔
329
                return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
×
330
        }
×
331
        return cron.Spec.Template.Spec.Source.PVC, nil
1✔
332
}
333

334
func isPvcSource(cron *cdiv1.DataImportCron) bool {
1✔
335
        source := cron.Spec.Template.Spec.Source
1✔
336
        return source != nil && source.PVC != nil
1✔
337
}
1✔
338

339
func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
1✔
340
        return isImageStreamSource(cron) || isPvcSource(cron) || isNodePull(cron)
1✔
341
}
1✔
342

343
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
344
        res := reconcile.Result{}
1✔
345

1✔
346
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
347
        if err != nil {
1✔
348
                return res, err
×
349
        }
×
350

351
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
352
        imports := dataImportCron.Status.CurrentImports
1✔
353
        importSucceeded := false
1✔
354

1✔
355
        dataVolume := dataImportCron.Spec.Template
1✔
356
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
357
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
358
        if err != nil {
1✔
359
                return res, err
×
360
        }
×
361
        if desiredStorageClass != nil {
2✔
362
                if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
2✔
363
                        return res, err
1✔
364
                }
1✔
365
                currentSc, hasCurrent := dataImportCron.Annotations[AnnStorageClass]
1✔
366
                desiredSc := desiredStorageClass.Name
1✔
367
                if hasCurrent && currentSc != desiredSc {
2✔
368
                        r.log.Info("Storage class changed, delete most recent source on the old sc as it's no longer the desired", "currentSc", currentSc, "desiredSc", desiredSc)
1✔
369
                        if err := r.handleStorageClassChange(ctx, dataImportCron, desiredSc); err != nil {
1✔
370
                                return res, err
×
371
                        }
×
372
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
373
                }
374
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
1✔
375
        }
376
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
377
        if err != nil {
1✔
378
                return res, err
×
379
        }
×
380
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
381
        if err != nil {
1✔
382
                return res, err
×
383
        }
×
384

385
        handlePopulatedPvc := func() error {
2✔
386
                if pvc != nil {
2✔
387
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
388
                                return err
×
389
                        }
×
390
                }
391
                importSucceeded = true
1✔
392
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
393
                        return err
×
394
                }
×
395

396
                return nil
1✔
397
        }
398

399
        switch {
1✔
400
        case dv != nil:
1✔
401
                switch dv.Status.Phase {
1✔
402
                case cdiv1.Succeeded:
1✔
403
                        if err := handlePopulatedPvc(); err != nil {
1✔
404
                                return res, err
×
405
                        }
×
406
                case cdiv1.ImportScheduled:
1✔
407
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
408
                case cdiv1.ImportInProgress:
1✔
409
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
410
                default:
1✔
411
                        dvPhase := string(dv.Status.Phase)
1✔
412
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
413
                }
414
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
415
                if err := handlePopulatedPvc(); err != nil {
1✔
416
                        return res, err
×
417
                }
×
418
        case snapshot != nil:
1✔
419
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
420
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
421
                                return res, err
×
422
                        }
×
423
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
424
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
425
                }
426
                // Below k8s 1.29 there's no way to know the source volume mode
427
                // Let's at least expose this info on our own snapshots
428
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
429
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
430
                        if err != nil {
1✔
431
                                return res, err
×
432
                        }
×
433
                        if volMode != nil {
2✔
434
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
435
                        }
1✔
436
                }
437
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
2✔
438
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
439
                if err != nil {
2✔
440
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
441
                                return res, err
1✔
442
                        }
443
                } else {
444
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
445
                }
2✔
446
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
447
                        return res, err
×
448
                }
×
449
                importSucceeded = true
1✔
450
        default:
1✔
451
                if len(imports) > 0 {
1✔
452
                        imports = imports[1:]
1✔
UNCOV
453
                        dataImportCron.Status.CurrentImports = imports
×
UNCOV
454
                }
×
455
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
456
        }
1✔
457

2✔
458
        if importSucceeded {
1✔
459
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
460
                        return res, err
1✔
461
                }
1✔
462
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
463
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
464
                        return res, err
2✔
465
                }
1✔
UNCOV
466
        }
×
UNCOV
467

×
468
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
469
                return res, err
1✔
470
        }
×
UNCOV
471

×
472
        // Skip if schedule is disabled
473
        if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
474
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
UNCOV
475
                pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
×
UNCOV
476
                if err != nil {
×
477
                        return pollRes, err
478
                }
479
                res = pollRes
2✔
480
        }
1✔
481

1✔
482
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
2✔
483
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
484
        if digestUpdated {
1✔
485
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
486
                if dv != nil {
487
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
488
                                return res, err
1✔
489
                        }
1✔
490
                }
2✔
491
                if importSucceeded || len(imports) == 0 {
1✔
492
                        if err := r.createImportDataVolume(ctx, dataImportCron); err != nil {
1✔
UNCOV
493
                                return res, err
×
UNCOV
494
                        }
×
UNCOV
495
                }
×
496
        } else if importSucceeded {
497
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
2✔
498
                        return res, err
2✔
499
                }
1✔
500
        } else if len(imports) > 0 {
1✔
501
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
502
        } else {
2✔
503
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
UNCOV
504
        }
×
UNCOV
505

×
506
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
2✔
507
                return res, err
1✔
508
        }
2✔
509

1✔
510
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
1✔
511
                if err := r.client.Update(ctx, dataImportCron); err != nil {
512
                        return res, err
1✔
513
                }
×
UNCOV
514
        }
×
515
        return res, nil
516
}
2✔
517

1✔
UNCOV
518
// Returns the current import DV if exists, and the last imported PVC
×
UNCOV
519
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
×
520
        imports := cron.Status.CurrentImports
521
        if len(imports) == 0 {
1✔
522
                return nil, nil, nil
523
        }
524

525
        dvName := imports[0].DataVolumeName
1✔
526
        dv := &cdiv1.DataVolume{}
1✔
527
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
528
                if !k8serrors.IsNotFound(err) {
1✔
529
                        return nil, nil, err
1✔
530
                }
531
                dv = nil
1✔
532
        }
1✔
533

2✔
534
        pvc := &corev1.PersistentVolumeClaim{}
1✔
UNCOV
535
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
×
UNCOV
536
                if !k8serrors.IsNotFound(err) {
×
537
                        return nil, nil, err
1✔
538
                }
539
                pvc = nil
540
        }
1✔
541
        return dv, pvc, nil
2✔
542
}
1✔
UNCOV
543

×
UNCOV
544
// Returns the current import DV if exists, and the last imported PVC
×
545
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
546
        imports := cron.Status.CurrentImports
547
        if len(imports) == 0 {
1✔
548
                return nil, nil
549
        }
550

551
        snapName := imports[0].DataVolumeName
1✔
552
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
553
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
554
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
555
                        return nil, err
1✔
556
                }
557
                return nil, nil
1✔
558
        }
1✔
559

2✔
560
        return snapshot, nil
1✔
UNCOV
561
}
×
UNCOV
562

×
563
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
564
        dataSourceName := dataImportCron.Spec.ManagedDataSource
565
        dataSource := &cdiv1.DataSource{}
566
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
1✔
567
                return nil, err
568
        }
569
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
570
                log := r.log.WithName("getCronManagedDataSource")
1✔
571
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
1✔
572
                return nil, ErrNotManagedByCron
2✔
573
        }
1✔
574
        return dataSource, nil
1✔
575
}
1✔
UNCOV
576

×
UNCOV
577
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
×
UNCOV
578
        objCopy := obj.DeepCopyObject()
×
UNCOV
579
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
×
580
        r.setDataImportCronResourceLabels(cron, obj)
1✔
581
        if !reflect.DeepEqual(obj, objCopy) {
582
                if err := r.client.Update(ctx, obj); err != nil {
583
                        return err
1✔
584
                }
1✔
585
        }
1✔
586
        return nil
1✔
587
}
2✔
588

1✔
589
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
590
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
591
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
592
                if cond.Status == corev1.ConditionFalse &&
1✔
593
                        (cond.Reason == common.GenericError || cond.Reason == ImagePullFailedReason) {
594
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
595
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
596
                        dv.Labels[common.DataImportCronLabel] = ""
×
597
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
598
                                return err
×
599
                        }
×
600
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
601
                                return err
×
602
                        }
×
603
                        cron.Status.CurrentImports = nil
×
UNCOV
604
                }
×
UNCOV
605
        }
×
606
        return nil
×
UNCOV
607
}
×
UNCOV
608

×
UNCOV
609
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
×
610
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
611
        regSource, err := getCronRegistrySource(dataImportCron)
UNCOV
612
        if err != nil {
×
613
                return err
614
        }
615
        if regSource.ImageStream == nil {
1✔
616
                return nil
1✔
617
        }
1✔
618
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
UNCOV
619
        if err != nil {
×
UNCOV
620
                return err
×
621
        }
1✔
UNCOV
622
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
×
UNCOV
623
        if err != nil {
×
624
                return err
1✔
625
        }
2✔
626
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
627
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
1✔
628
                log.Info("Updating DataImportCron", "digest", digest)
1✔
629
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
2✔
630
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
631
        }
1✔
632
        return nil
1✔
633
}
2✔
634

1✔
635
func (r *DataImportCronReconciler) updateContainerImageDesiredDigest(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
636
        log := r.log.WithValues("name", cron.Name).WithValues("uid", cron.UID)
1✔
637
        podName := getPollerPodName(cron)
1✔
638
        ns := cron.Namespace
1✔
639
        nn := types.NamespacedName{Name: podName, Namespace: ns}
640
        pod := &corev1.Pod{}
641

1✔
642
        if err := r.client.Get(ctx, nn, pod); err == nil {
1✔
643
                digest, err := fetchContainerImageDigest(pod)
1✔
644
                if err != nil || digest == "" {
1✔
645
                        return false, err
1✔
646
                }
1✔
647
                cc.AddAnnotation(cron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
648
                if cron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
649
                        log.Info("Updating DataImportCron", "digest", digest)
1✔
650
                        cc.AddAnnotation(cron, AnnSourceDesiredDigest, digest)
1✔
UNCOV
651
                }
×
UNCOV
652
                return true, r.client.Delete(ctx, pod)
×
653
        } else if cc.IgnoreNotFound(err) != nil {
1✔
654
                return false, err
2✔
655
        }
1✔
656

1✔
657
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(ctx, r.client)
1✔
658
        if err != nil {
1✔
659
                return false, err
1✔
660
        }
×
UNCOV
661
        platform := cron.Spec.Template.Spec.Source.Registry.Platform
×
662
        if platform != nil && platform.Architecture != "" {
663
                if workloadNodePlacement.NodeSelector == nil {
1✔
664
                        workloadNodePlacement.NodeSelector = map[string]string{}
1✔
665
                }
×
666
                workloadNodePlacement.NodeSelector[corev1.LabelArchStable] = platform.Architecture
×
667
        }
1✔
668

1✔
UNCOV
669
        containerImage := strings.TrimPrefix(*cron.Spec.Template.Spec.Source.Registry.URL, "docker://")
×
UNCOV
670

×
UNCOV
671
        pod = &corev1.Pod{
×
UNCOV
672
                ObjectMeta: metav1.ObjectMeta{
×
673
                        Name:      podName,
674
                        Namespace: ns,
675
                        OwnerReferences: []metav1.OwnerReference{
1✔
676
                                {
1✔
677
                                        APIVersion:         cron.APIVersion,
1✔
678
                                        Kind:               cron.Kind,
1✔
679
                                        Name:               cron.Name,
1✔
680
                                        UID:                cron.UID,
1✔
681
                                        BlockOwnerDeletion: ptr.To[bool](true),
1✔
682
                                        Controller:         ptr.To[bool](true),
1✔
683
                                },
1✔
684
                        },
1✔
685
                },
1✔
686
                Spec: corev1.PodSpec{
1✔
687
                        TerminationGracePeriodSeconds: ptr.To[int64](0),
1✔
688
                        RestartPolicy:                 corev1.RestartPolicyNever,
1✔
689
                        NodeSelector:                  workloadNodePlacement.NodeSelector,
1✔
690
                        Tolerations:                   workloadNodePlacement.Tolerations,
1✔
691
                        Affinity:                      workloadNodePlacement.Affinity,
1✔
692
                        Volumes: []corev1.Volume{
1✔
693
                                {
1✔
694
                                        Name: "shared-volume",
1✔
695
                                        VolumeSource: corev1.VolumeSource{
1✔
696
                                                EmptyDir: &corev1.EmptyDirVolumeSource{},
1✔
697
                                        },
1✔
698
                                },
1✔
699
                        },
1✔
700
                        InitContainers: []corev1.Container{
1✔
701
                                {
1✔
702
                                        Name:                     "init",
1✔
703
                                        Image:                    r.image,
1✔
704
                                        ImagePullPolicy:          corev1.PullPolicy(r.pullPolicy),
1✔
705
                                        Command:                  []string{"sh", "-c", "cp /usr/bin/cdi-containerimage-server /shared/server"},
1✔
706
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
707
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
708
                                },
1✔
709
                        },
1✔
710
                        Containers: []corev1.Container{
1✔
711
                                {
1✔
712
                                        Name:                     "image-container",
1✔
713
                                        Image:                    containerImage,
1✔
714
                                        ImagePullPolicy:          corev1.PullAlways,
1✔
715
                                        Command:                  []string{"/shared/server", "-h"},
1✔
716
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
717
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
718
                                },
1✔
719
                        },
1✔
720
                },
1✔
721
        }
1✔
722

1✔
723
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
724
        if pod.Spec.SecurityContext != nil {
1✔
725
                pod.Spec.SecurityContext.FSGroup = nil
1✔
726
        }
1✔
727

1✔
728
        return false, r.client.Create(ctx, pod)
1✔
729
}
1✔
730

2✔
731
func fetchContainerImageDigest(pod *corev1.Pod) (string, error) {
1✔
732
        if len(pod.Status.ContainerStatuses) == 0 {
1✔
733
                return "", nil
734
        }
1✔
735

736
        status := pod.Status.ContainerStatuses[0]
737
        if status.State.Waiting != nil {
1✔
738
                reason := status.State.Waiting.Reason
1✔
739
                switch reason {
×
740
                case "ImagePullBackOff", "ErrImagePull", "InvalidImageName":
×
741
                        return "", errors.Errorf("%s %s: %s", common.ImagePullFailureText, status.Image, reason)
742
                }
1✔
743
                return "", nil
1✔
UNCOV
744
        }
×
UNCOV
745

×
UNCOV
746
        if status.State.Terminated == nil {
×
747
                return "", nil
×
748
        }
UNCOV
749

×
750
        imageID := status.ImageID
751
        if imageID == "" {
752
                return "", errors.Errorf("Container has no imageID")
1✔
753
        }
×
UNCOV
754
        idx := strings.Index(imageID, digestSha256Prefix)
×
755
        if idx < 0 {
756
                return "", errors.Errorf("Container image %s ID has no digest: %s", status.Image, imageID)
1✔
757
        }
1✔
UNCOV
758

×
UNCOV
759
        return imageID[idx:], nil
×
760
}
1✔
761

1✔
UNCOV
762
func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
×
UNCOV
763
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
×
764
        pvcSource, err := getCronPvcSource(dataImportCron)
765
        if err != nil {
1✔
766
                return err
767
        }
768
        ns := pvcSource.Namespace
1✔
769
        if ns == "" {
1✔
770
                ns = dataImportCron.Namespace
1✔
771
        }
1✔
UNCOV
772
        pvc := &corev1.PersistentVolumeClaim{}
×
UNCOV
773
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
×
774
                return err
1✔
775
        }
2✔
776
        digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
1✔
777
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
778
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
1✔
779
                log.Info("Updating DataImportCron", "digest", digest)
2✔
780
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
781
        }
1✔
782
        return nil
1✔
783
}
1✔
784

2✔
785
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
786
        log := r.log.WithName("updateDataSource")
1✔
787
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
788
        if err != nil {
1✔
789
                if k8serrors.IsNotFound(err) {
790
                        dataSource = r.newDataSource(dataImportCron)
791
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
792
                                return err
1✔
793
                        }
1✔
794
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
2✔
795
                } else if errors.Is(err, ErrNotManagedByCron) {
2✔
796
                        return nil
1✔
797
                } else {
1✔
798
                        return err
×
799
                }
×
800
        }
1✔
UNCOV
801
        dataSourceCopy := dataSource.DeepCopy()
×
UNCOV
802
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
×
UNCOV
803

×
UNCOV
804
        sourcePVC := dataImportCron.Status.LastImportedPVC
×
UNCOV
805
        populateDataSource(format, dataSource, sourcePVC)
×
806

807
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
1✔
808
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
809
                        return err
1✔
810
                }
1✔
811
        }
1✔
812

1✔
813
        return nil
2✔
814
}
1✔
UNCOV
815

×
UNCOV
816
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
×
817
        if sourcePVC == nil {
818
                return
819
        }
1✔
820

821
        switch format {
822
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
823
                dataSource.Spec.Source = cdiv1.DataSourceSource{
2✔
824
                        PVC: sourcePVC,
1✔
825
                }
1✔
826
        case cdiv1.DataImportCronSourceFormatSnapshot:
827
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
828
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
829
                                Namespace: sourcePVC.Namespace,
1✔
830
                                Name:      sourcePVC.Name,
1✔
831
                        },
1✔
832
                }
1✔
833
        }
1✔
834
}
1✔
835

1✔
836
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
837
        if dataImportCron.Status.CurrentImports == nil {
1✔
838
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
1✔
839
        }
840
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
841
                Namespace: dataImportCron.Namespace,
842
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
843
        }
1✔
UNCOV
844
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
×
UNCOV
845
                dataImportCron.Status.LastImportedPVC = sourcePVC
×
846
                now := metav1.Now()
1✔
847
                dataImportCron.Status.LastImportTimestamp = &now
1✔
848
        }
1✔
849
        return nil
1✔
850
}
2✔
851

1✔
852
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
853
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
854
        if lastTimeStr == "" {
1✔
855
                return nil
1✔
856
        }
857
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
858
        if err != nil {
1✔
859
                return err
1✔
860
        }
2✔
861
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
1✔
862
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
863
        }
1✔
864
        return nil
1✔
UNCOV
865
}
×
UNCOV
866

×
867
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
2✔
868
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
869
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
870
        if digest == "" {
1✔
871
                return nil
872
        }
873
        dvName, err := createDvName(dataSourceName, digest)
1✔
874
        if err != nil {
1✔
875
                return err
1✔
876
        }
1✔
UNCOV
877

×
UNCOV
878
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
×
879
        for _, src := range sources {
1✔
880
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
881
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
882
                                return err
1✔
883
                        }
884
                } else {
1✔
885
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
2✔
886
                                return err
2✔
887
                        }
1✔
UNCOV
888
                        // If source exists don't create DV
×
UNCOV
889
                        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
×
890
                        return nil
1✔
891
                }
1✔
UNCOV
892
        }
×
UNCOV
893

×
894
        dv := r.newSourceDataVolume(dataImportCron, dvName)
895
        if allowed, err := r.authorizeCloneDataVolume(dataImportCron, dv); err != nil {
1✔
896
                return err
1✔
897
        } else if !allowed {
898
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse,
899
                        "Not authorized to create DataVolume", notAuthorized)
900
                return nil
1✔
901
        }
1✔
UNCOV
902
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
×
903
                return err
2✔
904
        }
1✔
905
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
906

1✔
907
        return nil
1✔
908
}
1✔
UNCOV
909

×
UNCOV
910
func (r *DataImportCronReconciler) authorizeCloneDataVolume(dataImportCron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) (bool, error) {
×
911
        if !isPvcSource(dataImportCron) {
1✔
912
                return true, nil
1✔
913
        }
1✔
914
        saName := "default"
915
        if dataImportCron.Spec.ServiceAccountName != nil {
916
                saName = *dataImportCron.Spec.ServiceAccountName
1✔
917
        }
2✔
918
        if resp, err := dv.AuthorizeSA(dv.Namespace, dv.Name, &authProxy{r.client}, dataImportCron.Namespace, saName); err != nil {
1✔
919
                return false, err
1✔
920
        } else if !resp.Allowed {
1✔
921
                r.log.Info("Not authorized to create DataVolume", "cron", dataImportCron.Name, "reason", resp.Reason)
1✔
UNCOV
922
                return false, nil
×
UNCOV
923
        }
×
924

1✔
UNCOV
925
        return true, nil
×
926
}
2✔
927

1✔
928
type authProxy struct {
1✔
929
        client client.Client
1✔
930
}
931

1✔
932
func (p *authProxy) CreateSar(sar *authorizationv1.SubjectAccessReview) (*authorizationv1.SubjectAccessReview, error) {
933
        if err := p.client.Create(context.TODO(), sar); err != nil {
934
                return nil, err
935
        }
936
        return sar, nil
937
}
938

1✔
939
func (p *authProxy) GetNamespace(name string) (*corev1.Namespace, error) {
1✔
UNCOV
940
        ns := &corev1.Namespace{}
×
UNCOV
941
        if err := p.client.Get(context.TODO(), types.NamespacedName{Name: name}, ns); err != nil {
×
942
                return nil, err
1✔
943
        }
944
        return ns, nil
945
}
1✔
946

1✔
947
func (p *authProxy) GetDataSource(namespace, name string) (*cdiv1.DataSource, error) {
1✔
948
        das := &cdiv1.DataSource{}
×
949
        if err := p.client.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, das); err != nil {
×
950
                return nil, err
1✔
951
        }
952
        return das, nil
UNCOV
953
}
×
UNCOV
954

×
UNCOV
955
func (r *DataImportCronReconciler) handleStorageClassChange(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
×
UNCOV
956
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
×
UNCOV
957
        if !ok {
×
958
                // nothing to delete
×
959
                return nil
960
        }
961
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
962
        if err != nil {
1✔
963
                return err
1✔
964
        }
×
UNCOV
965
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
×
UNCOV
966
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
×
967
        for _, src := range sources {
1✔
968
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
969
                        return err
×
970
                }
×
971
        }
1✔
972
        for _, src := range sources {
1✔
973
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
2✔
974
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
1✔
975
                }
×
UNCOV
976
        }
×
977
        // Only update desired storage class once garbage collection went through
978
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
2✔
979
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
UNCOV
980
        if err != nil {
×
981
                return err
×
982
        }
983

984
        return nil
1✔
985
}
1✔
986

1✔
UNCOV
987
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
×
UNCOV
988
        switch format {
×
989
        case cdiv1.DataImportCronSourceFormatPvc:
990
                return nil
1✔
991
        case cdiv1.DataImportCronSourceFormatSnapshot:
992
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
993
        default:
1✔
994
                return fmt.Errorf("unknown source format for snapshot")
1✔
995
        }
1✔
996
}
1✔
997

1✔
998
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
UNCOV
999
        if pvc == nil {
×
1000
                return nil
×
1001
        }
1002
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
1003
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1004
                return nil
1✔
1005
        }
1✔
UNCOV
1006
        storageProfile := &cdiv1.StorageProfile{}
×
UNCOV
1007
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
×
1008
                return err
2✔
1009
        }
1✔
1010
        className, err := cc.GetSnapshotClassForSmartClone(pvc, &desiredStorageClass.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
1011
        if err != nil {
1✔
1012
                return err
1✔
1013
        }
1✔
UNCOV
1014
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
×
UNCOV
1015
                ObjectMeta: metav1.ObjectMeta{
×
1016
                        Name:      pvc.Name,
1✔
1017
                        Namespace: dataImportCron.Namespace,
1✔
UNCOV
1018
                        Labels: map[string]string{
×
UNCOV
1019
                                common.CDILabelKey:       common.CDILabelValue,
×
1020
                                common.CDIComponentLabel: "",
1✔
1021
                        },
1✔
1022
                },
1✔
1023
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
1024
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
1025
                                PersistentVolumeClaimName: &pvc.Name,
1✔
1026
                        },
1✔
1027
                        VolumeSnapshotClassName: &className,
1✔
1028
                },
1✔
1029
        }
1✔
1030
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
1031
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
1✔
1032

1✔
1033
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
1034
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
1✔
1035
                if !k8serrors.IsNotFound(err) {
1✔
1036
                        return err
1✔
1037
                }
1✔
1038
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1039
                if pvc.Spec.VolumeMode != nil {
1✔
1040
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
2✔
1041
                }
1✔
UNCOV
1042
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
×
1043
                        return err
×
1044
                }
1✔
1045
        } else {
1✔
1046
                if cc.IsSnapshotReady(currentSnapshot) {
1✔
1047
                        // Clean up DV/PVC as they are not needed anymore
2✔
1048
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
1049
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
1050
                                return err
2✔
1051
                        }
1✔
1052
                }
1✔
1053
        }
1✔
UNCOV
1054

×
UNCOV
1055
        return nil
×
1056
}
1✔
1057

2✔
1058
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
1059
        dataImportCron.Status.SourceFormat = &format
1✔
1060

1✔
UNCOV
1061
        switch format {
×
UNCOV
1062
        case cdiv1.DataImportCronSourceFormatPvc:
×
1063
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1064
        case cdiv1.DataImportCronSourceFormatSnapshot:
1065
                if snapshot == nil {
1066
                        // Snapshot create/update will trigger reconcile
1✔
1067
                        return nil
1068
                }
1069
                if cc.IsSnapshotReady(snapshot) {
1✔
1070
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1071
                } else {
1✔
1072
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
1073
                }
1✔
1074
        default:
1✔
1075
                return fmt.Errorf("unknown source format for snapshot")
1✔
1076
        }
2✔
1077

1✔
1078
        return nil
1✔
1079
}
1✔
1080

2✔
1081
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
1082
        format := cdiv1.DataImportCronSourceFormatPvc
2✔
1083
        if desiredStorageClass == nil {
1✔
1084
                return format, nil
1✔
UNCOV
1085
        }
×
UNCOV
1086

×
1087
        storageProfile := &cdiv1.StorageProfile{}
1088
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1089
                return format, err
1✔
1090
        }
1091
        if storageProfile.Status.DataImportCronSourceFormat != nil {
1092
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
1093
        }
1✔
1094

2✔
1095
        return format, nil
1✔
1096
}
1✔
1097

1098
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
1099
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
1100
                return nil
×
1101
        }
×
1102
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
2✔
1103
        if err != nil {
1✔
1104
                return err
1✔
1105
        }
1106

1✔
1107
        maxImports := defaultImportsToKeepPerCron
1108

1109
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
1✔
1110
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
UNCOV
1111
        }
×
UNCOV
1112

×
1113
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
1114
                return err
1✔
1115
        }
×
UNCOV
1116
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
×
1117
                return err
1118
        }
1✔
1119

1✔
1120
        return nil
2✔
1121
}
1✔
1122

1✔
1123
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1124
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
UNCOV
1125

×
UNCOV
1126
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
×
1127
                return err
1✔
1128
        }
×
UNCOV
1129
        if len(pvcList.Items) > maxImports {
×
1130
                sort.Slice(pvcList.Items, func(i, j int) bool {
1131
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
1132
                })
1133
                for _, pvc := range pvcList.Items[maxImports:] {
1134
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1135
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
1136
                                return err
1✔
1137
                        }
1✔
UNCOV
1138
                }
×
UNCOV
1139
        }
×
1140

2✔
1141
        dvList := &cdiv1.DataVolumeList{}
2✔
1142
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1143
                return err
1✔
1144
        }
2✔
1145

1✔
1146
        if len(dvList.Items) > maxImports {
1✔
UNCOV
1147
                for _, dv := range dvList.Items {
×
UNCOV
1148
                        pvc := &corev1.PersistentVolumeClaim{}
×
1149
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1150
                                return err
1151
                        }
1152

1✔
1153
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
1✔
UNCOV
1154
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
×
UNCOV
1155
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
×
1156
                                        return err
1157
                                }
2✔
1158
                        }
2✔
1159
                }
1✔
1160
        }
1✔
UNCOV
1161

×
UNCOV
1162
        return nil
×
1163
}
1164

2✔
1165
// deleteDvPvc deletes DV or PVC if DV was GCed
1✔
1166
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
UNCOV
1167
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
×
UNCOV
1168
        dv := &cdiv1.DataVolume{ObjectMeta: om}
×
1169
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
1170
                return err
1171
        }
1172
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1173
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
1174
                return err
1175
        }
1176
        return nil
1177
}
1✔
1178

1✔
1179
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
1180
        snapList := &snapshotv1.VolumeSnapshotList{}
2✔
1181

1✔
1182
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1183
                if meta.IsNoMatchError(err) {
1✔
1184
                        return nil
1✔
1185
                }
×
1186
                return err
×
1187
        }
1✔
1188
        if len(snapList.Items) > maxImports {
1189
                sort.Slice(snapList.Items, func(i, j int) bool {
1190
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
1✔
1191
                })
1✔
1192
                for _, snap := range snapList.Items[maxImports:] {
1✔
1193
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
1✔
1194
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
1195
                                return err
×
1196
                        }
×
UNCOV
1197
                }
×
1198
        }
1199

1✔
UNCOV
1200
        return nil
×
UNCOV
1201
}
×
UNCOV
1202

×
UNCOV
1203
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
×
UNCOV
1204
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
×
UNCOV
1205
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
×
UNCOV
1206

×
UNCOV
1207
        if err := r.deleteJobs(ctx, cron); err != nil {
×
1208
                return err
1209
        }
1210
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1211
        if err != nil {
1✔
1212
                return err
1213
        }
1214
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
1215
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
1216
                return err
1✔
1217
        }
1✔
1218
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
1219
                return err
×
1220
        }
×
1221
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
1222
                return err
1✔
1223
        }
×
UNCOV
1224
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
×
1225
                return err
1✔
1226
        }
1✔
UNCOV
1227
        return nil
×
UNCOV
1228
}
×
1229

1✔
UNCOV
1230
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
×
UNCOV
1231
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
×
1232
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
UNCOV
1233
        if err != nil {
×
1234
                return err
×
1235
        }
1✔
UNCOV
1236
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
×
UNCOV
1237
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
×
1238
                return err
1✔
1239
        }
1240
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1241
                return err
1✔
1242
        }
1✔
1243

1✔
1244
        return nil
1✔
UNCOV
1245
}
×
UNCOV
1246

×
1247
// NewDataImportCronController creates a new instance of the DataImportCron controller
1✔
1248
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
1✔
1249
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
1250
                Scheme: mgr.GetScheme(),
×
1251
                Mapper: mgr.GetRESTMapper(),
1✔
1252
        })
×
1253
        if err != nil {
×
1254
                return nil, err
1255
        }
1✔
1256
        reconciler := &DataImportCronReconciler{
1257
                client:          mgr.GetClient(),
1258
                uncachedClient:  uncachedClient,
1259
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1260
                scheme:          mgr.GetScheme(),
×
1261
                log:             log.WithName(dataImportControllerName),
×
1262
                image:           importerImage,
×
1263
                pullPolicy:      pullPolicy,
×
1264
                cdiNamespace:    util.GetNamespace(),
×
1265
                installerLabels: installerLabels,
×
1266
        }
×
1267
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1268
                MaxConcurrentReconciles: 3,
×
1269
                Reconciler:              reconciler,
×
1270
        })
×
1271
        if err != nil {
×
1272
                return nil, err
×
1273
        }
×
1274
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
1275
                return nil, err
×
1276
        }
×
1277
        log.Info("Initialized DataImportCron controller")
×
1278
        return dataImportCronController, nil
×
UNCOV
1279
}
×
UNCOV
1280

×
1281
func getCronName(obj client.Object) string {
×
1282
        return obj.GetLabels()[common.DataImportCronLabel]
×
1283
}
×
UNCOV
1284

×
1285
func getCronNs(obj client.Object) string {
×
1286
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
1287
}
×
UNCOV
1288

×
1289
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
1290
        if cronName := getCronName(obj); cronName != "" {
1291
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
1292
        }
×
1293
        return nil
×
UNCOV
1294
}
×
1295

1296
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1297
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1298
                return err
×
1299
        }
UNCOV
1300

×
1301
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
1302
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
1303
                // Otherwise we risk losing the storage profile event
×
1304
                var crons cdiv1.DataImportCronList
×
1305
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
1306
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
1307
                        return nil
×
1308
                }
×
UNCOV
1309
                // Storage profiles are 1:1 to storage classes
×
1310
                scName := obj.GetName()
×
1311
                var reqs []reconcile.Request
1312
                for _, cron := range crons.Items {
×
1313
                        dataVolume := cron.Spec.Template
×
1314
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1315
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1316
                        if err != nil || templateSc == nil {
×
1317
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1318
                                return reqs
×
1319
                        }
×
1320
                        if templateSc.Name == scName {
1321
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1322
                        }
×
UNCOV
1323
                }
×
1324
                return reqs
×
UNCOV
1325
        }
×
UNCOV
1326

×
1327
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1328
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1329
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1330
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1331
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1332
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
UNCOV
1333
                },
×
1334
        )); err != nil {
1335
                return err
×
1336
        }
1337

1338
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1339
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1340
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1341
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1342
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1343
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1344
                },
1345
        )); err != nil {
×
1346
                return err
×
1347
        }
×
1348

1349
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1350
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1351
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1352
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1353
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1354
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1355
                },
1356
        )); err != nil {
×
1357
                return err
×
1358
        }
×
1359

1360
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1361
                return err
×
1362
        }
×
UNCOV
1363

×
1364
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1365
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1366
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
1367
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1368
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1369
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1370
                                return e.ObjectOld.Status.DataImportCronSourceFormat != e.ObjectNew.Status.DataImportCronSourceFormat
1371
                        },
×
UNCOV
1372
                },
×
1373
        )); err != nil {
×
1374
                return err
1375
        }
×
UNCOV
1376

×
1377
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1378
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1379
        }
×
UNCOV
1380

×
1381
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1382
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1383
                predicate.TypedFuncs[*batchv1.CronJob]{
1384
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1385
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1386
                        },
×
1387
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
1388
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
UNCOV
1389
                },
×
1390
        )); err != nil {
×
1391
                return err
1392
        }
×
UNCOV
1393

×
1394
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1395
                if meta.IsNoMatchError(err) {
×
1396
                        // Back out if there's no point to attempt watch
×
1397
                        return nil
×
1398
                }
×
1399
                if !cc.IsErrCacheNotStarted(err) {
×
1400
                        return err
1401
                }
×
UNCOV
1402
        }
×
1403
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1404
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
1405
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1406
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1407
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1408
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
UNCOV
1409
                },
×
1410
        )); err != nil {
×
1411
                return err
×
1412
        }
×
1413

1414
        return nil
×
UNCOV
1415
}
×
UNCOV
1416

×
UNCOV
1417
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
×
1418
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1419
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1420
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
1421
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1422
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1423
                                log.Info("Update", "sc", obj.GetName(),
×
1424
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
1425
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1426
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
1427
                                if err != nil {
1428
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
1429
                                }
×
1430
                                return reqs
×
UNCOV
1431
                        },
×
UNCOV
1432
                ),
×
UNCOV
1433
                predicate.TypedFuncs[*storagev1.StorageClass]{
×
1434
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1435
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1436
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1437
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1438
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1439
                        },
×
UNCOV
1440
                },
×
1441
        )); err != nil {
×
1442
                return err
1443
        }
1444

1445
        return nil
×
UNCOV
1446
}
×
UNCOV
1447

×
1448
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1449
        dicList := &cdiv1.DataImportCronList{}
×
1450
        if err := c.List(ctx, dicList); err != nil {
×
1451
                return nil, err
1452
        }
×
1453
        reqs := []reconcile.Request{}
×
1454
        for _, dic := range dicList.Items {
×
1455
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
1456
                        continue
×
1457
                }
1458

1459
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
UNCOV
1460
        }
×
UNCOV
1461

×
1462
        return reqs, nil
×
UNCOV
1463
}
×
UNCOV
1464

×
UNCOV
1465
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
×
UNCOV
1466
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
×
UNCOV
1467
                return false, nil
×
1468
        }
1469

UNCOV
1470
        sc := pvc.Spec.StorageClassName
×
1471
        if sc == nil || *sc == desiredStorageClass {
1472
                return false, nil
UNCOV
1473
        }
×
1474

1475
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1476
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
1477
                return false, err
2✔
1478
        }
1✔
1479

1✔
1480
        return true, nil
1481
}
1✔
1482

2✔
1483
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1484
        cronJob := &batchv1.CronJob{}
1✔
1485
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1486
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
1✔
1487
                return false, cc.IgnoreNotFound(err)
1✔
UNCOV
1488
        }
×
UNCOV
1489

×
1490
        cronJobCopy := cronJob.DeepCopy()
1491
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1492
                return false, err
1493
        }
1494

1✔
1495
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
1✔
1496
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1497
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
2✔
1498
                        return false, cc.IgnoreNotFound(err)
1✔
1499
                }
1✔
1500
        }
1501
        return true, nil
1✔
1502
}
1✔
UNCOV
1503

×
UNCOV
1504
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
×
1505
        cronJob := &batchv1.CronJob{
1506
                ObjectMeta: metav1.ObjectMeta{
2✔
1507
                        Name:      GetCronJobName(cron),
1✔
1508
                        Namespace: r.cdiNamespace,
1✔
UNCOV
1509
                },
×
UNCOV
1510
        }
×
1511
        if err := r.initCronJob(cron, cronJob); err != nil {
1512
                return nil, err
1✔
1513
        }
1514
        return cronJob, nil
1515
}
1✔
1516

1✔
1517
// InitPollerPod inits poller Pod
1✔
1518
func InitPollerPod(c client.Client, cron *cdiv1.DataImportCron, pod *corev1.PodTemplateSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1519
        regSource, err := getCronRegistrySource(cron)
1✔
1520
        if err != nil {
1✔
1521
                return err
1✔
1522
        }
1✔
UNCOV
1523
        if regSource.URL == nil {
×
1524
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1525
        }
1✔
1526
        cdiConfig := &cdiv1.CDIConfig{}
1527
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1528
                return err
1529
        }
1✔
1530
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1531
        if err != nil {
1✔
1532
                return err
×
1533
        }
×
1534
        container := corev1.Container{
1✔
UNCOV
1535
                Name:  "cdi-source-update-poller",
×
UNCOV
1536
                Image: image,
×
1537
                Command: []string{
1✔
1538
                        "/usr/bin/cdi-source-update-poller",
1✔
UNCOV
1539
                        "-ns", cron.Namespace,
×
UNCOV
1540
                        "-cron", cron.Name,
×
1541
                        "-url", *regSource.URL,
1✔
1542
                },
1✔
UNCOV
1543
                ImagePullPolicy:          pullPolicy,
×
UNCOV
1544
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
×
1545
                TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
1546
        }
1✔
1547

1✔
1548
        var volumes []corev1.Volume
1✔
1549
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1550
        if hasCertConfigMap {
1✔
1551
                vm := corev1.VolumeMount{
1✔
1552
                        Name:      CertVolName,
1✔
1553
                        MountPath: common.ImporterCertDir,
1✔
1554
                }
1✔
1555
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1556
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
1✔
1557
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
1✔
1558
        }
1✔
1559

1✔
1560
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
1✔
1561
                vm := corev1.VolumeMount{
1✔
UNCOV
1562
                        Name:      ProxyCertVolName,
×
UNCOV
1563
                        MountPath: common.ImporterProxyCertDir,
×
UNCOV
1564
                }
×
UNCOV
1565
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
UNCOV
1566
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
×
UNCOV
1567
        }
×
UNCOV
1568

×
UNCOV
1569
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
×
1570
                container.Env = append(container.Env,
1571
                        corev1.EnvVar{
2✔
1572
                                Name: common.ImporterAccessKeyID,
1✔
1573
                                ValueFrom: &corev1.EnvVarSource{
1✔
1574
                                        SecretKeyRef: &corev1.SecretKeySelector{
1✔
1575
                                                LocalObjectReference: corev1.LocalObjectReference{
1✔
1576
                                                        Name: *regSource.SecretRef,
1✔
1577
                                                },
1✔
1578
                                                Key: common.KeyAccess,
1✔
1579
                                        },
1580
                                },
1✔
1581
                        },
×
1582
                        corev1.EnvVar{
×
1583
                                Name: common.ImporterSecretKey,
×
1584
                                ValueFrom: &corev1.EnvVarSource{
×
1585
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1586
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1587
                                                        Name: *regSource.SecretRef,
×
1588
                                                },
×
1589
                                                Key: common.KeySecret,
×
1590
                                        },
×
1591
                                },
×
1592
                        },
×
1593
                )
×
1594
        }
×
UNCOV
1595

×
UNCOV
1596
        addEnvVar := func(varName, value string) {
×
UNCOV
1597
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
×
UNCOV
1598
        }
×
UNCOV
1599

×
UNCOV
1600
        if insecureTLS {
×
1601
                addEnvVar(common.InsecureTLSVar, "true")
×
1602
        }
×
UNCOV
1603

×
UNCOV
1604
        addEnvVarFromImportProxyConfig := func(varName string) {
×
UNCOV
1605
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
×
1606
                        addEnvVar(varName, value)
1607
                }
2✔
1608
        }
1✔
1609

1✔
1610
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1611
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
UNCOV
1612
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
×
UNCOV
1613

×
1614
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1615
        if err != nil {
2✔
1616
                return err
2✔
1617
        }
1✔
1618
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1619
        if err != nil {
1620
                return err
1621
        }
1✔
1622

1✔
1623
        podSpec := &pod.Spec
1✔
1624

1✔
1625
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1626
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
UNCOV
1627
        podSpec.Containers = []corev1.Container{container}
×
UNCOV
1628
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
×
1629
        podSpec.Volumes = volumes
1✔
1630
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
UNCOV
1631
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
×
UNCOV
1632
        podSpec.Tolerations = workloadNodePlacement.Tolerations
×
1633
        podSpec.Affinity = workloadNodePlacement.Affinity
1634

1✔
1635
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1636
        // No need for specifid uid/fsgroup here since this doesn't write or use qemu
1✔
1637
        if podSpec.SecurityContext != nil {
1✔
1638
                podSpec.SecurityContext.FSGroup = nil
1✔
1639
        }
1✔
1640
        if podSpec.Containers[0].SecurityContext != nil {
1✔
1641
                podSpec.Containers[0].SecurityContext.RunAsUser = nil
1✔
1642
        }
1✔
1643

1✔
1644
        if pod.Labels == nil {
1✔
1645
                pod.Labels = map[string]string{}
1✔
1646
        }
1✔
1647
        pod.Labels[common.DataImportCronPollerLabel] = ""
1✔
1648

2✔
1649
        return nil
1✔
1650
}
1✔
1651

2✔
1652
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1653
        cronJobSpec := &cronJob.Spec
1✔
1654
        cronJobSpec.Schedule = cron.Spec.Schedule
1655
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
2✔
1656
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1657
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1658

1✔
1659
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1660
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1661
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1662
        cc.AddAnnotation(&jobSpec.Template, secv1.RequiredSCCAnnotation, common.RestrictedSCCName)
1663

1✔
1664
        pod := &jobSpec.Template
1✔
1665
        if err := InitPollerPod(r.client, cron, pod, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1666
                return err
1✔
1667
        }
1✔
1668
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1669
                return err
1✔
1670
        }
1✔
1671
        return nil
1✔
1672
}
1✔
1673

1✔
1674
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1675
        job := &batchv1.Job{
1✔
1676
                ObjectMeta: metav1.ObjectMeta{
1✔
UNCOV
1677
                        Name:      GetInitialJobName(cron),
×
UNCOV
1678
                        Namespace: cronJob.Namespace,
×
1679
                },
1✔
UNCOV
1680
                Spec: cronJob.Spec.JobTemplate.Spec,
×
UNCOV
1681
        }
×
1682
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1683
                return nil, err
1684
        }
1685
        return job, nil
1✔
1686
}
1✔
1687

1✔
1688
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1689
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1690
                return err
1✔
1691
        }
1✔
1692
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1693
        labels := obj.GetLabels()
1✔
UNCOV
1694
        labels[common.DataImportCronNsLabel] = cron.Namespace
×
UNCOV
1695
        labels[common.DataImportCronLabel] = cron.Name
×
1696
        obj.SetLabels(labels)
1✔
1697
        return nil
1698
}
1699

1✔
1700
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string) *cdiv1.DataVolume {
1✔
UNCOV
1701
        dv := cron.Spec.Template.DeepCopy()
×
UNCOV
1702
        if isCronRegistrySource(cron) {
×
1703
                var digestedURL string
1✔
1704
                if isURLSource(cron) {
1✔
1705
                        digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1706
                } else if isImageStreamSource(cron) {
1✔
1707
                        // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1708
                        digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1709
                        dv.Spec.Source.Registry.ImageStream = nil
1710
                }
1711
                dv.Spec.Source.Registry.URL = &digestedURL
1✔
1712
        }
1✔
1713
        dv.Name = dataVolumeName
2✔
1714
        dv.Namespace = cron.Namespace
1✔
1715
        r.setDataImportCronResourceLabels(cron, dv)
2✔
1716
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1717
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
3✔
1718
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1719

1✔
1720
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
1✔
1721
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1722
        }
1✔
1723

1724
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1725

1✔
1726
        return dv
1✔
1727
}
1✔
1728

1✔
1729
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1730
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1731
        cc.CopyAllowedLabels(cron.GetLabels(), obj, true)
2✔
1732
        labels := obj.GetLabels()
1✔
1733
        labels[common.DataImportCronLabel] = cron.Name
1✔
1734
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
1735
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1736
        }
1✔
1737
        obj.SetLabels(labels)
1✔
1738
}
1739

1740
func untagDigestedDockerURL(dockerURL string) string {
1✔
1741
        if u, err := url.Parse(dockerURL); err == nil {
1✔
1742
                url := u.Host + u.Path
1✔
1743
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1744
                // Check for tag
1✔
1745
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1746
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
1✔
1747
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1748
                        }
1✔
1749
                }
1750
        }
1751
        return dockerURL
1✔
1752
}
2✔
1753

1✔
1754
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1755
        if val := cron.Labels[ann]; val != "" {
1✔
1756
                cc.AddLabel(dv, ann, val)
2✔
1757
        }
2✔
1758
}
1✔
1759

1✔
1760
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1761
        if val := cron.Annotations[ann]; val != "" {
1762
                cc.AddAnnotation(dv, ann, val)
1✔
1763
        }
1764
}
1765

1✔
1766
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
2✔
1767
        dataSource := &cdiv1.DataSource{
1✔
1768
                ObjectMeta: metav1.ObjectMeta{
1✔
1769
                        Name:      cron.Spec.ManagedDataSource,
1770
                        Namespace: cron.Namespace,
1771
                },
1✔
1772
        }
1✔
UNCOV
1773
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
×
UNCOV
1774
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
×
1775
        cc.CopyAllowedLabels(cron.GetLabels(), dataSource, true)
1776
        return dataSource
1777
}
1✔
1778

1✔
1779
// Create DataVolume name based on the DataSource name + prefix of the digest
1✔
1780
func createDvName(prefix, digest string) (string, error) {
1✔
1781
        digestPrefix := ""
1✔
1782
        if strings.HasPrefix(digest, digestSha256Prefix) {
1✔
1783
                digestPrefix = digestSha256Prefix
1✔
1784
        } else if strings.HasPrefix(digest, digestUIDPrefix) {
1✔
1785
                digestPrefix = digestUIDPrefix
1✔
1786
        } else {
1✔
1787
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1788
        }
1✔
1789
        fromIdx := len(digestPrefix)
1790
        toIdx := fromIdx + digestDvNameSuffixLength
1791
        if len(digest) < toIdx {
1✔
1792
                return "", errors.Errorf("Digest is too short")
1✔
1793
        }
2✔
1794
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1795
}
3✔
1796

1✔
1797
// GetCronJobName get CronJob name based on cron name and UID
2✔
1798
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1799
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1800
}
1✔
1801

1✔
1802
// GetInitialJobName get initial job name based on cron name and UID
2✔
1803
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1804
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1805
}
1✔
1806

1807
func getPollerPodName(cron *cdiv1.DataImportCron) string {
1808
        return naming.GetResourceName("poller-"+cron.Name, string(cron.UID)[:8])
1809
}
1✔
1810

1✔
1811
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1812
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1813
}
1814

1✔
1815
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1816
        dv := &cron.Spec.Template
1✔
1817

1818
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1819
                return explicitVolumeMode, nil
1✔
1820
        }
1✔
1821

1822
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1823
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1824
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1825
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1826
                        AccessModes:      accessModes,
1✔
1827
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1828
                        Resources: corev1.VolumeResourceRequirements{
1✔
1829
                                Requests: corev1.ResourceList{
1✔
UNCOV
1830
                                        // Doesn't matter
×
UNCOV
1831
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
×
1832
                                },
1833
                        },
1✔
1834
                },
1✔
1835
        }
1✔
1836
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1837
                return nil, err
1✔
1838
        }
1✔
1839

1✔
1840
        return inferredPvc.Spec.VolumeMode, nil
1✔
1841
}
1✔
1842

1✔
1843
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1✔
1844
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1845
        if dv.Spec.PVC != nil {
1✔
1846
                return dv.Spec.PVC.VolumeMode
1✔
1847
        }
1✔
UNCOV
1848

×
UNCOV
1849
        if dv.Spec.Storage != nil {
×
1850
                return dv.Spec.Storage.VolumeMode
1851
        }
1✔
1852

1853
        return nil
1854
}
1855

1✔
1856
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1✔
UNCOV
1857
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
×
UNCOV
1858
        if dv.Spec.PVC != nil {
×
1859
                return dv.Spec.PVC.AccessModes
1860
        }
2✔
1861

1✔
1862
        if dv.Spec.Storage != nil {
1✔
1863
                return dv.Spec.Storage.AccessModes
UNCOV
1864
        }
×
1865

1866
        return nil
1867
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc