• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5699

04 Dec 2025 09:03AM UTC coverage: 58.625%. First build
#5699

Pull #3970

travis-ci

arnongilboa
Simplify DataImportCron ServiceAccount authorization

Add ServiceAccountName to DataImportCron spec, replacing CreatedBy
which was added in #3946.

In case of DataImportCron with PVC source, the controller checks the
ServiceAccount is authorized to clone the source PVC.

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>
Pull Request #3970: Simplify DataImportCron ServiceAccount authorization

4 of 6 new or added lines in 1 file covered. (66.67%)

17298 of 29506 relevant lines covered (58.63%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.1
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        secv1 "github.com/openshift/api/security/v1"
33
        "github.com/pkg/errors"
34
        cronexpr "github.com/robfig/cron/v3"
35

36
        authorizationv1 "k8s.io/api/authorization/v1"
37
        batchv1 "k8s.io/api/batch/v1"
38
        corev1 "k8s.io/api/core/v1"
39
        storagev1 "k8s.io/api/storage/v1"
40
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
41
        "k8s.io/apimachinery/pkg/api/meta"
42
        "k8s.io/apimachinery/pkg/api/resource"
43
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
44
        "k8s.io/apimachinery/pkg/labels"
45
        "k8s.io/apimachinery/pkg/runtime"
46
        "k8s.io/apimachinery/pkg/types"
47
        "k8s.io/client-go/tools/record"
48
        openapicommon "k8s.io/kube-openapi/pkg/common"
49
        "k8s.io/utils/ptr"
50

51
        "sigs.k8s.io/controller-runtime/pkg/client"
52
        "sigs.k8s.io/controller-runtime/pkg/controller"
53
        "sigs.k8s.io/controller-runtime/pkg/event"
54
        "sigs.k8s.io/controller-runtime/pkg/handler"
55
        "sigs.k8s.io/controller-runtime/pkg/manager"
56
        "sigs.k8s.io/controller-runtime/pkg/predicate"
57
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
58
        "sigs.k8s.io/controller-runtime/pkg/source"
59

60
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
61
        "kubevirt.io/containerized-data-importer/pkg/common"
62
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
63
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
64
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
65
        "kubevirt.io/containerized-data-importer/pkg/operator"
66
        "kubevirt.io/containerized-data-importer/pkg/util"
67
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
68
)
69

70
const (
71
        // ErrDataSourceAlreadyManaged is the event reason recorded on a DataImportCron
        // whose managed DataSource is already claimed by another DataImportCron.
72
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
73
        // MessageDataSourceAlreadyManaged is the format string of the matching event
        // message; it takes the DataSource name and the owning DataImportCron name.
74
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
75
)
76

77
// DataImportCronReconciler reconciles DataImportCron objects: its Reconcile
// method fetches the requested cron, initializes its schedule and drives the
// import state machine.
78
type DataImportCronReconciler struct {
79
        // client is used for all reads and writes of API objects in this file.
        client          client.Client
80
        // NOTE(review): presumably a client that bypasses the informer cache — confirm at construction site.
        uncachedClient  client.Client
81
        // recorder emits Kubernetes events on the reconciled objects.
        recorder        record.EventRecorder
82
        scheme          *runtime.Scheme
83
        log             logr.Logger
84
        image           string
85
        pullPolicy      string
86
        cdiNamespace    string
87
        installerLabels map[string]string
88
}
89

90
const (
91
        // AnnSourceDesiredDigest is the digest of the pending updated image.
        // It is compared with the digest of the most recent import to decide
        // whether a new import is needed.
92
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
93
        // AnnImageStreamDockerRef is the ImageStream Docker reference, recorded
        // together with the desired digest when an ImageStream source is polled.
94
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
95
        // AnnNextCronTime is the next time stamp which satisfies the cron expression (RFC3339)
96
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
97
        // AnnLastCronTime is the cron last execution time stamp (RFC3339)
98
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
99
        // AnnLastUseTime is the PVC last use time stamp (RFC3339Nano); it is
        // also stamped on snapshots passed to updateSource.
100
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
101
        // AnnStorageClass is the cron DV's storage class
102
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
103

104
        // Internal naming and retention constants.
        // NOTE(review): their uses are outside this view — confirm meanings against callers.
        dataImportControllerName    = "dataimportcron-controller"
105
        digestSha256Prefix          = "sha256:"
106
        digestUIDPrefix             = "uid:"
107
        digestDvNameSuffixLength    = 12
108
        cronJobUIDSuffixLength      = 8
109
        defaultImportsToKeepPerCron = 3
110
)
111

112
// ErrNotManagedByCron is returned by getDataSource when the DataSource exists
// but its DataImportCron label does not name the cron being reconciled.
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
113

114
// Reconcile loop for DataImportCronReconciler
115
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
116
        dataImportCron := &cdiv1.DataImportCron{}
1✔
117
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
118
                return reconcile.Result{}, err
1✔
119
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
120
                err := r.cleanup(ctx, req.NamespacedName)
1✔
121
                return reconcile.Result{}, err
1✔
122
        }
1✔
123
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
124
        if !shouldReconcile || err != nil {
1✔
125
                return reconcile.Result{}, err
×
126
        }
×
127

128
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
129
                return reconcile.Result{}, err
×
130
        }
×
131

132
        return r.update(ctx, dataImportCron)
1✔
133
}
134

135
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
136
        dataSource := &cdiv1.DataSource{}
1✔
137
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
138
                if k8serrors.IsNotFound(err) {
2✔
139
                        return true, nil
1✔
140
                }
1✔
141
                return false, err
×
142
        }
143
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
144
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
145
                return true, nil
1✔
146
        }
1✔
147
        otherCron := &cdiv1.DataImportCron{}
1✔
148
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
149
                if k8serrors.IsNotFound(err) {
2✔
150
                        return true, nil
1✔
151
                }
1✔
152
                return false, err
×
153
        }
154
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
155
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
156
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
157
                r.log.V(3).Info(msg)
1✔
158
                return false, nil
1✔
159
        }
1✔
160
        return true, nil
×
161
}
162

163
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
164
        if dataImportCron.Spec.Schedule == "" {
2✔
165
                return nil
1✔
166
        }
1✔
167
        if isControllerPolledSource(dataImportCron) {
2✔
168
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
169
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
170
                }
1✔
171
                return nil
1✔
172
        }
173
        if !isURLSource(dataImportCron) {
1✔
174
                return nil
×
175
        }
×
176
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
177
        if exists || err != nil {
2✔
178
                return err
1✔
179
        }
1✔
180
        cronJob, err := r.newCronJob(dataImportCron)
1✔
181
        if err != nil {
1✔
182
                return err
×
183
        }
×
184
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
185
                return err
×
186
        }
×
187
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
188
        if err != nil {
1✔
189
                return err
×
190
        }
×
191
        if err := r.client.Create(ctx, job); err != nil {
1✔
192
                return err
×
193
        }
×
194
        return nil
1✔
195
}
196

197
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
198
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
199
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
200
        }
×
201
        imageStream := &imagev1.ImageStream{}
1✔
202
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
203
        if err != nil {
2✔
204
                return nil, "", err
1✔
205
        }
1✔
206
        imageStreamNamespacedName := types.NamespacedName{
1✔
207
                Namespace: imageStreamNamespace,
1✔
208
                Name:      name,
1✔
209
        }
1✔
210
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
211
                return nil, "", err
1✔
212
        }
1✔
213
        return imageStream, tag, nil
1✔
214
}
215

216
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
217
        if imageStream == nil {
1✔
218
                return "", "", errors.Errorf("No ImageStream")
×
219
        }
×
220
        tags := imageStream.Status.Tags
1✔
221
        if len(tags) == 0 {
1✔
222
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
223
        }
×
224

225
        tagIdx := 0
1✔
226
        if len(imageStreamTag) > 0 {
2✔
227
                tagIdx = -1
1✔
228
                for i, tag := range tags {
2✔
229
                        if tag.Tag == imageStreamTag {
2✔
230
                                tagIdx = i
1✔
231
                                break
1✔
232
                        }
233
                }
234
        }
235
        if tagIdx == -1 {
2✔
236
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
237
        }
1✔
238

239
        if len(tags[tagIdx].Items) == 0 {
2✔
240
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
241
        }
1✔
242
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
243
}
244

245
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
246
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
247
                return imageStreamName, "", nil
1✔
248
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
249
                return subs[0], subs[1], nil
1✔
250
        }
1✔
251
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
252
}
253

254
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
255
        nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]
1✔
256
        if nextTimeStr == "" {
1✔
257
                return r.setNextCronTime(dataImportCron)
×
258
        }
×
259
        nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
260
        if err != nil {
1✔
261
                return reconcile.Result{}, err
×
262
        }
×
263
        if nextTime.After(time.Now()) {
2✔
264
                return r.setNextCronTime(dataImportCron)
1✔
265
        }
1✔
266
        switch {
1✔
267
        case isImageStreamSource(dataImportCron):
1✔
268
                if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
269
                        return reconcile.Result{}, err
1✔
270
                }
1✔
271
        case isPvcSource(dataImportCron):
1✔
272
                if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
2✔
273
                        return reconcile.Result{}, err
1✔
274
                }
1✔
275
        case isNodePull(dataImportCron):
1✔
276
                if done, err := r.updateContainerImageDesiredDigest(ctx, dataImportCron); !done {
2✔
277
                        return reconcile.Result{RequeueAfter: 3 * time.Second}, err
1✔
278
                } else if err != nil {
2✔
279
                        return reconcile.Result{}, err
×
280
                }
×
281
        }
282

283
        return r.setNextCronTime(dataImportCron)
1✔
284
}
285

286
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
287
        now := time.Now()
1✔
288
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
289
        if err != nil {
1✔
290
                return reconcile.Result{}, err
×
291
        }
×
292
        nextTime := expr.Next(now)
1✔
293
        requeueAfter := nextTime.Sub(now)
1✔
294
        res := reconcile.Result{RequeueAfter: requeueAfter}
1✔
295
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
296
        return res, err
1✔
297
}
298

299
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
300
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
301
        return err == nil && regSource.ImageStream != nil
1✔
302
}
1✔
303

304
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
305
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
306
        return err == nil && regSource.URL != nil
1✔
307
}
1✔
308

309
func isNodePull(cron *cdiv1.DataImportCron) bool {
1✔
310
        regSource, err := getCronRegistrySource(cron)
1✔
311
        return err == nil && regSource != nil && regSource.PullMethod != nil &&
1✔
312
                *regSource.PullMethod == cdiv1.RegistryPullNode
1✔
313
}
1✔
314

315
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
316
        if !isCronRegistrySource(cron) {
2✔
317
                return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
1✔
318
        }
1✔
319
        return cron.Spec.Template.Spec.Source.Registry, nil
1✔
320
}
321

322
func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
1✔
323
        source := cron.Spec.Template.Spec.Source
1✔
324
        return source != nil && source.Registry != nil
1✔
325
}
1✔
326

327
func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
1✔
328
        if !isPvcSource(cron) {
1✔
329
                return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
×
330
        }
×
331
        return cron.Spec.Template.Spec.Source.PVC, nil
1✔
332
}
333

334
func isPvcSource(cron *cdiv1.DataImportCron) bool {
1✔
335
        source := cron.Spec.Template.Spec.Source
1✔
336
        return source != nil && source.PVC != nil
1✔
337
}
1✔
338

339
func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
1✔
340
        return isImageStreamSource(cron) || isPvcSource(cron) || isNodePull(cron)
1✔
341
}
1✔
342

343
// update runs the main reconcile pass for a DataImportCron.
//
// It inspects what currently backs the most recent import (DataVolume, bound
// PVC or snapshot), refreshes the Progressing/UpToDate conditions, polls the
// source digest for controller-polled sources with a schedule, creates a new
// import DataVolume when the desired digest differs from the last import, and
// finally persists the cron if it changed during this pass.
//
// The returned Result is the one produced by polling, if any. A one-second
// requeue is returned early after a storage class change or after deleting a
// snapshot that no longer matches the PVC source format.
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
344
        res := reconcile.Result{}
1✔
345

1✔
346
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
347
        if err != nil {
1✔
348
                return res, err
×
349
        }
×
350

351
        // Keep a pristine copy so the final DeepEqual can tell whether an Update is needed.
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
352
        imports := dataImportCron.Status.CurrentImports
1✔
353
        importSucceeded := false
1✔
354

1✔
355
        dataVolume := dataImportCron.Spec.Template
1✔
356
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
357
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
358
        if err != nil {
1✔
359
                return res, err
×
360
        }
×
361
        if desiredStorageClass != nil {
2✔
362
                if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
2✔
363
                        return res, err
1✔
364
                }
1✔
365
                currentSc, hasCurrent := dataImportCron.Annotations[AnnStorageClass]
1✔
366
                desiredSc := desiredStorageClass.Name
1✔
367
                // The storage class recorded on the cron differs from the desired one:
                // hand over to handleStorageClassChange and come back in a second.
                if hasCurrent && currentSc != desiredSc {
2✔
368
                        r.log.Info("Storage class changed, delete most recent source on the old sc as it's no longer the desired", "currentSc", currentSc, "desiredSc", desiredSc)
1✔
369
                        if err := r.handleStorageClassChange(ctx, dataImportCron, desiredSc); err != nil {
1✔
370
                                return res, err
×
371
                        }
×
372
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
373
                }
374
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
1✔
375
        }
376
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
377
        if err != nil {
1✔
378
                return res, err
×
379
        }
×
380
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
381
        if err != nil {
1✔
382
                return res, err
×
383
        }
×
384

385
        // handlePopulatedPvc is the shared success path for an import backed by a
        // PVC: refresh the PVC via updateSource (when present), mark the import
        // as succeeded, and let handleCronFormat act on the source format.
        handlePopulatedPvc := func() error {
2✔
386
                if pvc != nil {
2✔
387
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
388
                                return err
×
389
                        }
×
390
                }
391
                importSucceeded = true
1✔
392
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
393
                        return err
×
394
                }
×
395

396
                return nil
1✔
397
        }
398

399
        // Derive the import state: an existing DataVolume takes precedence, then a
        // bound PVC, then a snapshot; otherwise there is no current import.
        switch {
1✔
400
        case dv != nil:
1✔
401
                switch dv.Status.Phase {
1✔
402
                case cdiv1.Succeeded:
1✔
403
                        if err := handlePopulatedPvc(); err != nil {
1✔
404
                                return res, err
×
405
                        }
×
406
                case cdiv1.ImportScheduled:
1✔
407
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
408
                case cdiv1.ImportInProgress:
1✔
409
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
410
                default:
1✔
411
                        dvPhase := string(dv.Status.Phase)
1✔
412
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
413
                }
414
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
415
                if err := handlePopulatedPvc(); err != nil {
1✔
416
                        return res, err
×
417
                }
×
418
        case snapshot != nil:
1✔
419
                // A snapshot is not wanted when the source format is PVC: delete it and retry shortly.
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
420
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
421
                                return res, err
×
422
                        }
×
423
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
424
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
425
                }
426
                // Below k8s 1.29 there's no way to know the source volume mode
427
                // Let's at least expose this info on our own snapshots
428
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
429
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
430
                        if err != nil {
1✔
431
                                return res, err
×
432
                        }
×
433
                        if volMode != nil {
2✔
434
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
435
                        }
1✔
436
                }
437
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
438
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
439
                if err != nil {
2✔
440
                        // A missing or foreign DataSource is tolerated; labels are simply not copied.
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
441
                                return res, err
×
442
                        }
×
443
                } else {
1✔
444
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
445
                }
1✔
446
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
447
                        return res, err
×
448
                }
×
449
                importSucceeded = true
1✔
450
        default:
1✔
451
                // Nothing backs the most recent import entry any more: drop it from the status.
                if len(imports) > 0 {
2✔
452
                        imports = imports[1:]
1✔
453
                        dataImportCron.Status.CurrentImports = imports
1✔
454
                }
1✔
455
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
456
        }
457

458
        if importSucceeded {
2✔
459
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
460
                        return res, err
×
461
                }
×
462
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
463
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
1✔
464
                        return res, err
×
465
                }
×
466
        }
467

468
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
469
                return res, err
×
470
        }
×
471

472
        // Skip if schedule is disabled
473
        if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
2✔
474
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
475
                pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
1✔
476
                if err != nil {
2✔
477
                        return pollRes, err
1✔
478
                }
1✔
479
                res = pollRes
1✔
480
        }
481

482
        // A new import is needed when a desired digest exists and it differs from
        // the digest of the most recent import (or there is no import at all).
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
483
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
484
        if digestUpdated {
2✔
485
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
486
                if dv != nil {
1✔
487
                        // NOTE(review): deleteErroneousDataVolume may clear
                        // cron.Status.CurrentImports, but the local `imports` slice is not
                        // refreshed, so the check below still sees the old entries —
                        // confirm this is intended.
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
×
488
                                return res, err
×
489
                        }
×
490
                }
491
                if importSucceeded || len(imports) == 0 {
2✔
492
                        if err := r.createImportDataVolume(ctx, dataImportCron); err != nil {
2✔
493
                                return res, err
1✔
494
                        }
1✔
495
                }
496
        } else if importSucceeded {
2✔
497
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
1✔
498
                        return res, err
×
499
                }
×
500
        } else if len(imports) > 0 {
2✔
501
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
1✔
502
        } else {
2✔
503
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
504
        }
1✔
505

506
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
1✔
507
                return res, err
×
508
        }
×
509

510
        // Only write the cron back when this pass actually changed it.
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
2✔
511
                if err := r.client.Update(ctx, dataImportCron); err != nil {
1✔
512
                        return res, err
×
513
                }
×
514
        }
515
        return res, nil
1✔
516
}
517

518
// Returns the current import DV if exists, and the last imported PVC
519
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
520
        imports := cron.Status.CurrentImports
1✔
521
        if len(imports) == 0 {
2✔
522
                return nil, nil, nil
1✔
523
        }
1✔
524

525
        dvName := imports[0].DataVolumeName
1✔
526
        dv := &cdiv1.DataVolume{}
1✔
527
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
528
                if !k8serrors.IsNotFound(err) {
1✔
529
                        return nil, nil, err
×
530
                }
×
531
                dv = nil
1✔
532
        }
533

534
        pvc := &corev1.PersistentVolumeClaim{}
1✔
535
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
536
                if !k8serrors.IsNotFound(err) {
1✔
537
                        return nil, nil, err
×
538
                }
×
539
                pvc = nil
1✔
540
        }
541
        return dv, pvc, nil
1✔
542
}
543

544
// Returns the current import DV if exists, and the last imported PVC
545
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
546
        imports := cron.Status.CurrentImports
1✔
547
        if len(imports) == 0 {
2✔
548
                return nil, nil
1✔
549
        }
1✔
550

551
        snapName := imports[0].DataVolumeName
1✔
552
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
553
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
554
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
555
                        return nil, err
×
556
                }
×
557
                return nil, nil
1✔
558
        }
559

560
        return snapshot, nil
1✔
561
}
562

563
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
564
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
565
        dataSource := &cdiv1.DataSource{}
1✔
566
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
567
                return nil, err
1✔
568
        }
1✔
569
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
570
                log := r.log.WithName("getCronManagedDataSource")
×
571
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
×
572
                return nil, ErrNotManagedByCron
×
573
        }
×
574
        return dataSource, nil
1✔
575
}
576

577
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
578
        objCopy := obj.DeepCopyObject()
1✔
579
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
580
        r.setDataImportCronResourceLabels(cron, obj)
1✔
581
        if !reflect.DeepEqual(obj, objCopy) {
2✔
582
                if err := r.client.Update(ctx, obj); err != nil {
1✔
583
                        return err
×
584
                }
×
585
        }
586
        return nil
1✔
587
}
588

589
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
590
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
591
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
592
                if cond.Status == corev1.ConditionFalse &&
×
593
                        (cond.Reason == common.GenericError || cond.Reason == ImagePullFailedReason) {
×
594
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
595
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
596
                        dv.Labels[common.DataImportCronLabel] = ""
×
597
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
598
                                return err
×
599
                        }
×
600
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
601
                                return err
×
602
                        }
×
603
                        cron.Status.CurrentImports = nil
×
604
                }
605
        }
606
        return nil
×
607
}
608

609
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
610
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
611
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
612
        if err != nil {
1✔
613
                return err
×
614
        }
×
615
        if regSource.ImageStream == nil {
1✔
616
                return nil
×
617
        }
×
618
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
619
        if err != nil {
2✔
620
                return err
1✔
621
        }
1✔
622
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
623
        if err != nil {
2✔
624
                return err
1✔
625
        }
1✔
626
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
627
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
628
                log.Info("Updating DataImportCron", "digest", digest)
1✔
629
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
630
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
631
        }
1✔
632
        return nil
1✔
633
}
634

635
func (r *DataImportCronReconciler) updateContainerImageDesiredDigest(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
636
        log := r.log.WithValues("name", cron.Name).WithValues("uid", cron.UID)
1✔
637
        podName := getPollerPodName(cron)
1✔
638
        ns := cron.Namespace
1✔
639
        nn := types.NamespacedName{Name: podName, Namespace: ns}
1✔
640
        pod := &corev1.Pod{}
1✔
641

1✔
642
        if err := r.client.Get(ctx, nn, pod); err == nil {
2✔
643
                digest, err := fetchContainerImageDigest(pod)
1✔
644
                if err != nil || digest == "" {
1✔
645
                        return false, err
×
646
                }
×
647
                cc.AddAnnotation(cron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
648
                if cron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
649
                        log.Info("Updating DataImportCron", "digest", digest)
1✔
650
                        cc.AddAnnotation(cron, AnnSourceDesiredDigest, digest)
1✔
651
                }
1✔
652
                return true, r.client.Delete(ctx, pod)
1✔
653
        } else if cc.IgnoreNotFound(err) != nil {
1✔
654
                return false, err
×
655
        }
×
656

657
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(ctx, r.client)
1✔
658
        if err != nil {
1✔
659
                return false, err
×
660
        }
×
661
        platform := cron.Spec.Template.Spec.Source.Registry.Platform
1✔
662
        if platform != nil && platform.Architecture != "" {
1✔
663
                if workloadNodePlacement.NodeSelector == nil {
×
664
                        workloadNodePlacement.NodeSelector = map[string]string{}
×
665
                }
×
666
                workloadNodePlacement.NodeSelector[corev1.LabelArchStable] = platform.Architecture
×
667
        }
668

669
        containerImage := strings.TrimPrefix(*cron.Spec.Template.Spec.Source.Registry.URL, "docker://")
1✔
670

1✔
671
        pod = &corev1.Pod{
1✔
672
                ObjectMeta: metav1.ObjectMeta{
1✔
673
                        Name:      podName,
1✔
674
                        Namespace: ns,
1✔
675
                        OwnerReferences: []metav1.OwnerReference{
1✔
676
                                {
1✔
677
                                        APIVersion:         cron.APIVersion,
1✔
678
                                        Kind:               cron.Kind,
1✔
679
                                        Name:               cron.Name,
1✔
680
                                        UID:                cron.UID,
1✔
681
                                        BlockOwnerDeletion: ptr.To[bool](true),
1✔
682
                                        Controller:         ptr.To[bool](true),
1✔
683
                                },
1✔
684
                        },
1✔
685
                },
1✔
686
                Spec: corev1.PodSpec{
1✔
687
                        TerminationGracePeriodSeconds: ptr.To[int64](0),
1✔
688
                        RestartPolicy:                 corev1.RestartPolicyNever,
1✔
689
                        NodeSelector:                  workloadNodePlacement.NodeSelector,
1✔
690
                        Tolerations:                   workloadNodePlacement.Tolerations,
1✔
691
                        Affinity:                      workloadNodePlacement.Affinity,
1✔
692
                        Volumes: []corev1.Volume{
1✔
693
                                {
1✔
694
                                        Name: "shared-volume",
1✔
695
                                        VolumeSource: corev1.VolumeSource{
1✔
696
                                                EmptyDir: &corev1.EmptyDirVolumeSource{},
1✔
697
                                        },
1✔
698
                                },
1✔
699
                        },
1✔
700
                        InitContainers: []corev1.Container{
1✔
701
                                {
1✔
702
                                        Name:                     "init",
1✔
703
                                        Image:                    r.image,
1✔
704
                                        ImagePullPolicy:          corev1.PullPolicy(r.pullPolicy),
1✔
705
                                        Command:                  []string{"sh", "-c", "cp /usr/bin/cdi-containerimage-server /shared/server"},
1✔
706
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
707
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
708
                                },
1✔
709
                        },
1✔
710
                        Containers: []corev1.Container{
1✔
711
                                {
1✔
712
                                        Name:                     "image-container",
1✔
713
                                        Image:                    containerImage,
1✔
714
                                        ImagePullPolicy:          corev1.PullAlways,
1✔
715
                                        Command:                  []string{"/shared/server", "-h"},
1✔
716
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
717
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
718
                                },
1✔
719
                        },
1✔
720
                },
1✔
721
        }
1✔
722

1✔
723
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
724
        if pod.Spec.SecurityContext != nil {
2✔
725
                pod.Spec.SecurityContext.FSGroup = nil
1✔
726
        }
1✔
727

728
        return false, r.client.Create(ctx, pod)
1✔
729
}
730

731
func fetchContainerImageDigest(pod *corev1.Pod) (string, error) {
1✔
732
        if len(pod.Status.ContainerStatuses) == 0 {
1✔
733
                return "", nil
×
734
        }
×
735

736
        status := pod.Status.ContainerStatuses[0]
1✔
737
        if status.State.Waiting != nil {
1✔
738
                reason := status.State.Waiting.Reason
×
739
                switch reason {
×
740
                case "ImagePullBackOff", "ErrImagePull", "InvalidImageName":
×
741
                        return "", errors.Errorf("%s %s: %s", common.ImagePullFailureText, status.Image, reason)
×
742
                }
743
                return "", nil
×
744
        }
745

746
        if status.State.Terminated == nil {
1✔
747
                return "", nil
×
748
        }
×
749

750
        imageID := status.ImageID
1✔
751
        if imageID == "" {
1✔
752
                return "", errors.Errorf("Container has no imageID")
×
753
        }
×
754
        idx := strings.Index(imageID, digestSha256Prefix)
1✔
755
        if idx < 0 {
1✔
756
                return "", errors.Errorf("Container image %s ID has no digest: %s", status.Image, imageID)
×
757
        }
×
758

759
        return imageID[idx:], nil
1✔
760
}
761

762
func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
763
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
764
        pvcSource, err := getCronPvcSource(dataImportCron)
1✔
765
        if err != nil {
1✔
766
                return err
×
767
        }
×
768
        ns := pvcSource.Namespace
1✔
769
        if ns == "" {
2✔
770
                ns = dataImportCron.Namespace
1✔
771
        }
1✔
772
        pvc := &corev1.PersistentVolumeClaim{}
1✔
773
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
2✔
774
                return err
1✔
775
        }
1✔
776
        digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
1✔
777
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
778
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
779
                log.Info("Updating DataImportCron", "digest", digest)
1✔
780
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
781
        }
1✔
782
        return nil
1✔
783
}
784

785
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
786
        log := r.log.WithName("updateDataSource")
1✔
787
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
788
        if err != nil {
2✔
789
                if k8serrors.IsNotFound(err) {
2✔
790
                        dataSource = r.newDataSource(dataImportCron)
1✔
791
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
792
                                return err
×
793
                        }
×
794
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
1✔
795
                } else if errors.Is(err, ErrNotManagedByCron) {
×
796
                        return nil
×
797
                } else {
×
798
                        return err
×
799
                }
×
800
        }
801
        dataSourceCopy := dataSource.DeepCopy()
1✔
802
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
803

1✔
804
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
805
        populateDataSource(format, dataSource, sourcePVC)
1✔
806

1✔
807
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
808
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
809
                        return err
×
810
                }
×
811
        }
812

813
        return nil
1✔
814
}
815

816
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
817
        if sourcePVC == nil {
2✔
818
                return
1✔
819
        }
1✔
820

821
        switch format {
1✔
822
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
823
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
824
                        PVC: sourcePVC,
1✔
825
                }
1✔
826
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
827
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
828
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
829
                                Namespace: sourcePVC.Namespace,
1✔
830
                                Name:      sourcePVC.Name,
1✔
831
                        },
1✔
832
                }
1✔
833
        }
834
}
835

836
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
837
        if dataImportCron.Status.CurrentImports == nil {
1✔
838
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
839
        }
×
840
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
841
                Namespace: dataImportCron.Namespace,
1✔
842
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
843
        }
1✔
844
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
845
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
846
                now := metav1.Now()
1✔
847
                dataImportCron.Status.LastImportTimestamp = &now
1✔
848
        }
1✔
849
        return nil
1✔
850
}
851

852
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
853
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
854
        if lastTimeStr == "" {
2✔
855
                return nil
1✔
856
        }
1✔
857
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
858
        if err != nil {
1✔
859
                return err
×
860
        }
×
861
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
862
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
863
        }
1✔
864
        return nil
1✔
865
}
866

867
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
868
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
869
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
870
        if digest == "" {
1✔
871
                return nil
×
872
        }
×
873
        dvName, err := createDvName(dataSourceName, digest)
1✔
874
        if err != nil {
2✔
875
                return err
1✔
876
        }
1✔
877
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
878

1✔
879
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
880
        for _, src := range sources {
2✔
881
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
882
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
883
                                return err
×
884
                        }
×
885
                } else {
1✔
886
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
887
                                return err
×
888
                        }
×
889
                        // If source exists don't create DV
890
                        return nil
1✔
891
                }
892
        }
893
        dv := r.newSourceDataVolume(dataImportCron, dvName)
1✔
894
        if allowed, err := r.authorizeCloneDataVolume(dataImportCron, dv); err != nil {
1✔
895
                return err
×
896
        } else if !allowed {
1✔
897
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse,
×
898
                        "Not authorized to create DataVolume", notAuthorized)
×
899
                return nil
×
900
        }
×
901
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
902
                return err
×
903
        }
×
904

905
        return nil
1✔
906
}
907

908
func (r *DataImportCronReconciler) authorizeCloneDataVolume(dataImportCron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) (bool, error) {
1✔
909
        if !isPvcSource(dataImportCron) {
2✔
910
                return true, nil
1✔
911
        }
1✔
912
        saName := dataImportCron.Spec.ServiceAccountName
1✔
913
        if saName == "" {
2✔
914
                saName = "default"
1✔
915
        }
1✔
916
        if resp, err := dv.AuthorizeSA(dv.Namespace, dv.Name, r, dataImportCron.Namespace, saName); err != nil {
1✔
NEW
917
                return false, err
×
918
        } else if !resp.Allowed {
1✔
NEW
919
                r.log.Info("Not authorized to create DataVolume", "cron", dataImportCron.Name, "reason", resp.Reason)
×
920
                return false, nil
×
921
        }
×
922

923
        return true, nil
1✔
924
}
925

926
func (r *DataImportCronReconciler) CreateSar(sar *authorizationv1.SubjectAccessReview) (*authorizationv1.SubjectAccessReview, error) {
×
927
        if err := r.client.Create(context.TODO(), sar); err != nil {
×
928
                return nil, err
×
929
        }
×
930
        return sar, nil
×
931
}
932

933
func (r *DataImportCronReconciler) GetNamespace(name string) (*corev1.Namespace, error) {
1✔
934
        ns := &corev1.Namespace{}
1✔
935
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: name}, ns); err != nil {
1✔
936
                return nil, err
×
937
        }
×
938
        return ns, nil
1✔
939
}
940

941
func (r *DataImportCronReconciler) GetDataSource(namespace, name string) (*cdiv1.DataSource, error) {
×
942
        das := &cdiv1.DataSource{}
×
943
        if err := r.client.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, das); err != nil {
×
944
                return nil, err
×
945
        }
×
946
        return das, nil
×
947
}
948

949
func (r *DataImportCronReconciler) handleStorageClassChange(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
1✔
950
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
951
        if !ok {
1✔
952
                // nothing to delete
×
953
                return nil
×
954
        }
×
955
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
956
        if err != nil {
1✔
957
                return err
×
958
        }
×
959
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
1✔
960
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
1✔
961
        for _, src := range sources {
2✔
962
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
963
                        return err
×
964
                }
×
965
        }
966
        for _, src := range sources {
2✔
967
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
1✔
968
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
×
969
                }
×
970
        }
971
        // Only update desired storage class once garbage collection went through
972
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
1✔
973
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
974
        if err != nil {
1✔
975
                return err
×
976
        }
×
977

978
        return nil
1✔
979
}
980

981
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
982
        switch format {
1✔
983
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
984
                return nil
1✔
985
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
986
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
987
        default:
×
988
                return fmt.Errorf("unknown source format for snapshot")
×
989
        }
990
}
991

992
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
993
        if pvc == nil {
1✔
994
                return nil
×
995
        }
×
996
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
997
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
998
                return nil
1✔
999
        }
1✔
1000
        storageProfile := &cdiv1.StorageProfile{}
1✔
1001
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1002
                return err
×
1003
        }
×
1004
        className, err := cc.GetSnapshotClassForSmartClone(pvc, &desiredStorageClass.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
1005
        if err != nil {
1✔
1006
                return err
×
1007
        }
×
1008
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
1009
                ObjectMeta: metav1.ObjectMeta{
1✔
1010
                        Name:      pvc.Name,
1✔
1011
                        Namespace: dataImportCron.Namespace,
1✔
1012
                        Labels: map[string]string{
1✔
1013
                                common.CDILabelKey:       common.CDILabelValue,
1✔
1014
                                common.CDIComponentLabel: "",
1✔
1015
                        },
1✔
1016
                },
1✔
1017
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
1018
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
1019
                                PersistentVolumeClaimName: &pvc.Name,
1✔
1020
                        },
1✔
1021
                        VolumeSnapshotClassName: &className,
1✔
1022
                },
1✔
1023
        }
1✔
1024
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
1025
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
1✔
1026

1✔
1027
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
1028
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
1029
                if !k8serrors.IsNotFound(err) {
1✔
1030
                        return err
×
1031
                }
×
1032
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1033
                if pvc.Spec.VolumeMode != nil {
2✔
1034
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
1035
                }
1✔
1036
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
1037
                        return err
×
1038
                }
×
1039
        } else {
1✔
1040
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
1041
                        // Clean up DV/PVC as they are not needed anymore
1✔
1042
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
1043
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
1044
                                return err
×
1045
                        }
×
1046
                }
1047
        }
1048

1049
        return nil
1✔
1050
}
1051

1052
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
1053
        dataImportCron.Status.SourceFormat = &format
1✔
1054

1✔
1055
        switch format {
1✔
1056
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1057
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1058
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1059
                if snapshot == nil {
2✔
1060
                        // Snapshot create/update will trigger reconcile
1✔
1061
                        return nil
1✔
1062
                }
1✔
1063
                if cc.IsSnapshotReady(snapshot) {
2✔
1064
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1065
                } else {
2✔
1066
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
1067
                }
1✔
1068
        default:
×
1069
                return fmt.Errorf("unknown source format for snapshot")
×
1070
        }
1071

1072
        return nil
1✔
1073
}
1074

1075
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
1076
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
1077
        if desiredStorageClass == nil {
2✔
1078
                return format, nil
1✔
1079
        }
1✔
1080

1081
        storageProfile := &cdiv1.StorageProfile{}
1✔
1082
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1083
                return format, err
×
1084
        }
×
1085
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
1086
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
1087
        }
1✔
1088

1089
        return format, nil
1✔
1090
}
1091

1092
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
1093
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
1094
                return nil
×
1095
        }
×
1096
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
1097
        if err != nil {
1✔
1098
                return err
×
1099
        }
×
1100

1101
        maxImports := defaultImportsToKeepPerCron
1✔
1102

1✔
1103
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
1104
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
1105
        }
1✔
1106

1107
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
1108
                return err
×
1109
        }
×
1110
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
1111
                return err
×
1112
        }
×
1113

1114
        return nil
1✔
1115
}
1116

1117
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
1118
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
1119

1✔
1120
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1121
                return err
×
1122
        }
×
1123
        if len(pvcList.Items) > maxImports {
2✔
1124
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
1125
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
1126
                })
1✔
1127
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
1128
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1129
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
1130
                                return err
×
1131
                        }
×
1132
                }
1133
        }
1134

1135
        dvList := &cdiv1.DataVolumeList{}
1✔
1136
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1137
                return err
×
1138
        }
×
1139

1140
        if len(dvList.Items) > maxImports {
2✔
1141
                for _, dv := range dvList.Items {
2✔
1142
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
1143
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
1144
                                return err
×
1145
                        }
×
1146

1147
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1148
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1149
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
1150
                                        return err
×
1151
                                }
×
1152
                        }
1153
                }
1154
        }
1155

1156
        return nil
1✔
1157
}
1158

1159
// deleteDvPvc deletes DV or PVC if DV was GCed
1160
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
1161
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
1162
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
1163
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
1164
                return err
1✔
1165
        }
1✔
1166
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
1167
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
1168
                return err
×
1169
        }
×
1170
        return nil
1✔
1171
}
1172

1173
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
1174
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
1175

1✔
1176
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1177
                if meta.IsNoMatchError(err) {
×
1178
                        return nil
×
1179
                }
×
1180
                return err
×
1181
        }
1182
        if len(snapList.Items) > maxImports {
1✔
1183
                sort.Slice(snapList.Items, func(i, j int) bool {
×
1184
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
1185
                })
×
1186
                for _, snap := range snapList.Items[maxImports:] {
×
1187
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
1188
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
1189
                                return err
×
1190
                        }
×
1191
                }
1192
        }
1193

1194
        return nil
1✔
1195
}
1196

1197
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
1198
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
1199
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
1200

1✔
1201
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
1202
                return err
×
1203
        }
×
1204
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
1205
        if err != nil {
1✔
1206
                return err
×
1207
        }
×
1208
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
1209
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
1210
                return err
×
1211
        }
×
1212
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
1213
                return err
×
1214
        }
×
1215
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
1216
                return err
×
1217
        }
×
1218
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
1219
                return err
×
1220
        }
×
1221
        return nil
1✔
1222
}
1223

1224
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
1225
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
1226
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
1227
        if err != nil {
1✔
1228
                return err
×
1229
        }
×
1230
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
1231
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
1232
                return err
×
1233
        }
×
1234
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
1235
                return err
×
1236
        }
×
1237

1238
        return nil
1✔
1239
}
1240

1241
// NewDataImportCronController creates a new instance of the DataImportCron controller
1242
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
1243
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
1244
                Scheme: mgr.GetScheme(),
×
1245
                Mapper: mgr.GetRESTMapper(),
×
1246
        })
×
1247
        if err != nil {
×
1248
                return nil, err
×
1249
        }
×
1250
        reconciler := &DataImportCronReconciler{
×
1251
                client:          mgr.GetClient(),
×
1252
                uncachedClient:  uncachedClient,
×
1253
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1254
                scheme:          mgr.GetScheme(),
×
1255
                log:             log.WithName(dataImportControllerName),
×
1256
                image:           importerImage,
×
1257
                pullPolicy:      pullPolicy,
×
1258
                cdiNamespace:    util.GetNamespace(),
×
1259
                installerLabels: installerLabels,
×
1260
        }
×
1261
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1262
                MaxConcurrentReconciles: 3,
×
1263
                Reconciler:              reconciler,
×
1264
        })
×
1265
        if err != nil {
×
1266
                return nil, err
×
1267
        }
×
1268
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
1269
                return nil, err
×
1270
        }
×
1271
        log.Info("Initialized DataImportCron controller")
×
1272
        return dataImportCronController, nil
×
1273
}
1274

1275
func getCronName(obj client.Object) string {
×
1276
        return obj.GetLabels()[common.DataImportCronLabel]
×
1277
}
×
1278

1279
func getCronNs(obj client.Object) string {
×
1280
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
1281
}
×
1282

1283
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
1284
        if cronName := getCronName(obj); cronName != "" {
×
1285
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
1286
        }
×
1287
        return nil
×
1288
}
1289

1290
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1291
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1292
                return err
×
1293
        }
×
1294

1295
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
1296
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
1297
                // Otherwise we risk losing the storage profile event
×
1298
                var crons cdiv1.DataImportCronList
×
1299
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
×
1300
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
×
1301
                        return nil
×
1302
                }
×
1303
                // Storage profiles are 1:1 to storage classes
1304
                scName := obj.GetName()
×
1305
                var reqs []reconcile.Request
×
1306
                for _, cron := range crons.Items {
×
1307
                        dataVolume := cron.Spec.Template
×
1308
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1309
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1310
                        if err != nil || templateSc == nil {
×
1311
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1312
                                return reqs
×
1313
                        }
×
1314
                        if templateSc.Name == scName {
×
1315
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1316
                        }
×
1317
                }
1318
                return reqs
×
1319
        }
1320

1321
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1322
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1323
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1324
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1325
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1326
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1327
                },
1328
        )); err != nil {
×
1329
                return err
×
1330
        }
×
1331

1332
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1333
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1334
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1335
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1336
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1337
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1338
                },
1339
        )); err != nil {
×
1340
                return err
×
1341
        }
×
1342

1343
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1344
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1345
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1346
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1347
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1348
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1349
                },
1350
        )); err != nil {
×
1351
                return err
×
1352
        }
×
1353

1354
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1355
                return err
×
1356
        }
×
1357

1358
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1359
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1360
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
×
1361
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1362
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1363
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1364
                                return e.ObjectOld.Status.DataImportCronSourceFormat != e.ObjectNew.Status.DataImportCronSourceFormat
×
1365
                        },
×
1366
                },
1367
        )); err != nil {
×
1368
                return err
×
1369
        }
×
1370

1371
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1372
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1373
        }
×
1374

1375
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1376
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1377
                predicate.TypedFuncs[*batchv1.CronJob]{
×
1378
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1379
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1380
                        },
×
1381
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
×
1382
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1383
                },
1384
        )); err != nil {
×
1385
                return err
×
1386
        }
×
1387

1388
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1389
                if meta.IsNoMatchError(err) {
×
1390
                        // Back out if there's no point to attempt watch
×
1391
                        return nil
×
1392
                }
×
1393
                if !cc.IsErrCacheNotStarted(err) {
×
1394
                        return err
×
1395
                }
×
1396
        }
1397
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1398
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
×
1399
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1400
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1401
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1402
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1403
                },
1404
        )); err != nil {
×
1405
                return err
×
1406
        }
×
1407

1408
        return nil
×
1409
}
1410

1411
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1412
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1413
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1414
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
1415
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1416
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1417
                                log.Info("Update", "sc", obj.GetName(),
×
1418
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1419
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1420
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
×
1421
                                if err != nil {
×
1422
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
1423
                                }
×
1424
                                return reqs
×
1425
                        },
1426
                ),
1427
                predicate.TypedFuncs[*storagev1.StorageClass]{
1428
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1429
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1430
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1431
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1432
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1433
                        },
×
1434
                },
1435
        )); err != nil {
×
1436
                return err
×
1437
        }
×
1438

1439
        return nil
×
1440
}
1441

1442
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1443
        dicList := &cdiv1.DataImportCronList{}
×
1444
        if err := c.List(ctx, dicList); err != nil {
×
1445
                return nil, err
×
1446
        }
×
1447
        reqs := []reconcile.Request{}
×
1448
        for _, dic := range dicList.Items {
×
1449
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
1450
                        continue
×
1451
                }
1452

1453
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1454
        }
1455

1456
        return reqs, nil
×
1457
}
1458

1459
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
1✔
1460
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1461
                return false, nil
1✔
1462
        }
1✔
1463

1464
        sc := pvc.Spec.StorageClassName
1✔
1465
        if sc == nil || *sc == desiredStorageClass {
2✔
1466
                return false, nil
1✔
1467
        }
1✔
1468

1469
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1✔
1470
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
1471
                return false, err
×
1472
        }
×
1473

1474
        return true, nil
1✔
1475
}
1476

1477
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1478
        cronJob := &batchv1.CronJob{}
1✔
1479
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1480
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1481
                return false, cc.IgnoreNotFound(err)
1✔
1482
        }
1✔
1483

1484
        cronJobCopy := cronJob.DeepCopy()
1✔
1485
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1486
                return false, err
×
1487
        }
×
1488

1489
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1490
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1491
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
1492
                        return false, cc.IgnoreNotFound(err)
×
1493
                }
×
1494
        }
1495
        return true, nil
1✔
1496
}
1497

1498
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1499
        cronJob := &batchv1.CronJob{
1✔
1500
                ObjectMeta: metav1.ObjectMeta{
1✔
1501
                        Name:      GetCronJobName(cron),
1✔
1502
                        Namespace: r.cdiNamespace,
1✔
1503
                },
1✔
1504
        }
1✔
1505
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
1506
                return nil, err
×
1507
        }
×
1508
        return cronJob, nil
1✔
1509
}
1510

1511
// InitPollerPod inits poller Pod
1512
func InitPollerPod(c client.Client, cron *cdiv1.DataImportCron, pod *corev1.PodTemplateSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1513
        regSource, err := getCronRegistrySource(cron)
1✔
1514
        if err != nil {
1✔
1515
                return err
×
1516
        }
×
1517
        if regSource.URL == nil {
1✔
1518
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1519
        }
×
1520
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1521
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
1522
                return err
×
1523
        }
×
1524
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1525
        if err != nil {
1✔
1526
                return err
×
1527
        }
×
1528
        container := corev1.Container{
1✔
1529
                Name:  "cdi-source-update-poller",
1✔
1530
                Image: image,
1✔
1531
                Command: []string{
1✔
1532
                        "/usr/bin/cdi-source-update-poller",
1✔
1533
                        "-ns", cron.Namespace,
1✔
1534
                        "-cron", cron.Name,
1✔
1535
                        "-url", *regSource.URL,
1✔
1536
                },
1✔
1537
                ImagePullPolicy:          pullPolicy,
1✔
1538
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1539
                TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
1540
        }
1✔
1541

1✔
1542
        var volumes []corev1.Volume
1✔
1543
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1544
        if hasCertConfigMap {
1✔
1545
                vm := corev1.VolumeMount{
×
1546
                        Name:      CertVolName,
×
1547
                        MountPath: common.ImporterCertDir,
×
1548
                }
×
1549
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1550
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
1551
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
1552
        }
×
1553

1554
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1555
                vm := corev1.VolumeMount{
1✔
1556
                        Name:      ProxyCertVolName,
1✔
1557
                        MountPath: common.ImporterProxyCertDir,
1✔
1558
                }
1✔
1559
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1560
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1561
        }
1✔
1562

1563
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
1564
                container.Env = append(container.Env,
×
1565
                        corev1.EnvVar{
×
1566
                                Name: common.ImporterAccessKeyID,
×
1567
                                ValueFrom: &corev1.EnvVarSource{
×
1568
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1569
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1570
                                                        Name: *regSource.SecretRef,
×
1571
                                                },
×
1572
                                                Key: common.KeyAccess,
×
1573
                                        },
×
1574
                                },
×
1575
                        },
×
1576
                        corev1.EnvVar{
×
1577
                                Name: common.ImporterSecretKey,
×
1578
                                ValueFrom: &corev1.EnvVarSource{
×
1579
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1580
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1581
                                                        Name: *regSource.SecretRef,
×
1582
                                                },
×
1583
                                                Key: common.KeySecret,
×
1584
                                        },
×
1585
                                },
×
1586
                        },
×
1587
                )
×
1588
        }
×
1589

1590
        addEnvVar := func(varName, value string) {
2✔
1591
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1592
        }
1✔
1593

1594
        if insecureTLS {
1✔
1595
                addEnvVar(common.InsecureTLSVar, "true")
×
1596
        }
×
1597

1598
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1599
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1600
                        addEnvVar(varName, value)
1✔
1601
                }
1✔
1602
        }
1603

1604
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1605
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1606
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1607

1✔
1608
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1609
        if err != nil {
1✔
1610
                return err
×
1611
        }
×
1612
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1613
        if err != nil {
1✔
1614
                return err
×
1615
        }
×
1616

1617
        podSpec := &pod.Spec
1✔
1618

1✔
1619
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1620
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1621
        podSpec.Containers = []corev1.Container{container}
1✔
1622
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1623
        podSpec.Volumes = volumes
1✔
1624
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1625
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1626
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1627
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1628

1✔
1629
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1630
        // No need for specifid uid/fsgroup here since this doesn't write or use qemu
1✔
1631
        if podSpec.SecurityContext != nil {
2✔
1632
                podSpec.SecurityContext.FSGroup = nil
1✔
1633
        }
1✔
1634
        if podSpec.Containers[0].SecurityContext != nil {
2✔
1635
                podSpec.Containers[0].SecurityContext.RunAsUser = nil
1✔
1636
        }
1✔
1637

1638
        if pod.Labels == nil {
2✔
1639
                pod.Labels = map[string]string{}
1✔
1640
        }
1✔
1641
        pod.Labels[common.DataImportCronPollerLabel] = ""
1✔
1642

1✔
1643
        return nil
1✔
1644
}
1645

1646
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1647
        cronJobSpec := &cronJob.Spec
1✔
1648
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1649
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1650
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1651
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1652

1✔
1653
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1654
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1655
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1656
        cc.AddAnnotation(&jobSpec.Template, secv1.RequiredSCCAnnotation, common.RestrictedSCCName)
1✔
1657

1✔
1658
        pod := &jobSpec.Template
1✔
1659
        if err := InitPollerPod(r.client, cron, pod, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1660
                return err
×
1661
        }
×
1662
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1663
                return err
×
1664
        }
×
1665
        return nil
1✔
1666
}
1667

1668
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1669
        job := &batchv1.Job{
1✔
1670
                ObjectMeta: metav1.ObjectMeta{
1✔
1671
                        Name:      GetInitialJobName(cron),
1✔
1672
                        Namespace: cronJob.Namespace,
1✔
1673
                },
1✔
1674
                Spec: cronJob.Spec.JobTemplate.Spec,
1✔
1675
        }
1✔
1676
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1677
                return nil, err
×
1678
        }
×
1679
        return job, nil
1✔
1680
}
1681

1682
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1683
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1684
                return err
×
1685
        }
×
1686
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1687
        labels := obj.GetLabels()
1✔
1688
        labels[common.DataImportCronNsLabel] = cron.Namespace
1✔
1689
        labels[common.DataImportCronLabel] = cron.Name
1✔
1690
        obj.SetLabels(labels)
1✔
1691
        return nil
1✔
1692
}
1693

1694
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string) *cdiv1.DataVolume {
1✔
1695
        dv := cron.Spec.Template.DeepCopy()
1✔
1696
        if isCronRegistrySource(cron) {
2✔
1697
                var digestedURL string
1✔
1698
                if isURLSource(cron) {
2✔
1699
                        digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1700
                } else if isImageStreamSource(cron) {
3✔
1701
                        // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1702
                        digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1703
                        dv.Spec.Source.Registry.ImageStream = nil
1✔
1704
                }
1✔
1705
                dv.Spec.Source.Registry.URL = &digestedURL
1✔
1706
        }
1707
        dv.Name = dataVolumeName
1✔
1708
        dv.Namespace = cron.Namespace
1✔
1709
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1710
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1711
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1712
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1713

1✔
1714
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
1715
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1716
        }
1✔
1717

1718
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1719

1✔
1720
        return dv
1✔
1721
}
1722

1723
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1724
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1725
        cc.CopyAllowedLabels(cron.GetLabels(), obj, true)
1✔
1726
        labels := obj.GetLabels()
1✔
1727
        labels[common.DataImportCronLabel] = cron.Name
1✔
1728
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
2✔
1729
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1730
        }
1✔
1731
        obj.SetLabels(labels)
1✔
1732
}
1733

1734
func untagDigestedDockerURL(dockerURL string) string {
1✔
1735
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1736
                url := u.Host + u.Path
1✔
1737
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1738
                // Check for tag
1✔
1739
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1740
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
2✔
1741
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1742
                        }
1✔
1743
                }
1744
        }
1745
        return dockerURL
1✔
1746
}
1747

1748
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1749
        if val := cron.Labels[ann]; val != "" {
2✔
1750
                cc.AddLabel(dv, ann, val)
1✔
1751
        }
1✔
1752
}
1753

1754
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1755
        if val := cron.Annotations[ann]; val != "" {
1✔
1756
                cc.AddAnnotation(dv, ann, val)
×
1757
        }
×
1758
}
1759

1760
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1✔
1761
        dataSource := &cdiv1.DataSource{
1✔
1762
                ObjectMeta: metav1.ObjectMeta{
1✔
1763
                        Name:      cron.Spec.ManagedDataSource,
1✔
1764
                        Namespace: cron.Namespace,
1✔
1765
                },
1✔
1766
        }
1✔
1767
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1768
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1769
        cc.CopyAllowedLabels(cron.GetLabels(), dataSource, true)
1✔
1770
        return dataSource
1✔
1771
}
1✔
1772

1773
// Create DataVolume name based on the DataSource name + prefix of the digest
1774
func createDvName(prefix, digest string) (string, error) {
1✔
1775
        digestPrefix := ""
1✔
1776
        if strings.HasPrefix(digest, digestSha256Prefix) {
2✔
1777
                digestPrefix = digestSha256Prefix
1✔
1778
        } else if strings.HasPrefix(digest, digestUIDPrefix) {
3✔
1779
                digestPrefix = digestUIDPrefix
1✔
1780
        } else {
2✔
1781
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1782
        }
1✔
1783
        fromIdx := len(digestPrefix)
1✔
1784
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1785
        if len(digest) < toIdx {
2✔
1786
                return "", errors.Errorf("Digest is too short")
1✔
1787
        }
1✔
1788
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1789
}
1790

1791
// GetCronJobName get CronJob name based on cron name and UID
1792
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1793
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1794
}
1✔
1795

1796
// GetInitialJobName get initial job name based on cron name and UID
1797
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1798
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1799
}
1✔
1800

1801
func getPollerPodName(cron *cdiv1.DataImportCron) string {
1✔
1802
        return naming.GetResourceName("poller-"+cron.Name, string(cron.UID)[:8])
1✔
1803
}
1✔
1804

1805
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1806
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1807
}
1✔
1808

1809
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1810
        dv := &cron.Spec.Template
1✔
1811

1✔
1812
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1813
                return explicitVolumeMode, nil
×
1814
        }
×
1815

1816
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1817
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1818
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1819
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1820
                        AccessModes:      accessModes,
1✔
1821
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1822
                        Resources: corev1.VolumeResourceRequirements{
1✔
1823
                                Requests: corev1.ResourceList{
1✔
1824
                                        // Doesn't matter
1✔
1825
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1826
                                },
1✔
1827
                        },
1✔
1828
                },
1✔
1829
        }
1✔
1830
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1831
                return nil, err
×
1832
        }
×
1833

1834
        return inferredPvc.Spec.VolumeMode, nil
1✔
1835
}
1836

1837
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1838
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1839
        if dv.Spec.PVC != nil {
1✔
1840
                return dv.Spec.PVC.VolumeMode
×
1841
        }
×
1842

1843
        if dv.Spec.Storage != nil {
2✔
1844
                return dv.Spec.Storage.VolumeMode
1✔
1845
        }
1✔
1846

1847
        return nil
×
1848
}
1849

1850
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1851
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1852
        if dv.Spec.PVC != nil {
1✔
1853
                return dv.Spec.PVC.AccessModes
×
1854
        }
×
1855

1856
        if dv.Spec.Storage != nil {
2✔
1857
                return dv.Spec.Storage.AccessModes
1✔
1858
        }
1✔
1859

1860
        return nil
×
1861
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc