• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5783

20 Jan 2026 07:26PM UTC coverage: 49.456% (+0.02%) from 49.439%
#5783

Pull #3991

travis-ci

noamasu
Add provisioner-aware DataImportCron configuration via StorageProfile annotations

Add support for provisioner-specific requirements when creating snapshots
and PVCs for DataImportCron. Some provisioners have specific needs:

- GKE Persistent Disk requires snapshot-type: images parameter in VSC
- GKE Persistent Disk and Rook Ceph RBD require RWO access mode for DataImportCron PVCs

Change details:
- Add StorageProfile annotations for DataImportCron configuration:
    cdi.kubevirt.io/useReadWriteOnceForDataImportCron: Signals RWO access mode
    cdi.kubevirt.io/snapshotClassForDataImportCron: Specifies VSC name
- Centralize provisioner-specific configuration in storagecapabilities:
    UseReadWriteOnceForDataImportCronByProvisionerKey: Maps provisioners requiring RWO
    SnapshotClassParametersForDataImportCronByProvisionerKey: Maps provisioners to VSC parameters
- StorageProfile controller automatically reconciles annotations based on provisioner
- DataImportCron controller applies RWO from StorageProfile when DV doesn't specify access modes
- DataImportCron controller selects VSC with priority: StorageProfile annotation > StorageProfile status > standard selection
- Unit tests for both controllers
- Update documentation with annotation details

Signed-off-by: Noam Assouline <nassouli@redhat.com>
Pull Request #3991: Add provisioner-aware VolumeSnapshotClass selection and RWO access mode for DataImportCron

82 of 147 new or added lines in 3 files covered. (55.78%)

2 existing lines in 1 file now uncovered.

14689 of 29701 relevant lines covered (49.46%)

0.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.5
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
limitations under the License.
14
See the License for the specific language governing permissions and
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        secv1 "github.com/openshift/api/security/v1"
33
        "github.com/pkg/errors"
34
        cronexpr "github.com/robfig/cron/v3"
35

36
        authorizationv1 "k8s.io/api/authorization/v1"
37
        batchv1 "k8s.io/api/batch/v1"
38
        corev1 "k8s.io/api/core/v1"
39
        storagev1 "k8s.io/api/storage/v1"
40
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
41
        "k8s.io/apimachinery/pkg/api/meta"
42
        "k8s.io/apimachinery/pkg/api/resource"
43
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
44
        "k8s.io/apimachinery/pkg/labels"
45
        "k8s.io/apimachinery/pkg/runtime"
46
        "k8s.io/apimachinery/pkg/types"
47
        "k8s.io/client-go/tools/record"
48
        openapicommon "k8s.io/kube-openapi/pkg/common"
49
        "k8s.io/utils/ptr"
50

51
        "sigs.k8s.io/controller-runtime/pkg/client"
52
        "sigs.k8s.io/controller-runtime/pkg/controller"
53
        "sigs.k8s.io/controller-runtime/pkg/event"
54
        "sigs.k8s.io/controller-runtime/pkg/handler"
55
        "sigs.k8s.io/controller-runtime/pkg/manager"
56
        "sigs.k8s.io/controller-runtime/pkg/predicate"
57
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
58
        "sigs.k8s.io/controller-runtime/pkg/source"
59

60
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
61
        "kubevirt.io/containerized-data-importer/pkg/common"
62
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
63
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
64
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
65
        "kubevirt.io/containerized-data-importer/pkg/operator"
66
        "kubevirt.io/containerized-data-importer/pkg/util"
67
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
68
)
69

70
const (
71
        // ErrDataSourceAlreadyManaged provides a const to indicate DataSource already managed error
72
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
73
        // MessageDataSourceAlreadyManaged provides a const to form DataSource already managed error message
74
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
75
)
76

77
// DataImportCronReconciler members
78
type DataImportCronReconciler struct {
79
        client          client.Client
80
        uncachedClient  client.Client
81
        recorder        record.EventRecorder
82
        scheme          *runtime.Scheme
83
        log             logr.Logger
84
        image           string
85
        pullPolicy      string
86
        cdiNamespace    string
87
        installerLabels map[string]string
88
}
89

90
const (
91
        // AnnSourceDesiredDigest is the digest of the pending updated image
92
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
93
        // AnnImageStreamDockerRef is the ImageStream Docker reference
94
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
95
        // AnnNextCronTime is the next time stamp which satisfies the cron expression
96
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
97
        // AnnLastCronTime is the cron last execution time stamp
98
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
99
        // AnnLastUseTime is the PVC last use time stamp
100
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
101
        // AnnStorageClass is the cron DV's storage class
102
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
103

104
        dataImportControllerName    = "dataimportcron-controller"
105
        digestSha256Prefix          = "sha256:"
106
        digestUIDPrefix             = "uid:"
107
        digestDvNameSuffixLength    = 12
108
        cronJobUIDSuffixLength      = 8
109
        defaultImportsToKeepPerCron = 3
110
)
111

112
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
113

114
// Reconcile loop for DataImportCronReconciler
115
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
116
        dataImportCron := &cdiv1.DataImportCron{}
1✔
117
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
118
                return reconcile.Result{}, err
1✔
119
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
120
                err := r.cleanup(ctx, req.NamespacedName)
1✔
121
                return reconcile.Result{}, err
1✔
122
        }
1✔
123
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
124
        if !shouldReconcile || err != nil {
1✔
125
                return reconcile.Result{}, err
×
126
        }
×
127

128
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
129
                return reconcile.Result{}, err
×
130
        }
×
131

132
        return r.update(ctx, dataImportCron)
1✔
133
}
134

135
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
136
        dataSource := &cdiv1.DataSource{}
1✔
137
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
138
                if k8serrors.IsNotFound(err) {
2✔
139
                        return true, nil
1✔
140
                }
1✔
141
                return false, err
×
142
        }
143
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
144
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
145
                return true, nil
1✔
146
        }
1✔
147
        otherCron := &cdiv1.DataImportCron{}
1✔
148
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
149
                if k8serrors.IsNotFound(err) {
2✔
150
                        return true, nil
1✔
151
                }
1✔
152
                return false, err
×
153
        }
154
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
155
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
156
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
157
                r.log.V(3).Info(msg)
1✔
158
                return false, nil
1✔
159
        }
1✔
160
        return true, nil
×
161
}
162

163
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
164
        if dataImportCron.Spec.Schedule == "" {
2✔
165
                return nil
1✔
166
        }
1✔
167
        if isControllerPolledSource(dataImportCron) {
2✔
168
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
169
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
170
                }
1✔
171
                return nil
1✔
172
        }
173
        if !isURLSource(dataImportCron) {
1✔
174
                return nil
×
175
        }
×
176
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
177
        if exists || err != nil {
2✔
178
                return err
1✔
179
        }
1✔
180
        cronJob, err := r.newCronJob(dataImportCron)
1✔
181
        if err != nil {
1✔
182
                return err
×
183
        }
×
184
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
185
                return err
×
186
        }
×
187
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
188
        if err != nil {
1✔
189
                return err
×
190
        }
×
191
        if err := r.client.Create(ctx, job); err != nil {
1✔
192
                return err
×
193
        }
×
194
        return nil
1✔
195
}
196

197
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
198
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
199
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
200
        }
×
201
        imageStream := &imagev1.ImageStream{}
1✔
202
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
203
        if err != nil {
2✔
204
                return nil, "", err
1✔
205
        }
1✔
206
        imageStreamNamespacedName := types.NamespacedName{
1✔
207
                Namespace: imageStreamNamespace,
1✔
208
                Name:      name,
1✔
209
        }
1✔
210
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
211
                return nil, "", err
1✔
212
        }
1✔
213
        return imageStream, tag, nil
1✔
214
}
215

216
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
217
        if imageStream == nil {
1✔
218
                return "", "", errors.Errorf("No ImageStream")
×
219
        }
×
220
        tags := imageStream.Status.Tags
1✔
221
        if len(tags) == 0 {
1✔
222
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
223
        }
×
224

225
        tagIdx := 0
1✔
226
        if len(imageStreamTag) > 0 {
2✔
227
                tagIdx = -1
1✔
228
                for i, tag := range tags {
2✔
229
                        if tag.Tag == imageStreamTag {
2✔
230
                                tagIdx = i
1✔
231
                                break
1✔
232
                        }
233
                }
234
        }
235
        if tagIdx == -1 {
2✔
236
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
237
        }
1✔
238

239
        if len(tags[tagIdx].Items) == 0 {
2✔
240
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
241
        }
1✔
242
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
243
}
244

245
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
246
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
247
                return imageStreamName, "", nil
1✔
248
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
249
                return subs[0], subs[1], nil
1✔
250
        }
1✔
251
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
252
}
253

254
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
255
        nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]
1✔
256
        if nextTimeStr == "" {
1✔
257
                return r.setNextCronTime(dataImportCron)
×
258
        }
×
259
        nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
260
        if err != nil {
1✔
261
                return reconcile.Result{}, err
×
262
        }
×
263
        if nextTime.After(time.Now()) {
2✔
264
                return r.setNextCronTime(dataImportCron)
1✔
265
        }
1✔
266
        switch {
1✔
267
        case isImageStreamSource(dataImportCron):
1✔
268
                if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
269
                        return reconcile.Result{}, err
1✔
270
                }
1✔
271
        case isPvcSource(dataImportCron):
1✔
272
                if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
2✔
273
                        return reconcile.Result{}, err
1✔
274
                }
1✔
275
        case isNodePull(dataImportCron):
1✔
276
                if done, err := r.updateContainerImageDesiredDigest(ctx, dataImportCron); !done {
2✔
277
                        return reconcile.Result{RequeueAfter: 3 * time.Second}, err
1✔
278
                } else if err != nil {
2✔
279
                        return reconcile.Result{}, err
×
280
                }
×
281
        }
282

283
        return r.setNextCronTime(dataImportCron)
1✔
284
}
285

286
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
287
        now := time.Now()
1✔
288
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
289
        if err != nil {
1✔
290
                return reconcile.Result{}, err
×
291
        }
×
292
        nextTime := expr.Next(now)
1✔
293
        requeueAfter := nextTime.Sub(now)
1✔
294
        res := reconcile.Result{RequeueAfter: requeueAfter}
1✔
295
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
296
        return res, err
1✔
297
}
298

299
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
300
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
301
        return err == nil && regSource.ImageStream != nil
1✔
302
}
1✔
303

304
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
305
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
306
        return err == nil && regSource.URL != nil
1✔
307
}
1✔
308

309
func isNodePull(cron *cdiv1.DataImportCron) bool {
1✔
310
        regSource, err := getCronRegistrySource(cron)
1✔
311
        return err == nil && regSource != nil && regSource.PullMethod != nil &&
1✔
312
                *regSource.PullMethod == cdiv1.RegistryPullNode
1✔
313
}
1✔
314

315
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
316
        if !isCronRegistrySource(cron) {
2✔
317
                return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
1✔
318
        }
1✔
319
        return cron.Spec.Template.Spec.Source.Registry, nil
1✔
320
}
321

322
func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
1✔
323
        source := cron.Spec.Template.Spec.Source
1✔
324
        return source != nil && source.Registry != nil
1✔
325
}
1✔
326

327
func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
1✔
328
        if !isPvcSource(cron) {
1✔
329
                return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
×
330
        }
×
331
        return cron.Spec.Template.Spec.Source.PVC, nil
1✔
332
}
333

334
func isPvcSource(cron *cdiv1.DataImportCron) bool {
1✔
335
        source := cron.Spec.Template.Spec.Source
1✔
336
        return source != nil && source.PVC != nil
1✔
337
}
1✔
338

339
func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
1✔
340
        return isImageStreamSource(cron) || isPvcSource(cron) || isNodePull(cron)
1✔
341
}
1✔
342

343
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
344
        res := reconcile.Result{}
1✔
345

1✔
346
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
347
        if err != nil {
1✔
348
                return res, err
×
349
        }
×
350

351
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
352
        imports := dataImportCron.Status.CurrentImports
1✔
353
        importSucceeded := false
1✔
354

1✔
355
        dataVolume := dataImportCron.Spec.Template
1✔
356
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
357
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
358
        if err != nil {
1✔
359
                return res, err
×
360
        }
×
361
        if desiredStorageClass != nil {
2✔
362
                if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
2✔
363
                        return res, err
1✔
364
                }
1✔
365
                currentSc, hasCurrent := dataImportCron.Annotations[AnnStorageClass]
1✔
366
                desiredSc := desiredStorageClass.Name
1✔
367
                if hasCurrent && currentSc != desiredSc {
2✔
368
                        r.log.Info("Storage class changed, delete most recent source on the old sc as it's no longer the desired", "currentSc", currentSc, "desiredSc", desiredSc)
1✔
369
                        if err := r.handleStorageClassChange(ctx, dataImportCron, desiredSc); err != nil {
1✔
370
                                return res, err
×
371
                        }
×
372
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
373
                }
374
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
1✔
375
        }
376
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
377
        if err != nil {
1✔
378
                return res, err
×
379
        }
×
380
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
381
        if err != nil {
1✔
382
                return res, err
×
383
        }
×
384

385
        handlePopulatedPvc := func() error {
2✔
386
                if pvc != nil {
2✔
387
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
388
                                return err
×
389
                        }
×
390
                }
391
                importSucceeded = true
1✔
392
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
393
                        return err
×
394
                }
×
395

396
                return nil
1✔
397
        }
398

399
        switch {
1✔
400
        case dv != nil:
1✔
401
                switch dv.Status.Phase {
1✔
402
                case cdiv1.Succeeded:
1✔
403
                        if err := handlePopulatedPvc(); err != nil {
1✔
404
                                return res, err
×
405
                        }
×
406
                case cdiv1.ImportScheduled:
1✔
407
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
408
                case cdiv1.ImportInProgress:
1✔
409
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
410
                default:
1✔
411
                        dvPhase := string(dv.Status.Phase)
1✔
412
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
413
                }
414
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
415
                if err := handlePopulatedPvc(); err != nil {
1✔
416
                        return res, err
×
417
                }
×
418
        case snapshot != nil:
1✔
419
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
420
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
421
                                return res, err
×
422
                        }
×
423
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
424
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
425
                }
426
                // Check if snapshot class changed
427
                if desiredStorageClass != nil {
2✔
428
                        changed, err := r.handleSnapshotClassChange(ctx, snapshot, desiredStorageClass.Name)
1✔
429
                        if err != nil {
1✔
NEW
430
                                return res, err
×
NEW
431
                        }
×
432
                        if changed {
1✔
NEW
433
                                return reconcile.Result{RequeueAfter: time.Second}, nil
×
NEW
434
                        }
×
435
                }
436
                // Below k8s 1.29 there's no way to know the source volume mode
437
                // Let's at least expose this info on our own snapshots
438
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
439
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
440
                        if err != nil {
1✔
441
                                return res, err
×
442
                        }
×
443
                        if volMode != nil {
2✔
444
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
445
                        }
1✔
446
                }
447
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
448
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
449
                if err != nil {
2✔
450
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
451
                                return res, err
×
452
                        }
×
453
                } else {
1✔
454
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
455
                }
1✔
456
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
457
                        return res, err
×
458
                }
×
459
                importSucceeded = true
1✔
460
        default:
1✔
461
                if len(imports) > 0 {
2✔
462
                        imports = imports[1:]
1✔
463
                        dataImportCron.Status.CurrentImports = imports
1✔
464
                }
1✔
465
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
466
        }
467

468
        if importSucceeded {
2✔
469
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
470
                        return res, err
×
471
                }
×
472
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
473
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
1✔
474
                        return res, err
×
475
                }
×
476
        }
477

478
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
479
                return res, err
×
480
        }
×
481

482
        // Skip if schedule is disabled
483
        if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
2✔
484
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
485
                pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
1✔
486
                if err != nil {
2✔
487
                        return pollRes, err
1✔
488
                }
1✔
489
                res = pollRes
1✔
490
        }
491

492
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
493
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
494
        if digestUpdated {
2✔
495
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
496
                if dv != nil {
1✔
497
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
×
498
                                return res, err
×
499
                        }
×
500
                }
501
                if importSucceeded || len(imports) == 0 {
2✔
502
                        if err := r.createImportDataVolume(ctx, dataImportCron, desiredStorageClass); err != nil {
2✔
503
                                return res, err
1✔
504
                        }
1✔
505
                }
506
        } else if importSucceeded {
2✔
507
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
1✔
508
                        return res, err
×
509
                }
×
510
        } else if len(imports) > 0 {
2✔
511
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
1✔
512
        } else {
2✔
513
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
514
        }
1✔
515

516
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
1✔
517
                return res, err
×
518
        }
×
519

520
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
2✔
521
                if err := r.client.Update(ctx, dataImportCron); err != nil {
1✔
522
                        return res, err
×
523
                }
×
524
        }
525
        return res, nil
1✔
526
}
527

528
// Returns the current import DV if exists, and the last imported PVC
529
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
530
        imports := cron.Status.CurrentImports
1✔
531
        if len(imports) == 0 {
2✔
532
                return nil, nil, nil
1✔
533
        }
1✔
534

535
        dvName := imports[0].DataVolumeName
1✔
536
        dv := &cdiv1.DataVolume{}
1✔
537
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
538
                if !k8serrors.IsNotFound(err) {
1✔
539
                        return nil, nil, err
×
540
                }
×
541
                dv = nil
1✔
542
        }
543

544
        pvc := &corev1.PersistentVolumeClaim{}
1✔
545
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
546
                if !k8serrors.IsNotFound(err) {
1✔
547
                        return nil, nil, err
×
548
                }
×
549
                pvc = nil
1✔
550
        }
551
        return dv, pvc, nil
1✔
552
}
553

554
// Returns the current import DV if exists, and the last imported PVC
555
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
556
        imports := cron.Status.CurrentImports
1✔
557
        if len(imports) == 0 {
2✔
558
                return nil, nil
1✔
559
        }
1✔
560

561
        snapName := imports[0].DataVolumeName
1✔
562
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
563
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
564
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
565
                        return nil, err
×
566
                }
×
567
                return nil, nil
1✔
568
        }
569

570
        return snapshot, nil
1✔
571
}
572

573
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
574
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
575
        dataSource := &cdiv1.DataSource{}
1✔
576
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
577
                return nil, err
1✔
578
        }
1✔
579
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
580
                log := r.log.WithName("getCronManagedDataSource")
×
581
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
×
582
                return nil, ErrNotManagedByCron
×
583
        }
×
584
        return dataSource, nil
1✔
585
}
586

587
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
588
        objCopy := obj.DeepCopyObject()
1✔
589
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
590
        r.setDataImportCronResourceLabels(cron, obj)
1✔
591
        if !reflect.DeepEqual(obj, objCopy) {
2✔
592
                if err := r.client.Update(ctx, obj); err != nil {
1✔
593
                        return err
×
594
                }
×
595
        }
596
        return nil
1✔
597
}
598

599
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
600
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
601
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
602
                if cond.Status == corev1.ConditionFalse &&
×
603
                        (cond.Reason == common.GenericError || cond.Reason == ImagePullFailedReason) {
×
604
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
605
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
606
                        dv.Labels[common.DataImportCronLabel] = ""
×
607
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
608
                                return err
×
609
                        }
×
610
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
611
                                return err
×
612
                        }
×
613
                        cron.Status.CurrentImports = nil
×
614
                }
615
        }
616
        return nil
×
617
}
618

619
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
620
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
621
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
622
        if err != nil {
1✔
623
                return err
×
624
        }
×
625
        if regSource.ImageStream == nil {
1✔
626
                return nil
×
627
        }
×
628
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
629
        if err != nil {
2✔
630
                return err
1✔
631
        }
1✔
632
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
633
        if err != nil {
2✔
634
                return err
1✔
635
        }
1✔
636
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
637
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
638
                log.Info("Updating DataImportCron", "digest", digest)
1✔
639
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
640
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
641
        }
1✔
642
        return nil
1✔
643
}
644

645
func (r *DataImportCronReconciler) updateContainerImageDesiredDigest(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
646
        log := r.log.WithValues("name", cron.Name).WithValues("uid", cron.UID)
1✔
647
        podName := getPollerPodName(cron)
1✔
648
        ns := cron.Namespace
1✔
649
        nn := types.NamespacedName{Name: podName, Namespace: ns}
1✔
650
        pod := &corev1.Pod{}
1✔
651

1✔
652
        if err := r.client.Get(ctx, nn, pod); err == nil {
2✔
653
                digest, err := fetchContainerImageDigest(pod)
1✔
654
                if err != nil || digest == "" {
1✔
655
                        return false, err
×
656
                }
×
657
                cc.AddAnnotation(cron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
658
                if cron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
659
                        log.Info("Updating DataImportCron", "digest", digest)
1✔
660
                        cc.AddAnnotation(cron, AnnSourceDesiredDigest, digest)
1✔
661
                }
1✔
662
                return true, r.client.Delete(ctx, pod)
1✔
663
        } else if cc.IgnoreNotFound(err) != nil {
1✔
664
                return false, err
×
665
        }
×
666

667
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(ctx, r.client)
1✔
668
        if err != nil {
1✔
669
                return false, err
×
670
        }
×
671
        platform := cron.Spec.Template.Spec.Source.Registry.Platform
1✔
672
        if platform != nil && platform.Architecture != "" {
1✔
673
                if workloadNodePlacement.NodeSelector == nil {
×
674
                        workloadNodePlacement.NodeSelector = map[string]string{}
×
675
                }
×
676
                workloadNodePlacement.NodeSelector[corev1.LabelArchStable] = platform.Architecture
×
677
        }
678

679
        containerImage := strings.TrimPrefix(*cron.Spec.Template.Spec.Source.Registry.URL, "docker://")
1✔
680

1✔
681
        pod = &corev1.Pod{
1✔
682
                ObjectMeta: metav1.ObjectMeta{
1✔
683
                        Name:      podName,
1✔
684
                        Namespace: ns,
1✔
685
                        OwnerReferences: []metav1.OwnerReference{
1✔
686
                                {
1✔
687
                                        APIVersion:         cron.APIVersion,
1✔
688
                                        Kind:               cron.Kind,
1✔
689
                                        Name:               cron.Name,
1✔
690
                                        UID:                cron.UID,
1✔
691
                                        BlockOwnerDeletion: ptr.To[bool](true),
1✔
692
                                        Controller:         ptr.To[bool](true),
1✔
693
                                },
1✔
694
                        },
1✔
695
                },
1✔
696
                Spec: corev1.PodSpec{
1✔
697
                        TerminationGracePeriodSeconds: ptr.To[int64](0),
1✔
698
                        RestartPolicy:                 corev1.RestartPolicyNever,
1✔
699
                        NodeSelector:                  workloadNodePlacement.NodeSelector,
1✔
700
                        Tolerations:                   workloadNodePlacement.Tolerations,
1✔
701
                        Affinity:                      workloadNodePlacement.Affinity,
1✔
702
                        Volumes: []corev1.Volume{
1✔
703
                                {
1✔
704
                                        Name: "shared-volume",
1✔
705
                                        VolumeSource: corev1.VolumeSource{
1✔
706
                                                EmptyDir: &corev1.EmptyDirVolumeSource{},
1✔
707
                                        },
1✔
708
                                },
1✔
709
                        },
1✔
710
                        InitContainers: []corev1.Container{
1✔
711
                                {
1✔
712
                                        Name:                     "init",
1✔
713
                                        Image:                    r.image,
1✔
714
                                        ImagePullPolicy:          corev1.PullPolicy(r.pullPolicy),
1✔
715
                                        Command:                  []string{"sh", "-c", "cp /usr/bin/cdi-containerimage-server /shared/server"},
1✔
716
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
717
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
718
                                },
1✔
719
                        },
1✔
720
                        Containers: []corev1.Container{
1✔
721
                                {
1✔
722
                                        Name:                     "image-container",
1✔
723
                                        Image:                    containerImage,
1✔
724
                                        ImagePullPolicy:          corev1.PullAlways,
1✔
725
                                        Command:                  []string{"/shared/server", "-h"},
1✔
726
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
727
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
728
                                },
1✔
729
                        },
1✔
730
                },
1✔
731
        }
1✔
732

1✔
733
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
734
        if pod.Spec.SecurityContext != nil {
2✔
735
                pod.Spec.SecurityContext.FSGroup = nil
1✔
736
        }
1✔
737

738
        return false, r.client.Create(ctx, pod)
1✔
739
}
740

741
func fetchContainerImageDigest(pod *corev1.Pod) (string, error) {
1✔
742
        if len(pod.Status.ContainerStatuses) == 0 {
1✔
743
                return "", nil
×
744
        }
×
745

746
        status := pod.Status.ContainerStatuses[0]
1✔
747
        if status.State.Waiting != nil {
1✔
748
                reason := status.State.Waiting.Reason
×
749
                switch reason {
×
750
                case "ImagePullBackOff", "ErrImagePull", "InvalidImageName":
×
751
                        return "", errors.Errorf("%s %s: %s", common.ImagePullFailureText, status.Image, reason)
×
752
                }
753
                return "", nil
×
754
        }
755

756
        if status.State.Terminated == nil {
1✔
757
                return "", nil
×
758
        }
×
759

760
        imageID := status.ImageID
1✔
761
        if imageID == "" {
1✔
762
                return "", errors.Errorf("Container has no imageID")
×
763
        }
×
764
        idx := strings.Index(imageID, digestSha256Prefix)
1✔
765
        if idx < 0 {
1✔
766
                return "", errors.Errorf("Container image %s ID has no digest: %s", status.Image, imageID)
×
767
        }
×
768

769
        return imageID[idx:], nil
1✔
770
}
771

772
func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
773
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
774
        pvcSource, err := getCronPvcSource(dataImportCron)
1✔
775
        if err != nil {
1✔
776
                return err
×
777
        }
×
778
        ns := pvcSource.Namespace
1✔
779
        if ns == "" {
2✔
780
                ns = dataImportCron.Namespace
1✔
781
        }
1✔
782
        pvc := &corev1.PersistentVolumeClaim{}
1✔
783
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
2✔
784
                return err
1✔
785
        }
1✔
786
        digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
1✔
787
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
788
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
789
                log.Info("Updating DataImportCron", "digest", digest)
1✔
790
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
791
        }
1✔
792
        return nil
1✔
793
}
794

795
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
796
        log := r.log.WithName("updateDataSource")
1✔
797
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
798
        if err != nil {
2✔
799
                if k8serrors.IsNotFound(err) {
2✔
800
                        dataSource = r.newDataSource(dataImportCron)
1✔
801
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
802
                                return err
×
803
                        }
×
804
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
1✔
805
                } else if errors.Is(err, ErrNotManagedByCron) {
×
806
                        return nil
×
807
                } else {
×
808
                        return err
×
809
                }
×
810
        }
811
        dataSourceCopy := dataSource.DeepCopy()
1✔
812
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
813

1✔
814
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
815
        populateDataSource(format, dataSource, sourcePVC)
1✔
816

1✔
817
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
818
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
819
                        return err
×
820
                }
×
821
        }
822

823
        return nil
1✔
824
}
825

826
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
827
        if sourcePVC == nil {
2✔
828
                return
1✔
829
        }
1✔
830

831
        switch format {
1✔
832
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
833
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
834
                        PVC: sourcePVC,
1✔
835
                }
1✔
836
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
837
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
838
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
839
                                Namespace: sourcePVC.Namespace,
1✔
840
                                Name:      sourcePVC.Name,
1✔
841
                        },
1✔
842
                }
1✔
843
        }
844
}
845

846
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
847
        if dataImportCron.Status.CurrentImports == nil {
1✔
848
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
849
        }
×
850
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
851
                Namespace: dataImportCron.Namespace,
1✔
852
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
853
        }
1✔
854
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
855
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
856
                now := metav1.Now()
1✔
857
                dataImportCron.Status.LastImportTimestamp = &now
1✔
858
        }
1✔
859
        return nil
1✔
860
}
861

862
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
863
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
864
        if lastTimeStr == "" {
2✔
865
                return nil
1✔
866
        }
1✔
867
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
868
        if err != nil {
1✔
869
                return err
×
870
        }
×
871
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
872
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
873
        }
1✔
874
        return nil
1✔
875
}
876

877
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass *storagev1.StorageClass) error {
1✔
878
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
879
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
880
        if digest == "" {
1✔
881
                return nil
×
882
        }
×
883
        dvName, err := createDvName(dataSourceName, digest)
1✔
884
        if err != nil {
2✔
885
                return err
1✔
886
        }
1✔
887

888
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
889
        for _, src := range sources {
2✔
890
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
891
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
892
                                return err
×
893
                        }
×
894
                } else {
1✔
895
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
896
                                return err
×
897
                        }
×
898
                        // If source exists don't create DV
899
                        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
900
                        return nil
1✔
901
                }
902
        }
903

904
        storageProfile := &cdiv1.StorageProfile{}
1✔
905
        if desiredStorageClass != nil {
2✔
906
                if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
NEW
907
                        return err
×
NEW
908
                }
×
909
        }
910
        dv := r.newSourceDataVolume(dataImportCron, dvName, storageProfile)
1✔
911
        if allowed, err := r.authorizeCloneDataVolume(dataImportCron, dv); err != nil {
1✔
912
                return err
×
913
        } else if !allowed {
2✔
914
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse,
1✔
915
                        "Not authorized to create DataVolume", notAuthorized)
1✔
916
                return nil
1✔
917
        }
1✔
918
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
919
                return err
×
920
        }
×
921
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
922

1✔
923
        return nil
1✔
924
}
925

926
func (r *DataImportCronReconciler) authorizeCloneDataVolume(dataImportCron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) (bool, error) {
1✔
927
        if !isPvcSource(dataImportCron) {
2✔
928
                return true, nil
1✔
929
        }
1✔
930
        saName := "default"
1✔
931
        if dataImportCron.Spec.ServiceAccountName != nil {
1✔
932
                saName = *dataImportCron.Spec.ServiceAccountName
×
933
        }
×
934
        if resp, err := dv.AuthorizeSA(dv.Namespace, dv.Name, &authProxy{r.client}, dataImportCron.Namespace, saName); err != nil {
1✔
935
                return false, err
×
936
        } else if !resp.Allowed {
2✔
937
                r.log.Info("Not authorized to create DataVolume", "cron", dataImportCron.Name, "reason", resp.Reason)
1✔
938
                return false, nil
1✔
939
        }
1✔
940

941
        return true, nil
1✔
942
}
943

944
type authProxy struct {
945
        client client.Client
946
}
947

948
func (p *authProxy) CreateSar(sar *authorizationv1.SubjectAccessReview) (*authorizationv1.SubjectAccessReview, error) {
1✔
949
        if err := p.client.Create(context.TODO(), sar); err != nil {
1✔
950
                return nil, err
×
951
        }
×
952
        return sar, nil
1✔
953
}
954

955
func (p *authProxy) GetNamespace(name string) (*corev1.Namespace, error) {
1✔
956
        ns := &corev1.Namespace{}
1✔
957
        if err := p.client.Get(context.TODO(), types.NamespacedName{Name: name}, ns); err != nil {
1✔
958
                return nil, err
×
959
        }
×
960
        return ns, nil
1✔
961
}
962

963
func (p *authProxy) GetDataSource(namespace, name string) (*cdiv1.DataSource, error) {
×
964
        das := &cdiv1.DataSource{}
×
965
        if err := p.client.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, das); err != nil {
×
966
                return nil, err
×
967
        }
×
968
        return das, nil
×
969
}
970

971
func (r *DataImportCronReconciler) handleStorageClassChange(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
1✔
972
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
973
        if !ok {
1✔
974
                // nothing to delete
×
975
                return nil
×
976
        }
×
977
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
978
        if err != nil {
1✔
979
                return err
×
980
        }
×
981
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
1✔
982
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
1✔
983
        for _, src := range sources {
2✔
984
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
985
                        return err
×
986
                }
×
987
        }
988
        for _, src := range sources {
2✔
989
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
1✔
990
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
×
991
                }
×
992
        }
993
        // Only update desired storage class once garbage collection went through
994
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
1✔
995
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
996
        if err != nil {
1✔
997
                return err
×
998
        }
×
999

1000
        return nil
1✔
1001
}
1002

1003
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
1004
        switch format {
1✔
1005
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1006
                return nil
1✔
1007
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1008
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
1009
        default:
×
1010
                return fmt.Errorf("unknown source format for snapshot")
×
1011
        }
1012
}
1013

1014
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
1015
        if pvc == nil {
1✔
1016
                return nil
×
1017
        }
×
1018
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
1019
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
1020
                return nil
1✔
1021
        }
1✔
1022
        storageProfile := &cdiv1.StorageProfile{}
1✔
1023
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1024
                return err
×
1025
        }
×
1026

1027
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
1028
                ObjectMeta: metav1.ObjectMeta{
1✔
1029
                        Name:      pvc.Name,
1✔
1030
                        Namespace: dataImportCron.Namespace,
1✔
1031
                        Labels: map[string]string{
1✔
1032
                                common.CDILabelKey:       common.CDILabelValue,
1✔
1033
                                common.CDIComponentLabel: "",
1✔
1034
                        },
1✔
1035
                },
1✔
1036
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
1037
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
1038
                                PersistentVolumeClaimName: &pvc.Name,
1✔
1039
                        },
1✔
1040
                },
1✔
1041
        }
1✔
1042
        // Select VolumeSnapshotClass for boot source snapshot
1✔
1043
        snapshotClassName, err := r.getSnapshotClassForDataImportCron(pvc, storageProfile)
1✔
1044
        if err != nil {
1✔
NEW
1045
                return err
×
NEW
1046
        }
×
1047
        desiredSnapshot.Spec.VolumeSnapshotClassName = snapshotClassName
1✔
1048
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
1049
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
1✔
1050

1✔
1051
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
1052
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
1053
                if !k8serrors.IsNotFound(err) {
1✔
1054
                        return err
×
1055
                }
×
1056
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1057
                if pvc.Spec.VolumeMode != nil {
2✔
1058
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
1059
                }
1✔
1060
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
1061
                        return err
×
1062
                }
×
1063
        } else {
1✔
1064
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
1065
                        // Clean up DV/PVC as they are not needed anymore
1✔
1066
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
1067
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
1068
                                return err
×
1069
                        }
×
1070
                }
1071
        }
1072

1073
        return nil
1✔
1074
}
1075

1076
func (r *DataImportCronReconciler) handleSnapshotClassChange(ctx context.Context, snapshot *snapshotv1.VolumeSnapshot, storageClassName string) (bool, error) {
1✔
1077
        sp := &cdiv1.StorageProfile{}
1✔
1078
        if err := r.client.Get(ctx, types.NamespacedName{Name: storageClassName}, sp); err != nil {
1✔
NEW
1079
                return false, client.IgnoreNotFound(err)
×
NEW
1080
        }
×
1081

1082
        desiredVSC, err := r.getSnapshotClassForDataImportCron(nil, sp)
1✔
1083
        if err != nil {
1✔
NEW
1084
                return false, err
×
NEW
1085
        }
×
1086
        actualVSC := snapshot.Spec.VolumeSnapshotClassName
1✔
1087
        if desiredVSC == nil || ptr.Equal(actualVSC, desiredVSC) {
2✔
1088
                return false, nil
1✔
1089
        }
1✔
1090

NEW
1091
        r.log.Info("Snapshot class changed, deleting", "name", snapshot.Name, "from", *actualVSC, "to", *desiredVSC)
×
NEW
1092
        if err := r.client.Delete(ctx, snapshot); err != nil {
×
NEW
1093
                return false, client.IgnoreNotFound(err)
×
NEW
1094
        }
×
NEW
1095
        return true, nil
×
1096
}
1097

1098
// getSnapshotClassForDataImportCron returns the VolumeSnapshotClass name to use for DataImportCron snapshots.
1099
func (r *DataImportCronReconciler) getSnapshotClassForDataImportCron(pvc *corev1.PersistentVolumeClaim, storageProfile *cdiv1.StorageProfile) (*string, error) {
1✔
1100
        if storageProfile.Annotations != nil {
2✔
1101
                if vscName := storageProfile.Annotations[cc.AnnSnapshotClassForDataImportCron]; vscName != "" {
2✔
1102
                        return &vscName, nil
1✔
1103
                }
1✔
1104
        }
1105
        className, err := cc.GetSnapshotClassForSmartClone(pvc, &storageProfile.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
1106
        if err != nil {
1✔
NEW
1107
                return nil, err
×
NEW
1108
        }
×
1109
        if className != "" {
2✔
1110
                return &className, nil
1✔
1111
        }
1✔
1112
        return nil, nil
1✔
1113
}
1114

1115
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
1116
        dataImportCron.Status.SourceFormat = &format
1✔
1117

1✔
1118
        switch format {
1✔
1119
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1120
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1121
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1122
                if snapshot == nil {
2✔
1123
                        // Snapshot create/update will trigger reconcile
1✔
1124
                        return nil
1✔
1125
                }
1✔
1126
                if cc.IsSnapshotReady(snapshot) {
2✔
1127
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1128
                } else {
2✔
1129
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
1130
                }
1✔
1131
        default:
×
1132
                return fmt.Errorf("unknown source format for snapshot")
×
1133
        }
1134

1135
        return nil
1✔
1136
}
1137

1138
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
1139
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
1140
        if desiredStorageClass == nil {
2✔
1141
                return format, nil
1✔
1142
        }
1✔
1143

1144
        storageProfile := &cdiv1.StorageProfile{}
1✔
1145
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1146
                return format, err
×
1147
        }
×
1148
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
1149
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
1150
        }
1✔
1151

1152
        return format, nil
1✔
1153
}
1154

1155
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
1156
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
1157
                return nil
×
1158
        }
×
1159
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
1160
        if err != nil {
1✔
1161
                return err
×
1162
        }
×
1163

1164
        maxImports := defaultImportsToKeepPerCron
1✔
1165

1✔
1166
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
1167
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
1168
        }
1✔
1169

1170
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
1171
                return err
×
1172
        }
×
1173
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
1174
                return err
×
1175
        }
×
1176

1177
        return nil
1✔
1178
}
1179

1180
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
1181
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
1182

1✔
1183
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1184
                return err
×
1185
        }
×
1186
        if len(pvcList.Items) > maxImports {
2✔
1187
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
1188
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
1189
                })
1✔
1190
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
1191
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1192
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
1193
                                return err
×
1194
                        }
×
1195
                }
1196
        }
1197

1198
        dvList := &cdiv1.DataVolumeList{}
1✔
1199
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1200
                return err
×
1201
        }
×
1202

1203
        if len(dvList.Items) > maxImports {
2✔
1204
                for _, dv := range dvList.Items {
2✔
1205
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
1206
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
1207
                                return err
×
1208
                        }
×
1209

1210
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1211
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1212
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
1213
                                        return err
×
1214
                                }
×
1215
                        }
1216
                }
1217
        }
1218

1219
        return nil
1✔
1220
}
1221

1222
// deleteDvPvc deletes DV or PVC if DV was GCed
1223
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
1224
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
1225
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
1226
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
1227
                return err
1✔
1228
        }
1✔
1229
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
1230
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
1231
                return err
×
1232
        }
×
1233
        return nil
1✔
1234
}
1235

1236
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
1237
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
1238

1✔
1239
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1240
                if meta.IsNoMatchError(err) {
×
1241
                        return nil
×
1242
                }
×
1243
                return err
×
1244
        }
1245
        if len(snapList.Items) > maxImports {
1✔
1246
                sort.Slice(snapList.Items, func(i, j int) bool {
×
1247
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
1248
                })
×
1249
                for _, snap := range snapList.Items[maxImports:] {
×
1250
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
1251
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
1252
                                return err
×
1253
                        }
×
1254
                }
1255
        }
1256

1257
        return nil
1✔
1258
}
1259

1260
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
1261
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
1262
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
1263

1✔
1264
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
1265
                return err
×
1266
        }
×
1267
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
1268
        if err != nil {
1✔
1269
                return err
×
1270
        }
×
1271
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
1272
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
1273
                return err
×
1274
        }
×
1275
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
1276
                return err
×
1277
        }
×
1278
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
1279
                return err
×
1280
        }
×
1281
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
1282
                return err
×
1283
        }
×
1284
        return nil
1✔
1285
}
1286

1287
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
1288
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
1289
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
1290
        if err != nil {
1✔
1291
                return err
×
1292
        }
×
1293
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
1294
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
1295
                return err
×
1296
        }
×
1297
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
1298
                return err
×
1299
        }
×
1300

1301
        return nil
1✔
1302
}
1303

1304
// NewDataImportCronController creates a new instance of the DataImportCron controller
1305
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
1306
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
1307
                Scheme: mgr.GetScheme(),
×
1308
                Mapper: mgr.GetRESTMapper(),
×
1309
        })
×
1310
        if err != nil {
×
1311
                return nil, err
×
1312
        }
×
1313
        reconciler := &DataImportCronReconciler{
×
1314
                client:          mgr.GetClient(),
×
1315
                uncachedClient:  uncachedClient,
×
1316
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1317
                scheme:          mgr.GetScheme(),
×
1318
                log:             log.WithName(dataImportControllerName),
×
1319
                image:           importerImage,
×
1320
                pullPolicy:      pullPolicy,
×
1321
                cdiNamespace:    util.GetNamespace(),
×
1322
                installerLabels: installerLabels,
×
1323
        }
×
1324
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1325
                MaxConcurrentReconciles: 3,
×
1326
                Reconciler:              reconciler,
×
1327
        })
×
1328
        if err != nil {
×
1329
                return nil, err
×
1330
        }
×
1331
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
1332
                return nil, err
×
1333
        }
×
1334
        log.Info("Initialized DataImportCron controller")
×
1335
        return dataImportCronController, nil
×
1336
}
1337

1338
func getCronName(obj client.Object) string {
×
1339
        return obj.GetLabels()[common.DataImportCronLabel]
×
1340
}
×
1341

1342
func getCronNs(obj client.Object) string {
×
1343
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
1344
}
×
1345

1346
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
1347
        if cronName := getCronName(obj); cronName != "" {
×
1348
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
1349
        }
×
1350
        return nil
×
1351
}
1352

1353
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1354
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1355
                return err
×
1356
        }
×
1357

1358
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
1359
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
1360
                // Otherwise we risk losing the storage profile event
×
1361
                var crons cdiv1.DataImportCronList
×
1362
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
×
1363
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
×
1364
                        return nil
×
1365
                }
×
1366
                // Storage profiles are 1:1 to storage classes
1367
                scName := obj.GetName()
×
1368
                var reqs []reconcile.Request
×
1369
                for _, cron := range crons.Items {
×
1370
                        dataVolume := cron.Spec.Template
×
1371
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1372
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1373
                        if err != nil || templateSc == nil {
×
1374
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1375
                                return reqs
×
1376
                        }
×
1377
                        if templateSc.Name == scName {
×
1378
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1379
                        }
×
1380
                }
1381
                return reqs
×
1382
        }
1383

1384
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1385
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1386
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1387
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1388
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1389
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1390
                },
1391
        )); err != nil {
×
1392
                return err
×
1393
        }
×
1394

1395
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1396
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1397
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1398
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1399
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1400
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1401
                },
1402
        )); err != nil {
×
1403
                return err
×
1404
        }
×
1405

1406
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1407
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1408
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1409
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1410
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1411
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1412
                },
1413
        )); err != nil {
×
1414
                return err
×
1415
        }
×
1416

1417
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1418
                return err
×
1419
        }
×
1420

1421
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1422
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1423
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
×
1424
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1425
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1426
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
NEW
1427
                                return dicRelevantFieldsChanged(e.ObjectOld, e.ObjectNew)
×
1428
                        },
×
1429
                },
1430
        )); err != nil {
×
1431
                return err
×
1432
        }
×
1433

1434
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1435
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1436
        }
×
1437

1438
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1439
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1440
                predicate.TypedFuncs[*batchv1.CronJob]{
×
1441
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1442
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1443
                        },
×
1444
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
×
1445
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1446
                },
1447
        )); err != nil {
×
1448
                return err
×
1449
        }
×
1450

1451
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1452
                if meta.IsNoMatchError(err) {
×
1453
                        // Back out if there's no point to attempt watch
×
1454
                        return nil
×
1455
                }
×
1456
                if !cc.IsErrCacheNotStarted(err) {
×
1457
                        return err
×
1458
                }
×
1459
        }
1460
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1461
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
×
1462
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1463
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1464
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1465
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1466
                },
1467
        )); err != nil {
×
1468
                return err
×
1469
        }
×
1470

1471
        return nil
×
1472
}
1473

NEW
1474
func dicRelevantFieldsChanged(oldSp, newSp *cdiv1.StorageProfile) bool {
×
NEW
1475
        sourceFormatChanged := oldSp.Status.DataImportCronSourceFormat != newSp.Status.DataImportCronSourceFormat
×
NEW
1476
        rwoAnnotationChanged := oldSp.Annotations[cc.AnnUseReadWriteOnceForDataImportCron] != newSp.Annotations[cc.AnnUseReadWriteOnceForDataImportCron]
×
NEW
1477
        snapshotClassChanged := oldSp.Annotations[cc.AnnSnapshotClassForDataImportCron] != newSp.Annotations[cc.AnnSnapshotClassForDataImportCron]
×
NEW
1478
        return sourceFormatChanged || rwoAnnotationChanged || snapshotClassChanged
×
NEW
1479
}
×
1480

1481
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1482
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1483
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1484
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
1485
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1486
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1487
                                log.Info("Update", "sc", obj.GetName(),
×
1488
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1489
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1490
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
×
1491
                                if err != nil {
×
1492
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
1493
                                }
×
1494
                                return reqs
×
1495
                        },
1496
                ),
1497
                predicate.TypedFuncs[*storagev1.StorageClass]{
1498
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1499
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1500
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1501
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1502
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1503
                        },
×
1504
                },
1505
        )); err != nil {
×
1506
                return err
×
1507
        }
×
1508

1509
        return nil
×
1510
}
1511

1512
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1513
        dicList := &cdiv1.DataImportCronList{}
×
1514
        if err := c.List(ctx, dicList); err != nil {
×
1515
                return nil, err
×
1516
        }
×
1517
        reqs := []reconcile.Request{}
×
1518
        for _, dic := range dicList.Items {
×
1519
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
1520
                        continue
×
1521
                }
1522

1523
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1524
        }
1525

1526
        return reqs, nil
×
1527
}
1528

1529
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
1✔
1530
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1531
                return false, nil
1✔
1532
        }
1✔
1533

1534
        sc := pvc.Spec.StorageClassName
1✔
1535
        if sc == nil || *sc == desiredStorageClass {
2✔
1536
                return false, nil
1✔
1537
        }
1✔
1538

1539
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1✔
1540
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
1541
                return false, err
×
1542
        }
×
1543

1544
        return true, nil
1✔
1545
}
1546

1547
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1548
        cronJob := &batchv1.CronJob{}
1✔
1549
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1550
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1551
                return false, cc.IgnoreNotFound(err)
1✔
1552
        }
1✔
1553

1554
        cronJobCopy := cronJob.DeepCopy()
1✔
1555
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1556
                return false, err
×
1557
        }
×
1558

1559
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1560
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1561
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
1562
                        return false, cc.IgnoreNotFound(err)
×
1563
                }
×
1564
        }
1565
        return true, nil
1✔
1566
}
1567

1568
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1569
        cronJob := &batchv1.CronJob{
1✔
1570
                ObjectMeta: metav1.ObjectMeta{
1✔
1571
                        Name:      GetCronJobName(cron),
1✔
1572
                        Namespace: r.cdiNamespace,
1✔
1573
                },
1✔
1574
        }
1✔
1575
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
1576
                return nil, err
×
1577
        }
×
1578
        return cronJob, nil
1✔
1579
}
1580

1581
// InitPollerPod inits poller Pod
1582
func InitPollerPod(c client.Client, cron *cdiv1.DataImportCron, pod *corev1.PodTemplateSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1583
        regSource, err := getCronRegistrySource(cron)
1✔
1584
        if err != nil {
1✔
1585
                return err
×
1586
        }
×
1587
        if regSource.URL == nil {
1✔
1588
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1589
        }
×
1590
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1591
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
1592
                return err
×
1593
        }
×
1594
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1595
        if err != nil {
1✔
1596
                return err
×
1597
        }
×
1598
        container := corev1.Container{
1✔
1599
                Name:  "cdi-source-update-poller",
1✔
1600
                Image: image,
1✔
1601
                Command: []string{
1✔
1602
                        "/usr/bin/cdi-source-update-poller",
1✔
1603
                        "-ns", cron.Namespace,
1✔
1604
                        "-cron", cron.Name,
1✔
1605
                        "-url", *regSource.URL,
1✔
1606
                },
1✔
1607
                ImagePullPolicy:          pullPolicy,
1✔
1608
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1609
                TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
1610
        }
1✔
1611

1✔
1612
        var volumes []corev1.Volume
1✔
1613
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1614
        if hasCertConfigMap {
1✔
1615
                vm := corev1.VolumeMount{
×
1616
                        Name:      CertVolName,
×
1617
                        MountPath: common.ImporterCertDir,
×
1618
                }
×
1619
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1620
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
1621
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
1622
        }
×
1623

1624
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1625
                vm := corev1.VolumeMount{
1✔
1626
                        Name:      ProxyCertVolName,
1✔
1627
                        MountPath: common.ImporterProxyCertDir,
1✔
1628
                }
1✔
1629
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1630
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1631
        }
1✔
1632

1633
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
1634
                container.Env = append(container.Env,
×
1635
                        corev1.EnvVar{
×
1636
                                Name: common.ImporterAccessKeyID,
×
1637
                                ValueFrom: &corev1.EnvVarSource{
×
1638
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1639
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1640
                                                        Name: *regSource.SecretRef,
×
1641
                                                },
×
1642
                                                Key: common.KeyAccess,
×
1643
                                        },
×
1644
                                },
×
1645
                        },
×
1646
                        corev1.EnvVar{
×
1647
                                Name: common.ImporterSecretKey,
×
1648
                                ValueFrom: &corev1.EnvVarSource{
×
1649
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1650
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1651
                                                        Name: *regSource.SecretRef,
×
1652
                                                },
×
1653
                                                Key: common.KeySecret,
×
1654
                                        },
×
1655
                                },
×
1656
                        },
×
1657
                )
×
1658
        }
×
1659

1660
        addEnvVar := func(varName, value string) {
2✔
1661
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1662
        }
1✔
1663

1664
        if insecureTLS {
1✔
1665
                addEnvVar(common.InsecureTLSVar, "true")
×
1666
        }
×
1667

1668
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1669
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1670
                        addEnvVar(varName, value)
1✔
1671
                }
1✔
1672
        }
1673

1674
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1675
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1676
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1677

1✔
1678
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1679
        if err != nil {
1✔
1680
                return err
×
1681
        }
×
1682
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1683
        if err != nil {
1✔
1684
                return err
×
1685
        }
×
1686

1687
        podSpec := &pod.Spec
1✔
1688

1✔
1689
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1690
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1691
        podSpec.Containers = []corev1.Container{container}
1✔
1692
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1693
        podSpec.Volumes = volumes
1✔
1694
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1695
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1696
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1697
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1698

1✔
1699
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1700
        // No need for specifid uid/fsgroup here since this doesn't write or use qemu
1✔
1701
        if podSpec.SecurityContext != nil {
2✔
1702
                podSpec.SecurityContext.FSGroup = nil
1✔
1703
        }
1✔
1704
        if podSpec.Containers[0].SecurityContext != nil {
2✔
1705
                podSpec.Containers[0].SecurityContext.RunAsUser = nil
1✔
1706
        }
1✔
1707

1708
        if pod.Labels == nil {
2✔
1709
                pod.Labels = map[string]string{}
1✔
1710
        }
1✔
1711
        pod.Labels[common.DataImportCronPollerLabel] = ""
1✔
1712

1✔
1713
        return nil
1✔
1714
}
1715

1716
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1717
        cronJobSpec := &cronJob.Spec
1✔
1718
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1719
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1720
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1721
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1722

1✔
1723
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1724
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1725
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1726
        cc.AddAnnotation(&jobSpec.Template, secv1.RequiredSCCAnnotation, common.RestrictedSCCName)
1✔
1727

1✔
1728
        pod := &jobSpec.Template
1✔
1729
        if err := InitPollerPod(r.client, cron, pod, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1730
                return err
×
1731
        }
×
1732
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1733
                return err
×
1734
        }
×
1735
        return nil
1✔
1736
}
1737

1738
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1739
        job := &batchv1.Job{
1✔
1740
                ObjectMeta: metav1.ObjectMeta{
1✔
1741
                        Name:      GetInitialJobName(cron),
1✔
1742
                        Namespace: cronJob.Namespace,
1✔
1743
                },
1✔
1744
                Spec: cronJob.Spec.JobTemplate.Spec,
1✔
1745
        }
1✔
1746
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1747
                return nil, err
×
1748
        }
×
1749
        return job, nil
1✔
1750
}
1751

1752
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1753
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1754
                return err
×
1755
        }
×
1756
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1757
        labels := obj.GetLabels()
1✔
1758
        labels[common.DataImportCronNsLabel] = cron.Namespace
1✔
1759
        labels[common.DataImportCronLabel] = cron.Name
1✔
1760
        obj.SetLabels(labels)
1✔
1761
        return nil
1✔
1762
}
1763

1764
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string, storageProfile *cdiv1.StorageProfile) *cdiv1.DataVolume {
1✔
1765
        dv := cron.Spec.Template.DeepCopy()
1✔
1766
        if isCronRegistrySource(cron) {
2✔
1767
                var digestedURL string
1✔
1768
                if isURLSource(cron) {
2✔
1769
                        digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1770
                } else if isImageStreamSource(cron) {
3✔
1771
                        // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1772
                        digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1773
                        dv.Spec.Source.Registry.ImageStream = nil
1✔
1774
                }
1✔
1775
                dv.Spec.Source.Registry.URL = &digestedURL
1✔
1776
        }
1777
        dv.Name = dataVolumeName
1✔
1778
        dv.Namespace = cron.Namespace
1✔
1779
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1780
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1781
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1782
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1783

1✔
1784
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
1785
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1786
        }
1✔
1787

1788
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1789

1✔
1790
        // Apply RWO access mode as default for DataImportCron (from StorageProfile annotation)
1✔
1791
        // Only applies if the DV doesn't already have AccessModes configured
1✔
1792
        if storageProfile != nil && storageProfile.Annotations[cc.AnnUseReadWriteOnceForDataImportCron] == "true" {
2✔
1793
                if dv.Spec.PVC != nil && len(dv.Spec.PVC.AccessModes) == 0 {
2✔
1794
                        dv.Spec.PVC.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}
1✔
1795
                } else if dv.Spec.Storage != nil && len(dv.Spec.Storage.AccessModes) == 0 {
3✔
1796
                        dv.Spec.Storage.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}
1✔
1797
                }
1✔
1798
        }
1799

1800
        return dv
1✔
1801
}
1802

1803
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1804
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1805
        cc.CopyAllowedLabels(cron.GetLabels(), obj, true)
1✔
1806
        labels := obj.GetLabels()
1✔
1807
        labels[common.DataImportCronLabel] = cron.Name
1✔
1808
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
2✔
1809
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1810
        }
1✔
1811
        obj.SetLabels(labels)
1✔
1812
}
1813

1814
func untagDigestedDockerURL(dockerURL string) string {
1✔
1815
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1816
                url := u.Host + u.Path
1✔
1817
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1818
                // Check for tag
1✔
1819
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1820
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
2✔
1821
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1822
                        }
1✔
1823
                }
1824
        }
1825
        return dockerURL
1✔
1826
}
1827

1828
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1829
        if val := cron.Labels[ann]; val != "" {
2✔
1830
                cc.AddLabel(dv, ann, val)
1✔
1831
        }
1✔
1832
}
1833

1834
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1835
        if val := cron.Annotations[ann]; val != "" {
1✔
1836
                cc.AddAnnotation(dv, ann, val)
×
1837
        }
×
1838
}
1839

1840
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1✔
1841
        dataSource := &cdiv1.DataSource{
1✔
1842
                ObjectMeta: metav1.ObjectMeta{
1✔
1843
                        Name:      cron.Spec.ManagedDataSource,
1✔
1844
                        Namespace: cron.Namespace,
1✔
1845
                },
1✔
1846
        }
1✔
1847
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1848
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1849
        cc.CopyAllowedLabels(cron.GetLabels(), dataSource, true)
1✔
1850
        return dataSource
1✔
1851
}
1✔
1852

1853
// Create DataVolume name based on the DataSource name + prefix of the digest
1854
func createDvName(prefix, digest string) (string, error) {
1✔
1855
        digestPrefix := ""
1✔
1856
        if strings.HasPrefix(digest, digestSha256Prefix) {
2✔
1857
                digestPrefix = digestSha256Prefix
1✔
1858
        } else if strings.HasPrefix(digest, digestUIDPrefix) {
3✔
1859
                digestPrefix = digestUIDPrefix
1✔
1860
        } else {
2✔
1861
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1862
        }
1✔
1863
        fromIdx := len(digestPrefix)
1✔
1864
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1865
        if len(digest) < toIdx {
2✔
1866
                return "", errors.Errorf("Digest is too short")
1✔
1867
        }
1✔
1868
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1869
}
1870

1871
// GetCronJobName get CronJob name based on cron name and UID
1872
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1873
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1874
}
1✔
1875

1876
// GetInitialJobName get initial job name based on cron name and UID
1877
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1878
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1879
}
1✔
1880

1881
func getPollerPodName(cron *cdiv1.DataImportCron) string {
1✔
1882
        return naming.GetResourceName("poller-"+cron.Name, string(cron.UID)[:8])
1✔
1883
}
1✔
1884

1885
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1886
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1887
}
1✔
1888

1889
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1890
        dv := &cron.Spec.Template
1✔
1891

1✔
1892
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1893
                return explicitVolumeMode, nil
×
1894
        }
×
1895

1896
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1897
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1898
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1899
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1900
                        AccessModes:      accessModes,
1✔
1901
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1902
                        Resources: corev1.VolumeResourceRequirements{
1✔
1903
                                Requests: corev1.ResourceList{
1✔
1904
                                        // Doesn't matter
1✔
1905
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1906
                                },
1✔
1907
                        },
1✔
1908
                },
1✔
1909
        }
1✔
1910
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1911
                return nil, err
×
1912
        }
×
1913

1914
        return inferredPvc.Spec.VolumeMode, nil
1✔
1915
}
1916

1917
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1918
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1919
        if dv.Spec.PVC != nil {
1✔
1920
                return dv.Spec.PVC.VolumeMode
×
1921
        }
×
1922

1923
        if dv.Spec.Storage != nil {
2✔
1924
                return dv.Spec.Storage.VolumeMode
1✔
1925
        }
1✔
1926

1927
        return nil
×
1928
}
1929

1930
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1931
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1932
        if dv.Spec.PVC != nil {
1✔
1933
                return dv.Spec.PVC.AccessModes
×
1934
        }
×
1935

1936
        if dv.Spec.Storage != nil {
2✔
1937
                return dv.Spec.Storage.AccessModes
1✔
1938
        }
1✔
1939

1940
        return nil
×
1941
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc