• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #6092

01 Jul 2026 08:03AM UTC coverage: 49.73% (-0.05%) from 49.781%
#6092

Pull #4074

travis-ci

noamasu
Fix DataImportCron reconciliation when first default StorageClass is set

The default SC change condition only handled changes from one SC to another, but did not handle the first default SC being set.

To get the DIC-created DV with the DIC-special RWO preference, it must be deleted (as DV is immutable) and recreated with the right settings from the new default StorageClass.

Signed-off-by: Noam Assouline <nassouli@redhat.com>
Pull Request #4074: Recreate DIC import DV when first default SC is set

30 of 32 new or added lines in 1 file covered. (93.75%)

21 existing lines in 4 files now uncovered.

15125 of 30414 relevant lines covered (49.73%)

0.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.98
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        secv1 "github.com/openshift/api/security/v1"
33
        "github.com/pkg/errors"
34
        cronexpr "github.com/robfig/cron/v3"
35

36
        authorizationv1 "k8s.io/api/authorization/v1"
37
        batchv1 "k8s.io/api/batch/v1"
38
        corev1 "k8s.io/api/core/v1"
39
        storagev1 "k8s.io/api/storage/v1"
40
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
41
        "k8s.io/apimachinery/pkg/api/meta"
42
        "k8s.io/apimachinery/pkg/api/resource"
43
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
44
        "k8s.io/apimachinery/pkg/labels"
45
        "k8s.io/apimachinery/pkg/runtime"
46
        "k8s.io/apimachinery/pkg/types"
47
        "k8s.io/client-go/tools/record"
48
        openapicommon "k8s.io/kube-openapi/pkg/common"
49
        "k8s.io/utils/ptr"
50

51
        "sigs.k8s.io/controller-runtime/pkg/client"
52
        "sigs.k8s.io/controller-runtime/pkg/controller"
53
        "sigs.k8s.io/controller-runtime/pkg/event"
54
        "sigs.k8s.io/controller-runtime/pkg/handler"
55
        "sigs.k8s.io/controller-runtime/pkg/manager"
56
        "sigs.k8s.io/controller-runtime/pkg/predicate"
57
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
58
        "sigs.k8s.io/controller-runtime/pkg/source"
59

60
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
61
        "kubevirt.io/containerized-data-importer/pkg/common"
62
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
63
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
64
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
65
        "kubevirt.io/containerized-data-importer/pkg/operator"
66
        "kubevirt.io/containerized-data-importer/pkg/util"
67
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
68
)
69

70
const (
	// ErrDataSourceAlreadyManaged is the event reason used when the DataSource is already managed by another DataImportCron
	ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
	// MessageDataSourceAlreadyManaged is the format string (DataSource name, DataImportCron name) for the matching event message
	MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
)
76

77
// DataImportCronReconciler members
78
type DataImportCronReconciler struct {
79
        client          client.Client
80
        uncachedClient  client.Client
81
        recorder        record.EventRecorder
82
        scheme          *runtime.Scheme
83
        log             logr.Logger
84
        image           string
85
        pullPolicy      string
86
        cdiNamespace    string
87
        installerLabels map[string]string
88
}
89

90
const (
91
        // AnnSourceDesiredDigest is the digest of the pending updated image
92
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
93
        // AnnImageStreamDockerRef is the ImageStream Docker reference
94
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
95
        // AnnNextCronTime is the next time stamp which satisfies the cron expression
96
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
97
        // AnnLastCronTime is the cron last execution time stamp
98
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
99
        // AnnLastUseTime is the PVC last use time stamp
100
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
101
        // AnnStorageClass is the cron DV's storage class
102
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
103

104
        dataImportControllerName    = "dataimportcron-controller"
105
        digestSha256Prefix          = "sha256:"
106
        digestUIDPrefix             = "uid:"
107
        digestDvNameSuffixLength    = 12
108
        cronJobUIDSuffixLength      = 8
109
        defaultImportsToKeepPerCron = 3
110
)
111

112
// ErrNotManagedByCron is returned when the DataSource is not labeled as managed by the given DataImportCron
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
113

114
// Reconcile loop for DataImportCronReconciler
115
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
116
        dataImportCron := &cdiv1.DataImportCron{}
1✔
117
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
118
                return reconcile.Result{}, err
1✔
119
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
120
                err := r.cleanup(ctx, req.NamespacedName)
1✔
121
                return reconcile.Result{}, err
1✔
122
        }
1✔
123
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
124
        if !shouldReconcile || err != nil {
1✔
125
                return reconcile.Result{}, err
×
126
        }
×
127

128
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
129
                return reconcile.Result{}, err
×
130
        }
×
131

132
        return r.update(ctx, dataImportCron)
1✔
133
}
134

135
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
136
        dataSource := &cdiv1.DataSource{}
1✔
137
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
138
                if k8serrors.IsNotFound(err) {
2✔
139
                        return true, nil
1✔
140
                }
1✔
141
                return false, err
×
142
        }
143
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
144
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
145
                return true, nil
1✔
146
        }
1✔
147
        otherCron := &cdiv1.DataImportCron{}
1✔
148
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
149
                if k8serrors.IsNotFound(err) {
2✔
150
                        return true, nil
1✔
151
                }
1✔
152
                return false, err
×
153
        }
154
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
155
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
156
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
157
                r.log.V(3).Info(msg)
1✔
158
                return false, nil
1✔
159
        }
1✔
160
        return true, nil
×
161
}
162

163
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
164
        if dataImportCron.Spec.Schedule == "" {
2✔
165
                return nil
1✔
166
        }
1✔
167
        if isControllerPolledSource(dataImportCron) {
2✔
168
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
169
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
170
                }
1✔
171
                return nil
1✔
172
        }
173
        if !isURLSource(dataImportCron) {
1✔
174
                return nil
×
175
        }
×
176
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
177
        if exists || err != nil {
2✔
178
                return err
1✔
179
        }
1✔
180
        cronJob, err := r.newCronJob(dataImportCron)
1✔
181
        if err != nil {
1✔
182
                return err
×
183
        }
×
184
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
185
                return err
×
186
        }
×
187
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
188
        if err != nil {
1✔
189
                return err
×
190
        }
×
191
        if err := r.client.Create(ctx, job); err != nil {
1✔
192
                return err
×
193
        }
×
194
        return nil
1✔
195
}
196

197
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
198
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
199
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
200
        }
×
201
        imageStream := &imagev1.ImageStream{}
1✔
202
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
203
        if err != nil {
2✔
204
                return nil, "", err
1✔
205
        }
1✔
206
        imageStreamNamespacedName := types.NamespacedName{
1✔
207
                Namespace: imageStreamNamespace,
1✔
208
                Name:      name,
1✔
209
        }
1✔
210
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
211
                return nil, "", err
1✔
212
        }
1✔
213
        return imageStream, tag, nil
1✔
214
}
215

216
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
217
        if imageStream == nil {
1✔
218
                return "", "", errors.Errorf("No ImageStream")
×
219
        }
×
220
        tags := imageStream.Status.Tags
1✔
221
        if len(tags) == 0 {
1✔
222
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
223
        }
×
224

225
        tagIdx := 0
1✔
226
        if len(imageStreamTag) > 0 {
2✔
227
                tagIdx = -1
1✔
228
                for i, tag := range tags {
2✔
229
                        if tag.Tag == imageStreamTag {
2✔
230
                                tagIdx = i
1✔
231
                                break
1✔
232
                        }
233
                }
234
        }
235
        if tagIdx == -1 {
2✔
236
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
237
        }
1✔
238

239
        if len(tags[tagIdx].Items) == 0 {
2✔
240
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
241
        }
1✔
242
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
243
}
244

245
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
246
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
247
                return imageStreamName, "", nil
1✔
248
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
249
                return subs[0], subs[1], nil
1✔
250
        }
1✔
251
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
252
}
253

254
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
255
        nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]
1✔
256
        if nextTimeStr == "" {
1✔
257
                return r.setNextCronTime(dataImportCron)
×
258
        }
×
259
        nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
260
        if err != nil {
1✔
261
                return reconcile.Result{}, err
×
262
        }
×
263
        if nextTime.After(time.Now()) {
2✔
264
                return r.setNextCronTime(dataImportCron)
1✔
265
        }
1✔
266
        switch {
1✔
267
        case isImageStreamSource(dataImportCron):
1✔
268
                if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
269
                        return reconcile.Result{}, err
1✔
270
                }
1✔
271
        case isPvcSource(dataImportCron):
1✔
272
                if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
2✔
273
                        return reconcile.Result{}, err
1✔
274
                }
1✔
275
        case isNodePull(dataImportCron):
1✔
276
                if done, err := r.updateContainerImageDesiredDigest(ctx, dataImportCron); !done {
2✔
277
                        return reconcile.Result{RequeueAfter: 3 * time.Second}, err
1✔
278
                } else if err != nil {
2✔
279
                        return reconcile.Result{}, err
×
280
                }
×
281
        }
282

283
        return r.setNextCronTime(dataImportCron)
1✔
284
}
285

286
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
287
        now := time.Now()
1✔
288
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
289
        if err != nil {
1✔
290
                return reconcile.Result{}, err
×
291
        }
×
292
        nextTime := expr.Next(now)
1✔
293
        requeueAfter := nextTime.Sub(now)
1✔
294
        res := reconcile.Result{RequeueAfter: requeueAfter}
1✔
295
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
296
        return res, err
1✔
297
}
298

299
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
300
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
301
        return err == nil && regSource.ImageStream != nil
1✔
302
}
1✔
303

304
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
305
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
306
        return err == nil && regSource.URL != nil
1✔
307
}
1✔
308

309
func isNodePull(cron *cdiv1.DataImportCron) bool {
1✔
310
        regSource, err := getCronRegistrySource(cron)
1✔
311
        return err == nil && regSource != nil && regSource.PullMethod != nil &&
1✔
312
                *regSource.PullMethod == cdiv1.RegistryPullNode
1✔
313
}
1✔
314

315
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
316
        if !isCronRegistrySource(cron) {
2✔
317
                return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
1✔
318
        }
1✔
319
        return cron.Spec.Template.Spec.Source.Registry, nil
1✔
320
}
321

322
func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
1✔
323
        source := cron.Spec.Template.Spec.Source
1✔
324
        return source != nil && source.Registry != nil
1✔
325
}
1✔
326

327
func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
1✔
328
        if !isPvcSource(cron) {
1✔
329
                return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
×
330
        }
×
331
        return cron.Spec.Template.Spec.Source.PVC, nil
1✔
332
}
333

334
func isPvcSource(cron *cdiv1.DataImportCron) bool {
1✔
335
        source := cron.Spec.Template.Spec.Source
1✔
336
        return source != nil && source.PVC != nil
1✔
337
}
1✔
338

339
func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
1✔
340
        return isImageStreamSource(cron) || isPvcSource(cron) || isNodePull(cron)
1✔
341
}
1✔
342

343
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
344
        res := reconcile.Result{}
1✔
345

1✔
346
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
347
        if err != nil {
1✔
348
                return res, err
×
349
        }
×
350

351
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
352
        imports := dataImportCron.Status.CurrentImports
1✔
353
        importSucceeded := false
1✔
354

1✔
355
        dataVolume := dataImportCron.Spec.Template
1✔
356
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
357
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
358
        if err != nil {
1✔
359
                return res, err
×
360
        }
×
361
        if desiredStorageClass != nil {
2✔
362
                desiredSc := desiredStorageClass.Name
1✔
363
                outdated, err := r.importOutdated(ctx, dataImportCron, dv, pvc, desiredStorageClass)
1✔
364
                if err != nil {
1✔
UNCOV
365
                        return res, err
×
UNCOV
366
                }
×
367
                if outdated {
2✔
368
                        if err := r.recreateImport(ctx, dataImportCron, desiredSc); err != nil {
1✔
369
                                return res, err
×
370
                        }
×
371
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
372
                }
373
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredSc)
1✔
374
        }
375
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
376
        if err != nil {
1✔
377
                return res, err
×
378
        }
×
379
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
380
        if err != nil {
1✔
381
                return res, err
×
382
        }
×
383

384
        handlePopulatedPvc := func() error {
2✔
385
                if pvc != nil {
2✔
386
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
387
                                return err
×
388
                        }
×
389
                }
390
                importSucceeded = true
1✔
391
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
392
                        return err
×
393
                }
×
394

395
                return nil
1✔
396
        }
397

398
        switch {
1✔
399
        case dv != nil:
1✔
400
                switch dv.Status.Phase {
1✔
401
                case cdiv1.Succeeded:
1✔
402
                        if err := handlePopulatedPvc(); err != nil {
1✔
403
                                return res, err
×
404
                        }
×
405
                case cdiv1.ImportScheduled:
1✔
406
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
407
                case cdiv1.ImportInProgress:
1✔
408
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
409
                default:
1✔
410
                        dvPhase := string(dv.Status.Phase)
1✔
411
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
412
                }
413
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
414
                if err := handlePopulatedPvc(); err != nil {
1✔
415
                        return res, err
×
416
                }
×
417
        case snapshot != nil:
1✔
418
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
419
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
420
                                return res, err
×
421
                        }
×
422
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
423
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
424
                }
425
                // Check if snapshot class changed
426
                if desiredStorageClass != nil {
2✔
427
                        changed, err := r.handleSnapshotClassChange(ctx, snapshot, desiredStorageClass.Name)
1✔
428
                        if err != nil {
1✔
429
                                return res, err
×
430
                        }
×
431
                        if changed {
1✔
432
                                return reconcile.Result{RequeueAfter: time.Second}, nil
×
433
                        }
×
434
                }
435
                // Below k8s 1.29 there's no way to know the source volume mode
436
                // Let's at least expose this info on our own snapshots
437
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
438
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
439
                        if err != nil {
1✔
440
                                return res, err
×
441
                        }
×
442
                        if volMode != nil {
2✔
443
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
444
                        }
1✔
445
                }
446
                if _, ok := snapshot.Annotations[cc.AnnAdvisedRestoreSize]; !ok {
2✔
447
                        size := inferAdvisedRestoreSizeForSnapshot(&dataImportCron.Spec.Template, snapshot, nil)
1✔
448
                        if size != nil {
2✔
449
                                cc.AddAnnotation(snapshot, cc.AnnAdvisedRestoreSize, size.String())
1✔
450
                        }
1✔
451
                }
452
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
453
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
454
                if err != nil {
2✔
455
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
456
                                return res, err
×
457
                        }
×
458
                } else {
1✔
459
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
460
                }
1✔
461
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
462
                        return res, err
×
463
                }
×
464
                importSucceeded = true
1✔
465
        default:
1✔
466
                if len(imports) > 0 {
2✔
467
                        imports = imports[1:]
1✔
468
                        dataImportCron.Status.CurrentImports = imports
1✔
469
                }
1✔
470
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
471
        }
472

473
        if importSucceeded {
2✔
474
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
475
                        return res, err
×
476
                }
×
477
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
478
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
1✔
479
                        return res, err
×
480
                }
×
481
        }
482

483
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
484
                return res, err
×
485
        }
×
486

487
        // Skip if schedule is disabled
488
        if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
2✔
489
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
490
                pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
1✔
491
                if err != nil {
2✔
492
                        return pollRes, err
1✔
493
                }
1✔
494
                res = pollRes
1✔
495
        }
496

497
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
498
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
499
        if digestUpdated {
2✔
500
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
501
                if dv != nil {
1✔
502
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
×
503
                                return res, err
×
504
                        }
×
505
                }
506
                if importSucceeded || len(imports) == 0 {
2✔
507
                        if err := r.createImportDataVolume(ctx, dataImportCron, desiredStorageClass); err != nil {
2✔
508
                                return res, err
1✔
509
                        }
1✔
510
                }
511
        } else if importSucceeded {
2✔
512
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
1✔
513
                        return res, err
×
514
                }
×
515
        } else if len(imports) > 0 {
2✔
516
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
1✔
517
        } else {
2✔
518
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
519
        }
1✔
520

521
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
1✔
522
                return res, err
×
523
        }
×
524

525
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
2✔
526
                if err := r.client.Update(ctx, dataImportCron); err != nil {
1✔
527
                        return res, err
×
528
                }
×
529
        }
530
        return res, nil
1✔
531
}
532

533
// Returns the current import DV if exists, and the last imported PVC
534
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
535
        imports := cron.Status.CurrentImports
1✔
536
        if len(imports) == 0 {
2✔
537
                return nil, nil, nil
1✔
538
        }
1✔
539

540
        dvName := imports[0].DataVolumeName
1✔
541
        dv := &cdiv1.DataVolume{}
1✔
542
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
543
                if !k8serrors.IsNotFound(err) {
1✔
544
                        return nil, nil, err
×
545
                }
×
546
                dv = nil
1✔
547
        }
548

549
        pvc := &corev1.PersistentVolumeClaim{}
1✔
550
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
551
                if !k8serrors.IsNotFound(err) {
1✔
552
                        return nil, nil, err
×
553
                }
×
554
                pvc = nil
1✔
555
        }
556
        return dv, pvc, nil
1✔
557
}
558

559
// Returns the current import DV if exists, and the last imported PVC
560
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
561
        imports := cron.Status.CurrentImports
1✔
562
        if len(imports) == 0 {
2✔
563
                return nil, nil
1✔
564
        }
1✔
565

566
        snapName := imports[0].DataVolumeName
1✔
567
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
568
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
569
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
570
                        return nil, err
×
571
                }
×
572
                return nil, nil
1✔
573
        }
574

575
        return snapshot, nil
1✔
576
}
577

578
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
579
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
580
        dataSource := &cdiv1.DataSource{}
1✔
581
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
582
                return nil, err
1✔
583
        }
1✔
584
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
585
                log := r.log.WithName("getCronManagedDataSource")
×
586
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
×
587
                return nil, ErrNotManagedByCron
×
588
        }
×
589
        return dataSource, nil
1✔
590
}
591

592
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
593
        objCopy := obj.DeepCopyObject()
1✔
594
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
595
        r.setDataImportCronResourceLabels(cron, obj)
1✔
596
        if !reflect.DeepEqual(obj, objCopy) {
2✔
597
                if err := r.client.Update(ctx, obj); err != nil {
1✔
598
                        return err
×
599
                }
×
600
        }
601
        return nil
1✔
602
}
603

604
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
605
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
606
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
607
                if cond.Status == corev1.ConditionFalse &&
×
608
                        (cond.Reason == common.GenericError || cond.Reason == ImagePullFailedReason) {
×
609
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
610
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
611
                        dv.Labels[common.DataImportCronLabel] = ""
×
612
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
613
                                return err
×
614
                        }
×
615
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
616
                                return err
×
617
                        }
×
618
                        cron.Status.CurrentImports = nil
×
619
                }
620
        }
621
        return nil
×
622
}
623

624
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
625
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
626
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
627
        if err != nil {
1✔
628
                return err
×
629
        }
×
630
        if regSource.ImageStream == nil {
1✔
631
                return nil
×
632
        }
×
633
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
634
        if err != nil {
2✔
635
                return err
1✔
636
        }
1✔
637
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
638
        if err != nil {
2✔
639
                return err
1✔
640
        }
1✔
641
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
642
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
643
                log.Info("Updating DataImportCron", "digest", digest)
1✔
644
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
645
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
646
        }
1✔
647
        return nil
1✔
648
}
649

650
func (r *DataImportCronReconciler) updateContainerImageDesiredDigest(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
651
        log := r.log.WithValues("name", cron.Name).WithValues("uid", cron.UID)
1✔
652
        podName := getPollerPodName(cron)
1✔
653
        ns := cron.Namespace
1✔
654
        nn := types.NamespacedName{Name: podName, Namespace: ns}
1✔
655
        pod := &corev1.Pod{}
1✔
656

1✔
657
        if err := r.client.Get(ctx, nn, pod); err == nil {
2✔
658
                digest, err := fetchContainerImageDigest(pod)
1✔
659
                if err != nil || digest == "" {
1✔
660
                        return false, err
×
661
                }
×
662
                cc.AddAnnotation(cron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
663
                if cron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
664
                        log.Info("Updating DataImportCron", "digest", digest)
1✔
665
                        cc.AddAnnotation(cron, AnnSourceDesiredDigest, digest)
1✔
666
                }
1✔
667
                return true, r.client.Delete(ctx, pod)
1✔
668
        } else if cc.IgnoreNotFound(err) != nil {
1✔
669
                return false, err
×
670
        }
×
671

672
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(ctx, r.client)
1✔
673
        if err != nil {
1✔
674
                return false, err
×
675
        }
×
676
        platform := cron.Spec.Template.Spec.Source.Registry.Platform
1✔
677
        if platform != nil && platform.Architecture != "" {
1✔
678
                if workloadNodePlacement.NodeSelector == nil {
×
679
                        workloadNodePlacement.NodeSelector = map[string]string{}
×
680
                }
×
681
                workloadNodePlacement.NodeSelector[corev1.LabelArchStable] = platform.Architecture
×
682
        }
683

684
        containerImage := strings.TrimPrefix(*cron.Spec.Template.Spec.Source.Registry.URL, "docker://")
1✔
685

1✔
686
        pod = &corev1.Pod{
1✔
687
                ObjectMeta: metav1.ObjectMeta{
1✔
688
                        Name:      podName,
1✔
689
                        Namespace: ns,
1✔
690
                        OwnerReferences: []metav1.OwnerReference{
1✔
691
                                {
1✔
692
                                        APIVersion:         cron.APIVersion,
1✔
693
                                        Kind:               cron.Kind,
1✔
694
                                        Name:               cron.Name,
1✔
695
                                        UID:                cron.UID,
1✔
696
                                        BlockOwnerDeletion: ptr.To[bool](true),
1✔
697
                                        Controller:         ptr.To[bool](true),
1✔
698
                                },
1✔
699
                        },
1✔
700
                },
1✔
701
                Spec: corev1.PodSpec{
1✔
702
                        TerminationGracePeriodSeconds: ptr.To[int64](0),
1✔
703
                        RestartPolicy:                 corev1.RestartPolicyNever,
1✔
704
                        NodeSelector:                  workloadNodePlacement.NodeSelector,
1✔
705
                        Tolerations:                   workloadNodePlacement.Tolerations,
1✔
706
                        Affinity:                      workloadNodePlacement.Affinity,
1✔
707
                        Volumes: []corev1.Volume{
1✔
708
                                {
1✔
709
                                        Name: "shared-volume",
1✔
710
                                        VolumeSource: corev1.VolumeSource{
1✔
711
                                                EmptyDir: &corev1.EmptyDirVolumeSource{},
1✔
712
                                        },
1✔
713
                                },
1✔
714
                        },
1✔
715
                        InitContainers: []corev1.Container{
1✔
716
                                {
1✔
717
                                        Name:                     "init",
1✔
718
                                        Image:                    r.image,
1✔
719
                                        ImagePullPolicy:          corev1.PullPolicy(r.pullPolicy),
1✔
720
                                        Command:                  []string{"sh", "-c", "cp /usr/bin/cdi-containerimage-server /shared/server"},
1✔
721
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
722
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
723
                                },
1✔
724
                        },
1✔
725
                        Containers: []corev1.Container{
1✔
726
                                {
1✔
727
                                        Name:                     "image-container",
1✔
728
                                        Image:                    containerImage,
1✔
729
                                        ImagePullPolicy:          corev1.PullAlways,
1✔
730
                                        Command:                  []string{"/shared/server", "-h"},
1✔
731
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
732
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
733
                                },
1✔
734
                        },
1✔
735
                },
1✔
736
        }
1✔
737

1✔
738
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
739
        if pod.Spec.SecurityContext != nil {
2✔
740
                pod.Spec.SecurityContext.FSGroup = nil
1✔
741
        }
1✔
742

743
        return false, r.client.Create(ctx, pod)
1✔
744
}
745

746
func fetchContainerImageDigest(pod *corev1.Pod) (string, error) {
1✔
747
        if len(pod.Status.ContainerStatuses) == 0 {
1✔
748
                return "", nil
×
749
        }
×
750

751
        status := pod.Status.ContainerStatuses[0]
1✔
752
        if status.State.Waiting != nil {
1✔
753
                reason := status.State.Waiting.Reason
×
754
                switch reason {
×
755
                case "ImagePullBackOff", "ErrImagePull", "InvalidImageName":
×
756
                        return "", errors.Errorf("%s %s: %s", common.ImagePullFailureText, status.Image, reason)
×
757
                }
758
                return "", nil
×
759
        }
760

761
        if status.State.Terminated == nil {
1✔
762
                return "", nil
×
763
        }
×
764

765
        imageID := status.ImageID
1✔
766
        if imageID == "" {
1✔
767
                return "", errors.Errorf("Container has no imageID")
×
768
        }
×
769
        idx := strings.Index(imageID, digestSha256Prefix)
1✔
770
        if idx < 0 {
1✔
771
                return "", errors.Errorf("Container image %s ID has no digest: %s", status.Image, imageID)
×
772
        }
×
773

774
        return imageID[idx:], nil
1✔
775
}
776

777
func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
778
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
779
        pvcSource, err := getCronPvcSource(dataImportCron)
1✔
780
        if err != nil {
1✔
781
                return err
×
782
        }
×
783
        ns := pvcSource.Namespace
1✔
784
        if ns == "" {
2✔
785
                ns = dataImportCron.Namespace
1✔
786
        }
1✔
787
        pvc := &corev1.PersistentVolumeClaim{}
1✔
788
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
2✔
789
                return err
1✔
790
        }
1✔
791
        digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
1✔
792
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
793
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
794
                log.Info("Updating DataImportCron", "digest", digest)
1✔
795
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
796
        }
1✔
797
        return nil
1✔
798
}
799

800
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
801
        log := r.log.WithName("updateDataSource")
1✔
802
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
803
        if err != nil {
2✔
804
                if k8serrors.IsNotFound(err) {
2✔
805
                        dataSource = r.newDataSource(dataImportCron)
1✔
806
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
807
                                return err
×
808
                        }
×
809
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
1✔
810
                } else if errors.Is(err, ErrNotManagedByCron) {
×
811
                        return nil
×
812
                } else {
×
813
                        return err
×
814
                }
×
815
        }
816
        dataSourceCopy := dataSource.DeepCopy()
1✔
817
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
818

1✔
819
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
820
        populateDataSource(format, dataSource, sourcePVC)
1✔
821

1✔
822
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
823
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
824
                        return err
×
825
                }
×
826
        }
827

828
        return nil
1✔
829
}
830

831
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
832
        if sourcePVC == nil {
2✔
833
                return
1✔
834
        }
1✔
835

836
        switch format {
1✔
837
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
838
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
839
                        PVC: sourcePVC,
1✔
840
                }
1✔
841
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
842
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
843
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
844
                                Namespace: sourcePVC.Namespace,
1✔
845
                                Name:      sourcePVC.Name,
1✔
846
                        },
1✔
847
                }
1✔
848
        }
849
}
850

851
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
852
        if dataImportCron.Status.CurrentImports == nil {
1✔
853
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
854
        }
×
855
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
856
                Namespace: dataImportCron.Namespace,
1✔
857
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
858
        }
1✔
859
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
860
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
861
                now := metav1.Now()
1✔
862
                dataImportCron.Status.LastImportTimestamp = &now
1✔
863
        }
1✔
864
        return nil
1✔
865
}
866

867
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
868
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
869
        if lastTimeStr == "" {
2✔
870
                return nil
1✔
871
        }
1✔
872
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
873
        if err != nil {
1✔
874
                return err
×
875
        }
×
876
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
877
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
878
        }
1✔
879
        return nil
1✔
880
}
881

882
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass *storagev1.StorageClass) error {
1✔
883
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
884
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
885
        if digest == "" {
1✔
886
                return nil
×
887
        }
×
888
        dvName, err := createDvName(dataSourceName, digest)
1✔
889
        if err != nil {
2✔
890
                return err
1✔
891
        }
1✔
892

893
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
894
        for _, src := range sources {
2✔
895
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
896
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
897
                                return err
×
898
                        }
×
899
                } else {
1✔
900
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
901
                                return err
×
902
                        }
×
903
                        // If source exists don't create DV
904
                        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
905
                        return nil
1✔
906
                }
907
        }
908

909
        storageProfile := &cdiv1.StorageProfile{}
1✔
910
        if desiredStorageClass != nil {
2✔
911
                if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
912
                        return err
×
913
                }
×
914
        }
915
        dv := r.newSourceDataVolume(dataImportCron, dvName, storageProfile)
1✔
916
        if allowed, err := r.authorizeCloneDataVolume(dataImportCron, dv); err != nil {
1✔
917
                return err
×
918
        } else if !allowed {
2✔
919
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse,
1✔
920
                        "Not authorized to create DataVolume", notAuthorized)
1✔
921
                return nil
1✔
922
        }
1✔
923
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
924
                return err
×
925
        }
×
926
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
927

1✔
928
        return nil
1✔
929
}
930

931
func (r *DataImportCronReconciler) authorizeCloneDataVolume(dataImportCron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) (bool, error) {
1✔
932
        if !isPvcSource(dataImportCron) {
2✔
933
                return true, nil
1✔
934
        }
1✔
935
        saName := "default"
1✔
936
        if dataImportCron.Spec.ServiceAccountName != nil {
1✔
937
                saName = *dataImportCron.Spec.ServiceAccountName
×
938
        }
×
939
        if resp, err := dv.AuthorizeSA(dv.Namespace, dv.Name, &authProxy{r.client}, dataImportCron.Namespace, saName); err != nil {
1✔
940
                return false, err
×
941
        } else if !resp.Allowed {
2✔
942
                r.log.Info("Not authorized to create DataVolume", "cron", dataImportCron.Name, "reason", resp.Reason)
1✔
943
                return false, nil
1✔
944
        }
1✔
945

946
        return true, nil
1✔
947
}
948

949
type authProxy struct {
950
        client client.Client
951
}
952

953
func (p *authProxy) CreateSar(sar *authorizationv1.SubjectAccessReview) (*authorizationv1.SubjectAccessReview, error) {
1✔
954
        if err := p.client.Create(context.TODO(), sar); err != nil {
1✔
955
                return nil, err
×
956
        }
×
957
        return sar, nil
1✔
958
}
959

960
func (p *authProxy) GetNamespace(name string) (*corev1.Namespace, error) {
1✔
961
        ns := &corev1.Namespace{}
1✔
962
        if err := p.client.Get(context.TODO(), types.NamespacedName{Name: name}, ns); err != nil {
1✔
963
                return nil, err
×
964
        }
×
965
        return ns, nil
1✔
966
}
967

968
func (p *authProxy) GetDataSource(namespace, name string) (*cdiv1.DataSource, error) {
×
969
        das := &cdiv1.DataSource{}
×
970
        if err := p.client.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, das); err != nil {
×
971
                return nil, err
×
972
        }
×
973
        return das, nil
×
974
}
975

976
func (r *DataImportCronReconciler) recreateImport(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
1✔
977
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
978
        if !ok {
1✔
979
                // nothing to delete
×
980
                return nil
×
981
        }
×
982
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
983
        if err != nil {
1✔
984
                return err
×
985
        }
×
986
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
1✔
987
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
1✔
988
        for _, src := range sources {
2✔
989
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
990
                        return err
×
991
                }
×
992
        }
993
        for _, src := range sources {
2✔
994
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
1✔
995
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
×
996
                }
×
997
        }
998
        // Only update desired storage class once garbage collection went through
999
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
1✔
1000
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
1001
        if err != nil {
1✔
1002
                return err
×
1003
        }
×
1004

1005
        return nil
1✔
1006
}
1007

1008
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
1009
        switch format {
1✔
1010
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1011
                return nil
1✔
1012
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1013
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
1014
        default:
×
1015
                return fmt.Errorf("unknown source format for snapshot")
×
1016
        }
1017
}
1018

1019
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
1020
        if pvc == nil {
1✔
1021
                return nil
×
1022
        }
×
1023
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
1024
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
1025
                return nil
1✔
1026
        }
1✔
1027
        storageProfile := &cdiv1.StorageProfile{}
1✔
1028
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1029
                return err
×
1030
        }
×
1031

1032
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
1033
                ObjectMeta: metav1.ObjectMeta{
1✔
1034
                        Name:      pvc.Name,
1✔
1035
                        Namespace: dataImportCron.Namespace,
1✔
1036
                        Labels: map[string]string{
1✔
1037
                                common.CDILabelKey:       common.CDILabelValue,
1✔
1038
                                common.CDIComponentLabel: "",
1✔
1039
                        },
1✔
1040
                },
1✔
1041
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
1042
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
1043
                                PersistentVolumeClaimName: &pvc.Name,
1✔
1044
                        },
1✔
1045
                },
1✔
1046
        }
1✔
1047
        // Select VolumeSnapshotClass for boot source snapshot
1✔
1048
        snapshotClassName, err := r.getSnapshotClassForDataImportCron(pvc, storageProfile)
1✔
1049
        if err != nil {
1✔
1050
                return err
×
1051
        }
×
1052
        if snapshotClassName != "" {
2✔
1053
                desiredSnapshot.Spec.VolumeSnapshotClassName = &snapshotClassName
1✔
1054
        }
1✔
1055
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
1056
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
1✔
1057

1✔
1058
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
1059
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
1060
                if !k8serrors.IsNotFound(err) {
1✔
1061
                        return err
×
1062
                }
×
1063
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1064
                pvcSize := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
1✔
1065
                size := inferAdvisedRestoreSizeForSnapshot(&dataImportCron.Spec.Template, desiredSnapshot, &pvcSize)
1✔
1066
                if size != nil {
2✔
1067
                        cc.AddAnnotation(desiredSnapshot, cc.AnnAdvisedRestoreSize, size.String())
1✔
1068
                }
1✔
1069
                if pvc.Spec.VolumeMode != nil {
2✔
1070
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
1071
                }
1✔
1072
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
1073
                        return err
×
1074
                }
×
1075
        } else {
1✔
1076
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
1077
                        // Clean up DV/PVC as they are not needed anymore
1✔
1078
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
1079
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
1080
                                return err
×
1081
                        }
×
1082
                }
1083
        }
1084

1085
        return nil
1✔
1086
}
1087

1088
func (r *DataImportCronReconciler) handleSnapshotClassChange(ctx context.Context, snapshot *snapshotv1.VolumeSnapshot, storageClassName string) (bool, error) {
1✔
1089
        sp := &cdiv1.StorageProfile{}
1✔
1090
        if err := r.client.Get(ctx, types.NamespacedName{Name: storageClassName}, sp); err != nil {
1✔
1091
                return false, client.IgnoreNotFound(err)
×
1092
        }
×
1093

1094
        desiredVSC, err := r.getSnapshotClassForDataImportCron(nil, sp)
1✔
1095
        if err != nil {
1✔
1096
                return false, err
×
1097
        }
×
1098
        actualVSC := ""
1✔
1099
        if snapshot.Spec.VolumeSnapshotClassName != nil {
1✔
1100
                actualVSC = *snapshot.Spec.VolumeSnapshotClassName
×
1101
        }
×
1102
        if desiredVSC == "" || actualVSC == desiredVSC {
2✔
1103
                return false, nil
1✔
1104
        }
1✔
1105

1106
        r.log.Info("Snapshot class changed, deleting", "name", snapshot.Name, "from", actualVSC, "to", desiredVSC)
×
1107
        if err := r.client.Delete(ctx, snapshot); err != nil {
×
1108
                return false, client.IgnoreNotFound(err)
×
1109
        }
×
1110
        return true, nil
×
1111
}
1112

1113
func (r *DataImportCronReconciler) importOutdated(ctx context.Context, cron *cdiv1.DataImportCron, currentDV *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim, desiredSC *storagev1.StorageClass) (bool, error) {
1✔
1114
        currentSc, hasCurrent := cron.Annotations[AnnStorageClass]
1✔
1115
        scChanged := currentSc != desiredSC.Name
1✔
1116
        needsReset := hasCurrent || len(cron.Status.CurrentImports) > 0
1✔
1117
        if scChanged && needsReset {
2✔
1118
                r.log.Info("Storage class changed, resetting import", "currentSc", currentSc, "desiredSc", desiredSC.Name)
1✔
1119
                return true, nil
1✔
1120
        }
1✔
1121

1122
        if pvc != nil && pvc.Status.Phase == corev1.ClaimPending &&
1✔
1123
                pvc.Labels[common.DataImportCronLabel] == cron.Name &&
1✔
1124
                pvc.Spec.StorageClassName != nil && *pvc.Spec.StorageClassName != desiredSC.Name {
2✔
1125
                r.log.Info("Pending PVC has outdated storage class, resetting import", "pvc", pvc.Name, "pvcSc", *pvc.Spec.StorageClassName, "desiredSc", desiredSC.Name)
1✔
1126
                return true, nil
1✔
1127
        }
1✔
1128

1129
        if currentDV != nil {
2✔
1130
                sp := &cdiv1.StorageProfile{}
1✔
1131
                if err := r.client.Get(ctx, types.NamespacedName{Name: desiredSC.Name}, sp); err != nil {
1✔
NEW
1132
                        return false, err
×
NEW
1133
                }
×
1134
                desiredDV := r.newSourceDataVolume(cron, currentDV.Name, sp)
1✔
1135
                if !reflect.DeepEqual(currentDV.Spec, desiredDV.Spec) {
2✔
1136
                        r.log.Info("Import DV spec changed, resetting import", "dv", currentDV.Name)
1✔
1137
                        return true, nil
1✔
1138
                }
1✔
1139
        }
1140

1141
        return false, nil
1✔
1142
}
1143

1144
// getSnapshotClassForDataImportCron returns the VolumeSnapshotClass name to use for DataImportCron snapshots.
1145
func (r *DataImportCronReconciler) getSnapshotClassForDataImportCron(pvc *corev1.PersistentVolumeClaim, storageProfile *cdiv1.StorageProfile) (string, error) {
1✔
1146
        if vscName := storageProfile.Annotations[cc.AnnSnapshotClassForDataImportCron]; vscName != "" {
2✔
1147
                return vscName, nil
1✔
1148
        }
1✔
1149
        return cc.GetSnapshotClassForSmartClone(pvc, &storageProfile.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
1150
}
1151

1152
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
1153
        dataImportCron.Status.SourceFormat = &format
1✔
1154

1✔
1155
        switch format {
1✔
1156
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1157
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1158
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1159
                if snapshot == nil {
2✔
1160
                        // Snapshot create/update will trigger reconcile
1✔
1161
                        return nil
1✔
1162
                }
1✔
1163
                if cc.IsSnapshotReady(snapshot) {
2✔
1164
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1165
                } else {
2✔
1166
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
1167
                }
1✔
1168
        default:
×
1169
                return fmt.Errorf("unknown source format for snapshot")
×
1170
        }
1171

1172
        return nil
1✔
1173
}
1174

1175
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
1176
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
1177
        if desiredStorageClass == nil {
2✔
1178
                return format, nil
1✔
1179
        }
1✔
1180

1181
        storageProfile := &cdiv1.StorageProfile{}
1✔
1182
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1183
                return format, err
×
1184
        }
×
1185
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
1186
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
1187
        }
1✔
1188

1189
        return format, nil
1✔
1190
}
1191

1192
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
1193
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
1194
                return nil
×
1195
        }
×
1196
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
1197
        if err != nil {
1✔
1198
                return err
×
1199
        }
×
1200

1201
        maxImports := defaultImportsToKeepPerCron
1✔
1202

1✔
1203
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
1204
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
1205
        }
1✔
1206

1207
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
1208
                return err
×
1209
        }
×
1210
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
1211
                return err
×
1212
        }
×
1213

1214
        return nil
1✔
1215
}
1216

1217
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
1218
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
1219

1✔
1220
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1221
                return err
×
1222
        }
×
1223
        if len(pvcList.Items) > maxImports {
2✔
1224
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
1225
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
1226
                })
1✔
1227
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
1228
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1229
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
1230
                                return err
×
1231
                        }
×
1232
                }
1233
        }
1234

1235
        dvList := &cdiv1.DataVolumeList{}
1✔
1236
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1237
                return err
×
1238
        }
×
1239

1240
        if len(dvList.Items) > maxImports {
2✔
1241
                for _, dv := range dvList.Items {
2✔
1242
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
1243
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
1244
                                return err
×
1245
                        }
×
1246

1247
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1248
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1249
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
1250
                                        return err
×
1251
                                }
×
1252
                        }
1253
                }
1254
        }
1255

1256
        return nil
1✔
1257
}
1258

1259
// deleteDvPvc deletes DV or PVC if DV was GCed
1260
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
1261
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
1262
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
1263
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
1264
                return err
1✔
1265
        }
1✔
1266
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
1267
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
1268
                return err
×
1269
        }
×
1270
        return nil
1✔
1271
}
1272

1273
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
1274
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
1275

1✔
1276
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1277
                if meta.IsNoMatchError(err) {
×
1278
                        return nil
×
1279
                }
×
1280
                return err
×
1281
        }
1282
        if len(snapList.Items) > maxImports {
1✔
1283
                sort.Slice(snapList.Items, func(i, j int) bool {
×
1284
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
1285
                })
×
1286
                for _, snap := range snapList.Items[maxImports:] {
×
1287
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
1288
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
1289
                                return err
×
1290
                        }
×
1291
                }
1292
        }
1293

1294
        return nil
1✔
1295
}
1296

1297
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
1298
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
1299
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
1300

1✔
1301
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
1302
                return err
×
1303
        }
×
1304
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
1305
        if err != nil {
1✔
1306
                return err
×
1307
        }
×
1308
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
1309
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
1310
                return err
×
1311
        }
×
1312
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
1313
                return err
×
1314
        }
×
1315
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
1316
                return err
×
1317
        }
×
1318
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
1319
                return err
×
1320
        }
×
1321
        return nil
1✔
1322
}
1323

1324
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
1325
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
1326
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
1327
        if err != nil {
1✔
1328
                return err
×
1329
        }
×
1330
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
1331
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
1332
                return err
×
1333
        }
×
1334
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
1335
                return err
×
1336
        }
×
1337

1338
        return nil
1✔
1339
}
1340

1341
// NewDataImportCronController creates a new instance of the DataImportCron controller
1342
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
1343
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
1344
                Scheme: mgr.GetScheme(),
×
1345
                Mapper: mgr.GetRESTMapper(),
×
1346
        })
×
1347
        if err != nil {
×
1348
                return nil, err
×
1349
        }
×
1350
        reconciler := &DataImportCronReconciler{
×
1351
                client:          mgr.GetClient(),
×
1352
                uncachedClient:  uncachedClient,
×
1353
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1354
                scheme:          mgr.GetScheme(),
×
1355
                log:             log.WithName(dataImportControllerName),
×
1356
                image:           importerImage,
×
1357
                pullPolicy:      pullPolicy,
×
1358
                cdiNamespace:    util.GetNamespace(),
×
1359
                installerLabels: installerLabels,
×
1360
        }
×
1361
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1362
                MaxConcurrentReconciles: 3,
×
1363
                Reconciler:              reconciler,
×
1364
        })
×
1365
        if err != nil {
×
1366
                return nil, err
×
1367
        }
×
1368
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
1369
                return nil, err
×
1370
        }
×
1371
        log.Info("Initialized DataImportCron controller")
×
1372
        return dataImportCronController, nil
×
1373
}
1374

1375
func getCronName(obj client.Object) string {
×
1376
        return obj.GetLabels()[common.DataImportCronLabel]
×
1377
}
×
1378

1379
func getCronNs(obj client.Object) string {
×
1380
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
1381
}
×
1382

1383
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
1384
        if cronName := getCronName(obj); cronName != "" {
×
1385
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
1386
        }
×
1387
        return nil
×
1388
}
1389

1390
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1391
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1392
                return err
×
1393
        }
×
1394

1395
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
1396
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
1397
                // Otherwise we risk losing the storage profile event
×
1398
                var crons cdiv1.DataImportCronList
×
1399
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
×
1400
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
×
1401
                        return nil
×
1402
                }
×
1403
                // Storage profiles are 1:1 to storage classes
1404
                scName := obj.GetName()
×
1405
                var reqs []reconcile.Request
×
1406
                for _, cron := range crons.Items {
×
1407
                        dataVolume := cron.Spec.Template
×
1408
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1409
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1410
                        if err != nil || templateSc == nil {
×
1411
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1412
                                return reqs
×
1413
                        }
×
1414
                        if templateSc.Name == scName {
×
1415
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1416
                        }
×
1417
                }
1418
                return reqs
×
1419
        }
1420

1421
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1422
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1423
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1424
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1425
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1426
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1427
                },
1428
        )); err != nil {
×
1429
                return err
×
1430
        }
×
1431

1432
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1433
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1434
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1435
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1436
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1437
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1438
                },
1439
        )); err != nil {
×
1440
                return err
×
1441
        }
×
1442

1443
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1444
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1445
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1446
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1447
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1448
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1449
                },
1450
        )); err != nil {
×
1451
                return err
×
1452
        }
×
1453

1454
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1455
                return err
×
1456
        }
×
1457

1458
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1459
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1460
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
×
1461
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1462
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1463
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1464
                                return dicRelevantFieldsChanged(e.ObjectOld, e.ObjectNew)
×
1465
                        },
×
1466
                },
1467
        )); err != nil {
×
1468
                return err
×
1469
        }
×
1470

1471
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1472
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1473
        }
×
1474

1475
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1476
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1477
                predicate.TypedFuncs[*batchv1.CronJob]{
×
1478
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1479
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1480
                        },
×
1481
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
×
1482
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1483
                },
1484
        )); err != nil {
×
1485
                return err
×
1486
        }
×
1487

1488
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1489
                if meta.IsNoMatchError(err) {
×
1490
                        // Back out if there's no point to attempt watch
×
1491
                        return nil
×
1492
                }
×
1493
                if !cc.IsErrCacheNotStarted(err) {
×
1494
                        return err
×
1495
                }
×
1496
        }
1497
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1498
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
×
1499
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1500
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1501
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1502
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1503
                },
1504
        )); err != nil {
×
1505
                return err
×
1506
        }
×
1507

1508
        return nil
×
1509
}
1510

1511
func dicRelevantFieldsChanged(oldSp, newSp *cdiv1.StorageProfile) bool {
×
1512
        sourceFormatChanged := oldSp.Status.DataImportCronSourceFormat != newSp.Status.DataImportCronSourceFormat
×
1513
        rwoAnnotationChanged := oldSp.Annotations[cc.AnnUseReadWriteOnceForDataImportCron] != newSp.Annotations[cc.AnnUseReadWriteOnceForDataImportCron]
×
1514
        snapshotClassChanged := oldSp.Annotations[cc.AnnSnapshotClassForDataImportCron] != newSp.Annotations[cc.AnnSnapshotClassForDataImportCron]
×
1515
        return sourceFormatChanged || rwoAnnotationChanged || snapshotClassChanged
×
1516
}
×
1517

1518
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1519
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1520
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1521
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
1522
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1523
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1524
                                log.Info("Update", "sc", obj.GetName(),
×
1525
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1526
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1527
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
×
1528
                                if err != nil {
×
1529
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
1530
                                }
×
1531
                                return reqs
×
1532
                        },
1533
                ),
1534
                predicate.TypedFuncs[*storagev1.StorageClass]{
1535
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1536
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1537
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1538
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1539
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1540
                        },
×
1541
                },
1542
        )); err != nil {
×
1543
                return err
×
1544
        }
×
1545

1546
        return nil
×
1547
}
1548

1549
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1550
        dicList := &cdiv1.DataImportCronList{}
×
1551
        if err := c.List(ctx, dicList); err != nil {
×
1552
                return nil, err
×
1553
        }
×
1554
        reqs := []reconcile.Request{}
×
1555
        for _, dic := range dicList.Items {
×
1556
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
1557
                        continue
×
1558
                }
1559

1560
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1561
        }
1562

1563
        return reqs, nil
×
1564
}
1565

1566
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1567
        cronJob := &batchv1.CronJob{}
1✔
1568
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1569
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1570
                return false, cc.IgnoreNotFound(err)
1✔
1571
        }
1✔
1572

1573
        cronJobCopy := cronJob.DeepCopy()
1✔
1574
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1575
                return false, err
×
1576
        }
×
1577

1578
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1579
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1580
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
1581
                        return false, cc.IgnoreNotFound(err)
×
1582
                }
×
1583
        }
1584
        return true, nil
1✔
1585
}
1586

1587
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1588
        cronJob := &batchv1.CronJob{
1✔
1589
                ObjectMeta: metav1.ObjectMeta{
1✔
1590
                        Name:      GetCronJobName(cron),
1✔
1591
                        Namespace: r.cdiNamespace,
1✔
1592
                },
1✔
1593
        }
1✔
1594
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
1595
                return nil, err
×
1596
        }
×
1597
        return cronJob, nil
1✔
1598
}
1599

1600
// InitPollerPod inits poller Pod
1601
func InitPollerPod(c client.Client, cron *cdiv1.DataImportCron, pod *corev1.PodTemplateSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1602
        regSource, err := getCronRegistrySource(cron)
1✔
1603
        if err != nil {
1✔
1604
                return err
×
1605
        }
×
1606
        if regSource.URL == nil {
1✔
1607
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1608
        }
×
1609
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1610
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
1611
                return err
×
1612
        }
×
1613
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1614
        if err != nil {
1✔
1615
                return err
×
1616
        }
×
1617
        container := corev1.Container{
1✔
1618
                Name:  "cdi-source-update-poller",
1✔
1619
                Image: image,
1✔
1620
                Command: []string{
1✔
1621
                        "/usr/bin/cdi-source-update-poller",
1✔
1622
                        "-ns", cron.Namespace,
1✔
1623
                        "-cron", cron.Name,
1✔
1624
                        "-url", *regSource.URL,
1✔
1625
                },
1✔
1626
                ImagePullPolicy:          pullPolicy,
1✔
1627
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1628
                TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
1629
        }
1✔
1630

1✔
1631
        var volumes []corev1.Volume
1✔
1632
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1633
        if hasCertConfigMap {
1✔
1634
                vm := corev1.VolumeMount{
×
1635
                        Name:      CertVolName,
×
1636
                        MountPath: common.ImporterCertDir,
×
1637
                }
×
1638
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1639
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
1640
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
1641
        }
×
1642

1643
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1644
                vm := corev1.VolumeMount{
1✔
1645
                        Name:      ProxyCertVolName,
1✔
1646
                        MountPath: common.ImporterProxyCertDir,
1✔
1647
                }
1✔
1648
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1649
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1650
        }
1✔
1651

1652
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
1653
                container.Env = append(container.Env,
×
1654
                        corev1.EnvVar{
×
1655
                                Name: common.ImporterAccessKeyID,
×
1656
                                ValueFrom: &corev1.EnvVarSource{
×
1657
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1658
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1659
                                                        Name: *regSource.SecretRef,
×
1660
                                                },
×
1661
                                                Key: common.KeyAccess,
×
1662
                                        },
×
1663
                                },
×
1664
                        },
×
1665
                        corev1.EnvVar{
×
1666
                                Name: common.ImporterSecretKey,
×
1667
                                ValueFrom: &corev1.EnvVarSource{
×
1668
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1669
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1670
                                                        Name: *regSource.SecretRef,
×
1671
                                                },
×
1672
                                                Key: common.KeySecret,
×
1673
                                        },
×
1674
                                },
×
1675
                        },
×
1676
                )
×
1677
        }
×
1678

1679
        addEnvVar := func(varName, value string) {
2✔
1680
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1681
        }
1✔
1682

1683
        if insecureTLS {
1✔
1684
                addEnvVar(common.InsecureTLSVar, "true")
×
1685
        }
×
1686

1687
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1688
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1689
                        addEnvVar(varName, value)
1✔
1690
                }
1✔
1691
        }
1692

1693
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1694
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1695
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1696

1✔
1697
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1698
        if err != nil {
1✔
1699
                return err
×
1700
        }
×
1701
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1702
        if err != nil {
1✔
1703
                return err
×
1704
        }
×
1705

1706
        podSpec := &pod.Spec
1✔
1707

1✔
1708
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1709
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1710
        podSpec.Containers = []corev1.Container{container}
1✔
1711
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1712
        podSpec.Volumes = volumes
1✔
1713
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1714
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1715
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1716
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1717

1✔
1718
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1719
        // No need for specifid uid/fsgroup here since this doesn't write or use qemu
1✔
1720
        if podSpec.SecurityContext != nil {
2✔
1721
                podSpec.SecurityContext.FSGroup = nil
1✔
1722
        }
1✔
1723
        if podSpec.Containers[0].SecurityContext != nil {
2✔
1724
                podSpec.Containers[0].SecurityContext.RunAsUser = nil
1✔
1725
        }
1✔
1726

1727
        if pod.Labels == nil {
2✔
1728
                pod.Labels = map[string]string{}
1✔
1729
        }
1✔
1730
        pod.Labels[common.DataImportCronPollerLabel] = ""
1✔
1731

1✔
1732
        return nil
1✔
1733
}
1734

1735
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1736
        cronJobSpec := &cronJob.Spec
1✔
1737
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1738
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1739
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1740
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1741

1✔
1742
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1743
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1744
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1745
        cc.AddAnnotation(&jobSpec.Template, secv1.RequiredSCCAnnotation, common.RestrictedSCCName)
1✔
1746

1✔
1747
        pod := &jobSpec.Template
1✔
1748
        if err := InitPollerPod(r.client, cron, pod, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1749
                return err
×
1750
        }
×
1751
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1752
                return err
×
1753
        }
×
1754
        return nil
1✔
1755
}
1756

1757
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1758
        job := &batchv1.Job{
1✔
1759
                ObjectMeta: metav1.ObjectMeta{
1✔
1760
                        Name:      GetInitialJobName(cron),
1✔
1761
                        Namespace: cronJob.Namespace,
1✔
1762
                },
1✔
1763
                Spec: cronJob.Spec.JobTemplate.Spec,
1✔
1764
        }
1✔
1765
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1766
                return nil, err
×
1767
        }
×
1768
        return job, nil
1✔
1769
}
1770

1771
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1772
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1773
                return err
×
1774
        }
×
1775
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1776
        labels := obj.GetLabels()
1✔
1777
        labels[common.DataImportCronNsLabel] = cron.Namespace
1✔
1778
        labels[common.DataImportCronLabel] = cron.Name
1✔
1779
        obj.SetLabels(labels)
1✔
1780
        return nil
1✔
1781
}
1782

1783
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string, storageProfile *cdiv1.StorageProfile) *cdiv1.DataVolume {
1✔
1784
        dv := cron.Spec.Template.DeepCopy()
1✔
1785
        if isCronRegistrySource(cron) {
2✔
1786
                var digestedURL string
1✔
1787
                if isURLSource(cron) {
2✔
1788
                        digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1789
                } else if isImageStreamSource(cron) {
3✔
1790
                        // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1791
                        digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1792
                        dv.Spec.Source.Registry.ImageStream = nil
1✔
1793
                }
1✔
1794
                dv.Spec.Source.Registry.URL = &digestedURL
1✔
1795
        }
1796
        dv.Name = dataVolumeName
1✔
1797
        dv.Namespace = cron.Namespace
1✔
1798
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1799
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1800
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1801
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1802

1✔
1803
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
1804
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1805
        }
1✔
1806

1807
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1808

1✔
1809
        // Apply RWO access mode as default for DataImportCron (from StorageProfile annotation)
1✔
1810
        // Only applies if the DV doesn't already have AccessModes configured
1✔
1811
        if storageProfile != nil && storageProfile.Annotations[cc.AnnUseReadWriteOnceForDataImportCron] == "true" {
2✔
1812
                if dv.Spec.Storage != nil && len(dv.Spec.Storage.AccessModes) == 0 {
2✔
1813
                        dv.Spec.Storage.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}
1✔
1814
                }
1✔
1815
        }
1816

1817
        return dv
1✔
1818
}
1819

1820
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1821
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1822
        cc.CopyAllowedLabels(cron.GetLabels(), obj, true)
1✔
1823
        labels := obj.GetLabels()
1✔
1824
        labels[common.DataImportCronLabel] = cron.Name
1✔
1825
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
2✔
1826
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1827
        }
1✔
1828
        obj.SetLabels(labels)
1✔
1829
}
1830

1831
func untagDigestedDockerURL(dockerURL string) string {
1✔
1832
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1833
                url := u.Host + u.Path
1✔
1834
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1835
                // Check for tag
1✔
1836
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1837
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
2✔
1838
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1839
                        }
1✔
1840
                }
1841
        }
1842
        return dockerURL
1✔
1843
}
1844

1845
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1846
        if val := cron.Labels[ann]; val != "" {
2✔
1847
                cc.AddLabel(dv, ann, val)
1✔
1848
        }
1✔
1849
}
1850

1851
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1852
        if val := cron.Annotations[ann]; val != "" {
1✔
1853
                cc.AddAnnotation(dv, ann, val)
×
1854
        }
×
1855
}
1856

1857
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1✔
1858
        dataSource := &cdiv1.DataSource{
1✔
1859
                ObjectMeta: metav1.ObjectMeta{
1✔
1860
                        Name:      cron.Spec.ManagedDataSource,
1✔
1861
                        Namespace: cron.Namespace,
1✔
1862
                },
1✔
1863
        }
1✔
1864
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1865
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1866
        cc.CopyAllowedLabels(cron.GetLabels(), dataSource, true)
1✔
1867
        return dataSource
1✔
1868
}
1✔
1869

1870
// Create DataVolume name based on the DataSource name + prefix of the digest
1871
func createDvName(prefix, digest string) (string, error) {
1✔
1872
        digestPrefix := ""
1✔
1873
        if strings.HasPrefix(digest, digestSha256Prefix) {
2✔
1874
                digestPrefix = digestSha256Prefix
1✔
1875
        } else if strings.HasPrefix(digest, digestUIDPrefix) {
3✔
1876
                digestPrefix = digestUIDPrefix
1✔
1877
        } else {
2✔
1878
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1879
        }
1✔
1880
        fromIdx := len(digestPrefix)
1✔
1881
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1882
        if len(digest) < toIdx {
2✔
1883
                return "", errors.Errorf("Digest is too short")
1✔
1884
        }
1✔
1885
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1886
}
1887

1888
// GetCronJobName get CronJob name based on cron name and UID
1889
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1890
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1891
}
1✔
1892

1893
// GetInitialJobName get initial job name based on cron name and UID
1894
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1895
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1896
}
1✔
1897

1898
func getPollerPodName(cron *cdiv1.DataImportCron) string {
1✔
1899
        return naming.GetResourceName("poller-"+cron.Name, string(cron.UID)[:8])
1✔
1900
}
1✔
1901

1902
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1903
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1904
}
1✔
1905

1906
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1907
        dv := &cron.Spec.Template
1✔
1908

1✔
1909
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1910
                return explicitVolumeMode, nil
×
1911
        }
×
1912

1913
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1914
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1915
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1916
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1917
                        AccessModes:      accessModes,
1✔
1918
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1919
                        Resources: corev1.VolumeResourceRequirements{
1✔
1920
                                Requests: corev1.ResourceList{
1✔
1921
                                        // Doesn't matter
1✔
1922
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1923
                                },
1✔
1924
                        },
1✔
1925
                },
1✔
1926
        }
1✔
1927
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1928
                return nil, err
×
1929
        }
×
1930

1931
        return inferredPvc.Spec.VolumeMode, nil
1✔
1932
}
1933

1934
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1935
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1936
        if dv.Spec.PVC != nil {
1✔
1937
                return dv.Spec.PVC.VolumeMode
×
1938
        }
×
1939

1940
        if dv.Spec.Storage != nil {
2✔
1941
                return dv.Spec.Storage.VolumeMode
1✔
1942
        }
1✔
1943

1944
        return nil
×
1945
}
1946

1947
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1948
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1949
        if dv.Spec.PVC != nil {
1✔
1950
                return dv.Spec.PVC.AccessModes
×
1951
        }
×
1952

1953
        if dv.Spec.Storage != nil {
2✔
1954
                return dv.Spec.Storage.AccessModes
1✔
1955
        }
1✔
1956

1957
        return nil
×
1958
}
1959

1960
func inferAdvisedRestoreSizeForSnapshot(dv *cdiv1.DataVolume, snapshot *snapshotv1.VolumeSnapshot, fallback *resource.Quantity) *resource.Quantity {
1✔
1961
        var dvSize resource.Quantity
1✔
1962

1✔
1963
        if dv.Spec.PVC != nil {
1✔
1964
                dvSize = dv.Spec.PVC.Resources.Requests[corev1.ResourceStorage]
×
1965
        }
×
1966

1967
        if dv.Spec.Storage != nil {
2✔
1968
                dvSize = dv.Spec.Storage.Resources.Requests[corev1.ResourceStorage]
1✔
1969
        }
1✔
1970

1971
        if dvSize.IsZero() && fallback != nil {
2✔
1972
                return fallback
1✔
1973
        }
1✔
1974

1975
        if snapshot.Status != nil {
2✔
1976
                if rs := snapshot.Status.RestoreSize; rs != nil && dvSize.Cmp(*rs) < 0 {
2✔
1977
                        return snapshot.Status.RestoreSize
1✔
1978
                }
1✔
1979
        }
1980

1981
        return &dvSize
1✔
1982
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc