• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5758

11 Jan 2026 06:39PM UTC coverage: 49.444% (+0.06%) from 49.388%
#5758

Pull #4008

travis-ci

akalenyu
Expose advised restore size on cron snapshots

Some provisioners (trident nfs) have a weird restoresize
which may direct users towards using a wrong disk restore size.

Expose our own advised size in the annotations.

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>
Pull Request #4008: Expose advised restore size on cron snapshots

25 of 27 new or added lines in 1 file covered. (92.59%)

2 existing lines in 1 file now uncovered.

14631 of 29591 relevant lines covered (49.44%)

0.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.17
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
limitations under the License.
14
See the License for the specific language governing permissions and
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        secv1 "github.com/openshift/api/security/v1"
33
        "github.com/pkg/errors"
34
        cronexpr "github.com/robfig/cron/v3"
35

36
        authorizationv1 "k8s.io/api/authorization/v1"
37
        batchv1 "k8s.io/api/batch/v1"
38
        corev1 "k8s.io/api/core/v1"
39
        storagev1 "k8s.io/api/storage/v1"
40
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
41
        "k8s.io/apimachinery/pkg/api/meta"
42
        "k8s.io/apimachinery/pkg/api/resource"
43
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
44
        "k8s.io/apimachinery/pkg/labels"
45
        "k8s.io/apimachinery/pkg/runtime"
46
        "k8s.io/apimachinery/pkg/types"
47
        "k8s.io/client-go/tools/record"
48
        openapicommon "k8s.io/kube-openapi/pkg/common"
49
        "k8s.io/utils/ptr"
50

51
        "sigs.k8s.io/controller-runtime/pkg/client"
52
        "sigs.k8s.io/controller-runtime/pkg/controller"
53
        "sigs.k8s.io/controller-runtime/pkg/event"
54
        "sigs.k8s.io/controller-runtime/pkg/handler"
55
        "sigs.k8s.io/controller-runtime/pkg/manager"
56
        "sigs.k8s.io/controller-runtime/pkg/predicate"
57
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
58
        "sigs.k8s.io/controller-runtime/pkg/source"
59

60
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
61
        "kubevirt.io/containerized-data-importer/pkg/common"
62
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
63
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
64
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
65
        "kubevirt.io/containerized-data-importer/pkg/operator"
66
        "kubevirt.io/containerized-data-importer/pkg/util"
67
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
68
)
69

70
const (
71
        // ErrDataSourceAlreadyManaged provides a const to indicate DataSource already managed error
72
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
73
        // MessageDataSourceAlreadyManaged provides a const to form DataSource already managed error message
74
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
75
)
76

77
// DataImportCronReconciler members
78
type DataImportCronReconciler struct {
79
        client          client.Client
80
        uncachedClient  client.Client
81
        recorder        record.EventRecorder
82
        scheme          *runtime.Scheme
83
        log             logr.Logger
84
        image           string
85
        pullPolicy      string
86
        cdiNamespace    string
87
        installerLabels map[string]string
88
}
89

90
const (
91
        // AnnSourceDesiredDigest is the digest of the pending updated image
92
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
93
        // AnnImageStreamDockerRef is the ImageStream Docker reference
94
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
95
        // AnnNextCronTime is the next time stamp which satisfies the cron expression
96
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
97
        // AnnLastCronTime is the cron last execution time stamp
98
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
99
        // AnnLastUseTime is the PVC last use time stamp
100
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
101
        // AnnStorageClass is the cron DV's storage class
102
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
103

104
        dataImportControllerName    = "dataimportcron-controller"
105
        digestSha256Prefix          = "sha256:"
106
        digestUIDPrefix             = "uid:"
107
        digestDvNameSuffixLength    = 12
108
        cronJobUIDSuffixLength      = 8
109
        defaultImportsToKeepPerCron = 3
110
)
111

112
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
113

114
// Reconcile loop for DataImportCronReconciler
115
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
116
        dataImportCron := &cdiv1.DataImportCron{}
1✔
117
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
118
                return reconcile.Result{}, err
1✔
119
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
120
                err := r.cleanup(ctx, req.NamespacedName)
1✔
121
                return reconcile.Result{}, err
1✔
122
        }
1✔
123
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
124
        if !shouldReconcile || err != nil {
1✔
125
                return reconcile.Result{}, err
×
126
        }
×
127

128
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
129
                return reconcile.Result{}, err
×
130
        }
×
131

132
        return r.update(ctx, dataImportCron)
1✔
133
}
134

135
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
136
        dataSource := &cdiv1.DataSource{}
1✔
137
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
138
                if k8serrors.IsNotFound(err) {
2✔
139
                        return true, nil
1✔
140
                }
1✔
141
                return false, err
×
142
        }
143
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
144
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
145
                return true, nil
1✔
146
        }
1✔
147
        otherCron := &cdiv1.DataImportCron{}
1✔
148
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
149
                if k8serrors.IsNotFound(err) {
2✔
150
                        return true, nil
1✔
151
                }
1✔
152
                return false, err
×
153
        }
154
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
155
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
156
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
157
                r.log.V(3).Info(msg)
1✔
158
                return false, nil
1✔
159
        }
1✔
160
        return true, nil
×
161
}
162

163
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
164
        if dataImportCron.Spec.Schedule == "" {
2✔
165
                return nil
1✔
166
        }
1✔
167
        if isControllerPolledSource(dataImportCron) {
2✔
168
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
169
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
170
                }
1✔
171
                return nil
1✔
172
        }
173
        if !isURLSource(dataImportCron) {
1✔
174
                return nil
×
175
        }
×
176
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
177
        if exists || err != nil {
2✔
178
                return err
1✔
179
        }
1✔
180
        cronJob, err := r.newCronJob(dataImportCron)
1✔
181
        if err != nil {
1✔
182
                return err
×
183
        }
×
184
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
185
                return err
×
186
        }
×
187
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
188
        if err != nil {
1✔
189
                return err
×
190
        }
×
191
        if err := r.client.Create(ctx, job); err != nil {
1✔
192
                return err
×
193
        }
×
194
        return nil
1✔
195
}
196

197
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
198
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
199
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
200
        }
×
201
        imageStream := &imagev1.ImageStream{}
1✔
202
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
203
        if err != nil {
2✔
204
                return nil, "", err
1✔
205
        }
1✔
206
        imageStreamNamespacedName := types.NamespacedName{
1✔
207
                Namespace: imageStreamNamespace,
1✔
208
                Name:      name,
1✔
209
        }
1✔
210
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
211
                return nil, "", err
1✔
212
        }
1✔
213
        return imageStream, tag, nil
1✔
214
}
215

216
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
217
        if imageStream == nil {
1✔
218
                return "", "", errors.Errorf("No ImageStream")
×
219
        }
×
220
        tags := imageStream.Status.Tags
1✔
221
        if len(tags) == 0 {
1✔
222
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
223
        }
×
224

225
        tagIdx := 0
1✔
226
        if len(imageStreamTag) > 0 {
2✔
227
                tagIdx = -1
1✔
228
                for i, tag := range tags {
2✔
229
                        if tag.Tag == imageStreamTag {
2✔
230
                                tagIdx = i
1✔
231
                                break
1✔
232
                        }
233
                }
234
        }
235
        if tagIdx == -1 {
2✔
236
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
237
        }
1✔
238

239
        if len(tags[tagIdx].Items) == 0 {
2✔
240
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
241
        }
1✔
242
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
243
}
244

245
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
246
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
247
                return imageStreamName, "", nil
1✔
248
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
249
                return subs[0], subs[1], nil
1✔
250
        }
1✔
251
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
252
}
253

254
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
255
        nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]
1✔
256
        if nextTimeStr == "" {
1✔
257
                return r.setNextCronTime(dataImportCron)
×
258
        }
×
259
        nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
260
        if err != nil {
1✔
261
                return reconcile.Result{}, err
×
262
        }
×
263
        if nextTime.After(time.Now()) {
2✔
264
                return r.setNextCronTime(dataImportCron)
1✔
265
        }
1✔
266
        switch {
1✔
267
        case isImageStreamSource(dataImportCron):
1✔
268
                if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
269
                        return reconcile.Result{}, err
1✔
270
                }
1✔
271
        case isPvcSource(dataImportCron):
1✔
272
                if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
2✔
273
                        return reconcile.Result{}, err
1✔
274
                }
1✔
275
        case isNodePull(dataImportCron):
1✔
276
                if done, err := r.updateContainerImageDesiredDigest(ctx, dataImportCron); !done {
2✔
277
                        return reconcile.Result{RequeueAfter: 3 * time.Second}, err
1✔
278
                } else if err != nil {
2✔
279
                        return reconcile.Result{}, err
×
280
                }
×
281
        }
282

283
        return r.setNextCronTime(dataImportCron)
1✔
284
}
285

286
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
287
        now := time.Now()
1✔
288
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
289
        if err != nil {
1✔
290
                return reconcile.Result{}, err
×
291
        }
×
292
        nextTime := expr.Next(now)
1✔
293
        requeueAfter := nextTime.Sub(now)
1✔
294
        res := reconcile.Result{RequeueAfter: requeueAfter}
1✔
295
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
296
        return res, err
1✔
297
}
298

299
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
300
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
301
        return err == nil && regSource.ImageStream != nil
1✔
302
}
1✔
303

304
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
305
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
306
        return err == nil && regSource.URL != nil
1✔
307
}
1✔
308

309
func isNodePull(cron *cdiv1.DataImportCron) bool {
1✔
310
        regSource, err := getCronRegistrySource(cron)
1✔
311
        return err == nil && regSource != nil && regSource.PullMethod != nil &&
1✔
312
                *regSource.PullMethod == cdiv1.RegistryPullNode
1✔
313
}
1✔
314

315
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
316
        if !isCronRegistrySource(cron) {
2✔
317
                return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
1✔
318
        }
1✔
319
        return cron.Spec.Template.Spec.Source.Registry, nil
1✔
320
}
321

322
func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
1✔
323
        source := cron.Spec.Template.Spec.Source
1✔
324
        return source != nil && source.Registry != nil
1✔
325
}
1✔
326

327
func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
1✔
328
        if !isPvcSource(cron) {
1✔
329
                return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
×
330
        }
×
331
        return cron.Spec.Template.Spec.Source.PVC, nil
1✔
332
}
333

334
func isPvcSource(cron *cdiv1.DataImportCron) bool {
1✔
335
        source := cron.Spec.Template.Spec.Source
1✔
336
        return source != nil && source.PVC != nil
1✔
337
}
1✔
338

339
func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
1✔
340
        return isImageStreamSource(cron) || isPvcSource(cron) || isNodePull(cron)
1✔
341
}
1✔
342

343
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
344
        res := reconcile.Result{}
1✔
345

1✔
346
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
347
        if err != nil {
1✔
348
                return res, err
×
349
        }
×
350

351
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
352
        imports := dataImportCron.Status.CurrentImports
1✔
353
        importSucceeded := false
1✔
354

1✔
355
        dataVolume := dataImportCron.Spec.Template
1✔
356
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
357
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
358
        if err != nil {
1✔
359
                return res, err
×
360
        }
×
361
        if desiredStorageClass != nil {
2✔
362
                if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
2✔
363
                        return res, err
1✔
364
                }
1✔
365
                currentSc, hasCurrent := dataImportCron.Annotations[AnnStorageClass]
1✔
366
                desiredSc := desiredStorageClass.Name
1✔
367
                if hasCurrent && currentSc != desiredSc {
2✔
368
                        r.log.Info("Storage class changed, delete most recent source on the old sc as it's no longer the desired", "currentSc", currentSc, "desiredSc", desiredSc)
1✔
369
                        if err := r.handleStorageClassChange(ctx, dataImportCron, desiredSc); err != nil {
1✔
370
                                return res, err
×
371
                        }
×
372
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
373
                }
374
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
1✔
375
        }
376
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
377
        if err != nil {
1✔
378
                return res, err
×
379
        }
×
380
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
381
        if err != nil {
1✔
382
                return res, err
×
383
        }
×
384

385
        handlePopulatedPvc := func() error {
2✔
386
                if pvc != nil {
2✔
387
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
388
                                return err
×
389
                        }
×
390
                }
391
                importSucceeded = true
1✔
392
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
393
                        return err
×
394
                }
×
395

396
                return nil
1✔
397
        }
398

399
        switch {
1✔
400
        case dv != nil:
1✔
401
                switch dv.Status.Phase {
1✔
402
                case cdiv1.Succeeded:
1✔
403
                        if err := handlePopulatedPvc(); err != nil {
1✔
404
                                return res, err
×
405
                        }
×
406
                case cdiv1.ImportScheduled:
1✔
407
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
408
                case cdiv1.ImportInProgress:
1✔
409
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
410
                default:
1✔
411
                        dvPhase := string(dv.Status.Phase)
1✔
412
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
413
                }
414
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
415
                if err := handlePopulatedPvc(); err != nil {
1✔
416
                        return res, err
×
417
                }
×
418
        case snapshot != nil:
1✔
419
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
420
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
421
                                return res, err
×
422
                        }
×
423
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
424
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
425
                }
426
                // Below k8s 1.29 there's no way to know the source volume mode
427
                // Let's at least expose this info on our own snapshots
428
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
429
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
430
                        if err != nil {
1✔
431
                                return res, err
×
432
                        }
×
433
                        if volMode != nil {
2✔
434
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
435
                        }
1✔
436
                }
437
                if _, ok := snapshot.Annotations[cc.AnnAdvisedRestoreSize]; !ok {
2✔
438
                        size := inferAdvisedRestoreSizeForSnapshot(&dataImportCron.Spec.Template, snapshot, nil)
1✔
439
                        if size != nil {
2✔
440
                                cc.AddAnnotation(snapshot, cc.AnnAdvisedRestoreSize, size.String())
1✔
441
                        }
1✔
442
                }
443
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
444
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
445
                if err != nil {
2✔
446
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
447
                                return res, err
×
448
                        }
×
449
                } else {
1✔
450
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
451
                }
1✔
452
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
453
                        return res, err
×
454
                }
×
455
                importSucceeded = true
1✔
456
        default:
1✔
457
                if len(imports) > 0 {
2✔
458
                        imports = imports[1:]
1✔
459
                        dataImportCron.Status.CurrentImports = imports
1✔
460
                }
1✔
461
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
462
        }
463

464
        if importSucceeded {
2✔
465
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
466
                        return res, err
×
467
                }
×
468
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
469
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
1✔
470
                        return res, err
×
471
                }
×
472
        }
473

474
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
475
                return res, err
×
476
        }
×
477

478
        // Skip if schedule is disabled
479
        if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
2✔
480
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
481
                pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
1✔
482
                if err != nil {
2✔
483
                        return pollRes, err
1✔
484
                }
1✔
485
                res = pollRes
1✔
486
        }
487

488
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
489
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
490
        if digestUpdated {
2✔
491
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
492
                if dv != nil {
1✔
493
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
×
494
                                return res, err
×
495
                        }
×
496
                }
497
                if importSucceeded || len(imports) == 0 {
2✔
498
                        if err := r.createImportDataVolume(ctx, dataImportCron); err != nil {
2✔
499
                                return res, err
1✔
500
                        }
1✔
501
                }
502
        } else if importSucceeded {
2✔
503
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
1✔
504
                        return res, err
×
505
                }
×
506
        } else if len(imports) > 0 {
2✔
507
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
1✔
508
        } else {
2✔
509
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
510
        }
1✔
511

512
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
1✔
513
                return res, err
×
514
        }
×
515

516
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
2✔
517
                if err := r.client.Update(ctx, dataImportCron); err != nil {
1✔
518
                        return res, err
×
519
                }
×
520
        }
521
        return res, nil
1✔
522
}
523

524
// Returns the current import DV if exists, and the last imported PVC
525
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
526
        imports := cron.Status.CurrentImports
1✔
527
        if len(imports) == 0 {
2✔
528
                return nil, nil, nil
1✔
529
        }
1✔
530

531
        dvName := imports[0].DataVolumeName
1✔
532
        dv := &cdiv1.DataVolume{}
1✔
533
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
534
                if !k8serrors.IsNotFound(err) {
1✔
535
                        return nil, nil, err
×
536
                }
×
537
                dv = nil
1✔
538
        }
539

540
        pvc := &corev1.PersistentVolumeClaim{}
1✔
541
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
542
                if !k8serrors.IsNotFound(err) {
1✔
543
                        return nil, nil, err
×
544
                }
×
545
                pvc = nil
1✔
546
        }
547
        return dv, pvc, nil
1✔
548
}
549

550
// Returns the current import DV if exists, and the last imported PVC
551
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
552
        imports := cron.Status.CurrentImports
1✔
553
        if len(imports) == 0 {
2✔
554
                return nil, nil
1✔
555
        }
1✔
556

557
        snapName := imports[0].DataVolumeName
1✔
558
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
559
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
560
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
561
                        return nil, err
×
562
                }
×
563
                return nil, nil
1✔
564
        }
565

566
        return snapshot, nil
1✔
567
}
568

569
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
570
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
571
        dataSource := &cdiv1.DataSource{}
1✔
572
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
573
                return nil, err
1✔
574
        }
1✔
575
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
576
                log := r.log.WithName("getCronManagedDataSource")
×
577
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
×
578
                return nil, ErrNotManagedByCron
×
579
        }
×
580
        return dataSource, nil
1✔
581
}
582

583
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
584
        objCopy := obj.DeepCopyObject()
1✔
585
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
586
        r.setDataImportCronResourceLabels(cron, obj)
1✔
587
        if !reflect.DeepEqual(obj, objCopy) {
2✔
588
                if err := r.client.Update(ctx, obj); err != nil {
1✔
589
                        return err
×
590
                }
×
591
        }
592
        return nil
1✔
593
}
594

595
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
596
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
597
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
598
                if cond.Status == corev1.ConditionFalse &&
×
599
                        (cond.Reason == common.GenericError || cond.Reason == ImagePullFailedReason) {
×
600
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
601
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
602
                        dv.Labels[common.DataImportCronLabel] = ""
×
603
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
604
                                return err
×
605
                        }
×
606
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
607
                                return err
×
608
                        }
×
609
                        cron.Status.CurrentImports = nil
×
610
                }
611
        }
612
        return nil
×
613
}
614

615
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
616
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
617
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
618
        if err != nil {
1✔
619
                return err
×
620
        }
×
621
        if regSource.ImageStream == nil {
1✔
622
                return nil
×
623
        }
×
624
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
625
        if err != nil {
2✔
626
                return err
1✔
627
        }
1✔
628
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
629
        if err != nil {
2✔
630
                return err
1✔
631
        }
1✔
632
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
633
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
634
                log.Info("Updating DataImportCron", "digest", digest)
1✔
635
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
636
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
637
        }
1✔
638
        return nil
1✔
639
}
640

641
func (r *DataImportCronReconciler) updateContainerImageDesiredDigest(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
642
        log := r.log.WithValues("name", cron.Name).WithValues("uid", cron.UID)
1✔
643
        podName := getPollerPodName(cron)
1✔
644
        ns := cron.Namespace
1✔
645
        nn := types.NamespacedName{Name: podName, Namespace: ns}
1✔
646
        pod := &corev1.Pod{}
1✔
647

1✔
648
        if err := r.client.Get(ctx, nn, pod); err == nil {
2✔
649
                digest, err := fetchContainerImageDigest(pod)
1✔
650
                if err != nil || digest == "" {
1✔
651
                        return false, err
×
652
                }
×
653
                cc.AddAnnotation(cron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
654
                if cron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
655
                        log.Info("Updating DataImportCron", "digest", digest)
1✔
656
                        cc.AddAnnotation(cron, AnnSourceDesiredDigest, digest)
1✔
657
                }
1✔
658
                return true, r.client.Delete(ctx, pod)
1✔
659
        } else if cc.IgnoreNotFound(err) != nil {
1✔
660
                return false, err
×
661
        }
×
662

663
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(ctx, r.client)
1✔
664
        if err != nil {
1✔
665
                return false, err
×
666
        }
×
667
        platform := cron.Spec.Template.Spec.Source.Registry.Platform
1✔
668
        if platform != nil && platform.Architecture != "" {
1✔
669
                if workloadNodePlacement.NodeSelector == nil {
×
670
                        workloadNodePlacement.NodeSelector = map[string]string{}
×
671
                }
×
672
                workloadNodePlacement.NodeSelector[corev1.LabelArchStable] = platform.Architecture
×
673
        }
674

675
        containerImage := strings.TrimPrefix(*cron.Spec.Template.Spec.Source.Registry.URL, "docker://")
1✔
676

1✔
677
        pod = &corev1.Pod{
1✔
678
                ObjectMeta: metav1.ObjectMeta{
1✔
679
                        Name:      podName,
1✔
680
                        Namespace: ns,
1✔
681
                        OwnerReferences: []metav1.OwnerReference{
1✔
682
                                {
1✔
683
                                        APIVersion:         cron.APIVersion,
1✔
684
                                        Kind:               cron.Kind,
1✔
685
                                        Name:               cron.Name,
1✔
686
                                        UID:                cron.UID,
1✔
687
                                        BlockOwnerDeletion: ptr.To[bool](true),
1✔
688
                                        Controller:         ptr.To[bool](true),
1✔
689
                                },
1✔
690
                        },
1✔
691
                },
1✔
692
                Spec: corev1.PodSpec{
1✔
693
                        TerminationGracePeriodSeconds: ptr.To[int64](0),
1✔
694
                        RestartPolicy:                 corev1.RestartPolicyNever,
1✔
695
                        NodeSelector:                  workloadNodePlacement.NodeSelector,
1✔
696
                        Tolerations:                   workloadNodePlacement.Tolerations,
1✔
697
                        Affinity:                      workloadNodePlacement.Affinity,
1✔
698
                        Volumes: []corev1.Volume{
1✔
699
                                {
1✔
700
                                        Name: "shared-volume",
1✔
701
                                        VolumeSource: corev1.VolumeSource{
1✔
702
                                                EmptyDir: &corev1.EmptyDirVolumeSource{},
1✔
703
                                        },
1✔
704
                                },
1✔
705
                        },
1✔
706
                        InitContainers: []corev1.Container{
1✔
707
                                {
1✔
708
                                        Name:                     "init",
1✔
709
                                        Image:                    r.image,
1✔
710
                                        ImagePullPolicy:          corev1.PullPolicy(r.pullPolicy),
1✔
711
                                        Command:                  []string{"sh", "-c", "cp /usr/bin/cdi-containerimage-server /shared/server"},
1✔
712
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
713
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
714
                                },
1✔
715
                        },
1✔
716
                        Containers: []corev1.Container{
1✔
717
                                {
1✔
718
                                        Name:                     "image-container",
1✔
719
                                        Image:                    containerImage,
1✔
720
                                        ImagePullPolicy:          corev1.PullAlways,
1✔
721
                                        Command:                  []string{"/shared/server", "-h"},
1✔
722
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
723
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
724
                                },
1✔
725
                        },
1✔
726
                },
1✔
727
        }
1✔
728

1✔
729
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
730
        if pod.Spec.SecurityContext != nil {
2✔
731
                pod.Spec.SecurityContext.FSGroup = nil
1✔
732
        }
1✔
733

734
        return false, r.client.Create(ctx, pod)
1✔
735
}
736

737
func fetchContainerImageDigest(pod *corev1.Pod) (string, error) {
1✔
738
        if len(pod.Status.ContainerStatuses) == 0 {
1✔
739
                return "", nil
×
740
        }
×
741

742
        status := pod.Status.ContainerStatuses[0]
1✔
743
        if status.State.Waiting != nil {
1✔
744
                reason := status.State.Waiting.Reason
×
745
                switch reason {
×
746
                case "ImagePullBackOff", "ErrImagePull", "InvalidImageName":
×
747
                        return "", errors.Errorf("%s %s: %s", common.ImagePullFailureText, status.Image, reason)
×
748
                }
749
                return "", nil
×
750
        }
751

752
        if status.State.Terminated == nil {
1✔
753
                return "", nil
×
754
        }
×
755

756
        imageID := status.ImageID
1✔
757
        if imageID == "" {
1✔
758
                return "", errors.Errorf("Container has no imageID")
×
759
        }
×
760
        idx := strings.Index(imageID, digestSha256Prefix)
1✔
761
        if idx < 0 {
1✔
762
                return "", errors.Errorf("Container image %s ID has no digest: %s", status.Image, imageID)
×
763
        }
×
764

765
        return imageID[idx:], nil
1✔
766
}
767

768
func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
769
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
770
        pvcSource, err := getCronPvcSource(dataImportCron)
1✔
771
        if err != nil {
1✔
772
                return err
×
773
        }
×
774
        ns := pvcSource.Namespace
1✔
775
        if ns == "" {
2✔
776
                ns = dataImportCron.Namespace
1✔
777
        }
1✔
778
        pvc := &corev1.PersistentVolumeClaim{}
1✔
779
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
2✔
780
                return err
1✔
781
        }
1✔
782
        digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
1✔
783
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
784
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
785
                log.Info("Updating DataImportCron", "digest", digest)
1✔
786
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
787
        }
1✔
788
        return nil
1✔
789
}
790

791
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
792
        log := r.log.WithName("updateDataSource")
1✔
793
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
794
        if err != nil {
2✔
795
                if k8serrors.IsNotFound(err) {
2✔
796
                        dataSource = r.newDataSource(dataImportCron)
1✔
797
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
798
                                return err
×
799
                        }
×
800
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
1✔
801
                } else if errors.Is(err, ErrNotManagedByCron) {
×
802
                        return nil
×
803
                } else {
×
804
                        return err
×
805
                }
×
806
        }
807
        dataSourceCopy := dataSource.DeepCopy()
1✔
808
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
809

1✔
810
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
811
        populateDataSource(format, dataSource, sourcePVC)
1✔
812

1✔
813
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
814
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
815
                        return err
×
816
                }
×
817
        }
818

819
        return nil
1✔
820
}
821

822
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
823
        if sourcePVC == nil {
2✔
824
                return
1✔
825
        }
1✔
826

827
        switch format {
1✔
828
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
829
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
830
                        PVC: sourcePVC,
1✔
831
                }
1✔
832
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
833
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
834
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
835
                                Namespace: sourcePVC.Namespace,
1✔
836
                                Name:      sourcePVC.Name,
1✔
837
                        },
1✔
838
                }
1✔
839
        }
840
}
841

842
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
843
        if dataImportCron.Status.CurrentImports == nil {
1✔
844
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
845
        }
×
846
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
847
                Namespace: dataImportCron.Namespace,
1✔
848
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
849
        }
1✔
850
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
851
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
852
                now := metav1.Now()
1✔
853
                dataImportCron.Status.LastImportTimestamp = &now
1✔
854
        }
1✔
855
        return nil
1✔
856
}
857

858
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
859
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
860
        if lastTimeStr == "" {
2✔
861
                return nil
1✔
862
        }
1✔
863
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
864
        if err != nil {
1✔
865
                return err
×
866
        }
×
867
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
868
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
869
        }
1✔
870
        return nil
1✔
871
}
872

873
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
874
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
875
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
876
        if digest == "" {
1✔
877
                return nil
×
878
        }
×
879
        dvName, err := createDvName(dataSourceName, digest)
1✔
880
        if err != nil {
2✔
881
                return err
1✔
882
        }
1✔
883

884
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
885
        for _, src := range sources {
2✔
886
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
887
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
888
                                return err
×
889
                        }
×
890
                } else {
1✔
891
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
892
                                return err
×
893
                        }
×
894
                        // If source exists don't create DV
895
                        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
896
                        return nil
1✔
897
                }
898
        }
899

900
        dv := r.newSourceDataVolume(dataImportCron, dvName)
1✔
901
        if allowed, err := r.authorizeCloneDataVolume(dataImportCron, dv); err != nil {
1✔
902
                return err
×
903
        } else if !allowed {
2✔
904
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse,
1✔
905
                        "Not authorized to create DataVolume", notAuthorized)
1✔
906
                return nil
1✔
907
        }
1✔
908
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
909
                return err
×
910
        }
×
911
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
912

1✔
913
        return nil
1✔
914
}
915

916
func (r *DataImportCronReconciler) authorizeCloneDataVolume(dataImportCron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) (bool, error) {
1✔
917
        if !isPvcSource(dataImportCron) {
2✔
918
                return true, nil
1✔
919
        }
1✔
920
        saName := "default"
1✔
921
        if dataImportCron.Spec.ServiceAccountName != nil {
1✔
922
                saName = *dataImportCron.Spec.ServiceAccountName
×
923
        }
×
924
        if resp, err := dv.AuthorizeSA(dv.Namespace, dv.Name, &authProxy{r.client}, dataImportCron.Namespace, saName); err != nil {
1✔
925
                return false, err
×
926
        } else if !resp.Allowed {
2✔
927
                r.log.Info("Not authorized to create DataVolume", "cron", dataImportCron.Name, "reason", resp.Reason)
1✔
928
                return false, nil
1✔
929
        }
1✔
930

931
        return true, nil
1✔
932
}
933

934
type authProxy struct {
935
        client client.Client
936
}
937

938
func (p *authProxy) CreateSar(sar *authorizationv1.SubjectAccessReview) (*authorizationv1.SubjectAccessReview, error) {
1✔
939
        if err := p.client.Create(context.TODO(), sar); err != nil {
1✔
940
                return nil, err
×
941
        }
×
942
        return sar, nil
1✔
943
}
944

945
func (p *authProxy) GetNamespace(name string) (*corev1.Namespace, error) {
1✔
946
        ns := &corev1.Namespace{}
1✔
947
        if err := p.client.Get(context.TODO(), types.NamespacedName{Name: name}, ns); err != nil {
1✔
948
                return nil, err
×
949
        }
×
950
        return ns, nil
1✔
951
}
952

953
func (p *authProxy) GetDataSource(namespace, name string) (*cdiv1.DataSource, error) {
×
954
        das := &cdiv1.DataSource{}
×
955
        if err := p.client.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, das); err != nil {
×
956
                return nil, err
×
957
        }
×
958
        return das, nil
×
959
}
960

961
func (r *DataImportCronReconciler) handleStorageClassChange(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
1✔
962
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
963
        if !ok {
1✔
964
                // nothing to delete
×
965
                return nil
×
966
        }
×
967
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
968
        if err != nil {
1✔
969
                return err
×
970
        }
×
971
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
1✔
972
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
1✔
973
        for _, src := range sources {
2✔
974
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
975
                        return err
×
976
                }
×
977
        }
978
        for _, src := range sources {
2✔
979
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
1✔
980
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
×
981
                }
×
982
        }
983
        // Only update desired storage class once garbage collection went through
984
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
1✔
985
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
986
        if err != nil {
1✔
987
                return err
×
988
        }
×
989

990
        return nil
1✔
991
}
992

993
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
994
        switch format {
1✔
995
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
996
                return nil
1✔
997
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
998
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
999
        default:
×
1000
                return fmt.Errorf("unknown source format for snapshot")
×
1001
        }
1002
}
1003

1004
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
1005
        if pvc == nil {
1✔
1006
                return nil
×
1007
        }
×
1008
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
1009
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
1010
                return nil
1✔
1011
        }
1✔
1012
        storageProfile := &cdiv1.StorageProfile{}
1✔
1013
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1014
                return err
×
1015
        }
×
1016
        className, err := cc.GetSnapshotClassForSmartClone(pvc, &desiredStorageClass.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
1017
        if err != nil {
1✔
1018
                return err
×
1019
        }
×
1020
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
1021
                ObjectMeta: metav1.ObjectMeta{
1✔
1022
                        Name:      pvc.Name,
1✔
1023
                        Namespace: dataImportCron.Namespace,
1✔
1024
                        Labels: map[string]string{
1✔
1025
                                common.CDILabelKey:       common.CDILabelValue,
1✔
1026
                                common.CDIComponentLabel: "",
1✔
1027
                        },
1✔
1028
                },
1✔
1029
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
1030
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
1031
                                PersistentVolumeClaimName: &pvc.Name,
1✔
1032
                        },
1✔
1033
                        VolumeSnapshotClassName: &className,
1✔
1034
                },
1✔
1035
        }
1✔
1036
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
1037
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
1✔
1038

1✔
1039
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
1040
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
1041
                if !k8serrors.IsNotFound(err) {
1✔
1042
                        return err
×
1043
                }
×
1044
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1045
                pvcSize := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
1✔
1046
                size := inferAdvisedRestoreSizeForSnapshot(&dataImportCron.Spec.Template, desiredSnapshot, &pvcSize)
1✔
1047
                if size != nil {
2✔
1048
                        cc.AddAnnotation(desiredSnapshot, cc.AnnAdvisedRestoreSize, size.String())
1✔
1049
                }
1✔
1050
                if pvc.Spec.VolumeMode != nil {
2✔
1051
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
1052
                }
1✔
1053
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
1054
                        return err
×
1055
                }
×
1056
        } else {
1✔
1057
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
1058
                        // Clean up DV/PVC as they are not needed anymore
1✔
1059
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
1060
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
1061
                                return err
×
1062
                        }
×
1063
                }
1064
        }
1065

1066
        return nil
1✔
1067
}
1068

1069
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
1070
        dataImportCron.Status.SourceFormat = &format
1✔
1071

1✔
1072
        switch format {
1✔
1073
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1074
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1075
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1076
                if snapshot == nil {
2✔
1077
                        // Snapshot create/update will trigger reconcile
1✔
1078
                        return nil
1✔
1079
                }
1✔
1080
                if cc.IsSnapshotReady(snapshot) {
2✔
1081
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1082
                } else {
2✔
1083
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
1084
                }
1✔
1085
        default:
×
1086
                return fmt.Errorf("unknown source format for snapshot")
×
1087
        }
1088

1089
        return nil
1✔
1090
}
1091

1092
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
1093
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
1094
        if desiredStorageClass == nil {
2✔
1095
                return format, nil
1✔
1096
        }
1✔
1097

1098
        storageProfile := &cdiv1.StorageProfile{}
1✔
1099
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1100
                return format, err
×
1101
        }
×
1102
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
1103
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
1104
        }
1✔
1105

1106
        return format, nil
1✔
1107
}
1108

1109
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
1110
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
1111
                return nil
×
1112
        }
×
1113
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
1114
        if err != nil {
1✔
1115
                return err
×
1116
        }
×
1117

1118
        maxImports := defaultImportsToKeepPerCron
1✔
1119

1✔
1120
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
1121
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
1122
        }
1✔
1123

1124
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
1125
                return err
×
1126
        }
×
1127
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
1128
                return err
×
1129
        }
×
1130

1131
        return nil
1✔
1132
}
1133

1134
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
1135
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
1136

1✔
1137
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1138
                return err
×
1139
        }
×
1140
        if len(pvcList.Items) > maxImports {
2✔
1141
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
1142
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
1143
                })
1✔
1144
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
1145
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1146
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
1147
                                return err
×
1148
                        }
×
1149
                }
1150
        }
1151

1152
        dvList := &cdiv1.DataVolumeList{}
1✔
1153
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1154
                return err
×
1155
        }
×
1156

1157
        if len(dvList.Items) > maxImports {
2✔
1158
                for _, dv := range dvList.Items {
2✔
1159
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
1160
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
1161
                                return err
×
1162
                        }
×
1163

1164
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1165
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1166
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
1167
                                        return err
×
1168
                                }
×
1169
                        }
1170
                }
1171
        }
1172

1173
        return nil
1✔
1174
}
1175

1176
// deleteDvPvc deletes DV or PVC if DV was GCed
1177
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
1178
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
1179
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
1180
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
1181
                return err
1✔
1182
        }
1✔
1183
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
1184
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
1185
                return err
×
1186
        }
×
1187
        return nil
1✔
1188
}
1189

1190
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
1191
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
1192

1✔
1193
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1194
                if meta.IsNoMatchError(err) {
×
1195
                        return nil
×
1196
                }
×
1197
                return err
×
1198
        }
1199
        if len(snapList.Items) > maxImports {
1✔
1200
                sort.Slice(snapList.Items, func(i, j int) bool {
×
1201
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
1202
                })
×
1203
                for _, snap := range snapList.Items[maxImports:] {
×
1204
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
1205
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
1206
                                return err
×
1207
                        }
×
1208
                }
1209
        }
1210

1211
        return nil
1✔
1212
}
1213

1214
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
1215
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
1216
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
1217

1✔
1218
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
1219
                return err
×
1220
        }
×
1221
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
1222
        if err != nil {
1✔
1223
                return err
×
1224
        }
×
1225
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
1226
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
1227
                return err
×
1228
        }
×
1229
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
1230
                return err
×
1231
        }
×
1232
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
1233
                return err
×
1234
        }
×
1235
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
1236
                return err
×
1237
        }
×
1238
        return nil
1✔
1239
}
1240

1241
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
1242
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
1243
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
1244
        if err != nil {
1✔
1245
                return err
×
1246
        }
×
1247
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
1248
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
1249
                return err
×
1250
        }
×
1251
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
1252
                return err
×
1253
        }
×
1254

1255
        return nil
1✔
1256
}
1257

1258
// NewDataImportCronController creates a new instance of the DataImportCron controller
1259
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
1260
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
1261
                Scheme: mgr.GetScheme(),
×
1262
                Mapper: mgr.GetRESTMapper(),
×
1263
        })
×
1264
        if err != nil {
×
1265
                return nil, err
×
1266
        }
×
1267
        reconciler := &DataImportCronReconciler{
×
1268
                client:          mgr.GetClient(),
×
1269
                uncachedClient:  uncachedClient,
×
1270
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1271
                scheme:          mgr.GetScheme(),
×
1272
                log:             log.WithName(dataImportControllerName),
×
1273
                image:           importerImage,
×
1274
                pullPolicy:      pullPolicy,
×
1275
                cdiNamespace:    util.GetNamespace(),
×
1276
                installerLabels: installerLabels,
×
1277
        }
×
1278
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1279
                MaxConcurrentReconciles: 3,
×
1280
                Reconciler:              reconciler,
×
1281
        })
×
1282
        if err != nil {
×
1283
                return nil, err
×
1284
        }
×
1285
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
1286
                return nil, err
×
1287
        }
×
1288
        log.Info("Initialized DataImportCron controller")
×
1289
        return dataImportCronController, nil
×
1290
}
1291

1292
func getCronName(obj client.Object) string {
×
1293
        return obj.GetLabels()[common.DataImportCronLabel]
×
1294
}
×
1295

1296
func getCronNs(obj client.Object) string {
×
1297
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
1298
}
×
1299

1300
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
1301
        if cronName := getCronName(obj); cronName != "" {
×
1302
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
1303
        }
×
1304
        return nil
×
1305
}
1306

1307
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1308
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1309
                return err
×
1310
        }
×
1311

1312
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
1313
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
1314
                // Otherwise we risk losing the storage profile event
×
1315
                var crons cdiv1.DataImportCronList
×
1316
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
×
1317
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
×
1318
                        return nil
×
1319
                }
×
1320
                // Storage profiles are 1:1 to storage classes
1321
                scName := obj.GetName()
×
1322
                var reqs []reconcile.Request
×
1323
                for _, cron := range crons.Items {
×
1324
                        dataVolume := cron.Spec.Template
×
1325
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1326
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1327
                        if err != nil || templateSc == nil {
×
1328
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1329
                                return reqs
×
1330
                        }
×
1331
                        if templateSc.Name == scName {
×
1332
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1333
                        }
×
1334
                }
1335
                return reqs
×
1336
        }
1337

1338
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1339
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1340
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1341
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1342
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1343
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1344
                },
1345
        )); err != nil {
×
1346
                return err
×
1347
        }
×
1348

1349
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1350
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1351
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1352
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1353
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1354
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1355
                },
1356
        )); err != nil {
×
1357
                return err
×
1358
        }
×
1359

1360
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1361
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1362
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1363
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1364
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1365
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1366
                },
1367
        )); err != nil {
×
1368
                return err
×
1369
        }
×
1370

1371
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1372
                return err
×
1373
        }
×
1374

1375
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1376
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1377
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
×
1378
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1379
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1380
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1381
                                return e.ObjectOld.Status.DataImportCronSourceFormat != e.ObjectNew.Status.DataImportCronSourceFormat
×
1382
                        },
×
1383
                },
1384
        )); err != nil {
×
1385
                return err
×
1386
        }
×
1387

1388
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1389
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1390
        }
×
1391

1392
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1393
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1394
                predicate.TypedFuncs[*batchv1.CronJob]{
×
1395
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1396
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1397
                        },
×
1398
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
×
1399
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1400
                },
1401
        )); err != nil {
×
1402
                return err
×
1403
        }
×
1404

1405
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1406
                if meta.IsNoMatchError(err) {
×
1407
                        // Back out if there's no point to attempt watch
×
1408
                        return nil
×
1409
                }
×
1410
                if !cc.IsErrCacheNotStarted(err) {
×
1411
                        return err
×
1412
                }
×
1413
        }
1414
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1415
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
×
1416
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1417
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1418
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1419
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1420
                },
1421
        )); err != nil {
×
1422
                return err
×
1423
        }
×
1424

1425
        return nil
×
1426
}
1427

1428
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1429
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1430
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1431
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
1432
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1433
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1434
                                log.Info("Update", "sc", obj.GetName(),
×
1435
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1436
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1437
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
×
1438
                                if err != nil {
×
1439
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
1440
                                }
×
1441
                                return reqs
×
1442
                        },
1443
                ),
1444
                predicate.TypedFuncs[*storagev1.StorageClass]{
1445
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1446
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1447
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1448
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1449
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1450
                        },
×
1451
                },
1452
        )); err != nil {
×
1453
                return err
×
1454
        }
×
1455

1456
        return nil
×
1457
}
1458

1459
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1460
        dicList := &cdiv1.DataImportCronList{}
×
1461
        if err := c.List(ctx, dicList); err != nil {
×
1462
                return nil, err
×
1463
        }
×
1464
        reqs := []reconcile.Request{}
×
1465
        for _, dic := range dicList.Items {
×
1466
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
1467
                        continue
×
1468
                }
1469

1470
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1471
        }
1472

1473
        return reqs, nil
×
1474
}
1475

1476
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
1✔
1477
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1478
                return false, nil
1✔
1479
        }
1✔
1480

1481
        sc := pvc.Spec.StorageClassName
1✔
1482
        if sc == nil || *sc == desiredStorageClass {
2✔
1483
                return false, nil
1✔
1484
        }
1✔
1485

1486
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1✔
1487
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
1488
                return false, err
×
1489
        }
×
1490

1491
        return true, nil
1✔
1492
}
1493

1494
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1495
        cronJob := &batchv1.CronJob{}
1✔
1496
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1497
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1498
                return false, cc.IgnoreNotFound(err)
1✔
1499
        }
1✔
1500

1501
        cronJobCopy := cronJob.DeepCopy()
1✔
1502
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1503
                return false, err
×
1504
        }
×
1505

1506
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1507
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1508
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
1509
                        return false, cc.IgnoreNotFound(err)
×
1510
                }
×
1511
        }
1512
        return true, nil
1✔
1513
}
1514

1515
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1516
        cronJob := &batchv1.CronJob{
1✔
1517
                ObjectMeta: metav1.ObjectMeta{
1✔
1518
                        Name:      GetCronJobName(cron),
1✔
1519
                        Namespace: r.cdiNamespace,
1✔
1520
                },
1✔
1521
        }
1✔
1522
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
1523
                return nil, err
×
1524
        }
×
1525
        return cronJob, nil
1✔
1526
}
1527

1528
// InitPollerPod inits poller Pod
1529
func InitPollerPod(c client.Client, cron *cdiv1.DataImportCron, pod *corev1.PodTemplateSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1530
        regSource, err := getCronRegistrySource(cron)
1✔
1531
        if err != nil {
1✔
1532
                return err
×
1533
        }
×
1534
        if regSource.URL == nil {
1✔
1535
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1536
        }
×
1537
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1538
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
1539
                return err
×
1540
        }
×
1541
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1542
        if err != nil {
1✔
1543
                return err
×
1544
        }
×
1545
        container := corev1.Container{
1✔
1546
                Name:  "cdi-source-update-poller",
1✔
1547
                Image: image,
1✔
1548
                Command: []string{
1✔
1549
                        "/usr/bin/cdi-source-update-poller",
1✔
1550
                        "-ns", cron.Namespace,
1✔
1551
                        "-cron", cron.Name,
1✔
1552
                        "-url", *regSource.URL,
1✔
1553
                },
1✔
1554
                ImagePullPolicy:          pullPolicy,
1✔
1555
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1556
                TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
1557
        }
1✔
1558

1✔
1559
        var volumes []corev1.Volume
1✔
1560
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1561
        if hasCertConfigMap {
1✔
1562
                vm := corev1.VolumeMount{
×
1563
                        Name:      CertVolName,
×
1564
                        MountPath: common.ImporterCertDir,
×
1565
                }
×
1566
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1567
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
1568
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
1569
        }
×
1570

1571
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1572
                vm := corev1.VolumeMount{
1✔
1573
                        Name:      ProxyCertVolName,
1✔
1574
                        MountPath: common.ImporterProxyCertDir,
1✔
1575
                }
1✔
1576
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1577
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1578
        }
1✔
1579

1580
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
1581
                container.Env = append(container.Env,
×
1582
                        corev1.EnvVar{
×
1583
                                Name: common.ImporterAccessKeyID,
×
1584
                                ValueFrom: &corev1.EnvVarSource{
×
1585
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1586
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1587
                                                        Name: *regSource.SecretRef,
×
1588
                                                },
×
1589
                                                Key: common.KeyAccess,
×
1590
                                        },
×
1591
                                },
×
1592
                        },
×
1593
                        corev1.EnvVar{
×
1594
                                Name: common.ImporterSecretKey,
×
1595
                                ValueFrom: &corev1.EnvVarSource{
×
1596
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1597
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1598
                                                        Name: *regSource.SecretRef,
×
1599
                                                },
×
1600
                                                Key: common.KeySecret,
×
1601
                                        },
×
1602
                                },
×
1603
                        },
×
1604
                )
×
1605
        }
×
1606

1607
        addEnvVar := func(varName, value string) {
2✔
1608
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1609
        }
1✔
1610

1611
        if insecureTLS {
1✔
1612
                addEnvVar(common.InsecureTLSVar, "true")
×
1613
        }
×
1614

1615
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1616
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1617
                        addEnvVar(varName, value)
1✔
1618
                }
1✔
1619
        }
1620

1621
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1622
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1623
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1624

1✔
1625
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1626
        if err != nil {
1✔
1627
                return err
×
1628
        }
×
1629
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1630
        if err != nil {
1✔
1631
                return err
×
1632
        }
×
1633

1634
        podSpec := &pod.Spec
1✔
1635

1✔
1636
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1637
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1638
        podSpec.Containers = []corev1.Container{container}
1✔
1639
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1640
        podSpec.Volumes = volumes
1✔
1641
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1642
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1643
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1644
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1645

1✔
1646
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1647
        // No need for specifid uid/fsgroup here since this doesn't write or use qemu
1✔
1648
        if podSpec.SecurityContext != nil {
2✔
1649
                podSpec.SecurityContext.FSGroup = nil
1✔
1650
        }
1✔
1651
        if podSpec.Containers[0].SecurityContext != nil {
2✔
1652
                podSpec.Containers[0].SecurityContext.RunAsUser = nil
1✔
1653
        }
1✔
1654

1655
        if pod.Labels == nil {
2✔
1656
                pod.Labels = map[string]string{}
1✔
1657
        }
1✔
1658
        pod.Labels[common.DataImportCronPollerLabel] = ""
1✔
1659

1✔
1660
        return nil
1✔
1661
}
1662

1663
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1664
        cronJobSpec := &cronJob.Spec
1✔
1665
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1666
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1667
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1668
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1669

1✔
1670
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1671
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1672
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1673
        cc.AddAnnotation(&jobSpec.Template, secv1.RequiredSCCAnnotation, common.RestrictedSCCName)
1✔
1674

1✔
1675
        pod := &jobSpec.Template
1✔
1676
        if err := InitPollerPod(r.client, cron, pod, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1677
                return err
×
1678
        }
×
1679
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1680
                return err
×
1681
        }
×
1682
        return nil
1✔
1683
}
1684

1685
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1686
        job := &batchv1.Job{
1✔
1687
                ObjectMeta: metav1.ObjectMeta{
1✔
1688
                        Name:      GetInitialJobName(cron),
1✔
1689
                        Namespace: cronJob.Namespace,
1✔
1690
                },
1✔
1691
                Spec: cronJob.Spec.JobTemplate.Spec,
1✔
1692
        }
1✔
1693
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1694
                return nil, err
×
1695
        }
×
1696
        return job, nil
1✔
1697
}
1698

1699
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1700
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1701
                return err
×
1702
        }
×
1703
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1704
        labels := obj.GetLabels()
1✔
1705
        labels[common.DataImportCronNsLabel] = cron.Namespace
1✔
1706
        labels[common.DataImportCronLabel] = cron.Name
1✔
1707
        obj.SetLabels(labels)
1✔
1708
        return nil
1✔
1709
}
1710

1711
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string) *cdiv1.DataVolume {
1✔
1712
        dv := cron.Spec.Template.DeepCopy()
1✔
1713
        if isCronRegistrySource(cron) {
2✔
1714
                var digestedURL string
1✔
1715
                if isURLSource(cron) {
2✔
1716
                        digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1717
                } else if isImageStreamSource(cron) {
3✔
1718
                        // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1719
                        digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1720
                        dv.Spec.Source.Registry.ImageStream = nil
1✔
1721
                }
1✔
1722
                dv.Spec.Source.Registry.URL = &digestedURL
1✔
1723
        }
1724
        dv.Name = dataVolumeName
1✔
1725
        dv.Namespace = cron.Namespace
1✔
1726
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1727
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1728
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1729
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1730

1✔
1731
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
1732
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1733
        }
1✔
1734

1735
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1736

1✔
1737
        return dv
1✔
1738
}
1739

1740
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1741
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1742
        cc.CopyAllowedLabels(cron.GetLabels(), obj, true)
1✔
1743
        labels := obj.GetLabels()
1✔
1744
        labels[common.DataImportCronLabel] = cron.Name
1✔
1745
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
2✔
1746
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1747
        }
1✔
1748
        obj.SetLabels(labels)
1✔
1749
}
1750

1751
func untagDigestedDockerURL(dockerURL string) string {
1✔
1752
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1753
                url := u.Host + u.Path
1✔
1754
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1755
                // Check for tag
1✔
1756
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1757
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
2✔
1758
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1759
                        }
1✔
1760
                }
1761
        }
1762
        return dockerURL
1✔
1763
}
1764

1765
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1766
        if val := cron.Labels[ann]; val != "" {
2✔
1767
                cc.AddLabel(dv, ann, val)
1✔
1768
        }
1✔
1769
}
1770

1771
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1772
        if val := cron.Annotations[ann]; val != "" {
1✔
1773
                cc.AddAnnotation(dv, ann, val)
×
1774
        }
×
1775
}
1776

1777
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1✔
1778
        dataSource := &cdiv1.DataSource{
1✔
1779
                ObjectMeta: metav1.ObjectMeta{
1✔
1780
                        Name:      cron.Spec.ManagedDataSource,
1✔
1781
                        Namespace: cron.Namespace,
1✔
1782
                },
1✔
1783
        }
1✔
1784
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1785
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1786
        cc.CopyAllowedLabels(cron.GetLabels(), dataSource, true)
1✔
1787
        return dataSource
1✔
1788
}
1✔
1789

1790
// Create DataVolume name based on the DataSource name + prefix of the digest
1791
func createDvName(prefix, digest string) (string, error) {
1✔
1792
        digestPrefix := ""
1✔
1793
        if strings.HasPrefix(digest, digestSha256Prefix) {
2✔
1794
                digestPrefix = digestSha256Prefix
1✔
1795
        } else if strings.HasPrefix(digest, digestUIDPrefix) {
3✔
1796
                digestPrefix = digestUIDPrefix
1✔
1797
        } else {
2✔
1798
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1799
        }
1✔
1800
        fromIdx := len(digestPrefix)
1✔
1801
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1802
        if len(digest) < toIdx {
2✔
1803
                return "", errors.Errorf("Digest is too short")
1✔
1804
        }
1✔
1805
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1806
}
1807

1808
// GetCronJobName get CronJob name based on cron name and UID
1809
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1810
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1811
}
1✔
1812

1813
// GetInitialJobName get initial job name based on cron name and UID
1814
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1815
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1816
}
1✔
1817

1818
func getPollerPodName(cron *cdiv1.DataImportCron) string {
1✔
1819
        return naming.GetResourceName("poller-"+cron.Name, string(cron.UID)[:8])
1✔
1820
}
1✔
1821

1822
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1823
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1824
}
1✔
1825

1826
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1827
        dv := &cron.Spec.Template
1✔
1828

1✔
1829
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1830
                return explicitVolumeMode, nil
×
1831
        }
×
1832

1833
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1834
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1835
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1836
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1837
                        AccessModes:      accessModes,
1✔
1838
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1839
                        Resources: corev1.VolumeResourceRequirements{
1✔
1840
                                Requests: corev1.ResourceList{
1✔
1841
                                        // Doesn't matter
1✔
1842
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1843
                                },
1✔
1844
                        },
1✔
1845
                },
1✔
1846
        }
1✔
1847
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1848
                return nil, err
×
1849
        }
×
1850

1851
        return inferredPvc.Spec.VolumeMode, nil
1✔
1852
}
1853

1854
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1855
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1856
        if dv.Spec.PVC != nil {
1✔
1857
                return dv.Spec.PVC.VolumeMode
×
1858
        }
×
1859

1860
        if dv.Spec.Storage != nil {
2✔
1861
                return dv.Spec.Storage.VolumeMode
1✔
1862
        }
1✔
1863

1864
        return nil
×
1865
}
1866

1867
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1868
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1869
        if dv.Spec.PVC != nil {
1✔
1870
                return dv.Spec.PVC.AccessModes
×
1871
        }
×
1872

1873
        if dv.Spec.Storage != nil {
2✔
1874
                return dv.Spec.Storage.AccessModes
1✔
1875
        }
1✔
1876

1877
        return nil
×
1878
}
1879

1880
func inferAdvisedRestoreSizeForSnapshot(dv *cdiv1.DataVolume, snapshot *snapshotv1.VolumeSnapshot, fallback *resource.Quantity) *resource.Quantity {
1✔
1881
        var dvSize resource.Quantity
1✔
1882

1✔
1883
        if dv.Spec.PVC != nil {
1✔
NEW
1884
                dvSize = dv.Spec.PVC.Resources.Requests[corev1.ResourceStorage]
×
NEW
1885
        }
×
1886

1887
        if dv.Spec.Storage != nil {
2✔
1888
                dvSize = dv.Spec.Storage.Resources.Requests[corev1.ResourceStorage]
1✔
1889
        }
1✔
1890

1891
        if dvSize.IsZero() && fallback != nil {
2✔
1892
                return fallback
1✔
1893
        }
1✔
1894

1895
        if snapshot.Status != nil {
2✔
1896
                if rs := snapshot.Status.RestoreSize; rs != nil && dvSize.Cmp(*rs) < 0 {
2✔
1897
                        return snapshot.Status.RestoreSize
1✔
1898
                }
1✔
1899
        }
1900

1901
        return &dvSize
1✔
1902
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc