• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5695

03 Dec 2025 10:37AM UTC coverage: 58.613%. First build
#5695

Pull #3970

travis-ci

arnongilboa
Simplify DataImportCron ServiceAccount authorization

Add ServiceAccountName to DataImportCron spec, replacing CreatedBy
which was added in #3946.

In case of DataImportCron with PVC source, the controller checks the
ServiceAccount is authorized to clone the source PVC.

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>
Pull Request #3970: Simplify DataImportCron ServiceAccount authorization

8 of 15 new or added lines in 1 file covered. (53.33%)

17292 of 29502 relevant lines covered (58.61%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.01
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
limitations under the License.
14
See the License for the specific language governing permissions and
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        secv1 "github.com/openshift/api/security/v1"
33
        "github.com/pkg/errors"
34
        cronexpr "github.com/robfig/cron/v3"
35

36
        authorizationv1 "k8s.io/api/authorization/v1"
37
        batchv1 "k8s.io/api/batch/v1"
38
        corev1 "k8s.io/api/core/v1"
39
        storagev1 "k8s.io/api/storage/v1"
40
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
41
        "k8s.io/apimachinery/pkg/api/meta"
42
        "k8s.io/apimachinery/pkg/api/resource"
43
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
44
        "k8s.io/apimachinery/pkg/labels"
45
        "k8s.io/apimachinery/pkg/runtime"
46
        "k8s.io/apimachinery/pkg/types"
47
        "k8s.io/client-go/tools/record"
48
        openapicommon "k8s.io/kube-openapi/pkg/common"
49
        "k8s.io/utils/ptr"
50

51
        "sigs.k8s.io/controller-runtime/pkg/client"
52
        "sigs.k8s.io/controller-runtime/pkg/controller"
53
        "sigs.k8s.io/controller-runtime/pkg/event"
54
        "sigs.k8s.io/controller-runtime/pkg/handler"
55
        "sigs.k8s.io/controller-runtime/pkg/manager"
56
        "sigs.k8s.io/controller-runtime/pkg/predicate"
57
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
58
        "sigs.k8s.io/controller-runtime/pkg/source"
59

60
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
61
        "kubevirt.io/containerized-data-importer/pkg/common"
62
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
63
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
64
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
65
        "kubevirt.io/containerized-data-importer/pkg/operator"
66
        "kubevirt.io/containerized-data-importer/pkg/util"
67
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
68
)
69

70
const (
71
        // ErrDataSourceAlreadyManaged provides a const to indicate DataSource already managed error
72
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
73
        // MessageDataSourceAlreadyManaged provides a const to form DataSource already managed error message
74
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
75
)
76

77
// DataImportCronReconciler members
78
type DataImportCronReconciler struct {
79
        client          client.Client
80
        uncachedClient  client.Client
81
        recorder        record.EventRecorder
82
        scheme          *runtime.Scheme
83
        log             logr.Logger
84
        image           string
85
        pullPolicy      string
86
        cdiNamespace    string
87
        installerLabels map[string]string
88
}
89

90
const (
91
        // AnnSourceDesiredDigest is the digest of the pending updated image
92
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
93
        // AnnImageStreamDockerRef is the ImageStream Docker reference
94
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
95
        // AnnNextCronTime is the next time stamp which satisfies the cron expression
96
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
97
        // AnnLastCronTime is the cron last execution time stamp
98
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
99
        // AnnLastUseTime is the PVC last use time stamp
100
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
101
        // AnnStorageClass is the cron DV's storage class
102
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
103

104
        dataImportControllerName    = "dataimportcron-controller"
105
        digestSha256Prefix          = "sha256:"
106
        digestUIDPrefix             = "uid:"
107
        digestDvNameSuffixLength    = 12
108
        cronJobUIDSuffixLength      = 8
109
        defaultImportsToKeepPerCron = 3
110
)
111

112
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
113

114
// Reconcile loop for DataImportCronReconciler
115
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
116
        dataImportCron := &cdiv1.DataImportCron{}
1✔
117
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
118
                return reconcile.Result{}, err
1✔
119
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
120
                err := r.cleanup(ctx, req.NamespacedName)
1✔
121
                return reconcile.Result{}, err
1✔
122
        }
1✔
123
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
124
        if !shouldReconcile || err != nil {
1✔
125
                return reconcile.Result{}, err
×
126
        }
×
127

128
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
129
                return reconcile.Result{}, err
×
130
        }
×
131

132
        return r.update(ctx, dataImportCron)
1✔
133
}
134

135
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
136
        dataSource := &cdiv1.DataSource{}
1✔
137
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
138
                if k8serrors.IsNotFound(err) {
2✔
139
                        return true, nil
1✔
140
                }
1✔
141
                return false, err
×
142
        }
143
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
144
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
145
                return true, nil
1✔
146
        }
1✔
147
        otherCron := &cdiv1.DataImportCron{}
1✔
148
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
149
                if k8serrors.IsNotFound(err) {
2✔
150
                        return true, nil
1✔
151
                }
1✔
152
                return false, err
×
153
        }
154
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
155
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
156
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
157
                r.log.V(3).Info(msg)
1✔
158
                return false, nil
1✔
159
        }
1✔
160
        return true, nil
×
161
}
162

163
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
164
        if dataImportCron.Spec.Schedule == "" {
2✔
165
                return nil
1✔
166
        }
1✔
167
        if isControllerPolledSource(dataImportCron) {
2✔
168
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
169
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
170
                }
1✔
171
                return nil
1✔
172
        }
173
        if !isURLSource(dataImportCron) {
1✔
174
                return nil
×
175
        }
×
176
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
177
        if exists || err != nil {
2✔
178
                return err
1✔
179
        }
1✔
180
        cronJob, err := r.newCronJob(dataImportCron)
1✔
181
        if err != nil {
1✔
182
                return err
×
183
        }
×
184
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
185
                return err
×
186
        }
×
187
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
188
        if err != nil {
1✔
189
                return err
×
190
        }
×
191
        if err := r.client.Create(ctx, job); err != nil {
1✔
192
                return err
×
193
        }
×
194
        return nil
1✔
195
}
196

197
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
198
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
199
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
200
        }
×
201
        imageStream := &imagev1.ImageStream{}
1✔
202
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
203
        if err != nil {
2✔
204
                return nil, "", err
1✔
205
        }
1✔
206
        imageStreamNamespacedName := types.NamespacedName{
1✔
207
                Namespace: imageStreamNamespace,
1✔
208
                Name:      name,
1✔
209
        }
1✔
210
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
211
                return nil, "", err
1✔
212
        }
1✔
213
        return imageStream, tag, nil
1✔
214
}
215

216
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
217
        if imageStream == nil {
1✔
218
                return "", "", errors.Errorf("No ImageStream")
×
219
        }
×
220
        tags := imageStream.Status.Tags
1✔
221
        if len(tags) == 0 {
1✔
222
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
223
        }
×
224

225
        tagIdx := 0
1✔
226
        if len(imageStreamTag) > 0 {
2✔
227
                tagIdx = -1
1✔
228
                for i, tag := range tags {
2✔
229
                        if tag.Tag == imageStreamTag {
2✔
230
                                tagIdx = i
1✔
231
                                break
1✔
232
                        }
233
                }
234
        }
235
        if tagIdx == -1 {
2✔
236
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
237
        }
1✔
238

239
        if len(tags[tagIdx].Items) == 0 {
2✔
240
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
241
        }
1✔
242
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
243
}
244

245
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
246
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
247
                return imageStreamName, "", nil
1✔
248
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
249
                return subs[0], subs[1], nil
1✔
250
        }
1✔
251
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
252
}
253

254
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
255
        nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]
1✔
256
        if nextTimeStr == "" {
1✔
257
                return r.setNextCronTime(dataImportCron)
×
258
        }
×
259
        nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
260
        if err != nil {
1✔
261
                return reconcile.Result{}, err
×
262
        }
×
263
        if nextTime.After(time.Now()) {
2✔
264
                return r.setNextCronTime(dataImportCron)
1✔
265
        }
1✔
266
        switch {
1✔
267
        case isImageStreamSource(dataImportCron):
1✔
268
                if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
269
                        return reconcile.Result{}, err
1✔
270
                }
1✔
271
        case isPvcSource(dataImportCron):
1✔
272
                if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
2✔
273
                        return reconcile.Result{}, err
1✔
274
                }
1✔
275
        case isNodePull(dataImportCron):
1✔
276
                if done, err := r.updateContainerImageDesiredDigest(ctx, dataImportCron); !done {
2✔
277
                        return reconcile.Result{RequeueAfter: 3 * time.Second}, err
1✔
278
                } else if err != nil {
2✔
279
                        return reconcile.Result{}, err
×
280
                }
×
281
        }
282

283
        return r.setNextCronTime(dataImportCron)
1✔
284
}
285

286
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
287
        now := time.Now()
1✔
288
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
289
        if err != nil {
1✔
290
                return reconcile.Result{}, err
×
291
        }
×
292
        nextTime := expr.Next(now)
1✔
293
        requeueAfter := nextTime.Sub(now)
1✔
294
        res := reconcile.Result{RequeueAfter: requeueAfter}
1✔
295
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
296
        return res, err
1✔
297
}
298

299
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
300
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
301
        return err == nil && regSource.ImageStream != nil
1✔
302
}
1✔
303

304
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
305
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
306
        return err == nil && regSource.URL != nil
1✔
307
}
1✔
308

309
func isNodePull(cron *cdiv1.DataImportCron) bool {
1✔
310
        regSource, err := getCronRegistrySource(cron)
1✔
311
        return err == nil && regSource != nil && regSource.PullMethod != nil &&
1✔
312
                *regSource.PullMethod == cdiv1.RegistryPullNode
1✔
313
}
1✔
314

315
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
316
        if !isCronRegistrySource(cron) {
2✔
317
                return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
1✔
318
        }
1✔
319
        return cron.Spec.Template.Spec.Source.Registry, nil
1✔
320
}
321

322
func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
1✔
323
        source := cron.Spec.Template.Spec.Source
1✔
324
        return source != nil && source.Registry != nil
1✔
325
}
1✔
326

327
func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
1✔
328
        if !isPvcSource(cron) {
1✔
329
                return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
×
330
        }
×
331
        return cron.Spec.Template.Spec.Source.PVC, nil
1✔
332
}
333

334
func isPvcSource(cron *cdiv1.DataImportCron) bool {
1✔
335
        source := cron.Spec.Template.Spec.Source
1✔
336
        return source != nil && source.PVC != nil
1✔
337
}
1✔
338

339
func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
1✔
340
        return isImageStreamSource(cron) || isPvcSource(cron) || isNodePull(cron)
1✔
341
}
1✔
342

343
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
344
        res := reconcile.Result{}
1✔
345

1✔
346
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
347
        if err != nil {
1✔
348
                return res, err
×
349
        }
×
350

351
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
352
        imports := dataImportCron.Status.CurrentImports
1✔
353
        importSucceeded := false
1✔
354

1✔
355
        dataVolume := dataImportCron.Spec.Template
1✔
356
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
357
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
358
        if err != nil {
1✔
359
                return res, err
×
360
        }
×
361
        if desiredStorageClass != nil {
2✔
362
                if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
2✔
363
                        return res, err
1✔
364
                }
1✔
365
                currentSc, hasCurrent := dataImportCron.Annotations[AnnStorageClass]
1✔
366
                desiredSc := desiredStorageClass.Name
1✔
367
                if hasCurrent && currentSc != desiredSc {
2✔
368
                        r.log.Info("Storage class changed, delete most recent source on the old sc as it's no longer the desired", "currentSc", currentSc, "desiredSc", desiredSc)
1✔
369
                        if err := r.handleStorageClassChange(ctx, dataImportCron, desiredSc); err != nil {
1✔
370
                                return res, err
×
371
                        }
×
372
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
373
                }
374
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
1✔
375
        }
376
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
377
        if err != nil {
1✔
378
                return res, err
×
379
        }
×
380
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
381
        if err != nil {
1✔
382
                return res, err
×
383
        }
×
384

385
        handlePopulatedPvc := func() error {
2✔
386
                if pvc != nil {
2✔
387
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
388
                                return err
×
389
                        }
×
390
                }
391
                importSucceeded = true
1✔
392
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
393
                        return err
×
394
                }
×
395

396
                return nil
1✔
397
        }
398

399
        switch {
1✔
400
        case dv != nil:
1✔
401
                switch dv.Status.Phase {
1✔
402
                case cdiv1.Succeeded:
1✔
403
                        if err := handlePopulatedPvc(); err != nil {
1✔
404
                                return res, err
×
405
                        }
×
406
                case cdiv1.ImportScheduled:
1✔
407
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
408
                case cdiv1.ImportInProgress:
1✔
409
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
410
                default:
1✔
411
                        dvPhase := string(dv.Status.Phase)
1✔
412
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
413
                }
414
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
415
                if err := handlePopulatedPvc(); err != nil {
1✔
416
                        return res, err
×
417
                }
×
418
        case snapshot != nil:
1✔
419
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
420
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
421
                                return res, err
×
422
                        }
×
423
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
424
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
425
                }
426
                // Below k8s 1.29 there's no way to know the source volume mode
427
                // Let's at least expose this info on our own snapshots
428
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
429
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
430
                        if err != nil {
1✔
431
                                return res, err
×
432
                        }
×
433
                        if volMode != nil {
2✔
434
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
435
                        }
1✔
436
                }
437
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
438
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
439
                if err != nil {
2✔
440
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
441
                                return res, err
×
442
                        }
×
443
                } else {
1✔
444
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
445
                }
1✔
446
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
447
                        return res, err
×
448
                }
×
449
                importSucceeded = true
1✔
450
        default:
1✔
451
                if len(imports) > 0 {
2✔
452
                        imports = imports[1:]
1✔
453
                        dataImportCron.Status.CurrentImports = imports
1✔
454
                }
1✔
455
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
456
        }
457

458
        if importSucceeded {
2✔
459
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
460
                        return res, err
×
461
                }
×
462
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
463
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
1✔
464
                        return res, err
×
465
                }
×
466
        }
467

468
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
469
                return res, err
×
470
        }
×
471

472
        // Skip if schedule is disabled
473
        if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
2✔
474
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
475
                pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
1✔
476
                if err != nil {
2✔
477
                        return pollRes, err
1✔
478
                }
1✔
479
                res = pollRes
1✔
480
        }
481

482
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
483
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
484
        if digestUpdated {
2✔
485
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
486
                if dv != nil {
1✔
487
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
×
488
                                return res, err
×
489
                        }
×
490
                }
491
                if importSucceeded || len(imports) == 0 {
2✔
492
                        if err := r.createImportDataVolume(ctx, dataImportCron); err != nil {
2✔
493
                                return res, err
1✔
494
                        }
1✔
495
                }
496
        } else if importSucceeded {
2✔
497
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
1✔
498
                        return res, err
×
499
                }
×
500
        } else if len(imports) > 0 {
2✔
501
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
1✔
502
        } else {
2✔
503
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
504
        }
1✔
505

506
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
1✔
507
                return res, err
×
508
        }
×
509

510
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
2✔
511
                if err := r.client.Update(ctx, dataImportCron); err != nil {
1✔
512
                        return res, err
×
513
                }
×
514
        }
515
        return res, nil
1✔
516
}
517

518
// Returns the current import DV if exists, and the last imported PVC
519
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
520
        imports := cron.Status.CurrentImports
1✔
521
        if len(imports) == 0 {
2✔
522
                return nil, nil, nil
1✔
523
        }
1✔
524

525
        dvName := imports[0].DataVolumeName
1✔
526
        dv := &cdiv1.DataVolume{}
1✔
527
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
528
                if !k8serrors.IsNotFound(err) {
1✔
529
                        return nil, nil, err
×
530
                }
×
531
                dv = nil
1✔
532
        }
533

534
        pvc := &corev1.PersistentVolumeClaim{}
1✔
535
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
536
                if !k8serrors.IsNotFound(err) {
1✔
537
                        return nil, nil, err
×
538
                }
×
539
                pvc = nil
1✔
540
        }
541
        return dv, pvc, nil
1✔
542
}
543

544
// Returns the current import DV if exists, and the last imported PVC
545
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
546
        imports := cron.Status.CurrentImports
1✔
547
        if len(imports) == 0 {
2✔
548
                return nil, nil
1✔
549
        }
1✔
550

551
        snapName := imports[0].DataVolumeName
1✔
552
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
553
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
554
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
555
                        return nil, err
×
556
                }
×
557
                return nil, nil
1✔
558
        }
559

560
        return snapshot, nil
1✔
561
}
562

563
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
564
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
565
        dataSource := &cdiv1.DataSource{}
1✔
566
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
567
                return nil, err
1✔
568
        }
1✔
569
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
570
                log := r.log.WithName("getCronManagedDataSource")
×
571
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
×
572
                return nil, ErrNotManagedByCron
×
573
        }
×
574
        return dataSource, nil
1✔
575
}
576

577
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
578
        objCopy := obj.DeepCopyObject()
1✔
579
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
580
        r.setDataImportCronResourceLabels(cron, obj)
1✔
581
        if !reflect.DeepEqual(obj, objCopy) {
2✔
582
                if err := r.client.Update(ctx, obj); err != nil {
1✔
583
                        return err
×
584
                }
×
585
        }
586
        return nil
1✔
587
}
588

589
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
590
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
591
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
592
                if cond.Status == corev1.ConditionFalse &&
×
593
                        (cond.Reason == common.GenericError || cond.Reason == ImagePullFailedReason) {
×
594
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
595
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
596
                        dv.Labels[common.DataImportCronLabel] = ""
×
597
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
598
                                return err
×
599
                        }
×
600
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
601
                                return err
×
602
                        }
×
603
                        cron.Status.CurrentImports = nil
×
604
                }
605
        }
606
        return nil
×
607
}
608

609
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
610
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
611
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
612
        if err != nil {
1✔
613
                return err
×
614
        }
×
615
        if regSource.ImageStream == nil {
1✔
616
                return nil
×
617
        }
×
618
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
619
        if err != nil {
2✔
620
                return err
1✔
621
        }
1✔
622
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
623
        if err != nil {
2✔
624
                return err
1✔
625
        }
1✔
626
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
627
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
628
                log.Info("Updating DataImportCron", "digest", digest)
1✔
629
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
630
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
631
        }
1✔
632
        return nil
1✔
633
}
634

635
func (r *DataImportCronReconciler) updateContainerImageDesiredDigest(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
636
        log := r.log.WithValues("name", cron.Name).WithValues("uid", cron.UID)
1✔
637
        podName := getPollerPodName(cron)
1✔
638
        ns := cron.Namespace
1✔
639
        nn := types.NamespacedName{Name: podName, Namespace: ns}
1✔
640
        pod := &corev1.Pod{}
1✔
641

1✔
642
        if err := r.client.Get(ctx, nn, pod); err == nil {
2✔
643
                digest, err := fetchContainerImageDigest(pod)
1✔
644
                if err != nil || digest == "" {
1✔
645
                        return false, err
×
646
                }
×
647
                cc.AddAnnotation(cron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
648
                if cron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
649
                        log.Info("Updating DataImportCron", "digest", digest)
1✔
650
                        cc.AddAnnotation(cron, AnnSourceDesiredDigest, digest)
1✔
651
                }
1✔
652
                return true, r.client.Delete(ctx, pod)
1✔
653
        } else if cc.IgnoreNotFound(err) != nil {
1✔
654
                return false, err
×
655
        }
×
656

657
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(ctx, r.client)
1✔
658
        if err != nil {
1✔
659
                return false, err
×
660
        }
×
661
        platform := cron.Spec.Template.Spec.Source.Registry.Platform
1✔
662
        if platform != nil && platform.Architecture != "" {
1✔
663
                if workloadNodePlacement.NodeSelector == nil {
×
664
                        workloadNodePlacement.NodeSelector = map[string]string{}
×
665
                }
×
666
                workloadNodePlacement.NodeSelector[corev1.LabelArchStable] = platform.Architecture
×
667
        }
668

669
        containerImage := strings.TrimPrefix(*cron.Spec.Template.Spec.Source.Registry.URL, "docker://")
1✔
670

1✔
671
        pod = &corev1.Pod{
1✔
672
                ObjectMeta: metav1.ObjectMeta{
1✔
673
                        Name:      podName,
1✔
674
                        Namespace: ns,
1✔
675
                        OwnerReferences: []metav1.OwnerReference{
1✔
676
                                {
1✔
677
                                        APIVersion:         cron.APIVersion,
1✔
678
                                        Kind:               cron.Kind,
1✔
679
                                        Name:               cron.Name,
1✔
680
                                        UID:                cron.UID,
1✔
681
                                        BlockOwnerDeletion: ptr.To[bool](true),
1✔
682
                                        Controller:         ptr.To[bool](true),
1✔
683
                                },
1✔
684
                        },
1✔
685
                },
1✔
686
                Spec: corev1.PodSpec{
1✔
687
                        TerminationGracePeriodSeconds: ptr.To[int64](0),
1✔
688
                        RestartPolicy:                 corev1.RestartPolicyNever,
1✔
689
                        NodeSelector:                  workloadNodePlacement.NodeSelector,
1✔
690
                        Tolerations:                   workloadNodePlacement.Tolerations,
1✔
691
                        Affinity:                      workloadNodePlacement.Affinity,
1✔
692
                        Volumes: []corev1.Volume{
1✔
693
                                {
1✔
694
                                        Name: "shared-volume",
1✔
695
                                        VolumeSource: corev1.VolumeSource{
1✔
696
                                                EmptyDir: &corev1.EmptyDirVolumeSource{},
1✔
697
                                        },
1✔
698
                                },
1✔
699
                        },
1✔
700
                        InitContainers: []corev1.Container{
1✔
701
                                {
1✔
702
                                        Name:                     "init",
1✔
703
                                        Image:                    r.image,
1✔
704
                                        ImagePullPolicy:          corev1.PullPolicy(r.pullPolicy),
1✔
705
                                        Command:                  []string{"sh", "-c", "cp /usr/bin/cdi-containerimage-server /shared/server"},
1✔
706
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
707
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
708
                                },
1✔
709
                        },
1✔
710
                        Containers: []corev1.Container{
1✔
711
                                {
1✔
712
                                        Name:                     "image-container",
1✔
713
                                        Image:                    containerImage,
1✔
714
                                        ImagePullPolicy:          corev1.PullAlways,
1✔
715
                                        Command:                  []string{"/shared/server", "-h"},
1✔
716
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
717
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
718
                                },
1✔
719
                        },
1✔
720
                },
1✔
721
        }
1✔
722

1✔
723
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
724
        if pod.Spec.SecurityContext != nil {
2✔
725
                pod.Spec.SecurityContext.FSGroup = nil
1✔
726
        }
1✔
727

728
        return false, r.client.Create(ctx, pod)
1✔
729
}
730

731
func fetchContainerImageDigest(pod *corev1.Pod) (string, error) {
1✔
732
        if len(pod.Status.ContainerStatuses) == 0 {
1✔
733
                return "", nil
×
734
        }
×
735

736
        status := pod.Status.ContainerStatuses[0]
1✔
737
        if status.State.Waiting != nil {
1✔
738
                reason := status.State.Waiting.Reason
×
739
                switch reason {
×
740
                case "ImagePullBackOff", "ErrImagePull", "InvalidImageName":
×
741
                        return "", errors.Errorf("%s %s: %s", common.ImagePullFailureText, status.Image, reason)
×
742
                }
743
                return "", nil
×
744
        }
745

746
        if status.State.Terminated == nil {
1✔
747
                return "", nil
×
748
        }
×
749

750
        imageID := status.ImageID
1✔
751
        if imageID == "" {
1✔
752
                return "", errors.Errorf("Container has no imageID")
×
753
        }
×
754
        idx := strings.Index(imageID, digestSha256Prefix)
1✔
755
        if idx < 0 {
1✔
756
                return "", errors.Errorf("Container image %s ID has no digest: %s", status.Image, imageID)
×
757
        }
×
758

759
        return imageID[idx:], nil
1✔
760
}
761

762
func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
763
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
764
        pvcSource, err := getCronPvcSource(dataImportCron)
1✔
765
        if err != nil {
1✔
766
                return err
×
767
        }
×
768
        ns := pvcSource.Namespace
1✔
769
        if ns == "" {
2✔
770
                ns = dataImportCron.Namespace
1✔
771
        }
1✔
772
        pvc := &corev1.PersistentVolumeClaim{}
1✔
773
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
2✔
774
                return err
1✔
775
        }
1✔
776
        digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
1✔
777
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
778
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
779
                log.Info("Updating DataImportCron", "digest", digest)
1✔
780
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
781
        }
1✔
782
        return nil
1✔
783
}
784

785
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
786
        log := r.log.WithName("updateDataSource")
1✔
787
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
788
        if err != nil {
2✔
789
                if k8serrors.IsNotFound(err) {
2✔
790
                        dataSource = r.newDataSource(dataImportCron)
1✔
791
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
792
                                return err
×
793
                        }
×
794
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
1✔
795
                } else if errors.Is(err, ErrNotManagedByCron) {
×
796
                        return nil
×
797
                } else {
×
798
                        return err
×
799
                }
×
800
        }
801
        dataSourceCopy := dataSource.DeepCopy()
1✔
802
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
803

1✔
804
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
805
        populateDataSource(format, dataSource, sourcePVC)
1✔
806

1✔
807
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
808
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
809
                        return err
×
810
                }
×
811
        }
812

813
        return nil
1✔
814
}
815

816
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
817
        if sourcePVC == nil {
2✔
818
                return
1✔
819
        }
1✔
820

821
        switch format {
1✔
822
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
823
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
824
                        PVC: sourcePVC,
1✔
825
                }
1✔
826
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
827
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
828
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
829
                                Namespace: sourcePVC.Namespace,
1✔
830
                                Name:      sourcePVC.Name,
1✔
831
                        },
1✔
832
                }
1✔
833
        }
834
}
835

836
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
837
        if dataImportCron.Status.CurrentImports == nil {
1✔
838
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
839
        }
×
840
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
841
                Namespace: dataImportCron.Namespace,
1✔
842
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
843
        }
1✔
844
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
845
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
846
                now := metav1.Now()
1✔
847
                dataImportCron.Status.LastImportTimestamp = &now
1✔
848
        }
1✔
849
        return nil
1✔
850
}
851

852
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
853
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
854
        if lastTimeStr == "" {
2✔
855
                return nil
1✔
856
        }
1✔
857
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
858
        if err != nil {
1✔
859
                return err
×
860
        }
×
861
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
862
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
863
        }
1✔
864
        return nil
1✔
865
}
866

867
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
868
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
869
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
870
        if digest == "" {
1✔
871
                return nil
×
872
        }
×
873
        dvName, err := createDvName(dataSourceName, digest)
1✔
874
        if err != nil {
2✔
875
                return err
1✔
876
        }
1✔
877
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
878

1✔
879
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
880
        for _, src := range sources {
2✔
881
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
882
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
883
                                return err
×
884
                        }
×
885
                } else {
1✔
886
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
887
                                return err
×
888
                        }
×
889
                        // If source exists don't create DV
890
                        return nil
1✔
891
                }
892
        }
893
        dv := r.newSourceDataVolume(dataImportCron, dvName)
1✔
894
        if allowed, err := r.authorizeCloneDataVolume(dataImportCron, dv); err != nil {
1✔
895
                return err
×
896
        } else if !allowed {
1✔
897
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse,
×
898
                        "Not authorized to create DataVolume", notAuthorized)
×
899
                return nil
×
900
        }
×
901
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
902
                return err
×
903
        }
×
904

905
        return nil
1✔
906
}
907

908
func (r *DataImportCronReconciler) authorizeCloneDataVolume(dataImportCron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) (bool, error) {
1✔
909
        if !isPvcSource(dataImportCron) {
2✔
910
                return true, nil
1✔
911
        }
1✔
912

913
        if resp, err := dv.AuthorizeSA(dv.Namespace, dv.Name, r, dataImportCron.Namespace, dataImportCron.Spec.ServiceAccountName); err != nil {
1✔
NEW
914
                return false, err
×
915
        } else if !resp.Allowed {
1✔
NEW
916
                r.log.Info("Not authorized to create DataVolume", "cron", dataImportCron.Name, "reason", resp.Reason)
×
917
                return false, nil
×
NEW
918
        }
×
919

920
        return true, nil
1✔
921
}
922

923
func (r *DataImportCronReconciler) CreateSar(sar *authorizationv1.SubjectAccessReview) (*authorizationv1.SubjectAccessReview, error) {
×
924
        if err := r.client.Create(context.TODO(), sar); err != nil {
×
925
                return nil, err
×
926
        }
×
927
        return sar, nil
×
928
}
929

930
func (r *DataImportCronReconciler) GetNamespace(name string) (*corev1.Namespace, error) {
1✔
931
        ns := &corev1.Namespace{}
1✔
932
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: name}, ns); err != nil {
1✔
NEW
933
                return nil, err
×
934
        }
×
935
        return ns, nil
1✔
936
}
937

938
func (r *DataImportCronReconciler) GetDataSource(namespace, name string) (*cdiv1.DataSource, error) {
×
NEW
939
        das := &cdiv1.DataSource{}
×
940
        if err := r.client.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, das); err != nil {
×
NEW
941
                return nil, err
×
942
        }
×
943
        return das, nil
×
944
}
945

946
func (r *DataImportCronReconciler) handleStorageClassChange(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
1✔
947
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
948
        if !ok {
1✔
NEW
949
                // nothing to delete
×
950
                return nil
×
951
        }
×
952
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
953
        if err != nil {
1✔
954
                return err
×
955
        }
×
956
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
1✔
957
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
1✔
958
        for _, src := range sources {
2✔
959
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
960
                        return err
×
961
                }
×
962
        }
963
        for _, src := range sources {
2✔
964
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
1✔
965
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
×
966
                }
×
967
        }
968
        // Only update desired storage class once garbage collection went through
969
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
1✔
970
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
971
        if err != nil {
1✔
972
                return err
×
973
        }
×
974

975
        return nil
1✔
976
}
977

978
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
979
        switch format {
1✔
980
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
981
                return nil
1✔
982
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
983
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
984
        default:
×
985
                return fmt.Errorf("unknown source format for snapshot")
×
986
        }
987
}
988

989
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
990
        if pvc == nil {
1✔
991
                return nil
×
992
        }
×
993
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
994
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
995
                return nil
1✔
996
        }
1✔
997
        storageProfile := &cdiv1.StorageProfile{}
1✔
998
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
999
                return err
×
1000
        }
×
1001
        className, err := cc.GetSnapshotClassForSmartClone(pvc, &desiredStorageClass.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
1002
        if err != nil {
1✔
1003
                return err
×
1004
        }
×
1005
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
1006
                ObjectMeta: metav1.ObjectMeta{
1✔
1007
                        Name:      pvc.Name,
1✔
1008
                        Namespace: dataImportCron.Namespace,
1✔
1009
                        Labels: map[string]string{
1✔
1010
                                common.CDILabelKey:       common.CDILabelValue,
1✔
1011
                                common.CDIComponentLabel: "",
1✔
1012
                        },
1✔
1013
                },
1✔
1014
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
1015
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
1016
                                PersistentVolumeClaimName: &pvc.Name,
1✔
1017
                        },
1✔
1018
                        VolumeSnapshotClassName: &className,
1✔
1019
                },
1✔
1020
        }
1✔
1021
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
1022
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
1✔
1023

1✔
1024
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
1025
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
1026
                if !k8serrors.IsNotFound(err) {
1✔
1027
                        return err
×
1028
                }
×
1029
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1030
                if pvc.Spec.VolumeMode != nil {
2✔
1031
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
1032
                }
1✔
1033
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
1034
                        return err
×
1035
                }
×
1036
        } else {
1✔
1037
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
1038
                        // Clean up DV/PVC as they are not needed anymore
1✔
1039
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
1040
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
1041
                                return err
×
1042
                        }
×
1043
                }
1044
        }
1045

1046
        return nil
1✔
1047
}
1048

1049
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
1050
        dataImportCron.Status.SourceFormat = &format
1✔
1051

1✔
1052
        switch format {
1✔
1053
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1054
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1055
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1056
                if snapshot == nil {
2✔
1057
                        // Snapshot create/update will trigger reconcile
1✔
1058
                        return nil
1✔
1059
                }
1✔
1060
                if cc.IsSnapshotReady(snapshot) {
2✔
1061
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1062
                } else {
2✔
1063
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
1064
                }
1✔
1065
        default:
×
1066
                return fmt.Errorf("unknown source format for snapshot")
×
1067
        }
1068

1069
        return nil
1✔
1070
}
1071

1072
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
1073
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
1074
        if desiredStorageClass == nil {
2✔
1075
                return format, nil
1✔
1076
        }
1✔
1077

1078
        storageProfile := &cdiv1.StorageProfile{}
1✔
1079
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1080
                return format, err
×
1081
        }
×
1082
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
1083
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
1084
        }
1✔
1085

1086
        return format, nil
1✔
1087
}
1088

1089
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
1090
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
1091
                return nil
×
1092
        }
×
1093
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
1094
        if err != nil {
1✔
1095
                return err
×
1096
        }
×
1097

1098
        maxImports := defaultImportsToKeepPerCron
1✔
1099

1✔
1100
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
1101
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
1102
        }
1✔
1103

1104
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
1105
                return err
×
1106
        }
×
1107
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
1108
                return err
×
1109
        }
×
1110

1111
        return nil
1✔
1112
}
1113

1114
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
1115
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
1116

1✔
1117
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1118
                return err
×
1119
        }
×
1120
        if len(pvcList.Items) > maxImports {
2✔
1121
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
1122
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
1123
                })
1✔
1124
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
1125
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1126
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
1127
                                return err
×
1128
                        }
×
1129
                }
1130
        }
1131

1132
        dvList := &cdiv1.DataVolumeList{}
1✔
1133
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1134
                return err
×
1135
        }
×
1136

1137
        if len(dvList.Items) > maxImports {
2✔
1138
                for _, dv := range dvList.Items {
2✔
1139
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
1140
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
1141
                                return err
×
1142
                        }
×
1143

1144
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1145
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1146
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
1147
                                        return err
×
1148
                                }
×
1149
                        }
1150
                }
1151
        }
1152

1153
        return nil
1✔
1154
}
1155

1156
// deleteDvPvc deletes DV or PVC if DV was GCed
1157
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
1158
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
1159
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
1160
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
1161
                return err
1✔
1162
        }
1✔
1163
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
1164
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
1165
                return err
×
1166
        }
×
1167
        return nil
1✔
1168
}
1169

1170
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
1171
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
1172

1✔
1173
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1174
                if meta.IsNoMatchError(err) {
×
1175
                        return nil
×
1176
                }
×
1177
                return err
×
1178
        }
1179
        if len(snapList.Items) > maxImports {
1✔
1180
                sort.Slice(snapList.Items, func(i, j int) bool {
×
1181
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
1182
                })
×
1183
                for _, snap := range snapList.Items[maxImports:] {
×
1184
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
1185
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
1186
                                return err
×
1187
                        }
×
1188
                }
1189
        }
1190

1191
        return nil
1✔
1192
}
1193

1194
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
1195
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
1196
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
1197

1✔
1198
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
1199
                return err
×
1200
        }
×
1201
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
1202
        if err != nil {
1✔
1203
                return err
×
1204
        }
×
1205
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
1206
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
1207
                return err
×
1208
        }
×
1209
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
1210
                return err
×
1211
        }
×
1212
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
1213
                return err
×
1214
        }
×
1215
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
1216
                return err
×
1217
        }
×
1218
        return nil
1✔
1219
}
1220

1221
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
1222
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
1223
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
1224
        if err != nil {
1✔
1225
                return err
×
1226
        }
×
1227
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
1228
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
1229
                return err
×
1230
        }
×
1231
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
1232
                return err
×
1233
        }
×
1234

1235
        return nil
1✔
1236
}
1237

1238
// NewDataImportCronController creates a new instance of the DataImportCron controller
1239
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
1240
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
1241
                Scheme: mgr.GetScheme(),
×
1242
                Mapper: mgr.GetRESTMapper(),
×
1243
        })
×
1244
        if err != nil {
×
1245
                return nil, err
×
1246
        }
×
1247
        reconciler := &DataImportCronReconciler{
×
1248
                client:          mgr.GetClient(),
×
1249
                uncachedClient:  uncachedClient,
×
1250
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1251
                scheme:          mgr.GetScheme(),
×
1252
                log:             log.WithName(dataImportControllerName),
×
1253
                image:           importerImage,
×
1254
                pullPolicy:      pullPolicy,
×
1255
                cdiNamespace:    util.GetNamespace(),
×
1256
                installerLabels: installerLabels,
×
1257
        }
×
1258
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1259
                MaxConcurrentReconciles: 3,
×
1260
                Reconciler:              reconciler,
×
1261
        })
×
1262
        if err != nil {
×
1263
                return nil, err
×
1264
        }
×
1265
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
1266
                return nil, err
×
1267
        }
×
1268
        log.Info("Initialized DataImportCron controller")
×
1269
        return dataImportCronController, nil
×
1270
}
1271

1272
func getCronName(obj client.Object) string {
×
1273
        return obj.GetLabels()[common.DataImportCronLabel]
×
1274
}
×
1275

1276
func getCronNs(obj client.Object) string {
×
1277
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
1278
}
×
1279

1280
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
1281
        if cronName := getCronName(obj); cronName != "" {
×
1282
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
1283
        }
×
1284
        return nil
×
1285
}
1286

1287
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1288
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1289
                return err
×
1290
        }
×
1291

1292
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
1293
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
1294
                // Otherwise we risk losing the storage profile event
×
1295
                var crons cdiv1.DataImportCronList
×
1296
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
×
1297
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
×
1298
                        return nil
×
1299
                }
×
1300
                // Storage profiles are 1:1 to storage classes
1301
                scName := obj.GetName()
×
1302
                var reqs []reconcile.Request
×
1303
                for _, cron := range crons.Items {
×
1304
                        dataVolume := cron.Spec.Template
×
1305
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1306
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1307
                        if err != nil || templateSc == nil {
×
1308
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1309
                                return reqs
×
1310
                        }
×
1311
                        if templateSc.Name == scName {
×
1312
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1313
                        }
×
1314
                }
1315
                return reqs
×
1316
        }
1317

1318
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1319
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1320
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1321
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1322
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1323
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1324
                },
1325
        )); err != nil {
×
1326
                return err
×
1327
        }
×
1328

1329
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1330
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1331
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1332
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1333
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1334
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1335
                },
1336
        )); err != nil {
×
1337
                return err
×
1338
        }
×
1339

1340
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1341
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1342
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1343
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1344
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1345
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1346
                },
1347
        )); err != nil {
×
1348
                return err
×
1349
        }
×
1350

1351
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1352
                return err
×
1353
        }
×
1354

1355
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1356
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1357
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
×
1358
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1359
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1360
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1361
                                return e.ObjectOld.Status.DataImportCronSourceFormat != e.ObjectNew.Status.DataImportCronSourceFormat
×
1362
                        },
×
1363
                },
1364
        )); err != nil {
×
1365
                return err
×
1366
        }
×
1367

1368
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1369
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1370
        }
×
1371

1372
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1373
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1374
                predicate.TypedFuncs[*batchv1.CronJob]{
×
1375
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1376
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1377
                        },
×
1378
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
×
1379
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1380
                },
1381
        )); err != nil {
×
1382
                return err
×
1383
        }
×
1384

1385
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1386
                if meta.IsNoMatchError(err) {
×
1387
                        // Back out if there's no point to attempt watch
×
1388
                        return nil
×
1389
                }
×
1390
                if !cc.IsErrCacheNotStarted(err) {
×
1391
                        return err
×
1392
                }
×
1393
        }
1394
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1395
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
×
1396
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1397
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1398
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1399
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1400
                },
1401
        )); err != nil {
×
1402
                return err
×
1403
        }
×
1404

1405
        return nil
×
1406
}
1407

1408
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1409
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1410
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1411
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
1412
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1413
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1414
                                log.Info("Update", "sc", obj.GetName(),
×
1415
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1416
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1417
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
×
1418
                                if err != nil {
×
1419
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
1420
                                }
×
1421
                                return reqs
×
1422
                        },
1423
                ),
1424
                predicate.TypedFuncs[*storagev1.StorageClass]{
1425
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1426
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1427
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1428
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1429
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1430
                        },
×
1431
                },
1432
        )); err != nil {
×
1433
                return err
×
1434
        }
×
1435

1436
        return nil
×
1437
}
1438

1439
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1440
        dicList := &cdiv1.DataImportCronList{}
×
1441
        if err := c.List(ctx, dicList); err != nil {
×
1442
                return nil, err
×
1443
        }
×
1444
        reqs := []reconcile.Request{}
×
1445
        for _, dic := range dicList.Items {
×
1446
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
1447
                        continue
×
1448
                }
1449

1450
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1451
        }
1452

1453
        return reqs, nil
×
1454
}
1455

1456
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
1✔
1457
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1458
                return false, nil
1✔
1459
        }
1✔
1460

1461
        sc := pvc.Spec.StorageClassName
1✔
1462
        if sc == nil || *sc == desiredStorageClass {
2✔
1463
                return false, nil
1✔
1464
        }
1✔
1465

1466
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1✔
1467
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
1468
                return false, err
×
1469
        }
×
1470

1471
        return true, nil
1✔
1472
}
1473

1474
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1475
        cronJob := &batchv1.CronJob{}
1✔
1476
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1477
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1478
                return false, cc.IgnoreNotFound(err)
1✔
1479
        }
1✔
1480

1481
        cronJobCopy := cronJob.DeepCopy()
1✔
1482
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1483
                return false, err
×
1484
        }
×
1485

1486
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1487
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1488
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
1489
                        return false, cc.IgnoreNotFound(err)
×
1490
                }
×
1491
        }
1492
        return true, nil
1✔
1493
}
1494

1495
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1496
        cronJob := &batchv1.CronJob{
1✔
1497
                ObjectMeta: metav1.ObjectMeta{
1✔
1498
                        Name:      GetCronJobName(cron),
1✔
1499
                        Namespace: r.cdiNamespace,
1✔
1500
                },
1✔
1501
        }
1✔
1502
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
1503
                return nil, err
×
1504
        }
×
1505
        return cronJob, nil
1✔
1506
}
1507

1508
// InitPollerPod inits poller Pod
1509
func InitPollerPod(c client.Client, cron *cdiv1.DataImportCron, pod *corev1.PodTemplateSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1510
        regSource, err := getCronRegistrySource(cron)
1✔
1511
        if err != nil {
1✔
1512
                return err
×
1513
        }
×
1514
        if regSource.URL == nil {
1✔
1515
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1516
        }
×
1517
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1518
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
1519
                return err
×
1520
        }
×
1521
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1522
        if err != nil {
1✔
1523
                return err
×
1524
        }
×
1525
        container := corev1.Container{
1✔
1526
                Name:  "cdi-source-update-poller",
1✔
1527
                Image: image,
1✔
1528
                Command: []string{
1✔
1529
                        "/usr/bin/cdi-source-update-poller",
1✔
1530
                        "-ns", cron.Namespace,
1✔
1531
                        "-cron", cron.Name,
1✔
1532
                        "-url", *regSource.URL,
1✔
1533
                },
1✔
1534
                ImagePullPolicy:          pullPolicy,
1✔
1535
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1536
                TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
1537
        }
1✔
1538

1✔
1539
        var volumes []corev1.Volume
1✔
1540
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1541
        if hasCertConfigMap {
1✔
1542
                vm := corev1.VolumeMount{
×
1543
                        Name:      CertVolName,
×
1544
                        MountPath: common.ImporterCertDir,
×
1545
                }
×
1546
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1547
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
1548
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
1549
        }
×
1550

1551
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1552
                vm := corev1.VolumeMount{
1✔
1553
                        Name:      ProxyCertVolName,
1✔
1554
                        MountPath: common.ImporterProxyCertDir,
1✔
1555
                }
1✔
1556
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1557
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1558
        }
1✔
1559

1560
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
1561
                container.Env = append(container.Env,
×
1562
                        corev1.EnvVar{
×
1563
                                Name: common.ImporterAccessKeyID,
×
1564
                                ValueFrom: &corev1.EnvVarSource{
×
1565
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1566
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1567
                                                        Name: *regSource.SecretRef,
×
1568
                                                },
×
1569
                                                Key: common.KeyAccess,
×
1570
                                        },
×
1571
                                },
×
1572
                        },
×
1573
                        corev1.EnvVar{
×
1574
                                Name: common.ImporterSecretKey,
×
1575
                                ValueFrom: &corev1.EnvVarSource{
×
1576
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1577
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1578
                                                        Name: *regSource.SecretRef,
×
1579
                                                },
×
1580
                                                Key: common.KeySecret,
×
1581
                                        },
×
1582
                                },
×
1583
                        },
×
1584
                )
×
1585
        }
×
1586

1587
        addEnvVar := func(varName, value string) {
2✔
1588
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1589
        }
1✔
1590

1591
        if insecureTLS {
1✔
1592
                addEnvVar(common.InsecureTLSVar, "true")
×
1593
        }
×
1594

1595
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1596
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1597
                        addEnvVar(varName, value)
1✔
1598
                }
1✔
1599
        }
1600

1601
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1602
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1603
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1604

1✔
1605
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1606
        if err != nil {
1✔
1607
                return err
×
1608
        }
×
1609
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1610
        if err != nil {
1✔
1611
                return err
×
1612
        }
×
1613

1614
        podSpec := &pod.Spec
1✔
1615

1✔
1616
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1617
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1618
        podSpec.Containers = []corev1.Container{container}
1✔
1619
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1620
        podSpec.Volumes = volumes
1✔
1621
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1622
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1623
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1624
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1625

1✔
1626
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1627
        // No need for specifid uid/fsgroup here since this doesn't write or use qemu
1✔
1628
        if podSpec.SecurityContext != nil {
2✔
1629
                podSpec.SecurityContext.FSGroup = nil
1✔
1630
        }
1✔
1631
        if podSpec.Containers[0].SecurityContext != nil {
2✔
1632
                podSpec.Containers[0].SecurityContext.RunAsUser = nil
1✔
1633
        }
1✔
1634

1635
        if pod.Labels == nil {
2✔
1636
                pod.Labels = map[string]string{}
1✔
1637
        }
1✔
1638
        pod.Labels[common.DataImportCronPollerLabel] = ""
1✔
1639

1✔
1640
        return nil
1✔
1641
}
1642

1643
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1644
        cronJobSpec := &cronJob.Spec
1✔
1645
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1646
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1647
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1648
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1649

1✔
1650
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1651
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1652
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1653
        cc.AddAnnotation(&jobSpec.Template, secv1.RequiredSCCAnnotation, common.RestrictedSCCName)
1✔
1654

1✔
1655
        pod := &jobSpec.Template
1✔
1656
        if err := InitPollerPod(r.client, cron, pod, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1657
                return err
×
1658
        }
×
1659
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1660
                return err
×
1661
        }
×
1662
        return nil
1✔
1663
}
1664

1665
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1666
        job := &batchv1.Job{
1✔
1667
                ObjectMeta: metav1.ObjectMeta{
1✔
1668
                        Name:      GetInitialJobName(cron),
1✔
1669
                        Namespace: cronJob.Namespace,
1✔
1670
                },
1✔
1671
                Spec: cronJob.Spec.JobTemplate.Spec,
1✔
1672
        }
1✔
1673
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1674
                return nil, err
×
1675
        }
×
1676
        return job, nil
1✔
1677
}
1678

1679
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1680
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1681
                return err
×
1682
        }
×
1683
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1684
        labels := obj.GetLabels()
1✔
1685
        labels[common.DataImportCronNsLabel] = cron.Namespace
1✔
1686
        labels[common.DataImportCronLabel] = cron.Name
1✔
1687
        obj.SetLabels(labels)
1✔
1688
        return nil
1✔
1689
}
1690

1691
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string) *cdiv1.DataVolume {
1✔
1692
        dv := cron.Spec.Template.DeepCopy()
1✔
1693
        if isCronRegistrySource(cron) {
2✔
1694
                var digestedURL string
1✔
1695
                if isURLSource(cron) {
2✔
1696
                        digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1697
                } else if isImageStreamSource(cron) {
3✔
1698
                        // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1699
                        digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1700
                        dv.Spec.Source.Registry.ImageStream = nil
1✔
1701
                }
1✔
1702
                dv.Spec.Source.Registry.URL = &digestedURL
1✔
1703
        }
1704
        dv.Name = dataVolumeName
1✔
1705
        dv.Namespace = cron.Namespace
1✔
1706
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1707
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1708
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1709
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1710

1✔
1711
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
1712
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1713
        }
1✔
1714

1715
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1716

1✔
1717
        return dv
1✔
1718
}
1719

1720
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1721
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1722
        cc.CopyAllowedLabels(cron.GetLabels(), obj, true)
1✔
1723
        labels := obj.GetLabels()
1✔
1724
        labels[common.DataImportCronLabel] = cron.Name
1✔
1725
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
2✔
1726
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1727
        }
1✔
1728
        obj.SetLabels(labels)
1✔
1729
}
1730

1731
func untagDigestedDockerURL(dockerURL string) string {
1✔
1732
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1733
                url := u.Host + u.Path
1✔
1734
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1735
                // Check for tag
1✔
1736
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1737
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
2✔
1738
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1739
                        }
1✔
1740
                }
1741
        }
1742
        return dockerURL
1✔
1743
}
1744

1745
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1746
        if val := cron.Labels[ann]; val != "" {
2✔
1747
                cc.AddLabel(dv, ann, val)
1✔
1748
        }
1✔
1749
}
1750

1751
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1752
        if val := cron.Annotations[ann]; val != "" {
1✔
1753
                cc.AddAnnotation(dv, ann, val)
×
1754
        }
×
1755
}
1756

1757
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1✔
1758
        dataSource := &cdiv1.DataSource{
1✔
1759
                ObjectMeta: metav1.ObjectMeta{
1✔
1760
                        Name:      cron.Spec.ManagedDataSource,
1✔
1761
                        Namespace: cron.Namespace,
1✔
1762
                },
1✔
1763
        }
1✔
1764
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1765
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1766
        cc.CopyAllowedLabels(cron.GetLabels(), dataSource, true)
1✔
1767
        return dataSource
1✔
1768
}
1✔
1769

1770
// Create DataVolume name based on the DataSource name + prefix of the digest
1771
func createDvName(prefix, digest string) (string, error) {
1✔
1772
        digestPrefix := ""
1✔
1773
        if strings.HasPrefix(digest, digestSha256Prefix) {
2✔
1774
                digestPrefix = digestSha256Prefix
1✔
1775
        } else if strings.HasPrefix(digest, digestUIDPrefix) {
3✔
1776
                digestPrefix = digestUIDPrefix
1✔
1777
        } else {
2✔
1778
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1779
        }
1✔
1780
        fromIdx := len(digestPrefix)
1✔
1781
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1782
        if len(digest) < toIdx {
2✔
1783
                return "", errors.Errorf("Digest is too short")
1✔
1784
        }
1✔
1785
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1786
}
1787

1788
// GetCronJobName get CronJob name based on cron name and UID
1789
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1790
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1791
}
1✔
1792

1793
// GetInitialJobName get initial job name based on cron name and UID
1794
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1795
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1796
}
1✔
1797

1798
func getPollerPodName(cron *cdiv1.DataImportCron) string {
1✔
1799
        return naming.GetResourceName("poller-"+cron.Name, string(cron.UID)[:8])
1✔
1800
}
1✔
1801

1802
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1803
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1804
}
1✔
1805

1806
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1807
        dv := &cron.Spec.Template
1✔
1808

1✔
1809
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1810
                return explicitVolumeMode, nil
×
1811
        }
×
1812

1813
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1814
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1815
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1816
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1817
                        AccessModes:      accessModes,
1✔
1818
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1819
                        Resources: corev1.VolumeResourceRequirements{
1✔
1820
                                Requests: corev1.ResourceList{
1✔
1821
                                        // Doesn't matter
1✔
1822
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1823
                                },
1✔
1824
                        },
1✔
1825
                },
1✔
1826
        }
1✔
1827
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1828
                return nil, err
×
1829
        }
×
1830

1831
        return inferredPvc.Spec.VolumeMode, nil
1✔
1832
}
1833

1834
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1835
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1836
        if dv.Spec.PVC != nil {
1✔
1837
                return dv.Spec.PVC.VolumeMode
×
1838
        }
×
1839

1840
        if dv.Spec.Storage != nil {
2✔
1841
                return dv.Spec.Storage.VolumeMode
1✔
1842
        }
1✔
1843

1844
        return nil
×
1845
}
1846

1847
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1848
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1849
        if dv.Spec.PVC != nil {
1✔
1850
                return dv.Spec.PVC.AccessModes
×
1851
        }
×
1852

1853
        if dv.Spec.Storage != nil {
2✔
1854
                return dv.Spec.Storage.AccessModes
1✔
1855
        }
1✔
1856

1857
        return nil
×
1858
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc