• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5800

30 Jan 2026 06:07PM UTC coverage: 49.499% (+0.008%) from 49.491%
#5800

push

travis-ci

web-flow
Run go run ./robots/uploader -workspace /home/prow/go/src/github.com/kubevirt/project-infra/../containerized-data-importer/WORKSPACE -dry-run=false (#4020)

Signed-off-by: kubevirt-bot <kubevirtbot@redhat.com>

14712 of 29722 relevant lines covered (49.5%)

0.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.86
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        secv1 "github.com/openshift/api/security/v1"
33
        "github.com/pkg/errors"
34
        cronexpr "github.com/robfig/cron/v3"
35

36
        authorizationv1 "k8s.io/api/authorization/v1"
37
        batchv1 "k8s.io/api/batch/v1"
38
        corev1 "k8s.io/api/core/v1"
39
        storagev1 "k8s.io/api/storage/v1"
40
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
41
        "k8s.io/apimachinery/pkg/api/meta"
42
        "k8s.io/apimachinery/pkg/api/resource"
43
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
44
        "k8s.io/apimachinery/pkg/labels"
45
        "k8s.io/apimachinery/pkg/runtime"
46
        "k8s.io/apimachinery/pkg/types"
47
        "k8s.io/client-go/tools/record"
48
        openapicommon "k8s.io/kube-openapi/pkg/common"
49
        "k8s.io/utils/ptr"
50

51
        "sigs.k8s.io/controller-runtime/pkg/client"
52
        "sigs.k8s.io/controller-runtime/pkg/controller"
53
        "sigs.k8s.io/controller-runtime/pkg/event"
54
        "sigs.k8s.io/controller-runtime/pkg/handler"
55
        "sigs.k8s.io/controller-runtime/pkg/manager"
56
        "sigs.k8s.io/controller-runtime/pkg/predicate"
57
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
58
        "sigs.k8s.io/controller-runtime/pkg/source"
59

60
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
61
        "kubevirt.io/containerized-data-importer/pkg/common"
62
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
63
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
64
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
65
        "kubevirt.io/containerized-data-importer/pkg/operator"
66
        "kubevirt.io/containerized-data-importer/pkg/util"
67
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
68
)
69

70
const (
71
        // ErrDataSourceAlreadyManaged provides a const to indicate DataSource already managed error
72
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
73
        // MessageDataSourceAlreadyManaged provides a const to form DataSource already managed error message
74
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
75
)
76

77
// DataImportCronReconciler reconciles DataImportCron objects
78
type DataImportCronReconciler struct {
79
        client          client.Client
80
        uncachedClient  client.Client
81
        recorder        record.EventRecorder
82
        scheme          *runtime.Scheme
83
        log             logr.Logger
84
        image           string
85
        pullPolicy      string
86
        cdiNamespace    string
87
        installerLabels map[string]string
88
}
89

90
const (
91
        // AnnSourceDesiredDigest is the digest of the pending updated image
92
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
93
        // AnnImageStreamDockerRef is the ImageStream Docker reference
94
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
95
        // AnnNextCronTime is the next time stamp which satisfies the cron expression
96
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
97
        // AnnLastCronTime is the cron last execution time stamp
98
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
99
        // AnnLastUseTime is the PVC last use time stamp
100
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
101
        // AnnStorageClass is the cron DV's storage class
102
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
103

104
        dataImportControllerName    = "dataimportcron-controller"
105
        digestSha256Prefix          = "sha256:"
106
        digestUIDPrefix             = "uid:"
107
        digestDvNameSuffixLength    = 12
108
        cronJobUIDSuffixLength      = 8
109
        defaultImportsToKeepPerCron = 3
110
)
111

112
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
113

114
// Reconcile loop for DataImportCronReconciler
115
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
116
        dataImportCron := &cdiv1.DataImportCron{}
1✔
117
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
118
                return reconcile.Result{}, err
1✔
119
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
120
                err := r.cleanup(ctx, req.NamespacedName)
1✔
121
                return reconcile.Result{}, err
1✔
122
        }
1✔
123
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
124
        if !shouldReconcile || err != nil {
1✔
125
                return reconcile.Result{}, err
×
126
        }
×
127

128
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
129
                return reconcile.Result{}, err
×
130
        }
×
131

132
        return r.update(ctx, dataImportCron)
1✔
133
}
134

135
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
136
        dataSource := &cdiv1.DataSource{}
1✔
137
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
138
                if k8serrors.IsNotFound(err) {
2✔
139
                        return true, nil
1✔
140
                }
1✔
141
                return false, err
×
142
        }
143
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
144
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
145
                return true, nil
1✔
146
        }
1✔
147
        otherCron := &cdiv1.DataImportCron{}
1✔
148
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
149
                if k8serrors.IsNotFound(err) {
2✔
150
                        return true, nil
1✔
151
                }
1✔
152
                return false, err
×
153
        }
154
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
155
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
156
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
157
                r.log.V(3).Info(msg)
1✔
158
                return false, nil
1✔
159
        }
1✔
160
        return true, nil
×
161
}
162

163
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
164
        if dataImportCron.Spec.Schedule == "" {
2✔
165
                return nil
1✔
166
        }
1✔
167
        if isControllerPolledSource(dataImportCron) {
2✔
168
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
169
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
170
                }
1✔
171
                return nil
1✔
172
        }
173
        if !isURLSource(dataImportCron) {
1✔
174
                return nil
×
175
        }
×
176
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
177
        if exists || err != nil {
2✔
178
                return err
1✔
179
        }
1✔
180
        cronJob, err := r.newCronJob(dataImportCron)
1✔
181
        if err != nil {
1✔
182
                return err
×
183
        }
×
184
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
185
                return err
×
186
        }
×
187
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
188
        if err != nil {
1✔
189
                return err
×
190
        }
×
191
        if err := r.client.Create(ctx, job); err != nil {
1✔
192
                return err
×
193
        }
×
194
        return nil
1✔
195
}
196

197
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
198
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
199
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
200
        }
×
201
        imageStream := &imagev1.ImageStream{}
1✔
202
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
203
        if err != nil {
2✔
204
                return nil, "", err
1✔
205
        }
1✔
206
        imageStreamNamespacedName := types.NamespacedName{
1✔
207
                Namespace: imageStreamNamespace,
1✔
208
                Name:      name,
1✔
209
        }
1✔
210
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
211
                return nil, "", err
1✔
212
        }
1✔
213
        return imageStream, tag, nil
1✔
214
}
215

216
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
217
        if imageStream == nil {
1✔
218
                return "", "", errors.Errorf("No ImageStream")
×
219
        }
×
220
        tags := imageStream.Status.Tags
1✔
221
        if len(tags) == 0 {
1✔
222
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
223
        }
×
224

225
        tagIdx := 0
1✔
226
        if len(imageStreamTag) > 0 {
2✔
227
                tagIdx = -1
1✔
228
                for i, tag := range tags {
2✔
229
                        if tag.Tag == imageStreamTag {
2✔
230
                                tagIdx = i
1✔
231
                                break
1✔
232
                        }
233
                }
234
        }
235
        if tagIdx == -1 {
2✔
236
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
237
        }
1✔
238

239
        if len(tags[tagIdx].Items) == 0 {
2✔
240
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
241
        }
1✔
242
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
243
}
244

245
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
246
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
247
                return imageStreamName, "", nil
1✔
248
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
249
                return subs[0], subs[1], nil
1✔
250
        }
1✔
251
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
252
}
253

254
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
255
        nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]
1✔
256
        if nextTimeStr == "" {
1✔
257
                return r.setNextCronTime(dataImportCron)
×
258
        }
×
259
        nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
260
        if err != nil {
1✔
261
                return reconcile.Result{}, err
×
262
        }
×
263
        if nextTime.After(time.Now()) {
2✔
264
                return r.setNextCronTime(dataImportCron)
1✔
265
        }
1✔
266
        switch {
1✔
267
        case isImageStreamSource(dataImportCron):
1✔
268
                if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
269
                        return reconcile.Result{}, err
1✔
270
                }
1✔
271
        case isPvcSource(dataImportCron):
1✔
272
                if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
2✔
273
                        return reconcile.Result{}, err
1✔
274
                }
1✔
275
        case isNodePull(dataImportCron):
1✔
276
                if done, err := r.updateContainerImageDesiredDigest(ctx, dataImportCron); !done {
2✔
277
                        return reconcile.Result{RequeueAfter: 3 * time.Second}, err
1✔
278
                } else if err != nil {
2✔
279
                        return reconcile.Result{}, err
×
280
                }
×
281
        }
282

283
        return r.setNextCronTime(dataImportCron)
1✔
284
}
285

286
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
287
        now := time.Now()
1✔
288
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
289
        if err != nil {
1✔
290
                return reconcile.Result{}, err
×
291
        }
×
292
        nextTime := expr.Next(now)
1✔
293
        requeueAfter := nextTime.Sub(now)
1✔
294
        res := reconcile.Result{RequeueAfter: requeueAfter}
1✔
295
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
296
        return res, err
1✔
297
}
298

299
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
300
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
301
        return err == nil && regSource.ImageStream != nil
1✔
302
}
1✔
303

304
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
305
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
306
        return err == nil && regSource.URL != nil
1✔
307
}
1✔
308

309
func isNodePull(cron *cdiv1.DataImportCron) bool {
1✔
310
        regSource, err := getCronRegistrySource(cron)
1✔
311
        return err == nil && regSource != nil && regSource.PullMethod != nil &&
1✔
312
                *regSource.PullMethod == cdiv1.RegistryPullNode
1✔
313
}
1✔
314

315
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
316
        if !isCronRegistrySource(cron) {
2✔
317
                return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
1✔
318
        }
1✔
319
        return cron.Spec.Template.Spec.Source.Registry, nil
1✔
320
}
321

322
func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
1✔
323
        source := cron.Spec.Template.Spec.Source
1✔
324
        return source != nil && source.Registry != nil
1✔
325
}
1✔
326

327
func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
1✔
328
        if !isPvcSource(cron) {
1✔
329
                return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
×
330
        }
×
331
        return cron.Spec.Template.Spec.Source.PVC, nil
1✔
332
}
333

334
func isPvcSource(cron *cdiv1.DataImportCron) bool {
1✔
335
        source := cron.Spec.Template.Spec.Source
1✔
336
        return source != nil && source.PVC != nil
1✔
337
}
1✔
338

339
func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
1✔
340
        return isImageStreamSource(cron) || isPvcSource(cron) || isNodePull(cron)
1✔
341
}
1✔
342

343
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
344
        res := reconcile.Result{}
1✔
345

1✔
346
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
347
        if err != nil {
1✔
348
                return res, err
×
349
        }
×
350

351
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
352
        imports := dataImportCron.Status.CurrentImports
1✔
353
        importSucceeded := false
1✔
354

1✔
355
        dataVolume := dataImportCron.Spec.Template
1✔
356
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
357
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
358
        if err != nil {
1✔
359
                return res, err
×
360
        }
×
361
        if desiredStorageClass != nil {
2✔
362
                if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
2✔
363
                        return res, err
1✔
364
                }
1✔
365
                currentSc, hasCurrent := dataImportCron.Annotations[AnnStorageClass]
1✔
366
                desiredSc := desiredStorageClass.Name
1✔
367
                if hasCurrent && currentSc != desiredSc {
2✔
368
                        r.log.Info("Storage class changed, delete most recent source on the old sc as it's no longer the desired", "currentSc", currentSc, "desiredSc", desiredSc)
1✔
369
                        if err := r.handleStorageClassChange(ctx, dataImportCron, desiredSc); err != nil {
1✔
370
                                return res, err
×
371
                        }
×
372
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
373
                }
374
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
1✔
375
        }
376
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
377
        if err != nil {
1✔
378
                return res, err
×
379
        }
×
380
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
381
        if err != nil {
1✔
382
                return res, err
×
383
        }
×
384

385
        handlePopulatedPvc := func() error {
2✔
386
                if pvc != nil {
2✔
387
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
388
                                return err
×
389
                        }
×
390
                }
391
                importSucceeded = true
1✔
392
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
393
                        return err
×
394
                }
×
395

396
                return nil
1✔
397
        }
398

399
        switch {
1✔
400
        case dv != nil:
1✔
401
                switch dv.Status.Phase {
1✔
402
                case cdiv1.Succeeded:
1✔
403
                        if err := handlePopulatedPvc(); err != nil {
1✔
404
                                return res, err
×
405
                        }
×
406
                case cdiv1.ImportScheduled:
1✔
407
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
408
                case cdiv1.ImportInProgress:
1✔
409
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
410
                default:
1✔
411
                        dvPhase := string(dv.Status.Phase)
1✔
412
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
413
                }
414
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
415
                if err := handlePopulatedPvc(); err != nil {
1✔
416
                        return res, err
×
417
                }
×
418
        case snapshot != nil:
1✔
419
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
420
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
421
                                return res, err
×
422
                        }
×
423
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
424
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
425
                }
426
                // Check if snapshot class changed
427
                if desiredStorageClass != nil {
2✔
428
                        changed, err := r.handleSnapshotClassChange(ctx, snapshot, desiredStorageClass.Name)
1✔
429
                        if err != nil {
1✔
430
                                return res, err
×
431
                        }
×
432
                        if changed {
1✔
433
                                return reconcile.Result{RequeueAfter: time.Second}, nil
×
434
                        }
×
435
                }
436
                // Below k8s 1.29 there's no way to know the source volume mode
437
                // Let's at least expose this info on our own snapshots
438
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
439
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
440
                        if err != nil {
1✔
441
                                return res, err
×
442
                        }
×
443
                        if volMode != nil {
2✔
444
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
445
                        }
1✔
446
                }
447
                if _, ok := snapshot.Annotations[cc.AnnAdvisedRestoreSize]; !ok {
2✔
448
                        size := inferAdvisedRestoreSizeForSnapshot(&dataImportCron.Spec.Template, snapshot, nil)
1✔
449
                        if size != nil {
2✔
450
                                cc.AddAnnotation(snapshot, cc.AnnAdvisedRestoreSize, size.String())
1✔
451
                        }
1✔
452
                }
453
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
454
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
455
                if err != nil {
2✔
456
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
457
                                return res, err
×
458
                        }
×
459
                } else {
1✔
460
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
461
                }
1✔
462
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
463
                        return res, err
×
464
                }
×
465
                importSucceeded = true
1✔
466
        default:
1✔
467
                if len(imports) > 0 {
2✔
468
                        imports = imports[1:]
1✔
469
                        dataImportCron.Status.CurrentImports = imports
1✔
470
                }
1✔
471
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
472
        }
473

474
        if importSucceeded {
2✔
475
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
476
                        return res, err
×
477
                }
×
478
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
479
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
1✔
480
                        return res, err
×
481
                }
×
482
        }
483

484
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
485
                return res, err
×
486
        }
×
487

488
        // Skip if schedule is disabled
489
        if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
2✔
490
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
491
                pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
1✔
492
                if err != nil {
2✔
493
                        return pollRes, err
1✔
494
                }
1✔
495
                res = pollRes
1✔
496
        }
497

498
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
499
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
500
        if digestUpdated {
2✔
501
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
502
                if dv != nil {
1✔
503
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
×
504
                                return res, err
×
505
                        }
×
506
                }
507
                if importSucceeded || len(imports) == 0 {
2✔
508
                        if err := r.createImportDataVolume(ctx, dataImportCron, desiredStorageClass); err != nil {
2✔
509
                                return res, err
1✔
510
                        }
1✔
511
                }
512
        } else if importSucceeded {
2✔
513
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
1✔
514
                        return res, err
×
515
                }
×
516
        } else if len(imports) > 0 {
2✔
517
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
1✔
518
        } else {
2✔
519
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
520
        }
1✔
521

522
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
1✔
523
                return res, err
×
524
        }
×
525

526
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
2✔
527
                if err := r.client.Update(ctx, dataImportCron); err != nil {
1✔
528
                        return res, err
×
529
                }
×
530
        }
531
        return res, nil
1✔
532
}
533

534
// getImportState returns the current import DataVolume and the PVC of the same name; each is nil if it does not exist
535
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
536
        imports := cron.Status.CurrentImports
1✔
537
        if len(imports) == 0 {
2✔
538
                return nil, nil, nil
1✔
539
        }
1✔
540

541
        dvName := imports[0].DataVolumeName
1✔
542
        dv := &cdiv1.DataVolume{}
1✔
543
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
544
                if !k8serrors.IsNotFound(err) {
1✔
545
                        return nil, nil, err
×
546
                }
×
547
                dv = nil
1✔
548
        }
549

550
        pvc := &corev1.PersistentVolumeClaim{}
1✔
551
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
552
                if !k8serrors.IsNotFound(err) {
1✔
553
                        return nil, nil, err
×
554
                }
×
555
                pvc = nil
1✔
556
        }
557
        return dv, pvc, nil
1✔
558
}
559

560
// getSnapshot returns the VolumeSnapshot named after the current import's DataVolume, or nil if it does not exist
561
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
562
        imports := cron.Status.CurrentImports
1✔
563
        if len(imports) == 0 {
2✔
564
                return nil, nil
1✔
565
        }
1✔
566

567
        snapName := imports[0].DataVolumeName
1✔
568
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
569
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
570
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
571
                        return nil, err
×
572
                }
×
573
                return nil, nil
1✔
574
        }
575

576
        return snapshot, nil
1✔
577
}
578

579
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
580
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
581
        dataSource := &cdiv1.DataSource{}
1✔
582
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
583
                return nil, err
1✔
584
        }
1✔
585
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
586
                log := r.log.WithName("getCronManagedDataSource")
×
587
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
×
588
                return nil, ErrNotManagedByCron
×
589
        }
×
590
        return dataSource, nil
1✔
591
}
592

593
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
594
        objCopy := obj.DeepCopyObject()
1✔
595
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
596
        r.setDataImportCronResourceLabels(cron, obj)
1✔
597
        if !reflect.DeepEqual(obj, objCopy) {
2✔
598
                if err := r.client.Update(ctx, obj); err != nil {
1✔
599
                        return err
×
600
                }
×
601
        }
602
        return nil
1✔
603
}
604

605
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
606
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
607
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
608
                if cond.Status == corev1.ConditionFalse &&
×
609
                        (cond.Reason == common.GenericError || cond.Reason == ImagePullFailedReason) {
×
610
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
611
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
612
                        dv.Labels[common.DataImportCronLabel] = ""
×
613
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
614
                                return err
×
615
                        }
×
616
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
617
                                return err
×
618
                        }
×
619
                        cron.Status.CurrentImports = nil
×
620
                }
621
        }
622
        return nil
×
623
}
624

625
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
626
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
627
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
628
        if err != nil {
1✔
629
                return err
×
630
        }
×
631
        if regSource.ImageStream == nil {
1✔
632
                return nil
×
633
        }
×
634
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
635
        if err != nil {
2✔
636
                return err
1✔
637
        }
1✔
638
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
639
        if err != nil {
2✔
640
                return err
1✔
641
        }
1✔
642
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
643
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
644
                log.Info("Updating DataImportCron", "digest", digest)
1✔
645
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
646
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
647
        }
1✔
648
        return nil
1✔
649
}
650

651
func (r *DataImportCronReconciler) updateContainerImageDesiredDigest(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
652
        log := r.log.WithValues("name", cron.Name).WithValues("uid", cron.UID)
1✔
653
        podName := getPollerPodName(cron)
1✔
654
        ns := cron.Namespace
1✔
655
        nn := types.NamespacedName{Name: podName, Namespace: ns}
1✔
656
        pod := &corev1.Pod{}
1✔
657

1✔
658
        if err := r.client.Get(ctx, nn, pod); err == nil {
2✔
659
                digest, err := fetchContainerImageDigest(pod)
1✔
660
                if err != nil || digest == "" {
1✔
661
                        return false, err
×
662
                }
×
663
                cc.AddAnnotation(cron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
664
                if cron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
665
                        log.Info("Updating DataImportCron", "digest", digest)
1✔
666
                        cc.AddAnnotation(cron, AnnSourceDesiredDigest, digest)
1✔
667
                }
1✔
668
                return true, r.client.Delete(ctx, pod)
1✔
669
        } else if cc.IgnoreNotFound(err) != nil {
1✔
670
                return false, err
×
671
        }
×
672

673
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(ctx, r.client)
1✔
674
        if err != nil {
1✔
675
                return false, err
×
676
        }
×
677
        platform := cron.Spec.Template.Spec.Source.Registry.Platform
1✔
678
        if platform != nil && platform.Architecture != "" {
1✔
679
                if workloadNodePlacement.NodeSelector == nil {
×
680
                        workloadNodePlacement.NodeSelector = map[string]string{}
×
681
                }
×
682
                workloadNodePlacement.NodeSelector[corev1.LabelArchStable] = platform.Architecture
×
683
        }
684

685
        containerImage := strings.TrimPrefix(*cron.Spec.Template.Spec.Source.Registry.URL, "docker://")
1✔
686

1✔
687
        pod = &corev1.Pod{
1✔
688
                ObjectMeta: metav1.ObjectMeta{
1✔
689
                        Name:      podName,
1✔
690
                        Namespace: ns,
1✔
691
                        OwnerReferences: []metav1.OwnerReference{
1✔
692
                                {
1✔
693
                                        APIVersion:         cron.APIVersion,
1✔
694
                                        Kind:               cron.Kind,
1✔
695
                                        Name:               cron.Name,
1✔
696
                                        UID:                cron.UID,
1✔
697
                                        BlockOwnerDeletion: ptr.To[bool](true),
1✔
698
                                        Controller:         ptr.To[bool](true),
1✔
699
                                },
1✔
700
                        },
1✔
701
                },
1✔
702
                Spec: corev1.PodSpec{
1✔
703
                        TerminationGracePeriodSeconds: ptr.To[int64](0),
1✔
704
                        RestartPolicy:                 corev1.RestartPolicyNever,
1✔
705
                        NodeSelector:                  workloadNodePlacement.NodeSelector,
1✔
706
                        Tolerations:                   workloadNodePlacement.Tolerations,
1✔
707
                        Affinity:                      workloadNodePlacement.Affinity,
1✔
708
                        Volumes: []corev1.Volume{
1✔
709
                                {
1✔
710
                                        Name: "shared-volume",
1✔
711
                                        VolumeSource: corev1.VolumeSource{
1✔
712
                                                EmptyDir: &corev1.EmptyDirVolumeSource{},
1✔
713
                                        },
1✔
714
                                },
1✔
715
                        },
1✔
716
                        InitContainers: []corev1.Container{
1✔
717
                                {
1✔
718
                                        Name:                     "init",
1✔
719
                                        Image:                    r.image,
1✔
720
                                        ImagePullPolicy:          corev1.PullPolicy(r.pullPolicy),
1✔
721
                                        Command:                  []string{"sh", "-c", "cp /usr/bin/cdi-containerimage-server /shared/server"},
1✔
722
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
723
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
724
                                },
1✔
725
                        },
1✔
726
                        Containers: []corev1.Container{
1✔
727
                                {
1✔
728
                                        Name:                     "image-container",
1✔
729
                                        Image:                    containerImage,
1✔
730
                                        ImagePullPolicy:          corev1.PullAlways,
1✔
731
                                        Command:                  []string{"/shared/server", "-h"},
1✔
732
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
733
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
734
                                },
1✔
735
                        },
1✔
736
                },
1✔
737
        }
1✔
738

1✔
739
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
740
        if pod.Spec.SecurityContext != nil {
2✔
741
                pod.Spec.SecurityContext.FSGroup = nil
1✔
742
        }
1✔
743

744
        return false, r.client.Create(ctx, pod)
1✔
745
}
746

747
func fetchContainerImageDigest(pod *corev1.Pod) (string, error) {
1✔
748
        if len(pod.Status.ContainerStatuses) == 0 {
1✔
749
                return "", nil
×
750
        }
×
751

752
        status := pod.Status.ContainerStatuses[0]
1✔
753
        if status.State.Waiting != nil {
1✔
754
                reason := status.State.Waiting.Reason
×
755
                switch reason {
×
756
                case "ImagePullBackOff", "ErrImagePull", "InvalidImageName":
×
757
                        return "", errors.Errorf("%s %s: %s", common.ImagePullFailureText, status.Image, reason)
×
758
                }
759
                return "", nil
×
760
        }
761

762
        if status.State.Terminated == nil {
1✔
763
                return "", nil
×
764
        }
×
765

766
        imageID := status.ImageID
1✔
767
        if imageID == "" {
1✔
768
                return "", errors.Errorf("Container has no imageID")
×
769
        }
×
770
        idx := strings.Index(imageID, digestSha256Prefix)
1✔
771
        if idx < 0 {
1✔
772
                return "", errors.Errorf("Container image %s ID has no digest: %s", status.Image, imageID)
×
773
        }
×
774

775
        return imageID[idx:], nil
1✔
776
}
777

778
func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
779
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
780
        pvcSource, err := getCronPvcSource(dataImportCron)
1✔
781
        if err != nil {
1✔
782
                return err
×
783
        }
×
784
        ns := pvcSource.Namespace
1✔
785
        if ns == "" {
2✔
786
                ns = dataImportCron.Namespace
1✔
787
        }
1✔
788
        pvc := &corev1.PersistentVolumeClaim{}
1✔
789
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
2✔
790
                return err
1✔
791
        }
1✔
792
        digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
1✔
793
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
794
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
795
                log.Info("Updating DataImportCron", "digest", digest)
1✔
796
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
797
        }
1✔
798
        return nil
1✔
799
}
800

801
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
802
        log := r.log.WithName("updateDataSource")
1✔
803
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
804
        if err != nil {
2✔
805
                if k8serrors.IsNotFound(err) {
2✔
806
                        dataSource = r.newDataSource(dataImportCron)
1✔
807
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
808
                                return err
×
809
                        }
×
810
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
1✔
811
                } else if errors.Is(err, ErrNotManagedByCron) {
×
812
                        return nil
×
813
                } else {
×
814
                        return err
×
815
                }
×
816
        }
817
        dataSourceCopy := dataSource.DeepCopy()
1✔
818
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
819

1✔
820
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
821
        populateDataSource(format, dataSource, sourcePVC)
1✔
822

1✔
823
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
824
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
825
                        return err
×
826
                }
×
827
        }
828

829
        return nil
1✔
830
}
831

832
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
833
        if sourcePVC == nil {
2✔
834
                return
1✔
835
        }
1✔
836

837
        switch format {
1✔
838
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
839
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
840
                        PVC: sourcePVC,
1✔
841
                }
1✔
842
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
843
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
844
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
845
                                Namespace: sourcePVC.Namespace,
1✔
846
                                Name:      sourcePVC.Name,
1✔
847
                        },
1✔
848
                }
1✔
849
        }
850
}
851

852
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
853
        if dataImportCron.Status.CurrentImports == nil {
1✔
854
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
855
        }
×
856
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
857
                Namespace: dataImportCron.Namespace,
1✔
858
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
859
        }
1✔
860
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
861
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
862
                now := metav1.Now()
1✔
863
                dataImportCron.Status.LastImportTimestamp = &now
1✔
864
        }
1✔
865
        return nil
1✔
866
}
867

868
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
869
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
870
        if lastTimeStr == "" {
2✔
871
                return nil
1✔
872
        }
1✔
873
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
874
        if err != nil {
1✔
875
                return err
×
876
        }
×
877
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
878
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
879
        }
1✔
880
        return nil
1✔
881
}
882

883
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass *storagev1.StorageClass) error {
1✔
884
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
885
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
886
        if digest == "" {
1✔
887
                return nil
×
888
        }
×
889
        dvName, err := createDvName(dataSourceName, digest)
1✔
890
        if err != nil {
2✔
891
                return err
1✔
892
        }
1✔
893

894
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
895
        for _, src := range sources {
2✔
896
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
897
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
898
                                return err
×
899
                        }
×
900
                } else {
1✔
901
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
902
                                return err
×
903
                        }
×
904
                        // If source exists don't create DV
905
                        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
906
                        return nil
1✔
907
                }
908
        }
909

910
        storageProfile := &cdiv1.StorageProfile{}
1✔
911
        if desiredStorageClass != nil {
2✔
912
                if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
913
                        return err
×
914
                }
×
915
        }
916
        dv := r.newSourceDataVolume(dataImportCron, dvName, storageProfile)
1✔
917
        if allowed, err := r.authorizeCloneDataVolume(dataImportCron, dv); err != nil {
1✔
918
                return err
×
919
        } else if !allowed {
2✔
920
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse,
1✔
921
                        "Not authorized to create DataVolume", notAuthorized)
1✔
922
                return nil
1✔
923
        }
1✔
924
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
925
                return err
×
926
        }
×
927
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
928

1✔
929
        return nil
1✔
930
}
931

932
func (r *DataImportCronReconciler) authorizeCloneDataVolume(dataImportCron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) (bool, error) {
1✔
933
        if !isPvcSource(dataImportCron) {
2✔
934
                return true, nil
1✔
935
        }
1✔
936
        saName := "default"
1✔
937
        if dataImportCron.Spec.ServiceAccountName != nil {
1✔
938
                saName = *dataImportCron.Spec.ServiceAccountName
×
939
        }
×
940
        if resp, err := dv.AuthorizeSA(dv.Namespace, dv.Name, &authProxy{r.client}, dataImportCron.Namespace, saName); err != nil {
1✔
941
                return false, err
×
942
        } else if !resp.Allowed {
2✔
943
                r.log.Info("Not authorized to create DataVolume", "cron", dataImportCron.Name, "reason", resp.Reason)
1✔
944
                return false, nil
1✔
945
        }
1✔
946

947
        return true, nil
1✔
948
}
949

950
type authProxy struct {
951
        client client.Client
952
}
953

954
func (p *authProxy) CreateSar(sar *authorizationv1.SubjectAccessReview) (*authorizationv1.SubjectAccessReview, error) {
1✔
955
        if err := p.client.Create(context.TODO(), sar); err != nil {
1✔
956
                return nil, err
×
957
        }
×
958
        return sar, nil
1✔
959
}
960

961
func (p *authProxy) GetNamespace(name string) (*corev1.Namespace, error) {
1✔
962
        ns := &corev1.Namespace{}
1✔
963
        if err := p.client.Get(context.TODO(), types.NamespacedName{Name: name}, ns); err != nil {
1✔
964
                return nil, err
×
965
        }
×
966
        return ns, nil
1✔
967
}
968

969
func (p *authProxy) GetDataSource(namespace, name string) (*cdiv1.DataSource, error) {
×
970
        das := &cdiv1.DataSource{}
×
971
        if err := p.client.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, das); err != nil {
×
972
                return nil, err
×
973
        }
×
974
        return das, nil
×
975
}
976

977
func (r *DataImportCronReconciler) handleStorageClassChange(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
1✔
978
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
979
        if !ok {
1✔
980
                // nothing to delete
×
981
                return nil
×
982
        }
×
983
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
984
        if err != nil {
1✔
985
                return err
×
986
        }
×
987
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
1✔
988
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
1✔
989
        for _, src := range sources {
2✔
990
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
991
                        return err
×
992
                }
×
993
        }
994
        for _, src := range sources {
2✔
995
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
1✔
996
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
×
997
                }
×
998
        }
999
        // Only update desired storage class once garbage collection went through
1000
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
1✔
1001
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
1002
        if err != nil {
1✔
1003
                return err
×
1004
        }
×
1005

1006
        return nil
1✔
1007
}
1008

1009
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
1010
        switch format {
1✔
1011
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1012
                return nil
1✔
1013
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1014
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
1015
        default:
×
1016
                return fmt.Errorf("unknown source format for snapshot")
×
1017
        }
1018
}
1019

1020
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
1021
        if pvc == nil {
1✔
1022
                return nil
×
1023
        }
×
1024
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
1025
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
1026
                return nil
1✔
1027
        }
1✔
1028
        storageProfile := &cdiv1.StorageProfile{}
1✔
1029
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1030
                return err
×
1031
        }
×
1032

1033
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
1034
                ObjectMeta: metav1.ObjectMeta{
1✔
1035
                        Name:      pvc.Name,
1✔
1036
                        Namespace: dataImportCron.Namespace,
1✔
1037
                        Labels: map[string]string{
1✔
1038
                                common.CDILabelKey:       common.CDILabelValue,
1✔
1039
                                common.CDIComponentLabel: "",
1✔
1040
                        },
1✔
1041
                },
1✔
1042
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
1043
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
1044
                                PersistentVolumeClaimName: &pvc.Name,
1✔
1045
                        },
1✔
1046
                },
1✔
1047
        }
1✔
1048
        // Select VolumeSnapshotClass for boot source snapshot
1✔
1049
        snapshotClassName, err := r.getSnapshotClassForDataImportCron(pvc, storageProfile)
1✔
1050
        if err != nil {
1✔
1051
                return err
×
1052
        }
×
1053
        if snapshotClassName != "" {
2✔
1054
                desiredSnapshot.Spec.VolumeSnapshotClassName = &snapshotClassName
1✔
1055
        }
1✔
1056
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
1057
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
1✔
1058

1✔
1059
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
1060
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
1061
                if !k8serrors.IsNotFound(err) {
1✔
1062
                        return err
×
1063
                }
×
1064
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1065
                pvcSize := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
1✔
1066
                size := inferAdvisedRestoreSizeForSnapshot(&dataImportCron.Spec.Template, desiredSnapshot, &pvcSize)
1✔
1067
                if size != nil {
2✔
1068
                        cc.AddAnnotation(desiredSnapshot, cc.AnnAdvisedRestoreSize, size.String())
1✔
1069
                }
1✔
1070
                if pvc.Spec.VolumeMode != nil {
2✔
1071
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
1072
                }
1✔
1073
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
1074
                        return err
×
1075
                }
×
1076
        } else {
1✔
1077
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
1078
                        // Clean up DV/PVC as they are not needed anymore
1✔
1079
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
1080
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
1081
                                return err
×
1082
                        }
×
1083
                }
1084
        }
1085

1086
        return nil
1✔
1087
}
1088

1089
func (r *DataImportCronReconciler) handleSnapshotClassChange(ctx context.Context, snapshot *snapshotv1.VolumeSnapshot, storageClassName string) (bool, error) {
1✔
1090
        sp := &cdiv1.StorageProfile{}
1✔
1091
        if err := r.client.Get(ctx, types.NamespacedName{Name: storageClassName}, sp); err != nil {
1✔
1092
                return false, client.IgnoreNotFound(err)
×
1093
        }
×
1094

1095
        desiredVSC, err := r.getSnapshotClassForDataImportCron(nil, sp)
1✔
1096
        if err != nil {
1✔
1097
                return false, err
×
1098
        }
×
1099
        actualVSC := ""
1✔
1100
        if snapshot.Spec.VolumeSnapshotClassName != nil {
1✔
1101
                actualVSC = *snapshot.Spec.VolumeSnapshotClassName
×
1102
        }
×
1103
        if desiredVSC == "" || actualVSC == desiredVSC {
2✔
1104
                return false, nil
1✔
1105
        }
1✔
1106

1107
        r.log.Info("Snapshot class changed, deleting", "name", snapshot.Name, "from", actualVSC, "to", desiredVSC)
×
1108
        if err := r.client.Delete(ctx, snapshot); err != nil {
×
1109
                return false, client.IgnoreNotFound(err)
×
1110
        }
×
1111
        return true, nil
×
1112
}
1113

1114
// getSnapshotClassForDataImportCron returns the VolumeSnapshotClass name to use for DataImportCron snapshots.
1115
func (r *DataImportCronReconciler) getSnapshotClassForDataImportCron(pvc *corev1.PersistentVolumeClaim, storageProfile *cdiv1.StorageProfile) (string, error) {
1✔
1116
        if vscName := storageProfile.Annotations[cc.AnnSnapshotClassForDataImportCron]; vscName != "" {
2✔
1117
                return vscName, nil
1✔
1118
        }
1✔
1119
        return cc.GetSnapshotClassForSmartClone(pvc, &storageProfile.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
1120
}
1121

1122
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
1123
        dataImportCron.Status.SourceFormat = &format
1✔
1124

1✔
1125
        switch format {
1✔
1126
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1127
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1128
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1129
                if snapshot == nil {
2✔
1130
                        // Snapshot create/update will trigger reconcile
1✔
1131
                        return nil
1✔
1132
                }
1✔
1133
                if cc.IsSnapshotReady(snapshot) {
2✔
1134
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1135
                } else {
2✔
1136
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
1137
                }
1✔
1138
        default:
×
1139
                return fmt.Errorf("unknown source format for snapshot")
×
1140
        }
1141

1142
        return nil
1✔
1143
}
1144

1145
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
1146
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
1147
        if desiredStorageClass == nil {
2✔
1148
                return format, nil
1✔
1149
        }
1✔
1150

1151
        storageProfile := &cdiv1.StorageProfile{}
1✔
1152
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1153
                return format, err
×
1154
        }
×
1155
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
1156
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
1157
        }
1✔
1158

1159
        return format, nil
1✔
1160
}
1161

1162
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
1163
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
1164
                return nil
×
1165
        }
×
1166
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
1167
        if err != nil {
1✔
1168
                return err
×
1169
        }
×
1170

1171
        maxImports := defaultImportsToKeepPerCron
1✔
1172

1✔
1173
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
1174
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
1175
        }
1✔
1176

1177
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
1178
                return err
×
1179
        }
×
1180
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
1181
                return err
×
1182
        }
×
1183

1184
        return nil
1✔
1185
}
1186

1187
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
1188
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
1189

1✔
1190
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1191
                return err
×
1192
        }
×
1193
        if len(pvcList.Items) > maxImports {
2✔
1194
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
1195
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
1196
                })
1✔
1197
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
1198
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1199
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
1200
                                return err
×
1201
                        }
×
1202
                }
1203
        }
1204

1205
        dvList := &cdiv1.DataVolumeList{}
1✔
1206
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1207
                return err
×
1208
        }
×
1209

1210
        if len(dvList.Items) > maxImports {
2✔
1211
                for _, dv := range dvList.Items {
2✔
1212
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
1213
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
1214
                                return err
×
1215
                        }
×
1216

1217
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1218
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1219
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
1220
                                        return err
×
1221
                                }
×
1222
                        }
1223
                }
1224
        }
1225

1226
        return nil
1✔
1227
}
1228

1229
// deleteDvPvc deletes DV or PVC if DV was GCed
1230
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
1231
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
1232
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
1233
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
1234
                return err
1✔
1235
        }
1✔
1236
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
1237
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
1238
                return err
×
1239
        }
×
1240
        return nil
1✔
1241
}
1242

1243
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
1244
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
1245

1✔
1246
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1247
                if meta.IsNoMatchError(err) {
×
1248
                        return nil
×
1249
                }
×
1250
                return err
×
1251
        }
1252
        if len(snapList.Items) > maxImports {
1✔
1253
                sort.Slice(snapList.Items, func(i, j int) bool {
×
1254
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
1255
                })
×
1256
                for _, snap := range snapList.Items[maxImports:] {
×
1257
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
1258
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
1259
                                return err
×
1260
                        }
×
1261
                }
1262
        }
1263

1264
        return nil
1✔
1265
}
1266

1267
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
1268
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
1269
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
1270

1✔
1271
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
1272
                return err
×
1273
        }
×
1274
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
1275
        if err != nil {
1✔
1276
                return err
×
1277
        }
×
1278
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
1279
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
1280
                return err
×
1281
        }
×
1282
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
1283
                return err
×
1284
        }
×
1285
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
1286
                return err
×
1287
        }
×
1288
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
1289
                return err
×
1290
        }
×
1291
        return nil
1✔
1292
}
1293

1294
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
1295
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
1296
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
1297
        if err != nil {
1✔
1298
                return err
×
1299
        }
×
1300
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
1301
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
1302
                return err
×
1303
        }
×
1304
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
1305
                return err
×
1306
        }
×
1307

1308
        return nil
1✔
1309
}
1310

1311
// NewDataImportCronController creates a new instance of the DataImportCron controller
1312
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
1313
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
1314
                Scheme: mgr.GetScheme(),
×
1315
                Mapper: mgr.GetRESTMapper(),
×
1316
        })
×
1317
        if err != nil {
×
1318
                return nil, err
×
1319
        }
×
1320
        reconciler := &DataImportCronReconciler{
×
1321
                client:          mgr.GetClient(),
×
1322
                uncachedClient:  uncachedClient,
×
1323
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1324
                scheme:          mgr.GetScheme(),
×
1325
                log:             log.WithName(dataImportControllerName),
×
1326
                image:           importerImage,
×
1327
                pullPolicy:      pullPolicy,
×
1328
                cdiNamespace:    util.GetNamespace(),
×
1329
                installerLabels: installerLabels,
×
1330
        }
×
1331
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1332
                MaxConcurrentReconciles: 3,
×
1333
                Reconciler:              reconciler,
×
1334
        })
×
1335
        if err != nil {
×
1336
                return nil, err
×
1337
        }
×
1338
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
1339
                return nil, err
×
1340
        }
×
1341
        log.Info("Initialized DataImportCron controller")
×
1342
        return dataImportCronController, nil
×
1343
}
1344

1345
func getCronName(obj client.Object) string {
×
1346
        return obj.GetLabels()[common.DataImportCronLabel]
×
1347
}
×
1348

1349
func getCronNs(obj client.Object) string {
×
1350
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
1351
}
×
1352

1353
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
1354
        if cronName := getCronName(obj); cronName != "" {
×
1355
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
1356
        }
×
1357
        return nil
×
1358
}
1359

1360
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1361
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1362
                return err
×
1363
        }
×
1364

1365
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
1366
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
1367
                // Otherwise we risk losing the storage profile event
×
1368
                var crons cdiv1.DataImportCronList
×
1369
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
×
1370
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
×
1371
                        return nil
×
1372
                }
×
1373
                // Storage profiles are 1:1 to storage classes
1374
                scName := obj.GetName()
×
1375
                var reqs []reconcile.Request
×
1376
                for _, cron := range crons.Items {
×
1377
                        dataVolume := cron.Spec.Template
×
1378
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1379
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1380
                        if err != nil || templateSc == nil {
×
1381
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1382
                                return reqs
×
1383
                        }
×
1384
                        if templateSc.Name == scName {
×
1385
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1386
                        }
×
1387
                }
1388
                return reqs
×
1389
        }
1390

1391
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1392
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1393
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1394
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1395
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1396
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1397
                },
1398
        )); err != nil {
×
1399
                return err
×
1400
        }
×
1401

1402
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1403
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1404
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1405
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1406
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1407
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1408
                },
1409
        )); err != nil {
×
1410
                return err
×
1411
        }
×
1412

1413
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1414
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1415
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1416
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1417
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1418
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1419
                },
1420
        )); err != nil {
×
1421
                return err
×
1422
        }
×
1423

1424
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1425
                return err
×
1426
        }
×
1427

1428
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1429
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1430
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
×
1431
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1432
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1433
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1434
                                return dicRelevantFieldsChanged(e.ObjectOld, e.ObjectNew)
×
1435
                        },
×
1436
                },
1437
        )); err != nil {
×
1438
                return err
×
1439
        }
×
1440

1441
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1442
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1443
        }
×
1444

1445
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1446
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1447
                predicate.TypedFuncs[*batchv1.CronJob]{
×
1448
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1449
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1450
                        },
×
1451
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
×
1452
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1453
                },
1454
        )); err != nil {
×
1455
                return err
×
1456
        }
×
1457

1458
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1459
                if meta.IsNoMatchError(err) {
×
1460
                        // Back out if there's no point to attempt watch
×
1461
                        return nil
×
1462
                }
×
1463
                if !cc.IsErrCacheNotStarted(err) {
×
1464
                        return err
×
1465
                }
×
1466
        }
1467
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1468
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
×
1469
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1470
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1471
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1472
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1473
                },
1474
        )); err != nil {
×
1475
                return err
×
1476
        }
×
1477

1478
        return nil
×
1479
}
1480

1481
func dicRelevantFieldsChanged(oldSp, newSp *cdiv1.StorageProfile) bool {
×
1482
        sourceFormatChanged := oldSp.Status.DataImportCronSourceFormat != newSp.Status.DataImportCronSourceFormat
×
1483
        rwoAnnotationChanged := oldSp.Annotations[cc.AnnUseReadWriteOnceForDataImportCron] != newSp.Annotations[cc.AnnUseReadWriteOnceForDataImportCron]
×
1484
        snapshotClassChanged := oldSp.Annotations[cc.AnnSnapshotClassForDataImportCron] != newSp.Annotations[cc.AnnSnapshotClassForDataImportCron]
×
1485
        return sourceFormatChanged || rwoAnnotationChanged || snapshotClassChanged
×
1486
}
×
1487

1488
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1489
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1490
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1491
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
1492
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1493
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1494
                                log.Info("Update", "sc", obj.GetName(),
×
1495
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1496
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1497
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
×
1498
                                if err != nil {
×
1499
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
1500
                                }
×
1501
                                return reqs
×
1502
                        },
1503
                ),
1504
                predicate.TypedFuncs[*storagev1.StorageClass]{
1505
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1506
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1507
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1508
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1509
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1510
                        },
×
1511
                },
1512
        )); err != nil {
×
1513
                return err
×
1514
        }
×
1515

1516
        return nil
×
1517
}
1518

1519
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1520
        dicList := &cdiv1.DataImportCronList{}
×
1521
        if err := c.List(ctx, dicList); err != nil {
×
1522
                return nil, err
×
1523
        }
×
1524
        reqs := []reconcile.Request{}
×
1525
        for _, dic := range dicList.Items {
×
1526
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
1527
                        continue
×
1528
                }
1529

1530
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1531
        }
1532

1533
        return reqs, nil
×
1534
}
1535

1536
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
1✔
1537
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1538
                return false, nil
1✔
1539
        }
1✔
1540

1541
        sc := pvc.Spec.StorageClassName
1✔
1542
        if sc == nil || *sc == desiredStorageClass {
2✔
1543
                return false, nil
1✔
1544
        }
1✔
1545

1546
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1✔
1547
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
1548
                return false, err
×
1549
        }
×
1550

1551
        return true, nil
1✔
1552
}
1553

1554
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1555
        cronJob := &batchv1.CronJob{}
1✔
1556
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1557
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1558
                return false, cc.IgnoreNotFound(err)
1✔
1559
        }
1✔
1560

1561
        cronJobCopy := cronJob.DeepCopy()
1✔
1562
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1563
                return false, err
×
1564
        }
×
1565

1566
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1567
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1568
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
1569
                        return false, cc.IgnoreNotFound(err)
×
1570
                }
×
1571
        }
1572
        return true, nil
1✔
1573
}
1574

1575
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1576
        cronJob := &batchv1.CronJob{
1✔
1577
                ObjectMeta: metav1.ObjectMeta{
1✔
1578
                        Name:      GetCronJobName(cron),
1✔
1579
                        Namespace: r.cdiNamespace,
1✔
1580
                },
1✔
1581
        }
1✔
1582
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
1583
                return nil, err
×
1584
        }
×
1585
        return cronJob, nil
1✔
1586
}
1587

1588
// InitPollerPod inits poller Pod
1589
func InitPollerPod(c client.Client, cron *cdiv1.DataImportCron, pod *corev1.PodTemplateSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1590
        regSource, err := getCronRegistrySource(cron)
1✔
1591
        if err != nil {
1✔
1592
                return err
×
1593
        }
×
1594
        if regSource.URL == nil {
1✔
1595
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1596
        }
×
1597
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1598
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
1599
                return err
×
1600
        }
×
1601
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1602
        if err != nil {
1✔
1603
                return err
×
1604
        }
×
1605
        container := corev1.Container{
1✔
1606
                Name:  "cdi-source-update-poller",
1✔
1607
                Image: image,
1✔
1608
                Command: []string{
1✔
1609
                        "/usr/bin/cdi-source-update-poller",
1✔
1610
                        "-ns", cron.Namespace,
1✔
1611
                        "-cron", cron.Name,
1✔
1612
                        "-url", *regSource.URL,
1✔
1613
                },
1✔
1614
                ImagePullPolicy:          pullPolicy,
1✔
1615
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1616
                TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
1617
        }
1✔
1618

1✔
1619
        var volumes []corev1.Volume
1✔
1620
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1621
        if hasCertConfigMap {
1✔
1622
                vm := corev1.VolumeMount{
×
1623
                        Name:      CertVolName,
×
1624
                        MountPath: common.ImporterCertDir,
×
1625
                }
×
1626
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1627
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
1628
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
1629
        }
×
1630

1631
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1632
                vm := corev1.VolumeMount{
1✔
1633
                        Name:      ProxyCertVolName,
1✔
1634
                        MountPath: common.ImporterProxyCertDir,
1✔
1635
                }
1✔
1636
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1637
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1638
        }
1✔
1639

1640
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
1641
                container.Env = append(container.Env,
×
1642
                        corev1.EnvVar{
×
1643
                                Name: common.ImporterAccessKeyID,
×
1644
                                ValueFrom: &corev1.EnvVarSource{
×
1645
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1646
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1647
                                                        Name: *regSource.SecretRef,
×
1648
                                                },
×
1649
                                                Key: common.KeyAccess,
×
1650
                                        },
×
1651
                                },
×
1652
                        },
×
1653
                        corev1.EnvVar{
×
1654
                                Name: common.ImporterSecretKey,
×
1655
                                ValueFrom: &corev1.EnvVarSource{
×
1656
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1657
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1658
                                                        Name: *regSource.SecretRef,
×
1659
                                                },
×
1660
                                                Key: common.KeySecret,
×
1661
                                        },
×
1662
                                },
×
1663
                        },
×
1664
                )
×
1665
        }
×
1666

1667
        addEnvVar := func(varName, value string) {
2✔
1668
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1669
        }
1✔
1670

1671
        if insecureTLS {
1✔
1672
                addEnvVar(common.InsecureTLSVar, "true")
×
1673
        }
×
1674

1675
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1676
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1677
                        addEnvVar(varName, value)
1✔
1678
                }
1✔
1679
        }
1680

1681
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1682
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1683
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1684

1✔
1685
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1686
        if err != nil {
1✔
1687
                return err
×
1688
        }
×
1689
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1690
        if err != nil {
1✔
1691
                return err
×
1692
        }
×
1693

1694
        podSpec := &pod.Spec
1✔
1695

1✔
1696
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1697
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1698
        podSpec.Containers = []corev1.Container{container}
1✔
1699
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1700
        podSpec.Volumes = volumes
1✔
1701
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1702
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1703
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1704
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1705

1✔
1706
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1707
        // No need for specifid uid/fsgroup here since this doesn't write or use qemu
1✔
1708
        if podSpec.SecurityContext != nil {
2✔
1709
                podSpec.SecurityContext.FSGroup = nil
1✔
1710
        }
1✔
1711
        if podSpec.Containers[0].SecurityContext != nil {
2✔
1712
                podSpec.Containers[0].SecurityContext.RunAsUser = nil
1✔
1713
        }
1✔
1714

1715
        if pod.Labels == nil {
2✔
1716
                pod.Labels = map[string]string{}
1✔
1717
        }
1✔
1718
        pod.Labels[common.DataImportCronPollerLabel] = ""
1✔
1719

1✔
1720
        return nil
1✔
1721
}
1722

1723
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1724
        cronJobSpec := &cronJob.Spec
1✔
1725
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1726
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1727
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1728
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1729

1✔
1730
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1731
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1732
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1733
        cc.AddAnnotation(&jobSpec.Template, secv1.RequiredSCCAnnotation, common.RestrictedSCCName)
1✔
1734

1✔
1735
        pod := &jobSpec.Template
1✔
1736
        if err := InitPollerPod(r.client, cron, pod, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1737
                return err
×
1738
        }
×
1739
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1740
                return err
×
1741
        }
×
1742
        return nil
1✔
1743
}
1744

1745
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1746
        job := &batchv1.Job{
1✔
1747
                ObjectMeta: metav1.ObjectMeta{
1✔
1748
                        Name:      GetInitialJobName(cron),
1✔
1749
                        Namespace: cronJob.Namespace,
1✔
1750
                },
1✔
1751
                Spec: cronJob.Spec.JobTemplate.Spec,
1✔
1752
        }
1✔
1753
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1754
                return nil, err
×
1755
        }
×
1756
        return job, nil
1✔
1757
}
1758

1759
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1760
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1761
                return err
×
1762
        }
×
1763
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1764
        labels := obj.GetLabels()
1✔
1765
        labels[common.DataImportCronNsLabel] = cron.Namespace
1✔
1766
        labels[common.DataImportCronLabel] = cron.Name
1✔
1767
        obj.SetLabels(labels)
1✔
1768
        return nil
1✔
1769
}
1770

1771
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string, storageProfile *cdiv1.StorageProfile) *cdiv1.DataVolume {
1✔
1772
        dv := cron.Spec.Template.DeepCopy()
1✔
1773
        if isCronRegistrySource(cron) {
2✔
1774
                var digestedURL string
1✔
1775
                if isURLSource(cron) {
2✔
1776
                        digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1777
                } else if isImageStreamSource(cron) {
3✔
1778
                        // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1779
                        digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1780
                        dv.Spec.Source.Registry.ImageStream = nil
1✔
1781
                }
1✔
1782
                dv.Spec.Source.Registry.URL = &digestedURL
1✔
1783
        }
1784
        dv.Name = dataVolumeName
1✔
1785
        dv.Namespace = cron.Namespace
1✔
1786
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1787
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1788
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1789
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1790

1✔
1791
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
1792
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1793
        }
1✔
1794

1795
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1796

1✔
1797
        // Apply RWO access mode as default for DataImportCron (from StorageProfile annotation)
1✔
1798
        // Only applies if the DV doesn't already have AccessModes configured
1✔
1799
        if storageProfile != nil && storageProfile.Annotations[cc.AnnUseReadWriteOnceForDataImportCron] == "true" {
2✔
1800
                if dv.Spec.Storage != nil && len(dv.Spec.Storage.AccessModes) == 0 {
2✔
1801
                        dv.Spec.Storage.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}
1✔
1802
                }
1✔
1803
        }
1804

1805
        return dv
1✔
1806
}
1807

1808
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1809
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1810
        cc.CopyAllowedLabels(cron.GetLabels(), obj, true)
1✔
1811
        labels := obj.GetLabels()
1✔
1812
        labels[common.DataImportCronLabel] = cron.Name
1✔
1813
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
2✔
1814
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1815
        }
1✔
1816
        obj.SetLabels(labels)
1✔
1817
}
1818

1819
func untagDigestedDockerURL(dockerURL string) string {
1✔
1820
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1821
                url := u.Host + u.Path
1✔
1822
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1823
                // Check for tag
1✔
1824
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1825
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
2✔
1826
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1827
                        }
1✔
1828
                }
1829
        }
1830
        return dockerURL
1✔
1831
}
1832

1833
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1834
        if val := cron.Labels[ann]; val != "" {
2✔
1835
                cc.AddLabel(dv, ann, val)
1✔
1836
        }
1✔
1837
}
1838

1839
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1840
        if val := cron.Annotations[ann]; val != "" {
1✔
1841
                cc.AddAnnotation(dv, ann, val)
×
1842
        }
×
1843
}
1844

1845
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1✔
1846
        dataSource := &cdiv1.DataSource{
1✔
1847
                ObjectMeta: metav1.ObjectMeta{
1✔
1848
                        Name:      cron.Spec.ManagedDataSource,
1✔
1849
                        Namespace: cron.Namespace,
1✔
1850
                },
1✔
1851
        }
1✔
1852
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1853
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1854
        cc.CopyAllowedLabels(cron.GetLabels(), dataSource, true)
1✔
1855
        return dataSource
1✔
1856
}
1✔
1857

1858
// Create DataVolume name based on the DataSource name + prefix of the digest
1859
func createDvName(prefix, digest string) (string, error) {
1✔
1860
        digestPrefix := ""
1✔
1861
        if strings.HasPrefix(digest, digestSha256Prefix) {
2✔
1862
                digestPrefix = digestSha256Prefix
1✔
1863
        } else if strings.HasPrefix(digest, digestUIDPrefix) {
3✔
1864
                digestPrefix = digestUIDPrefix
1✔
1865
        } else {
2✔
1866
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1867
        }
1✔
1868
        fromIdx := len(digestPrefix)
1✔
1869
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1870
        if len(digest) < toIdx {
2✔
1871
                return "", errors.Errorf("Digest is too short")
1✔
1872
        }
1✔
1873
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1874
}
1875

1876
// GetCronJobName get CronJob name based on cron name and UID
1877
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1878
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1879
}
1✔
1880

1881
// GetInitialJobName get initial job name based on cron name and UID
1882
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1883
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1884
}
1✔
1885

1886
func getPollerPodName(cron *cdiv1.DataImportCron) string {
1✔
1887
        return naming.GetResourceName("poller-"+cron.Name, string(cron.UID)[:8])
1✔
1888
}
1✔
1889

1890
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1891
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1892
}
1✔
1893

1894
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1895
        dv := &cron.Spec.Template
1✔
1896

1✔
1897
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1898
                return explicitVolumeMode, nil
×
1899
        }
×
1900

1901
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1902
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1903
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1904
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1905
                        AccessModes:      accessModes,
1✔
1906
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1907
                        Resources: corev1.VolumeResourceRequirements{
1✔
1908
                                Requests: corev1.ResourceList{
1✔
1909
                                        // Doesn't matter
1✔
1910
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1911
                                },
1✔
1912
                        },
1✔
1913
                },
1✔
1914
        }
1✔
1915
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1916
                return nil, err
×
1917
        }
×
1918

1919
        return inferredPvc.Spec.VolumeMode, nil
1✔
1920
}
1921

1922
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1923
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1924
        if dv.Spec.PVC != nil {
1✔
1925
                return dv.Spec.PVC.VolumeMode
×
1926
        }
×
1927

1928
        if dv.Spec.Storage != nil {
2✔
1929
                return dv.Spec.Storage.VolumeMode
1✔
1930
        }
1✔
1931

1932
        return nil
×
1933
}
1934

1935
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1936
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1937
        if dv.Spec.PVC != nil {
1✔
1938
                return dv.Spec.PVC.AccessModes
×
1939
        }
×
1940

1941
        if dv.Spec.Storage != nil {
2✔
1942
                return dv.Spec.Storage.AccessModes
1✔
1943
        }
1✔
1944

1945
        return nil
×
1946
}
1947

1948
func inferAdvisedRestoreSizeForSnapshot(dv *cdiv1.DataVolume, snapshot *snapshotv1.VolumeSnapshot, fallback *resource.Quantity) *resource.Quantity {
1✔
1949
        var dvSize resource.Quantity
1✔
1950

1✔
1951
        if dv.Spec.PVC != nil {
1✔
1952
                dvSize = dv.Spec.PVC.Resources.Requests[corev1.ResourceStorage]
×
1953
        }
×
1954

1955
        if dv.Spec.Storage != nil {
2✔
1956
                dvSize = dv.Spec.Storage.Resources.Requests[corev1.ResourceStorage]
1✔
1957
        }
1✔
1958

1959
        if dvSize.IsZero() && fallback != nil {
2✔
1960
                return fallback
1✔
1961
        }
1✔
1962

1963
        if snapshot.Status != nil {
2✔
1964
                if rs := snapshot.Status.RestoreSize; rs != nil && dvSize.Cmp(*rs) < 0 {
2✔
1965
                        return snapshot.Status.RestoreSize
1✔
1966
                }
1✔
1967
        }
1968

1969
        return &dvSize
1✔
1970
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc